kafka-connect-storage-cloud
S3 Sink Task - Parquet - Error: java.lang.NoClassDefFoundError: com/google/common/base/Preconditions
Hello Guys,
I am getting the below exception while using the Kafka S3 Sink Connector to write data in Parquet format to S3. I am using the JDBC source connector to write data to Kafka topics in Avro format.
[2020-08-10 03:39:29,070] ERROR WorkerSinkTask{id=s3-sink-test-21-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: com/google/common/base/Preconditions (org.apache.kafka.connect.runtime.WorkerSinkTask:566)
java.lang.NoClassDefFoundError: com/google/common/base/Preconditions
at org.apache.hadoop.conf.Configuration$DeprecationDelta.
Below is the config for the S3 sink connector:
{
"name": "s3-sink-test-21",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "1",
"topics": "TESTREPLICA12",
"s3.region": "ap-southeast-2",
"s3.bucket.name": "xxxxx",
"s3.part.size": "5242880",
"flush.size": "1",
"auto.register.schemas": "false",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"schema.compatibility": "FULL",
"locale": "AU",
"timezone": "EST",
"partitioner.class":"io.confluent.connect.storage.partitioner.DailyPartitioner",
"partition.duration.ms":"86400000",
"rotate.interval.ms":"60000",
"timestamp.extractor":"Record",
"parquet.codec": "none",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter.schema.registry.url": "http://127.0.0.1:8081",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"enhanced.avro.schema.support":"true",
"path.format":"YYYY-MM-dd-HH"
}
}
Source Connector config:
curl -s -X POST -H "Content-Type: application/json" \
  http://localhost:8083/connectors \
  -d '{
"name":"sybase-test-12",
"config":{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": 1,
"connection.url": "jdbc:jtds:sybase://xxxxx",
"connection.user":"xxx",
"connection.password":"xxx",
"mode": "timestamp",
"timestamp.column.name": "updatedOn",
"numeric.mapping":"best_fit",
"table.type": "TABLE",
"table.whitelist":"TESTREPLICA12",
"dialect.name" : "SybaseDatabaseDialect",
"topic.prefix": "",
"poll.interval.ms": "6000",
"table.poll.interval.ms": "6000",
"transforms":"createKey,extract",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"WAKArrn",
"transforms.extract.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extract.field":"WAKArrn",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter.schema.registry.url": "http://127.0.0.1:8081"
}
}'
Please advise.
Regards, Shane
Env Details:
KAFKA - AWS MSK - 2.2.1
Kafka Connect - 2.12-2.5.0
S3 Connector - 5.5.1
Schema Registry - confluent-5.5.1
The issue has been resolved by using the Confluent kafka-connect image.
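For reference, a minimal sketch of that setup (the image tag and connector version below are assumptions, pick the ones matching your environment): run Connect from the Confluent image and let confluent-hub install the S3 sink, so the connector and all of its bundled jars land under a single plugin directory.
# start from the Confluent Connect image (or put the install in a Dockerfile RUN step)
docker run --rm -it confluentinc/cp-kafka-connect:5.5.1 bash
# inside the container: install the S3 sink together with its bundled dependencies
confluent-hub install --no-prompt confluentinc/kafka-connect-s3:5.5.1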
I am seeing the same error as @shanemougham but also the error below that seems related. It seems like there is a missing dependency.
Is there anything else I can share to help troubleshoot? Any suggestions for a work around?
INFO || Opening record writer for: {path}.snappy.parquet [io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider]
ERROR || WorkerSinkTask{id=***.s3-parquet-connector-4} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Could not initialize class org.apache.hadoop.conf.Configuration [org.apache.kafka.connect.runtime.WorkerSinkTask]
java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.conf.Configuration
at org.apache.parquet.hadoop.ParquetWriter$Builder.<init>(ParquetWriter.java:345)
at org.apache.parquet.avro.AvroParquetWriter$Builder.<init>(AvroParquetWriter.java:162)
at org.apache.parquet.avro.AvroParquetWriter$Builder.<init>(AvroParquetWriter.java:153)
at org.apache.parquet.avro.AvroParquetWriter.builder(AvroParquetWriter.java:43)
at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:74)
at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:501)
at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:274)
at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:219)
at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:188)
at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:191)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
ERROR || WorkerSinkTask{id=***.s3-parquet-connector-4} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Suppressed: java.lang.NullPointerException
at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.close(ParquetRecordWriterProvider.java:97)
at io.confluent.connect.s3.TopicPartitionWriter.close(TopicPartitionWriter.java:313)
at io.confluent.connect.s3.S3SinkTask.close(S3SinkTask.java:249)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:410)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:644)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:202)
... 7 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.conf.Configuration
at org.apache.parquet.hadoop.ParquetWriter$Builder.<init>(ParquetWriter.java:345)
at org.apache.parquet.avro.AvroParquetWriter$Builder.<init>(AvroParquetWriter.java:162)
at org.apache.parquet.avro.AvroParquetWriter$Builder.<init>(AvroParquetWriter.java:153)
at org.apache.parquet.avro.AvroParquetWriter.builder(AvroParquetWriter.java:43)
at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:74)
at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:501)
at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:274)
at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:219)
at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:188)
at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:191)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
... 10 more
Hi,
As far as I remember, the issue was primarily that the s3-connector related jars were not under the path /usr/share/java, as referenced in connect-distributed.properties.
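A quick way to sanity-check that (the properties file location below is an assumption, use whatever file your worker is actually started with):
# confirm which directories the worker scans for plugins
grep '^plugin.path' /etc/kafka/connect-distributed.properties
# the S3 sink jars should show up under one of those directories
ls /usr/share/java | grep -i kafka-connect-s3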
Also check the config below, which is working for me:
curl -s -X POST -H "Content-Type: application/json" \
  http://${KAFKA_CONNECT_HOST}:${KAFKA_CONNECT_PORT}/connectors \
  -d '{
"name": "s3-sink-member-12",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "1",
"topics": "topicname",
"s3.region": "'${S3_REGION}'",
"s3.bucket.name":"'${S3_BUCKET_NAME}'",
"s3.part.size": "5242880",
"flush.size": "1",
"auto.register.schemas": "false",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"schema.compatibility": "FULL",
"locale": "AU",
"timezone":"'${TIMEZONE}'",
"partitioner.class":"io.confluent.connect.storage.partitioner.DailyPartitioner",
"rotate.interval.ms":"60000",
"timestamp.extractor":"Record",
"parquet.codec": "none",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter.schema.registry.url": "http://'${SCHEMA_REGISTRY_HOST}':'${SCHEMA_REGISTRY_PORT}'",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"enhanced.avro.schema.support":"true",
"format.class.schemas.enable": "false",
"path.format":"YYYY-MM-dd-HH",
}
}'
Please let me know if it helps. Meanwhile, I will keep searching and let you know if I find something else.
Shane
Thank you for sharing your connector. Do you recall what jar was missing exactly? Should this be included with the plugin?
I feel like our issues are similar but not exactly the same. It seems like there are dependencies missing in https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/pom.xml. It is not my area of expertise either, though. It seems like https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common and https://mvnrepository.com/artifact/com.google.guava/guava-base should be dependencies there for this plugin.
Interestingly I found hadoop-annotations-2.7.7.jar, hadoop-auth-2.7.7.jar, hadoop-common-2.7.7.jar are already included in the connector and I see them deployed in my plugin folder. So I started getting more confused.
I decided to tackle the com/google/common/base/Preconditions error @shanemougham originally posted first. I got https://repo1.maven.org/maven2/com/google/guava/guava/30.1.1-jre/guava-30.1.1-jre.jar linked from https://github.com/google/guava/releases/tag/v30.1.1, and put that in my plugin folder. I then started to see this error instead
java.lang.NoClassDefFoundError: com/google/common/util/concurrent/internal/InternalFutureFailureAccess
--
at com.google.common.cache.LocalCache$LoadingValueReference.<init>(LocalCache.java:3472)
at com.google.common.cache.LocalCache$LoadingValueReference.<init>(LocalCache.java:3476)
at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2134)
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
at com.google.common.cache.LocalCache.get(LocalCache.java:3962)
at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3985)
at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4946)
at com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4952)
at org.apache.hadoop.io.compress.CodecPool.updateLeaseCount(CodecPool.java:135)
at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:162)
at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:168)
at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.<init>(CodecFactory.java:144)
at org.apache.parquet.hadoop.CodecFactory.createCompressor(CodecFactory.java:206)
at org.apache.parquet.hadoop.CodecFactory.getCompressor(CodecFactory.java:189)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:287)
at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:564)
at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:80)
at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:501)
at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:274)
at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:219)
at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:188)
at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:191)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
It looked like that InternalFutureFailureAccess class was in a project called guava-parent. I could not find any .jar file for it or any other information. After much digging (I am not a Java guy, major imposter syndrome) I found on https://guava.dev/ that "Guava has one dependency that is needed at runtime".
I downloaded failureaccess-1.0.1.jar from https://repo1.maven.org/maven2/com/google/guava/failureaccess/1.0.1/ and deployed this to my plugin folder as well.
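To spell the workaround out as commands (the plugin directory below is an assumption, point it at wherever your S3 sink plugin actually lives):
cd /usr/share/java/kafka-connect-s3   # your S3 sink plugin folder
curl -O https://repo1.maven.org/maven2/com/google/guava/guava/30.1.1-jre/guava-30.1.1-jre.jar
curl -O https://repo1.maven.org/maven2/com/google/guava/failureaccess/1.0.1/failureaccess-1.0.1.jar
# restart the Connect worker so the new jars are picked up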
I am now failing to authenticate to S3, so I think this is progress! No idea if this is the correct workaround. This is just a PoC for me, so I'd use it with caution for production until someone with more knowledge comments.
I just hit the same position as @dude0001. I manually added guava but it's still not working!
Problem solved. Add the corresponding guava jar, then enjoy.
https://stackoverflow.com/questions/68698325/confluent-kafka-s3-sink-connector-throws-java-lang-noclassdeffounderror-com-go/68698326#68698326
Thanks @dude0001, this has massively helped. I am deserialising Avro values using the Schema Registry and storing them in S3 as Parquet files. I added both the guava and failureaccess dependencies plus the Avro converter jar into lib/, and the S3 sink connector is now working like a charm. Happy to explain to anybody what I did to get it working.
Hi! I'm having the same error. Where did you add the guava and failureaccess jars? Inside the lib folder from confluentinc-kafka-connect-avro-converter?
I'm still getting the errors:
[Worker-0ac2e6a34584f1276] java.lang.NoClassDefFoundError: com/google/common/base/Ticker
[Worker-0ac2e6a34584f1276] at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:175)
[Worker-0ac2e6a34584f1276] at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:175)
[Worker-0ac2e6a34584f1276] at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:158)
[Worker-0ac2e6a34584f1276] at io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory.newClient(SchemaRegistryClientFactory.java:36)
[Worker-0ac2e6a34584f1276] at io.confluent.connect.avro.AvroConverter.configure(AvroConverter.java:68)
Or is there any other lib folder I'm not aware of? (Far from a Java expert here.) I'm creating my plugin with the S3 and Avro plugins.
Hi @dragondgold, it's the lib/ folder inside the S3 sink connector confluentinc-kafka-connect-s3-10.2.2 (this is the version I am using), which is what was failing for me.
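For anyone following along, the end state looks roughly like this (the bundled jar versions vary by connector release; only the last two are the ones added by hand):
confluentinc-kafka-connect-s3-10.2.2/lib/
  ... the jars shipped with the connector (hadoop-*, parquet-*, etc.) ...
  guava-30.1.1-jre.jar       # added manually
  failureaccess-1.0.1.jar    # added manually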