
Hash partitioning should satisfy hash subset partitioning

Open gene-bordegaray opened this issue 4 weeks ago • 2 comments

Is your feature request related to a problem or challenge?

When DataFusion checks whether our partitioning is satisfied or a repartition is needed, it only checks whether the expressions the partitions are hashed on are exactly equal to the required expressions: hash_required = hash_current.

Hash partitioning is also satisfied if we are partitioned on a subset of the required hash expressions: hash_current ⊆ hash_required. Rows that agree on the full required key necessarily agree on every current key, so equal required keys are already colocated. Thus, in certain situations it would be ideal not to insert a repartition when the existing partitioning already implies the required one.
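In other words, the check could look roughly like this (a sketch with illustrative names, not DataFusion's actual API):

// A current Hash partitioning implies a required one when its expressions
// are a subset of the required expressions: rows that agree on the full
// required key necessarily agree on every current key, so equal required
// keys are already colocated.
fn hash_satisfies(current: &[&str], required: &[&str]) -> bool {
    !current.is_empty() && current.iter().all(|e| required.contains(e))
}

So hash_satisfies(&["a"], &["a", "b"]) holds, matching the Hash(a) vs Hash(a, b) example below.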

This comes with the caveat of skewed data. Say we are hashed via Hash(a) and Hash(a, b) is required. We implicitly satisfy Hash(a, b) via this subset property, but we may still want to insert a repartition if our data is heavily skewed under Hash(a), since repartitioning would help distribute that skew. Here is an example: say we are partitioned via Hash(a) and have 3 total CPUs (target_partitions):

Partition 1:
a    b
A1   B1
A1   B1
A1   B1
A1   B1
A1   B2
A1   B2
A1   B2
A1   B2
A1   B3
A1   B3
A1   B3

Partition 2:
a    b
A2   B1
A2   B1
A2   B1

Partition 3:
a    b
A3   B1
A3   B1
A3   B1

Then our plan hits an operator that requires partitioning via Hash(a, b). Although the existing Hash(a) implicitly satisfies the required Hash(a, b), since all occurrences of any given (a, b) combination are contained within a single partition, we would get a much better data distribution (4/7/6 rows instead of 11/3/3) if we repartitioned on Hash(a, b):

Partition 1:
a    b
A1   B1
A1   B1
A1   B1
A1   B1

Partition 2:
a    b
A1   B2
A1   B2
A1   B2
A1   B2
A2   B1
A2   B1
A2   B1

Partition 3:
a    b
A3   B1
A3   B1
A3   B1
A1   B3
A1   B3
A1   B3

Thus, this needs to be considered in the implementation.
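For concreteness, here is a tiny standalone simulation (std hashing only, nothing DataFusion-specific) of how the two schemes bucket the example keys. The exact assignments depend on the hash function, but Hash(a) necessarily sends every A1 row to a single partition, while Hash(a, b) can spread them:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Map a key to one of `n` partitions, the way a hash repartition would.
fn partition_of<K: Hash>(key: &K, n: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % n
}

fn main() {
    for (a, b) in [("A1", "B1"), ("A1", "B2"), ("A1", "B3"), ("A2", "B1"), ("A3", "B1")] {
        println!(
            "({a}, {b}): Hash(a) -> p{}, Hash(a, b) -> p{}",
            partition_of(&a, 3),
            partition_of(&(a, b), 3)
        );
    }
}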

Describe the solution you'd like

There are two approaches I have thought of and would like to discuss.

  1. A boolean option that turns this behavior on or off. This is simple to implement and low risk (see the sketch after this list).
  2. We can read the row counts from Parquet metadata to build a heuristic for when this behavior should kick in. If the data is already well distributed we won't repartition; otherwise we insert the repartition. This comes with the complexity of designing a good heuristic and more code.
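For approach 1, the flag could follow the pattern of existing optimizer options. A minimal sketch, assuming a hypothetical option name (the real name and wiring would be settled in the PR):

use datafusion::prelude::SessionConfig;

fn main() {
    // "prefer_existing_hash_partitioning" is a hypothetical name for
    // illustration only; the option this issue proposes does not exist yet.
    let config = SessionConfig::new()
        .set_bool("datafusion.optimizer.prefer_existing_hash_partitioning", true);
    let _ = config;
}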

Describe alternatives you've considered

I listed the two approaches above and am considering them equally. Would love more thoughts.

Additional context

The elimination of shuffles would have a huge impact for distributed DataFusion.

gene-bordegaray commented Dec 10 '25 20:12

cc: @NGA-TRAN

gene-bordegaray commented Dec 10 '25 20:12

take

gene-bordegaray commented Dec 11 '25 18:12

I agree with the assessment that data that is partitioned on Hash(a) is also partitioned on Hash(a,b), as well as your assessment of the potential skew problem.

I think skew is likely only a problem in one of these scenarios:

  1. Very low cardinality data (where there are fewer distinct values than partitions)
  2. Single value skew (where one of the values occurs far more often than the others)

> A boolean option that turns this behavior on or off. This is simple to implement and low risk.

I think starting with a flag to force repartitioning would be good.

> We can read the row counts from Parquet metadata to build a heuristic for when this behavior should kick in. If the data is already well distributed we won't repartition; otherwise we insert the repartition. This comes with the complexity of designing a good heuristic and more code.

I agree with the tradeoffs.
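For illustration, a row-count-based skew test might boil down to something like this (shape and threshold purely illustrative, not a concrete proposal):

fn looks_skewed(rows_per_partition: &[u64], factor: f64) -> bool {
    // Flag skew when the largest partition holds more than `factor` times
    // its fair share of the total rows.
    let total: u64 = rows_per_partition.iter().sum();
    if rows_per_partition.is_empty() || total == 0 {
        return false;
    }
    let fair_share = total as f64 / rows_per_partition.len() as f64;
    let largest = *rows_per_partition.iter().max().unwrap() as f64;
    largest > factor * fair_share
}

On the example above, looks_skewed(&[11, 3, 3], 1.5) returns true (11 exceeds 1.5x the fair share of ~5.67 rows), while the repartitioned [4, 7, 6] does not.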

There are already some similar heuristics in the configuration settings, where they have to trade off "keep the existing sort" against "repartition to get maximum parallelism":

https://datafusion.apache.org/user-guide/configs.html

For example

datafusion.optimizer.prefer_existing_sort false
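That flag can also be flipped per session with a SQL SET statement, and a new repartitioning flag could work the same way. A minimal example, assuming the usual SessionContext and tokio setup:

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // Toggle the existing sort-vs-repartition heuristic for this session.
    ctx.sql("SET datafusion.optimizer.prefer_existing_sort = true").await?;
    Ok(())
}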

In keeping with DataFusion's extensible nature, I wonder if we should make some sort of trait that encapsulates these heuristics (not in this issue; in a follow-on PR):

pub trait OptimizerHeuristics {
  /// Should the optimizer insert a repartition below this node?
  fn should_repartition(&self, input: &dyn ExecutionPlan, ...) -> bool;
  ...
}

🤔

alamb commented Dec 12 '25 12:12

@gene-bordegaray: Very nice description of the problem and the proposed solutions with trade-offs. I agree with @alamb that we should go with the flag option first.

NGA-TRAN commented Dec 12 '25 14:12