kafka-connect-storage-cloud
Getting NullPointerException at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:188)
Hi guys,
Currently, we have a Kafka topic called maxwell which has around 10 million records of lag across 200 partitions. The Kafka Connect S3 sink keeps throwing an NPE during bootstrap (the task goes from RUNNING to FAILED within a few seconds). Can someone help, please? The NPE on its own is not a useful exception for debugging the issue.
Also, the restart API does not seem to work either. When I tried to restart the failed task via the REST API by calling
curl -X POST localhost:8083/connectors/maxwell-s3-connector-test/restart
it only seems to restart one consumer worker; the rest of the workers have no response ....
Here is the status topic data:
.... ect.s3.S3SinkTask.put(S3SinkTask.java:188)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)\n\t... 10 more\n","worker_id":"172.31.16.219:8083","generation":26}
{"state":"UNASSIGNED","trace":null,"worker_id":"172.31.16.219:8083","generation":26}
{"state":"RUNNING","trace":null,"worker_id":"172.31.16.219:8083","generation":26}
I was expecting it to start 10 consumers (workers) instead, since tasks.max = 10. The workaround we have is to delete the Kafka connector and recreate it ....
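For what it's worth, in this Connect version POST /connectors/{name}/restart only restarts the connector instance, not its tasks, which would explain the behaviour above; failed tasks have to be restarted one at a time through the task-level endpoint. A minimal sketch, reusing the connector name from the example:
# check which tasks are FAILED
curl -s localhost:8083/connectors/maxwell-s3-connector-test/status
# restart an individual task by its id (repeat for each failed task)
curl -X POST localhost:8083/connectors/maxwell-s3-connector-test/tasks/0/restart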
-
The error stack trace from the status topic:
{"state":"RUNNING","trace":null,"worker_id":"172.31.28.239:8083","generation":26} {"state":"RUNNING","trace":null,"worker_id":"172.31.28.239:8083","generation":26} {"state":"RUNNING","trace":null,"worker_id":"172.31.28.239:8083","generation":26} {"state":"FAILED","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.NullPointerException\n\tat io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:188)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)\n\t... 10 more\n","worker_id":"172.31.28.239:8083","generation":24}
-
Kafka Connect version:
confluentinc/cp-kafka-connect:5.0.0
-
Kafka S3 connector config:
"config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "tasks.max": 10, "topics": "maxwell", "flush.size": 10000, "rotate.schedule.interval.ms": 1800000, "s3.part.size": 26214400, "s3.region": "us-east-1", "s3.bucket.name": "maxwell-connect-dev", "s3.credentials.provider.class": "com.amazonaws.auth.EnvironmentVariableCredentialsProvider", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat", "format.bytearray.separator": "\n", "schema.compatibility": "NONE", "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner", "timestamp.extractor": "Wallclock", "partition.duration.ms": 3600000, "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH", "locale": "en-US", "timezone": "UTC" }
-
Some clarification about the config:
- "flush.size": 10000 and "rotate.schedule.interval.ms": 1800000. S3 is not a streaming service and it has request limits, so we would like to batch up more records and then commit them to S3.
Not sure what the immediate issue is, but would it be possible to enable debug logging like I did here - https://github.com/confluentinc/kafka-connect-storage-common/issues/91
Your stack trace aligns exactly with that one, and I'd like to know why your config would be causing it, since you didn't rename a topic like in the other post.
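For reference, enabling debug logging for the storage connector usually comes down to raising the log level on its packages in the worker's log4j.properties; with the cp-kafka-connect image the same thing can be passed through the CONNECT_LOG4J_LOGGERS environment variable. A minimal sketch; the logger names below are assumptions taken from the packages in the stack trace:
log4j.logger.io.confluent.connect.s3=DEBUG
log4j.logger.io.confluent.connect.storage=DEBUG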
RE https://github.com/confluentinc/kafka-connect-storage-common/issues/91#issuecomment-444734003 I think you meant to respond here
I've been filling in working example stacks here over time - https://github.com/cricket007/kafka-connect-sandbox
Minio is a suitable replacement; it doesn't need to run on AWS.
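For anyone trying that locally, the S3 sink can be pointed at a Minio endpoint through its store.url setting. A minimal sketch, where the endpoint, bucket, and topic are placeholder values and credentials are expected to come from the usual AWS environment variables set to Minio's access/secret keys:
name=s3-sink-minio
connector.class=io.confluent.connect.s3.S3SinkConnector
topics=maxwell
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat
s3.bucket.name=test-bucket
s3.region=us-east-1
store.url=http://minio:9000
flush.size=1000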
I am hitting this issue in my setup
I'm getting this same exception, but only when I try to use transforms.
Without them it works just fine, but as soon as I added any of the following to the connector configuration I got the NPE.
transforms=dropPrefix
transforms.dropPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.dropPrefix.regex=.*
transforms.dropPrefix.replacement=prefix_$0
Or
transforms=dropPrefix
transforms.dropPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.dropPrefix.regex=avro_(.*)
transforms.dropPrefix.replacement=$1
Or
transforms=KeyTopic
transforms.KeyTopic.type=io.confluent.connect.transforms.ExtractTopic$Key
Exception (it's always the same for all examples above):
[2019-11-27 16:10:18,826] ERROR WorkerSinkTask{id=avro_writer-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:558)
java.lang.NullPointerException
at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:181)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2019-11-27 16:10:18,827] ERROR WorkerSinkTask{id=avro_writer-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:181)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
... 10 more
[2019-11-27 16:10:18,828] ERROR WorkerSinkTask{id=avro_writer-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
Hi, same issue as @msilvestre ... I'm using the io.confluent.connect.s3.S3SinkConnector
and my transform config looks like this:
"transforms": "AddSuffix",
"transforms.AddSuffix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddSuffix.regex": ".*",
"transforms.AddSuffix.replacement": "$0_bak"
Hi @dongxiaohe, it looks like the NPE comes from the topic partition not being found - was there any topic or topic partition manipulation done?
@aakashnshah the RegexRouter is changing the topic name dynamically, but this should work here the same way it does in other connectors.
@aakashnshah We're having a similar issue with the NPE when modifying the topic name before it goes to the S3SinkConnector. Our expectation is that the S3SinkConnector would just use the new topic as part of the path that gets written to. Is it actually more nuanced than that? Is there more validation happening where it expects that topic to actually exist?
I can confirm that we also only see this with the transform... it seems like it might be specific to the s3/cloud-storage connector; the same setup for JDBCSinkConnector works for routing to different tables, just as this would route to specific folders. The stack trace is pretty awful, so I'm wondering if there's actually a catch block that's obfuscating the underlying stack trace.
"transforms.ExtractTopic.type": "io.confluent.connect.transforms.ExtractTopic$Value"
"transforms.ExtractTopic.field": "eventType"
"transforms.ExtractTopic.skip.missing.or.null": true
It looks like the writers are initialized in open() based on the topic partitions assigned from the connector's source topics, and when the records are being written there is no writer initialized for the topic partition a record maps to after the transform.
I also noticed they dropped the assignment model from the initialization in recent versions; perhaps the below would be a solution?
Initialization
@Override
public void open(Collection<TopicPartition> partitions) {
// assignment should be empty, either because this is the initial call or because it follows
// a call to "close".
assignment.addAll(partitions);
for (TopicPartition tp : assignment) {
topicPartitionWriters.put(tp, newTopicPartitionWriter(tp));
}
}
Usage
@Override
public void put(Collection<SinkRecord> records) throws ConnectException {
for (SinkRecord record : records) {
String topic = record.topic();
int partition = record.kafkaPartition();
TopicPartition tp = new TopicPartition(topic, partition);
// this won't handle a rename since topic partition writers map is built of sources
topicPartitionWriters.get(tp).buffer(record);
}
...
}
Potential Solution
@Override
public void put(Collection<SinkRecord> records) throws ConnectException {
for (SinkRecord record : records) {
String topic = record.topic();
int partition = record.kafkaPartition();
TopicPartition tp = new TopicPartition(topic, partition);
// will this work???
TopicPartitionWriter writer = topicPartitionWriters.get(tp);
if (writer == null) {
writer = newTopicPartitionWriter(tp);
assignment.add(tp);
topicPartitionWriters.put(tp, writer);
}
writer.buffer(record);
}
...
}
I tried this solution locally and it didn't work... and it seems to be for good reason: the writer buffers writes per topic partition, so splitting/routing one topic to one topic seems possible since it's a single consumer, but one to multiple (our use case) looks like a more fundamental change, since there would have to be multiple file buffers per consumer.
In our case, we're using filters with multiple connectors instead and changing the parent directory.
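Roughly, the shape of that setup looks like the following: one connector per event type, each with its own topics.dir as the parent directory. The Filter transform and its condition syntax here are only illustrative (Confluent's Filter SMT), not our exact config, and the topic/field names are placeholders:
# one connector per event type; topics.dir changes the parent directory in the bucket
name=s3-sink-orders
connector.class=io.confluent.connect.s3.S3SinkConnector
topics=avro_events
topics.dir=orders
transforms=keepOrders
transforms.keepOrders.type=io.confluent.connect.transforms.Filter$Value
transforms.keepOrders.filter.condition=$[?(@.eventType == 'order')]
transforms.keepOrders.filter.type=include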
Same issue as @msilvestre and others here. RegexRouter on the S3 sink makes it throw an NPE.
(Not sure if it's the same cause as @dongxiaohe's, though. Shouldn't this issue be split?)
I've found the cause of this NPE and I'm working on a fix.
Whenever we apply transforms to a topic, the task loses the ability to interface with the source topic/partition, so we need to keep track of it.
here https://github.com/fmeyer/kafka-connect-storage-cloud/commit/8448b5954a6f359d8c25834739ef18f64ae125ff
The PR for the transforms issue is here: #480. @pcfleischer @mdespriee, let me know if you can try it on your cases.
Hi @fmeyer, I compiled your code locally and copied the .jar file to my plugins directory, then ran the connector with the 'RegexRouter' transformation. It is working as expected; I can see the topic name getting changed.
But I'm not sure why this PR hasn't been merged - are there any outstanding issues with merging these changes?