How to add a custom header to the DLT event using Spring Cloud Stream
I have a consumer, and when processing fails in that consumer I want to set custom headers on the event published to the DLT.
I implemented this approach, but it didn't work:
```java
@Component
public class MyConsumer implements Consumer<Message<MyEvent>> {

    @Override
    public void accept(final Message<MyEvent> event) {
        // Some logic that processes the event and throws an exception
    }

    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(final KafkaOperations<?, ?> kafkaOperations) {
        final DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = new DeadLetterPublishingRecoverer(kafkaOperations);
        deadLetterPublishingRecoverer.setHeadersFunction((consumerRecord, exception) -> {
            final Headers headers = consumerRecord.headers();
            headers.add("foo", "bar".getBytes(StandardCharsets.UTF_8));
            return headers;
        });
        return deadLetterPublishingRecoverer;
    }

    @Bean
    public DefaultErrorHandler customDefaultErrorHandler(final DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
        return new DefaultErrorHandler(deadLetterPublishingRecoverer);
    }

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(final DefaultErrorHandler errorHandler) {
        return (container, destination, group) -> container.setCommonErrorHandler(errorHandler);
    }
}
```
How do I do this correctly?
Hi, I updated the SO thread, but here is what I added as comments there.
See this related issue: https://github.com/spring-cloud/spring-cloud-stream/issues/2700
Also:
https://stackoverflow.com/questions/75718833/add-custom-header-when-message-publish-to-dlq-by-spring-cloud-stream-kafka-binde
https://stackoverflow.com/questions/76133263/how-do-i-add-custom-headers-to-a-failed-message-before-it-reaches-the-dead-lette
Hi @sobychacko, can you help me understand what I configured wrong in my study project?
I adjusted the implementation following the suggestions in those links, but the custom header is still not added to the event before it reaches the DLT.
```yaml
spring:
  cloud:
    function:
      definition: authorizationProcessedConsumer;authorizationProcessedDltConsumer
    stream:
      kafka:
        binder:
          auto-create-topics: true
        bindings:
          authorizationProcessedConsumer-in-0:
            consumer:
              enableDlq: true
              dlqName: authorization-order-events-dlt
              dlqPartitions: 1
              autoCommitOnError: true
              autoCommitOffset: true
          authorizationProcessedDltConsumer-in-0:
            consumer:
              enableDlq: false
              dlqName: authorization-order-events-dlt
              dlqPartitions: 1
              autoCommitOnError: false
              autoCommitOffset: true
      bindings:
        authorizationProcessedConsumer-in-0:
          destination: authorization-order-events
          group: ${spring.application.name}
          binder: ecommerce
        authorizationProcessedDltConsumer-in-0:
          destination: authorization-order-events-dlt
          group: ${spring.application.name}
          binder: ecommerce
      binders:
        ecommerce:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: ${KAFKA_BROKER_HOSTS:localhost:9092}
                      configuration:
                        security:
                          protocol: ${KAFKA_PROTOCOL:SASL_PLAINTEXT}
                        sasl:
                          mechanism: PLAIN
                          jaas:
                            config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"order-worker\" password=\"order-worker\";"
```
```java
@Component
public class AuthorizationProcessedConsumer implements Consumer<AuthorizationProcessedEvent> {

    private final UpdateOrderUseCase updateOrderUseCase;

    public AuthorizationProcessedConsumer(final UpdateOrderUseCase updateOrderUseCase) {
        this.updateOrderUseCase = updateOrderUseCase;
    }

    @Override
    public void accept(final AuthorizationProcessedEvent event) {
        final NotificationHandler handler = NotificationHandler.create();
        event.validate(handler);
        if (handler.hasErrors()) {
            throw DomainException.with("The authorization processed event has validation errors", handler.errors());
        }
        final UpdateOrderInput input = UpdateOrderInput.with(event.orderId(), event.status());
        this.updateOrderUseCase.execute(input);
    }

    @Bean
    public ListenerContainerWithDlqAndRetryCustomizer customizer(final KafkaOperations<?, ?> kafkaOperations) {
        return (container, destinationName, group, dlqDestinationResolver, backOff) -> {
            if (Objects.isNull(dlqDestinationResolver) || Objects.isNull(backOff)) {
                return;
            }
            final DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = new DeadLetterPublishingRecoverer(kafkaOperations, dlqDestinationResolver);
            deadLetterPublishingRecoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
                final String exceptionType = ExceptionUtils.getRootCauseMessage(exception);
                kafkaHeaders.add("exception-type", exceptionType.getBytes());
            });
            container.setCommonErrorHandler(new DefaultErrorHandler(deadLetterPublishingRecoverer, backOff));
        };
    }
}
```
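As an aside on the header value above: `ExceptionUtils.getRootCauseMessage` from Apache Commons Lang walks the cause chain and returns roughly the root cause's short class name plus its message. A plain-Java sketch approximating that behavior (the `RootCauseDemo` class and its sample exceptions are hypothetical, just for illustration):

```java
public class RootCauseDemo {

    // Walks the cause chain to its end and formats the root cause the way
    // Commons Lang's ExceptionUtils.getRootCauseMessage roughly does:
    // "ShortClassName: message".
    static String rootCauseMessage(final Throwable throwable) {
        Throwable root = throwable;
        while (root.getCause() != null && root.getCause() != root) {
            root = root.getCause();
        }
        return root.getClass().getSimpleName() + ": " + root.getMessage();
    }

    public static void main(String[] args) {
        // A listener-level exception wrapping a domain-level root cause
        final Exception wrapped = new RuntimeException("listener failed",
                new IllegalStateException("order 42 has validation errors"));
        System.out.println(rootCauseMessage(wrapped));
        // prints "IllegalStateException: order 42 has validation errors"
    }
}
```

So with the `setExceptionHeadersCreator` shown above, the `exception-type` header carries a compact root-cause summary rather than the full stack trace.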
Could you put these components into a standalone Spring Boot application and share it here? That way, it is easier to triage. Thanks!
Sure, @sobychacko. In this repository, I have a sample with a simple implementation of this in the poc-dlt-headers branch!
https://github.com/jonathanmdr/Spot/tree/feature/poc-dlt-headers
@sobychacko and @olegz, any updates about this topic?
Looking, and will get back before the end of the day.
@jonathanmdr I thought you had included a reproducer, but instead it's a full-blown project with many classes and configurations. We would not be able to review it in this state. Please include a bare minimum to reproduce the issue.
@olegz the link I posted redirects to the repository branch poc-dlt-headers; it has a minimal example to reproduce the behavior, and the class with the recommended implementation is SpotApplication.java.
@jonathanmdr It appears you were close but missed one crucial part, and that is why the code never reached your custom error handler.
What you missed is overriding `retryAndDlqInBinding` and returning false from it.
As the javadoc says:
```java
/**
 * Return false to move retries and DLQ from the binding to a customized error handler
 * using the retry metadata and/or a {@code DeadLetterPublishingRecoverer} when
 * configured via
 * ....
 */
default boolean retryAndDlqInBinding(String destinationName, String group) {
    return true;
}
```
Here is the refactored version of your `ListenerContainerWithDlqAndRetryCustomizer`:
```java
@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
    return new ListenerContainerWithDlqAndRetryCustomizer() {

        @Override
        public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
                String group,
                BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
                BackOff backOff) {
            if (Objects.isNull(dlqDestinationResolver) || Objects.isNull(backOff)) {
                return;
            }
            final DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = new DeadLetterPublishingRecoverer(template, dlqDestinationResolver);
            deadLetterPublishingRecoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
                final String exceptionType = ExceptionUtils.getRootCauseMessage(exception);
                kafkaHeaders.add("exception-type", exceptionType.getBytes());
            });
            container.setCommonErrorHandler(new DefaultErrorHandler(deadLetterPublishingRecoverer, backOff));
        }

        @Override
        public boolean retryAndDlqInBinding(String destinationName, String group) {
            return false;
        }
    };
}
```
Please let me know, so we can close this issue.
Thanks @olegz, your response helped me understand the problem and the behavior of the customized error handler!