[BUG] Delta Lake MERGE INTO hangs the Spark session without producing a result (10000 columns)
Bug
Which Delta project/connector is this regarding?
I am working with Delta Lake 3.1.0 and Spark 3.5.1.
Describe the problem
I am experimenting with Delta Lake as the primary storage solution for my tabular data, which is updated daily. I tried to mimic the basic use case: an existing target table is updated with new data that can modify existing rows, i.e. upserts. I am using a MERGE INTO operation, where the target table is my Delta table and the table with updates is simply saved as a Parquet file.
My tables are unusual in their number of columns: there are up to 10000 columns, most of them binary. One column contains a row identifier, represented as a string hash, which is used in the matching condition of the merge operation.
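For concreteness, the daily upsert I describe looks roughly like the sketch below in the Python API (just an illustration; hash_id and the paths are placeholder names, the full reproduction code is further down):

from delta.tables import DeltaTable
import pyspark.sql.functions as F

# Minimal sketch of the daily upsert (placeholder paths and column names)
target = DeltaTable.forPath(spark, '/path/to/delta_table')   # existing Delta target table
updates = spark.read.parquet('/path/to/daily_updates')       # new daily data stored as Parquet

(
    target.alias('t')
    .merge(updates.alias('u'), F.col('t.hash_id') == F.col('u.hash_id'))
    .whenMatchedUpdateAll()      # overwrite matched rows with the new values
    .whenNotMatchedInsertAll()   # insert rows that are not in the target yet
    .execute()
)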
I am experimenting with a small main table of 5000 rows, which is 10 MB as a single Parquet file on disk, and the same table stored as plain Parquet in several small 3.5 MB files.
My merge operation takes extremely long and appears to be stuck without doing any computation. What am I missing? I haven't tried partitioning (a partitioned write would look like the sketch below), since the whole table is just 10 MB. I expected this operation to be extremely fast even for a non-optimized table.
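For reference, if I were to partition, the write would look roughly like this; partition_col is just a placeholder, since my real table has no obvious partitioning column, and df_spark is the features DataFrame from the reproduction code below.

# Hypothetical partitioned Delta write - partition_col is a placeholder,
# not a column from my actual schema.
df_spark.write.format('delta') \
    .partitionBy('partition_col') \
    .mode('overwrite') \
    .save('deltalake_features_partitioned')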
I would appreciate your help, thank you!
UPD: added the code to reproduce the issue.
Spark version: 3.5.1. Delta Lake (delta-spark) version: 3.1.0. Instance: r5n.16xlarge
The only output that I get is:
24/05/20 14:18:21 WARN DAGScheduler: Broadcasting large task binary with size 1094.0 KiB
24/05/20 14:18:21 WARN DAGScheduler: Broadcasting large task binary with size 1097.6 KiB
Then the Spark session hangs and does nothing. The code does not finish.
Steps to reproduce
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from delta import *
def get_spark():
    builder = (
        SparkSession.builder.master("local[4]").appName('SparkDelta')
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.driver.memory", "32g")
        .config("spark.jars.packages",
                "io.delta:delta-spark_2.12:3.1.0,"
                "io.delta:delta-storage:3.1.0")
    )
    spark = builder.getOrCreate()
    return spark
spark = get_spark()
### GENERATE SOME FAKE DATA
import pandas as pd
import numpy as np
num_rows = 5000
num_cols = 10000
array = np.random.rand(num_rows, num_cols)
df = pd.DataFrame(array)
df = df.reset_index()
df.to_csv('features.csv')
### DATA TO DELTA LAKE TABLE
df_spark = spark.read.format('csv').load('features.csv')
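# Note: no header/inferSchema options, so Spark names the columns _c0, _c1, _c2, ...,
# reads everything as strings, and the pandas header row comes in as a data row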
df_spark.write.format('delta').mode("overwrite").save('deltalake_features')
# Creating data to merge - just the first 100 rows and the first 2 columns
df_slice = df.iloc[:100, 0:2]
df_slice.to_csv('features_slice_100.csv')
df_spark_slice = spark.read.format('csv').load('features_slice_100.csv')
### MERGE INTO code
target_delta_table = DeltaTable.forPath(spark, 'deltalake_features')
df_updates = df_spark_slice
df_target = target_delta_table.toDF()
(
    target_delta_table.alias('target')
    .merge(
        df_updates.alias('updates'),
        F.col('target._c1') == F.col('updates._c1')
    )
    .whenMatchedUpdate(set={
        '_c2': F.lit(42)
    })
    .execute()
)
Observed results
The Spark session hangs with no visible progress.
Expected results
I expect the merge to finish successfully.
Further details
If I do a simple inner join between the same two tables, the operation takes just 1 minute to complete, so I'd expect comparable speed here.
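The inner-join comparison sketched below reuses df_target and df_updates from the reproduction code; the count() at the end is only there as an action to force execution and is an assumption, not how I originally timed it.

# Plain inner join on the same key column - this finishes in about a minute,
# while the MERGE above never completes. count() just forces execution.
joined = df_target.join(
    df_updates,
    on=df_target['_c1'] == df_updates['_c1'],
    how='inner',
)
print(joined.count())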
Environment information
- Delta Lake version: 3.1.0
- Spark version: 3.5.1
- Scala version: 2.12.18