spring-kafka
spring-kafka copied to clipboard
Configure if message goes to DLQ after retries conditionally, based on MessageHeaders and thrown Exception
Expected Behavior
I should be able to decide if message goes to DLT based on MessageHeaders predicate. I want to provide classes that implement something like the interface
@FunctionalInterface
public interface DltStrategy {
boolean shouldSendToDLTAfterRetries(MessageHeaders headers, Exception exception);
}
Current Behavior
I can only configure the DLT decisions as static:
- NO_DLT
- ALWAYS_RETRY_ON_ERROR
- FAIL_ON_ERROR
Context
We do heavy testing on production here. In my case, I don't want messages with header isTest=true to go to DLT, because once in a while manual testing produce invalid messages, that we can't process.
Note I want messages to be retryed, but be discarded after, not going to DLTs. I am not aware of any workaround.
@santunioni The DltStrategy you mention is an enum provided by the framework that is expected to be used with the @RetrableTopic for non-blocking retries. I want to ensure that this is the case and that you use non-blocking retries. In that case, have you looked into using a DltHandler method and tried to inject the @Header to retrieve the header value and make the DLT decision?
See these: https://docs.spring.io/spring-kafka/reference/retrytopic/dlt-strategies.html https://stackoverflow.com/a/69302622/2070861
I want to ensure that this is the case and that you use non-blocking retries.
Yes. That is my case. I am using non-blocking retries.
However, after retrying N times by consuming and producing to topic <main-topic-name>-retry, the message goes to a DLQ topic not controlled by Spring. I reprocess DLQs by moving messages to the retry topic again, and this is fixed by my company tooling.
I don't know if the @DltHandler would work for me, when the system don't poll from DLT directly.
Could I implement the behavior with KafkaListenerErrorHandler? I think the documentation suggest it, but not sure.
I guess we need to understand the issue a bit further. Do you have a small sample where the issue can be reproduced?
Well, sounds like you are over complicating.
So, what is to not send to DLT conditionally?
Mostly it means process the record as normal: if no exception, then nothing to send to retry topic and so on.
Perhaps you indeed can use that KafkaListenerErrorHandler to decide if to re-throw an exception for the record or not.
You may give it a chance to go through some retry topics cycle, but when it is done enough just swallow an exception for the record with that specific header.
I don't think we would need some extra contract to introduce if that is really possible with existing API.
Please, give it a chance and let us know!
Hi, I think I have the same issue. How to filter which message goes or not to the DLT ? For example, I don't want to publish messages corresponding to business exceptions to the DLT. Is there a way to handle this ? Thank you for your help.