KAFKA-14250: MirrorSourceTask exception causes the task to fail

Open · viktorsomogyi opened this issue on Sep 21, 2022

Exception during normal operation in MirrorSourceTask causes the task to fail instead of shutting down gracefully.

In MirrorSourceTask we load the offsets for the task's topic partitions during startup. While these offsets are being fetched, it is possible for the offset reader to be stopped by a parallel thread. Stopping the reader causes a CancellationException to be thrown, due to KAFKA-9051.

Currently this exception is not caught in MirrorSourceTask, so it propagates up and puts the task into the FAILED state. We only need it to go into the STOPPED state so that it can be restarted later.

This can be achieved by catching the exception and stopping the task directly.
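
As a minimal sketch of the idea (assuming the CancellationException surfaces from the offset loading in MirrorSourceTask.start(); the names and log message are illustrative, not the final implementation):

    try {
        // Load the offsets for the task's topic partitions during startup.
        Map<TopicPartition, Long> offsets = loadOffsets(taskTopicPartitions);
        // ... continue normal startup using the loaded offsets ...
    } catch (CancellationException e) {
        // The offset reader was closed by a parallel shutdown; stop the task
        // directly instead of letting the exception put it into FAILED state.
        log.info("Offset reader was closed while the task was starting up; stopping the task", e);
        stop();
        return;
    }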

Committer Checklist (excluded from commit message)

  • [ ] Verify design and implementation
  • [ ] Verify test coverage and CI build status
  • [ ] Verify documentation (including upgrade notes)

viktorsomogyi avatar Sep 21 '22 14:09 viktorsomogyi

@viktorsomogyi I'm not sure I understand the motivation for this change yet.

The only time the offset reader should be closed while the MirrorSourceTask is still running is when the task is being cancelled because it has been scheduled for shutdown by the Kafka Connect framework but has not completed that shutdown in time. When tasks are cancelled, we intentionally refrain from emitting statuses for them, as there may be new instances of the same task already up and running. So we shouldn't be seeing inaccurate task statuses as a result of an offset read by a MirrorSourceTask being interrupted during startup.

Even when a task is cancelled, we make sure to invoke Task::stop on it as soon as we're able to, which should allow the task to clean up its resources.

I'm also unclear on what the ideal behavior is with regard to FAILED vs. STOPPED; it sounds like there's an expectation that the task gets restarted automatically, without user intervention (e.g., without hitting the /connectors/{connector}/tasks/{task}/restart endpoint). Can you shed some light on what kind of user-facing behavior you expect to see from Kafka Connect / MirrorMaker 2 with this change?
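
For reference, the manual restart I'm referring to would be a request like this against a worker's REST API (the host, port, connector name, and task number here are just placeholders):

    curl -X POST http://localhost:8083/connectors/MirrorSourceConnector/tasks/0/restart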

C0urante avatar Sep 21 '22 15:09 C0urante

@C0urante this problem arises when stop is called very quickly, right after start is called and while start is still running. I've attached a log sample to the case, but the exception we're getting is this:

2022-08-22 17:54:46,544 ERROR org.apache.kafka.connect.runtime.WorkerTask: WorkerSourceTask{id=MirrorSourceConnector-0} Task threw an uncaught and unrecoverable exception
org.apache.kafka.connect.errors.ConnectException: Failed to fetch offsets.
	at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:114)
	at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
	at org.apache.kafka.connect.mirror.MirrorSourceTask.loadOffset(MirrorSourceTask.java:234)
	at org.apache.kafka.connect.mirror.MirrorSourceTask.lambda$loadOffsets$4(MirrorSourceTask.java:229)
	at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
	at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1580)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
	at org.apache.kafka.connect.mirror.MirrorSourceTask.loadOffsets(MirrorSourceTask.java:229)
	at org.apache.kafka.connect.mirror.MirrorSourceTask.start(MirrorSourceTask.java:99)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:213)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.connect.errors.ConnectException: Offset reader closed while attempting to read offsets. This is likely because the task was been scheduled to stop but has taken longer than the graceful shutdown period to do so.
	at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103)
	... 21 common frames omitted

In this case the task was being stopped too quickly, I believe due to a rebalance, while it was still starting up. Hence we tried to solve this by propagating the exception the way shown in the PR and handling it as a CancellationException, so that the task can be cancelled and restarted gracefully. I'd expect it to be restarted automatically, since the problem is caused by the threading design of the herder, as I understand it.

viktorsomogyi avatar Sep 27 '22 15:09 viktorsomogyi

This kind of failure shouldn't persist across rebalances; we don't take task status (running vs. failed vs. unassigned) into account after rebalances when starting/restarting tasks. If the task is scheduled to be shut down because of a rebalance, it should be restarted after the rebalance, regardless of whether it failed during shutdown.

In addition, catching this exception wouldn't do anything to keep the task running--by the time the offset reader is closed, the task has already been cancelled, and it will be shut down as soon as it returns control to the framework.

C0urante avatar Sep 27 '22 15:09 C0urante

To me it seems like it does persist. Let me explain what I see in my logs; maybe I'm wrong here. First, let's start at the point where it stops the tasks during the frequent rebalances:

2022-08-22 17:54:44,793 INFO org.apache.kafka.connect.runtime.distributed.WorkerCoordinator: [Worker clientId=connect-8, groupId=bdkfkwx006-mm2] Rebalance started                                                                                                            
2022-08-22 17:54:44,793 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Worker clientId=connect-8, groupId=bdkfkwx006-mm2] (Re-)joining group                                                                                                          
2022-08-22 17:54:44,822 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Worker clientId=connect-8, groupId=bdkfkwx006-mm2] Successfully joined group with generation 64                                                                                
2022-08-22 17:54:44,823 INFO org.apache.kafka.connect.runtime.Worker: Stopping task MirrorSourceConnector-0                                                                                                                                                                   
2022-08-22 17:54:44,823 INFO org.apache.kafka.connect.runtime.Worker: Stopping task MirrorSourceConnector-1                                                                                                                                                                   
2022-08-22 17:54:44,823 INFO org.apache.kafka.connect.runtime.Worker: Stopping connector MirrorSourceConnector

Then after committing offsets etc. the connector is stopped:

2022-08-22 17:54:44,836 INFO org.apache.kafka.connect.runtime.Worker: Stopped connector MirrorSourceConnector

Then it finishes stopping tasks:

2022-08-22 17:54:44,839 INFO org.apache.kafka.connect.runtime.distributed.DistributedHerder: [Worker clientId=connect-8, groupId=bdkfkwx006-mm2] Finished stopping tasks in preparation for rebalance                                                                         
2022-08-22 17:54:44,842 INFO org.apache.kafka.connect.runtime.distributed.DistributedHerder: [Worker clientId=connect-8, groupId=bdkfkwx006-mm2] Finished flushing status backing store in preparation for rebalance

Then it revokes the MirrorSourceConnector-0 and MirrorSourceConnector-1 IDs. After this, onAssigned is called in DistributedHerder and the rebalance starts:

2022-08-22 17:54:44,843 INFO org.apache.kafka.connect.runtime.distributed.DistributedHerder: [Worker clientId=connect-8, groupId=bdkfkwx006-mm2] Starting connectors and tasks using config offset 921                                                                        
2022-08-22 17:54:44,843 INFO org.apache.kafka.connect.runtime.distributed.DistributedHerder: [Worker clientId=connect-8, groupId=bdkfkwx006-mm2] Finished starting connectors and tasks                                                                                       
2022-08-22 17:54:44,843 INFO org.apache.kafka.connect.runtime.distributed.WorkerCoordinator: [Worker clientId=connect-8, groupId=bdkfkwx006-mm2] Rebalance started

2 seconds later it runs into an error:

2022-08-22 17:54:46,135 INFO org.apache.kafka.connect.runtime.distributed.DistributedHerder: [Worker clientId=connect-14, groupId=bdkfkwx006-mm2] Finished starting connectors and tasks                                                                                      
2022-08-22 17:54:46,542 ERROR org.apache.kafka.connect.runtime.Worker: Graceful stop of task MirrorSourceConnector-0 failed.                                                                                                                                                  
2022-08-22 17:54:46,542 ERROR org.apache.kafka.connect.runtime.Worker: Graceful stop of task MirrorSourceConnector-1 failed.                                                                                                                                                  
2022-08-22 17:54:46,543 INFO org.apache.kafka.connect.runtime.distributed.DistributedHerder: [Worker clientId=connect-7, groupId=kfkhwshared00-mm2] Finished stopping tasks in preparation for rebalance                                                                      
2022-08-22 17:54:46,543 INFO org.apache.kafka.connect.runtime.distributed.DistributedHerder: [Worker clientId=connect-7, groupId=kfkhwshared00-mm2] Finished flushing status backing store in preparation for rebalance

Then it finishes starting connectors and tasks but also throws the exception above:

2022-08-22 17:54:46,543 INFO org.apache.kafka.connect.runtime.distributed.DistributedHerder: [Worker clientId=connect-7, groupId=kfkhwshared00-mm2] Finished starting connectors and tasks                                                                                    
2022-08-22 17:54:46,543 ERROR org.apache.kafka.connect.storage.OffsetStorageReaderImpl: Failed to fetch offsets from namespace MirrorSourceConnector:

And then I see the mentioned error a few more times, along with traces of rebalancing:

2022-08-22 17:54:46,543 INFO org.apache.kafka.connect.runtime.distributed.WorkerCoordinator: [Worker clientId=connect-7, groupId=kfkhwshared00-mm2] Rebalance started                                                                                                         
2022-08-22 17:54:46,543 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Worker clientId=connect-7, groupId=kfkhwshared00-mm2] (Re-)joining group                                                                                                       
2022-08-22 17:54:46,543 ERROR org.apache.kafka.connect.storage.OffsetStorageReaderImpl: Failed to fetch offsets from namespace MirrorSourceConnector:                                                                                                                         
org.apache.kafka.connect.errors.ConnectException: Offset reader closed while attempting to read offsets. This is likely because the task was been scheduled to stop but has taken longer than the graceful shutdown period to do so.

And finally the exception is propagated up to the WorkerTask level and the task won't be restarted (until 2 days later, when I think it was restarted manually):

2022-08-22 17:54:46,544 ERROR org.apache.kafka.connect.runtime.WorkerTask: WorkerSourceTask{id=MirrorSourceConnector-1} Task is being killed and will not recover until manually restarted

In all honesty, I couldn't reproduce this yet because it seems more complex, but if you'd like to see a reproduction, I can try to come back with a setup that showcases it. Or do you have anything in mind that we could do to make this part of the code more robust?

viktorsomogyi avatar Sep 28 '22 16:09 viktorsomogyi

That's interesting--were you running a dedicated MM2 cluster, or were you deploying MM2 as a connector on a vanilla Kafka Connect cluster? And if the latter, were you able to read the status of the connector and its tasks via the REST API before restarting them?
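
For reference, reading the status via the REST API would be a request like this (host, port, and connector name are placeholders):

    curl http://localhost:8083/connectors/MirrorSourceConnector/status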

C0urante avatar Sep 28 '22 16:09 C0urante

@C0urante in fact this was an escalation that our customer brought to us; they were using a dedicated MM2 cluster. In the meantime, during another case with the customer, we figured out that KAFKA-9851 could have been the real cause and that the error message MM2 prints is just a red herring. Regardless, it would be good to improve the error messages to note that the kind of failure experienced here won't persist between rebalances. What do you think?

viktorsomogyi avatar Oct 27 '22 11:10 viktorsomogyi

I'm not opposed to improving error messages, but I'd like to ask again which version of Kafka Connect / MirrorMaker 2 you were running when you encountered this issue. We've already made some improvements to our error messages in https://github.com/apache/kafka/pull/10503, which may help in this case.

C0urante avatar Oct 28 '22 09:10 C0urante