[BUG] Optimize command fails when using liquid clustering on local Delta Lake & PySpark
Bug
Which Delta project/connector is this regarding?
- [ ] Spark
- [X] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)
Describe the problem
When attempting to optimize a Delta table configured with liquid clustering, the optimize().executeCompaction() call fails with an exception.
Steps to reproduce
from pyspark.sql import SparkSession
from delta.pip_utils import configure_spark_with_delta_pip
from delta.tables import DeltaTable

# Initialize a SparkSession configured with Delta
builder = (
    SparkSession.builder.config(
        "spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"
    )
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .enableHiveSupport()
)
spark = configure_spark_with_delta_pip(
    spark_session_builder=builder
).getOrCreate()

# Initialize an empty Delta table with liquid clustering
dt = (
    DeltaTable.createIfNotExists(spark)
    .tableName("testtable")
    .addColumn("id", dataType="bigint", nullable=False)
    .addColumn("date", dataType="date", nullable=False)
    .addColumn("name", dataType="string", nullable=False)
    .addColumn("amount", dataType="double")
    .addColumn("year_month", dataType="string", nullable=False)
    .clusterBy("year_month")
    .execute()
)

# Insert some test data into the newly created Delta table
spark.sql(
    """
    INSERT INTO testtable VALUES
    (1, '2024-01-01', 'Jack', 30.5, '2024-01'),
    (2, '2024-02-10', 'Claude', 11.2, '2024-02'),
    (3, '2024-02-25', 'Mick', 10.1, '2024-02')
    """
)

# Optimize the Delta table (this triggers the error)
dt.optimize().executeCompaction()
Observed results
When running the above snippet, I get the following extended error traceback.
Py4JJavaError                             Traceback (most recent call last)
Cell In[9], line 1
----> 1 dt.optimize().executeCompaction()

File ~/projects/ecoempy/.venv/lib/python3.10/site-packages/delta/tables.py:1391, in DeltaOptimizeBuilder.executeCompaction(self)
   1382 @since(2.0)  # type: ignore[arg-type]
   1383 def executeCompaction(self) -> DataFrame:
   1384     """
   1385     Compact the small files in selected partitions.
   1386
   1387     :return: DataFrame containing the OPTIMIZE execution metrics
   1388     :rtype: pyspark.sql.DataFrame
   1389     """
   1390     return DataFrame(
-> 1391         self._jbuilder.executeCompaction(),
   1392         getattr(self._spark, "_wrapped", self._spark)  # type: ignore[attr-defined]
   1393     )

File ~/projects/ecoempy/.venv/lib/python3.10/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +
   1317     self.command_header +
   1318     args_command +
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File ~/projects/ecoempy/.venv/lib/python3.10/site-packages/pyspark/errors/exceptions/captured.py:179, in capture_sql_exception.

File ~/projects/ecoempy/.venv/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o123.executeCompaction.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:387)
at org.apache.spark.sql.delta.commands.OptimizeExecutor.$anonfun$optimize$1(OptimizeTableCommand.scala:276)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:168)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:166)
at org.apache.spark.sql.delta.commands.OptimizeExecutor.recordFrameProfile(OptimizeTableCommand.scala:217)
at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:136)
at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
at org.apache.spark.sql.delta.commands.OptimizeExecutor.recordOperation(OptimizeTableCommand.scala:217)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:135)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:125)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:115)
at org.apache.spark.sql.delta.commands.OptimizeExecutor.recordDeltaOperation(OptimizeTableCommand.scala:217)
at org.apache.spark.sql.delta.commands.OptimizeExecutor.optimize(OptimizeTableCommand.scala:255)
at org.apache.spark.sql.delta.commands.OptimizeTableCommand.run(OptimizeTableCommand.scala:180)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
at org.apache.spark.sql.Dataset.
Expected results
I'd expect the OPTIMIZE command to run successfully.
Further details
Environment information
- Python version: 3.10.14
- Delta Lake version: 3.2.0
- Spark version: 3.5.1
Willingness to contribute
The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?
- [ ] Yes. I can contribute a fix for this bug independently.
- [ ] Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
- [X] No. I cannot contribute a bug fix at this time.
@donielix Could you paste the full error callstack? cc. @zedtang
Updated the issue with traceback
Thanks for reporting! This is due to Hilbert clustering not supporting clustering on a single column; we should fall back to Z-order in that case.
I sent out a fix: https://github.com/delta-io/delta/pull/3109
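For readers hitting this before the fix lands: the fallback described above boils down to picking the space-filling curve based on how many clustering columns the table has. A minimal plain-Python sketch of that decision (hypothetical helper name, not Delta's actual internal API):

```python
def choose_clustering_curve(cluster_columns):
    """Pick the space-filling curve for liquid clustering.

    Hilbert clustering requires at least two clustering columns,
    so a table clustered by a single column must fall back to Z-order.
    """
    if len(cluster_columns) < 2:
        return "zorder"
    return "hilbert"

# The repro table is clustered by one column, so it takes the Z-order path
print(choose_clustering_curve(["year_month"]))  # zorder
print(choose_clustering_curve(["id", "date"]))  # hilbert
```

Until a release containing the fix is available, clustering by two or more columns should avoid the single-column Hilbert path that triggers this error.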