Support Optimized Write
## Description
Support OptimizeWrite as described in https://docs.databricks.com/delta/optimizations/auto-optimize.html#how-optimized-writes-work
Fixes #1158
If OptimizeWrite is enabled, inject `OptimizeWriteExchangeExec` on top of the write plan and remove any `ShuffleExchangeExec` or `CoalesceExchange` operation at the top of the plan, to avoid an unnecessary shuffle / stage.
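As a rough illustration (not the PR's actual code), the rewrite could look like the sketch below. The `OptimizeWriteExchangeExec` stub only fixes a plausible constructor shape, and Spark's `CoalesceExec` stands in for the coalesce operation mentioned above (Spark 3.2+ APIs assumed):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{CoalesceExec, SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Stand-in for the node this PR adds; the real class performs the shuffle and
// rebalancing described below, so this stub only models the constructor shape.
case class OptimizeWriteExchangeExec(partitioning: Partitioning, child: SparkPlan)
    extends UnaryExecNode {
  override def output: Seq[Attribute] = child.output
  override protected def doExecute(): RDD[InternalRow] =
    throw new UnsupportedOperationException("sketch only")
  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
    copy(child = newChild)
}

// The injection itself: strip a redundant top-level exchange, if any, and wrap
// the remaining plan in the optimized-write exchange.
def injectOptimizeWrite(plan: SparkPlan, partitioning: Partitioning): SparkPlan =
  plan match {
    case ShuffleExchangeExec(_, child, _) => OptimizeWriteExchangeExec(partitioning, child)
    case CoalesceExec(_, child)           => OptimizeWriteExchangeExec(partitioning, child)
    case other                            => OptimizeWriteExchangeExec(partitioning, other)
  }
```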
In `OptimizeWriteExchangeExec` (see the sketch after this list):
- Repartition data
  - `RoundRobinPartitioning` for non-partitioned data, `HashPartitioning` for partitioned data.
  - Use `spark.sql.shuffle.partitions` for the number of partitions. We can introduce a new config like `spark.sql.adaptive.coalescePartitions.initialPartitionNum` if needed.
- Rebalance partitions for write
  - Step 1 - merge small partitions (`CoalescedPartitionSpec`)
  - Step 2 - split large partitions (`PartialReducerPartitionSpec`)
  - Target size config: `spark.databricks.delta.optimizeWrite.binSize` (default: 128MB)
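A minimal, self-contained sketch of these two phases, where `Merged` and `Split` are illustrative stand-ins for Spark's `CoalescedPartitionSpec` and `PartialReducerPartitionSpec` (the PR's actual implementation will differ):

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RoundRobinPartitioning}

// Phase 1: pick the partitioning for the exchange, as described above.
def writePartitioning(partitionColumns: Seq[Attribute], numPartitions: Int): Partitioning =
  if (partitionColumns.isEmpty) RoundRobinPartitioning(numPartitions)
  else HashPartitioning(partitionColumns, numPartitions)

// Illustrative stand-ins for CoalescedPartitionSpec / PartialReducerPartitionSpec.
sealed trait BinSpec
case class Merged(startReducer: Int, endReducer: Int) extends BinSpec // reducers [start, end) -> one bin
case class Split(reducer: Int, slices: Int) extends BinSpec           // one reducer -> `slices` bins

// Phase 2: greedily merge adjacent small partitions up to targetSize, and
// split any partition larger than targetSize into roughly target-sized slices.
// `sizes` would come from the shuffle's map output statistics.
def rebalance(sizes: Array[Long], targetSize: Long): Seq[BinSpec] = {
  val specs = ArrayBuffer.empty[BinSpec]
  var start = 0
  var acc = 0L
  def flush(end: Int): Unit = {
    if (end > start) specs += Merged(start, end)
    start = end
    acc = 0L
  }
  for (i <- sizes.indices) {
    if (sizes(i) > targetSize) {
      flush(i) // close any pending merged bin before the large partition
      specs += Split(i, math.ceil(sizes(i).toDouble / targetSize).toInt)
      start = i + 1
    } else {
      acc += sizes(i)
      if (acc >= targetSize) flush(i + 1) // merged bin reached the target size
    }
  }
  flush(sizes.length) // leftover small partitions form the last bin
  specs.toSeq
}
```

The greedy pass mirrors how AQE coalesces shuffle partitions: small reducer partitions accumulate into one bin until the target size is reached, while oversized partitions are cut into roughly target-sized slices.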
## How to enable
Ref: https://docs.databricks.com/delta/optimizations/auto-optimize.html#enable-auto-optimize

We can enable OptimizeWrite using a Spark session config or a table property:
- Spark session config
  - `spark.databricks.delta.optimizeWrite.enabled` = true (applied to write operations on all Delta tables)
- Table property
  - `delta.autoOptimize.optimizeWrite` = true

The Spark session config takes precedence over the table property.
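For example, assuming an active SparkSession named `spark` (the table name below is a placeholder):

```scala
// Session-level: applies to writes to all Delta tables in this session.
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")

// Table-level: persisted with the table as a property.
spark.sql(
  "ALTER TABLE my_delta_table " +
  "SET TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true')")
```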
## How was this patch tested?
Unit tests (+ more tests will be added)
## Does this PR introduce any user-facing changes?
Yes, it adds support for OptimizeWrite.
Thank you @sezruby for creating this PR! This is a very useful feature. We are currently busy with the next release of Delta Lake. Will be reviewing the PR after the release.
Can you please fix the conflicts?
Gentle ping on this again, just started using this in our production environment and would be great not to have to maintain my own Delta fork 😅
Please let me know if someone is ready to review. I'll rebase the PR then.
Any update on this PR?
Any update on this PR?
Hi @tdas @scottsand-db, any update? Is there no plan to deliver this feature to OSS Delta?
I am using your PR for optimized write in my EMR streaming use cases.
I want optimized write to create Parquet files of around 128 MB, but they are topping out at around 64 MB.
The configuration I am using is as below:
"spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
"spark.databricks.delta.autoCompact.enabled": "true",
"spark.databricks.delta.optimizeWrite.enabled": "true",
"spark.databricks.delta.autoCompact.minNumFiles": "500",
"spark.sql.adaptive.enabled": "true",
"spark.sql.adaptive.coalescePartitions.enabled": "true",
"spark.sql.adaptive.advisoryPartitionSizeInBytes": "128m",
"spark.sql.adaptive.coalescePartitions.minPartitionNum": "1",
"spark.sql.adaptive.coalescePartitions.initialPartitionNum": "200",
"spark.databricks.delta.optimizeWrite.binSize": "134217728"
2023-09-22 08:22:54  61.9 MiB  part-00000-eb8c145c-7ac4-420c-8b85-c6fd58861e40.c000.snappy.parquet
2023-09-22 08:24:55  43.7 MiB  part-00001-4fbd8215-5129-48aa-b858-d2ab9d2ca7a6.c000.snappy.parquet
2023-09-22 08:24:58  54.8 MiB  part-00000-de4d1a37-543c-4c80-a2dd-9cbc06af7b3c.c000.snappy.parquet
2023-09-22 08:27:06  52.7 MiB  part-00000-c6cf2c15-8dc3-4346-a03c-eb83171fdf46.c000.snappy.parquet
2023-09-22 08:27:06  59.3 MiB  part-00001-a5f3db2e-e352-45b0-aa1b-bbf1570ebb6f.c000.snappy.parquet
2023-09-22 08:28:56  69.1 MiB  part-00000-697cca77-e7ea-4267-9f7b-3fd406678b31.c000.snappy.parquet
2023-09-22 08:31:03  52.8 MiB  part-00000-3dfbf27e-53f3-4f35-8955-7675d0ec86bf.c000.snappy.parquet
2023-09-22 08:31:05  60.5 MiB  part-00001-6b7bc1d7-9882-4ba0-a01b-619a0a424ae4.c000.snappy.parquet
2023-09-22 08:32:58  32.5 MiB  part-00001-ea4d882a-f182-48d4-a3e8-bc23260e617e.c000.snappy.parquet
2023-09-22 08:33:03  54.1 MiB  part-00000-4c53bf10-eede-45c3-b178-f9d4f2895956.c000.snappy.parquet
2023-09-22 08:35:00  45.7 MiB  part-00001-138bdaab-8d77-4faf-af49-489c4cfad4f6.c000.snappy.parquet
2023-09-22 08:35:01  51.7 MiB  part-00000-62a2bdfa-9dea-41ff-9f8c-4e3ebcb12d04.c000.snappy.parquet
2023-09-22 11:42:10  52.0 MiB  part-00000-64bccc7b-2664-4e67-b478-413a5a7554db.c000.snappy.parquet
2023-09-22 11:42:12  57.9 MiB  part-00001-b79e83d9-c65e-438b-a555-aa2028320ea5.c000.snappy.parquet
2023-09-22 11:45:20  56.4 MiB  part-00001-1ef411fb-160c-4cba-a5e7-cdee540295a5.c000.snappy.parquet
2023-09-22 11:45:20  57.0 MiB  part-00004-fa51fd84-c4d3-4980-9a88-5566283d3234.c000.snappy.parquet
2023-09-22 11:45:20  57.5 MiB  part-00005-80e03284-a8a6-4937-857c-1c434527c956.c000.snappy.parquet
2023-09-22 11:45:20  57.8 MiB  part-00000-a347b170-d15f-4328-b402-ccdc099e94da.c000.snappy.parquet
2023-09-22 11:45:20  57.9 MiB  part-00006-d566e257-f0cf-4cf9-b968-cad449784226.c000.snappy.parquet
2023-09-22 11:45:20  57.9 MiB  part-00009-18a54bcf-3d53-4766-b485-62332266c974.c000.snappy.parquet
2023-09-22 11:45:20  58.5 MiB  part-00003-8cf66203-9a32-41b5-a617-a3a9e9b2e465.c000.snappy.parquet
2023-09-22 11:45:21  38.7 MiB  part-00010-1b8e65ae-b366-46fa-818e-d8ac95fd5904.c000.snappy.parquet
2023-09-22 11:45:22  57.1 MiB  part-00007-05dc33ff-5125-435f-b0f1-f4215fba9d5b.c000.snappy.parquet
2023-09-22 11:45:22  58.9 MiB  part-00008-5c99002a-1d7b-4db5-8c14-8e3f966773ec.c000.snappy.parquet
2023-09-22 11:45:23  57.6 MiB  part-00002-6844ca06-b437-486f-b15b-ebf93fc0e740.c000.snappy.parquet
Any suggestions?
@adityakumar84 you can try increasing the binSize config. It applies to the row-format size of the data in memory, so we cannot precisely control the resulting Parquet file size; that depends on the data's properties and the Parquet compression ratio. To be more accurate, we would need to collect the Parquet compression ratio history from previous write jobs, etc.
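For example, doubling the bin size (following the byte-count form used in the configuration above; 268435456 bytes = 256 MB, and the on-disk file sizes will still vary with the compression ratio):

```scala
// Raise the target in-memory bin size so the resulting Parquet files
// come out larger; the exact value to use depends on your data.
spark.conf.set("spark.databricks.delta.optimizeWrite.binSize", "268435456")
```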