delta icon indicating copy to clipboard operation
delta copied to clipboard

Support Optimized Write

Open sezruby opened this issue 2 years ago • 1 comments

Description

Support OptimizeWrite described in https://docs.databricks.com/delta/optimizations/auto-optimize.html#how-optimized-writes-work

Fixes #1158

If OptimizeWrite is enabled, inject OptimizeWriteExchangeExec on top of the write plan and remove ShuffleExchangeExec or CoalesceExchange operation at the top of the plan to avoid unnecessary shuffle / stage.

In OptimizeWriteExchangeExec,

  1. Repartition data
    • RoundRobinPartitining for non partitioned data, HashPartitioning for partitioned data.
    • Use spark.sql.shuffle.partitions for partitioning. We can introduce a new config like spark.sql.adaptive.coalescePartitions.initialPartitionNum if needed.
  2. Rebalance partitions for write
    • Step1 - merge small partitions (CoalescedPartitionSpec)
    • Step2 - split large partitions (PartialReducerPartitionSpec)
    • targetSize config: spark.databricks.delta.optimizeWrite.binSize (default: 128MB)

How to enable

Ref: https://docs.databricks.com/delta/optimizations/auto-optimize.html#enable-auto-optimize We can enable OptimizeWrite using Spark session config or table property.

  1. Spark session config
    • spark.databricks.delta.optimizeWrite.enabled = true
    • applied for write operations of all Delta tables)
  2. Table property
    • delta.autoOptimize.optimizeWrite = true

Spark session config is prior to the table property.

How was this patch tested?

Unit tests (+ more tests will be added)

Does this PR introduce any user-facing changes?

Yes, support OptimizeWrite

sezruby avatar Jun 13 '22 16:06 sezruby

Thank you @sezruby for creating this PR! This is a very useful feature. We are currently busy with the next release of Delta Lake. Will be reviewing the PR after the release.

vkorukanti avatar Jun 14 '22 17:06 vkorukanti

Can you please fix the conflicts?

scottsand-db avatar Sep 15 '22 15:09 scottsand-db

Gentle ping on this again, just started using this in our production environment and would be great not to have to maintain my own Delta fork 😅

Kimahriman avatar Apr 09 '23 14:04 Kimahriman

Please let me know if someone is ready to review. I'll rebase the PR then.

sezruby avatar Apr 10 '23 17:04 sezruby

any update on this PR?

isunli avatar May 16 '23 16:05 isunli

Any update on this PR?.

cb-sukumarnataraj avatar Sep 01 '23 13:09 cb-sukumarnataraj

Hi @tdas @scottsand-db Any update? So no plan to deliver this feature to OSS delta?

sezruby avatar Sep 06 '23 16:09 sezruby

I am using your PR for optimize write in my EMR streaming use cases.

I want that optimize write should create around 128m parquet files but it is topping at 64m.

Configuration I am using is as below "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog", "spark.databricks.delta.autoCompact.enabled": "true", "spark.databricks.delta.optimizeWrite.enabled": "true", "spark.databricks.delta.autoCompact.minNumFiles": "500", "spark.sql.adaptive.enabled": "true", "spark.sql.adaptive.coalescePartitions.enabled": "true", "spark.sql.adaptive.advisoryPartitionSizeInBytes": "128m", "spark.sql.adaptive.coalescePartitions.minPartitionNum": "1", "spark.sql.adaptive.coalescePartitions.initialPartitionNum": "200", "spark.databricks.delta.optimizeWrite.binSize": "134217728

2023-09-22 08:22:54 61.9 MiB part-00000-eb8c145c-7ac4-420c-8b85-c6fd58861e40.c000.snappy.parquet 2023-09-22 08:24:55 43.7 MiB part-00001-4fbd8215-5129-48aa-b858-d2ab9d2ca7a6.c000.snappy.parquet 2023-09-22 08:24:58 54.8 MiB part-00000-de4d1a37-543c-4c80-a2dd-9cbc06af7b3c.c000.snappy.parquet 2023-09-22 08:27:06 52.7 MiB part-00000-c6cf2c15-8dc3-4346-a03c-eb83171fdf46.c000.snappy.parquet 2023-09-22 08:27:06 59.3 MiB part-00001-a5f3db2e-e352-45b0-aa1b-bbf1570ebb6f.c000.snappy.parquet 2023-09-22 08:28:56 69.1 MiB part-00000-697cca77-e7ea-4267-9f7b-3fd406678b31.c000.snappy.parquet 2023-09-22 08:31:03 52.8 MiB part-00000-3dfbf27e-53f3-4f35-8955-7675d0ec86bf.c000.snappy.parquet 2023-09-22 08:31:05 60.5 MiB part-00001-6b7bc1d7-9882-4ba0-a01b-619a0a424ae4.c000.snappy.parquet 2023-09-22 08:32:58 32.5 MiB part-00001-ea4d882a-f182-48d4-a3e8-bc23260e617e.c000.snappy.parquet 2023-09-22 08:33:03 54.1 MiB part-00000-4c53bf10-eede-45c3-b178-f9d4f2895956.c000.snappy.parquet 2023-09-22 08:35:00 45.7 MiB part-00001-138bdaab-8d77-4faf-af49-489c4cfad4f6.c000.snappy.parquet 2023-09-22 08:35:01 51.7 MiB part-00000-62a2bdfa-9dea-41ff-9f8c-4e3ebcb12d04.c000.snappy.parquet 2023-09-22 11:42:10 52.0 MiB part-00000-64bccc7b-2664-4e67-b478-413a5a7554db.c000.snappy.parquet 2023-09-22 11:42:12 57.9 MiB part-00001-b79e83d9-c65e-438b-a555-aa2028320ea5.c000.snappy.parquet 2023-09-22 11:45:20 56.4 MiB part-00001-1ef411fb-160c-4cba-a5e7-cdee540295a5.c000.snappy.parquet 2023-09-22 11:45:20 57.0 MiB part-00004-fa51fd84-c4d3-4980-9a88-5566283d3234.c000.snappy.parquet 2023-09-22 11:45:20 57.5 MiB part-00005-80e03284-a8a6-4937-857c-1c434527c956.c000.snappy.parquet 2023-09-22 11:45:20 57.8 MiB part-00000-a347b170-d15f-4328-b402-ccdc099e94da.c000.snappy.parquet 2023-09-22 11:45:20 57.9 MiB part-00006-d566e257-f0cf-4cf9-b968-cad449784226.c000.snappy.parquet 2023-09-22 11:45:20 57.9 MiB part-00009-18a54bcf-3d53-4766-b485-62332266c974.c000.snappy.parquet 2023-09-22 11:45:20 58.5 MiB part-00003-8cf66203-9a32-41b5-a617-a3a9e9b2e465.c000.snappy.parquet 2023-09-22 11:45:21 38.7 MiB part-00010-1b8e65ae-b366-46fa-818e-d8ac95fd5904.c000.snappy.parquet 2023-09-22 11:45:22 57.1 MiB part-00007-05dc33ff-5125-435f-b0f1-f4215fba9d5b.c000.snappy.parquet 2023-09-22 11:45:22 58.9 MiB part-00008-5c99002a-1d7b-4db5-8c14-8e3f966773ec.c000.snappy.parquet 2023-09-22 11:45:23 57.6 MiB part-00002-6844ca06-b437-486f-b15b-ebf93fc0e740.c000.snappy.parquet

Any suggestions?

adityakumar84 avatar Sep 22 '23 07:09 adityakumar84

@adityakumar84 you can try to increase the binSize config. It's used for row format size in memory. The result parquet size, we cannot control it precisely as it depends on data property / parquet compression ratio. To make more accurate, we need to collect parquet compression ratio history from previous wrtie job... etc

sezruby avatar Sep 26 '23 01:09 sezruby