
[BUG] Spark UT framework: partial schema intersection - select missing subfield causes java.lang.IndexOutOfBoundsException


Description:

This bug is similar to https://github.com/NVIDIA/spark-rapids/issues/11619. The contacts parquet dataset is defined as follows and is saved here: contacts.zip

+---+--------------------+---------------+----+--------------------+----------------------------+-------------------------------+----------------------------+---+
|id |name                |address        |pets|friends             |relatives                   |employer                       |relations                   |p  |
+---+--------------------+---------------+----+--------------------+----------------------------+-------------------------------+----------------------------+---+
|0  |{Jane, X., Doe}     |123 Main Street|1   |[{Susan, Z., Smith}]|{brother -> {John, Y., Doe}}|{0, {abc, 123 Business Street}}|{{John, Y., Doe} -> brother}|1  |
|1  |{John, Y., Doe}     |321 Wall Street|3   |[]                  |{sister -> {Jane, X., Doe}} |{1, null}                      |{{Jane, X., Doe} -> sister} |1  |
|2  |{Janet, null, Jones}|567 Maple Drive|null|null                |null                        |null                           |null                        |2  |
|3  |{Jim, null, Jones}  |6242 Ash Street|null|null                |null                        |null                           |null                        |2  |
+---+--------------------+---------------+----+--------------------+----------------------------+-------------------------------+----------------------------+---+
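The contacts.zip attachment is the authoritative copy of the data. For readers without it, here is a sketch that produces an equivalent layout; it is modeled on the fixtures in Apache Spark's SchemaPruningSuite, from which this repro appears to be derived (the case class names come from that suite; treat the details as an approximation). The key point is that the p=1 files are written with the full schema, while the p=2 files are written with a narrower schema that has no name.middle subfield at all, which is the "missing subfield" the query selects.

// Sketch: generate an equivalent partitioned dataset (run in spark-shell).
// Modeled on Spark's SchemaPruningSuite fixtures; details are assumptions.
import spark.implicits._

case class FullName(first: String, middle: String, last: String)
case class Company(name: String, address: String)
case class Employer(id: Int, company: Company)
case class Contact(
    id: Int,
    name: FullName,
    address: String,
    pets: Int,
    friends: Seq[FullName] = Seq.empty,
    relatives: Map[String, FullName] = Map.empty,
    employer: Employer = null,
    relations: Map[FullName, String] = Map.empty)

// The p=2 files carry only this narrower schema: no `middle`, no `pets`,
// none of the collection columns. Reading them with the full schema above
// fills the missing fields with nulls, which is what the table shows.
case class Name(first: String, last: String)
case class BriefContact(id: Int, name: Name, address: String)

val janeDoe = FullName("Jane", "X.", "Doe")
val johnDoe = FullName("John", "Y.", "Doe")
val base = "/home/fejiang/Desktop"  // same base path as the repro below

Seq(
  Contact(0, janeDoe, "123 Main Street", 1,
    friends = Seq(FullName("Susan", "Z.", "Smith")),
    relatives = Map("brother" -> johnDoe),
    employer = Employer(0, Company("abc", "123 Business Street")),
    relations = Map(johnDoe -> "brother")),
  Contact(1, johnDoe, "321 Wall Street", 3,
    relatives = Map("sister" -> janeDoe),
    employer = Employer(1, null),
    relations = Map(janeDoe -> "sister"))
).toDF().write.parquet(base + "/contacts/p=1")

Seq(
  BriefContact(2, Name("Janet", "Jones"), "567 Maple Drive"),
  BriefContact(3, Name("Jim", "Jones"), "6242 Ash Street")
).toDF().write.parquet(base + "/contacts/p=2")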

Code to reproduce:

// Read the partitioned contacts dataset with the full schema supplied
// explicitly (p is the partition column), then select one struct subfield
// plus a top-level column, filtered to the p=2 partition.
val dataSourceName = "parquet"
val path = "/home/fejiang/Desktop"

val schema = ("`id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, " +
  "`address` STRING,`pets` INT,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, " +
  "`last`: STRING>>,`relatives` MAP<STRING, STRUCT<`first`: STRING, `middle`: STRING, " +
  "`last`: STRING>>,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, " +
  "`address`: STRING>>,`relations` MAP<STRUCT<`first`: STRING, `middle`: STRING, " +
  "`last`: STRING>,STRING>,`p` INT")
spark.read.format(dataSourceName).schema(schema).load(path + "/contacts").createOrReplaceTempView("contacts")
val query = spark.sql("select name.middle, address from contacts where p=2")
query.show()

Spark:

scala> val dataSourceName = "parquet"
dataSourceName: String = parquet

scala> val path = "/home/fejiang/Desktop"
path: String = /home/fejiang/Desktop

scala> 

scala> val schema = ("`id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, " +
     |   "`address` STRING,`pets` INT,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, " +
     |   "`last`: STRING>>,`relatives` MAP<STRING, STRUCT<`first`: STRING, `middle`: STRING, " +
     |   "`last`: STRING>>,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, " +
     |   "`address`: STRING>>,`relations` MAP<STRUCT<`first`: STRING, `middle`: STRING, " +
     |   "`last`: STRING>,STRING>,`p` INT")
schema: String = `id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, `address` STRING,`pets` INT,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>>,`relatives` MAP<STRING, STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>>,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, `address`: STRING>>,`relations` MAP<STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>,STRING>,`p` INT

scala> spark.read.format(dataSourceName).schema(schema).load(path + "/contacts").createOrReplaceTempView("contacts")

scala> val query = spark.sql("select name.middle, address from contacts where p=2")
query: org.apache.spark.sql.DataFrame = [middle: string]

scala> query.show()
+------+---------------+                                                        
|middle|        address|
+------+---------------+
|  null|567 Maple Drive|
|  null|6242 Ash Street|
+------+---------------+


Rapids:

scala> val dataSourceName = "parquet"
dataSourceName: String = parquet

scala> val path = "/home/fejiang/Desktop"
path: String = /home/fejiang/Desktop

scala> 

scala> val schema = ("`id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, " +
     |   "`address` STRING,`pets` INT,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, " +
     |   "`last`: STRING>>,`relatives` MAP<STRING, STRUCT<`first`: STRING, `middle`: STRING, " +
     |   "`last`: STRING>>,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, " +
     |   "`address`: STRING>>,`relations` MAP<STRUCT<`first`: STRING, `middle`: STRING, " +
     |   "`last`: STRING>,STRING>,`p` INT")
schema: String = `id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, `address` STRING,`pets` INT,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>>,`relatives` MAP<STRING, STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>>,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, `address`: STRING>>,`relations` MAP<STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>,STRING>,`p` INT

scala> spark.read.format(dataSourceName).schema(schema).load(path + "/contacts").createOrReplaceTempView("contacts")

scala> val query = spark.sql("select name.middle, address from contacts where p=2")
query: org.apache.spark.sql.DataFrame = [middle: string]

scala> query.show()
24/10/17 13:49:40 WARN GpuOverrides: 
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> name#1.middle AS middle#21 will run on GPU
      *Expression <GetStructField> name#1.middle will run on GPU
    *Exec <FileSourceScanExec> will run on GPU

24/10/17 13:49:41 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 2)    
java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
	at java.util.ArrayList.rangeCheck(ArrayList.java:659)
	at java.util.ArrayList.get(ArrayList.java:435)
	at org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetMetadata(ParquetMetadataConverter.java:1493)
	at org.apache.parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:1450)
	at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:582)
	at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:527)
	at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:521)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$filterBlocks$5(GpuParquetScan.scala:709)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$filterBlocks$4(GpuParquetScan.scala:705)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.GpuParquetFileFilterHandler.$anonfun$filterBlocks$1(GpuParquetScan.scala:704)
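A possible way to keep running while this is investigated is to push the parquet scan (or the whole plan) back onto the CPU. Both settings below are documented spark-rapids configs, though this workaround has not been verified against this exact repro:

// Untested workaround sketch: fall back to the CPU for parquet reads only...
spark.conf.set("spark.rapids.sql.format.parquet.read.enabled", "false")
// ...or disable the plugin's SQL acceleration for the whole session.
spark.conf.set("spark.rapids.sql.enabled", "false")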

— Feng-Jiang28, Oct 17 '24 09:10

Spark is producing an incorrect answer. Why is it only returning the middle column? Also, why is it showing a value for all rows and ignoring the p=2 filter? There is an issue with our code that we need to look into; I just want to be sure that there is not something horribly wrong in Spark too. What version of Spark did you use for this test?
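For reference, the running version can be checked directly in the same shell; this is just the standard SparkSession API, nothing plugin-specific:

// Print the Spark version of the active session.
spark.version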

— revans2, Oct 17 '24 14:10

Okay, I just ran the query on Spark 3.4.2 and I get the answer that I would expect.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.4.2
      /_/
Using Scala version 2.12.17 (OpenJDK 64-Bit Server VM, Java 1.8.0_422)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val dataSourceName = "parquet"
dataSourceName: String = parquet

scala> val path = ".../contacts"
path: String = .../contacts

scala> val schema = ("`id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, " +
    |   "`address` STRING,`pets` INT,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, " +
    |   "`last`: STRING>>,`relatives` MAP<STRING, STRUCT<`first`: STRING, `middle`: STRING, " +
    |   "`last`: STRING>>,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, " +
    |   "`address`: STRING>>,`relations` MAP<STRUCT<`first`: STRING, `middle`: STRING, " +
    |   "`last`: STRING>,STRING>,`p` INT")
schema: String = `id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, `address` STRING,`pets` INT,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>>,`relatives` MAP<STRING, STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>>,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, `address`: STRING>>,`relations` MAP<STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>,STRING>,`p` INT

scala> spark.read.format(dataSourceName).schema(schema).load(path).createOrReplaceTempView("contacts")

scala> val query = spark.sql("select name.middle, address from contacts where p=2")
query: org.apache.spark.sql.DataFrame = [middle: string, address: string]

scala> query.show()
+------+---------------+                                                        
|middle|        address|
+------+---------------+
|  null|567 Maple Drive|
|  null|6242 Ash Street|
+------+---------------+

@Feng-Jiang28 was that a copy/paste error? Or were you using a different version of Spark?

— revans2, Oct 17 '24 14:10

@revans2 It was a copy/paste issue, thanks for pointing it out.

— Feng-Jiang28, Oct 18 '24 08:10