[BUG] `from_json` generates results inconsistent with the CPU for an input column with nested JSON strings
Describe the bug
`from_json` generates results that are inconsistent with the CPU for an input column containing nested JSON strings.
Steps/Code to reproduce bug
Here is a repro case:
scala> spark.conf.set("spark.rapids.sql.expression.JsonToStructs", "true")
scala> import org.apache.spark.sql.types.{IntegerType,StringType,StructType,StructField}
scala> import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.functions.{col, from_json}
scala> val data = Seq(Row(1, """{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}""", """{"time":"26/08/2015"}""", """{"Zipcode":[706,123,4646],"ZipCodeType":"SPECIAL","City":"San Jose","State":[{"Zipcode":706,"ZipCodeType":"SPECIAL","City":"San Jose","State":"CA"}]}"""), Row(2, """{"Zipcode":706,"ZipCodeType":"SPECIAL","City":"San Jose","State":"CA"}""", """{"time":"01/08/2015"}""", ""), Row(3, "", "", ""), Row(4, null, null, null))
scala> val schema = StructType(Array(StructField("id",IntegerType,true), StructField("json",StringType,true), StructField("time_strF",StringType,true), StructField("nest_jsonF",StringType,true)))
scala> val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
scala> val out_schema = StructType(Array(StructField("Zipcode",StringType,true), StructField("ZipCodeType",StringType,true), StructField("City",StringType,true), StructField("State",StringType,true)))
scala> val out_struct = df.withColumn("output", from_json(col("nest_jsonF"), out_schema))
scala> out_struct.select("output").show(false)
On the GPU, the output is:
+---------------------------------------------------------------------+
|output |
+---------------------------------------------------------------------+
|{[706, 123, 4646], SPECIAL, San Jose, [{706, SPECIAL, San Jose, CA}]}|
|null |
|null |
|null |
+---------------------------------------------------------------------+
On the CPU, the output is:
+-------------------------------------------------------------------------------------------------------------+
|output |
+-------------------------------------------------------------------------------------------------------------+
|{[706,123,4646], SPECIAL, San Jose, [{"Zipcode":706,"ZipCodeType":"SPECIAL","City":"San Jose","State":"CA"}]}|
|null |
|null |
|null |
+-------------------------------------------------------------------------------------------------------------+
Expected behavior
The GPU output should match the CPU output above.
Environment details
- Environment location: Standalone
- Spark 3.3.1
I just picked this issue up and ran the test in the issue description. The behavior has changed since this issue was filed. The query falls back to CPU due to:
cannot run on GPU because expression JsonToStructs from_json(StructField(Zipcode,StringType,true), StructField(ZipCodeType,StringType,true), StructField(City,StringType,true), StructField(State,StringType,true), nest_jsonF#7, Some(UTC))
produces an unsupported type StructType(StructField(Zipcode,StringType,true),StructField(ZipCodeType,StringType,true),StructField(City,StringType,true),StructField(State,StringType,true));
from_json on GPU only supports MapType<StringType, StringType> input schema
Changing the output schema to Map<String,String> fails:
import org.apache.spark.sql.types.DataTypes
val out_struct = df.withColumn("output", from_json(col("nest_jsonF"), DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)))
out_struct.select("output").show(false)
Caused by: ai.rapids.cudf.CudfException: CUDF failure at: /home/jenkins/agent/workspace/jenkins-spark-rapids-jni_nightly-pre_release-184-cuda11/src/main/cpp/src/map_utils.cu:94: Incorrect output size computation.
The first issue is that the GpuOverrides check is incorrect and disables the JSON-to-struct functionality. We should allow StructType here. This check was introduced in https://github.com/nvidia/spark-rapids/commit/2b2835ef36c0b6cf10363c7e730501f6f1213d7d
(a, conf, p, r) => new UnaryExprMeta[JsonToStructs](a, conf, p, r) {
  override def tagExprForGpu(): Unit =
    a.schema match {
      case MapType(_: StringType, _: StringType, _) => ()
      case _ =>
        willNotWorkOnGpu("from_json on GPU only supports MapType<StringType, StringType> " +
          "input schema")
    }
  // ... remaining overrides omitted
}
OK, I now realize I have come full circle on this, and I understand why the tests were xfailed and why the feature was disabled.
To support parsing JSON to struct and to support reading some parts of the JSON as string (per the example here), we will need something like https://github.com/rapidsai/cudf/issues/14239
We ask to read the State column as a string, but cuDF returns List<Struct>, and we only have access to the values, not the JSON key names:
COLUMN 3 - LIST
OFFSETS
0 NULL
1 [0 - 1)
2 NULL
COLUMN 3:DATA - STRUCT
COLUMN 3:DATA:CHILD_0 - INT64
0 706
COLUMN 3:DATA:CHILD_1 - STRING
0 "SPECIAL" 5350454349414c
COLUMN 3:DATA:CHILD_2 - STRING
0 "San Jose" 53616e204a6f7365
COLUMN 3:DATA:CHILD_3 - STRING
0 "CA" 4341
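The dump follows the usual cuDF list layout: an offsets array with one more entry than there are rows, a validity mask, and a flat child column. The sketch below is a plain-Scala model of that layout (the names `StructRow` and `ListColumn` are hypothetical; this is not the cuDF Java API). It decodes the column row by row and makes the problem visible: each child struct row carries only positional values, so the key names needed to rebuild the JSON text are not there.

```scala
// Plain-Scala model of a cuDF-style LIST<STRUCT> column (hypothetical names,
// not the cuDF Java API). Struct children are positional: no field names.
final case class StructRow(values: Vector[Any])

final case class ListColumn(
    offsets: Vector[Int],   // rowCount + 1 entries; row i spans [offsets(i), offsets(i + 1))
    valid: Vector[Boolean], // validity mask, one entry per row
    child: Vector[StructRow]
) {
  require(offsets.length == valid.length + 1, "offsets needs rowCount + 1 entries")

  def rowCount: Int = valid.length

  /** The struct rows held by row `i`, or None when the row is null. */
  def row(i: Int): Option[Vector[StructRow]] =
    if (!valid(i)) None
    else Some(child.slice(offsets(i), offsets(i + 1)))
}

object ListColumnDemo {
  def main(args: Array[String]): Unit = {
    // Values from the dump above. Using empty ranges for the null rows is an
    // assumption; the dump only shows that rows 0 and 2 are null.
    val state = ListColumn(
      offsets = Vector(0, 0, 1, 1),
      valid = Vector(false, true, false),
      child = Vector(StructRow(Vector(706L, "SPECIAL", "San Jose", "CA")))
    )
    for (i <- 0 until state.rowCount) println(s"$i -> ${state.row(i)}")
  }
}
```

Row 1 decodes to a single struct holding 706, SPECIAL, San Jose and CA, with nothing that says those values belong to Zipcode, ZipCodeType, City and State.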
Either cuDF needs to return this as an unparsed string, or we need to implement our own parsing on top of the cuDF JSON tokenizer.
The C++ JSON reader returns a table_with_metadata: https://github.com/rapidsai/cudf/blob/29556a2514f4d274164a27a80539410da7e132d6/cpp/include/cudf/io/types.hpp#L231
We strip off much of the metadata to keep the API consistent with the other reader APIs, which just return the data in the same layout as the schema we passed in. You could use that metadata, but then what happens with mixed data types, for example when a value is a string in one row and a struct in the others? I think the only long-term solution is separate processing for JSON after tokenization, similar to what we do for maps; the special map-processing code already does this. From speaking with some people on the cuDF team, they are going to investigate whether this is something they want to support themselves or whether we need to write our own parser on top of the cuDF tokenization. There are already enough differences that I am leaning toward our own custom parsing.
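To make the "separate processing after tokenization" idea concrete, here is a CPU-only sketch in plain Scala; it is not spark-rapids or cuDF code. A small hand-written scanner stands in for the cuDF tokenizer (an assumption), locating each top-level value's character range. A second step then decides per row how to render that value for a StringType field: JSON strings are unescaped, null stays null, and anything else (numbers, booleans, arrays, objects) keeps its JSON text. Because that decision is made per row, mixed types such as a string in one row and a list of structs in another can share one string column. The names `RawJsonFields` and `fieldsAsStrings` are hypothetical. Spark's CPU path re-serializes nested values through Jackson rather than slicing the input, so whitespace inside nested values may differ from this sketch.

```scala
// CPU-only sketch of "parse after tokenization": a tiny hand-written scanner
// stands in for the cuDF JSON tokenizer (assumption). Assumes each input is
// one well-formed JSON object; there is no error handling.
object RawJsonFields {

  private val scalarEnd = Set(',', '}', ']', ' ', '\t', '\r', '\n')

  // Index just past the JSON string starting at i (s(i) is the opening quote).
  private def skipString(s: String, i: Int): Int = {
    var j = i + 1
    while (s(j) != '"') j += (if (s(j) == '\\') 2 else 1)
    j + 1
  }

  // Index just past the JSON value starting at i.
  private def skipValue(s: String, i: Int): Int = s(i) match {
    case '"' => skipString(s, i)
    case '{' | '[' =>
      var depth = 1
      var j = i + 1
      while (depth > 0) {
        s(j) match {
          case '"'       => j = skipString(s, j) - 1 // jump to the closing quote
          case '{' | '[' => depth += 1
          case '}' | ']' => depth -= 1
          case _         => ()
        }
        j += 1
      }
      j
    case _ => // number, true, false or null
      var j = i
      while (j < s.length && !scalarEnd(s(j))) j += 1
      j
  }

  // Decodes the body of a JSON string (the text between the quotes).
  private def unescape(body: String): String = {
    val sb = new StringBuilder
    var k = 0
    while (k < body.length) {
      if (body(k) == '\\') {
        body(k + 1) match {
          case 'n' => sb += '\n'
          case 't' => sb += '\t'
          case 'r' => sb += '\r'
          case 'b' => sb += '\b'
          case 'f' => sb += '\f'
          case 'u' =>
            sb += Integer.parseInt(body.substring(k + 2, k + 6), 16).toChar
            k += 4
          case c => sb += c // covers \" \\ and \/
        }
        k += 2
      } else {
        sb += body(k)
        k += 1
      }
    }
    sb.toString
  }

  /** Renders every top-level field the way a StringType target needs it:
    * strings are unquoted, null stays null, and numbers, booleans, arrays
    * and objects keep their JSON text. */
  def fieldsAsStrings(json: String): Map[String, Option[String]] = {
    def ws(i: Int): Int = {
      var j = i
      while (j < json.length && json(j).isWhitespace) j += 1
      j
    }
    var out = Map.empty[String, Option[String]]
    var i = ws(json.indexOf('{') + 1)
    while (json(i) != '}') {
      val keyEnd = skipString(json, i)
      val key = unescape(json.substring(i + 1, keyEnd - 1))
      val start = ws(ws(keyEnd) + 1) // skip the ':' and surrounding whitespace
      val end = skipValue(json, start)
      val raw = json.substring(start, end)
      val rendered =
        if (raw == "null") None
        else if (raw.head == '"') Some(unescape(raw.substring(1, raw.length - 1)))
        else Some(raw)
      out += key -> rendered
      i = ws(end)
      if (json(i) == ',') i = ws(i + 1)
    }
    out
  }
}
```

On the nest_jsonF value from the repro, `fieldsAsStrings` yields `[706,123,4646]` for Zipcode and the full nested array text for State, which matches the CPU output in the issue description; a row like `{"State": "CA"}` yields `CA` for the same field.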
I started a prototype for this issue in https://github.com/NVIDIA/spark-rapids/pull/10326; it needs updating now that https://github.com/rapidsai/cudf/pull/14954 has been merged.
With the most recent changes merged (including https://github.com/NVIDIA/spark-rapids/pull/10575), we now get an exception instead of wrong data.
With spark.rapids.sql.json.read.mixedTypesAsString.enabled set to either true or false, we get:
Caused by: java.lang.IllegalStateException: Don't know how to transform ColumnVector{rows=1, type=LIST, nullCount=Optional.empty, offHeap=(ID: 98 7f7af001a3a0)} to StringType for JSON
at org.apache.spark.sql.rapids.GpuJsonReadCommon$.throwMismatchException(GpuJsonReadCommon.scala:273)
at org.apache.spark.sql.rapids.GpuJsonReadCommon$.nestedColumnViewMismatchTransform(GpuJsonReadCommon.scala:294)
at org.apache.spark.sql.rapids.GpuJsonReadCommon$.$anonfun$convertToDesiredType$1(GpuJsonReadCommon.scala:317)
at com.nvidia.spark.rapids.ColumnCastUtil$.$anonfun$deepTransformView$9(ColumnCastUtil.scala:135)
at scala.Option.map(Option.scala:230)
at com.nvidia.spark.rapids.ColumnCastUtil$.$anonfun$deepTransformView$1(ColumnCastUtil.scala:134)
https://github.com/rapidsai/cudf/issues/15278 is the issue that was filed to fix it in CUDF.
@andygrove do you still plan on trying to fix this?
I am not actively working on this, so have unassigned myself.
The issue now appears to have already been fixed at some point:
24/09/24 03:57:25 WARN GpuOverrides:
*Exec <ProjectExec> will run on GPU
*Expression <Alias> cast(from_json(StructField(Zipcode,StringType,true), StructField(ZipCodeType,StringType,true), StructField(City,StringType,true), StructField(State,StringType,true), nest_jsonF#7, Some(UTC)) as string) AS output#56 will run on GPU
*Expression <Cast> cast(from_json(StructField(Zipcode,StringType,true), StructField(ZipCodeType,StringType,true), StructField(City,StringType,true), StructField(State,StringType,true), nest_jsonF#7, Some(UTC)) as string) will run on GPU
*Expression <JsonToStructs> from_json(StructField(Zipcode,StringType,true), StructField(ZipCodeType,StringType,true), StructField(City,StringType,true), StructField(State,StringType,true), nest_jsonF#7, Some(UTC)) will run on GPU
+-------------------------------------------------------------------------------------------------------------+
|output |
+-------------------------------------------------------------------------------------------------------------+
|{[706,123,4646], SPECIAL, San Jose, [{"Zipcode":706,"ZipCodeType":"SPECIAL","City":"San Jose","State":"CA"}]}|
|null |
|null |
|null |
+-------------------------------------------------------------------------------------------------------------+
Closing as fixed.