kafka-connect-storage-cloud
Protobuf to Parquet schema conversion failed.
We use a third-party protobuf schema (.com.google.openrtb) that contains multiple nested message definitions with the same names. For example:
message Asset {
  required int32 id = 1;
  optional bool required = 2 [default = false];
  oneof asset_oneof {
    .com.google.openrtb.NativeRequest.Asset.Title title = 3;
    .com.google.openrtb.NativeRequest.Asset.Image img = 4;
    .com.google.openrtb.BidRequest.Imp.Video video = 5;
    .com.google.openrtb.NativeRequest.Asset.Data data = 6;
  }
  message Title {
    required int32 len = 1;
  }
  message Image {
    optional .com.google.openrtb.ImageAssetType type = 1;
    optional int32 w = 2;
    optional int32 h = 3;
    optional int32 wmin = 4;
    optional int32 hmin = 5;
    repeated string mimes = 6;
  }
  message Data {
    required .com.google.openrtb.DataAssetType type = 1;
    optional int32 len = 2;
  }
}
...
message User {
  optional string id = 1;
  optional string buyeruid = 2;
  optional int32 yob = 3;
  optional string gender = 4;
  optional string keywords = 5;
  optional string customdata = 6;
  optional .com.google.openrtb.BidRequest.Geo geo = 7;
  repeated .com.google.openrtb.BidRequest.Data data = 8;
}
message Data {
  optional string id = 1;
  optional string name = 2;
  repeated .com.google.openrtb.BidRequest.Data.Segment segment = 3;
  message Segment {
    optional string id = 1;
    optional string name = 2;
    optional string value = 3;
  }
}
Now we want to write the data to S3 in Parquet format. There were no problems when I wrote JSON, but as soon as I changed "format.class" to "io.confluent.connect.s3.format.parquet.ParquetFormat", the connector failed with the following error:
Stack trace
[2021-04-06 08:05:22,932] ERROR WorkerSinkTask{id=thresher_proto_test-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.avro.SchemaParseException: Can't redefine: Data
at org.apache.avro.Schema$Names.put(Schema.java:1511)
at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:782)
at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:943)
at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203)
at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:1102)
at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203)
at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971)
at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955)
at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203)
at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971)
at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955)
at org.apache.avro.Schema.toString(Schema.java:396)
at org.apache.avro.Schema.toString(Schema.java:382)
at org.apache.parquet.avro.AvroWriteSupport.init(AvroWriteSupport.java:137)
at org.apache.parquet.hadoop.ParquetWriter.&lt;init&gt;(ParquetWriter.java:277)
at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:564)
at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:85)
at io.confluent.connect.s3.format.KeyValueHeaderRecordWriterProvider$1.write(KeyValueHeaderRecordWriterProvider.java:105)
at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:532)
at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:302)
at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:245)
at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:196)
at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
... 10 more
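As far as I can tell, the root cause is that Avro requires every named record in a single schema to be unique, while the protobuf schema above defines several nested messages that all end up with the simple name Data (e.g. BidRequest.Data and NativeRequest.Asset.Data). When ParquetFormat converts the Connect schema to Avro for parquet-avro, both records apparently land in the same Avro schema tree under the same name, and serializing that schema fails. A minimal sketch in plain Avro (not the connector's actual code path, field names invented for illustration) that reproduces the same exception:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class RedefineDataDemo {
    public static void main(String[] args) {
        // Two distinct record types that both carry the simple name "Data",
        // as happens when the nested protobuf messages are flattened.
        Schema assetData = SchemaBuilder.record("Data").fields()
                .requiredInt("type")
                .optionalInt("len")
                .endRecord();
        Schema bidRequestData = SchemaBuilder.record("Data").fields()
                .optionalString("id")
                .optionalString("name")
                .endRecord();

        // A record that references both "Data" definitions.
        Schema bidRequest = SchemaBuilder.record("BidRequest").fields()
                .name("asset_data").type(assetData).noDefault()
                .name("user_data").type(bidRequestData).noDefault()
                .endRecord();

        // Serializing the schema registers each named type once; hitting a
        // second, different "Data" throws
        // org.apache.avro.SchemaParseException: Can't redefine: Data
        System.out.println(bidRequest.toString(true));
    }
}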
Connector config
{"name": "thresher_proto_test",
"config":
{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "2",
"bootstrap.servers":"k1:9092,k2:9092",
"topics": "protobuf_test-v2",
"s3.region": "us-east-1",
"s3.bucket.name": "bucket",
"topics.dir": "data-logs/parquet",
"aws.access.key.id": "KEY",
"aws.secret.access.key": "SECRET",
"s3.part.size": "5242880",
"flush.size": "100000000",
"rotate.interval.ms": "60000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"key.converter.schema.registry.url": "http://schema-registry.service.consul:8081",
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"value.converter.schema.registry.url": "http://schema-registry.service.consul:8081",
"connect.meta.data": "false",
"file.delim": "-",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "60000",
"path.format": "YYYY/MM/dd/HH",
"locale": "en-US",
"timezone": "UTC",
"timestamp.extractor": "Record",
"consumer.override.max.poll.records": "1000"
}
}
@Laboltus did you manage to resolve this?
We now convert Protobuf to Avro before writing the data to Kafka (using org.apache.avro.protobuf.ProtobufDatumWriter in our Java app).
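For anyone hitting the same problem, a minimal sketch of that kind of workaround, assuming the protoc-generated class com.google.openrtb.OpenRtb.BidRequest (the class name and the surrounding producer code are illustrative, not the exact setup described above):

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.protobuf.ProtobufData;
import org.apache.avro.protobuf.ProtobufDatumWriter;

// Generated by protoc from the OpenRTB .proto files (class name assumed here).
import com.google.openrtb.OpenRtb.BidRequest;

public final class BidRequestAvroSerializer {

    // Avro schema derived from the protobuf-generated class.
    private static final Schema SCHEMA = ProtobufData.get().getSchema(BidRequest.class);

    private final ProtobufDatumWriter<BidRequest> writer = new ProtobufDatumWriter<>(SCHEMA);

    // Serialize one BidRequest to Avro binary; these bytes are produced to
    // Kafka instead of the raw protobuf payload, so the S3 sink never has to
    // convert the protobuf schema itself.
    public byte[] serialize(BidRequest request) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(request, encoder);
        encoder.flush();
        return out.toByteArray();
    }
}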