
[BUG] GPU Parquet scan filter pushdown fails with timestamp/INT96 column

Open · tgravescs opened this issue 1 year ago • 2 comments

Describe the bug
We are seeing an issue on Databricks 10.4 with RAPIDS plugin 24.02 where a query fails when doing predicate pushdown filtering on a timestamp/INT96 column. The exception is:

 java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: FilterPredicate column: clientTs's declared type (java.lang.Long) does not match the schema found in file metadata. Column clientTs is of type: INT96
Valid types for this column are: [class org.apache.parquet.io.api.Binary]
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at com.nvidia.spark.rapids.GpuParquetMultiFilePartitionReaderFactory.$anonfun$buildBaseColumnarReaderForCoalescing$3(GpuParquetScan.scala:1252)
	at com.nvidia.spark.rapids.GpuParquetMultiFilePartitionReaderFactory.$anonfun$buildBaseColumnarReaderForCoalescing$3$adapted(GpuParquetScan.scala:1252)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
	at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:198)
	at com.nvidia.spark.rapids.GpuParquetMultiFilePartitionReaderFactory.buildBaseColumnarReaderForCoalescing(GpuParquetScan.scala:1252)
	at com.nvidia.spark.rapids.MultiFilePartitionReaderFactoryBase.createColumnarReader(GpuMultiFileReader.scala:281)
	at com.nvidia.spark.rapids.shims.GpuDataSourceRDD.compute(GpuDataSourceRDD.scala:59)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:81)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:81)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:161)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:125)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:95)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:832)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1681)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:835)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:690)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: FilterPredicate column: clientTs's declared type (java.lang.Long) does not match the schema found in file metadata. Column clientTs is of type: INT96
Valid types for this column are: [class org.apache.parquet.io.api.Binary]
	at org.apache.parquet.filter2.predicate.ValidTypeMap.assertTypeValid(ValidTypeMap.java:125)
	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:179)
	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:149)
	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:89)
	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:56)
	at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:192)
	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:120)
	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:56)
	at org.apache.parquet.filter2.predicate.Operators$And.accept(Operators.java:306)
	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:61)
	at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:95)
	at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:45)
	at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:149)
	at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:72)
	at org.apache.parquet.hadoop.ParquetFileReader.filterRowGroups(ParquetFileReader.java:930)
	at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:735)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$filterBlocks$9(GpuParquetScan.scala:758)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$filterBlocks$1(GpuParquetScan.scala:753)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.filterBlocks(GpuParquetScan.scala:689)
	at com.nvidia.spark.rapids.GpuParquetMultiFilePartitionReaderFactory.com$nvidia$spark$rapids$GpuParquetMultiFilePartitionReaderFactory$$filterBlocksForCoalescingReader(GpuParquetScan.scala:1179)
	at com.nvidia.spark.rapids.GpuParquetMultiFilePartitionReaderFactory$CoalescingFilterRunner.$anonfun$call$1(GpuParquetScan.scala:1216)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
	at com.nvidia.spark.rapids.GpuParquetMultiFilePartitionReaderFactory$CoalescingFilterRunner.call(GpuParquetScan.scala:1215)
	at com.nvidia.spark.rapids.GpuParquetMultiFilePartitionReaderFactory$CoalescingFilterRunner.call(GpuParquetScan.scala:1204)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	... 3 more
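
For context, a minimal sketch of a setup that can produce this, assuming a Spark shell with the plugin enabled; the path, values, and timestamps are illustrative, not the original workload. Writing the same timestamp column under two Parquet encodings leaves the directory with mixed physical types, and a not-equals filter matches the NotEq predicate in the trace above:

import java.sql.Timestamp
import spark.implicits._

val df = Seq(Timestamp.valueOf("2024-10-17 00:00:00")).toDF("clientTs")

// One file with INT96 timestamps, one with INT64 (TIMESTAMP_MICROS), so the
// same column ends up with two different physical types in the directory.
spark.conf.set("spark.sql.parquet.outputTimestampType", "INT96")
df.write.mode("append").parquet("/tmp/mixed_ts")
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
df.write.mode("append").parquet("/tmp/mixed_ts")

// The not-equals filter on the timestamp column is eligible for pushdown.
spark.read.parquet("/tmp/mixed_ts")
  .filter($"clientTs" =!= Timestamp.valueOf("2024-01-01 00:00:00"))
  .count()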

It happens with

tgravescs · Oct 17 '24 19:10

I think different files might have different Parquet schemas for this column. I'm not sure whether this is part of the problem, but I noticed it:

24/10/17 20:10:53 INFO MultiFileCloudParquetPartitionReader: File schema for the next file XXX.snappy.parquet schema ParquetSchemaWrapper(message spark_schema {
  optional int64 clientTs (TIMESTAMP(MICROS,true));
  ...
}
) doesn't match current YYYY.snappy.parquet schema ParquetSchemaWrapper(message spark_schema {
  optional int96 clientTs;
  ...
}
)
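
One way to confirm the per-file difference is to dump each file's footer schema with the Parquet Java API; a sketch, with the directory path assumed:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Print each footer schema so int96 vs int64 columns are easy to spot.
val conf = new Configuration()
val dir = new Path("/tmp/mixed_ts")
val fs = dir.getFileSystem(conf)
fs.listStatus(dir).filter(_.getPath.getName.endsWith(".parquet")).foreach { st =>
  val reader = ParquetFileReader.open(HadoopInputFile.fromPath(st.getPath, conf))
  try println(s"${st.getPath.getName}:\n${reader.getFileMetaData.getSchema}")
  finally reader.close()
}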

tgravescs · Oct 17 '24 20:10

Tracked down this issue: it's caused by the Parquet scan mutating the same Hadoop configuration instance that was received from the broadcast. When files don't have the same schema, the pushed-down filter can differ per file. INT96 is not a type Spark can push down filters for, so if one file stores the column as INT96 and another stores it as TIMESTAMP(MICROS), the filter conditions on that column are removed for the first file but kept for the second. Since the filter is stored in the Hadoop configuration as part of Parquet read processing, multiple threads race while mutating the same conf, and a filter referencing the INT96 column can end up being applied to the INT96 file, triggering the crash.
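
In other words, the conf has to become per-file before the filter is set on it. A minimal sketch of the race-free pattern, with illustrative names rather than the actual spark-rapids code:

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.filter2.predicate.FilterPredicate
import org.apache.parquet.hadoop.ParquetInputFormat

// Give each file its own copy of the broadcast Hadoop conf before attaching
// that file's filter, so reader threads never mutate a shared instance.
def confForFile(shared: Configuration, perFileFilter: Option[FilterPredicate]): Configuration = {
  val local = new Configuration(shared) // copy constructor clones the property map
  perFileFilter.foreach(f => ParquetInputFormat.setFilterPredicate(local, f))
  local
}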

jlowe · Oct 24 '24 14:10