kafka-connect-storage-cloud
Protobuf to Parquet schema conversion failed.
We use a third-party protobuf schema (.com.google.openrtb) that contains multiple nested message definitions with the same names. For example:
```proto
message Asset {
  required int32 id = 1;
  optional bool required = 2 [default = false];
  oneof asset_oneof {
    .com.google.openrtb.NativeRequest.Asset.Title title = 3;
    .com.google.openrtb.NativeRequest.Asset.Image img = 4;
    .com.google.openrtb.BidRequest.Imp.Video video = 5;
    .com.google.openrtb.NativeRequest.Asset.Data data = 6;
  }
  message Title {
    required int32 len = 1;
  }
  message Image {
    optional .com.google.openrtb.ImageAssetType type = 1;
    optional int32 w = 2;
    optional int32 h = 3;
    optional int32 wmin = 4;
    optional int32 hmin = 5;
    repeated string mimes = 6;
  }
  message Data {
    required .com.google.openrtb.DataAssetType type = 1;
    optional int32 len = 2;
  }
}

...

message User {
  optional string id = 1;
  optional string buyeruid = 2;
  optional int32 yob = 3;
  optional string gender = 4;
  optional string keywords = 5;
  optional string customdata = 6;
  optional .com.google.openrtb.BidRequest.Geo geo = 7;
  repeated .com.google.openrtb.BidRequest.Data data = 8;
}

message Data {
  optional string id = 1;
  optional string name = 2;
  repeated .com.google.openrtb.BidRequest.Data.Segment segment = 3;
  message Segment {
    optional string id = 1;
    optional string name = 2;
    optional string value = 3;
  }
}
```
Now we want to write the data to S3 in Parquet format. There were no problems when I wrote the data as JSON, but as soon as I changed "format.class" to "io.confluent.connect.s3.format.parquet.ParquetFormat", the connector failed with the following error:
Stack trace
```
[2021-04-06 08:05:22,932] ERROR WorkerSinkTask{id=thresher_proto_test-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.avro.SchemaParseException: Can't redefine: Data
	at org.apache.avro.Schema$Names.put(Schema.java:1511)
	at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:782)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:943)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203)
	at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:1102)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203)
	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203)
	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955)
	at org.apache.avro.Schema.toString(Schema.java:396)
	at org.apache.avro.Schema.toString(Schema.java:382)
	at org.apache.parquet.avro.AvroWriteSupport.init(AvroWriteSupport.java:137)
	at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:277)
	at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:564)
	at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:85)
	at io.confluent.connect.s3.format.KeyValueHeaderRecordWriterProvider$1.write(KeyValueHeaderRecordWriterProvider.java:105)
	at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:532)
	at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:302)
	at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:245)
	at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:196)
	at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
	... 10 more
```
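For context, the failure comes from Avro's schema serializer rather than from Parquet itself: the Parquet writer renders the converted schema as Avro JSON, and Avro refuses to serialize a schema that defines two structurally different records under the same full name. A minimal, self-contained sketch of that behaviour (the record names and fields below are illustrative, not the actual converted schema):

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class CantRedefineDemo {
  public static void main(String[] args) {
    // Two different records that share the bare name "Data" with no namespace,
    // roughly what BidRequest.Data and NativeRequest.Asset.Data become once
    // the nested protobuf names are flattened into Avro record names.
    Schema userData = SchemaBuilder.record("Data").fields()
        .optionalString("id")
        .optionalString("name")
        .endRecord();
    Schema assetData = SchemaBuilder.record("Data").fields()
        .requiredInt("type")
        .optionalInt("len")
        .endRecord();

    Schema root = SchemaBuilder.record("BidRequest").fields()
        .name("user_data").type(userData).noDefault()
        .name("asset_data").type(assetData).noDefault()
        .endRecord();

    // AvroWriteSupport.init() calls Schema.toString() on the schema it is given;
    // rendering the second "Data" record throws
    // org.apache.avro.SchemaParseException: Can't redefine: Data
    System.out.println(root.toString());
  }
}
```

If the two records carried distinct namespaces (for example derived from their enclosing messages), their full names would no longer collide and the schema would serialize fine.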
Connector config
{"name": "thresher_proto_test", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "tasks.max": "2", "bootstrap.servers":"k1:9092,k2:9092", "topics": "protobuf_test-v2", "s3.region": "us-east-1", "s3.bucket.name": "bucket", "topics.dir": "data-logs/parquet", "aws.access.key.id": "KEY", "aws.secret.access.key": "SECRET", "s3.part.size": "5242880", "flush.size": "100000000", "rotate.interval.ms": "60000", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat", "key.converter": "io.confluent.connect.protobuf.ProtobufConverter", "key.converter.schema.registry.url": "http://schema-registry.service.consul:8081", "value.converter": "io.confluent.connect.protobuf.ProtobufConverter", "value.converter.schema.registry.url": "http://schema-registry.service.consul:8081", "connect.meta.data": "false", "file.delim": "-", "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner", "partition.duration.ms": "60000", "path.format": "YYYY/MM/dd/HH", "locale": "en-US", "timezone": "UTC", "timestamp.extractor": "Record", "consumer.override.max.poll.records": "1000" } }
@Laboltus did you manage to resolve this?
We now convert the protobuf messages to Avro before writing the data to Kafka (using org.apache.avro.protobuf.ProtobufDatumWriter in our Java app). A rough sketch of that producer-side conversion is below.
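The sketch assumes Avro's avro-protobuf module is on the classpath; the class and helper names are illustrative, and the generated protobuf bindings (e.g. a BidRequest class) are your own. ProtobufData builds the Avro record namespaces from the generated Java classes and the enclosing messages, which is presumably why the nested Data/Asset messages no longer share a full name and the "Can't redefine" collision goes away.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.protobuf.ProtobufData;
import org.apache.avro.protobuf.ProtobufDatumWriter;

import com.google.protobuf.Message;

public final class ProtoToAvro {

  // Avro schema derived from the generated protobuf class; this is the schema
  // you would register or attach to the records you produce.
  public static Schema schemaFor(Class<? extends Message> clazz) {
    return ProtobufData.get().getSchema(clazz);
  }

  // Serializes a protobuf message as Avro binary via avro-protobuf's
  // reflection support; no intermediate Avro classes are generated.
  public static <T extends Message> byte[] toAvroBytes(T message, Class<T> clazz)
      throws IOException {
    ProtobufDatumWriter<T> writer = new ProtobufDatumWriter<>(clazz);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    writer.write(message, encoder);
    encoder.flush();
    return out.toByteArray();
  }
}
```

On the Connect side you would then pair this with an Avro value converter instead of ProtobufConverter (an assumption about the rest of the pipeline, not stated above).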