
[Feature] Kafka: Support Retry with Delay and DLQ

Open iancooper opened this issue 3 years ago • 5 comments

Is your feature request related to a problem? Please describe. Kafka has no delay queue feature. So if processing a message results in a transient error, there is no way to delay and retry later.

Kafka preserves ordering (within a partition), and series data is by its nature often ordered. When you requeue to retry, you break that ordering, as the message will now be serviced out of order. So Kafka will not do this "out-of-the-box".

This does not matter if you do not care about ordering, such as for independent events (this is discrete rather than series data, so Kafka is not the best fit, but you may be constrained to using 'just one' broker), or because you adopt a "latest timestamp wins" strategy (although in that case Kafka adds accidental complexity: if you do not need strict ordering, why do you need partitioning? Random access to the stream may still be a needed use case, though).

Describe the solution you'd like For a given topic ask for the retry intervals. Create topics for those intervals. Delay the reading of those topics until the delay has passed.

Then we can write a factory to configure Kafka with retry queues. You give us a source queue and a set of delay queues; we create the additional required topics (or you create them and indicate that), and we move between queues on a requeue with delay.

In your last delay queue we should Nack to the DLQ if we cannot process.
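
As a rough illustration of the shape this might take, here is a minimal sketch in Python with confluent-kafka (not Brighter's API); the topic names, the retry-count header, and the intervals are assumptions for illustration only:

```python
# Sketch only: derive a delay topic per retry interval plus a DLQ, create them,
# and route a failed message to the next delay topic, or to the DLQ after the
# last one. All names and intervals are assumed, not Brighter conventions.
from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient, NewTopic

SOURCE_TOPIC = "orders"                        # hypothetical source topic
RETRY_DELAYS = [60, 300, 1800]                 # retry intervals in seconds
RETRY_TOPICS = [f"{SOURCE_TOPIC}.retry.{d}s" for d in RETRY_DELAYS]
DLQ_TOPIC = f"{SOURCE_TOPIC}.dlq"

admin = AdminClient({"bootstrap.servers": "localhost:9092"})
admin.create_topics([NewTopic(t, num_partitions=1, replication_factor=1)
                     for t in RETRY_TOPICS + [DLQ_TOPIC]])

producer = Producer({"bootstrap.servers": "localhost:9092"})

def requeue_with_delay(msg, retry_count: int) -> None:
    """Move a failed message to the next delay topic; after the last delay
    topic has also failed, Nack it to the DLQ."""
    next_topic = (RETRY_TOPICS[retry_count]
                  if retry_count < len(RETRY_TOPICS) else DLQ_TOPIC)
    producer.produce(next_topic, key=msg.key(), value=msg.value(),
                     headers=[("retry-count", str(retry_count + 1).encode())])
    producer.flush()
```

In practice you would wait on the futures returned by create_topics and handle delivery callbacks; the point is the routing: each requeue hops to the next delay topic, and the hop after the last delay topic lands in the DLQ.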

Describe alternatives you've considered An alternative would be to support some kind of control bus delay queue, perhaps backed by Redis, and use this for all requeues. But this would tend to mean we would be inserting back onto the original queue, and given the nature of Kafka's immutable queues, this might not make as much sense. In addition, there seems to be an expectation that this is 'how this works'.

Additional context None

iancooper avatar May 24 '21 08:05 iancooper

hi @iancooper, may I know if you are still working on this?

honkuan86 avatar Sep 15 '22 07:09 honkuan86

I have paused it for other priorities but will return to it.

It's fairly easy to support this yourself by writing a middleware handler that, on receipt of a DeferMessageAction, posts to another topic (named in the attribute parameters) which you poll at a greater interval.
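
A rough sketch of the delay-topic consumer half of that (polling at a greater interval), at the raw Kafka client level in Python with confluent-kafka rather than as a Brighter handler; the topic, group id, and pacing are assumptions:

```python
# Naive sketch: poll a hypothetical delay topic at a greater interval and push
# each deferred message back to the source topic for another attempt.
import time
from confluent_kafka import Consumer, Producer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "orders-delay-1m",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
})
producer = Producer({"bootstrap.servers": "localhost:9092"})
consumer.subscribe(["orders.delay.1m"])   # hypothetical delay topic

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None or msg.error():
        continue
    # Push the deferred message back to the source topic for another attempt.
    producer.produce("orders", key=msg.key(), value=msg.value())
    producer.flush()
    consumer.commit(message=msg, asynchronous=False)
    time.sleep(60)  # poll at a greater interval than the source topic
```

Note this only spaces out the requeues rather than guaranteeing a per-message delay, and as the next comment explains, pacing a consumer this way runs into consumer timeouts for longer delays.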

iancooper avatar Sep 15 '22 16:09 iancooper

It turns out that polling at an interval is less easy than you might think in Kafka, because of consumer timeouts (a consumer that does not call poll within max.poll.interval.ms is ejected from the group). It is better to use a strategy based on timestamps, but that takes a little bit of work: https://copyprogramming.com/howto/how-to-achieve-delayed-queue-with-apache-kafka
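
For reference, here is a sketch of that timestamp-based strategy in Python with confluent-kafka (the topic names and the 5-minute delay are assumptions): instead of sleeping between polls, pause the partition when the head message is not yet due, keep calling poll() so the consumer stays in the group, and resume and seek back once the delay has elapsed.

```python
# Sketch of a timestamp-based delay consumer for a hypothetical delay topic.
import time
from confluent_kafka import Consumer, Producer, TopicPartition

DELAY_SECONDS = 300   # assumed delay for this topic

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "orders-delay-5m",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
})
producer = Producer({"bootstrap.servers": "localhost:9092"})
consumer.subscribe(["orders.delay.5m"])   # hypothetical delay topic

paused = {}   # (topic, partition) -> (resume_at, TopicPartition)

while True:
    # Resume any paused partitions whose head message is now past the delay.
    now = time.time()
    for key, (resume_at, tp) in list(paused.items()):
        if now >= resume_at:
            consumer.resume([tp])
            del paused[key]

    msg = consumer.poll(timeout=1.0)   # keeps the group alive even while paused
    if msg is None or msg.error():
        continue

    _, ts_ms = msg.timestamp()   # when the message hit the delay topic
    ready_at = ts_ms / 1000.0 + DELAY_SECONDS
    if time.time() < ready_at:
        # Not due yet: pause this partition, seek back so the message is
        # re-read after resuming, and remember when to resume.
        tp = TopicPartition(msg.topic(), msg.partition(), msg.offset())
        consumer.pause([tp])
        consumer.seek(tp)
        paused[(msg.topic(), msg.partition())] = (ready_at, tp)
        continue

    # Due: push it back to the source topic for another processing attempt.
    producer.produce("orders", key=msg.key(), value=msg.value())
    producer.flush()
    consumer.commit(message=msg, asynchronous=False)
```

Because poll() keeps being called while the partition is paused, the consumer never trips max.poll.interval.ms, and delayed messages are still handled in order per partition, each waiting for the head of the delay topic to cross the threshold.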

It is probably worth splitting out DLQ and just using DLQ on a Nack as an approach, over DLQ on retry failure.

iancooper avatar Apr 03 '23 11:04 iancooper

I have amended this to support Retry only.

iancooper avatar Apr 03 '23 12:04 iancooper

So the common wisdom for Kafka is to use requeue with delay topics; see Uber's blog post for example.

Relying on Kafka to store requests that need to be requeued after a delay reduces the need for additional infrastructure, but it comes with Kafka problems, such as needing to process the delayed items sequentially, waiting for the next item to cross the threshold of being ready to requeue.

Now that needs us to have the message that we want to requeue, but Brighter has a request (the mapped command or event), not the message, at this point. That suggests we would need to get the message out of a store by id and then send it. This store could be global and in-memory, but if the message is in a store anyway, why not use the Outbox approach: simply write the message there with the time to requeue it, and have a sweeper pick up anything that needs requeueing and bulk send it back onto the original topic.

To avoid an infrastructure explosion, we note that many consumers may already have an Inbox. The Inbox stores commands for us, and does so via the pipeline if you declare it. But we could add another table to the Inbox that stores incoming messages, and alter the message pump to store the message that has been read from the channel (in memory if you don't give us an Inbox).

Then a requeuer can operate in a separate process/thread.

We would probably be able to delete asynchronously on an ack, i.e. you don't need the message you received now that you have processed it. Once you requeue, you increment the number of requeues. When you hit a threshold, you end up sending to the DLQ. The DLQ work also needs the message, so it would also need #2592 to grab the message.
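
A sketch of what that sweeper might look like against a hypothetical message table, in Python with sqlite3 and confluent-kafka (the schema, table and topic names, and the threshold are assumptions, not Brighter's actual Inbox/Outbox schema):

```python
# Sketch only: an assumed "inbox_messages" table holding messages read from
# the channel, a sweeper that requeues due messages or Nacks them to the DLQ
# past a threshold, and deletion of the stored message on ack.
import sqlite3
import time
from confluent_kafka import Producer

MAX_REQUEUES = 3
DLQ_TOPIC = "orders.dlq"                      # hypothetical DLQ topic

db = sqlite3.connect("inbox.db")
db.execute("""CREATE TABLE IF NOT EXISTS inbox_messages (
    message_id    TEXT PRIMARY KEY,
    topic         TEXT NOT NULL,
    key           BLOB,
    body          BLOB NOT NULL,
    requeue_at    REAL,                       -- unix time; NULL = not scheduled
    requeue_count INTEGER NOT NULL DEFAULT 0
)""")
producer = Producer({"bootstrap.servers": "localhost:9092"})

def sweep():
    """Bulk-send anything whose requeue time has passed; Nack to the DLQ once
    the requeue count crosses the threshold."""
    rows = db.execute(
        "SELECT message_id, topic, key, body, requeue_count "
        "FROM inbox_messages WHERE requeue_at IS NOT NULL AND requeue_at <= ?",
        (time.time(),),
    ).fetchall()
    for message_id, topic, key, body, count in rows:
        if count >= MAX_REQUEUES:
            # Past the threshold: send to the DLQ and forget the message.
            producer.produce(DLQ_TOPIC, key=key, value=body)
            db.execute("DELETE FROM inbox_messages WHERE message_id = ?",
                       (message_id,))
        else:
            # Send back onto the original topic and count the requeue.
            producer.produce(topic, key=key, value=body,
                             headers=[("requeue-count", str(count + 1).encode())])
            db.execute(
                "UPDATE inbox_messages SET requeue_at = NULL, "
                "requeue_count = requeue_count + 1 WHERE message_id = ?",
                (message_id,),
            )
    producer.flush()
    db.commit()

def ack(message_id: str) -> None:
    """On ack the stored message is no longer needed, so delete it
    (asynchronously in practice)."""
    db.execute("DELETE FROM inbox_messages WHERE message_id = ?", (message_id,))
    db.commit()
```

Here sweep() would run periodically from the separate requeuer process/thread described above.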

This then takes us back to the idea of requeue then DLQ, so we can close #2592 and just keep this one open.

iancooper avatar Apr 06 '23 10:04 iancooper