[BUG] GPU Parquet scan filter pushdown fails with timestamp/INT96 column
Describe the bug We are seeing an issue on Databricks 10.4 Rapids plugin 24.02 where a query is failing when doing predicate pushdown filtering on a column that is a timestamp/INT96. The exception is:
java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: FilterPredicate column: clientTs's declared type (java.lang.Long) does not match the schema found in file metadata. Column clientTs is of type: INT96
Valid types for this column are: [class org.apache.parquet.io.api.Binary]
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.nvidia.spark.rapids.GpuParquetMultiFilePartitionReaderFactory.$anonfun$buildBaseColumnarReaderForCoalescing$3(GpuParquetScan.scala:1252)
at com.nvidia.spark.rapids.GpuParquetMultiFilePartitionReaderFactory.$anonfun$buildBaseColumnarReaderForCoalescing$3$adapted(GpuParquetScan.scala:1252)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:198)
at com.nvidia.spark.rapids.GpuParquetMultiFilePartitionReaderFactory.buildBaseColumnarReaderForCoalescing(GpuParquetScan.scala:1252)
at com.nvidia.spark.rapids.MultiFilePartitionReaderFactoryBase.createColumnarReader(GpuMultiFileReader.scala:281)
at com.nvidia.spark.rapids.shims.GpuDataSourceRDD.compute(GpuDataSourceRDD.scala:59)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:81)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:81)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:161)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:125)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:95)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:832)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1681)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:835)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:690)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: FilterPredicate column: clientTs's declared type (java.lang.Long) does not match the schema found in file metadata. Column clientTs is of type: INT96
Valid types for this column are: [class org.apache.parquet.io.api.Binary]
at org.apache.parquet.filter2.predicate.ValidTypeMap.assertTypeValid(ValidTypeMap.java:125)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:179)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:149)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:89)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:56)
at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:192)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:120)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:56)
at org.apache.parquet.filter2.predicate.Operators$And.accept(Operators.java:306)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:61)
at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:95)
at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:45)
at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:149)
at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:72)
at org.apache.parquet.hadoop.ParquetFileReader.filterRowGroups(ParquetFileReader.java:930)
at org.apache.parquet.hadoop.ParquetFileReader.(ParquetFileReader.java:735)
at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$filterBlocks$9(GpuParquetScan.scala:758)
at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$filterBlocks$1(GpuParquetScan.scala:753)
at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:29)
at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.filterBlocks(GpuParquetScan.scala:689)
at com.nvidia.spark.rapids.GpuParquetMultiFilePartitionReaderFactory.com$nvidia$spark$rapids$GpuParquetMultiFilePartitionReaderFactory$$filterBlocksForCoalescingReader(GpuParquetScan.scala:1179)
at com.nvidia.spark.rapids.GpuParquetMultiFilePartitionReaderFactory$CoalescingFilterRunner.$anonfun$call$1(GpuParquetScan.scala:1216)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at com.nvidia.spark.rapids.GpuParquetMultiFilePartitionReaderFactory$CoalescingFilterRunner.call(GpuParquetScan.scala:1215)
at com.nvidia.spark.rapids.GpuParquetMultiFilePartitionReaderFactory$CoalescingFilterRunner.call(GpuParquetScan.scala:1204)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
... 3 more
It happens with
I think different files might have different parquet schemas for this column. I'm not sure if this is part of the problem or not, just noticed it
24/10/17 20:10:53 INFO MultiFileCloudParquetPartitionReader: File schema for the next file XXX.snappy.parquet schema ParquetSchemaWrapper(message spark_schema {
optional int64 clientTs (TIMESTAMP(MICROS,true));
...
}
) doesn't match current YYYY .snappy.parquet schema ParquetSchemaWrapper(message spark_schema {
optional int96 clientTs;
...
}
Tracked down this issue, it's caused by the Parquet scan mutating the same Hadoop configuration instance that was received from the broadcast. When files don't have the same schema, it can cause the filter to change. INT96 is not a column that Spark can push down filters for, so if one file has a column as INT96 and another file has the same column as TIMESTAMP(MICROS) then the first file will have the filter conditions on the INT96 column removed, but the latter file will not. Since the filter is posted to the Hadoop configuration as part of Parquet read processing, there's a race with multiple threads mutating the same conf and potentially causing a filter referencing the INT96 column to try to be processed, triggering the crash.