Support null items within arrays with Parquet writer
With a Connect schema like SchemaBuilder.array(OPTIONAL_STRING_SCHEMA), the Parquet writer fails with the following stack trace.
[2020-06-25 01:05:36,483][ERROR][task-thread-prod-integration-s3-pq-ums-user-sink-7][runtime.WorkerSinkTask]-WorkerSinkTask{id=prod-integration-s3-pq-ums-user-sink-7} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Array contains a null element at 1
Set parquet.avro.write-old-list-structure=false to turn on support for arrays with null elements.
java.lang.NullPointerException: Array contains a null element at 1
Set parquet.avro.write-old-list-structure=false to turn on support for arrays with null elements.
at org.apache.parquet.avro.AvroWriteSupport$TwoLevelListWriter.writeCollection(AvroWriteSupport.java:556)
at org.apache.parquet.avro.AvroWriteSupport$ListWriter.writeList(AvroWriteSupport.java:397)
at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:355)
at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:301)
at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:88)
at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:502)
at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:275)
at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:220)
at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:189)
at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:190)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
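For context, here is a minimal sketch of a Connect value that reaches this code path. The field name and the wrapping struct are hypothetical; only the array schema comes from the report above.

import java.util.Arrays;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class NullArrayElementRepro {
    public static void main(String[] args) {
        // Array field whose element schema is optional, so individual elements may be null.
        Schema valueSchema = SchemaBuilder.struct()
            .name("ExampleValue")
            .field("tags", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).build())
            .build();

        // A sink record carrying a value like this ends up in the Parquet writer and
        // trips the "Array contains a null element" check in AvroWriteSupport.
        Struct value = new Struct(valueSchema)
            .put("tags", Arrays.asList("a", null, "b"));
        System.out.println(value);
    }
}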
My proposed solution involves allowing pass-through properties for the RecordWriter classes. I'll try to take a stab at this over the next week - unless someone here has an idea of how this can be solved without code changes.
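One possible shape of that pass-through idea, purely as a sketch (the class name and property prefix here are hypothetical, not part of the connector):

import java.util.Map;
import org.apache.hadoop.conf.Configuration;

// Hypothetical sketch: copy connector properties with a "parquet." prefix
// straight into the Hadoop Configuration handed to the Parquet writer.
public final class ParquetConfigPassThrough {
    private ParquetConfigPassThrough() {}

    public static Configuration buildHadoopConf(Map<String, String> connectorProps) {
        Configuration conf = new Configuration();
        connectorProps.forEach((key, value) -> {
            if (key.startsWith("parquet.")) {
                conf.set(key, value);
            }
        });
        return conf;
    }
}

With something like this in place, setting parquet.avro.write-old-list-structure=false on the connector would flow through to the writer without a code fork.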
I have a patch ready for this - will be polishing and submitting over the weekend.
Hi, I have the same problem. Is there a patch that solves it, or a configuration option?
You'll need to fork and pass the following config when creating the Parquet writer here: parquet.avro.write-old-list-structure=false.
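If the parquet-avro version you're building against exposes it, the same key can also be set via a constant rather than the string literal. A small sketch, assuming a Hadoop Configuration named conf:

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroWriteSupport;

Configuration conf = new Configuration();
// Same setting as the string form; WRITE_OLD_LIST_STRUCTURE is the key defined by parquet-avro.
conf.setBoolean(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, false);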
Hi, I changed the file as you suggested, adding a conf variable (I also had to add import org.apache.hadoop.conf.Configuration):

try {
  log.info("Opening record writer for: {}", filename);
  org.apache.avro.Schema avroSchema = avroData.fromConnectSchema(schema);
  // Enable the new list structure so arrays may contain null elements.
  Configuration conf = new Configuration();
  conf.setBoolean("parquet.avro.write-old-list-structure", false);
  s3ParquetOutputFile = new S3ParquetOutputFile(storage, filename);
  writer = AvroParquetWriter
      .<GenericRecord>builder(s3ParquetOutputFile)
      .withConf(conf)
      .withSchema(avroSchema)
      .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
      .withDictionaryEncoding(true)
      .withCompressionCodec(storage.conf().parquetCompressionCodecName())
      .withPageSize(PAGE_SIZE)
      .build();
}
I can rebuild without errors, but when I run the connector, I get these runtime errors:
java.lang.NoSuchMethodError: io.confluent.connect.storage.StorageSinkConnectorConfig.enableParquetConfig(Lorg/apache/kafka/common/config/ConfigDef;Lorg/apache/kafka/common/config/ConfigDef$Recommender;Ljava/lang/String;I)V
    at io.confluent.connect.s3.S3SinkConnectorConfig.newConfigDef(S3SinkConnectorConfig.java:219)
    at io.confluent.connect.s3.S3SinkConnectorConfig.
Any help is welcome (I am quite new to Java, so perhaps I made some silly mistakes).
I think the Confluent/Kafka Connect version differs from the version you built the connector from, so there are classpath issues.
Try making sure that you are on Confluent 6.1 (since that's what the current master branch of this repo seems to pull in).
Hi Hashhar, now it works! I just changed the order of

  .withConf(conf)
  .withSchema(avroSchema)

to

  .withSchema(avroSchema)
  .withConf(conf)
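For anyone following along, the builder chain that worked looks like this (a sketch reusing the variables from the snippet earlier in the thread, only the order of the two calls changed):

writer = AvroParquetWriter
    .<GenericRecord>builder(s3ParquetOutputFile)
    // Schema first, then the Configuration carrying write-old-list-structure=false.
    .withSchema(avroSchema)
    .withConf(conf)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withDictionaryEncoding(true)
    .withCompressionCodec(storage.conf().parquetCompressionCodecName())
    .withPageSize(PAGE_SIZE)
    .build();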
I am working on version 5.5.3.
Thanks a lot for your help!!!!!
Hey,
Thanks for fixing this. Can I use it with the confluentinc/cp-kafka-connect-base Docker image? I'm adding the S3 connector to it with
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-s3:5.5.3
Thanks @hashhar