
Support null items within arrays with Parquet writer

Open hashhar opened this issue 5 years ago • 8 comments

With a connect schema like SchemaBuilder.array(OPTIONAL_STRING_SCHEMA), the Parquet writer fails with the following stack trace.

[2020-06-25 01:05:36,483][ERROR][task-thread-prod-integration-s3-pq-ums-user-sink-7][runtime.WorkerSinkTask]-WorkerSinkTask{id=prod-integration-s3-pq-ums-user-sink-7} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Array contains a null element at 1
Set parquet.avro.write-old-list-structure=false to turn on support for arrays with null elements.
java.lang.NullPointerException: Array contains a null element at 1
Set parquet.avro.write-old-list-structure=false to turn on support for arrays with null elements.
        at org.apache.parquet.avro.AvroWriteSupport$TwoLevelListWriter.writeCollection(AvroWriteSupport.java:556)
        at org.apache.parquet.avro.AvroWriteSupport$ListWriter.writeList(AvroWriteSupport.java:397)
        at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:355)
        at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
        at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
        at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
        at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:301)
        at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:88)
        at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:502)
        at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:275)
        at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:220)
        at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:189)
        at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:190)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
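For reference, a sink configuration that routes records through the Parquet writer (and so can hit this error on records whose arrays contain null elements) looks roughly like this; the topic, bucket, and region values are placeholders, not from the original report:

```properties
# Illustrative S3 sink settings; topic, bucket, and region are placeholders.
connector.class=io.confluent.connect.s3.S3SinkConnector
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
storage.class=io.confluent.connect.s3.storage.S3Storage
topics=ums-user
s3.bucket.name=example-bucket
s3.region=us-east-1
flush.size=1000
```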

hashhar avatar Jun 25 '20 01:06 hashhar

My proposed solution involves allowing pass-through properties for the RecordWriter classes. I'll try to take a stab at this over the next week - unless someone here has an idea of how this can be solved without code changes.
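As a sketch of what such a pass-through property could look like in the sink configuration if this proposal were implemented (this is hypothetical, not an existing connector option):

```properties
# Hypothetical: a property forwarded verbatim to the Parquet writer's
# Hadoop Configuration, if pass-through support were added to the connector.
parquet.avro.write-old-list-structure=false
```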

hashhar avatar Jun 25 '20 01:06 hashhar

I have a patch ready for this - will be polishing and submitting over the weekend.

hashhar avatar Jun 25 '20 13:06 hashhar

Hi, I have the same problem. Is there a patch or a configuration option to solve it?

epriale avatar Feb 17 '21 14:02 epriale

You'll need to fork the connector and pass the following config when creating the Parquet writer here:

parquet.avro.write-old-list-structure=false

hashhar avatar Feb 17 '21 14:02 hashhar

Hi, I changed the file as you suggested, adding a conf var (I also had to add import org.apache.hadoop.conf.Configuration):

      try {
        log.info("Opening record writer for: {}", filename);
        org.apache.avro.Schema avroSchema = avroData.fromConnectSchema(schema);

        Configuration conf = new Configuration();
        conf.setBoolean("parquet.avro.write-old-list-structure", false);

        s3ParquetOutputFile = new S3ParquetOutputFile(storage, filename);
        writer = AvroParquetWriter
                .<GenericRecord>builder(s3ParquetOutputFile)
                .withConf(conf)
                .withSchema(avroSchema)
                .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                .withDictionaryEncoding(true)
                .withCompressionCodec(storage.conf().parquetCompressionCodecName())
                .withPageSize(PAGE_SIZE)
                .build();
      }

It rebuilds without errors, but when I run the connector, I get this runtime error:

java.lang.NoSuchMethodError: io.confluent.connect.storage.StorageSinkConnectorConfig.enableParquetConfig(Lorg/apache/kafka/common/config/ConfigDef;Lorg/apache/kafka/common/config/ConfigDef$Recommender;Ljava/lang/String;I)V
        at io.confluent.connect.s3.S3SinkConnectorConfig.newConfigDef(S3SinkConnectorConfig.java:219)
        at io.confluent.connect.s3.S3SinkConnectorConfig.<init>(S3SinkConnectorConfig.java:590)
        at io.confluent.connect.s3.S3SinkConnector.start(S3SinkConnector.java:59)
        at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
        at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
        at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:195)
        at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:253)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:908)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:110)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:924)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:920)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Any help is welcome. (I am quite new to Java, so perhaps I made some silly mistakes.)

epriale avatar Feb 19 '21 11:02 epriale

I think the Confluent/Kafka Connect version differs from the version you built the connector from, so there are classpath issues.

Try making sure you are on Confluent 6.1 (since that's what the current master branch of this repo seems to pull in).

hashhar avatar Feb 19 '21 11:02 hashhar

Hi hashhar, it works now! I just changed the order of

      .withConf(conf)
      .withSchema(avroSchema)

to

      .withSchema(avroSchema)
      .withConf(conf)

I am working on version 5.5.3.

Thanks a lot for your help!!!!!

epriale avatar Feb 22 '21 11:02 epriale

Hey,

Thanks for fixing this. Can I use it with the confluentinc/cp-kafka-connect-base Docker image? I'm adding the S3 connector to it with

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-s3:5.5.3

Thanks @hashhar
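For what it's worth, the confluent-hub CLI can also install a locally built component ZIP, so a patched fork of the connector could be baked into the base image along these lines (the ZIP filename is a placeholder for whatever your own build produces):

```dockerfile
FROM confluentinc/cp-kafka-connect-base:5.5.3
# Copy the ZIP produced by building a patched fork of kafka-connect-storage-cloud,
# then install it from the local path instead of Confluent Hub.
COPY confluentinc-kafka-connect-s3-5.5.3.zip /tmp/
RUN confluent-hub install --no-prompt /tmp/confluentinc-kafka-connect-s3-5.5.3.zip
```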

shlomi-viz avatar May 06 '21 10:05 shlomi-viz