spark-avro 2.0.1 generates strange schema (spark-avro 1.0.0 is fine)

Open alexnastetsky opened this issue 10 years ago • 14 comments

I save my dataframe to avro with spark-avro 1.0.0 and it looks like this (using avro-tools tojson):

{"field1":"value1","field2":976200}
{"field1":"value2","field2":976200}
{"field1":"value3","field2":614100}

But when I use spark-avro 2.0.1, it looks like this:

{"field1":{"string":"value1"},"field2":{"long":976200}}
{"field1":{"string":"value2"},"field2":{"long":976200}}
{"field1":{"string":"value3"},"field2":{"long":614100}}

At this point I'd be happy to use spark-avro 1.0.0, except that it doesn't seem to support specifying a compression codec (I want deflate).

alexnastetsky avatar Oct 14 '15 20:10 alexnastetsky

Simple repro case:

test.json:

{"field1":"foo","field2":123}
{"field1":"bar","field2":456}

(spark-shell):

scala> val df = sqlContext.read.json("test.json")
scala> df.write.format("com.databricks.spark.avro").save("test.avro")

java -jar avro-tools.jar tojson test.avro/part-r-00000-[uuid].avro

{"field1":{"string":"foo"},"field2":{"long":123}}
{"field1":{"string":"bar"},"field2":{"long":456}}

I've tried this with both spark 1.4.0 and 1.5.1.

alexnastetsky avatar Oct 14 '15 21:10 alexnastetsky

The issue is that the StructFields in the schema are being treated as nullable=true, even if you pass in false. I changed them to nullable=true in spark-avro 1.0.0 and was able to see the same weird behavior.

Does that mean this bug existed in 1.0.0 as well (albeit conditionally), or is output like this actually desirable?
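
For anyone wondering why nullable fields show up wrapped like that: in Avro's JSON encoding, a non-null value of a union type is written as an object keyed by the branch type name, which is what avro-tools tojson prints. Below is a minimal pure-Python sketch of that rule, not spark-avro or avro-tools code. The helper names `encode_value` and `encode_record` are made up for illustration, and only two-branch unions of a primitive with null are handled.

```python
import json

def encode_value(value, avro_type):
    """Encode one value the way Avro's JSON encoding treats unions (simplified)."""
    if isinstance(avro_type, list):  # a union such as ["string", "null"]
        if value is None:
            return None  # the null branch is written as plain JSON null
        # Assumption: exactly one non-null branch, as in the schemas in this thread.
        branch = next(t for t in avro_type if t != "null")
        return {branch: value}  # non-null union values are wrapped by type name
    return value  # non-union fields are written as-is

def encode_record(record, fields):
    """Encode a record given (name, avro_type) pairs, keeping field order."""
    encoded = {name: encode_value(record[name], avro_type) for name, avro_type in fields}
    return json.dumps(encoded, separators=(",", ":"))

row = {"field1": "value1", "field2": 976200}
plain = [("field1", "string"), ("field2", "long")]
nullable = [("field1", ["string", "null"]), ("field2", ["long", "null"])]

print(encode_record(row, plain))     # {"field1":"value1","field2":976200}
print(encode_record(row, nullable))  # {"field1":{"string":"value1"},"field2":{"long":976200}}
```

So the wrapped output is what a union-typed (nullable) field is expected to look like in JSON form; the open question is only whether the fields should have been nullable in the first place.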

alexnastetsky avatar Oct 15 '15 01:10 alexnastetsky

The reason it thinks it's nullable=true in 2.0.1 is because the DefaultSource class was changed from CreatableRelationProvider to HadoopFsRelationProvider, and in ResolvedDataSource#apply in the HadoopFsRelationProvider case, it uses dataSchema.asNullable to create the AvroRelation.
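
To make the effect of that asNullable call concrete, here is a rough pure-Python model of it. This is only a sketch of the idea, not Spark's implementation: the `Field` tuple and `as_nullable` function are hypothetical stand-ins, and a nested struct is modelled as a plain list of fields.

```python
from collections import namedtuple

# Hypothetical stand-in for a Spark StructField: name, type, nullable flag.
Field = namedtuple("Field", ["name", "data_type", "nullable"])

def as_nullable(fields):
    """Return a copy of the schema with every field marked nullable,
    recursing into nested structs (modelled here as lists of Field)."""
    result = []
    for f in fields:
        data_type = as_nullable(f.data_type) if isinstance(f.data_type, list) else f.data_type
        result.append(Field(f.name, data_type, True))
    return result

declared = [
    Field("key", "string", False),
    Field("nested", [Field("n", "long", False)], False),
]
relaxed = as_nullable(declared)

for f in relaxed:
    print(f.name, f.nullable)
```

Whatever nullability the caller declared is lost at this point, so anything downstream that derives an Avro schema from the relaxed schema only ever sees nullable fields.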

alexnastetsky avatar Oct 15 '15 02:10 alexnastetsky

Looks like 1.0.0 will suffice for me, got compression working with a simple sc.hadoopConfiguration.setBoolean("mapreduce.output.fileoutputformat.compress",true)

alexnastetsky avatar Oct 15 '15 02:10 alexnastetsky

I ran into this as well and tbh if the HadoopFsRelationProvider in spark is supposed to override explicitly set non-nulls then I think this library should revert to using CreatableRelationProvider if possible.

For a lot of use cases it is crucial that the schema is exactly as we declare it. This is especially true when working with data sets that are produced by different systems (for example a spark job over a data stream generated by python avro + batch generated avro coming from spark)

It is also kinda scary that we're outputting nullable fields when we explicitly state that they should not be... I have a feeling this issue can be tied to https://issues.apache.org/jira/browse/SPARK-11319

tyro89 avatar Nov 26 '15 19:11 tyro89

To reproduce: (Note that nullable is set to False)

$ bin/pyspark --packages com.databricks:spark-avro_2.10:2.0.1
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.0-SNAPSHOT
      /_/

Using Python version 3.5.0 (default, Sep 14 2015 02:37:27)
SparkContext available as sc, SQLContext available as sqlContext.
>>> from pyspark.sql.types import *
>>> df = sqlContext.createDataFrame([("value",)], StructType([StructField("key", StringType(), False)]))
>>> df.write.format("com.databricks.spark.avro").save("/tmp/test")

And then upon inspecting the avro file you will find the following schema:

{"type":"record","name":"topLevelRecord","fields":[{"name":"key","type":["string","null"]}]}
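
For comparison, here is a small pure-Python sketch of how the nullable flag maps onto the Avro schema. It is an illustration rather than the library's converter: `to_avro_schema` is a hypothetical helper, it only handles flat records, and the `[type, "null"]` branch order is copied from the output above.

```python
import json

def to_avro_schema(fields, record_name="topLevelRecord"):
    """Build an Avro record schema from (name, avro_type, nullable) triples.
    Nullable fields become a union with "null"; others keep the bare type."""
    avro_fields = []
    for name, avro_type, nullable in fields:
        field_type = [avro_type, "null"] if nullable else avro_type
        avro_fields.append({"name": name, "type": field_type})
    return {"type": "record", "name": record_name, "fields": avro_fields}

# What the declared schema (nullable=False) would be expected to produce:
print(json.dumps(to_avro_schema([("key", "string", False)]), separators=(",", ":")))
# What is produced once the field has been forced to nullable=True:
print(json.dumps(to_avro_schema([("key", "string", True)]), separators=(",", ":")))
```

The second line printed matches the schema found in the file, which is consistent with the field having been turned nullable before the Avro schema was derived.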

tyro89 avatar Nov 26 '15 19:11 tyro89

How can I resolve this issue, given that we are on Spark 1.6.0? Does [email protected] work with Spark 1.6?

jatinder85 avatar Sep 12 '16 16:09 jatinder85

This issue is still present in Spark v2.0.2 with spark-avro v3.1.0. Specifying custom schema while reading is available but the schema for writing still makes all the fields nullable and outputs them as unions.

gauravkumar37 avatar Dec 08 '16 18:12 gauravkumar37

So my hunch here is that the problem lies somewhere in our schema conversion of Spark SQL structs to avro schemas: https://github.com/databricks/spark-avro/blob/branch-3.1/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala#L113

I don't have time to debug this at the moment, but I'd be happy to provide pointers and assistance to anyone who does; I'm also happy to help review patches for this.

What would be extremely helpful is if someone could come up with a set of failing regression tests and submit them as a PR against our current test suites; this would pave the way for rapid iteration on a fix.

JoshRosen avatar Dec 10 '16 18:12 JoshRosen

As you can see from the PR, the problem does not seem to be in SchemaConverters, as the schema changes between the one provided to the DataFrame and the one written to the Avro file.

ahobson avatar Feb 13 '17 15:02 ahobson

I believe the issue is

DefaultSource extends HadoopFsRelationProvider

and in spark

https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala#L112

For all data sources extending HadoopFsRelationProvider, all columns are made nullable. This looks like a very bad issue, and we are not sure why Spark does this.

rlachu avatar May 03 '17 15:05 rlachu

@rlachu You are correct. On spark master: https://github.com/apache/spark/blob/267aca5bd5042303a718d10635bc0d1a1596853f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L403.

On line 403 we see that any dataSchema created via a non-streaming FileFormat is blanket-converted to a nullable version. Anything that uses FileFormat to read/write data will have its schema transformed to allow nulls.

ghost avatar Aug 01 '17 21:08 ghost

Is there a work around for this issue?

apatry avatar Dec 01 '17 02:12 apatry

The workaround is to set nullable=false on the fields. For an example of how to change the nullability of fields on a DataFrame, see https://stackoverflow.com/questions/33193958/change-nullable-property-of-column-in-spark-dataframe
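
A rough sketch of the schema-rewriting half of that, in plain Python so it runs without Spark. It assumes the schema is in the dict form that a Spark StructType serializes to (a "fields" list whose entries carry "name", "type", "nullable" and "metadata"); `with_nullable` is a hypothetical helper, not a Spark API, and it only touches top-level fields.

```python
import copy

def with_nullable(schema_json, nullable, column_names=None):
    """Return a copy of a struct schema (dict form) with the nullable flag
    set on the given top-level columns, or on all columns if none are named."""
    result = copy.deepcopy(schema_json)
    for field in result["fields"]:
        if column_names is None or field["name"] in column_names:
            field["nullable"] = nullable
    return result

schema_json = {
    "type": "struct",
    "fields": [
        {"name": "key", "type": "string", "nullable": True, "metadata": {}},
        {"name": "count", "type": "long", "nullable": True, "metadata": {}},
    ],
}

strict = with_nullable(schema_json, False, column_names={"key"})
print([(f["name"], f["nullable"]) for f in strict["fields"]])
```

In PySpark the dict would presumably come from `df.schema.jsonValue()`, be turned back into a schema with `StructType.fromJson(...)`, and then be applied with `sqlContext.createDataFrame(df.rdd, schema)`. Those calls are not exercised here, and whether the writer keeps the flag depends on the versions discussed above.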

alexnastetsky avatar Jan 29 '18 18:01 alexnastetsky