kafka-connect-storage-cloud

S3 Sink Task - Parquet - Error: java.lang.NoClassDefFoundError: com/google/common/base/Preconditions

Open shanemougham opened this issue 4 years ago • 12 comments

Hello Guys,

I am getting the below exception while using the Kafka S3 Sink Connector to write data to S3 in Parquet format. I am using the JDBC source connector to write data to Kafka topics in Avro format.

[2020-08-10 03:39:29,070] ERROR WorkerSinkTask{id=s3-sink-test-21-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: com/google/common/base/Preconditions (org.apache.kafka.connect.runtime.WorkerSinkTask:566)
java.lang.NoClassDefFoundError: com/google/common/base/Preconditions
	at org.apache.hadoop.conf.Configuration$DeprecationDelta.<init>(Configuration.java:361)
	at org.apache.hadoop.conf.Configuration$DeprecationDelta.<init>(Configuration.java:374)
	at org.apache.hadoop.conf.Configuration.<clinit>(Configuration.java:456)
	at org.apache.parquet.hadoop.ParquetWriter$Builder.<init>(ParquetWriter.java:345)
	at org.apache.parquet.avro.AvroParquetWriter$Builder.<init>(AvroParquetWriter.java:162)
	at org.apache.parquet.avro.AvroParquetWriter$Builder.<init>(AvroParquetWriter.java:153)
	at org.apache.parquet.avro.AvroParquetWriter.builder(AvroParquetWriter.java:43)
	at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:74)
	at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:502)
	at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:275)
	at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:220)
	at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:189)
	at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:191)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.google.common.base.Preconditions
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 24 more
[2020-08-10 03:39:29,072] ERROR WorkerSinkTask{id=s3-sink-test-21-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:186)
java.lang.NullPointerException
	at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.close(ParquetRecordWriterProvider.java:97)
	at io.confluent.connect.s3.TopicPartitionWriter.close(TopicPartitionWriter.java:314)
	at io.confluent.connect.s3.S3SinkTask.close(S3SinkTask.java:249)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:401)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:598)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:200)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Below is the config for the S3 sink connector:

{
  "name": "s3-sink-test-21",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "TESTREPLICA12",
    "s3.region": "ap-southeast-2",
    "s3.bucket.name": "xxxxx",
    "s3.part.size": "5242880",
    "flush.size": "1",
    "auto.register.schemas": "false",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "schema.compatibility": "FULL",
    "locale": "AU",
    "timezone": "EST",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DailyPartitioner",
    "partition.duration.ms": "86400000",
    "rotate.interval.ms": "60000",
    "timestamp.extractor": "Record",
    "parquet.codec": "none",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schema.registry.url": "http://127.0.0.1:8081",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "enhanced.avro.schema.support": "true",
    "path.format": "YYYY-MM-dd-HH"
  }
}

Source connector config:

curl -s -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '
{
  "name": "sybase-test-12",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": 1,
    "connection.url": "jdbc:jtds:sybase://xxxxx",
    "connection.user": "xxx",
    "connection.password": "xxx",
    "mode": "timestamp",
    "timestamp.column.name": "updatedOn",
    "numeric.mapping": "best_fit",
    "table.type": "TABLE",
    "table.whitelist": "TESTREPLICA12",
    "dialect.name": "SybaseDatabaseDialect",
    "topic.prefix": "",
    "poll.interval.ms": "6000",
    "table.poll.interval.ms": "6000",
    "transforms": "createKey,extract",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "WAKArrn",
    "transforms.extract.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extract.field": "WAKArrn",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schema.registry.url": "http://127.0.0.1:8081"
  }
}'

Please advise.

Regards Shane

shanemougham avatar Aug 10 '20 03:08 shanemougham

Env Details:

Kafka - AWS MSK 2.2.1
Kafka Connect - 2.12-2.5.0
S3 Connector - 5.5.1
Schema Registry - confluent-5.5.1

shanemougham avatar Aug 10 '20 04:08 shanemougham

The issue has been resolved by using the Confluent kafka-connect image.
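
For anyone wanting to reproduce that setup, here is a rough sketch using the stock Confluent image and Confluent Hub. The version tags, broker address and plugin path are examples, and the usual CONNECT_* group-id, storage-topic and converter settings are omitted for brevity:

# start a Connect worker from the Confluent image (minimal, non-production sketch)
docker run -d --name connect \
  -e CONNECT_BOOTSTRAP_SERVERS=broker:9092 \
  -e CONNECT_PLUGIN_PATH=/usr/share/java,/usr/share/confluent-hub-components \
  confluentinc/cp-kafka-connect:5.5.1

# install the S3 sink so its hadoop/guava dependencies are laid out under the plugin path,
# then restart the worker so the plugin loader picks them up
docker exec connect confluent-hub install --no-prompt confluentinc/kafka-connect-s3:5.5.1
docker restart connect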

shanemougham avatar Aug 17 '20 02:08 shanemougham

I am seeing the same error as @shanemougham but also the error below that seems related. It seems like there is a missing dependency.

Is there anything else I can share to help troubleshoot? Any suggestions for a workaround?

INFO   ||  Opening record writer for: {path}.snappy.parquet   [io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider]
ERROR  ||  WorkerSinkTask{id=***.s3-parquet-connector-4} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Could not initialize class org.apache.hadoop.conf.Configuration   [org.apache.kafka.connect.runtime.WorkerSinkTask]
java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.conf.Configuration
	at org.apache.parquet.hadoop.ParquetWriter$Builder.<init>(ParquetWriter.java:345)
	at org.apache.parquet.avro.AvroParquetWriter$Builder.<init>(AvroParquetWriter.java:162)
	at org.apache.parquet.avro.AvroParquetWriter$Builder.<init>(AvroParquetWriter.java:153)
	at org.apache.parquet.avro.AvroParquetWriter.builder(AvroParquetWriter.java:43)
	at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:74)
	at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:501)
	at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:274)
	at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:219)
	at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:188)
	at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:191)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
ERROR  ||  WorkerSinkTask{id=***.s3-parquet-connector-4} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
	Suppressed: java.lang.NullPointerException
		at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.close(ParquetRecordWriterProvider.java:97)
		at io.confluent.connect.s3.TopicPartitionWriter.close(TopicPartitionWriter.java:313)
		at io.confluent.connect.s3.S3SinkTask.close(S3SinkTask.java:249)
		at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:410)
		at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:644)
		at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:202)
		... 7 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.conf.Configuration
	at org.apache.parquet.hadoop.ParquetWriter$Builder.<init>(ParquetWriter.java:345)
	at org.apache.parquet.avro.AvroParquetWriter$Builder.<init>(AvroParquetWriter.java:162)
	at org.apache.parquet.avro.AvroParquetWriter$Builder.<init>(AvroParquetWriter.java:153)
	at org.apache.parquet.avro.AvroParquetWriter.builder(AvroParquetWriter.java:43)
	at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:74)
	at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:501)
	at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:274)
	at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:219)
	at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:188)
	at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:191)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
	... 10 more

dude0001 avatar May 26 '21 23:05 dude0001

Hi,

As far as I remember, the issue was primarily that the S3 connector jars were not under the path /usr/share/java mentioned in connect-distributed.properties.
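
For reference, the relevant worker setting looks roughly like this (the second directory is just an example; list whatever directories actually hold your connector plugins):

# connect-distributed.properties
# the connector, and every jar in its lib/ folder, must live under one of these directories
plugin.path=/usr/share/java,/usr/share/confluent-hub-components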

Also check the below config, which works for me:

curl -s -X POST -H "Content-Type: application/json" http://${KAFKA_CONNECT_HOST}:${KAFKA_CONNECT_PORT}/connectors -d '
{
  "name": "s3-sink-member-12",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "topicname",
    "s3.region": "'${S3_REGION}'",
    "s3.bucket.name": "'${S3_BUCKET_NAME}'",
    "s3.part.size": "5242880",
    "flush.size": "1",
    "auto.register.schemas": "false",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "schema.compatibility": "FULL",
    "locale": "AU",
    "timezone": "'${TIMEZONE}'",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DailyPartitioner",
    "rotate.interval.ms": "60000",
    "timestamp.extractor": "Record",
    "parquet.codec": "none",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schema.registry.url": "http://'${SCHEMA_REGISTRY_HOST}':'${SCHEMA_REGISTRY_PORT}'",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "enhanced.avro.schema.support": "true",
    "format.class.schemas.enable": "false",
    "path.format": "YYYY-MM-dd-HH"
  }
}'

Please let me know if it helps. Meanwhile, I will keep searching and let you know if I find anything else.

Shane

shanemougham avatar May 27 '21 04:05 shanemougham

Thank you for sharing your connector. Do you recall what jar was missing exactly? Should this be included with the plugin?

dude0001 avatar May 27 '21 10:05 dude0001

I feel like our issues are similar but not exactly the same. It seems like there are dependencies missing in https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/pom.xml, though this is not my area of expertise either. It seems like https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common and https://mvnrepository.com/artifact/com.google.guava/guava-base should be dependencies there for this plugin.

dude0001 avatar May 27 '21 16:05 dude0001

Interestingly, I found that hadoop-annotations-2.7.7.jar, hadoop-auth-2.7.7.jar, and hadoop-common-2.7.7.jar are already included in the connector, and I can see them deployed in my plugin folder. So I got even more confused.

I decided to tackle the com/google/common/base/Preconditions error @shanemougham originally posted first. I got https://repo1.maven.org/maven2/com/google/guava/guava/30.1.1-jre/guava-30.1.1-jre.jar linked from https://github.com/google/guava/releases/tag/v30.1.1, and put that in my plugin folder. I then started to see this error instead:

java.lang.NoClassDefFoundError: com/google/common/util/concurrent/internal/InternalFutureFailureAccess
	at com.google.common.cache.LocalCache$LoadingValueReference.<init>(LocalCache.java:3472)
	at com.google.common.cache.LocalCache$LoadingValueReference.<init>(LocalCache.java:3476)
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2134)
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
	at com.google.common.cache.LocalCache.get(LocalCache.java:3962)
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3985)
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4946)
	at com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4952)
	at org.apache.hadoop.io.compress.CodecPool.updateLeaseCount(CodecPool.java:135)
	at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:162)
	at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:168)
	at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.<init>(CodecFactory.java:144)
	at org.apache.parquet.hadoop.CodecFactory.createCompressor(CodecFactory.java:206)
	at org.apache.parquet.hadoop.CodecFactory.getCompressor(CodecFactory.java:189)
	at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:287)
	at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:564)
	at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:80)
	at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:501)
	at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:274)
	at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:219)
	at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:188)
	at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:191)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

It looked like the InternalFutureFailureAccess class belonged to a project called guava-parent, but I could not find any .jar file for it or much information. After a lot of digging (I am not a Java guy, major imposter syndrome), I found the note on https://guava.dev/ that "Guava has one dependency that is needed at runtime".

I downloaded failureaccess-1.0.1.jar from https://repo1.maven.org/maven2/com/google/guava/failureaccess/1.0.1/ and deployed this to my plugin folder as well.
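
In shell terms, the manual workaround is roughly the following. The target directory is only an example; use whatever folder your worker's plugin.path points at for the S3 connector:

# drop the two missing Guava jars next to the connector's other jars,
# then restart the Connect worker so the plugin classloader picks them up
cd /usr/share/java/kafka-connect-s3/
curl -O https://repo1.maven.org/maven2/com/google/guava/guava/30.1.1-jre/guava-30.1.1-jre.jar
curl -O https://repo1.maven.org/maven2/com/google/guava/failureaccess/1.0.1/failureaccess-1.0.1.jar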

I am now failing to authenticate to S3, so I think this is progress! No idea if this is the correct workaround. This is just a PoC for me, so I'd use it with caution for production until someone with more knowledge comments.

dude0001 avatar May 27 '21 20:05 dude0001

I just hit the same point as @dude0001. Manually added guava, but it is still not working!

dz902 avatar Aug 07 '21 18:08 dz902

Problem solved. Add the corresponding guava jar, then enjoy.

https://stackoverflow.com/questions/68698325/confluent-kafka-s3-sink-connector-throws-java-lang-noclassdeffounderror-com-go/68698326#68698326
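
If it helps anyone check their own setup, one quick way to see which hadoop/guava jars the plugin already ships (the path is an example; point it at your connector's folder):

ls /usr/share/java/kafka-connect-s3/ | grep -iE 'hadoop|guava'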

dz902 avatar Aug 08 '21 06:08 dz902

Thanks @dude0001, this has massively helped. I am deserialising Avro values using the Schema Registry and storing them in S3 as Parquet files. I added both the guava and failureaccess dependencies plus the Avro converter jar into lib/, and the S3 sink connector is now working like a charm. Happy to explain to anybody what I did to get it working.

imonteroq avatar Oct 13 '22 15:10 imonteroq

Hi! I'm having the same error. Where did you add the guava and failureaccess jars? Inside the lib folder of confluentinc-kafka-connect-avro-converter?

[screenshot]

I'm still getting the errors:

[Worker-0ac2e6a34584f1276] java.lang.NoClassDefFoundError: com/google/common/base/Ticker
[Worker-0ac2e6a34584f1276] 	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:175)
[Worker-0ac2e6a34584f1276] 	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:175)
[Worker-0ac2e6a34584f1276] 	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:158)
[Worker-0ac2e6a34584f1276] 	at io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory.newClient(SchemaRegistryClientFactory.java:36)
[Worker-0ac2e6a34584f1276] 	at io.confluent.connect.avro.AvroConverter.configure(AvroConverter.java:68)

Or is there any other lib folder I'm not aware of? (Far from a Java expert here.) I'm creating my plugin with the S3 and Avro plugins:

[screenshot]

dragondgold avatar Nov 29 '22 21:11 dragondgold

Hi @dragondgold, it is the lib/ folder inside the S3 sink connector, confluentinc-kafka-connect-s3-10.2.2 (this is the version I am using), which is what was failing for me.
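
Roughly, the connector folder ends up looking like this afterwards (jar names are examples for the versions mentioned in this thread):

confluentinc-kafka-connect-s3-10.2.2/
  lib/
    kafka-connect-s3-10.2.2.jar     (shipped with the connector, alongside its other jars)
    guava-30.1.1-jre.jar            (added manually)
    failureaccess-1.0.1.jar         (added manually)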

imonteroq avatar Nov 30 '22 08:11 imonteroq