kafka-connect-storage-cloud

StackOverflowError when using the ParquetFormat

Open 4n4nd opened this issue 5 years ago • 12 comments

Issue: Writing Parquet files to S3 fails with a StackOverflowError. The same configuration works when I change `format.class` to JSON or Avro.

Error:

2019-11-19 18:10:20,402 INFO Opening record writer for: storage/kafka/dbserver1.inventory.customers/year=2019/month=11/day=19/hour=18/dbserver1.inventory.customers+1+0000000009.snappy.parquet (io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider) [pool-3-thread-2]
2019-11-19 18:10:20,432 ERROR WorkerSinkTask{id=s3-connector-dbserver1.inventory.customers-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask) [pool-3-thread-2]
java.lang.StackOverflowError
	at org.apache.parquet.schema.Types$BasePrimitiveBuilder.build(Types.java:403)
	at org.apache.parquet.schema.Types$BasePrimitiveBuilder.build(Types.java:309)
	at org.apache.parquet.schema.Types$Builder.named(Types.java:290)
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:193)
	at org.apache.parquet.avro.AvroSchemaConverter.convertUnion(AvroSchemaConverter.java:214)
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:171)
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:130)
    .....
    at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:130)
	at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:227)
	at org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:124)
2019-11-19 18:10:20,439 ERROR WorkerSinkTask{id=s3-connector-dbserver1.inventory.customers-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) [pool-3-thread-2]
java.lang.NullPointerException
	at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.close(ParquetRecordWriterProvider.java:97)
	at io.confluent.connect.s3.TopicPartitionWriter.close(TopicPartitionWriter.java:313)
	at io.confluent.connect.s3.S3SinkTask.close(S3SinkTask.java:229)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:398)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:617)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2019-11-19 18:10:20,439 ERROR WorkerSinkTask{id=s3-connector-dbserver1.inventory.customers-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [pool-3-thread-2]
2019-11-19 18:10:20,439 INFO [Consumer clientId=consumer-7, groupId=connect-s3-connector-dbserver1.inventory.customers] S

Connector configuration:

```
{
    "name": "s3-connector-{{ S3_CONNECTOR_TOPIC }}",
    "config": {
        "topics.dir": "storage/kafka",
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "tasks.max": "1",
        "topics": "{{ S3_CONNECTOR_TOPIC }}",
        "flush.size": "300",
        "rotate.interval.ms": "30000",
        "store.url": "{{ S3_ENDPOINT }}",
        "s3.bucket.name": "{{ S3_CONNECTOR_BUCKET }}",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
        "format.class.schemas.enable": "false",
        "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
        "timestamp.extractor": "Record",
        "partition.duration.ms": "3600000",
        "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
        "locale": "en",
        "timezone": "GMT"
    }
}
```

repo tag for the connector being used: `5.5.0-beta191113214629`

4n4nd avatar Nov 19 '19 18:11 4n4nd

@4n4nd is there anything special about your schema? Could this be related to https://issues.apache.org/jira/browse/PARQUET-129?
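
The repeated `convertField`/`convertUnion` frames in the trace above look like the converter walking a schema that refers back to itself, which Parquet cannot represent. As a purely hypothetical illustration (record and field names invented, not taken from the reporter's data), an Avro schema like the one below makes `AvroSchemaConverter` recurse until the stack overflows:

```
{
  "type": "record",
  "name": "Node",
  "fields": [
    { "name": "value", "type": "int" },
    { "name": "next", "type": ["null", "Node"], "default": null }
  ]
}
```

If the schema Debezium generates for this table has no such self-reference, the cause is likely something else.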

kkonstantine avatar Nov 21 '19 03:11 kkonstantine

Thanks for the reply @kkonstantine. I've looked over the issue, and I'm not entirely sure that's the problem, though it certainly could be. I'll try to look into it further and verify.

For reference (and in case you want to reproduce it), I'm using Debezium with a setup very similar to the one described in this tutorial. Once Debezium writes the MySQL DB events to their corresponding topics, I use the S3 connector to convert them to Parquet and write them to an S3 bucket.

4n4nd avatar Nov 21 '19 16:11 4n4nd

We have the same problem. Any solutions?

ganeshsi avatar Dec 12 '19 17:12 ganeshsi

nope sorry :(

4n4nd avatar Dec 12 '19 18:12 4n4nd

A starting point would be to mention a few characteristics of the records, their schema, and any tips to reproduce the issue, unless the tutorial mentioned above is sufficient.

kkonstantine avatar Dec 12 '19 19:12 kkonstantine

Hi, I have the same issue with basic JSON data with 2 fields. Is it possible to convert JSON data to Parquet with Kafka Connect S3? I have converted Avro data successfully, but with JSON it seems that ParquetFormat cannot infer the schema correctly.
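
From what I can tell, ParquetFormat does not infer a schema from the JSON values themselves; it relies on the Connect schema attached by the converter. With `org.apache.kafka.connect.json.JsonConverter` and `value.converter.schemas.enable=true`, that means each message has to carry the schema/payload envelope, roughly like this sketch (field names invented for illustration):

```
{
  "schema": {
    "type": "struct",
    "name": "customers",
    "optional": false,
    "fields": [
      { "field": "id", "type": "int32", "optional": false },
      { "field": "name", "type": "string", "optional": true }
    ]
  },
  "payload": { "id": 42, "name": "alice" }
}
```

Without that envelope (i.e. with `schemas.enable=false`), the converter hands the sink a plain schemaless map, and ParquetFormat has nothing to build the Parquet schema from.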

BDeus avatar Feb 04 '20 14:02 BDeus

5.4.0 is OK for me. Just add the Hadoop libraries.

coolteddy avatar Nov 10 '20 18:11 coolteddy

@coolteddy could you please elaborate on the workaround? It would be helpful if you could list the jars to include.

pushpavanthar avatar Nov 20 '20 11:11 pushpavanthar

@pushpavanthar, I imported the following plugin jars into our libs directory; hope this helps.

wget https://api.hub.confluent.io/api/plugins/confluentinc/kafka-connect-s3/versions/5.4.0/archive  && unzip archive
confluentinc-kafka-connect-s3-5.4.0/lib/kafka-connect-s3-5.4.0.jar 
confluentinc-kafka-connect-s3-5.4.0/lib/kafka-connect-storage* 
confluentinc-kafka-connect-s3-5.4.0/lib/aws-java-sdk* 
confluentinc-kafka-connect-s3-5.4.0/lib/commons-cli-1.2.jar 
confluentinc-kafka-connect-s3-5.4.0/lib/commons* 
confluentinc-kafka-connect-s3-5.4.0/lib/common-utils-5.4.0.jar 
confluentinc-kafka-connect-s3-5.4.0/lib/http* 
confluentinc-kafka-connect-s3-5.4.0/lib/joda-time-2.9.6.jar 
confluentinc-kafka-connect-s3-5.4.0/lib/avro* 
confluentinc-kafka-connect-s3-5.4.0/lib/parquet* 
confluentinc-kafka-connect-s3-5.4.0/lib/hadoop* 
confluentinc-kafka-connect-s3-5.4.0/lib/kafka-avro* 
confluentinc-kafka-connect-s3-5.4.0/lib/kafka-connect-avro-converter-5.4.0.jar 
confluentinc-kafka-connect-s3-5.4.0/lib/jackson* 
confluentinc-kafka-connect-s3-5.4.0/lib/common-config-5.4.0.jar 
confluentinc-kafka-connect-s3-5.4.0/lib/commons-configuration-1.6.jar 
confluentinc-kafka-connect-s3-5.4.0/lib/kafka-schema-registry-client-5.4.0.jar 

coolteddy avatar Nov 20 '20 18:11 coolteddy

Any updates on how to fix this? I have a source Debezium connector and an S3 sink. They were working fine until I added the settings below to remove the schema from Debezium, and then I started getting this error.

Settings added to the Connect container:

```
-e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE="false" \
-e CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE="false"
```
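
For context, the Confluent Connect images translate a `CONNECT_FOO_BAR` environment variable into the worker property `foo.bar` (as I understand them), so assuming the JsonConverter is in use, the change above amounts to the converter settings sketched below, which can also be set as per-connector overrides in the connector JSON:

```
{
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}
```

With the schemas disabled, the JsonConverter drops the schema/payload envelope, which is exactly the information ParquetFormat needs, hence the error.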

Thanks 

shlomi-viz avatar Apr 06 '21 15:04 shlomi-viz

We get the same issue as @shlomi-viz. Any advice on how to fix it?

smasilamani-cfins avatar Apr 15 '21 16:04 smasilamani-cfins

I had to roll back this change. See my comment here: https://forum.confluent.io/t/how-the-s3-sink-connector-extract-data/1344/5?u=shlomi. As I understand it, ParquetFormat does use the schema information, so you cannot remove it from Debezium without using a schema registry.
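
In case it helps anyone landing here: since ParquetFormat needs that schema, the options I'm aware of are either keeping `schemas.enable=true` on the JsonConverter (and living with the schema/payload envelope) or switching to the Avro converter backed by a Schema Registry. A minimal sketch of the latter as connector-level overrides (the registry URL is a placeholder for your own):

```
{
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
```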

shlomi-viz avatar Apr 19 '21 07:04 shlomi-viz