spring-cloud-stream-samples icon indicating copy to clipboard operation
spring-cloud-stream-samples copied to clipboard

How to add custom header to the DLT Event using Spring Cloud Stream

Open jonathanmdr opened this issue 9 months ago • 4 comments

I have a consumer, and I want to set custom headers when a failure occurs in the consumer for the DLT event.

I implemented this approach, but it didn't work!

@Component
public class MyConsumer implements Consumer<Message<MyEvent>> {

    @Override
    public void accept(final Message<MyEvent> event) {
        // Some logic to process the event and throws an exception
    }

    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(final KafkaOperations<?, ?> kafkaOperations) {
        final DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = new DeadLetterPublishingRecoverer(kafkaOperations);
        deadLetterPublishingRecoverer.setHeadersFunction((consumerRecord, exception) -> {
            final Headers headers = consumerRecord.headers();
            headers.add("foo", "bar".getBytes(StandardCharsets.UTF_8));
            return headers;
        });
        return deadLetterPublishingRecoverer;
    }

    @Bean
    public DefaultErrorHandler customDefaultErrorHandler(final DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
        return new DefaultErrorHandler(deadLetterPublishingRecoverer);
    }

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(final DefaultErrorHandler errorHandler) {
        return (container, destination, group) -> container.setCommonErrorHandler(errorHandler);
    }

}

How do I do this correctly?

stackoverflow issue

jonathanmdr avatar Mar 19 '25 23:03 jonathanmdr

Hi, I Updated the SO thread. But here is what I added as comments there.

See this related issue: https://github.com/spring-cloud/spring-cloud-stream/issues/2700 Also: https://stackoverflow.com/questions/75718833/add-custom-header-when-message-publish-to-dlq-by-spring-cloud-stream-kafka-binde https://stackoverflow.com/questions/76133263/how-do-i-add-custom-headers-to-a-failed-message-before-it-reaches-the-dead-lette

sobychacko avatar Mar 20 '25 00:03 sobychacko

Hi @sobychacko, can you help me understand what I configured wrong in my study project?

I adjusted the implementation per your link suggestions, but the custom header is not added to the event before it reaches the DLT.

spring:
    cloud:
        function:
            definition: authorizationProcessedConsumer;authorizationProcessedDltConsumer
        stream:
            kafka:
                binder:
                    auto-create-topics: true
                bindings:
                    authorizationProcessedConsumer-in-0:
                        consumer:
                            enableDlq: true
                            dlqName: authorization-order-events-dlt
                            dlqPartitions: 1
                            autoCommitOnError: true
                            autoCommitOffset: true
                    authorizationProcessedDltConsumer-in-0:
                        consumer:
                            enableDlq: false
                            dlqName: authorization-order-events-dlt
                            dlqPartitions: 1
                            autoCommitOnError: false
                            autoCommitOffset: true
            bindings:
                authorizationProcessedConsumer-in-0:
                    destination: authorization-order-events
                    group: ${spring.application.name}
                    binder: ecommerce
                authorizationProcessedDltConsumer-in-0:
                    destination: authorization-order-events-dlt
                    group: ${spring.application.name}
                    binder: ecommerce
            binders:
                ecommerce:
                    type: kafka
                    environment:
                        spring:
                            cloud:
                                stream:
                                    kafka:
                                        binder:
                                            brokers: ${KAFKA_BROKER_HOSTS:localhost:9092}
                                            configuration:
                                                security:
                                                    protocol: ${KAFKA_PROTOCOL:SASL_PLAINTEXT}
                                                sasl:
                                                    mechanism: PLAIN
                                                    jaas:
                                                        config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"order-worker\" password=\"order-worker\";"
@Component
public class AuthorizationProcessedConsumer implements Consumer<AuthorizationProcessedEvent> {

    private final UpdateOrderUseCase updateOrderUseCase;

    public AuthorizationProcessedConsumer(final UpdateOrderUseCase updateOrderUseCase) {
        this.updateOrderUseCase = updateOrderUseCase;
    }

    @Override
    public void accept(final AuthorizationProcessedEvent event) {
        final NotificationHandler handler = NotificationHandler.create();
        event.validate(handler);

        if (handler.hasErrors()) {
            throw DomainException.with("The authorization processed event has validation errors", handler.errors());
        }

        final UpdateOrderInput input = UpdateOrderInput.with(event.orderId(), event.status());
        this.updateOrderUseCase.execute(input);
    }

    @Bean
    public ListenerContainerWithDlqAndRetryCustomizer customizer(final KafkaOperations<?, ?> kafkaOperations) {
        return (container, destinationName, group, dlqDestinationResolver, backOff) -> {
            if (Objects.isNull(dlqDestinationResolver) || Objects.isNull(backOff)) {
                return;
            }

            final DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = new DeadLetterPublishingRecoverer(kafkaOperations, dlqDestinationResolver);
            deadLetterPublishingRecoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
                final String exceptionType = ExceptionUtils.getRootCauseMessage(exception);
                kafkaHeaders.add("exception-type", exceptionType.getBytes());
            });
            container.setCommonErrorHandler(new DefaultErrorHandler(deadLetterPublishingRecoverer, backOff));
        };
    }

}

jonathanmdr avatar Mar 20 '25 15:03 jonathanmdr

Could you put these components into a standalone Spring Boot application and share it here? That way, it is easier to triage. Thanks!

sobychacko avatar Mar 20 '25 17:03 sobychacko

Sure, @sobychacko. In this repository, I have a sample with a simple implementation of this in the poc-dlt-headers branch! https://github.com/jonathanmdr/Spot/tree/feature/poc-dlt-headers

jonathanmdr avatar Mar 20 '25 18:03 jonathanmdr

@sobychacko and @olegz, any updates about this topic?

jonathanmdr avatar Jun 11 '25 19:06 jonathanmdr

Looking and will get back before he end of the day

olegz avatar Jun 27 '25 07:06 olegz

@jonathanmdr i thought you have included a reproducer, but instead it's full blown project with many classes and configurations. We would not have be able to review it in this state. Please include a bare minimum to reproduce the issue.

olegz avatar Jun 27 '25 08:06 olegz

@olegz in the link I posted, redirects to the repository branch poc-dlt-headers, it has a minimal example to reproduce the behavior, the class with the recommendded implementation is the SpotApplication.java.

jonathanmdr avatar Jun 27 '25 18:06 jonathanmdr

@jonathanmdr It appears you were close but have missed one crusial part and that is why the code has never reached your custom error handler. What you have missed is override and return false fromretryAndDlqInBinding. As teh javadoc says:

	/**
	 * Return false to move retries and DLQ from the binding to a customized error handler
	 * using the retry metadata and/or a {@code DeadLetterPublishingRecoverer} when
	 * configured via
	 ....
	 */
	default boolean retryAndDlqInBinding(String destinationName, String group) {
		return true;
	}

Here is the refactored version of your ListenerContainerWithDlqAndRetryCustomizer

@Bean
 ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
        return new ListenerContainerWithDlqAndRetryCustomizer() {

            @Override
            public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
                                  String group,
                                  BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
                                  BackOff backOff) {
                if (Objects.isNull(dlqDestinationResolver) || Objects.isNull(backOff)) {
                    return;
                }

                final DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = new DeadLetterPublishingRecoverer(template, dlqDestinationResolver);
                deadLetterPublishingRecoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
                    final String exceptionType = ExceptionUtils.getRootCauseMessage(exception);
                    kafkaHeaders.add("exception-type", exceptionType.getBytes());
                });
                container.setCommonErrorHandler(new DefaultErrorHandler(deadLetterPublishingRecoverer, backOff));
            }

            @Override
            public boolean retryAndDlqInBinding(String destinationName, String group) {
                return false;
            }
        };
 }

Please let me know so we can close this issue

olegz avatar Jun 29 '25 09:06 olegz

Thanks @olegz, your response helped me understand the problem and the behavior of the customized error handler!

jonathanmdr avatar Jun 30 '25 18:06 jonathanmdr