[Feature Request] OPTIMIZED WRITE

Open sezruby opened this issue 2 years ago • 1 comments

Feature request

Overview

Support Optimized Write described in https://docs.databricks.com/delta/optimizations/auto-optimize.html#how-optimized-writes-work

Motivation

With frequent append jobs or MERGE/UPDATE operations, the underlying data can easily become fragmented. OPTIMIZE and Auto Compaction cannot cover every scenario because they involve an additional commit.

We can utilize the existing functionality of adaptive query execution (AQE), e.g.: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala https://github.com/apache/spark/blob/d9129205a0133650e3fc6f335846cc6fe88c9ea9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala#L1551

As RebalancePartitions works with AQE, we need to add additional plans for OptimizeWrite.

Further details

If Optimize Write is enabled, we can modify the physical plan for writing data in TransactionalWrite.scala

Things to do

  1. Repartition
    • RoundRobinPartitioning for non-partitioned data
    • HashPartitioning for partitioned data, with partition values
  2. Rebalance partitions - Adaptive Partition Reader
    • After repartition(shuffle), we can merge or split partitions using PartitionSpec
      • PartialReducerPartitionSpec - to split large partitions
      • CoalescedPartitionSpec - to merge small partitions
    • To rebalance partitions, we can use or copy the logic in ShufflePartitionsUtil.scala: https://github.com/apache/spark/blob/9e1d00c521964a6bbf7e0126fd6dcf0020509420/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
  3. Remove unnecessary repartitions
    • The plan may contain repartition operations that users added.
    • In general, unnecessary repartitions are removed by CollapseRepartition.
    • So we only need to remove the repartition at the top of the query plan.
  • Exclude OPTIMIZE/ZORDER operations, which do not need OptimizeWrite.
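
The repartition step (item 1) can be illustrated with a small, Spark-free sketch. It is only a model of the idea: the function names are hypothetical, CRC32 stands in for Spark's own hash function, and the round-robin branch ignores details such as Spark's per-task starting offset.

```python
import zlib
from collections import defaultdict


def shuffle_partition_id(row, partition_cols, num_partitions, row_index):
    """Pick a shuffle partition for one row (hypothetical helper).

    Non-partitioned table: round-robin on the row position.
    Partitioned table: hash of the partition values, so that all rows
    sharing the same partition values meet in the same shuffle partition.
    """
    if not partition_cols:
        return row_index % num_partitions
    key = "\x00".join(str(row[c]) for c in partition_cols)
    # CRC32 is an assumption used for determinism; Spark uses its own hash.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def repartition(rows, partition_cols, num_partitions):
    """Group rows by the shuffle partition they would be sent to."""
    buckets = defaultdict(list)
    for i, row in enumerate(rows):
        pid = shuffle_partition_id(row, partition_cols, num_partitions, i)
        buckets[pid].append(row)
    return dict(buckets)
```

With `partition_cols=["partCol"]`, every row of a given `partCol` value lands in one bucket, which is what lets the later rebalance step control how many files each partition directory receives.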

Example for Optimize Write for partitioned data

image

Process
  • Each colored block shows the amount of row data coming from each previous-stage partition (1, 2, 3)
  • As the data is partitioned by partCol, use HashPartitioning(partCol, 4) for the exchange (assume spark.sql.shuffle.partitions=4)
    • The data can be skewed (e.g. fewer rows with partCol=B, more rows with partCol=E)
  • Merge small partitions first
  • Split large partitions if needed
  • To avoid partitions that are too small, like P4-2, introduce optimizeWrite.smallPartitionFactor / optimizeWrite.mergedPartitionFactor
    • same as in the Spark implementation
    • make them configurable, but keep them internal, as they are too much detail for users
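
The merge-then-split process above can be sketched in plain Python. This is a simplified model, not the logic in ShufflePartitionsUtil.scala: the block sizes, the target size of 64, and the default factor of 0.2 are invented for illustration, and the "coalesce"/"split" tuples are stand-ins for CoalescedPartitionSpec and PartialReducerPartitionSpec.

```python
def split_blocks(block_sizes, target, small_factor=0.2):
    """Split one shuffle partition into contiguous ranges of map blocks.

    Returns (start, end) map-index ranges, end exclusive. A chunk smaller
    than target * small_factor is merged into the chunk before it.
    """
    chunks, start, current = [], 0, 0
    for i, size in enumerate(block_sizes):
        if current > 0 and current + size > target:
            chunks.append((start, i, current))
            start, current = i, 0
        current += size
    chunks.append((start, len(block_sizes), current))

    merged = []
    for s, e, size in chunks:
        if merged and size < target * small_factor:
            prev_start, _, prev_size = merged[-1]
            merged[-1] = (prev_start, e, prev_size + size)
        else:
            merged.append((s, e, size))
    return [(s, e) for s, e, _ in merged]


def rebalance(shuffle_blocks, target, small_factor=0.2):
    """shuffle_blocks[r] lists the block sizes of shuffle partition r,
    one entry per previous-stage task. Returns one spec per write task."""
    specs, pending, pending_size = [], [], 0

    def flush():
        nonlocal pending, pending_size
        if pending:
            # Coalesce a contiguous run of small shuffle partitions.
            specs.append(("coalesce", pending[0], pending[-1] + 1))
            pending, pending_size = [], 0

    for r, blocks in enumerate(shuffle_blocks):
        total = sum(blocks)
        if total > target:
            flush()
            for s, e in split_blocks(blocks, target, small_factor):
                specs.append(("split", r, s, e))
        else:
            if pending and pending_size + total > target:
                flush()
            pending.append(r)
            pending_size += total
    flush()
    return specs


# Invented sizes for P1..P4, one column per previous-stage partition.
example = [[50, 50, 50], [20, 5, 0], [10, 10, 0], [60, 10, 70]]
specs = rebalance(example, target=64)
```

With these made-up sizes, `specs` holds six entries: three splits of P1, one coalesced range covering P2 and P3, and two splits of P4, where the small middle block joins the first one (P4-1:2) instead of becoming its own tiny P4-2.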

Expected output file layout

image

  • Without OptimizeWrite, there will be 3 x 4 (partitions 1, 2, 3 x partition values D, A, B, E) - 2 (no green for partCol=A, B) = 10 files
    • Written by 3 Spark tasks
  • With df.repartition(partCol), there will be just one file for each partition: 2 large and 2 small files
    • Written by 4 Spark tasks (one for each partition, if the hash values are all different, as in the example)
  • With OptimizeWrite, 7 rebalanced files will be generated: [P1-1, P1-2, P1-3, P2, P3, P4-1:2, P4-3]
    • Written by 6 Spark tasks. Because the data is rebalanced, the work is distributed better, so we expect some performance gain for the write job.
    • P2 and P3 will be written by a single Spark task, as they are coalesced. The data will be sorted by partition value within the partition before writing.
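
The file counts above can be checked with a few lines of arithmetic. The row counts below are invented (only the zero entries matter for the count, mirroring "no green for partCol=A,B"), and the task list is written out by hand from the layout described in the bullets.

```python
# Hypothetical rows per partCol value held by previous-stage partitions 1, 2, 3.
# A zero means that partition has no rows for the value.
rows_by_value = {
    "D": [50, 50, 50],
    "A": [20, 5, 0],
    "B": [10, 10, 0],
    "E": [60, 10, 70],
}


def files_without_optimize_write(rows_by_value):
    # Each of the 3 tasks writes one file per partition value it holds.
    return sum(1 for sizes in rows_by_value.values() for s in sizes if s > 0)


def files_with_repartition(rows_by_value):
    # df.repartition(partCol): one file per partition value.
    return len(rows_by_value)


def files_with_optimize_write(tasks):
    # Each write task emits one file per partition value it covers.
    return sum(len(values) for values in tasks)


# Write tasks after rebalancing: P1-1, P1-2, P1-3, (P2 + P3), P4-1:2, P4-3.
optimized_tasks = [["D"], ["D"], ["D"], ["A", "B"], ["E"], ["E"]]

without_ow = files_without_optimize_write(rows_by_value)  # 3 * 4 - 2
with_repartition = files_with_repartition(rows_by_value)
with_ow = files_with_optimize_write(optimized_tasks)
```

Under these assumptions the three counts come out as 10, 4, and 7 files, with the 7 files written by 6 tasks because the coalesced task covers two partition values.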

Limitations
  • As this feature utilizes the existing Spark rebalance implementation, we need to use the PartitionSpec accordingly for rebalancing.
  • We cannot split an individual block from the "previous stage" after the Exchange, even if it is too large. This can result in skewed output file sizes even with OptimizeWrite, if the input plan has an inefficient output RDD or skewed data.
    • e.g. 1 large partition with the same partition value cannot be split
  • We cannot reorder blocks in order to merge small ones, as we can only specify a range of partition IDs.
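
The last limitation can be seen in a toy model where coalescing may only cover a contiguous range of partition IDs. The function name, the sizes, and the target of 64 are all hypothetical.

```python
def coalesce_contiguous(sizes, target):
    """Greedily group adjacent partitions into (start, end) ID ranges,
    end exclusive, without ever reordering them."""
    ranges, start, current = [], 0, 0
    for i, size in enumerate(sizes):
        if i > start and current + size > target:
            ranges.append((start, i))
            start, current = i, 0
        current += size
    if sizes:
        ranges.append((start, len(sizes)))
    return ranges


# Two small partitions separated by a large one cannot be merged.
separated = coalesce_contiguous([10, 100, 10], target=64)
# The same sizes with the small partitions adjacent can be merged.
adjacent = coalesce_contiguous([10, 10, 100], target=64)
```

Here `separated` keeps three ranges while `adjacent` needs only two, so the same data can yield more small files purely because of the order of the partition IDs.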

Things to consider

  • ~add LogicalPlan & Rule[LogicalPlan] & SparkStrategy & SparkPlan~ OR add SparkPlan only
  • how to handle ordered data - we can skip OptimizeWrite if SortExec of non-partitioning columns is in the plan
  • RoundRobin partitioning can be inefficient for non-partitioned data, as it redistributes every row.

Willingness to contribute

The Delta Lake Community encourages new feature contributions. Would you or another member of your organization be willing to contribute an implementation of this feature?

  • [x] Yes. I can contribute this feature independently.

NOTE: We have a working prototype of OptimizeWrite for Delta 1.0 / Spark 3.1.

sezruby, May 27 '22 19:05

Why is there a second PR for this when @sezruby's PR #1198 has been open and unreviewed for 16 months? @scottsand-db, @vkorukanti, what is going on here? Why is #2145 being pushed through instead?

rasidhan, Oct 10 '23 19:10