
Conversion of protobuf to Parquet fails for messages with Union sealed_value_optional

omeraha opened this issue Jun 21 '22 · 5 comments

Hi guys! I'm running an S3SinkConnector (Kafka Connect version 5.5.7) and have some issues converting Protobuf records to Parquet. I have the following (example) Protobuf message:

import "scalapb/scalapb.proto";

message Msg {
  string str = 1;
  int32 int = 2;
  Type t = 3;
}

message Type {
  oneof sealed_value_optional {
    Type1 t1 = 1;
    Type2 t2 = 2;
    Type3 t3 = 3;
  }
}

message Type1 {}
message Type2 {}
message Type3 {}

I registered this message in the Schema Registry.
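(For reference, a Protobuf schema like this is typically registered by POSTing to /subjects/<topic>-value/versions on the Schema Registry; a minimal sketch of the request body, assuming the default TopicNameStrategy subject naming, with the schema string abbreviated:

{
  "schemaType": "PROTOBUF",
  "schema": "import \"scalapb/scalapb.proto\"; message Msg { string str = 1; int32 int = 2; Type t = 3; } ..."
}
)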

I'm running the connector with the following configuration (only the relevant fields are kept, for readability):

{
  "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
  "parquet.codec": "snappy",
  ...
}

and it fails with the following error:

  Can't redefine: org.confluent.connect.protobuf.Union.sealed_value_optional
Full stack trace:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:239)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
	Suppressed: java.lang.NullPointerException
		at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.close(ParquetRecordWriterProvider.java:97)
		at io.confluent.connect.s3.TopicPartitionWriter.close(TopicPartitionWriter.java:313)
		at io.confluent.connect.s3.S3SinkTask.close(S3SinkTask.java:249)
		at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:401)
		at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:598)
		at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:202)
		... 7 more
Caused by: org.apache.avro.SchemaParseException: Can't redefine: org.confluent.connect.protobuf.Union.sealed_value_optional
	at org.apache.avro.Schema$Names.put(Schema.java:1511)
	at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:782)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:943)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203)
	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203)
	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203)
	at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:1102)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203)
	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955)
	at org.apache.avro.Schema.toString(Schema.java:396)
	at org.apache.avro.Schema.toString(Schema.java:382)
	at org.apache.parquet.avro.AvroWriteSupport.init(AvroWriteSupport.java:137)
	at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:277)
	at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:564)
	at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:80)
	at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:501)
	at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:274)
	at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:219)
	at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:188)
	at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:191)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
	... 10 more

Is there a way to resolve this?

omeraha · Jun 21 '22

Maybe this config helps?

https://github.com/confluentinc/schema-registry/blob/master/protobuf-converter/src/main/java/io/confluent/connect/protobuf/ProtobufDataConfig.java#L27
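If that's the right knob, converter-level properties can be forwarded from the connector config using the value.converter. prefix. A minimal sketch, assuming the flag in question is enhanced.protobuf.schema.support (check the linked class for the exact name):

{
  "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
  "value.converter.enhanced.protobuf.schema.support": "true",
  ...
}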

Or if it's Avro

https://github.com/confluentinc/schema-registry/blob/master/avro-data/src/main/java/io/confluent/connect/avro/AvroDataConfig.java#L28-L32
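The equivalent sketch on the Avro side, assuming the property there is enhanced.avro.schema.support:

{
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.enhanced.avro.schema.support": "true",
  ...
}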

But also, I'm not sure Parquet supports union types at all.

OneCricketeer · Jun 22 '22

@OneCricketeer Tried adding this config, but it still fails for the same reason.

omeraha · Jun 23 '22

You mention you're running 5.5.7... #2149 isn't part of that release.

OneCricketeer · Jun 23 '22

Running now with the latest versions: kafka-connect-s3-10.0.8.jar and Kafka Connect 6.0.7.

omeraha · Jun 23 '22

Is it running successfully, or is that just what you've upgraded to? If you look at the v6.0.7 branch of the ProtobufDataConfig class, the property still isn't there. It was only added recently, and it might not be released yet, since it's also not in the v7.1.1 (latest release) branch.

OneCricketeer · Jun 23 '22