kafka-connect-storage-cloud
Prevent clearing topic-partitions that are still assigned
Problem
To reduce the impact of rebalances during rolling bounces of k8s pods, we changed the `partition.assignment.strategy` from the default `RangeAssignor` to `CooperativeStickyAssignor`. After this change we encountered NPEs, and the `S3SinkTask` goes into an unrecoverable state. We did not see the same issue with `StickyAssignor`, however.
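The relevant difference between these assignors is how much of a task's assignment is revoked, and therefore closed, during a rebalance. The sketch below is a simplified model, assuming partitions are plain strings like the hypothetical `orders-0` and ignoring the follow-up rebalance round of the cooperative protocol: eager protocols (`RangeAssignor`, `StickyAssignor`) revoke everything the consumer owns, while the cooperative protocol (`CooperativeStickyAssignor`) revokes only the partitions that are moving elsewhere.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class RevocationSketch {

    // Eager protocols (e.g. RangeAssignor, StickyAssignor): the consumer gives up
    // its whole assignment before rejoining, so every owned partition is revoked.
    // newAssignment is unused; it is kept only to mirror the cooperative signature.
    static Set<String> revokedEager(Set<String> owned, Set<String> newAssignment) {
        return new LinkedHashSet<>(owned);
    }

    // Cooperative protocol (CooperativeStickyAssignor): only partitions that are
    // no longer in the new assignment are revoked; the rest stay assigned.
    static Set<String> revokedCooperative(Set<String> owned, Set<String> newAssignment) {
        Set<String> revoked = new LinkedHashSet<>(owned);
        revoked.removeAll(newAssignment);
        return revoked;
    }

    public static void main(String[] args) {
        // Hypothetical topic "orders" with three partitions owned by one task.
        Set<String> owned = new LinkedHashSet<>(List.of("orders-0", "orders-1", "orders-2"));
        // After the rebalance, orders-2 moves to another task.
        Set<String> newAssignment = new LinkedHashSet<>(List.of("orders-0", "orders-1"));
        System.out.println("eager:       " + revokedEager(owned, newAssignment));
        System.out.println("cooperative: " + revokedCooperative(owned, newAssignment));
    }
}
```

Running it prints `[orders-0, orders-1, orders-2]` for the eager case and only `[orders-2]` for the cooperative case, which is why a sink task's `close()` may be handed just a subset of its partitions.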
Example of an NPE (this is with `v10.0.7`):

    org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: java.lang.NullPointerException
        at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
        ... 10 more

Solution
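`WorkerSinkTask` always passes the collection of topic partitions being closed to `close()`. We currently clear all the assigned `topicPartitionWriter`s in `close()`. This worked with stop-the-world (eager) rebalance strategies such as `RangeAssignor` or `StickyAssignor`, because the entire current assignment is closed. With `CooperativeStickyAssignor`, however, only the few `topicPartition`s being reassigned are closed, while the rest stay assigned to the task. Clearing out all `topicPartitionWriter`s in that scenario removes the writers for partitions the task still owns, and the next `put()` for one of those partitions fails with the NPE above. This change closes and removes only the writers for the partitions passed to `close()`.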
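A minimal, stdlib-only sketch of the idea, assuming a task keeps one writer per partition in a map and that `put()` looks the writer up and dereferences it without a null check (consistent with the NPE in `S3SinkTask.put` above). `PartitionWriter`, `SketchTask`, and the string partition names are hypothetical stand-ins, not the connector's actual classes.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CloseSketch {

    // Hypothetical stand-in for a per-partition writer (not the connector's class).
    static class PartitionWriter {
        int buffered = 0;
        boolean closed = false;

        void buffer(String record) { buffered++; }

        void close() { closed = true; }
    }

    // Hypothetical, simplified stand-in for the sink task's writer bookkeeping.
    static class SketchTask {
        // TreeMap only so the printed key order is deterministic.
        final Map<String, PartitionWriter> writers = new TreeMap<>();

        void open(Collection<String> partitions) {
            for (String tp : partitions) {
                writers.put(tp, new PartitionWriter());
            }
        }

        void put(String tp, String record) {
            // Lookup-then-dereference: if the writer for tp was removed,
            // get() returns null and buffer() throws NullPointerException.
            writers.get(tp).buffer(record);
        }

        // Close and forget only the partitions handed to close(),
        // rather than clearing every writer the task holds.
        void close(Collection<String> partitions) {
            for (String tp : partitions) {
                PartitionWriter writer = writers.remove(tp);
                if (writer != null) {
                    writer.close();
                }
            }
        }
    }

    public static void main(String[] args) {
        SketchTask task = new SketchTask();
        task.open(List.of("orders-0", "orders-1", "orders-2"));
        // Cooperative rebalance: only orders-2 is revoked and closed.
        task.close(List.of("orders-2"));
        // orders-0 is still assigned, so its writer is still there.
        task.put("orders-0", "record-a");
        System.out.println(task.writers.keySet());
    }
}
```

This prints `[orders-0, orders-1]`: the revoked partition's writer is gone, and the still-assigned partitions keep theirs, so `put()` no longer hits a missing writer.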
I am not sure whether there is some historical context I am missing and the `.clear()` is deliberate; I could not find any clues in the commit history.
Test Strategy
Testing done:
I did not write any tests specifically for this case, and I am not aware of any existing tests that exercise assignment strategies. Open to ideas on any necessary tests. We have been running with this patch for the past few days and have not seen the same degradation.
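One possible shape for a regression test, sketched with a hypothetical stdlib-only model rather than `S3SinkTask` itself: open three partitions, revoke only one as a cooperative rebalance would, then deliver a record for a partition that is still assigned. The old clear-everything behavior hits the NPE, while closing only the revoked partitions does not. A real test would drive `S3SinkTask`'s `open()`, `close()`, and `put()` directly; all names below are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

public class CooperativeCloseTestSketch {

    // Hypothetical writer registry: partition name -> records buffered for it.
    static Map<String, List<String>> openWriters(Collection<String> partitions) {
        Map<String, List<String>> writers = new TreeMap<>();
        for (String tp : partitions) {
            writers.put(tp, new ArrayList<>());
        }
        return writers;
    }

    // Behavior before the change: close() drops every writer, ignoring which
    // partitions were actually revoked.
    static void closeAll(Map<String, List<String>> writers, Collection<String> revoked) {
        writers.clear();
    }

    // Behavior after the change: close() drops only the revoked partitions.
    static void closeRevoked(Map<String, List<String>> writers, Collection<String> revoked) {
        writers.keySet().removeAll(revoked);
    }

    // Simulates a cooperative rebalance that revokes only "orders-2", then delivers
    // a record for "orders-0", which the task still owns.
    // Returns true if the put succeeded, false if it hit a NullPointerException.
    static boolean putAfterPartialRevocation(
            BiConsumer<Map<String, List<String>>, Collection<String>> close) {
        Map<String, List<String>> writers =
                openWriters(List.of("orders-0", "orders-1", "orders-2"));
        close.accept(writers, List.of("orders-2"));
        try {
            // Same lookup-then-dereference pattern as the failing put().
            writers.get("orders-0").add("record-a");
            return true;
        } catch (NullPointerException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("clear all:          "
                + putAfterPartialRevocation(CooperativeCloseTestSketch::closeAll));
        System.out.println("close revoked only: "
                + putAfterPartialRevocation(CooperativeCloseTestSketch::closeRevoked));
    }
}
```

The first line reports `false` and the second `true`, so a test asserting success after a partial revocation would fail before this change and pass after it.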
- [ ] Unit tests
- [ ] Integration tests
- [ ] System tests
- [ ] Manual tests
The Jenkins job is failing ITs because of IAM permission issues.
https://jenkins.public.confluent.io/job/kafka-connect-storage-cloud/job/PR-648/2/console