spark-avro
'Unknown datum type' exception generated by ArrayType elements
Hello,
Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type [Ljava.lang.Object;: [Ljava.lang.Object;@616f81b5
    at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:636)
    at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:265)
    at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:601)
    at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
The above exception is triggered in the following scenario:
case class Model(params: Option[List[String]])
The above case class generates the following schema:
{"type":"record","name":"Model","namespace":"Test","fields":[{"name":"params","type":[{"type":"array","items":["string","null"]},"null"]}]}
Now, when I create my converterToAvro:
val structType: StructType = Encoders.product[Model].schema
val converter = createConverterToAvro(structType, recordName, recordNamespace)
...and try to generate my genericRecord:
val record: GenericRecord = converter(item).asInstanceOf[GenericRecord]
...I get the above exception!
This happens because the implementation of AvroOutputWriter.createConverterToAvro, in the ArrayType case, has the following:
val targetArray = new Array[Any](sourceArraySize)
...and GenericData.getSchemaName does this check:
if (isArray(datum)) return Type.ARRAY.getName();
protected boolean isArray(Object datum) { return datum instanceof Collection;}
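Now a scala.Array (a plain Object[] at runtime, hence the [Ljava.lang.Object; in the message) is not an instance of Collection, so the check does not match and Avro throws the 'Unknown datum type' exception shown above.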
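To make the mismatch concrete, here is a minimal sketch in plain Java with no Avro dependency. The class ArrayCheckDemo and its isArray method are hypothetical stand-ins that only mirror the instanceof check quoted above; they are not Avro's actual classes.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

class ArrayCheckDemo {
    // Simplified stand-in for the check quoted above (assumption: same instanceof test).
    static boolean isArray(Object datum) {
        return datum instanceof Collection;
    }

    public static void main(String[] args) {
        // A Scala `new Array[Any](n)` is an Object[] on the JVM.
        Object[] asArray = new Object[] {"a", "b"};

        List<Object> asList = new ArrayList<>();
        asList.add("a");
        asList.add("b");

        System.out.println(isArray(asArray)); // false: a Java array is not a Collection
        System.out.println(isArray(asList));  // true: ArrayList implements Collection
    }
}
```

Since the array is not recognised by this check, the schema name lookup ends with the exception above.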
To fix this, the converter can build a java.util.ArrayList instead of an Array!
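A rough sketch of what that change could look like, written in Java for illustration only. The helper toAvroList and its elementConverter parameter are hypothetical names, not part of spark-avro; the real converter is Scala code and may differ in detail.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;

class ArrayListConversionSketch {
    // Hypothetical helper: converts each source element and collects the results
    // in a java.util.ArrayList, which is a Collection, rather than in an Object[].
    static List<Object> toAvroList(Object[] source, Function<Object, Object> elementConverter) {
        List<Object> target = new ArrayList<>(source.length);
        for (Object element : source) {
            target.add(elementConverter.apply(element)); // null elements are kept as-is
        }
        return target;
    }

    public static void main(String[] args) {
        Object[] params = {"a", null, "b"};
        List<Object> converted = toAvroList(params, Function.identity());

        System.out.println(converted instanceof Collection); // true
        System.out.println(converted.size());                // 3
    }
}
```

The identity converter is used here only to keep the example small; in the real code each element would go through the converter built for the array's element type.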
We also bumped into this. It appears to have been fixed when the code was merged into upstream Spark, which uses ArrayList instead of Array:
https://github.com/apache/spark/commit/96030876383822645a5b35698ee407a8d4eb76af#diff-01fea32e6ec6bcf6f34d06282e08705aR103