Close #169 and add support for DataSet of Avro records
@bdrillard you did an amazing job getting this working and I hope you'll have a look at the slight refactoring I've done to get this over the finish line:
- Works with multiple versions of Spark (2.0.x and 2.1.x so far)
- Travis test matrix should be updated appropriately
- Works with multiple versions of Avro
The main challenge was getting past Spark version incompatibilities. I've done that through the addition of SBT modules, liberal use of reflection, and some straight-up copy/paste. I've tried to put proper fences around the hacks.
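To make the reflection approach concrete, here is a minimal sketch of how version-specific classes can be resolved at runtime. The `SparkShim` object and the class names below are illustrative placeholders, not the actual code in this PR:

```scala
import scala.util.Try

// Illustrative sketch only: object and class names are hypothetical.
object SparkShim {
  /** Return the first class from `candidates` that is on the classpath. */
  def loadFirst(candidates: String*): Class[_] =
    candidates
      .flatMap(name => Try(Class.forName(name)).toOption)
      .headOption
      .getOrElse(throw new ClassNotFoundException(candidates.mkString(", ")))

  // Example: resolve a Catalyst expression class whose package moved
  // between Spark releases (both paths are placeholders).
  lazy val movedExpression: Class[_] = loadFirst(
    "org.apache.spark.sql.catalyst.expressions.objects.SomeExpression", // 2.1.x
    "org.apache.spark.sql.catalyst.expressions.SomeExpression"          // 2.0.x
  )
}
```

The idea is that compile-time linking against one Spark version is avoided; each SBT module can then pin its own Spark dependency while shared code looks classes up reflectively.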
Let me know what you guys think! Again, all credit to @bdrillard since this is 90% his work.
Cheers!
Codecov Report
Merging #217 into master will increase coverage by 2.45%. The diff coverage is 95.56%.
@@ Coverage Diff @@
## master #217 +/- ##
==========================================
+ Coverage 90.71% 93.16% +2.45%
==========================================
Files 5 7 +2
Lines 334 688 +354
Branches 50 73 +23
==========================================
+ Hits 303 641 +338
- Misses 31 47 +16
@themodernlife thanks for putting this pull request together; I certainly didn't have the SBT expertise necessary to handle multiple builds of Spark, so this additional work is just what the encoder needed. I had a look at the inlining of additional object types that aren't common to both 2.0.x and 2.1.x, as well as the serializing of schemas (when used in parsing complex unions), and those changes look fine to me. I noticed just a few stray commented-out lines, presumably left over from refactoring experiments, and a few style points in doc comments. Otherwise this looks great; thanks for the additional legwork on this issue.
I'm trying to add the following test based on this PR, but I get an org.apache.spark.SparkException: Task not serializable, caused by java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema.
Here's the full stack trace: https://gist.github.com/gcpagano/e95a351dc75c4a6ecd2a50fdd7665e90
I'm not sure whether this use case wasn't meant to be supported in this PR, or whether I'm doing something wrong in the test code.
test("create Dataset from GenericRecord") {
  val sparkSession = spark
  import sparkSession.implicits._

  val schema: Schema =
    SchemaBuilder
      .record("GenericRecordTest")
      .namespace("com.databricks.spark.avro")
      .fields()
      .requiredString("field1")
      .endRecord()

  implicit val enc = AvroEncoder.of[GenericData.Record](schema)

  val genericRecords = (1 to 10) map { i =>
    new GenericRecordBuilder(schema)
      .set("field1", "field-" + i)
      .build()
  }

  val rdd: RDD[GenericData.Record] = sparkSession.sparkContext
    .parallelize(genericRecords)

  val ds = rdd.toDS()
  assert(ds.count() == genericRecords.size)
}
Hmm... I ran into this with SpecificRecord as well. The problem is that toDS runs the encoder in mapPartitions, where Spark checks that the encoder's entire expression tree is serializable. Some of the nodes in the expression tree hold a reference to the Avro Schema, which is not serializable, so you get errors like the one above.
I'm looking into a way to solve this. I have a fix for your unit test, but looking at the code I'm worried there could be other runtime errors scattered about.
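For reference, a common workaround for this class of problem (a sketch, not necessarily the exact fix pushed here) is to avoid serializing `Schema` directly: carry the schema's JSON form across the wire and re-parse it lazily on each executor.

```scala
import org.apache.avro.Schema

// Avro's Schema is not java.io.Serializable, so ship the schema as its
// JSON string and rebuild the Schema lazily wherever it is needed.
class SerializableSchema(schemaJson: String) extends Serializable {
  // @transient + lazy: not written during serialization, parsed on first use
  @transient private lazy val parsed: Schema =
    new Schema.Parser().parse(schemaJson)

  def value: Schema = parsed
}

object SerializableSchema {
  def apply(schema: Schema): SerializableSchema =
    new SerializableSchema(schema.toString)
}
```

Expression-tree nodes would then hold a `SerializableSchema` instead of a raw `Schema`, so the serializability check in mapPartitions passes.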
@gcpagano pushed a change for your unit test and added coverage for the fixed and enum cases. Let me know if you have any feedback.
LGTM
Been watching this PR for a while. Any reason to hold back on this issue?
@themodernlife Could you check and resolve this last merge conflict? It appears to be pretty minor. @JoshRosen Would it be possible for us to merge this PR soon?
@bdrillard ok merge conflicts should be fixed now. LMK any other feedback.
FWIW, we've been using this branch for months to bring a large collection of complex Avro records (hundreds of fields nested several layers deep) into Spark, and it has worked great. We'd love to see this in spark-avro properly.
+1. This would be a very welcomed addition.
Would love to see this one get in soon. Is there anything blocking this moving forwards? Do we need to nag someone on this?
This would be really great to have! @yhuai @rxin
+1 for merge - what is left to do to get this merged? Is there any help I can offer? @JoshRosen
It seems to me that Databricks doesn't really care about Avro support that much, which is a shame as we move towards a full streaming-based world. I have since moved on to alternative solutions as it's clear they don't care to support this project. Last commit merged is back in February.
Unfortunately, I cannot agree with you more. There are several feature-rich pull requests that are tested, commented on, and simply waiting to be merged, each of which would extend the value of this project. It has languished, and, like many others, we've abandoned this library.
@JoshRosen - If you are not maintaining this project anymore, can you indicate as such and ideally hand it off to someone else?
@bellemare, @mnarrell what alternatives to spark-avro are you using?
As an aside, why is this not part of Spark, like Parquet? This is not a loaded question; I really want to know.
And is there really no alternative to Avro? I mean something that works for large batch files, for web service streams, binary, with an external schema, with schema evolution, and usable for single-record processing as in streaming. Would you use Thrift or Protocol Buffers if it were just about streaming?
I am currently evaluating options for an enterprise customer, and the fact that Avro is not part of Spark may also mean that you don't get support. Say you bought a Hadoop distribution from a vendor and then ran into a production issue; the vendor may well say that this part is not what they signed up for :(
I hope it's ok to discuss this here.
@racc No, alternatives to Avro. We don't use structured streaming, or at least dataframes with high speed data. We'll tolerate RDD[T] there. Data at rest is parquet.
@mnarrell ok well I'm interested in this PR because it fixes this issue: https://stackoverflow.com/questions/36532237/infinite-recursion-in-createdataframe-for-avro-types
Basically you can't bean-encode an RDD of an Avro-generated class to a Dataset because of the getSchema() method. Much of the data we work with gets transformed into Avro-generated classes, because it's convenient to write an AVDL and then generate a Hive table from the Avro schema; it keeps the data model defined in one place.
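To make the failure mode concrete, here is a hedged sketch using a hypothetical generated class `MyAvroRecord` and a hypothetical `loadRecords()` helper; the `AvroEncoder.of` call follows the schema-based form shown earlier in this thread:

```scala
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}

// `MyAvroRecord` stands in for any Avro-generated SpecificRecord class.
val spark = SparkSession.builder().getOrCreate()
val records: Seq[MyAvroRecord] = loadRecords() // hypothetical helper

// Fails: the bean encoder follows getSchema(), and Schema's fields refer
// back to Schema itself, so schema inference recurses without terminating:
// val ds = spark.createDataset(records)(Encoders.bean(classOf[MyAvroRecord]))

// With this PR, the encoder is derived from the Avro schema itself, which
// is finite, so inference terminates:
implicit val enc = AvroEncoder.of[MyAvroRecord](MyAvroRecord.getClassSchema)
val ds: Dataset[MyAvroRecord] = spark.createDataset(records)
```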
That's why I was asking about alternatives to spark-avro for encoding Avro data :)
Out of curiosity, why do you use the Avro format rather than a more modern one like Parquet? Just asking so I can better understand the use cases.
@rxin Mainly for the convenience use-case described above. Generating a class from an Avro Schema (AVDL) means we can parse our data into the generated POJOs and then write them out to whatever format (Avro, Parquet) etc. Since a Hive table can be created from an Avro Schema, we don't have to maintain a Hive DDL when the schema changes. If there's a more convenient alternative to using Avro generated classes I'm open to hearing about it.
I think that Avro is used in conjunction with Kafka too. People tend to send messages in the Avro format and then use a Schema registry which defines the types of messages being sent (https://github.com/confluentinc/schema-registry).
One other minor advantage of the Avro format is that you can append data to a file, which helps mitigate the small-files problem on HDFS. Otherwise, with Parquet, you'd probably need to do some periodic consolidation of the files.
@rxin to add our use case in: we use Avro over Kafka, and this PR lets us easily work with structured streaming to read from Kafka and write to our HDFS cluster. We have a secondary ETL process that consolidates the Avro files into Parquet. Being able to append data is a big reason why we use Avro.
@rxin In addition to the above use cases, we have several petabytes of Avro files and many dozens of jobs using them (including use of specific records). This Avro encoder allows us to migrate to Spark datasets incrementally, which is important because a single, large move isn't doable in this case.
(I work with @bdrillard, who did the bulk of the work on this PR. We're using it in an internal fork in production and it's working well.)
@racc - Though we still use Spark for processing Parquet files, we've moved largely towards just using Avro with Kafka and KStreams. The only real need for the large batch processing is for historical analysis or ML training. In reality most of our data pipelines should be stream-based (faster response, more accurate up-to-the-minute reports); Kafka with Avro facilitates this nicely. Currently we do sink Avro data to S3 using Kafka Connect, and we're evaluating if we even need to convert to Parquet at all to do massive batch processing.
My 2 cents is to ditch Spark completely when it comes to processing streams. Kafka is just light years ahead, and with new improvements to Kafka Confluent you'll be able to do historic batch processing too. Kafka SQL is also coming out fairly soon, which will basically mimic Spark DataFrames but for streaming processes AND for the historical data. IMO Spark is going down the wrong path by not doing enough to facilitate proper streaming with Avro-encoded data, and I can't imagine going back to it for anything other than batch-only ML or historical analysis with existing code.
Sorry for the delay! I apologize that this repo has been ignored for a few months. It is not abandoned and we realize that avro support is important. Thanks for voicing your concerns!
We have made sure to devote resources in Q4 to merging patches and making a new release of this library. In particular, integration with structured streaming is important to me personally.
As an aside, you should of course benchmark your own workloads, but in all of our experiments we have seen Spark structured streaming perform multiple orders of magnitude faster than the equivalent Kafka queries. So while I love using Kafka as a central message bus, I think there are still a lot of reasons to use Spark for stream and batch processing :)
@marmbrus Glad to see that my email to the Databricks support staff got some attention. I am pleased to hear that your team will be devoting some resources to improving Avro support going forwards. Though I appreciate the benchmark link, I'll just say that functionality trumps performance; there is a reason why we aren't all writing our web apps in assembly. It is easy for us to sacrifice performance for features and usability. Now, if Databricks can put together a streaming experience that handles Avro data seamlessly and still provides these orders-of-magnitude performance gains, then you would definitely be on the right track as far as I am concerned.
One more thing: That blog says that the benchmark details will be released shortly - that was 4 months ago. I am interested to see how the benchmark was set up, as such a large performance differential may or may not still be applicable in the latest releases of Kafka.
@bellemare totally agree. One of my favorite CS professors used to say "the biggest performance increase is when it goes from not working to working". Do keep letting us know where we are falling short on required features!
Regarding the benchmark, I'm hoping to post the full code that you can run in community edition (or on a real cluster if you are a paying customer) soon. Stay tuned to our blog. Sorry again for the delay!
@marmbrus Do you think that some of these changes will be backported to the previous versions of spark-avro (e.g. ver 2.0.1)? The reason I say this is that we're using Cloudera's Hadoop and they're still using Spark 1.6... which means that we're still on Spark 1.6 :\