
KAFKA-13952 fix RetryWithToleranceOperator to respect infinite retries configuration


  • https://issues.apache.org/jira/browse/KAFKA-13952: RetryWithToleranceOperator doesn't respect infinite retries config - i.e. when errors.retry.timeout is set to -1
  • From https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect:

errors.retry.timeout: [-1, 0, 1, ... Long.MAX_VALUE], where -1 means infinite duration.

  • Also, from the config doc: https://github.com/apache/kafka/blob/0c4da23098f8b8ae9542acd7fbaa1e5c16384a39/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L129-L130
  • This PR fixes the issue in RetryWithToleranceOperator and adds a couple of unit tests to verify the fix (a rough sketch of the guard involved is shown below)
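As a rough sketch (not the actual Connect source; the class, method, and constant names below are illustrative), the fix amounts to short-circuiting the elapsed-time check when the configured timeout is -1:

```java
// Illustrative sketch only; simplified names, not the real RetryWithToleranceOperator code.
// Per KIP-298, errors.retry.timeout = -1 means retry for an infinite duration.
public class RetryTimeoutCheckSketch {
    private static final long RETRIES_INFINITE = -1L;

    static boolean stillWithinRetryTimeout(long errorStartTimeMs, long nowMs, long errorsRetryTimeoutMs) {
        if (errorsRetryTimeoutMs == RETRIES_INFINITE) {
            return true; // infinite retries: elapsed time alone should never exhaust the operator
        }
        return (nowMs - errorStartTimeMs) <= errorsRetryTimeoutMs;
    }
}
```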

yashmayya avatar Aug 03 '22 14:08 yashmayya

@C0urante @tombentley could you please take a look at this simple bug fix?

yashmayya avatar Aug 06 '22 11:08 yashmayya

Thanks for the review @C0urante, and sorry about the late reply. That's a very astute observation about tasks potentially getting stuck in infinite loops - you're right that the task thread is never interrupted even if the scheduled shutdown times out.

can you see about adding some logic to break out of retry loops here if the task is cancelled

Any pointers on how we could go about doing that? One idea I had was to make WorkerTask and its isCancelled() method public, and then, when we instantiate RetryWithToleranceOperator in the Worker class, either configure it with its associated WorkerTask (which would create a circular reference, since the WorkerTask also references the RetryWithToleranceOperator instance) or configure it with a method reference to the associated WorkerTask's isCancelled() method. That way, in the retry loop we could check the return value of isCancelled() in addition to (or instead of) the thread's interrupt status. This doesn't seem like a particularly clean solution due to the additional coupling it introduces, so I'd be happy to hear if you have any better ideas!

yashmayya avatar Aug 19 '22 11:08 yashmayya

I think a Supplier<Boolean> isCancelled would be fine to check if the operator should prematurely exit. One other thing worth accounting for is the backoff logic, which currently uses Time::sleep to wait in between retries. We could replace those calls with CountDownLatch::await, and when the task is cancelled, invoke CountDownLatch::countDown so that any in-progress calls to await return early.
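Roughly what that could look like, as a hedged sketch: the names here (CancellableRetrySketch, stopRequestedLatch, callWithRetries) are invented for illustration, and the retry timeout/tolerance bookkeeping is left out.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Sketch of the suggestion above; names are illustrative, not the operator's real API.
class CancellableRetrySketch {
    private final Supplier<Boolean> isCancelled;      // e.g. a reference to the task's isCancelled()
    private final CountDownLatch stopRequestedLatch;  // counted down when the task is cancelled

    CancellableRetrySketch(Supplier<Boolean> isCancelled, CountDownLatch stopRequestedLatch) {
        this.isCancelled = isCancelled;
        this.stopRequestedLatch = stopRequestedLatch;
    }

    <T> T callWithRetries(Callable<T> operation, long backoffMs) throws Exception {
        while (true) {
            try {
                return operation.call();
            } catch (Exception e) {
                // Give up immediately if the task has already been cancelled.
                if (isCancelled.get()) {
                    throw e;
                }
                // Backoff that can be cut short: await returns true if the latch was counted
                // down (i.e. the task was cancelled) before the timeout elapsed.
                if (stopRequestedLatch.await(backoffMs, TimeUnit.MILLISECONDS)) {
                    throw e;
                }
            }
        }
    }
}
```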

C0urante avatar Aug 19 '22 13:08 C0urante

I think a Supplier<Boolean> isCancelled would be fine to check if the operator should prematurely exit

Makes sense, I've implemented this with a test.

One other thing worth accounting for is the backoff logic, which currently uses Time::sleep to wait in between retries. We could replace those calls with CountDownLatch::await, and when the task is cancelled, invoke CountDownLatch::countDown so that any in-progress calls to await return early.

Why do we need to do this? If the task has been cancelled and basically abandoned by the worker, it shouldn't really matter if the backoff takes up to errorMaxDelayInMillis, right?

yashmayya avatar Aug 19 '22 16:08 yashmayya

Why do we need to do this? If the task has been cancelled and basically abandoned by the worker, it shouldn't really matter if the backoff takes up to errorMaxDelayInMillis, right?

We should do everything in our power to shut down the task once it's been cancelled, in order to release the resources allocated for it: invoking Task::stop, closing the Kafka clients created for it, etc.

Since users can configure the maximum error delay, there are no guarantees about how long the task might stick around after it's been cancelled if we don't find a way to interrupt it during backoff.

C0urante avatar Aug 19 '22 16:08 C0urante

Thanks @C0urante, that makes sense. I think we can maintain a Map<ConnectorTaskId, CountDownLatch> in the Worker class (let's say taskCancelLatch), populated in the task builder with latches that have a count of 1. Each task's cancel latch could then be passed to its RetryWithToleranceOperator, which would call await on it with a timeout in the backoff method instead of Time::sleep, like you mentioned. The latch would be counted down in Worker::awaitStopTask, inside the if (!task.awaitStop(timeout)) block. However, a number of unit tests for RetryWithToleranceOperator rely on MockTime to verify that the operator's retry and backoff mechanism works as expected. Any ideas on how we could do this when using CountDownLatch::await with a timeout instead? One option could be to expose a couple of retry variables for testing, which would be used to determine the number of retries and the total retry duration. WDYT?

yashmayya avatar Aug 23 '22 13:08 yashmayya

Why construct the latches in Worker? Wouldn't it be cleaner to create them in the RetryWithToleranceOperator constructor and expose a method in that class to count down the latch, which we could then invoke from tasks inside their cancel method?

As far as testing goes, can we just mock the latch and leave everything else basically as-is? We should continue to use the Time interface for everything except sleeping, so all the existing invocations of Time::milliseconds can stay.
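A quick sketch of that wiring (hypothetical names, not the actual classes' code): the operator owns the latch, and the task's cancel path just counts it down.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical wiring sketch; class and method names (e.g. triggerStop) are illustrative.
class OperatorSketch {
    private final CountDownLatch stopRequestedLatch = new CountDownLatch(1);

    // Invoked from the task's cancel() so that any in-progress backoff (await) returns early.
    public void triggerStop() {
        stopRequestedLatch.countDown();
    }
}

class TaskSketch {
    private final OperatorSketch retryOperator;

    TaskSketch(OperatorSketch retryOperator) {
        this.retryOperator = retryOperator;
    }

    public void cancel() {
        // ... existing cancellation logic ...
        retryOperator.triggerStop(); // unblock any retry backoff currently in progress
    }
}
```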

C0urante avatar Aug 23 '22 15:08 C0urante

Why construct the latches in Worker? Wouldn't it be cleaner to create them in the RetryWithToleranceOperator constructor and expose a method in that class to count down the latch, which we could then invoke from tasks inside their cancel method?

That's a fair point; I was coming from the point of view of a generic solution that could potentially be reused by other operations that need to be signalled when a task is cancelled (i.e. by awaiting on the task's cancel latch). But since RetryWithToleranceOperator seems to be the only place where this would be useful right now, I agree that your proposed solution is cleaner.

As far as testing goes, can we just mock the latch and leave everything else basically as-is? We should continue to use the Time interface for everything except sleeping, so all the existing invocations of Time::milliseconds can stay.

The MockTime::sleep method advances its internal time counter, which the tests then use to assert how much time has elapsed.

Edit: Ah, I guess we could make assertions on the number of times await was called on the countdown latch and with what timeouts. I think that's what you were referring to earlier?

yashmayya avatar Aug 23 '22 16:08 yashmayya

Yeah, we can make assertions on when/how often/with which arguments await is called. If necessary, we can also manually advance the mock time by invoking sleep on it in our testing logic when the mocked latch is waited upon, which should cause other calls to Time::milliseconds to reflect the waiting that we would have done on the latch.
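For example (a hypothetical test sketch assuming JUnit 5 and Mockito): the latch is mocked, and the stubbed await advances the mock clock so that existing Time::milliseconds-based assertions keep working. The direct latch.await call below is only a stand-in for wherever the operator's backoff would be invoked.

```java
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.*;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.Test;

// Hypothetical sketch: verify when/how await is called and advance MockTime from the stub.
public class CancellableBackoffTestSketch {

    @Test
    public void backoffWaitsOnLatchAndAdvancesMockTime() throws Exception {
        MockTime time = new MockTime();
        CountDownLatch latch = mock(CountDownLatch.class);

        // Pretend the full backoff elapsed: advance the mock clock instead of really sleeping,
        // and return false to indicate the latch was not counted down (task not cancelled).
        when(latch.await(anyLong(), eq(TimeUnit.MILLISECONDS))).thenAnswer(invocation -> {
            long timeoutMs = invocation.getArgument(0);
            time.sleep(timeoutMs);
            return false;
        });

        long start = time.milliseconds();
        latch.await(300L, TimeUnit.MILLISECONDS); // stand-in for the operator's backoff call

        verify(latch).await(300L, TimeUnit.MILLISECONDS);
        // Time::milliseconds now reflects the waiting that would have happened on the latch.
        assertEquals(300L, time.milliseconds() - start);
    }
}
```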

C0urante avatar Aug 23 '22 17:08 C0urante

Thanks for the detailed reviews and for bearing with me through multiple rounds of review on this one Chris!

yashmayya avatar Sep 08 '22 14:09 yashmayya