spark-avro icon indicating copy to clipboard operation
spark-avro copied to clipboard

Using SchemaConverters.convertStructToAvro to convert a DataFrame StructType schema to an Avro schema

Open divo77 opened this issue 6 years ago • 1 comments

In my project I am trying to convert a DataFrame's StructType schema to an Avro schema with SchemaConverters.convertStructToAvro. Does anybody have an example of how to use this converter?

I am using Scala with the Maven dependency `com.databricks:spark-avro_2.11:4.0.0` (groupId `com.databricks`, artifactId `spark-avro_2.11`, version `4.0.0`).

divo77 avatar Mar 23 '18 13:03 divo77

import java.io.ByteArrayOutputStream

import com.databricks.spark.avro.SchemaConverters
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.SchemaBuilder.RecordBuilder
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql._

import scala.util.Try

// Identity of the Avro record produced below: fully-qualified name is Test.Model.
val recordName: String = "Model"
val recordNamespace: String = "Test"
// Local SparkSession for this example ("local[*]" = all available cores).
val sparkSession: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
// Record builder that convertStructToAvro fills in from the Spark StructType.
// NOTE(review): a SchemaBuilder record builder is single-use — reuse across calls
// should be confirmed before calling serialize() more than once.
val builder: RecordBuilder[Schema] = SchemaBuilder.record(recordName).namespace(recordNamespace)

/** End-to-end example: builds a one-row DataFrame of [[Model]], converts its
  * Spark schema to an Avro [[Schema]] via `SchemaConverters.convertStructToAvro`,
  * and Avro-encodes the first row. The serialization result is discarded.
  */
def serialize(): Unit = {
  import sparkSession.implicits._

  // Spark StructType derived from the Model case class (no data needed).
  val modelStruct: StructType = Encoders.product[Model].schema

  // Single-row DataFrame carrying the sample value to encode.
  val df: DataFrame =
    sparkSession.sparkContext.parallelize(Seq(Model(Some(List("param1"))))).toDF()

  // Translate the Spark schema into an Avro schema using the shared builder.
  val avroSchema: Schema = SchemaConverters.convertStructToAvro(modelStruct, builder, recordNamespace)

  // getGenericRecord is defined elsewhere — presumably maps a Row to a
  // GenericRecord matching avroSchema; TODO confirm against its definition.
  val record: GenericRecord = getGenericRecord(df.head())

  avroSerialize(avroSchema, record)
}

// Sample input shape for the example: a single optional list of string parameters.
case class Model(params: Option[List[String]])

/** Serializes a [[GenericRecord]] to Avro binary form using the supplied schema.
  *
  * @param schema        Avro schema describing the record layout
  * @param genericRecord record to encode; must conform to `schema`
  * @return the encoded bytes, or a `Failure` wrapping any Avro/IO error
  */
def avroSerialize(schema: Schema, genericRecord: GenericRecord): Try[Array[Byte]] = {
  Try {
    // GenericDatumWriter is the writer for GenericRecord values; the original
    // used SpecificDatumWriter, which is intended for generated SpecificRecord
    // classes (it works only because it falls back to the generic code path).
    val writer = new GenericDatumWriter[GenericRecord](schema)
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    writer.write(genericRecord, encoder)
    // Flush the encoder before reading the buffer, or trailing bytes are lost.
    encoder.flush()
    // Closing a ByteArrayOutputStream is a no-op; kept to mirror the original.
    out.close()
    out.toByteArray
  }
}

florin1288 avatar Jul 12 '18 14:07 florin1288