[Feature Request] OPTIMIZED WRITE
Feature request
Overview
Support Optimized Write as described in https://docs.databricks.com/delta/optimizations/auto-optimize.html#how-optimized-writes-work
Motivation
With frequent append jobs or MERGE/UPDATE operations, the underlying data can easily become fragmented. OPTIMIZE and Auto Compaction cannot cover all scenarios because they involve an additional commit.
We can reuse existing adaptive query execution (AQE) functionality, e.g. https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala and https://github.com/apache/spark/blob/d9129205a0133650e3fc6f335846cc6fe88c9ea9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala#L1551
As RebalancePartitions works with AQE, we need to add additional plans for OptimizeWrite.
Further details
If Optimize Write is enabled, we can modify the physical plan for writing data in TransactionalWrite.scala
Things to do
- Repartition
  - RoundRobinPartitioning for non-partitioned data
  - HashPartitioning for partitioned data, with partition values
- Rebalance partitions - Adaptive Partition Reader
  - After repartition (shuffle), we can merge or split partitions using PartitionSpec
    - PartialReducerPartitionSpec to split large partitions
    - CoalescedPartitionSpec to merge small partitions
  - To rebalance partitions, we can use or copy the logic in ShufflePartitionsUtil.scala: https://github.com/apache/spark/blob/9e1d00c521964a6bbf7e0126fd6dcf0020509420/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
- Remove unnecessary repartitions
  - There can be repartition operations in the plan which users added.
  - Unnecessary repartitions can be removed by CollapseRepartition in general, so we just need to remove the repartition on top of the query plan.
- Exclude OPTIMIZE/ZORDER operations - no need to do OptimizeWrite
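The merge/split step can be sketched as a small, self-contained Python simulation. This is only an illustration under stated assumptions, not the proposed Delta implementation: `CoalescedSpec` and `PartialReducerSpec` are hypothetical stand-ins for Spark's CoalescedPartitionSpec and PartialReducerPartitionSpec, the block sizes and the target size are made up, and merging and splitting are done in a single pass for brevity.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class CoalescedSpec:          # hypothetical stand-in: reducers [start, end) read by one task
    start_reducer: int
    end_reducer: int

@dataclass(frozen=True)
class PartialReducerSpec:     # hypothetical stand-in: map blocks [start, end) of one reducer
    reducer: int
    start_map: int
    end_map: int

def split_map_ranges(block_sizes, target):
    """Greedily pack consecutive map blocks; return the start index of each group."""
    starts, current = [0], 0
    for i, size in enumerate(block_sizes):
        if i > 0 and current + size > target:
            starts.append(i)
            current = size
        else:
            current += size
    return starts

def plan_specs(blocks, target):
    """blocks[m][r] = bytes map task m wrote for reducer partition r (non-empty input)."""
    specs = []
    run_start, run_size = None, 0

    def flush(end):
        # Close the pending run of small adjacent reducer partitions, if any.
        nonlocal run_start, run_size
        if run_start is not None:
            specs.append(CoalescedSpec(run_start, end))
        run_start, run_size = None, 0

    num_reducers = len(blocks[0])
    for r in range(num_reducers):
        per_map = [row[r] for row in blocks]
        total = sum(per_map)
        if total > target:
            # Large partition: split it along map-block boundaries.
            flush(r)
            starts = split_map_ranges(per_map, target)
            ends = starts[1:] + [len(per_map)]
            if len(starts) == 1:
                specs.append(CoalescedSpec(r, r + 1))
            else:
                specs.extend(PartialReducerSpec(r, s, e) for s, e in zip(starts, ends))
        else:
            # Small partition: extend the current run, or start a new one.
            if run_start is not None and run_size + total > target:
                flush(r)
            if run_start is None:
                run_start = r
            run_size += total
    flush(num_reducers)
    return specs

# Hypothetical sizes: 3 map tasks (rows) x 4 reducer partitions (columns).
blocks = [
    [50, 10, 10, 40],
    [50, 10, 10, 10],
    [50, 0, 0, 50],
]
for spec in plan_specs(blocks, target=60):
    print(spec)
```

With these made-up numbers the plan contains six specs: three partial specs for the first partition, one coalesced spec covering the second and third, and two partial specs for the fourth.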
Example for Optimize Write for partitioned data
Process
- Each color block shows the amount of row data coming from each previous-stage partition (1, 2, 3).
- As the data is partitioned by partCol, use HashPartitioning(partCol, 4) for the exchange (assume spark.sql.shuffle.partitions=4).
  - The data can be skewed (e.g. fewer rows with partCol=B, more rows with partCol=E).
- Merge small partitions first
- Split large partitions if needed
  - To avoid partitions that are too small, like P4-2, introduce optimizeWrite.smallPartitionFactor / optimizeWrite.mergedPartitionFactor, same as in the Spark implementation.
  - Make them configurable, but internal, as this is too much detail for users.
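The effect of the two factors can be illustrated with a short Python sketch. It is loosely modeled on the split-and-merge-back logic in Spark's ShufflePartitionsUtil rather than being a verbatim port; the function name, the default factor values, and the block sizes are assumptions chosen for illustration.

```python
def split_with_merge_back(sizes, target, small_factor=0.2, merged_factor=1.2):
    """Split consecutive block sizes into groups near `target`.

    Returns the start index of each group. Before a new group is opened,
    the group just closed is merged into the previous one when the pair
    stays under target * merged_factor, or when either is smaller than
    target * small_factor.
    """
    starts = [0]
    current = 0   # size of the group being built
    last = -1     # size of the previously closed group; -1 means "none yet"

    def try_merge():
        nonlocal last
        should_merge = last > -1 and (
            current + last < target * merged_factor
            or current < target * small_factor
            or last < target * small_factor
        )
        if should_merge:
            starts.pop()      # drop the boundary between the two groups
            last += current
        else:
            last = current

    for i, size in enumerate(sizes):
        if i > 0 and current + size > target:
            try_merge()
            starts.append(i)
            current = size
        else:
            current += size
    try_merge()
    return starts

# Hypothetical map-block sizes for one large partition (think of P4).
p4_blocks = [40, 10, 50]

# Factors disabled: the 10-unit block becomes its own tiny split (like P4-2).
print(split_with_merge_back(p4_blocks, target=45, small_factor=0.0, merged_factor=0.0))
# [0, 1, 2]

# Factors enabled: the tiny block is merged back into its neighbor (P4-1:2, P4-3).
print(split_with_merge_back(p4_blocks, target=45))
# [0, 2]
```

Here 40 + 10 = 50 is below 45 * 1.2 = 54, so the first two blocks end up in one split instead of leaving a 10-unit split on its own.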
Expected output file layout
- Without OptimizeWrite, there will be 3 x 4 (partitions 1, 2, 3 x partition values D, A, B, E) - 2 (no green for partCol=A, B) = 10 files.
  - Written by 3 Spark tasks
- With df.repartition(partCol), there will be just one file for each partition: 2 large and 2 small files.
  - Written by 4 Spark tasks (one for each partition, if hash values are all different like in the example)
- With OptimizeWrite, 7 rebalanced files will be generated: [P1-1, P1-2, P1-3, P2, P3, P4-1:2, P4-3]
  - Written by 6 Spark tasks, as we rebalanced => better job distribution, so we expect some performance gain for the write job.
  - P2 and P3 will be written by a single Spark task, as they are coalesced. The data will be sorted by partition value within the partition before writing.
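The file and task counts above can be checked with a few lines of Python. The row counts are invented for illustration, and treating input partition 3 as the "green" one without rows for A and B is an assumption; only the resulting counts come from the description.

```python
# rows[m][value] = rows that input partition m holds for a partCol value.
# Assumption: partition 3 is the "green" one with no rows for A and B;
# the row counts themselves are made up.
rows = {
    1: {"D": 50, "A": 10, "B": 10, "E": 40},
    2: {"D": 50, "A": 10, "B": 10, "E": 10},
    3: {"D": 50, "A": 0, "B": 0, "E": 50},
}

# Without OptimizeWrite: each task writes one file per partition value it holds.
files_plain = sum(
    1 for per_value in rows.values() for n in per_value.values() if n > 0
)
tasks_plain = len(rows)

# With df.repartition(partCol): one file per partition value that has data.
values_with_data = {
    v for per_value in rows.values() for v, n in per_value.items() if n > 0
}
files_repartition = len(values_with_data)

# With OptimizeWrite: the layout listed above, one inner list per Spark task.
optimize_write_tasks = [
    ["P1-1"], ["P1-2"], ["P1-3"], ["P2", "P3"], ["P4-1:2"], ["P4-3"],
]
files_optimized = sum(len(task) for task in optimize_write_tasks)
tasks_optimized = len(optimize_write_tasks)

print(files_plain, tasks_plain)          # 10 3
print(files_repartition)                 # 4
print(files_optimized, tasks_optimized)  # 7 6
```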
Limitations
- As this feature utilizes the existing Spark rebalance implementation, we need to use the PartitionSpec accordingly for rebalancing.
  - Cannot split a block from the "previous stage" after Exchange, even if it's too large. This can result in skewed output file sizes even with OptimizeWrite, if the input plan has an inefficient output RDD or skewed data.
    - e.g. 1 large partition with the same partition value cannot be split
  - Cannot reorder blocks to merge small blocks, as we can only specify a range of partition IDs.
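Both limitations follow from specs only being able to describe contiguous index ranges of whole blocks. A minimal Python sketch, with a hypothetical helper name and made-up sizes, shows the effect:

```python
def pack_contiguous(sizes, target):
    """Group adjacent entries only; return [start, end) index ranges.

    An entry is never split and entries are never reordered, which mirrors
    what a range-based partition spec can express.
    """
    ranges = []
    start, current = 0, 0
    for i, size in enumerate(sizes):
        if i > start and current + size > target:
            ranges.append((start, i))
            start, current = i, 0
        current += size
    if sizes:
        ranges.append((start, len(sizes)))
    return ranges

# A single oversized block stays one unit, far above the target.
print(pack_contiguous([150], target=60))          # [(0, 1)]

# The two small entries would fit together (10 + 10 <= 60), but they are
# not adjacent, so they cannot be merged without reordering.
print(pack_contiguous([10, 100, 10], target=60))  # [(0, 1), (1, 2), (2, 3)]
```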
Things to consider
- ~add LogicalPlan & Rule[LogicalPlan] & SparkStrategy & SparkPlan~ OR add SparkPlan only
- how to handle ordered data - we can skip OptimizeWrite if SortExec of non-partitioning columns is in the plan
- RoundRobin partitioning can be inefficient for non-partitioned data. It distributes all rows.
Willingness to contribute
The Delta Lake Community encourages new feature contributions. Would you or another member of your organization be willing to contribute an implementation of this feature?
- [x] Yes. I can contribute this feature independently.
NOTE: We have a working prototype of OptimizeWrite on Delta 1.0 / Spark 3.1.
Why is there a second PR for this when @sezruby's PR #1198 has been open and unreviewed for 16 months? @scottsand-db, @vkorukanti, what is going on here? Why is #2145 being pushed through instead?