kafka-connect-storage-cloud

Protobuf to Parquet schema conversion failed.

Laboltus opened this issue 3 years ago · 2 comments

We use a third-party protobuf schema (.com.google.openrtb) that contains multiple nested message definitions with the same names. For example:

message Asset {
    required int32 id = 1;
    optional bool required = 2 [default = false];  

    oneof asset_oneof {
      .com.google.openrtb.NativeRequest.Asset.Title title = 3;
      .com.google.openrtb.NativeRequest.Asset.Image img = 4;
      .com.google.openrtb.BidRequest.Imp.Video video = 5;
      .com.google.openrtb.NativeRequest.Asset.Data data = 6;
    }
  
    message Title {
      required int32 len = 1;
    }
    message Image {
      optional .com.google.openrtb.ImageAssetType type = 1;
      optional int32 w = 2;
      optional int32 h = 3;
      optional int32 wmin = 4;
      optional int32 hmin = 5;
      repeated string mimes = 6;
    }
    message Data {
      required .com.google.openrtb.DataAssetType type = 1;
      optional int32 len = 2;
    }
  }

...


  message User {
    optional string id = 1;
    optional string buyeruid = 2;
    optional int32 yob = 3;
    optional string gender = 4;
    optional string keywords = 5;
    optional string customdata = 6;
    optional .com.google.openrtb.BidRequest.Geo geo = 7;
    repeated .com.google.openrtb.BidRequest.Data data = 8;
  }
  message Data {
    optional string id = 1;
    optional string name = 2;
    repeated .com.google.openrtb.BidRequest.Data.Segment segment = 3;
  
    message Segment {
      optional string id = 1;
      optional string name = 2;
      optional string value = 3;
    }
  }

Now we want to write the data to S3 in Parquet format. There were no problems when I wrote it as JSON, but as soon as I changed "format.class" to "io.confluent.connect.s3.format.parquet.ParquetFormat", the connector failed with the following error:

Stack trace
[2021-04-06 08:05:22,932] ERROR WorkerSinkTask{id=thresher_proto_test-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.avro.SchemaParseException: Can't redefine: Data
	at org.apache.avro.Schema$Names.put(Schema.java:1511)
	at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:782)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:943)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203)
	at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:1102)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203)
	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203)
	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955)
	at org.apache.avro.Schema.toString(Schema.java:396)
	at org.apache.avro.Schema.toString(Schema.java:382)
	at org.apache.parquet.avro.AvroWriteSupport.init(AvroWriteSupport.java:137)
	at org.apache.parquet.hadoop.ParquetWriter.&lt;init&gt;(ParquetWriter.java:277)
	at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:564)
	at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:85)
	at io.confluent.connect.s3.format.KeyValueHeaderRecordWriterProvider$1.write(KeyValueHeaderRecordWriterProvider.java:105)
	at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:532)
	at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:302)
	at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:245)
	at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:196)
	at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
	... 10 more
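The stack trace points at the Avro layer that ParquetFormat uses: the Connect schema produced by ProtobufConverter is translated to an Avro schema, and Avro requires every record name to be unique within its namespace. Because both BidRequest.Data and NativeRequest.Asset.Data end up as Avro records with the same simple name, writing the schema hits Schema$Names.put and throws "Can't redefine: Data". A minimal Java sketch that reproduces the Avro-level failure (the record and field names below are illustrative, not the exact ones the connector generates):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class RedefineDemo {
  public static void main(String[] args) {
    // Two distinct record schemas that share the simple name "Data",
    // analogous to BidRequest.Data and NativeRequest.Asset.Data after conversion.
    Schema userData = SchemaBuilder.record("Data").fields()
        .optionalString("id")
        .optionalString("name")
        .endRecord();
    Schema assetData = SchemaBuilder.record("Data").fields()
        .optionalInt("len")
        .endRecord();

    Schema outer = SchemaBuilder.record("BidRequest").fields()
        .name("user_data").type(userData).noDefault()
        .name("asset_data").type(assetData).noDefault()
        .endRecord();

    // AvroWriteSupport.init() calls schema.toString(); serializing the schema
    // registers the first "Data" record and then fails on the second one with
    // org.apache.avro.SchemaParseException: Can't redefine: Data
    System.out.println(outer.toString());
  }
}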
Connector config
{"name": "thresher_proto_test", 
  "config":
   {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "tasks.max": "2",
      "bootstrap.servers":"k1:9092,k2:9092",
      "topics": "protobuf_test-v2",
      "s3.region": "us-east-1",
      "s3.bucket.name": "bucket",
      "topics.dir": "data-logs/parquet",
      "aws.access.key.id": "KEY",
      "aws.secret.access.key": "SECRET",
      "s3.part.size": "5242880",
      "flush.size": "100000000",
      "rotate.interval.ms": "60000",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
      "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
      "key.converter.schema.registry.url": "http://schema-registry.service.consul:8081",
      "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
      "value.converter.schema.registry.url": "http://schema-registry.service.consul:8081",
      "connect.meta.data": "false",
      "file.delim": "-",
      "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
      "partition.duration.ms": "60000",
      "path.format": "YYYY/MM/dd/HH",
      "locale": "en-US",
      "timezone": "UTC",
      "timestamp.extractor": "Record",
      "consumer.override.max.poll.records": "1000"
  }
}

Laboltus · Apr 06 '21 09:04

@Laboltus did you manage to resolve this?

omeraha · Nov 02 '21 07:11

We now convert Protobuf to Avro before writing the data to Kafka (using org.apache.avro.protobuf.ProtobufDatumWriter in our Java app).
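For reference, that approach looks roughly like the sketch below (the generated BidRequest class name and the binary-encoder wiring are illustrative assumptions, not the exact production code):

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.protobuf.ProtobufDatumWriter;

import com.google.openrtb.OpenRtb.BidRequest;  // hypothetical generated protobuf class

public class ProtoToAvro {
  // Serialize a protobuf message using the Avro schema that avro-protobuf
  // derives from the generated class; the resulting bytes are produced to Kafka.
  public static byte[] toAvroBytes(BidRequest request) throws IOException {
    ProtobufDatumWriter<BidRequest> writer = new ProtobufDatumWriter<>(BidRequest.class);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    writer.write(request, encoder);
    encoder.flush();
    return out.toByteArray();
  }
}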

Laboltus · Nov 02 '21 09:11