datafusion-comet
feat: Support sort merge join with a join condition
Which issue does this PR close?
Closes #398.
Rationale for this change
What changes are included in this PR?
How are these changes tested?
Some tests don't pass. It would be good to check them in DF; SMJ join filter support there is still in progress.
Yeah, that's why I've marked this PR as a draft for now.
I just checked DF on one of the failing queries:
```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::test]
async fn test_smj_left_filtered() -> Result<()> {
    let ctx: SessionContext = SessionContext::new();
    // Force a sort-merge join instead of a hash join.
    let sql = "set datafusion.optimizer.prefer_hash_join = false;";
    let _ = ctx.sql(sql).await?.collect().await?;
    let sql = "set datafusion.execution.batch_size = 100";
    let _ = ctx.sql(sql).await?.collect().await?;
    let sql = "
        select * from (
            with t as (
                select id, id % 5 id1 from (select unnest(range(0,10)) id)
            ), t1 as (
                select id % 10 id, id + 2 id1 from (select unnest(range(0,10)) id)
            )
            select * from t left join t1 on t.id1 = t1.id and t.id > t1.id1
        ) order by 1, 2, 3, 4
    ";
    let actual = ctx.sql(sql).await?.collect().await?;
    let expected: Vec<&str> = vec![
        "+----+-----+----+-----+",
        "| id | id1 | id | id1 |",
        "+----+-----+----+-----+",
        "| 0  | 0   |    |     |",
        "| 1  | 1   |    |     |",
        "| 2  | 2   |    |     |",
        "| 3  | 3   |    |     |",
        "| 4  | 4   |    |     |",
        "| 5  | 0   | 0  | 2   |",
        "| 6  | 1   | 1  | 3   |",
        "| 7  | 2   | 2  | 4   |",
        "| 8  | 3   | 3  | 5   |",
        "| 9  | 4   | 4  | 6   |",
        "+----+-----+----+-----+",
    ];
    datafusion_common::assert_batches_eq!(expected, &actual);
    Ok(())
}
```
It passes.
It looks like Comet produces duplicates.
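Comet's join code is not shown in this thread, so the cause of the duplicates is not established here. One plausible way a filtered outer sort-merge join produces extra rows is emitting a null-padded row for each buffered candidate that fails the filter, instead of once per streamed row. Below is a minimal sketch, assuming in-memory `i64` tuples and a single pass; the names `smj_left_filtered`, `Row`, and `OutRow` are hypothetical and are not Comet's or DataFusion's API. It checks the filter against the whole matching key group first, and only then decides whether to emit the null-padded row.

```rust
/// Hypothetical row type: (id, id1). Left join key is id1, right join key is id.
type Row = (i64, i64);
/// Output row: left columns, then nullable right columns.
type OutRow = (i64, i64, Option<i64>, Option<i64>);

/// Sketch of a sort-merge LEFT OUTER join on left.id1 = right.id,
/// plus a non-equi `filter` taken from the rest of the ON clause.
fn smj_left_filtered(
    mut left: Vec<Row>,
    mut right: Vec<Row>,
    filter: impl Fn(&Row, &Row) -> bool,
) -> Vec<OutRow> {
    // Both inputs must be sorted on their join keys.
    left.sort_by_key(|l| l.1);
    right.sort_by_key(|r| r.0);

    let mut out: Vec<OutRow> = Vec::new();
    let mut start = 0; // first right row of the current key group
    for l in &left {
        // Skip right rows whose key is smaller than the left key.
        while start < right.len() && right[start].0 < l.1 {
            start += 1;
        }
        // The buffered group is every right row with an equal key.
        let mut end = start;
        while end < right.len() && right[end].0 == l.1 {
            end += 1;
        }
        // Evaluate the filter against the whole group before deciding.
        let mut matched = false;
        for r in &right[start..end] {
            if filter(l, r) {
                out.push((l.0, l.1, Some(r.0), Some(r.1)));
                matched = true;
            }
        }
        // Null-pad once per unmatched left row, never once per failed candidate.
        if !matched {
            out.push((l.0, l.1, None, None));
        }
    }
    out
}

fn main() {
    // Same data as the SQL: t(id, id % 5) and t1(id % 10, id + 2) for id in 0..10.
    let t: Vec<Row> = (0..10).map(|id| (id, id % 5)).collect();
    let t1: Vec<Row> = (0..10).map(|id| (id % 10, id + 2)).collect();
    // Remaining ON clause: t.id > t1.id1
    let mut rows = smj_left_filtered(t, t1, |l, r| l.0 > r.1);
    rows.sort(); // left ids are unique, so this matches ORDER BY 1, 2, 3, 4
    for row in &rows {
        println!("{:?}", row);
    }
}
```

On the `t`/`t1` data from the query, this yields the ten rows of the expected left-join output, with each left row appearing exactly once.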
The right join, however, fails in DF:
```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::test]
async fn test_smj_right_filtered() -> Result<()> {
    let ctx: SessionContext = SessionContext::new();
    // Force a sort-merge join instead of a hash join.
    let sql = "set datafusion.optimizer.prefer_hash_join = false;";
    let _ = ctx.sql(sql).await?.collect().await?;
    let sql = "set datafusion.execution.batch_size = 100";
    let _ = ctx.sql(sql).await?.collect().await?;
    let sql = "
        select * from (
            with t as (
                select id, id % 5 id1 from (select unnest(range(0,10)) id)
            ), t1 as (
                select id % 10 id, id + 2 id1 from (select unnest(range(0,10)) id)
            )
            select * from t right join t1 on t.id1 = t1.id and t.id > t1.id1
        ) order by 1, 2, 3, 4
    ";
    let actual = ctx.sql(sql).await?.collect().await?;
    let expected: Vec<&str> = vec![
        "+----+-----+----+-----+",
        "| id | id1 | id | id1 |",
        "+----+-----+----+-----+",
        "| 5  | 0   | 0  | 2   |",
        "| 6  | 1   | 1  | 3   |",
        "| 7  | 2   | 2  | 4   |",
        "| 8  | 3   | 3  | 5   |",
        "| 9  | 4   | 4  | 6   |",
        "|    |     | 5  | 7   |",
        "|    |     | 6  | 8   |",
        "|    |     | 7  | 9   |",
        "|    |     | 8  | 10  |",
        "|    |     | 9  | 11  |",
        "+----+-----+----+-----+",
    ];
    datafusion_common::assert_batches_eq!(expected, &actual);
    Ok(())
}
```
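A brute-force oracle is a cheap way to confirm an expected table independently of any join implementation. The sketch below is a naive nested-loop right outer join over in-memory tuples; `nlj_right` and `sort_nulls_last` are hypothetical helpers, not DataFusion functions. It evaluates the whole ON clause (equi-key and filter together) for every pair and null-pads each right row that no left row satisfies. The sort places null-padded rows last, matching the expected table above.

```rust
/// Hypothetical row type: (id, id1).
type Row = (i64, i64);
/// Output row: nullable left columns, then right columns.
type OutRow = (Option<i64>, Option<i64>, i64, i64);

/// Naive nested-loop RIGHT OUTER join used as a reference oracle.
/// `on` is the full ON clause (equi-key and filter together).
fn nlj_right(left: &[Row], right: &[Row], on: impl Fn(&Row, &Row) -> bool) -> Vec<OutRow> {
    let mut out: Vec<OutRow> = Vec::new();
    for r in right {
        let before = out.len();
        for l in left {
            if on(l, r) {
                out.push((Some(l.0), Some(l.1), r.0, r.1));
            }
        }
        // No left row satisfied the ON clause: emit exactly one null-padded row.
        if out.len() == before {
            out.push((None, None, r.0, r.1));
        }
    }
    out
}

/// ORDER BY 1, 2, 3, 4 with nulls sorted last (`false` sorts before `true`).
fn sort_nulls_last(rows: &mut [OutRow]) {
    rows.sort_by_key(|r| (r.0.is_none(), r.0, r.1.is_none(), r.1, r.2, r.3));
}

fn main() {
    let t: Vec<Row> = (0..10).map(|id| (id, id % 5)).collect();
    let t1: Vec<Row> = (0..10).map(|id| (id % 10, id + 2)).collect();
    // ON t.id1 = t1.id AND t.id > t1.id1
    let mut rows = nlj_right(&t, &t1, |l, r| l.1 == r.0 && l.0 > r.1);
    sort_nulls_last(&mut rows);
    for row in &rows {
        println!("{:?}", row);
    }
}
```

With the same data it reproduces the ten expected rows: five matched rows for `t1.id` 0 through 4, and five null-padded rows for `t1.id` 5 through 9.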
Filed https://github.com/apache/datafusion/issues/10882
Codecov Report
Attention: Patch coverage is 36.36364% with 7 lines in your changes missing coverage. Please review.
Project coverage is 34.20%. Comparing base (9d8730d) to head (b9cd9da). Report is 6 commits behind head on main.
| Files | Patch % | Lines |
|---|---|---|
| .../scala/org/apache/comet/serde/QueryPlanSerde.scala | 36.36% | 5 Missing and 2 partials :warning: |
Additional details and impacted files
```diff
@@              Coverage Diff              @@
##               main     #553       +/-   ##
=============================================
- Coverage     55.16%   34.20%   -20.96%
- Complexity      857      888       +31
=============================================
  Files           109      112        +3
  Lines         10542    43071    +32529
  Branches       2010     9509     +7499
=============================================
+ Hits           5815    14733     +8918
- Misses         3714    25352    +21638
- Partials       1013     2986     +1973
```
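The report's percentages can be reproduced from the raw counts. This sketch assumes Codecov computes coverage as hits / (hits + misses + partials), so partial lines count as uncovered, and truncates (rounds down) to two decimals; both assumptions are consistent with the figures above but are not stated in the report itself. The helper names `coverage_bp` and `fmt_pct` are hypothetical. The patch hit count of 4 is inferred: 36.36364% with 7 uncovered lines (5 missing, 2 partial) is 4/11.

```rust
/// Coverage in hundredths of a percent, truncated (rounded down).
/// Assumes the ratio hits / (hits + misses + partials).
fn coverage_bp(hits: u64, misses: u64, partials: u64) -> i64 {
    let lines = hits + misses + partials;
    (hits * 10_000 / lines) as i64
}

/// Formats hundredths of a percent as e.g. "34.20%" or "-20.96%".
fn fmt_pct(bp: i64) -> String {
    let sign = if bp < 0 { "-" } else { "" };
    let a = bp.abs();
    format!("{}{}.{:02}%", sign, a / 100, a % 100)
}

fn main() {
    let base = coverage_bp(5815, 3714, 1013); // main
    let head = coverage_bp(14733, 25352, 2986); // #553
    let patch = coverage_bp(4, 5, 2); // 4 hits inferred from 36.36364%
    println!(
        "base {} head {} delta {} patch {}",
        fmt_pct(base),
        fmt_pct(head),
        fmt_pct(head - base),
        fmt_pct(patch)
    );
}
```

This prints `base 55.16% head 34.20% delta -20.96% patch 36.36%`, and the hit, miss, and partial counts sum to the reported line totals (10542 and 43071).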
All tests pass now.
cc @andygrove @huaxingao
I will have another look today.
Please take a look (again). Thanks.
@andygrove @huaxingao @comphead
Thanks @comphead
I need to update the plan stability results.