delta icon indicating copy to clipboard operation
delta copied to clipboard

[BUG] UnsupportedOperationException when reading a table with deletion vectors

Open zerodarkzone opened this issue 2 years ago • 5 comments

Bug

Which Delta project/connector is this regarding?

  • [x] Spark
  • [ ] Standalone
  • [ ] Flink
  • [ ] Kernel
  • [ ] Other (fill in here)

Describe the problem

I have a delta table with deletion vectors and with the following features enabled written by Databricks (Databricks-Runtime/13.3.x-photon-scala2.12) image

When I try to read it using pyspark 3.4.1 with delta-lake 2.4.0 which I think is compliant with the requested reader protocol versión, I'm getting a java.lang.UnsupportedOperationException error.

Steps to reproduce

  1. Write a table with deletion vectors enabled using Databricks-Runtime/13.3.x-photon-scala2.12
  2. Try to read that table using the open source delta-lake package versión 2.4.0

Observed results

When trying to do a simple "show" command, it responds with the following error:

23/10/27 15:15:57 ERROR Executor: Exception in task 0.0 in stage 28.0 (TID 356)]
java.lang.UnsupportedOperationException
	at org.apache.spark.sql.vectorized.ColumnarBatchRow.update(ColumnarBatchRow.java:193)
	at org.apache.spark.sql.catalyst.InternalRow.setLong(InternalRow.scala:50)
	at org.apache.spark.sql.delta.DeltaParquetFileFormat.$anonfun$iteratorWithAdditionalMetadataColumns$13(DeltaParquetFileFormat.scala:268)
	at org.apache.spark.sql.delta.DeltaParquetFileFormat.$anonfun$iteratorWithAdditionalMetadataColumns$13$adapted(DeltaParquetFileFormat.scala:266)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.delta.DeltaParquetFileFormat.$anonfun$iteratorWithAdditionalMetadataColumns$8(DeltaParquetFileFormat.scala:266)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator$$anon$1.next(RecordReaderIterator.scala:62)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.next(FileScanRDD.scala:211)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:514)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Expected results

It should show the content of the table.

Environment information

  • Delta Lake version: 2.4.0
  • Spark version: 3.4.1
  • Scala version: 2.12

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • [ ] Yes. I can contribute a fix for this bug independently.
  • [ ] Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • [x] No. I cannot contribute a bug fix at this time.

zerodarkzone avatar Oct 27 '23 20:10 zerodarkzone

It seems like the API InternalRow.setLong() doesn't work in certain cases. Do you have any settings related to the parquet reader when reading from Delta 2.4? If there is a small repro case, it would help us debug the issue.

vkorukanti avatar Oct 27 '23 21:10 vkorukanti

Hi, I'm gonna try to setup a small repo to replicate de issue. But for now, these are some of the configurations we are using related to parquet.

spark.sql.legacy.timeParserPolicy: LEGACY
spark.sql.parquet.int96RebaseModeInRead: CORRECTED
spark.sql.parquet.int96RebaseModeInWrite: CORRECTED
spark.sql.parquet.datetimeRebaseModeInWrite: CORRECTED
spark.sql.parquet.datetimeRebaseModeInRead: CORRECTED

zerodarkzone avatar Oct 27 '23 21:10 zerodarkzone

@zerodarkzone do you have spark.sql.parquet.enableVectorizedReader disabled? Also, what are the data types of columns in your table?

vkorukanti avatar Oct 27 '23 23:10 vkorukanti

I dont have it disabled. We have integers, strings, timestamps, dates, some decimals and two columns in particular are an array of structs which contains some decimals and strings.

zerodarkzone avatar Oct 30 '23 17:10 zerodarkzone

@andreaschat-db was able to repro this issue. This is happening for wide tables which have more than 100 columns. When we are reading more than 100 columns, Sparks code generator makes a decision (1, 2) to not use the codegen. When not using codegen, Spark sets options to get rows instead of columnar batches from the Parquet reader. This causes the vectorized Parquet reader to return row abstraction over each column in the columnar batch. This row abstraction doesn't allow modification of the contents.

I will be posting a fix shortly. A couple of workarounds:

  1. disable the vectorized parquet reader: spark.sql.parquet.enableVectorizedReader=false
  2. Set the table width threshold for codegen to a high number (depending upon the number of columns in your table): spark.sql.codegen.maxFields (default value is 100).

vkorukanti avatar Dec 19 '23 16:12 vkorukanti