
Close #169 and add support for DataSet of Avro records

Open themodernlife opened this issue 9 years ago • 59 comments

@bdrillard you did an amazing job getting this working and I hope you'll have a look at the slight refactoring I've done to get this over the finish line:

  • Works with multiple versions of Spark (2.0.x and 2.1.x so far)
  • Travis test matrix should be updated appropriately
  • Works with multiple versions of Avro

The main challenge was getting past Spark version incompatibilities. I've done that through the addition of SBT modules, liberal use of reflection, and some straight-up copy/paste. I've tried to put proper fences around the hacks.

Let me know what you guys think! Again, all credit to @bdrillard since this is 90% his work.

Cheers!

themodernlife avatar Feb 10 '17 23:02 themodernlife

Codecov Report

Merging #217 into master will increase coverage by 2.45%. The diff coverage is 95.56%.

@@            Coverage Diff             @@
##           master     #217      +/-   ##
==========================================
+ Coverage   90.71%   93.16%   +2.45%     
==========================================
  Files           5        7       +2     
  Lines         334      688     +354     
  Branches       50       73      +23     
==========================================
+ Hits          303      641     +338     
- Misses         31       47      +16

codecov-io avatar Feb 10 '17 23:02 codecov-io

@themodernlife thanks for putting this pull request together; I certainly didn't have the SBT expertise necessary to handle multiple builds of Spark, so this additional work is just what the encoder needed. I had a look at the inlining of additional object types that aren't common to both 2.0.0 and 2.1.x, as well as the serializing of schemas (when used in parsing complex unions); those changes look fine to me. I noticed just a few stray commented-out lines from what may have been refactoring experiments, and a few style points in doc comments. Otherwise this looks great; thanks for the additional legwork on this issue.

bdrillard avatar Feb 13 '17 15:02 bdrillard

I'm trying to add the following test based on this PR, but I get an org.apache.spark.SparkException: Task not serializable, caused by java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema. Here's the full stack trace: https://gist.github.com/gcpagano/e95a351dc75c4a6ecd2a50fdd7665e90

I'm not sure if this use case wasn't meant to be supported in this PR or maybe I'm doing something wrong in the test code.

  // Imports assumed: org.apache.avro.{Schema, SchemaBuilder},
  // org.apache.avro.generic.{GenericData, GenericRecordBuilder},
  // org.apache.spark.rdd.RDD
  test("create Dataset from GenericRecord") {
    val sparkSession = spark
    import sparkSession.implicits._

    val schema: Schema =
      SchemaBuilder
        .record("GenericRecordTest")
        .namespace("com.databricks.spark.avro")
        .fields()
        .requiredString("field1")
        .endRecord()

    implicit val enc = AvroEncoder.of[GenericData.Record](schema)

    val genericRecords = (1 to 10) map { i =>
      new GenericRecordBuilder(schema)
        .set("field1", "field-" + i)
        .build()
    }

    val rdd: RDD[GenericData.Record] = sparkSession.sparkContext
      .parallelize(genericRecords)

    val ds = rdd.toDS()

    assert(ds.count() == genericRecords.size)
  }

gcpagano avatar Feb 14 '17 09:02 gcpagano

Hmm... ran into this with SpecificRecord as well. The problem is that toDS runs the encoder in mapPartitions. In mapPartitions we check that the encoder's entire expression tree is serializable. Some of the nodes in the expression tree hold a reference to the Avro Schema, which is not serializable, and you get errors like the one above.

I'm looking into a way to solve this. I have a fix for your unit test, but looking at the code I'm worried there could be other runtime errors scattered about.
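For readers hitting the same NotSerializableException: a common workaround (a sketch with stand-in names, not necessarily the fix applied in this PR) is to carry the schema as its JSON string, which is serializable, and re-parse it lazily on each executor:

```scala
import java.io._

// Sketch of a common workaround (stand-in names; not necessarily the
// fix applied in this PR): Avro's Schema is not Serializable, but its
// JSON form is a plain String, so ship the JSON and re-parse lazily on
// each executor. With Avro on the classpath, the lazy val body would be
// `new org.apache.avro.Schema.Parser().parse(schemaJson)`.
class SerializableSchema(val schemaJson: String) extends Serializable {
  // @transient: never serialize the parsed object;
  // lazy: rebuild it on first access after deserialization.
  @transient lazy val parsed: String =
    schemaJson // stand-in for new Schema.Parser().parse(schemaJson)
}

// Round-trip through Java serialization, as Spark does when shipping closures.
def roundTrip[A](a: A): A = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(a); oos.close()
  new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    .readObject().asInstanceOf[A]
}

val wrapped = new SerializableSchema("""{"type":"record","name":"T","fields":[]}""")
val copy = roundTrip(wrapped)
println(copy.schemaJson == wrapped.schemaJson) // true: the JSON survives
```

The parsed schema is never written to the wire; only the string travels, and each executor rebuilds the Schema on first use.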

themodernlife avatar Feb 15 '17 22:02 themodernlife

@gcpagano pushed change for your unit test and added coverage for fixed and enum cases. Let me know if you have any feedback.

themodernlife avatar Feb 15 '17 22:02 themodernlife

LGTM

gcpagano avatar Feb 17 '17 14:02 gcpagano

Been watching this PR for a while. Any reason to hold back on this issue?

tkinz27 avatar Mar 06 '17 23:03 tkinz27

@themodernlife Could you check and resolve this last merge conflict? It appears to be pretty minor. @JoshRosen Would it be possible for us to merge this PR soon?

bdrillard avatar Apr 17 '17 14:04 bdrillard

@bdrillard ok merge conflicts should be fixed now. LMK any other feedback.

themodernlife avatar Apr 18 '17 19:04 themodernlife

FWIW, we've been using this branch for months to bring a large collection of complex Avro records (hundreds of fields nested several layers deep) into Spark, and it has worked great. We'd love to see this in spark-avro properly.

rbrush avatar Apr 18 '17 21:04 rbrush

+1. This would be a very welcome addition.

mnarrell avatar May 05 '17 00:05 mnarrell

Would love to see this one get in soon. Is there anything blocking this moving forwards? Do we need to nag someone on this?

bellemare avatar May 18 '17 17:05 bellemare

This would be really great to have! @yhuai @rxin

AnmolMago avatar Sep 22 '17 18:09 AnmolMago

+1 for merge - what is left to do to get this merged? Is there any help I can offer? @JoshRosen

racc avatar Sep 28 '17 10:09 racc

It seems to me that Databricks doesn't really care much about Avro support, which is a shame as we move towards a fully streaming-based world. I have since moved on to alternative solutions, as it's clear they don't care to support this project. The last merged commit was back in February.

bellemare avatar Oct 02 '17 12:10 bellemare

Unfortunately, I cannot agree with you more. There are several feature-rich pull requests that are tested, commented on, and simply waiting to be merged, and that would add real value to this project. This has languished and, like many others, we've abandoned this library.

mnarrell avatar Oct 02 '17 14:10 mnarrell

@JoshRosen - If you are not maintaining this project anymore, can you indicate as such and ideally hand it off to someone else?

bellemare avatar Oct 02 '17 14:10 bellemare

@bellemare, @mnarrell what alternatives to spark-avro are you using?

racc avatar Oct 03 '17 06:10 racc

As an aside, why is this not part of Spark, like Parquet? This is not a loaded question; I really want to know.

And is there no alternative to Avro? I mean something that works for large batch files and web service streams, is binary, supports external schemas and schema evolution, and is usable for single-record processing as in streaming. Would you use Thrift or Protocol Buffers if it were just about streaming?

I am currently evaluating options for an enterprise customer, and Avro not being part of Spark may also mean that you don't get support. Say you bought a Hadoop distribution from a vendor and then ran into a production issue; the vendor may well say that this part is not what they signed up for :(

I hope it's ok to discuss this here.

marianokamp avatar Oct 03 '17 07:10 marianokamp

@racc No, alternatives to Avro. We don't use structured streaming, or at least not DataFrames with high-speed data; we'll tolerate RDD[T] there. Data at rest is Parquet.

mnarrell avatar Oct 03 '17 12:10 mnarrell

@mnarrell ok well I'm interested in this PR because it fixes this issue: https://stackoverflow.com/questions/36532237/infinite-recursion-in-createdataframe-for-avro-types

Basically you can't bean-encode an RDD of an Avro-generated class to a Dataset because of the getSchema() method. Much of the data we work with gets transformed into Avro-generated classes because it's convenient to write an AVDL and then generate a Hive table from the Avro schema, which keeps the data model defined in one place.

That's why I was asking about alternatives to spark-avro for encoding Avro data :)
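To illustrate that infinite recursion (a toy sketch with made-up names, not Spark's actual bean-encoder code): getSchema() returns a Schema whose own accessors return Schema again, so a naive recursive walk over getter return types never bottoms out:

```scala
// Toy illustration (made-up classes, not Spark internals) of why
// getter-based bean encoding loops on Avro-generated classes: the
// record exposes getSchema(), and Schema's own accessors return
// Schema again, so a naive recursive walk over getter return types
// never terminates without a cycle check.
class FakeSchema {
  def getElementType: FakeSchema = this // self-referential, like Avro's Schema
}

class FakeAvroRecord {
  def getField1: String = "x"
  def getSchema: FakeSchema = new FakeSchema
}

// Depth-limited walk over no-arg getters: returns the depth at which it
// gave up, showing the recursion is unbounded.
def inferDepth(cls: Class[_], depth: Int = 0, limit: Int = 50): Int = {
  if (depth >= limit) return depth // would recurse forever otherwise
  val getters = cls.getMethods.filter(m =>
    m.getName.startsWith("get") && m.getParameterCount == 0 &&
    m.getDeclaringClass != classOf[Object])
  val deeper = getters.map(_.getReturnType)
    .filterNot(t => t.isPrimitive || t == classOf[String])
    .map(t => inferDepth(t, depth + 1, limit))
  if (deeper.isEmpty) depth else deeper.max
}

println(inferDepth(classOf[FakeAvroRecord])) // hits the limit: 50
```

The encoder in this PR avoids that trap by deriving serializers from the Avro writer schema rather than from reflection over getters.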

racc avatar Oct 03 '17 22:10 racc

Out of curiosity, why do you use avro formats rather than a more modern one like Parquet? Just asking so I can better understand the use cases.

rxin avatar Oct 03 '17 22:10 rxin

@rxin Mainly for the convenience use-case described above. Generating a class from an Avro Schema (AVDL) means we can parse our data into the generated POJOs and then write them out to whatever format (Avro, Parquet) etc. Since a Hive table can be created from an Avro Schema, we don't have to maintain a Hive DDL when the schema changes. If there's a more convenient alternative to using Avro generated classes I'm open to hearing about it.

I think that Avro is used in conjunction with Kafka too. People tend to send messages in the Avro format and then use a Schema registry which defines the types of messages being sent (https://github.com/confluentinc/schema-registry).

One other minor advantage of the Avro format is that you can append data to a file, which helps mitigate the small-files problem on HDFS. Otherwise, with Parquet you'd probably need to do some periodic consolidation of the files.

racc avatar Oct 03 '17 22:10 racc

@rxin to add our use case in, we use avro over kafka, and this PR lets us easily work with structured streaming to read from kafka and write to our hdfs cluster. We have a secondary etl process that consolidates the avro files into parquet. Being able to append data is a big reason why we use avro.

tkinz27 avatar Oct 03 '17 23:10 tkinz27

@rxin In addition to the above use cases, we have several petabytes of Avro files and many dozens of jobs using them (including use of specific records). This Avro encoder allows us to migrate to Spark datasets incrementally, which is important because a single, large move isn't doable in this case.

(I work with @bdrillard, who did the bulk of the work on this PR. We're using it in an internal fork in production and it's working well.)

rbrush avatar Oct 03 '17 23:10 rbrush

@racc - Though we still use Spark for processing Parquet files, we've moved largely towards just using Avro with Kafka and KStreams. The only real need for the large batch processing is for historical analysis or ML training. In reality most of our data pipelines should be stream-based (faster response, more accurate up-to-the-minute reports); Kafka with Avro facilitates this nicely. Currently we do sink Avro data to S3 using Kafka Connect, and we're evaluating if we even need to convert to Parquet at all to do massive batch processing.

My 2 cents is to ditch Spark completely when it comes to processing streams. Kafka is just light years ahead, and with new improvements to Kafka Confluent you'll be able to do historic batch processing too. Kafka SQL is also coming out fairly soon, which will basically mimic Spark DataFrames but for streaming processes AND for the historical data. IMO Spark is going down the wrong path by not doing enough to facilitate proper streaming with Avro-encoded data, and I can't imagine going back to it for anything other than batch-only ML or historical analysis with existing code.

bellemare avatar Oct 04 '17 14:10 bellemare

Sorry for the delay! I apologize that this repo has been ignored for a few months. It is not abandoned and we realize that avro support is important. Thanks for voicing your concerns!

We have made sure to devote resources in Q4 to merging patches and making a new release of this library. In particular, integration with structured streaming is important to me personally.

As an aside, you should of course benchmark your own workloads, but in all of our experiments we have seen Spark structured streaming perform multiple orders of magnitude faster than the equivalent Kafka queries. So while I love using Kafka as a central message bus, I think there are still a lot of reasons to use Spark for stream and batch processing :)

marmbrus avatar Oct 06 '17 19:10 marmbrus

@marmbrus Glad to see that my email to the Databricks support staff got some attention. I am pleased to hear that your team will be devoting resources to improving Avro support going forwards. Though I appreciate the benchmark link, I'll just say that functionality trumps performance; there is a reason why we aren't all writing our web apps in assembly. It is easy for us to sacrifice performance for features and usability. Now, if Databricks can put together a streaming experience that handles Avro data seamlessly and still provides those orders-of-magnitude performance gains, then you would definitely be on the right track as far as I am concerned.

One more thing: That blog says that the benchmark details will be released shortly - that was 4 months ago. I am interested to see how the benchmark was set up, as such a large performance differential may or may not still be applicable in the latest releases of Kafka.

bellemare avatar Oct 06 '17 22:10 bellemare

@bellemare totally agree. One of my favorite CS professors used to say "the biggest performance increase is when it goes from not working to working". Do keep letting us know where we are falling short on required features!

Regarding the benchmark, I'm hoping to post the full code that you can run in community edition (or on a real cluster if you are a paying customer) soon. Stay tuned to our blog. Sorry again for the delay!

marmbrus avatar Oct 06 '17 22:10 marmbrus

@marmbrus Do you think that some of these changes will be backported to the previous versions of spark-avro (e.g. ver 2.0.1)? The reason I say this is that we're using Cloudera's Hadoop and they're still using Spark 1.6... which means that we're still on Spark 1.6 :\

racc avatar Oct 08 '17 22:10 racc