delta icon indicating copy to clipboard operation
delta copied to clipboard

[BUG]Optimize command fails when using liquid clustering on local Delta Lake & PySpark

Open donielix opened this issue 1 year ago • 3 comments

Bug

Which Delta project/connector is this regarding?

  • [ ] Spark
  • [X] Standalone
  • [ ] Flink
  • [ ] Kernel
  • [ ] Other (fill in here)

Describe the problem

When attempting to optimize a Delta Table configured with liquid clustering, an error occurs during the execution of the optimize().executeCompaction() method.

Steps to reproduce

from pyspark.sql import SparkSession
from delta.pip_utils import configure_spark_with_delta_pip

# Initializes a SparkSession configured with Delta
builder = (
    SparkSession.builder.config(
        "spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"
    )
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .enableHiveSupport()
)
spark= configure_spark_with_delta_pip(
    spark_session_builder=builder
).getOrCreate()

# Initialize an empty Delta Table with liquid clustering
dt = (
    DeltaTable.createIfNotExists(spark)
    .tableName("testtable")
    .addColumn("id", dataType="bigint", nullable=False)
    .addColumn("date", dataType="date", nullable=False)
    .addColumn("name", dataType="string", nullable=False)
    .addColumn("amount", dataType="double")
    .addColumn("year_month", dataType="string", nullable=False)
    .clusterBy("year_month")
    .execute()
)

# Pushes some test data into newly created Delta table
spark.sql(
    """
    INSERT INTO testtable VALUES
    (1, '2024-01-01', 'Jack', 30.5, '2024-01'),
    (2, '2024-02-10', 'Claude', 11.2, '2024-02'),
    (3, '2024-02-25', 'Mick', 10.1, '2024-02')
    """
)

# Optimizes the Delta Table (this triggers the error)
dt.optimize().executeCompaction()

Observed results

When running above snippet, I get an extended error traceback.


Py4JJavaError Traceback (most recent call last) Cell In[9], line 1 ----> 1 dt.optimize().executeCompaction()

File ~/projects/ecoempy/.venv/lib/python3.10/site-packages/delta/tables.py:1391, in DeltaOptimizeBuilder.executeCompaction(self) 1382 @since(2.0) # type: ignore[arg-type] 1383 def executeCompaction(self) -> DataFrame: 1384 """ 1385 Compact the small files in selected partitions. 1386 1387 :return: DataFrame containing the OPTIMIZE execution metrics 1388 :rtype: pyspark.sql.DataFrame 1389 """ 1390 return DataFrame( -> 1391 self._jbuilder.executeCompaction(), 1392 getattr(self._spark, "_wrapped", self._spark) # type: ignore[attr-defined] 1393 )

File ~/projects/ecoempy/.venv/lib/python3.10/site-packages/py4j/java_gateway.py:1322, in JavaMember.call(self, *args) 1316 command = proto.CALL_COMMAND_NAME +
1317 self.command_header +
1318 args_command +
1319 proto.END_COMMAND_PART 1321 answer = self.gateway_client.send_command(command) -> 1322 return_value = get_return_value( 1323 answer, self.gateway_client, self.target_id, self.name) 1325 for temp_arg in temp_args: 1326 if hasattr(temp_arg, "_detach"):

File ~/projects/ecoempy/.venv/lib/python3.10/site-packages/pyspark/errors/exceptions/captured.py:179, in capture_sql_exception..deco(*a, **kw) 177 def deco(*a: Any, **kw: Any) -> Any: 178 try: --> 179 return f(*a, **kw) 180 except Py4JJavaError as e: 181 converted = convert_exception(e.java_exception)

File ~/projects/ecoempy/.venv/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name) 324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client) 325 if answer[1] == REFERENCE_TYPE: --> 326 raise Py4JJavaError( 327 "An error occurred while calling {0}{1}{2}.\n". 328 format(target_id, ".", name), value) 329 else: 330 raise Py4JError( 331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n". 332 format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o123.executeCompaction. : org.apache.spark.SparkException: Exception thrown in awaitResult: at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56) at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310) at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:387) at org.apache.spark.sql.delta.commands.OptimizeExecutor.$anonfun$optimize$1(OptimizeTableCommand.scala:276) at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:168) at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:166) at org.apache.spark.sql.delta.commands.OptimizeExecutor.recordFrameProfile(OptimizeTableCommand.scala:217) at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:136) at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128) at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117) at org.apache.spark.sql.delta.commands.OptimizeExecutor.recordOperation(OptimizeTableCommand.scala:217) at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:135) at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:125) at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:115) at org.apache.spark.sql.delta.commands.OptimizeExecutor.recordDeltaOperation(OptimizeTableCommand.scala:217) at org.apache.spark.sql.delta.commands.OptimizeExecutor.optimize(OptimizeTableCommand.scala:255) at org.apache.spark.sql.delta.commands.OptimizeTableCommand.run(OptimizeTableCommand.scala:180) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73) at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83) at org.apache.spark.sql.Dataset.(Dataset.scala:220) at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:92) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89) at org.apache.spark.sql.delta.util.AnalysisHelper.toDataset(AnalysisHelper.scala:92) at org.apache.spark.sql.delta.util.AnalysisHelper.toDataset$(AnalysisHelper.scala:91) at io.delta.tables.DeltaOptimizeBuilder.toDataset(DeltaOptimizeBuilder.scala:43) at io.delta.tables.DeltaOptimizeBuilder.$anonfun$execute$1(DeltaOptimizeBuilder.scala:100) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900) at org.apache.spark.sql.delta.DeltaTableUtils$.withActiveSession(DeltaTable.scala:470) at io.delta.tables.DeltaOptimizeBuilder.execute(DeltaOptimizeBuilder.scala:85) at io.delta.tables.DeltaOptimizeBuilder.executeCompaction(DeltaOptimizeBuilder.scala:67) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:568) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at py4j.ClientServerConnection.run(ClientServerConnection.java:106) at java.base/java.lang.Thread.run(Thread.java:840) Caused by: java.util.concurrent.ExecutionException: Boxed Error at scala.concurrent.impl.Promise$.resolver(Promise.scala:87) at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) at scala.concurrent.Promise.complete(Promise.scala:53) at scala.concurrent.Promise.complete$(Promise.scala:52) at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187) at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1395) at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) Caused by: java.lang.AssertionError: assertion failed: Cannot do Hilbert clustering by zero or one column! at scala.Predef$.assert(Predef.scala:223) at org.apache.spark.sql.delta.skipping.HilbertClustering$.getClusteringExpression(MultiDimClustering.scala:108) at org.apache.spark.sql.delta.skipping.SpaceFillingCurveClustering.cluster(MultiDimClustering.scala:78) at org.apache.spark.sql.delta.skipping.SpaceFillingCurveClustering.cluster$(MultiDimClustering.scala:68) at org.apache.spark.sql.delta.skipping.HilbertClustering$.cluster(MultiDimClustering.scala:106) at org.apache.spark.sql.delta.skipping.MultiDimClustering$.cluster(MultiDimClustering.scala:59) at org.apache.spark.sql.delta.commands.OptimizeExecutor.runOptimizeBinJob(OptimizeTableCommand.scala:428) at org.apache.spark.sql.delta.commands.OptimizeExecutor.$anonfun$optimize$6(OptimizeTableCommand.scala:277) at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:384) at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659) at scala.util.Success.$anonfun$map$1(Try.scala:255) at scala.util.Success.map(Try.scala:213) at scala.concurrent.Future.$anonfun$map$1(Future.scala:292) at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) ... 8 more

Expected results

I'd expect to run the optimize command successfully

Further details

Environment information

  • Python version: 3.10.14
  • Delta Lake version: 3.2.0
  • Spark version: 3.5.1

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • [ ] Yes. I can contribute a fix for this bug independently.
  • [ ] Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • [X] No. I cannot contribute a bug fix at this time.

donielix avatar May 13 '24 13:05 donielix

@donielix Could you paste the full error callstack? cc. @zedtang

vkorukanti avatar May 13 '24 20:05 vkorukanti

@donielix Could you paste the full error callstack? cc. @zedtang

Updated the issue with traceback

donielix avatar May 13 '24 20:05 donielix

Thanks for reporting! It's due to hilbert clustering not supporting clustering on 1 column, and we should fall back to use zorder in that case.

I sent out a fix: https://github.com/delta-io/delta/pull/3109

zedtang avatar May 17 '24 00:05 zedtang