feat: add operators to support duplicate eliminated joins
This PR implements the DuplicateEliminatedGetRel and the DuplicateEliminatedJoinRel. Both relations are needed to support duplicate eliminated joins, the join type used for unnesting arbitrary subqueries.
They are introduced in depth in the unnesting arbitrary subqueries paper.
I also have a POC PR for the DuckDB substrait repo, which already appropriately round-trips queries with the definitions proposed here.
The main question I have is whether it is more desirable to have the DuplicateEliminatedJoinRel as a separate relation or whether its attributes should be merged into JoinRel.
For clarity/reference, in DuckDB the Duplicate Eliminated Join is literally a LogicalComparisonJoin with LogicalOperatorType::LOGICAL_DELIM_JOIN.
The possible logical operator types for joins are:
LOGICAL_JOIN = 50,
LOGICAL_DELIM_JOIN = 51,
LOGICAL_COMPARISON_JOIN = 52,
LOGICAL_ANY_JOIN = 53,
LOGICAL_CROSS_PRODUCT = 54,
LOGICAL_POSITIONAL_JOIN = 55,
LOGICAL_ASOF_JOIN = 56,
LOGICAL_DEPENDENT_JOIN = 57
The comparison join (LogicalComparisonJoin) only uses the following types:
LOGICAL_DELIM_JOIN = 51,
LOGICAL_COMPARISON_JOIN = 52,
LOGICAL_ASOF_JOIN = 56,
LOGICAL_DEPENDENT_JOIN = 57
Hence, a different possibility would be to add an enum to JoinRel with only LOGICAL_DELIM_JOIN and LOGICAL_COMPARISON_JOIN for now.
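For illustration only, that alternative could look roughly like the sketch below; the message shape, names, and field numbers are hypothetical and are not something proposed in this PR:

// Hypothetical sketch of folding the duplicate elimination into JoinRel.
// Names and field numbers are illustrative, not actual Substrait definitions.
message JoinRel {
  // ... existing JoinRel fields (common, left, right, expression, type, ...) ...

  // Marker distinguishing a regular comparison join from a duplicate
  // eliminated (delim) join.
  enum DelimKind {
    DELIM_KIND_UNSPECIFIED = 0;
    DELIM_KIND_COMPARISON_JOIN = 1;       // maps to LOGICAL_COMPARISON_JOIN
    DELIM_KIND_DUPLICATE_ELIMINATED = 2;  // maps to LOGICAL_DELIM_JOIN
  }
  DelimKind delim_kind = 100; // illustrative field number only
}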
ACTION NEEDED
Substrait follows the Conventional Commits specification for release automation.
The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.
First, thanks for creating this PR. I appreciate the time and effort to describe these concepts. I'm struggling to understand the linked paper, but I've spent an hour on it and have to move on, so I'll do my best at paraphrasing my understanding and hope you can help me. I suspect there is a good chance I am completely misunderstanding.
Does this sound correct (I'm not asking you to describe it this way; I'm just checking my understanding by rephrasing the definition in a much more mechanical way):
The duplicate eliminated join operator wraps a normal hash equijoin operator and has two outputs. The first output is the regular join output. The second output is the keys of the hashtable created on the build side.
Yes, that sounds about right. You can think of the hashtable result as being pushed into the Duplicate Eliminated Get operator, while the join produces a result like a regular join, which is pushed to its parent.
In addition, this operator must consume the build side (and create the hashtable) first (before consuming the probe side) because the hashtable output will be used to generate the probe side input.
Yes! The probe side contains the Duplicate Eliminated Get, which depends on the consumption of the build side of the Duplicate Eliminated Join.
The duplicate eliminated get operator takes a duplicate eliminated join operator as input. The output of this operator is the hashtable from the duplicate eliminated join operator.
It's not only the hashtable, but rather the deduplicated side of the Duplicate Eliminated Join.
@westonpace, many thanks for taking the time to review the PR. Unnesting arbitrary subqueries is a difficult topic in itself, and unfortunately, the available material on the subject is somewhat lacking. However, the original paper and Mark's presentations are by far the two best resources for understanding it.
Please let me know if I answered your questions and, if so, whether you would like any rephrasing of the text.
@pdet I watched the video; it is very helpful. I think I'm close to understanding what is going on now.
So, it's not the hashtable but actually the deduplicated side. You can see that the Duplicate Eliminated Get has a repeated Expression.FieldReference column_ids = 3; representing the deduplicated columns.
I understand that key columns are deduplicated during a hash join. I'm not sure I understand how non-key columns are necessarily deduplicated. Is the duplicate eliminating join doing more work to satisfy these non-key columns? For example, when collecting some payload column P, does the duplicate eliminating join store P in the hashtable as normal payload and then ALSO store P in a hashset that it can later return to the duplicate eliminated get operator?
Only the join keys are deduplicated. There is no scenario in which the non-key columns are deduplicated.
For reference, in plan_delim_join.cpp you can see that vector<unique_ptr<Expression>> distinct_expressions; is created but never populated. The groups are then passed to the PhysicalHashAggregate.
Got it, I understand the operator now.
The main question I have is whether it is more desirable to have the DuplicateEliminatedJoinRel as a separate relation or whether its attributes should be merged into JoinRel.
In my opinion, I like how you have it (as a separate relation) because I don't think this is a "standard logical operator" but more a "physical operator". I think there will be high-level query engines / planners / producers that never interact with or support this join type.
This is the first example, so far, of a relation having multiple outputs, so I'd be interested to hear others' opinions on the topic.
I can think of several other ways to tackle the issue (e.g. don't require duplicate eliminated get to use a reference rel, create some "output numbering" scheme for multiple outputs so duplicate eliminated get rel isn't needed at all, etc.). That being said, for a new physical operator, I don't want to hold up the PR while we have esoteric discussions about what multiple outputs should look like.
My primary concern at the moment is that the wording is a little confusing. I think we can focus more on the "what" and less on the "why". I'll make a new review with this emphasis.
@westonpace I've removed the column references from the Duplicate Eliminated Get and also updated the POC in https://github.com/duckdb/substrait/pull/91 to verify it still works.
Regarding the Duplicate Eliminated Side from the Duplicate Eliminated Join, I checked with @Mytherin, and that is necessary because the deduplicated side can be either the build or the probe side, so it's not possible to infer it from the build side of the join type. I've also adjusted the text in the physical_relations.md to reflect that.
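Conceptually, the Duplicate Eliminated Side is just a marker of which input of the join feeds the deduplication, roughly along the lines of the sketch below (the exact names and numbering in the PR may differ):

// Rough sketch of the side marker on the duplicate eliminated join;
// names and field numbers are illustrative, not necessarily those in the PR.
enum DuplicateEliminatedSide {
  DUPLICATE_ELIMINATED_SIDE_UNSPECIFIED = 0;
  DUPLICATE_ELIMINATED_SIDE_LEFT = 1;   // deduplicate the left input
  DUPLICATE_ELIMINATED_SIDE_RIGHT = 2;  // deduplicate the right input
}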
Thanks again for all the time you invested reviewing the PR and your suggestions; I highly appreciate them!
I haven't yet reviewed this and would like some time to do so.
Can you provide an example plan with these operators as you see them being used? I'm struggling with seeing these two relations as set relations but want to make sure I see them used in context first. Thanks!
@jacques-n Here's a plan along with the steps I used to get DuckDB to spit it out:
Will take a look, @EpsilonPrime.
Do we have a substrait example plan?
I'm struggling with seeing these two relations as set relations
They are not; they are physical optimizations, in the same way a hash equi-join doesn't really look like a set relation.
The logical set operation is "remove all duplicates". X columns go in. X columns go out. Every row of duplicates is removed. You can represent this logically in at least two ways (a set relation with SET_OP_UNION_DISTINCT whose two inputs are the same, or a self-join).
The optimization is just reusing the hashtable from an existing hash join that already has the columns you want to DISTINCTify on the key side (basically the same idea as a self-join).
I checked with @Mytherin, and that is necessary because the deduplicated side can be either the build or the probe side, so it's not possible to infer it from the build side of the join type. I've also adjusted the text in the physical_relations.md to reflect that.
@pdet how should this be implemented if the deduplicated side is the probe side? How does the hash join deduplicate the probe side?
Will take a look, @EpsilonPrime.
Do we have a substrait example plan?
The Substrait plan that DuckDB would generate for the TPC-H Q02 that @EpsilonPrime shared would be the following: https://gist.github.com/pdet/e8fe07a495b057653481f4191bf9de3d
I checked with @Mytherin, and that is necessary because the deduplicated side can be either the build or the probe side, so it's not possible to infer it from the build side of the join type. I've also adjusted the text in the physical_relations.md to reflect that.
@pdet how should this be implemented if the deduplicated side is the probe side? How does the hash join deduplicate the probe side?
In practice, a hash aggregate is created regardless of whether the deduplicated side is the probe or the build side.
The Duplicate Eliminated Side goes into a hash aggregation and gets deduplicated. The same data also goes into the join hashtable/probe side, where it is not deduplicated. Essentially, the Duplicate Eliminated Side goes into two places in this operator.
As shown below, the input chunk goes into two child operators of the Duplicate Eliminated Join.
SinkResultType PhysicalRightDelimJoin::Sink(ExecutionContext &context, DataChunk &chunk,
OperatorSinkInput &input) const {
auto &lstate = input.local_state.Cast<RightDelimJoinLocalState>();
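// The same input chunk is sunk twice: into the join itself (building its hash
// table) and into the distinct aggregate that produces the deduplicated keys.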
OperatorSinkInput join_sink_input {*join->sink_state, *lstate.join_state, input.interrupt_state};
join->Sink(context, chunk, join_sink_input);
OperatorSinkInput distinct_sink_input {*distinct->sink_state, *lstate.distinct_state, input.interrupt_state};
distinct->Sink(context, chunk, distinct_sink_input);
return SinkResultType::NEED_MORE_INPUT;
}
Similarly, when the deduplicated side is the probe side, it goes into a distinct but also into a column data collection.
SinkResultType PhysicalLeftDelimJoin::Sink(ExecutionContext &context, DataChunk &chunk,
OperatorSinkInput &input) const {
auto &lstate = input.local_state.Cast<LeftDelimJoinLocalState>();
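// The probe-side chunk is buffered in a column data collection for the later
// probe, and is also sunk into the distinct aggregate that deduplicates the keys.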
lstate.lhs_data.Append(lstate.append_state, chunk);
OperatorSinkInput distinct_sink_input {*distinct->sink_state, *lstate.distinct_state, input.interrupt_state};
distinct->Sink(context, chunk, distinct_sink_input);
return SinkResultType::NEED_MORE_INPUT;
}
I've spent some time reviewing this and I'd like to state my understanding:
There are two new operators. A more generic name for each operator might be:
HashEquiJoin_SaveBuildHashTableKeys and AggregateDistinct_ReadSavedHashTableFromElsewhere
The HashEquiJoin will save the build hash table keys, and the AggregateDistinct will read the saved hash table keys from elsewhere as opposed to actually reading its input.
I have often referred to this as a sideband operation: an operator communicates a secondary dataset to another operator through a path other than the DAG. I've seen this pattern used for examples like this, agg+sort, and dynamic partition pruning/bloom filter pushdown. I think my main question is: what are our goals in doing this in Substrait core?
I'll make the assumption that we're trying to make this as compatible as possible (which is why we're doing it in core as opposed to via a 3rd party extension). Given that, I think it is best to look at the two variations on normal operations as optimizations. If we think about it that way, I think we can actually define these as simple optimization extensions. One optimization extension would be used for the existing HashEquiJoinRel and would mean "save the build side keys to sideband:<ID>". The second one would be an optimization extension for AggregateRel that would mean "use the output from sideband:<ID> instead of reading from your input".
If we followed this pattern, it means that a plan from DuckDB could be used in other systems (they could simply ignore the optimization extensions) and a plan from other systems could be run in DuckDB (whether it used the optimization extension or not). Additionally, this pattern should round-trip in DuckDB with full fidelity. (I believe this would also be a relatively simple thing to implement in the DuckDB Substrait extension, e.g. a simple transformation from the current proposed format in both directions.)
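For illustration, a minimal sketch of what the two optimization extensions could carry, assuming hypothetical message names that are not existing Substrait definitions:

// Hypothetical sideband hint messages; names, shapes, and numbers are
// illustrative only and not part of the Substrait specification.
message SaveBuildSideKeysHint {
  // Attached to a HashEquiJoinRel: publish the build-side key columns
  // under this sideband identifier.
  uint32 sideband_id = 1;
}
message ReadFromSidebandHint {
  // Attached to an AggregateRel: read the rows published under this
  // sideband identifier instead of the declared input.
  uint32 sideband_id = 1;
}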
Thoughts?
I like the idea of the reusable operators (although I think of them as AggregateDistinct_SaveBuildHashTableKeys and HashEquiJoin_ReadSavedHashTableFromElsewhere). We could add optimization metadata to HashEquiJoin (where to get the saved hash table) as the implementation of one of them, which means we probably could do something similar for the aggregation (adding metadata to save the hash data). The format of the hash table could be left to the engine if we don't want to do cross-system optimization (which would also require defining where to store it along with what it looks like). The end proposal would look more like "save a numbered hashtable" and "use a numbered hashtable" for any such operations that could take advantage of it.
One issue with ignoring the optimizations may be correctness (such as TPC-H 15's rounding error problem with floats, caused by computing the same subtree twice but with a potentially different rounding result). But that's an existing problem that can be solved elsewhere (such as with Decimal) or by using ReferenceRel.
The other concern is potentially performance. I'm not sure, but a plan rewritten to be more efficient with these optimizations may end up being much slower on systems without them.
although I think of them as AggregateDistinct_SaveBuildHashTableKeys and HashEquiJoin_ReadSavedHashTableFromElsewhere
I agree that your version makes more sense given the data flow. Unfortunately, the nature of the hashtable in the join is that it has more information than a hash table in a distinct op (each join hash table record links to the carry-along records, whereas the distinct op does not maintain those links). Given that, the hashtable from the join can help the aggregate but the hashtable from the aggregate can't help the join (basically). At least that has been my experience.
One issue with ignoring the optimizations may be correctness
I don't see an issue with correctness. Beyond the float comment, where do you see that being an issue?
float...potential different rounding result
Float accuracy is generally unreliable; I don't think there is anything special here. Any operation that is split into multiple parallel ops (be it threads, machines, etc.) with floats will give a different result depending on the order of parallel roll-ups (which are indeterminate in most systems). Every multithreaded system I've worked with allows that variation even for multiple executions of the same plan.
plan rewritten to be more efficient with these optimizations may end up being much slower on systems without it.
This doesn't seem like a problem. Optimizations, by definition, are something that, if ignored, will likely make you perform slower.
I haven't added any documentation, but do optimization hints as described in #705 seem like they would suffice as a replacement for these two new operators?