[VL] A test case in GlutenParquetFilterSuite fails when IOThreads is off
Related to https://github.com/apache/incubator-gluten/pull/7165
Error:
2024-09-09T09:36:39.2455195Z - Gluten - Filter applied on merged Parquet schema with new column should work *** FAILED ***
2024-09-09T09:36:39.2456279Z Parquet-mr reader
2024-09-09T09:36:39.2456759Z Results do not match for query:
2024-09-09T09:36:39.2461008Z Timezone: sun.util.calendar.ZoneInfo[id="America/Los_Angeles",offset=-28800000,dstSavings=3600000,useDaylight=true,transitions=185,lastRule=java.util.SimpleTimeZone[id=America/Los_Angeles,offset=-28800000,dstSavings=3600000,useDaylight=true,startYear=0,startMode=3,startMonth=2,startDay=8,startDayOfWeek=1,startTime=7200000,startTimeMode=0,endMode=3,endMonth=10,endDay=1,endDayOfWeek=1,endTime=7200000,endTimeMode=0]]
2024-09-09T09:36:39.2462947Z Timezone Env:
2024-09-09T09:36:39.2463188Z
2024-09-09T09:36:39.2463408Z == Parsed Logical Plan ==
2024-09-09T09:36:39.2463745Z 'Project ['s]
2024-09-09T09:36:39.2464121Z +- Filter (s#237716.c = 1)
2024-09-09T09:36:39.2465076Z +- RelationV2[s#237716] parquet file:/tmp/spark-45aa5e8a-4d2c-47b2-ac58-1aa3c2322dc9/table3,file:/tmp/spark-45aa5e8a-4d2c-47b2-ac58-1aa3c2322dc9/table4
2024-09-09T09:36:39.2465883Z
2024-09-09T09:36:39.2466100Z == Analyzed Logical Plan ==
2024-09-09T09:36:39.2466395Z s: struct<a:int,c:int>
2024-09-09T09:36:39.2466765Z Project [s#237716]
2024-09-09T09:36:39.2467064Z +- Filter (s#237716.c = 1)
2024-09-09T09:36:39.2467968Z +- RelationV2[s#237716] parquet file:/tmp/spark-45aa5e8a-4d2c-47b2-ac58-1aa3c2322dc9/table3,file:/tmp/spark-45aa5e8a-4d2c-47b2-ac58-1aa3c2322dc9/table4
2024-09-09T09:36:39.2468791Z
2024-09-09T09:36:39.2469007Z == Optimized Logical Plan ==
2024-09-09T09:36:39.2469457Z Filter (isnotnull(s#237716.c) AND (s#237716.c = 1))
2024-09-09T09:36:39.2470448Z +- RelationV2[s#237716] parquet file:/tmp/spark-45aa5e8a-4d2c-47b2-ac58-1aa3c2322dc9/table3,file:/tmp/spark-45aa5e8a-4d2c-47b2-ac58-1aa3c2322dc9/table4
2024-09-09T09:36:39.2471476Z
2024-09-09T09:36:39.2471682Z == Physical Plan ==
2024-09-09T09:36:39.2471948Z VeloxColumnarToRowExec
2024-09-09T09:36:39.2472330Z +- ^(16300) ProjectExecTransformer [s#237716]
2024-09-09T09:36:39.2472919Z +- ^(16300) FilterExecTransformer (isnotnull(s#237716.c) AND (s#237716.c = 1))
2024-09-09T09:36:39.2475560Z +- ^(16300) BatchScanExecTransformer[s#237716] ParquetScan DataFilters: [isnotnull(s#237716.c), (s#237716.c = 1)], Format: parquet, Location: InMemoryFileIndex(2 paths)[file:/tmp/spark-45aa5e8a-4d2c-47b2-ac58-1aa3c2322dc9/table3, file:/tmp..., PartitionFilters: [], PushedAggregation: [], PushedFilters: [IsNotNull(s.c), EqualTo(s.c,1)], PushedGroupBy: [], ReadSchema: struct<s:struct<a:int,c:int>>, PushedFilters: [IsNotNull(s.c), EqualTo(s.c,1)], PushedAggregation: [], PushedGroupBy: [] RuntimeFilters: []
2024-09-09T09:36:39.2477696Z
2024-09-09T09:36:39.2477892Z == Results ==
2024-09-09T09:36:39.2478110Z
2024-09-09T09:36:39.2478298Z == Results ==
2024-09-09T09:36:39.2478643Z !== Correct Answer - 1 == == Spark Answer - 0 ==
2024-09-09T09:36:39.2479029Z struct<> struct<>
2024-09-09T09:36:39.2479357Z ![[null,1]] (QueryTest.scala:244)
2024-09-09T09:36:39.2682483Z +---+---+
2024-09-09T09:36:39.2682926Z | a| b|
2024-09-09T09:36:39.2683339Z +---+---+
2024-09-09T09:36:39.2683697Z | 1| 1|
2024-09-09T09:36:39.2684039Z | 2| 0|
2024-09-09T09:36:39.2684368Z | 3| 1|
2024-09-09T09:36:39.2684697Z | 4| 0|
2024-09-09T09:36:39.2685018Z | 5| 1|
2024-09-09T09:36:39.2685403Z +---+---+
Log: https://productionresultssa5.blob.core.windows.net/actions-results/8966a77b-436d-4c02-a405-c38f94ce497f/workflow-job-run-5566dbed-4228-5701-78de-60eacfd83432/logs/job/job-logs.txt?rsct=text%2Fplain&se=2024-09-10T01%3A17%3A30Z&sig=0KjdkINRN97W1Vz0XhfmWet%2B%2Ftw5JujK6ERIZa8DsE0%3D&ske=2024-09-10T10%3A05%3A26Z&skoid=ca7593d4-ee42-46cd-af88-8b886a2f84eb&sks=b&skt=2024-09-09T22%3A05%3A26Z&sktid=398a6654-997b-47e9-b12b-9515b896b4de&skv=2024-05-04&sp=r&spr=https&sr=b&st=2024-09-10T01%3A07%3A25Z&sv=2024-05-04
In the UT, after the first split is processed, the datasource's scanSpec changes from root (s (a, c, ), ) to root (s (a, c constant, ), ).
When IOThreads is on, the split preload feature kicks in: the scanSpec of each preloaded datasource is set via moveAdaptationFrom, so the constant filter metadata on the current datasource's scanSpec does not affect the preloaded datasource's scanSpec, and the result is correct.
When IOThreads is off, there is only one datasource, so the second split is read with root (s (a, c constant, ), ) and produces null for c, which makes the filter match no rows.
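For reference, the two code paths can be exercised by toggling the Velox IO threads setting when building the session. This is only a sketch: the config key below is my understanding of Gluten's Velox backend option and may differ across versions, and it assumes the Gluten plugin is already configured on the session.

```scala
// Hedged sketch: the config key is assumed from Gluten's Velox backend options and may
// vary by version. It is treated here as a static conf, so it is set at session build time.
import org.apache.spark.sql.SparkSession

// IOThreads = 0 disables the Velox IO thread pool, so splits share one datasource and the
// mutated scanSpec leaks into the second split (the failing case in this issue).
// A positive value enables async IO / split preloading, where each preloaded datasource
// gets its own scanSpec via moveAdaptationFrom (the passing case).
val spark = SparkSession.builder()
  .config("spark.gluten.sql.columnar.backend.velox.IOThreads", "0") // set to e.g. "2" to enable preloading
  .getOrCreate()
```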
The root cause is that Spark merges the two Parquet part files' schemas when spark.sql.parquet.mergeSchema=true: file1's schema is s struct<a int>, file2's schema is s struct<c int>, and the merged schema is s struct<a int, c int>.
The filter s.c = 1 generates a GetStructField(s, 1) expression, and Velox sets the missing column to null.
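A minimal standalone sketch of the described setup might look like the following. This is not the suite's exact code; the path and session setup are illustrative, and the wrong result only shows up when running on the Gluten Velox backend with IOThreads off.

```scala
// Repro sketch (illustrative, not the suite code). Assumes a SparkSession with the
// Gluten Velox backend enabled; the output directory is a made-up path.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.struct

val spark = SparkSession.builder().appName("merge-schema-repro").getOrCreate()
import spark.implicits._

val dir = "/tmp/merge-schema-repro" // illustrative path

// Part file 1: schema is s struct<a: int>
Seq(1, 2).toDF("a").select(struct($"a").as("s")).write.parquet(s"$dir/table3")
// Part file 2: schema is s struct<c: int>
Seq(1).toDF("c").select(struct($"c").as("s")).write.parquet(s"$dir/table4")

// Reading both with schema merging yields s struct<a: int, c: int>.
// The filter s.c = 1 compiles to a GetStructField on ordinal 1; the correct answer is
// one row [null, 1], but per this issue the Velox scan returns no rows when IOThreads is off.
val df = spark.read
  .option("mergeSchema", "true")
  .parquet(s"$dir/table3", s"$dir/table4")
df.filter($"s.c" === 1).select($"s").show(truncate = false)
```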
Thank you for your detailed explanation. Looks super helpful.