[SPARK-52187][SQL] Introduce Join pushdown for DSv2
What changes were proposed in this pull request?
With this PR I am introducing the join pushdown interface for DSv2 connectors and its implementation for JDBC connectors.
The interface itself, SupportsPushDownJoin, has the following API:
public interface SupportsPushDownJoin extends ScanBuilder {
  // Checks whether `other` can be the right side of a join that is pushed
  // down together with this scan (connector-defined; e.g. for JDBC both
  // sides must come from the same database).
  boolean isRightSideCompatibleForJoin(SupportsPushDownJoin other);

  // Attempts the pushdown; returns false if the join could not be pushed.
  boolean pushJoin(
      SupportsPushDownJoin other,
      JoinType joinType,
      Optional<Predicate> condition,
      StructType leftRequiredSchema,
      StructType rightRequiredSchema);

  // Returns the new schema of the ScanBuilder after the join has been pushed.
  StructType getOutputSchema();
}
If isRightSideCompatibleForJoin returns true, Spark will try to push the join down (the attempt can still fail, in which case pushJoin returns false).
getOutputSchema returns the new schema of the ScanBuilder after the join has been pushed down.
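A rough sketch of the optimizer-side handshake implied by this API; the tryPushJoin helper and its wiring are illustrative, not code from this PR, and use only the types from the interface above:

// Illustrative only: one way the optimizer could drive the new API, where
// `left` and `right` are the SupportsPushDownJoin scan builders of the two
// join children.
boolean tryPushJoin(
    SupportsPushDownJoin left,
    SupportsPushDownJoin right,
    JoinType joinType,
    Optional<Predicate> condition,
    StructType leftRequiredSchema,
    StructType rightRequiredSchema) {
  // Compatibility is checked first; for JDBC this would mean the join can be
  // evaluated inside a single remote database.
  if (!left.isRightSideCompatibleForJoin(right)) {
    return false;
  }
  // The connector may still refuse the pushdown here.
  if (!left.pushJoin(right, joinType, condition, leftRequiredSchema, rightRequiredSchema)) {
    return false;
  }
  // On success, left.getOutputSchema() reflects the joined output and is
  // used to rewrite the plan on top of the single pushed scan.
  return true;
}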
With this implementation, only inner joins are supported; left and right outer joins should be added as a follow-up. Cross joins won't be supported since they can increase the amount of data being read.
Currently, join pushdown is not supported by any production JDBC dialect; it is only enabled for the H2 dialect, which is used for testing. The capability is guarded by the SQLConf spark.sql.optimizer.datasourceV2JoinPushdown, the JDBC option pushDownJoin, and the JDBC dialect method supportsJoin.
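For completeness, a minimal sketch of turning on all of these guards from user code, assuming an in-memory H2 database (the URL, table name, and session setup are illustrative; the dialect-level supportsJoin check is made by the H2 dialect itself):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JoinPushdownExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

    // Guard 1: the optimizer-level SQLConf.
    spark.conf().set("spark.sql.optimizer.datasourceV2JoinPushdown", "true");

    // Guard 2: each JDBC relation opts in via the pushDownJoin option.
    Dataset<Row> p = spark.read().format("jdbc")
        .option("url", "jdbc:h2:mem:testdb")   // illustrative URL
        .option("dbtable", "CALL_CENTER")
        .option("pushDownJoin", "true")
        .load();
    Dataset<Row> q = spark.read().format("jdbc")
        .option("url", "jdbc:h2:mem:testdb")
        .option("dbtable", "CALL_CENTER")
        .option("pushDownJoin", "true")
        .load();

    // If both sides are compatible, this inner self-join can be pushed to H2
    // as a single query (see the generated SQL below).
    p.join(q, p.col("cc_call_center_id").equalTo(q.col("cc_call_center_id")))
        .select(p.col("cc_call_center_id"), q.col("cc_call_center_id"), q.col("cc_city"))
        .explain();
  }
}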
For the following Spark query against a JDBC table:
SELECT
  p.cc_call_center_id,
  q.cc_call_center_id,
  q.cc_city
FROM
  call_center p
JOIN
  call_center q
ON p.cc_call_center_id = q.cc_call_center_id
the SQL query generated on the Spark side would be:
SELECT
  "subquery_176_col_0",
  "subquery_176_col_1",
  "subquery_176_col_2"
FROM (
  SELECT
    "subquery_174"."cc_call_center_id" AS "subquery_176_col_0",
    "subquery_175"."cc_call_center_id" AS "subquery_176_col_1",
    "subquery_175"."cc_city" AS "subquery_176_col_2"
  FROM (
    SELECT "cc_call_center_sk", ... FROM "CALL_CENTER" WHERE ("cc_call_center_id" IS NOT NULL)
  ) "subquery_174"
  INNER JOIN (
    SELECT "cc_call_center_sk", ... FROM "CALL_CENTER" WHERE ("cc_call_center_id" IS NOT NULL)
  ) "subquery_175"
  ON "subquery_174"."cc_call_center_id" = "subquery_175"."cc_call_center_id"
) SPARK_GEN_SUBQ_114
Why are the changes needed?
DSv2 connectors can't push down the join operator.
Does this PR introduce any user-facing change?
This PR itself doesn't, since the behaviour is not implemented for any of the connectors (besides H2, which is a JDBC dialect used for testing).
How was this patch tested?
New tests and some local testing with TPCDS queries.
Was this patch authored or co-authored using generative AI tooling?
@cloud-fan Shall we support join pushdown for DSv2?
There is a linter failure:
Checkstyle checks failed at following occurrences:
Error: src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownJoin.java:[50] (sizes) LineLength: Line is longer than 100 characters (found 108).
Error: src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownJoin.java:[51] (sizes) LineLength: Line is longer than 100 characters (found 110).
@cloud-fan I will fix it. By the way, I think we should wait for a review from @agubichev. I suggest not merging this yet.
@cloud-fan can we merge?
thanks, merging to master!
Can we enable ANSI in this suite? https://github.com/apache/spark/actions/runs/16406624611/job/46353606361 It seems to fail with the non-ANSI build.