
How to parse Avro messages while reading a stream of messages from Kafka in Spark 2.2.0?

kant111 opened this issue 7 years ago · 21 comments

The code below reads messages from Kafka, and the messages are in Avro. How do I parse them and put them into a DataFrame in Spark 2.2.0?

Dataset<Row> df = sparkSession.readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "topic1")
            .load();

kant111 avatar Dec 16 '17 04:12 kant111

Is there a from_avro function, just like the from_json function that is already available?

// Json version that is already available.

StructType jsonSchema = new StructType()......;
df.select(from_json(new Column("value").cast("string"), jsonSchema).as("payload"));

// Avro version that is not yet available.

StructType avroSchema = new StructType()......;
df.select(from_avro(new Column("value").cast("string"), avroSchema).as("payload"));
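
Right now the only workaround I can think of is a hand-rolled UDF around Avro's GenericDatumReader. A rough sketch (Scala here, with a made-up schema and field name just for illustration):

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.spark.sql.functions.{col, udf}

// Writer schema as JSON; the "Payload" record and its "id" field are placeholders.
val schemaJson = """{"type":"record","name":"Payload","fields":[{"name":"id","type":"string"}]}"""

// Decode the raw Kafka value (bytes) and pull out a single field.
val decodeId = udf { (bytes: Array[Byte]) =>
  val schema  = new Schema.Parser().parse(schemaJson)          // cache this in real code
  val reader  = new GenericDatumReader[GenericRecord](schema)
  val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
  reader.read(null, decoder).get("id").toString
}

val parsed = df.select(decodeId(col("value")).as("id"))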

kant111 avatar Dec 18 '17 20:12 kant111

@gengliangwang

kant111 avatar Dec 18 '17 20:12 kant111

I would also be interested in this, in the context of reading Avro from DynamoDB instead. Is there a way to mix input sources such as Kafka/DynamoDB/etc. with spark-avro? This would be very useful.

peay avatar Jan 25 '18 09:01 peay

need this badly as well

bobbui avatar Apr 25 '18 00:04 bobbui

Looking forward to this feature

devsaik avatar Jul 15 '18 15:07 devsaik

There's a Databricks page (https://docs.databricks.com/spark/latest/structured-streaming/avro-dataframe.html#avro-dataframe) that claims there is a from_avro, but it returns "error: not found: value from_avro" after importing:

import com.databricks.spark.avro._
import org.apache.avro.SchemaBuilder

mushgrant avatar Jul 18 '18 18:07 mushgrant

I just checked again. It doesn’t exist

kant111 avatar Jul 19 '18 05:07 kant111

I think Databricks has not open-sourced it. It must only work on their platform.

samklr avatar Aug 28 '18 12:08 samklr

This project seems more up-to-date with Kafka support https://github.com/AbsaOSS/ABRiS

OneCricketeer avatar Aug 30 '18 19:08 OneCricketeer

This post shows example usage with Kafka and Spark 2.4 Avro support https://databricks.com/blog/2018/11/30/apache-avro-as-a-built-in-data-source-in-apache-spark-2-4.html
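
Roughly, the usage from that post looks like this (Scala, Spark 2.4; the topic and schema are placeholders, and the external module has to be on the classpath, e.g. --packages org.apache.spark:spark-avro_2.11:2.4.0):

import org.apache.spark.sql.avro._            // from_avro / to_avro live here in 2.4
import org.apache.spark.sql.functions.col

// from_avro takes the Avro writer schema as a JSON string, not a StructType.
val jsonFormatSchema = """{"type":"record","name":"Payload","fields":[{"name":"id","type":"string"}]}"""

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .load()
  .select(from_avro(col("value"), jsonFormatSchema).as("payload"))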

OneCricketeer avatar Dec 10 '18 22:12 OneCricketeer

Does it support the Confluent Kafka Avro format?

sterkh66 avatar Dec 11 '18 13:12 sterkh66

@sterkh66 The ABRiS library above does. The Spark library is just whatever was available in here, AFAIK.

OneCricketeer avatar Dec 11 '18 14:12 OneCricketeer

@cricket007 Thanks for the quick reply. This article confuses me: https://docs.databricks.com/spark/latest/structured-streaming/avro-dataframe.html. So far I don't see Schema Registry support described in their example.

sterkh66 avatar Dec 11 '18 14:12 sterkh66

Again. It doesn't. Just because there's Avro in Kafka doesn't mean you need to use a Schema Registry. The messages in the blog will need to have the schema as part of the message

OneCricketeer avatar Dec 11 '18 14:12 OneCricketeer

@cricket007 There is an explicit example of Schema Registry usage in the Databricks blog. Could you explain why the Schema Registry is mentioned there? I believe it confuses a lot of people.

dbolshak avatar Dec 19 '18 15:12 dbolshak

@dbolshak I didn't write the article and have no affiliation with Databricks.

I can only speculate that it's mentioned there because Confluent Platform is one of the main enterprise deployments of Kafka, and people kept filing issues about being unable to use "Confluent-encoded" Avro and/or asking how to integrate this library with the Schema Registry.
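
For what it's worth, the usual community workaround for Confluent-encoded records (a sketch only; it ignores the schema id, so it assumes every record was written with one known schema) is to skip the 5-byte wire-format prefix, i.e. the magic byte plus the 4-byte schema id, before handing the bytes to from_avro:

import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.{col, expr}

// Confluent wire format: 1 magic byte + 4-byte schema id + Avro body.
// Placeholder writer schema; in real code it must match what the producer used.
val jsonFormatSchema = """{"type":"record","name":"Payload","fields":[{"name":"id","type":"string"}]}"""

val decoded = df
  .select(expr("substring(value, 6, length(value) - 5)").as("body"))  // drop the 5-byte header
  .select(from_avro(col("body"), jsonFormatSchema).as("payload"))

If the schema actually evolves per record, that trick falls apart and something like ABRiS is the better option.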

OneCricketeer avatar Dec 19 '18 15:12 OneCricketeer

If you mean it's confusing to see Databricks have an example that's not in the Spark documentation, then I agree, and I've voiced my opinions in the Spark JIRA, but that's not an issue to discuss here either.

OneCricketeer avatar Dec 19 '18 15:12 OneCricketeer

@cricket007 There's still more. The from_avro and to_avro functions mentioned in the Databricks blog were never included in the 4.0 release and remain unmerged in this PR.

sterkh66 avatar Dec 19 '18 15:12 sterkh66

From what I understand, the Databricks platform maintains its own Avro functions that include Schema Registry support, and the methods that accept the registry URL are not open-sourced. The rest of this repo has now been merged into Spark 2.4.

OneCricketeer avatar Dec 19 '18 16:12 OneCricketeer

This seems to be the only reasonable explanation, and a non-open-sourced version was already suggested in one of the comments above. Anyway, Jordan, thanks for participating in this tricky "investigation".

sterkh66 avatar Dec 19 '18 17:12 sterkh66

@cricket007 @sterkh66 @dbolshak The Schema Registry support is Databricks Runtime only.

@kant111 The function is already in Spark 2.4.
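
In 2.4 it ships in the external spark-avro module, so something like this should work once the package is on the classpath:

// e.g. spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.0
import org.apache.spark.sql.avro._   // brings in from_avro and to_avro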

gengliangwang avatar Dec 19 '18 18:12 gengliangwang