HBase behavior during joins with shc
In the documentation, it's mentioned that WHERE clauses that use the row key, rather than a regular column, will avoid a full scan. My question is: what is the behavior when two HBase tables are joined together as DataFrames? Is there a way to avoid a full scan in that situation?
I ask because I can see a large shuffle phase (over 4 TB), which suggests full scans are happening rather than rowkey-based lookups.
Some specifics:
- table_a is an HBase table whose rowkey is a sha1 -- we'll call this sha1_1
- table_b is an HBase table whose rowkey is a string composed of two different types of sha1s, so the row key is <sha1_2>-<sha1_3>
- table_c is from Postgres, and I want to join it back to table_a using sha1_1
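For reference, a sketch of what an SHC catalog for table_a could look like, following the format from the shc README (the column family cf1 and column col1 are made-up placeholders; table_b's catalog would have the same shape, with its rowkey holding <sha1_2>-<sha1_3>):
// hedged sketch of an SHC catalog for table_a; cf1/col1 are placeholders
def catalogA = s"""{
       |"table":{"namespace":"default", "name":"table_a"},
       |"rowkey":"key",
       |"columns":{
         |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
         |"col1":{"cf":"cf1", "col":"col1", "type":"string"}
       |}
     |}""".stripMargin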
Each table is in a DataFrame, and a view has been created on it. The join then looks something like this:
select *
from table_c as tc
join table_b as tb on (tc.sha1_2 = tb.sha1_2)
join table_a as ta on (tb.sha1_1 = ta.rowkey)
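For completeness, the setup looks roughly like this (a sketch: withCatalog is the standard SHC read helper, catalogA/catalogB are catalog JSON strings like the one sketched above, postgresDF stands in for the JDBC read of table_c, and spark is the active SparkSession):
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

// read an HBase table through SHC using a catalog JSON string
def withCatalog(cat: String) =
  spark.read
    .options(Map(HBaseTableCatalog.tableCatalog -> cat))
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .load()

withCatalog(catalogA).createOrReplaceTempView("table_a")
withCatalog(catalogB).createOrReplaceTempView("table_b")
postgresDF.createOrReplaceTempView("table_c")   // table_c comes from Postgres via JDBC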
So, in the first join, I essentially want to do an HBase prefix scan using sha1_2, so it shouldn't need to do a sequential scan. I'm using an explicit column for equality at the moment, but conceptually I could specify a regex against the rowkey instead.
In the second join, I want to use the rowkey directly.
The catalog for both tables defines the rowkey as a string: a sha1 for table_a and a combination of <sha1_2>-<sha1_3> for table_b.
Is there a way to do this? Or will joining always force a sequential scan?
My application only looks at a portion of table_b (which I am filtering out beforehand when constructing that DataFrame), so when joining to table_c, I would really like to ensure that a sequential scan is avoided.
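For concreteness, that pre-filtering looks something like the following sketch; expressing the restriction as a rowkey range lets the predicate reach the source as comparison filters rather than a bare IsNotNull (the prefix value and the "~" upper-bound trick are illustrative only):
import spark.implicits._

// restrict table_b to rowkeys starting with a given sha1_2 prefix; a range
// comparison becomes GreaterThanOrEqual/LessThan source filters ("~" sorts
// above hex digits, so it serves as a crude upper bound for hex rowkeys)
val prefix = "00ab"                                        // placeholder sha1_2 prefix
val tbSlice = tbDF.filter($"rowkey" >= prefix && $"rowkey" < prefix + "~")
tbSlice.createOrReplaceTempView("table_b")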
The join of table_c to table_b is probably less consequential, since I'm pre-filtering it, but I am still curious as to how scan behavior works in terms of joins.
Thanks, Ken
+1, I'm also interested in the behavior here, and whether an optimization is possible. Analyzing the query with df.debug suggests no filters are being pushed down other than IsNotNull:
PushedFilters: [IsNotNull(rowkey)]
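For reference, the check was roughly along these lines (df is the SHC-backed DataFrame; otherDf is a placeholder for the other side of the join):
// explain(true) prints the physical plan; pushed predicates show up in the
// "PushedFilters: [...]" part of the HBase relation scan
val q = df.join(otherDf, df("rowkey") === otherDf("sha1_1"))
q.explain(true)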
When a query comes in, the Spark driver compiles it, and SHC gets the required columns and filters from the Spark Catalyst engine. Spark pushes those filters down to HBase through SHC, HBase does the filtering for us (records not matching the query condition are filtered out), and HBase returns only the required columns back to Spark.

When doing join table_b as tb on (tc.sha1_2 = tb.sha1_2), the only filter SHC gets from the Catalyst engine is [IsNotNull], so in this case there will be a full scan on the HBase side: HBase returns all rows of the table to Spark, and then Spark does the join.
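If you want to avoid that today, one workaround sketch (not something SHC does automatically) is to collect the join keys from the small side and turn them into an explicit predicate before joining. Whether the resulting In filter actually gets pushed down should be verified with explain(); a rowkey range filter (as in Case 2 below) is the safer fallback. All names below are placeholders:
import spark.implicits._

// keys from the (assumed small) Postgres side
val keys = tcDF.select("sha1_1").distinct().as[String].collect()

// explicit predicate on the HBase DataFrame instead of a bare join condition
val taFiltered = taDF.filter($"rowkey".isin(keys: _*))

val joined = tcDF.join(taFiltered, tcDF("sha1_1") === taFiltered("rowkey"))
joined.explain() // check PushedFilters to confirm the In predicate reached SHC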
Note: WHERE clauses, by contrast, are converted to filters like GreaterThan and LessThan by Spark, e.g. WHERE column > x AND column < y or WHERE column = x.
More examples (the catalog definition is here):

Case 1:
val df1 = withCatalog(catalog) // read data from hbase
val df2 = withCatalog(catalog) // read data from hbase
val ret = df1.join(df2, Seq("col0")) // join
ret.explain()
println(ret.count())
For case 1, the required columns returned by HBase back to Spark are [col0#41,col1#42,col2#43,col3#44,col4#45,col5#46L,col6#47,col7#48,col8#49]. PushedFilters is [IsNotNull(col0)].
Case 2:
val df1 = withCatalog(catalog) // read data from hbase
val df2 = withCatalog(catalog) // read data from hbase
val s1 = df1.filter($"col0" <= "row010" && $"col0" > "row005") //doing some filtering
val s2 = df2.filter($"col0" <= "row020" && $"col0" > "row001") //doing some filtering
val ret = s1.join(s2, Seq("col0")).select("col0")
ret.explain()
println(ret.count())
For case 2, the only required column returned by HBase back to Spark is "col0". PushedFilters are [GreaterThan(col0,row001), IsNotNull(col0), GreaterThan(col0,row005), LessThanOrEqual(col0,row010), ...] and [GreaterThan(col0,row005), GreaterThan(col0,row001), LessThanOrEqual(col0,row010), IsNotNull(col0), ...].
OK. So if I understand correctly, it's a Spark limitation that shc can't optimize the join case by using a get/bulk-scan instead of a sequential one? i.e., Spark would need to push down more filters for the join case in order for shc to be able to take advantage of them?
It seems like this is a very common use case -- joining on a primary key (or part of a primary key) -- so it is one that I would be very interested in seeing implemented. (Imagine Postgres, for example, with no index scans on joins.)
Thanks.
Is there any more information on this? I agree with @khampson that this functionality seems like an important use case. It would be nice to be able to pass an RDD of keys to a bulk get operation, rather than scanning the whole HBase table.
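For what it's worth, the bulk-get idea can be approximated today outside of SHC with the plain HBase client inside mapPartitions; a rough sketch (table name, column family, and qualifier are placeholders):
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConverters._

// keysRdd: RDD[String] of rowkeys; one connection per partition, one batched
// multi-get per partition instead of a full table scan
val fetched = keysRdd.mapPartitions { keys =>
  val conn  = ConnectionFactory.createConnection(HBaseConfiguration.create())
  val table = conn.getTable(TableName.valueOf("table_a"))
  val gets  = keys.map(k => new Get(Bytes.toBytes(k))).toList
  val rows  = table.get(gets.asJava).iterator
    .filterNot(_.isEmpty)
    .map(r => (Bytes.toString(r.getRow),
               Bytes.toString(r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col1")))))
    .toList                                   // materialize before closing the connection
  table.close(); conn.close()
  rows.iterator
}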
@weiqingy I attended one of your sessions (https://www.youtube.com/watch?v=MDWgPK6XfEo) and was expecting push-down of JOIN queries to the data source (HBase). Is that implemented in SHC? I am seeing no push-down of the JOIN filter to the HBase table. I've enabled all the CBO properties in Spark, but no luck:
spark.sql.cbo.enabled=true
spark.sql.cbo.joinReorder.enabled=true
spark.sql.cbo.joinReorder.dp.star.filter=true
spark.sql.cbo.starSchemaDetection=true
spark.sql.crossJoin.enabled=true
spark.sql.optimizer.metadataOnly=true
Please note I am using the latest version (v1.1.1-2.1) of SHC in my testing.