ChannelInterceptor in preSend method doesn't work as expected

Open ganchix opened this issue 3 years ago • 3 comments

Description

Hi,

I'm implementing a ChannelInterceptor to ignore some messages depending on their content. According to the documentation of the preSend method in ChannelInterceptor:

Invoked before the Message is actually sent to the channel. This allows for modification of the Message if necessary. If this method returns null then the actual send invocation will not occur.

But when the method returns null, the send invocation still seems to occur, and delivery of the message is retried:

2022-01-31 16:51:51.164 ERROR 22644 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: Failed to send message to channel 'bean 'account-in-0'' within timeout: -1, failedMessage=GenericMessage [payload=byte[73], headers={kafka_offset=487, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@33f0edd4, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=account.destination, kafka_receivedTimestamp=1643643854637, kafka_groupId=anonymous.a501a198-973a-4a0b-9429-902259342f47, target-protocol=streamBridge}]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:190)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)

How to reproduce

The code is in this repo

To reproduce, it is enough to return null from the preSend method of a ChannelInterceptor.

Version of the framework

  • Spring Boot version: 2.6.3
  • Spring Cloud version: 2021.0.0
  • Java version 11

Expected behavior

The send invocation is skipped and the message is acknowledged.

Thank you

ganchix avatar Jan 31 '22 16:01 ganchix

Just FYI: GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:190) is exactly where MessageChannel.send() is called. The ChannelInterceptor is already part of the MessageChannel, so the send you see in the stack trace is a little misleading.

What exactly to do here with the null return is indeed a decision that should be made within this issue.

Thanks

artembilan avatar Jan 31 '22 16:01 artembilan

Maybe this is a better stack trace:

Caused by: org.springframework.messaging.MessageDeliveryException: Failed to send message to channel 'bean 'account-in-0'' within timeout: -1
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:190) ~[spring-messaging-5.3.15.jar:5.3.15]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.3.15.jar:5.3.15]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.3.15.jar:5.3.15]
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.3.15.jar:5.3.15]
	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208) ~[spring-integration-core-5.5.8.jar:5.5.8]

My use case is to implement an idempotent interceptor. My understanding is that the message should be skipped and an ack sent to indicate that it has been consumed.

Thank you

ganchix avatar Jan 31 '22 16:01 ganchix

Maybe this one can help you somehow: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#idempotent-receiver

The point is that MessageChannel indeed returns false when there is no message after preSend():

				message = interceptorList.preSend(message, this, interceptorStack);
				if (message == null) {
					return false;
				}

And then GenericMessagingTemplate does this:

		if (!sent) {
			throw new MessageDeliveryException(message,
					"Failed to send message to channel '" + channel + "' within timeout: " + timeout);
		}
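To make the interplay of the two snippets above easier to follow, here is a minimal plain-Java sketch of it. It is only a simplified model, not the Spring classes themselves: `PreSendNullDemo`, `channelSend`, `templateSend`, the `UnaryOperator<String>` stand-in for an interceptor, and the use of `IllegalStateException` in place of `MessageDeliveryException` are all assumptions made for illustration.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Simplified stand-in for the channel/template interplay; not the real Spring classes.
final class PreSendNullDemo {

    // Models MessageChannel.send(): run the interceptors, then hand off the message.
    static boolean channelSend(String message, List<UnaryOperator<String>> interceptors) {
        for (UnaryOperator<String> preSend : interceptors) {
            message = preSend.apply(message);
            if (message == null) {
                return false; // an interceptor vetoed the send
            }
        }
        return true; // message would be handed to the subscriber here
    }

    // Models the template's doSend(): a false result is turned into an exception.
    static void templateSend(String message, List<UnaryOperator<String>> interceptors) {
        if (!channelSend(message, interceptors)) {
            throw new IllegalStateException("Failed to send message to channel");
        }
    }

    public static void main(String[] args) {
        // Hypothetical filter: drop every message whose payload starts with "skip".
        List<UnaryOperator<String>> interceptors =
                List.of(m -> m.startsWith("skip") ? null : m);

        templateSend("keep-1", interceptors); // passes through silently
        try {
            templateSend("skip-2", interceptors);
        } catch (IllegalStateException e) {
            System.out.println("Dropped message surfaced as an error: " + e.getMessage());
        }
    }
}
```

In this model the caller cannot tell "dropped on purpose" apart from "could not be delivered", because both arrive as the same `false` return value.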

So, technically, the ChannelInterceptor is the wrong place for ignoring messages, that is, for filtering or idempotency support.
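One possible alternative is to do the duplicate check in the message handler itself, so that a skipped message is just a normal return rather than a failed send. The following is a rough plain-Java sketch of that idea, not the Spring Integration idempotent receiver: `IdempotentConsumerSketch`, `idempotent`, the key extractor, and the in-memory set of seen keys are all hypothetical, and a real application would likely need a persistent store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;

final class IdempotentConsumerSketch {

    // Wraps a handler so each key is processed at most once; duplicates return normally.
    static <T> Consumer<T> idempotent(Function<T, String> keyExtractor, Consumer<T> delegate) {
        Set<String> seen = ConcurrentHashMap.newKeySet(); // in-memory only, for illustration
        return message -> {
            if (seen.add(keyExtractor.apply(message))) {
                delegate.accept(message); // first time this key is seen
            }
            // Duplicate: fall through without throwing, so the caller sees a normal completion.
        };
    }

    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        Consumer<String> handler = idempotent(payload -> payload, processed::add);

        handler.accept("account-1");
        handler.accept("account-1"); // duplicate, skipped
        handler.accept("account-2");

        System.out.println(processed); // prints [account-1, account-2]
    }
}
```

The assumption here is that a handler which returns without throwing is treated as having consumed the message, which is the behaviour the issue asks for.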

artembilan avatar Jan 31 '22 16:01 artembilan

I would suggest using Event Routing for that instead of dealing with low-level abstractions such as channels and interceptors. In any case, it appears that there is really nothing to do in the context of this issue, so I am closing it.

olegz avatar Jan 04 '23 18:01 olegz