
Configure if message goes to DLQ after retries conditionally, based on MessageHeaders and thrown Exception

Open santunioni opened this issue 10 months ago • 6 comments

Expected Behavior

I should be able to decide whether a message goes to the DLT based on a predicate over the MessageHeaders and the thrown exception. I want to provide classes that implement something like this interface:

@FunctionalInterface
public interface DltStrategy {
	boolean shouldSendToDLTAfterRetries(MessageHeaders headers, Exception exception);
}

Current Behavior

I can only configure the DLT decision statically, using one of:

  • NO_DLT
  • ALWAYS_RETRY_ON_ERROR
  • FAIL_ON_ERROR

Context

We do heavy testing in production here. In my case, I don't want messages with the header isTest=true to go to the DLT, because once in a while manual testing produces invalid messages that we can't process.

Note that I want these messages to be retried but discarded afterwards, without going to the DLT. I am not aware of any workaround.
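For illustration, the isTest case could be expressed against the proposed contract roughly as follows. This is a minimal plain-Java sketch, not an existing spring-kafka API: `ConditionalDltStrategy`, `skipTestMessages` and `isTrue` are hypothetical names, and `Map<String, Object>` stands in for Spring's `MessageHeaders` (which implements that interface) so the snippet compiles without Spring on the classpath.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical contract modelled on the interface proposed in this issue.
// It is NOT part of spring-kafka. Map<String, Object> stands in for MessageHeaders.
@FunctionalInterface
interface ConditionalDltStrategy {

    boolean shouldSendToDltAfterRetries(Map<String, Object> headers, Exception exception);

    // Records flagged with isTest=true are discarded after retries;
    // every other record still goes to the DLT.
    static ConditionalDltStrategy skipTestMessages() {
        return (headers, exception) -> !isTrue(headers.get("isTest"));
    }

    // Kafka header values are raw bytes, so accept byte[], String and Boolean.
    static boolean isTrue(Object value) {
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        if (value instanceof byte[]) {
            return Boolean.parseBoolean(new String((byte[]) value, StandardCharsets.UTF_8));
        }
        return value != null && Boolean.parseBoolean(value.toString());
    }
}
```

The exception argument is unused here, but the same shape would allow a decision based on the exception type as well.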

santunioni avatar Jan 08 '25 23:01 santunioni

@santunioni The DltStrategy you mention is an enum provided by the framework that is expected to be used with @RetryableTopic for non-blocking retries. I want to make sure that this is your case and that you are using non-blocking retries. If so, have you looked into using a @DltHandler method and injecting the header with @Header to retrieve its value and make the DLT decision there?

See these: https://docs.spring.io/spring-kafka/reference/retrytopic/dlt-strategies.html https://stackoverflow.com/a/69302622/2070861

sobychacko avatar Jan 09 '25 00:01 sobychacko

> I want to ensure that this is the case and that you use non-blocking retries.

Yes. That is my case. I am using non-blocking retries.

However, after N retries, each done by consuming from and producing to the topic <main-topic-name>-retry, the message goes to a DLQ topic that is not controlled by Spring. I reprocess DLQs by moving messages back to the retry topic, and this workflow is fixed by my company's tooling.

I don't know if @DltHandler would work for me when the system doesn't poll from the DLT directly.

santunioni avatar Jan 09 '25 03:01 santunioni

Could I implement this behavior with a KafkaListenerErrorHandler? I think the documentation suggests it, but I am not sure.

santunioni avatar Jan 09 '25 13:01 santunioni

I guess we need to understand the issue a bit further. Do you have a small sample where the issue can be reproduced?

sobychacko avatar Jan 09 '25 14:01 sobychacko

Well, it sounds like you are overcomplicating this. What does it mean to conditionally not send to the DLT? Mostly it means processing the record as normal: if there is no exception, there is nothing to send to the retry topic, and so on. Perhaps you can indeed use a KafkaListenerErrorHandler to decide whether or not to re-throw the exception for the record. You can let the record go through a few retry topic cycles, and once it has been retried enough, just swallow the exception for records with that specific header.

I don't think we need to introduce an extra contract if this is really possible with the existing API.

Please, give it a chance and let us know!
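The decision described above can be sketched in plain Java. This only models the choice an error handler would make and does not use any spring-kafka API: `FailureAction` and `TestRecordPolicy` are hypothetical names, `Map<String, Object>` stands in for the message headers, and the current attempt number and maximum attempts are assumed to be available to the handler (how to obtain them is left out).

```java
import java.util.Map;

// Hypothetical model of the error handler's choice; not a spring-kafka type.
enum FailureAction {
    RETHROW, // let the failure propagate so the record moves on to the next retry step or the DLT
    SWALLOW  // treat the record as processed, so nothing further is published for it
}

interface TestRecordPolicy {

    // Assumes the isTest header has already been decoded to a String or Boolean,
    // and that attempt counts start at 1.
    static FailureAction onFailure(Map<String, Object> headers, int attempt, int maxAttempts) {
        boolean lastAttempt = attempt >= maxAttempts;
        boolean testRecord = "true".equalsIgnoreCase(String.valueOf(headers.get("isTest")));
        // Test records are still retried; only the final failure is swallowed.
        return (lastAttempt && testRecord) ? FailureAction.SWALLOW : FailureAction.RETHROW;
    }
}
```

In a real handler, RETHROW would correspond to re-throwing the listener exception and SWALLOW to returning normally.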

artembilan avatar Jan 09 '25 21:01 artembilan

Hi, I think I have the same issue. How can I filter which messages go to the DLT? For example, I don't want to publish messages that failed with business exceptions to the DLT. Is there a way to handle this? Thank you for your help.

idadev avatar Feb 22 '25 23:02 idadev