kafka-connect-storage-cloud
StackOverFlowError when using the ParquetFormat
Issue: Writing Parquet files to S3 fails with a StackOverflowError. The same configuration works when I change format.class to JSON or Avro.

Error:
2019-11-19 18:10:20,402 INFO Opening record writer for: storage/kafka/dbserver1.inventory.customers/year=2019/month=11/day=19/hour=18/dbserver1.inventory.customers+1+0000000009.snappy.parquet (io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider) [pool-3-thread-2]
2019-11-19 18:10:20,432 ERROR WorkerSinkTask{id=s3-connector-dbserver1.inventory.customers-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask) [pool-3-thread-2]
java.lang.StackOverflowError
at org.apache.parquet.schema.Types$BasePrimitiveBuilder.build(Types.java:403)
at org.apache.parquet.schema.Types$BasePrimitiveBuilder.build(Types.java:309)
at org.apache.parquet.schema.Types$Builder.named(Types.java:290)
at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:193)
at org.apache.parquet.avro.AvroSchemaConverter.convertUnion(AvroSchemaConverter.java:214)
at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:171)
at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:130)
.....
at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:130)
at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:227)
at org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:124)
2019-11-19 18:10:20,439 ERROR WorkerSinkTask{id=s3-connector-dbserver1.inventory.customers-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) [pool-3-thread-2]
java.lang.NullPointerException
at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.close(ParquetRecordWriterProvider.java:97)
at io.confluent.connect.s3.TopicPartitionWriter.close(TopicPartitionWriter.java:313)
at io.confluent.connect.s3.S3SinkTask.close(S3SinkTask.java:229)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:398)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:617)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2019-11-19 18:10:20,439 ERROR WorkerSinkTask{id=s3-connector-dbserver1.inventory.customers-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [pool-3-thread-2]
2019-11-19 18:10:20,439 INFO [Consumer clientId=consumer-7, groupId=connect-s3-connector-dbserver1.inventory.customers] S
Connector configuration:

```json
{
  "name": "s3-connector-{{ S3_CONNECTOR_TOPIC }}",
  "config": {
    "topics.dir": "storage/kafka",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "{{ S3_CONNECTOR_TOPIC }}",
    "flush.size": "300",
    "rotate.interval.ms": "30000",
    "store.url": "{{ S3_ENDPOINT }}",
    "s3.bucket.name": "{{ S3_CONNECTOR_BUCKET }}",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "format.class.schemas.enable": "false",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "timestamp.extractor": "Record",
    "partition.duration.ms": "3600000",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "en",
    "timezone": "GMT"
  }
}
```
repo tag for the connector being used: `5.5.0-beta191113214629`
@4n4nd is there anything special about your schema? Could this be related to https://issues.apache.org/jira/browse/PARQUET-129?
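If the linked issue is the culprit, the trigger would be a recursive (self-referencing) Avro schema: parquet-avro's AvroSchemaConverter walks such a schema without a termination condition until the stack overflows, which would match the repeated convertField frames in the trace above. A minimal, purely illustrative schema of that shape (the record and field names here are invented):

```json
{
  "type": "record",
  "name": "TreeNode",
  "fields": [
    {"name": "value", "type": "string"},
    {"name": "children", "type": {"type": "array", "items": "TreeNode"}}
  ]
}
```

If the topic's value schema contains such a cycle, the conversion to Parquet will overflow the stack regardless of the connector settings.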
Thanks for the reply @kkonstantine. I've looked over the issue, and I'm not entirely sure if that's the problem - though it may certainly be possible. I'll try to look further into it and verify.
For reference (and in case you want to reproduce it), I'm using Debezium with a setup very similar to the one described in this tutorial. Once Debezium writes MySQL DB events to their corresponding topics, I use the S3 connector to convert them to Parquet and write them to an S3 bucket.
We have the same problem. Any solutions?
nope sorry :(
A starting point would be to mention a few characteristics of the records and their schema, plus any tips to reproduce the issue, unless the tutorial mentioned above is sufficient.
Hi, I have the same issue with basic JSON data with two fields. Is it possible to convert JSON data to Parquet with Kafka Connect S3? I have converted Avro data successfully, but with JSON it seems that the ParquetFormat cannot infer the schema correctly.
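A note on the JSON case: ParquetFormat does not infer a schema from plain JSON values; it builds the Parquet schema from the schema attached to the Connect record. With the JsonConverter that means setting value.converter.schemas.enable=true and producing messages in the schema/payload envelope, roughly like the sketch below (the field names are only an example):

```json
{
  "schema": {
    "type": "struct",
    "name": "example.Value",
    "optional": false,
    "fields": [
      {"field": "id", "type": "int64", "optional": false},
      {"field": "name", "type": "string", "optional": true}
    ]
  },
  "payload": {"id": 42, "name": "alice"}
}
```

Alternatively, producing the topic as Avro with a Schema Registry gives ParquetFormat the schema without embedding it in every message.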
5.4.0 is OK for me; just add the Hadoop libraries.
@coolteddy could you please elaborate on the workaround? It would be helpful if you could list the jars to include.
@pushpavanthar, we imported the following plugin jars into our libs; hope this helps.
wget https://api.hub.confluent.io/api/plugins/confluentinc/kafka-connect-s3/versions/5.4.0/archive && unzip archive
confluentinc-kafka-connect-s3-5.4.0/lib/kafka-connect-s3-5.4.0.jar
confluentinc-kafka-connect-s3-5.4.0/lib/kafka-connect-storage*
confluentinc-kafka-connect-s3-5.4.0/lib/aws-java-sdk*
confluentinc-kafka-connect-s3-5.4.0/lib/commons-cli-1.2.jar
confluentinc-kafka-connect-s3-5.4.0/lib/commons*
confluentinc-kafka-connect-s3-5.4.0/lib/common-utils-5.4.0.jar
confluentinc-kafka-connect-s3-5.4.0/lib/http*
confluentinc-kafka-connect-s3-5.4.0/lib/joda-time-2.9.6.jar
confluentinc-kafka-connect-s3-5.4.0/lib/avro*
confluentinc-kafka-connect-s3-5.4.0/lib/parquet*
confluentinc-kafka-connect-s3-5.4.0/lib/hadoop*
confluentinc-kafka-connect-s3-5.4.0/lib/kafka-avro*
confluentinc-kafka-connect-s3-5.4.0/lib/kafka-connect-avro-converter-5.4.0.jar
confluentinc-kafka-connect-s3-5.4.0/lib/jackson*
confluentinc-kafka-connect-s3-5.4.0/lib/common-config-5.4.0.jar
confluentinc-kafka-connect-s3-5.4.0/lib/commons-configuration-1.6.jar
confluentinc-kafka-connect-s3-5.4.0/lib/kafka-schema-registry-client-5.4.0.jar
Any updates on how to fix it?
I have a source Debezium connector and an S3 sink. They were working fine, but I added the settings below to remove the schema from Debezium and then started getting this error.
Info added to the Connect container:
```
-e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE="false" \
-e CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE="false"
```
Thanks
We get the same issue as @shlomi-viz. Any advice on how to fix it?
I had to roll back this change. See my comment here: https://forum.confluent.io/t/how-the-s3-sink-connector-extract-data/1344/5?u=shlomi. As I understand it, the ParquetFormat does use the schema information, so you cannot remove it from Debezium without using a schema registry.
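In other words, ParquetFormat needs schema-bearing records, so disabling schemas on the converter leaves it with nothing to build the Parquet schema from. One way to keep the schema without embedding it in every JSON message is the Avro converter backed by a Schema Registry; a minimal sketch of the converter settings, with a placeholder registry URL:

```json
{
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://schema-registry:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}
```

These can be set per connector as above, or at the worker level (in a containerized setup like the one above, typically via the CONNECT_KEY_CONVERTER / CONNECT_VALUE_CONVERTER environment variables and their matching *_SCHEMA_REGISTRY_URL settings).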