
[SPARK-39607][SQL][DSV2] Distribution and ordering support V2 function in writing

pan3793 opened this pull request • 4 comments

What changes were proposed in this pull request?

Add a new feature: make distribution and ordering support V2 functions in writing.

Currently, the rule V2Writes supports converting ApplyTransform to TransformExpression (unevaluable); this PR makes V2Writes also support converting TransformExpression to ApplyFunctionExpression/Invoke/StaticInvoke (evaluable).

Why are the changes needed?

SPARK-33779 introduced an API for DSv2 writers to claim the distribution and ordering of data before writing; with SPARK-34026, Spark can translate IdentityTransform into catalyst expressions in the distribution and ordering expressions.

But some databases, such as ClickHouse, allow a table partition to be defined by an expression, e.g. PARTITIONED BY num % 10, so it's useful to support translating ApplyTransform as well, letting Spark organize the data to fit the target storage's requirements before writing.
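
For illustration, here is a minimal sketch (not part of this PR) of how a DSv2 connector might request such a distribution and ordering for a table declared as PARTITIONED BY num % 10. The class name ExpressionPartitionedWrite and the v2 function name mod_10 are made up; the function is assumed to be registered in the connector's FunctionCatalog.

import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.SortDirection;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;
import org.apache.spark.sql.connector.write.Write;

// Hypothetical Write for a table partitioned by the expression num % 10.
// The rest of Write (toBatch/toStreaming) is omitted in this sketch.
class ExpressionPartitionedWrite implements Write, RequiresDistributionAndOrdering {

  @Override
  public Distribution requiredDistribution() {
    // Cluster records by the partition expression so rows belonging to one
    // target partition end up in the same Spark partition.
    Expression partitionExpr = Expressions.apply("mod_10", Expressions.column("num"));
    return Distributions.clustered(new Expression[] { partitionExpr });
  }

  @Override
  public SortOrder[] requiredOrdering() {
    // Locally sort by the same expression so each task writes the rows of a
    // target partition contiguously.
    return new SortOrder[] {
        Expressions.sort(Expressions.apply("mod_10", Expressions.column("num")),
            SortDirection.ASCENDING)
    };
  }
}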

Does this PR introduce any user-facing change?

Yes, users can use V2 functions as partition transforms in DSv2 connectors.

How was this patch tested?

UT added.

pan3793 avatar Jun 26 '22 11:06 pan3793

cc @cloud-fan @sunchao @ulysses-you @huaxingao

pan3793 avatar Jun 27 '22 01:06 pan3793

Can one of the admins verify this patch?

AmplabJenkins avatar Jun 27 '22 10:06 AmplabJenkins

The rule V2Writes is placed in the optimizer phase but does things that arguably should happen in the analysis phase, such as function resolution and implicit type casts. The current approach tries to minimize the code change; I'm not sure if it's the right direction.

pan3793 avatar Jul 15 '22 09:07 pan3793

@cloud-fan @sunchao sorry for bothering you again, please take a look when you have time.

pan3793 avatar Jul 20 '22 04:07 pan3793

I'd love to take a look tomorrow as well (not blocking).

aokolnychyi avatar Aug 18 '22 06:08 aokolnychyi

In general, this feature looks reasonable, but it's worth discussing how the v2 write's required distribution behaves with this new feature.

Let's assume the required distribution is ClusteredDistribution; its doc says:

/**
 * A distribution where tuples that share the same values for clustering expressions are co-located
 * in the same partition.
 *
 * @since 3.2.0
 */
@Experimental
public interface ClusteredDistribution extends Distribution

This means, the clustering expressions are the keys, and Spark makes sure records with the same keys go to the same partition. What Spark does is: for each record, calculate the keys, hash the keys and assign a partition ID for the record based on the hash of the keys.

How can we use this feature to implement bucket writing? We can use the expression (a v2 function) that calculates the bucket ID as the clustering expressions. Then Spark will make sure records with the same bucket ID will be in the same partition. However, the problem with this approach is low parallelism (at most the number of buckets).

A different approach is to use the bucket columns as the clustering expressions. Spark will make sure records with the same bucket column values will be in the same partition. Then the v2 write can require a local sort by bucket id (a v2 function) so that records with the same bucket ID will be grouped together.
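
To make the two options concrete, here is an illustrative sketch (not from this PR) for a hypothetical table bucketed by bucket(16, id); the class name and column name are made up.

import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.SortDirection;
import org.apache.spark.sql.connector.expressions.SortOrder;

final class BucketWriteOptions {

  // Option 1: cluster by the bucket transform itself. Spark hashes the bucket
  // id, so at most 16 partitions receive data (low parallelism), but each
  // bucket is written by a single task.
  static Distribution clusterByBucketId() {
    return Distributions.clustered(new Expression[] { Expressions.bucket(16, "id") });
  }

  // Option 2: cluster by the bucket column and request only a local sort by
  // bucket id. Parallelism stays high and each task groups its rows by bucket
  // before writing, at the cost of more (smaller) files per bucket.
  static Distribution clusterByColumn() {
    return Distributions.clustered(new Expression[] { Expressions.column("id") });
  }

  static SortOrder[] localSortByBucketId() {
    return new SortOrder[] {
        Expressions.sort(Expressions.bucket(16, "id"), SortDirection.ASCENDING)
    };
  }
}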

That said, I think most users will not use the bucket transform as the clustering expressions. If they do, it's their choice and Spark won't do anything wrong.

What do you think? @sunchao @aokolnychyi

cloud-fan avatar Aug 24 '22 15:08 cloud-fan

How can we use this feature to implement bucket writing? We can use the expression (a v2 function) that calculates the bucket ID as the clustering expressions. Then Spark will make sure records with the same bucket ID will be in the same partition. However, the problem with this approach is low parallelism (at most the number of buckets).

@cloud-fan I think you raised a good point. With the double hashing mentioned above, the parallelism could even be less than the number of buckets due to collisions (but I guess this is a minor thing since the chance is low). Even though the actual number of Spark tasks may be much larger than the number of buckets, most of the tasks will receive empty input in this scenario.

A different approach is to use the bucket columns as the clustering expressions. Spark will make sure records with the same bucket column values will be in the same partition. Then the v2 write can require a local sort by bucket id (a v2 function) so that records with the same bucket ID will be grouped together.

This means it now relies on Spark's hash function for bucketing though, which could differ from what other engines use. I think it would cause compatibility issues, right?

That said, I think most users will not use bucket transform as the clustering expressions.

Hmm I'm not sure whether this is true. @aokolnychyi may know more from Iceberg side.

sunchao avatar Aug 24 '22 18:08 sunchao

In general, a V2 transform function maps a key to a value, and given a set of keys, the value space should always be <= the key space. Therefore, it seems better for Spark to shuffle the records based on the keys, while still using the provided V2 transform function to determine the actual partition/bucket a record should belong to.

Maybe we can introduce a special case for KeyGroupedPartitioning in ShuffleExchangeExec, which only considers the keys, rather than the values after V2 transform function evaluation, when computing the hash and deciding the partition ID for an input record.

sunchao avatar Aug 24 '22 19:08 sunchao

This means it now relies on Spark's hash function for bucketing though, which could differ from what other engines use.

Let's think about it this way: the v2 data source only needs Spark to locally sort the data by bucket id, which means the required ordering will be a v2 function that generates the bucket id. Then the v2 writer generates the bucket id again using the same v2 function during data writing. Alternatively, the v2 writer can just use a hash map of open file handles so that Spark doesn't need to sort the data. The extra clustering is only there to reduce the number of files we write out.
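
A hedged sketch of that hash-map approach (not from this PR): the bucket key is assumed to be an int in column 0, and the in-memory buffers stand in for the per-bucket file handles a real connector would keep open.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.WriterCommitMessage;

// Routes each row to a per-bucket sink, so no sort from Spark is strictly
// required; clustering only reduces the number of files per task.
class BucketedDataWriter implements DataWriter<InternalRow> {
  private final int numBuckets;
  private final Map<Integer, List<InternalRow>> buffers = new HashMap<>();

  BucketedDataWriter(int numBuckets) {
    this.numBuckets = numBuckets;
  }

  @Override
  public void write(InternalRow row) {
    // Placeholder bucket computation; a real connector would apply its own
    // v2 bucket function here.
    int bucketId = Math.floorMod(row.getInt(0), numBuckets);
    buffers.computeIfAbsent(bucketId, id -> new ArrayList<>()).add(row.copy());
  }

  @Override
  public WriterCommitMessage commit() {
    // A real writer would flush and close each per-bucket file here and
    // report the written files in the commit message.
    return new WriterCommitMessage() {};
  }

  @Override
  public void abort() {
    buffers.clear();
  }

  @Override
  public void close() {
    buffers.clear();
  }
}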

The Spark hash algorithm only matters when reading bucketed tables and trying to avoid shuffles. I think this is handled well already. Spark will shuffle a v2 table scan even if it's bucketed if the other side of the join is a normal table scan with shuffle.

cloud-fan avatar Aug 25 '22 01:08 cloud-fan

Makes sense. So you mean V2 data source providers can report only the clustering columns through RequiresDistributionAndOrdering while still using the V2 transform functions on those columns in the V2 writer tasks.

sunchao avatar Aug 25 '22 06:08 sunchao

@sunchao yes

cloud-fan avatar Aug 25 '22 06:08 cloud-fan

This means, the clustering expressions are the keys, and Spark makes sure records with the same keys go to the same partition. What Spark does is: for each record, calculate the keys, hash the keys and assign a partition ID for the record based on the hash of the keys.

How can we use this feature to implement bucket writing? We can use the expression (a v2 function) that calculates the bucket ID as the clustering expressions. Then Spark will make sure records with the same bucket ID will be in the same partition. However, the problem with this approach is low parallelism (at most the number of buckets).

A different approach is to use the bucket columns as the clustering expressions. Spark will make sure records with the same bucket column values will be in the same partition. Then the v2 write can require a local sort by bucket id (a v2 function) so that records with the same bucket ID will be grouped together.

@cloud-fan, I agree with your summary. It seems to me like a classic trade-off between fewer files (clustering by bucket ID) and better parallelism (local sort by bucket ID). I believe the current Spark API is flexible enough so that data sources can request either of those depending on their internal logic. The third alternative is to leverage an ordered distribution by bucket ID + some other key. In that case, Spark will do skew estimation while determining ranges. Each option has its own benefits and drawbacks so just allowing data sources to pick what works best should be enough.
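
For completeness, a sketch of that third alternative; bucket(16, id) and event_time are placeholders.

import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.SortDirection;
import org.apache.spark.sql.connector.expressions.SortOrder;

final class OrderedDistributionOption {
  // Ordered (range) distribution by bucket id plus a secondary key. Spark
  // samples the data to pick range boundaries, so a skewed bucket can be
  // split across multiple tasks.
  static Distribution orderedByBucketAndKey() {
    SortOrder[] ordering = new SortOrder[] {
        Expressions.sort(Expressions.bucket(16, "id"), SortDirection.ASCENDING),
        Expressions.sort(Expressions.column("event_time"), SortDirection.ASCENDING)
    };
    return Distributions.ordered(ordering);
  }
}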

To sum up, I feel the logic in this PR works and the existing API should cover all discussed cases.

What do you think, @cloud-fan @sunchao?

aokolnychyi avatar Aug 25 '22 20:08 aokolnychyi

I guess AQE can also help us to coalesce and split too big partitions during writes.

aokolnychyi avatar Aug 25 '22 20:08 aokolnychyi

I guess AQE can also help us to coalesce and split too big partitions during writes.

This should be covered by SPARK-37523.

Thanks @cloud-fan @sunchao @aokolnychyi for reviewing and discussing. Is there anything I can do before this PR gets in?

pan3793 avatar Aug 28 '22 14:08 pan3793

thanks, merging to master!

cloud-fan avatar Aug 29 '22 06:08 cloud-fan

late LGTM too, sorry for the delay on this PR @pan3793 !

sunchao avatar Aug 29 '22 16:08 sunchao