kafka-connect-storage-cloud

Prevent clearing topic-partitions that are still assigned

Open SatyaKuppam opened this issue 1 year ago • 2 comments

Problem

To reduce the impact of rebalances during rolling bounces of k8s pods, we changed partition.assignment.strategy from the default RangeAssignor to CooperativeStickyAssignor. After this change we encountered NPEs, and the S3SinkTask went into an unrecoverable state. We did not see the same issue with StickyAssignor, however.

Example of an NPE (this is with v10.0.7):

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
    at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    ... 10 more

Solution

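WorkerSinkTask always passes the list of topicPartitions being closed to close(). We currently clear all the assigned topicPartitionWriters in close(), regardless of that list. This worked fine with stop-the-world rebalance strategies like RangeAssignor or StickyAssignor, since the entire current assignment is closed on every rebalance. With CooperativeStickyAssignor, however, only a subset of the topicPartitions may be revoked and closed. In that scenario, clearing all topicPartitionWriters also drops the writers for partitions that are still assigned, and the next put() for one of those partitions throws an NPE. This change avoids clearing writers for topicPartitions that are still assigned.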

I am not sure whether I am missing some historical context and the .clear() is deliberate; I could not find any clues in the commit history.
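The difference between the two close() behaviours described above can be illustrated with a minimal, dependency-free sketch. This is not the connector's code: WriterRegistry, closeAll, closeOnly and hasWriter are hypothetical names, plain strings stand in for TopicPartition keys, and a StringBuilder stands in for a TopicPartitionWriter.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the task's per-partition writer bookkeeping. */
class WriterRegistry {
    // Assumption: string keys such as "topic-0" replace TopicPartition,
    // and StringBuilder replaces the real writer type.
    private final Map<String, StringBuilder> writers = new HashMap<>();

    /** Partitions assigned: create a writer for each one not already tracked. */
    void open(Collection<String> partitions) {
        for (String tp : partitions) {
            writers.putIfAbsent(tp, new StringBuilder());
        }
    }

    /** Buffer a record; throws NullPointerException if the writer was dropped. */
    void put(String tp, String value) {
        writers.get(tp).append(value);
    }

    /** Behaviour described in the PR: every writer is dropped, whatever was revoked. */
    void closeAll(Collection<String> revoked) {
        writers.clear();
    }

    /** Proposed behaviour: drop only the writers for the revoked partitions. */
    void closeOnly(Collection<String> revoked) {
        for (String tp : revoked) {
            writers.remove(tp);
        }
    }

    boolean hasWriter(String tp) {
        return writers.containsKey(tp);
    }

    public static void main(String[] args) {
        // Cooperative rebalance revokes only topic-1; topic-0 stays assigned.
        WriterRegistry cleared = new WriterRegistry();
        cleared.open(List.of("topic-0", "topic-1"));
        cleared.closeAll(List.of("topic-1"));
        try {
            cleared.put("topic-0", "record");
        } catch (NullPointerException e) {
            System.out.println("clear-all: put() on still-assigned topic-0 threw NPE");
        }

        WriterRegistry selective = new WriterRegistry();
        selective.open(List.of("topic-0", "topic-1"));
        selective.closeOnly(List.of("topic-1"));
        selective.put("topic-0", "record"); // writer for topic-0 survives
        System.out.println("selective: topic-0 writer kept = " + selective.hasWriter("topic-0"));
    }
}
```

With an eager assignor both variants behave the same, because close() then receives the full assignment; they only diverge when a partial revocation leaves some partitions assigned.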

Test Strategy

Testing done:

I did not write any tests specifically for this case, nor am I aware of any existing tests that exercise assignment strategies. I am open to ideas on any necessary tests. We have been running this patch for the past few days and don't see the same degradation.

  • [ ] Unit tests
  • [ ] Integration tests
  • [ ] System tests
  • [ ] Manual tests

SatyaKuppam · May 05 '23 19:05

The Jenkins job is failing the ITs because of IAM permission issues. [Screenshot 2023-05-05 at 4 34 07 PM]

https://jenkins.public.confluent.io/job/kafka-connect-storage-cloud/job/PR-648/2/console

SatyaKuppam · May 05 '23 20:05