Hash partitioning should satisfy hash subset partitioning
Is your feature request related to a problem or challenge?
When DataFusion checks whether our partitioning is satisfied or a repartition is needed, it only checks whether the expressions the partitions are hashed on are equal to the required expressions: hash_required = hash_current.
Hash partitioning is also satisfied if we are partitioned on a subset of the required hash expressions: hash_current "is a subset of" hash_required. Thus, in certain situations it would be ideal not to insert a repartition when our existing partitioning already implicitly satisfies the required one.
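For illustration, the relaxed check could look something like the sketch below; this is a minimal sketch using strings as stand-ins for the hash expressions, not DataFusion's actual types or API:

```rust
/// Minimal sketch of the relaxed check, using strings as stand-ins
/// for the hash expressions (not DataFusion's real types).
fn hash_satisfies(current: &[&str], required: &[&str]) -> bool {
    // Hash(current) satisfies a required Hash(required) when the current
    // key is a non-empty subset of the required one: rows that agree on
    // all required expressions also agree on the current ones, so they
    // are already co-located in a single partition.
    !current.is_empty() && current.iter().all(|c| required.contains(c))
}

fn main() {
    // Partitioned on Hash(a) with Hash(a, b) required: satisfied.
    assert!(hash_satisfies(&["a"], &["a", "b"]));
    // Partitioned on Hash(a, b) with Hash(a) required: not satisfied;
    // rows sharing `a` may differ in `b` and land in different partitions.
    assert!(!hash_satisfies(&["a", "b"], &["a"]));
}
```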
This comes with the caveat of skewed data. Say we are partitioned via Hash(a) and Hash(a, b) is required. We implicitly satisfy Hash(a, b) via this subset property, but we may still want to insert a repartition if our data is heavily skewed under Hash(a), as repartitioning would help distribute that skew. Here is an example: say we are partitioned via Hash(a) and have 3 total CPUs (target_partitions = 3):
Partition 1:
a b
A1 B1
A1 B1
A1 B1
A1 B1
A1 B2
A1 B2
A1 B2
A1 B2
A1 B3
A1 B3
A1 B3
Partition 2:
A2 B1
A2 B1
A2 B1
Partition 3:
A3 B1
A3 B1
A3 B1
Then in our plan we hit an operator that requires partitioning via Hash(a, b). Although our existing Hash(a) implicitly satisfies the required Hash(a, b), since all occurrences of a given (a, b) combination are contained within a single partition, we would get a much better data distribution if we repartitioned on Hash(a, b):
Partition 1:
a b
A1 B1
A1 B1
A1 B1
A1 B1
Partition 2:
A1 B2
A1 B2
A1 B2
A1 B2
A2 B1
A2 B1
A2 B1
Partition 3:
A3 B1
A3 B1
A3 B1
A1 B3
A1 B3
A1 B3
Thus, skew needs to be considered in the implementation.
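To make the mechanics concrete, here is a small sketch of how rows are routed under hash partitioning; DefaultHasher and the modulo step are illustrative stand-ins (the real repartitioner hashes the evaluated expression values with its own hasher):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Sketch: route a row to a partition by hashing its key.
fn partition_for<K: Hash>(key: &K, target_partitions: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % target_partitions
}

fn main() {
    // Under Hash(a), every row with a = "A1" lands in the same partition,
    // regardless of b -- which is exactly the skew in the example above.
    for b in ["B1", "B2", "B3"] {
        println!("a=A1, b={b} -> partition {}", partition_for(&"A1", 3));
    }
    // Under Hash(a, b), rows with the same `a` can spread out.
    for b in ["B1", "B2", "B3"] {
        println!("a=A1, b={b} -> partition {}", partition_for(&("A1", b), 3));
    }
}
```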
Describe the solution you'd like
There are two approaches I have thought of and would like to discuss.
- A boolean configuration option that turns this behavior on or off. This is simple to implement and low risk.
- We can read the row counts from Parquet metadata to build a heuristic for when this behavior should turn on. Thus, if the data is already well distributed we won't repartition; otherwise we insert the repartition. This comes with the complexity of designing a good heuristic and adds more code (see the sketch after this list).
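For discussion, such a heuristic over per-partition row counts might look like the sketch below; the 1.5 threshold is an arbitrary illustrative value, and where the counts come from (Parquet metadata vs. runtime statistics) is left open:

```rust
/// Sketch of a skew heuristic: repartition when the largest partition
/// holds disproportionately more rows than an even split would give it.
fn should_repartition(row_counts: &[u64], skew_threshold: f64) -> bool {
    let total: u64 = row_counts.iter().sum();
    if row_counts.is_empty() || total == 0 {
        return false;
    }
    let max = *row_counts.iter().max().unwrap() as f64;
    let ideal = total as f64 / row_counts.len() as f64;
    max / ideal > skew_threshold
}

fn main() {
    // The running example: Hash(a) gives partitions of 11, 3, and 3 rows.
    assert!(should_repartition(&[11, 3, 3], 1.5));
    // After Hash(a, b): 4, 7, and 6 rows -- no repartition needed.
    assert!(!should_repartition(&[4, 7, 6], 1.5));
}
```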
Describe alternatives you've considered
I listed the two approaches above and am considering them equally. Would love more thoughts.
Additional context
The elimination of these shuffles would have a huge impact for distributed DataFusion.
cc: @NGA-TRAN
take
I agree with the assessment that data that is partitioned on Hash(a) is also partitioned on Hash(a, b), as well as with your assessment of the potential skew problem.
I think skew is likely only a problem in these two scenarios (a rough check is sketched after the list):
- Very low cardinality data (where there are fewer distinct values than partitions)
- Single value skew (where one of the values occurs far more often than the others)
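For illustration, both scenarios could be checked from hypothetical column statistics (a distinct-value count and the fraction of rows held by the most common value); these inputs are assumptions, not statistics DataFusion necessarily exposes today:

```rust
/// Sketch of the two skew scenarios, given hypothetical column
/// statistics: the distinct-value count and the fraction of rows
/// held by the most common value.
fn skew_is_likely(
    distinct_values: u64,
    max_value_fraction: f64,
    target_partitions: u64,
) -> bool {
    // Very low cardinality: fewer distinct values than partitions,
    // so some partitions must stay empty.
    let low_cardinality = distinct_values < target_partitions;
    // Single-value skew: the most common value alone exceeds what an
    // even split would place in one partition.
    let single_value_skew = max_value_fraction > 1.0 / target_partitions as f64;
    low_cardinality || single_value_skew
}

fn main() {
    // The running example: column `a` has 3 distinct values over 17 rows,
    // and A1 accounts for 11 of them (~65%), so skew is likely.
    assert!(skew_is_likely(3, 11.0 / 17.0, 3));
}
```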
> A boolean configuration option that turns this behavior on or off. This is simple to implement and low risk.
I think starting with a flag to force repartitioning would be a good start.
> We can read the row counts from Parquet metadata to build a heuristic for when this behavior should turn on. Thus, if the data is already well distributed we won't repartition; otherwise we insert the repartition. This comes with the complexity of designing a good heuristic and adds more code.
I agree with the tradeoffs.
There are already some other similar heuristics in the configuration settings, where they have to trade off "keep the existing sort" against "repartition to get maximum parallelism":
https://datafusion.apache.org/user-guide/configs.html
For example:

| key | default |
|-----|---------|
| datafusion.optimizer.prefer_existing_sort | false |
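As a usage sketch, a flag gating this behavior could be toggled the same way existing options are; `prefer_existing_sort` below is the real option from the table, while the commented-out name is hypothetical:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Toggle the real, existing option shown in the table above.
    let config = SessionConfig::new()
        .set_bool("datafusion.optimizer.prefer_existing_sort", true);
    // A flag for the subset-partitioning behavior could live alongside it,
    // e.g. "datafusion.optimizer.prefer_existing_hash_partitioning"
    // (a hypothetical name, not an existing option).
    let _ctx = SessionContext::new_with_config(config);
}
```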
In keeping with DataFusion's extensible nature, I wonder if we should make some sort of trait that encapsulates these heuristics (not for this issue, but for a follow-on PR):
```rust
use datafusion::physical_plan::ExecutionPlan;

pub trait OptimizerHeuristics {
    /// Should the optimizer insert a repartition above `input`?
    /// (Signature is a sketch; more context would likely be needed.)
    fn should_repartition(&self, input: &dyn ExecutionPlan /* , ... */) -> bool;
    // ... other heuristic hooks
}
```
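For illustration only, an implementation of that sketch which preserves today's always-repartition behavior might look like:

```rust
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical implementation that keeps the current behavior:
/// always insert the repartition.
struct AlwaysRepartition;

impl OptimizerHeuristics for AlwaysRepartition {
    fn should_repartition(&self, _input: &dyn ExecutionPlan) -> bool {
        true
    }
}
```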
🤔
@gene-bordegaray: Very nice description of the problem and the proposed solutions with trade-offs. I agree with @alamb that we should go with the flag option first.