Conversion of Protobuf to Parquet fails for messages with a Union sealed_value_optional
Hi guys! I'm running an S3SinkConnector (Kafka Connect version 5.5.7) and have an issue converting Protobuf records to Parquet. I have the following (example) Protobuf message:
import "scalapb/scalapb.proto";
message Msg {
string str = 1;
int32 int = 2;
Type t = 3;
}
message Type {
oneof sealed_value_optional {
Type1 t1 = 1;
Type1 t2 = 2;
Type1 t3 = 3;
}
}
message Type1 {}
message Type2{}
message Type3 {}
I registered this message in the Schema Registry.
I'm running the connector with the following configuration (only the relevant fields are kept, for readability):
{
  "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
  "parquet.codec": "snappy",
  ...
}
and it fails with the following error:
Can't redefine: org.confluent.connect.protobuf.Union.sealed_value_optional
Full stack trace:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:239)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: java.lang.NullPointerException
        at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.close(ParquetRecordWriterProvider.java:97)
        at io.confluent.connect.s3.TopicPartitionWriter.close(TopicPartitionWriter.java:313)
        at io.confluent.connect.s3.S3SinkTask.close(S3SinkTask.java:249)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:401)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:598)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:202)
        ... 7 more
Caused by: org.apache.avro.SchemaParseException: Can't redefine: org.confluent.connect.protobuf.Union.sealed_value_optional
    at org.apache.avro.Schema$Names.put(Schema.java:1511)
    at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:782)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:943)
    at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955)
    at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955)
    at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203)
    at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:1102)
    at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955)
    at org.apache.avro.Schema.toString(Schema.java:396)
    at org.apache.avro.Schema.toString(Schema.java:382)
    at org.apache.parquet.avro.AvroWriteSupport.init(AvroWriteSupport.java:137)
    at org.apache.parquet.hadoop.ParquetWriter.
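For context: Avro requires every named schema to be fully defined exactly once within a schema document, and a second full definition of the same name fails with exactly this exception. A contrived Avro schema that reproduces the error (an illustration, not the exact schema the converter generates):

{
  "type": "record",
  "name": "Outer",
  "fields": [
    {"name": "a", "type": {
      "type": "record",
      "name": "sealed_value_optional",
      "namespace": "org.confluent.connect.protobuf.Union",
      "fields": []
    }},
    {"name": "b", "type": {
      "type": "record",
      "name": "sealed_value_optional",
      "namespace": "org.confluent.connect.protobuf.Union",
      "fields": []
    }}
  ]
}

Parsing this throws org.apache.avro.SchemaParseException: Can't redefine: org.confluent.connect.protobuf.Union.sealed_value_optional. The stack trace suggests the Protobuf converter produces a record with this same fixed full name for each oneof, and the names collide when the Avro schema for the Parquet writer is serialized.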
Is there a way to resolve this?
Maybe this config helps?
https://github.com/confluentinc/schema-registry/blob/master/protobuf-converter/src/main/java/io/confluent/connect/protobuf/ProtobufDataConfig.java#L27
Or, if it's on the Avro side:
https://github.com/confluentinc/schema-registry/blob/master/avro-data/src/main/java/io/confluent/connect/avro/AvroDataConfig.java#L28-L32
But also, I'm not entirely sure Parquet supports union types.
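If you try either property: Kafka Connect passes converter-level options through the connector config with the value.converter. prefix. A minimal sketch, assuming the linked lines refer to the enhanced.protobuf.schema.support key (and, for Avro, enhanced.avro.schema.support) — check the linked source for the exact names:

{
  "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
  "value.converter.enhanced.protobuf.schema.support": "true",
  ...
}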
@OneCricketeer Tried adding this config, but it still fails for the same reason
You mention you're running 5.5.7... #2149 isn't part of that release
Running now with the latest versions: kafka-connect-s3-10.0.8.jar and Kafka Connect 6.0.7.
Is it running successfully now? Or is that just what you've upgraded to? If you look at the v6.0.7 branch of the ProtobufDataConfig class, the property still isn't there. It was only added recently, and it might not be in any release yet, since it's also missing from the v7.1.1 (latest release) branch.