kafka
KAFKA-13952 fix RetryWithToleranceOperator to respect infinite retries configuration
- https://issues.apache.org/jira/browse/KAFKA-13952: RetryWithToleranceOperator doesn't respect the infinite retries config, i.e. when errors.retry.timeout is set to -1
- From https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect:
errors.retry.timeout: [-1, 0, 1, ... Long.MAX_VALUE], where -1 means infinite duration.
- Also, from the config doc: https://github.com/apache/kafka/blob/0c4da23098f8b8ae9542acd7fbaa1e5c16384a39/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L129-L130
- This PR fixes the issue in RetryWithToleranceOperator and adds a couple of unit tests to verify the fix
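For readers unfamiliar with the config, here is a minimal sketch of how a retry deadline could treat -1 as "no deadline". It is an illustration only, not the code in this PR: the class name RetryTimeoutSketch and the method deadline are hypothetical, and it assumes the deadline is derived from a start timestamp plus errors.retry.timeout.

```java
/** Hypothetical illustration; not the actual RetryWithToleranceOperator code. */
final class RetryTimeoutSketch {

    /**
     * Computes the absolute deadline (in ms) for retries.
     * A negative timeout (e.g. -1) is treated as "retry forever".
     * Assumes startMs is non-negative.
     */
    static long deadline(long startMs, long retryTimeoutMs) {
        if (retryTimeoutMs < 0) {
            // Naively adding -1 would put the deadline in the past,
            // which would disable retries instead of making them infinite.
            return Long.MAX_VALUE;
        }
        long candidate = startMs + retryTimeoutMs;
        // Guard against overflow when the timeout is close to Long.MAX_VALUE.
        return candidate < 0 ? Long.MAX_VALUE : candidate;
    }
}
```

With this shape, a timeout of 0 yields a deadline equal to the start time (no retries), while -1 never expires.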
@C0urante @tombentley could you please take a look at this simple bug fix?
Thanks for the review @C0urante and sorry about the late reply. That's a very astute observation regarding tasks potentially getting stuck in infinite loops - you're right in that the task thread is never interrupted even if the scheduled shutdown times out.
can you see about adding some logic to break out of retry loops here if the task is cancelled
Any pointers on how we could go about doing that? Some ideas I had in mind were around making the WorkerTask class and its isCancelled() method both public, and when we instantiate RetryWithToleranceOperator in the Worker class, either configure it with its associated WorkerTask (i.e. there will be a circular reference since the WorkerTask also references the RetryWithToleranceOperator instance) or configure it with a method reference to the associated WorkerTask's isCancelled() method. This way in the retry loop we can check the return value of isCancelled() in addition to (or instead of) the thread's interrupt status. This doesn't seem like a particularly clean solution due to the additional coupling introduced, so I'd be happy to hear if you had any better ideas!
I think a Supplier<Boolean> isCancelled would be fine to check if the operator should prematurely exit. One other thing worth accounting for is the backoff logic, which currently uses Time::sleep to wait in between retries. We could replace those calls with CountDownLatch::await, and when the task is cancelled, invoke CountDownLatch::countDown so that any in-progress calls to await return early.
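A rough sketch of the Supplier<Boolean> idea, to make the suggestion concrete. The class and method names (CancellableRetrySketch, retryUntilCancelled) are hypothetical, the loop retries on any exception for simplicity, and backoff between attempts is left out; the real operator has more nuanced tolerance and timeout handling.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.function.Supplier;

/** Hypothetical illustration of a retry loop that honours a cancellation check. */
final class CancellableRetrySketch {

    /**
     * Retries the operation indefinitely until it succeeds or isCancelled reports true.
     * Returns an empty Optional if cancellation was observed before a successful attempt.
     */
    static <V> Optional<V> retryUntilCancelled(Callable<V> operation, Supplier<Boolean> isCancelled) {
        // The cancellation check runs before every attempt, so an "infinite"
        // retry configuration can still be abandoned once the task is cancelled.
        while (!isCancelled.get()) {
            try {
                return Optional.ofNullable(operation.call());
            } catch (Exception e) {
                // Treated as retriable in this sketch; fall through to the next attempt.
            }
        }
        return Optional.empty();
    }
}
```

A task could pass something like a method reference to its own cancellation flag as the supplier, which avoids handing the whole task object to the operator.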
I think a Supplier<Boolean> isCancelled would be fine to check if the operator should prematurely exit
Makes sense, I've implemented this with a test.
One other thing worth accounting for is the backoff logic, which currently uses Time::sleep to wait in between retries. We could replace those calls with CountDownLatch::await, and when the task is cancelled, invoke CountDownLatch::countDown so that any in-progress calls to await return early.
Why do we need to do this? If the task has been cancelled and basically abandoned by the worker, it shouldn't really matter if the backoff takes up to errorMaxDelayInMillis, right?
Why do we need to do this? If the task has been cancelled and basically abandoned by the framework, it shouldn't really matter if the backoff takes up to errorMaxDelayInMillis, right?
We should do everything in our power to shut down the task once it's been cancelled in order to release resources allocated for the task by invoking Task::stop, closing the Kafka clients created for it, etc.
Since users can configure the maximum error delay, there are no guarantees about how long the task might stick around after it's been cancelled if we don't find a way to interrupt it during backoff.
Thanks @C0urante, that makes sense. I think we can maintain a Map<ConnectorTaskId, CountDownLatch> in the Worker class (let's say taskCancelLatch) which gets populated in the task builder (with the CountDownLatch having a count of 1). The task's cancel latch could then be passed to its RetryWithToleranceOperator, which could call await on it with a timeout in the backoff method instead of Time::sleep like you mentioned. The task cancel latch is counted down in Worker::awaitStopTask in the if (!task.awaitStop(timeout)) block. However, a number of unit tests for RetryWithToleranceOperator rely on MockTime to verify whether the operator's retry and backoff mechanism is working as expected. Any ideas on how we could do this instead when using CountDownLatch::await with a timeout? One possible idea could be to expose a couple of retry variables for testing which would be used to determine the number of retries and total retry duration, WDYT?
Why construct the latches in Worker? Wouldn't it be cleaner to create them in the RetryWithToleranceOperator constructor and expose a method in that class to count down the latch, which we could then invoke from tasks inside their cancel method?
As far as testing goes, can we just mock the latch and leave everything else basically as-is? We should continue to use the Time interface for everything except sleeping, so all the existing invocations of Time::milliseconds can stay.
Why construct the latches in Worker? Wouldn't it be cleaner to create them in the RetryWithToleranceOperator constructor and expose a method in that class to count down the latch, which we could then invoke from tasks inside their cancel method?
That's a fair point, I guess I was coming from the point of view of implementing a generic solution which could potentially be re-used by other operations that require signalling when a task is cancelled (i.e. by awaiting on the task's cancel latch). But I think since this RetryWithToleranceOperator seems to be the only one where such a thing would be useful right now, I agree that your proposed solution is cleaner.
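The approach agreed on here (a latch owned by the operator and counted down from the task's cancel path) could look roughly like the sketch below. The names InterruptibleBackoffSketch, backoff, and requestStop are hypothetical and are not taken from the PR; only CountDownLatch and TimeUnit are real JDK APIs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of a backoff that can be cut short on cancellation. */
final class InterruptibleBackoffSketch {

    // Created once per operator instance, with a count of 1.
    private final CountDownLatch stopRequested = new CountDownLatch(1);

    /**
     * Waits for up to delayMs instead of sleeping unconditionally.
     * Returns true if a stop was requested (the wait ended early or the
     * latch was already released), false if the full delay elapsed.
     */
    boolean backoff(long delayMs) {
        try {
            return stopRequested.await(delayMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt status and treat it like a stop request.
            Thread.currentThread().interrupt();
            return true;
        }
    }

    /** Intended to be invoked from the task's cancel path. */
    void requestStop() {
        stopRequested.countDown();
    }
}
```

Because the latch is never reset, every backoff call after requestStop returns immediately, so a cancelled task is not held up by the configured maximum delay.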
As far as testing goes, can we just mock the latch and leave everything else basically as-is? We should continue to use the Time interface for everything except sleeping, so all the existing invocations of Time::milliseconds can stay.
The MockTime::sleep method advances its internal time counter which is then used in the tests to make assertions on how much time has elapsed.
Edit: Ah, I guess we could make assertions on the number of times await was called on the countdown latch and with what timeouts. I think that's what you were referring to earlier?
Yeah, we can make assertions on when/how often/with which arguments await is called. If necessary, we can also manually advance the mock time by invoking sleep on it in our testing logic when the mocked latch is waited upon, which should cause other calls to Time::milliseconds to reflect the waiting that we would have done on the latch.
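To illustrate the testing idea without a mocking library, here is a hand-rolled sketch. FakeClock stands in for a mock time source and RecordingLatch for a mocked latch; both are hypothetical helpers, not Kafka classes, and the actual tests may well use a mocking framework instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a mock clock: time only moves when sleep is called. */
final class FakeClock {
    private long nowMs;

    long milliseconds() {
        return nowMs;
    }

    void sleep(long ms) {
        nowMs += ms;
    }
}

/** Hypothetical test double: records await timeouts and advances the fake clock. */
final class RecordingLatch extends CountDownLatch {
    private final FakeClock clock;
    final List<Long> awaitedMs = new ArrayList<>();

    RecordingLatch(FakeClock clock) {
        super(1);
        this.clock = clock;
    }

    @Override
    public boolean await(long timeout, TimeUnit unit) {
        long ms = unit.toMillis(timeout);
        awaitedMs.add(ms);  // lets tests assert on how often and how long we waited
        clock.sleep(ms);    // makes later milliseconds() calls reflect the simulated wait
        return getCount() == 0; // false simulates a timeout, true a stop request
    }
}
```

A test can then hand the latch to the code under test, assert on awaitedMs, and compare clock.milliseconds() before and after to check the total simulated backoff.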
Thanks for the detailed reviews and for bearing with me through multiple rounds of review on this one Chris!