java-spring-cloud

opentracing-spring-cloud-starter breaks stomp client publish

Open cameronbraid opened this issue 5 years ago • 4 comments

As soon as I add the opentracing-spring-cloud-starter:0.5.7 jar to my app, I can no longer publish messages via STOMP from the client.

Client-side subscribe works, and messages published by the server are received on the client, but the client can't publish because the following exception is thrown:

Exception in thread "clientInboundChannel-2" org.springframework.messaging.MessageDeliveryException: Failed to handle GenericMessage [payload=byte[2], headers={simpMessageType=MESSAGE, stompCommand=SEND, nativeHeaders={destination=[/queue/alternative-search], content-length=[2]}, simpSessionAttributes={}, simpHeartbeat=[J@28df87fe, opentracing.span=679eddba3b563c2e:679eddba3b563c2e:0:0 - /queue/alternative-search, id=1296640f-6ea0-5b7b-8706-b71243d6fd1f, simpSessionId=35c1c8b0-8047-e0b8-e728-e48972d77b0b, simpDestination=/queue/alternative-search, timestamp=1600508333518}] to org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask@3d640f4d in StompBrokerRelay[ReactorNettyTcpClient[reactor.netty.tcp.TcpClientDoOn@9ba905b]]; nested exception is java.lang.IllegalStateException: No header accessor (not using the SimpMessagingTemplate?): GenericMessage [payload=byte[2], headers={simpMessageType=MESSAGE, stompCommand=SEND, nativeHeaders={destination=[/queue/alternative-search], content-length=[2]}, simpSessionAttributes={}, simpHeartbeat=[J@28df87fe, opentracing.span=679eddba3b563c2e:679eddba3b563c2e:0:0 - /queue/alternative-search, id=1296640f-6ea0-5b7b-8706-b71243d6fd1f, simpSessionId=35c1c8b0-8047-e0b8-e728-e48972d77b0b, simpDestination=/queue/alternative-search, timestamp=1600508333518}], failedMessage=GenericMessage [payload=byte[2], headers={simpMessageType=MESSAGE, stompCommand=SEND, nativeHeaders={destination=[/queue/alternative-search], content-length=[2]}, simpSessionAttributes={}, simpHeartbeat=[J@28df87fe, opentracing.span=679eddba3b563c2e:679eddba3b563c2e:0:0 - /queue/alternative-search, id=1296640f-6ea0-5b7b-8706-b71243d6fd1f, simpSessionId=35c1c8b0-8047-e0b8-e728-e48972d77b0b, simpDestination=/queue/alternative-search, timestamp=1600508333518}]
        at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:153)
        at io.opentracing.contrib.concurrent.TracedRunnable.run(TracedRunnable.java:30)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalStateException: No header accessor (not using the SimpMessagingTemplate?): GenericMessage [payload=byte[2], headers={simpMessageType=MESSAGE, stompCommand=SEND, nativeHeaders={destination=[/queue/alternative-search], content-length=[2]}, simpSessionAttributes={}, simpHeartbeat=[J@28df87fe, opentracing.span=679eddba3b563c2e:679eddba3b563c2e:0:0 - /queue/alternative-search, id=1296640f-6ea0-5b7b-8706-b71243d6fd1f, simpSessionId=35c1c8b0-8047-e0b8-e728-e48972d77b0b, simpDestination=/queue/alternative-search, timestamp=1600508333518}]
        at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.handleMessageInternal(StompBrokerRelayMessageHandler.java:499)
        at org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler.handleMessage(AbstractBrokerMessageHandler.java:256)
        at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:144)
        ... 4 more

Spring Boot version: 2.3.4.RELEASE

cameronbraid, Sep 19 '20 09:09

FYI, more details of my broker setup and client are at https://github.com/spring-projects/spring-framework/issues/25640

cameronbraid, Sep 19 '20 10:09

I believe the issue is related to how TracingChannelInterceptor is implemented. It uses MessageBuilder.fromMessage(message) to obtain a builder, makes changes, and then calls build() to create a new message. However, STOMP over WebSocket messaging in the Spring Framework works with headers through a MessageHeaderAccessor wrapper and its subclasses, which provide support for in-app headers vs. native (STOMP) headers, as well as mutability of headers across components (e.g. interceptors) before the message is sent on a channel. When this wrapper is not used and headers are manipulated directly via MessageBuilder, the wrapper is lost, and that is what leads to the specific IllegalStateException above.

The Javadoc provides more details but roughly the usage is something like this:

MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
if (accessor != null && accessor.isMutable()) {
	// headers are mutable, so change the headers and no need to re-create message
	updateHeaders(accessor);
}
else {
	accessor = MessageHeaderAccessor.getMutableAccessor(message);
	accessor.setLeaveMutable(true); // in preSend, leaving this mutable could benefit other interceptors
	updateHeaders(accessor);
	return new GenericMessage<>(message.getPayload(), accessor.toMessageHeaders());
}

where updateHeaders(accessor) is a common method that sets or removes headers.
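
The failure mode described above can be reproduced in isolation. The following is a minimal sketch, assuming spring-messaging is on the classpath; the class name, helper names, destination, and the "demo.trace" header are hypothetical and only stand in for what a tracing interceptor might add. It builds a message whose headers come from an accessor, rebuilds it through MessageBuilder, and checks whether MessageHeaderAccessor.getAccessor can still find the wrapper.

```java
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;

public class LostAccessorDemo {

    // True if the message headers still carry a MessageHeaderAccessor.
    static boolean hasAccessor(Message<?> message) {
        return MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class) != null;
    }

    // Creates a message whose headers are backed by an accessor
    // (hypothetical destination, for illustration only).
    static Message<byte[]> stompStyleMessage() {
        SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
        accessor.setDestination("/queue/demo");
        // createMessage uses the given headers instance directly, so the accessor stays attached.
        return MessageBuilder.createMessage(new byte[0], accessor.getMessageHeaders());
    }

    // Adds a header by rebuilding the message through MessageBuilder,
    // which copies the headers into a new, plain MessageHeaders instance.
    static Message<byte[]> rebuiltWithBuilder(Message<byte[]> message) {
        return MessageBuilder.fromMessage(message)
                .setHeader("demo.trace", "abc") // hypothetical tracing header
                .build();
    }

    public static void main(String[] args) {
        Message<byte[]> original = stompStyleMessage();
        Message<byte[]> rebuilt = rebuiltWithBuilder(original);
        System.out.println("original has accessor: " + hasAccessor(original));
        System.out.println("rebuilt has accessor:  " + hasAccessor(rebuilt));
    }
}
```

In this sketch the rebuilt message no longer exposes an accessor, which is the condition StompBrokerRelayMessageHandler reports as "No header accessor".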

rstoyanchev, Sep 29 '20 10:09

@rstoyanchev the wrapper is lost when we return a new GenericMessage object as in the snippet above, which still causes the IllegalStateException in StompBrokerRelayMessageHandler. Is there a solution for this?
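
One possible way around this, offered as a sketch rather than a confirmed fix: accessor.toMessageHeaders() returns a plain copy of the headers, whereas accessor.getMessageHeaders() returns the headers instance that still references the accessor, and MessageBuilder.createMessage uses that instance without copying it. The example below assumes spring-messaging is on the classpath; the class name, helper names, destination, and the "demo.trace" header are hypothetical, and it has not been verified end to end against a broker relay.

```java
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;

public class PreserveAccessorSketch {

    // Returns the accessor attached to the message headers, or null if there is none.
    static MessageHeaderAccessor accessorOf(Message<?> message) {
        return MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
    }

    // Adds a header while keeping the accessor attached to the resulting message.
    static Message<?> addHeaderKeepingAccessor(Message<?> message, String name, Object value) {
        MessageHeaderAccessor accessor = accessorOf(message);
        if (accessor != null && accessor.isMutable()) {
            // Headers are still mutable: change them in place, no new message needed.
            accessor.setHeader(name, value);
            return message;
        }
        // Otherwise obtain a mutable accessor of the matching type and re-create the message.
        accessor = MessageHeaderAccessor.getMutableAccessor(message);
        accessor.setLeaveMutable(true);
        accessor.setHeader(name, value);
        Object payload = message.getPayload();
        // getMessageHeaders() keeps the link to the accessor; createMessage does not copy it.
        return MessageBuilder.createMessage(payload, accessor.getMessageHeaders());
    }

    // For contrast: toMessageHeaders() yields a plain copy, so the accessor is dropped.
    static Message<?> addHeaderLosingAccessor(Message<?> message, String name, Object value) {
        MessageHeaderAccessor accessor = MessageHeaderAccessor.getMutableAccessor(message);
        accessor.setHeader(name, value);
        Object payload = message.getPayload();
        return new GenericMessage<Object>(payload, accessor.toMessageHeaders());
    }

    // Creates a message with accessor-backed, immutable headers (hypothetical destination).
    static Message<byte[]> stompStyleMessage() {
        SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
        accessor.setDestination("/queue/demo");
        return MessageBuilder.createMessage(new byte[0], accessor.getMessageHeaders());
    }

    public static void main(String[] args) {
        Message<byte[]> original = stompStyleMessage();
        Message<?> kept = addHeaderKeepingAccessor(original, "demo.trace", "abc");
        Message<?> lost = addHeaderLosingAccessor(original, "demo.trace", "abc");
        System.out.println("kept accessor: " + accessorOf(kept));
        System.out.println("lost accessor: " + accessorOf(lost));
    }
}
```

Because getMutableAccessor tries to match the accessor type used to create the original headers, the re-created message in this sketch still carries a SimpMessageHeaderAccessor, which is the kind of accessor the broker relay looks for.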

arjunkrishnamurthy, Apr 05 '22 07:04

@rstoyanchev I'm using a Java agent (the OpenTelemetry Java agent) to capture all traffic into and out of my Spring Boot app. It modifies the headers of WebSocket messages for tracing, and I then receive an IllegalStateException. Do you know how to fix it?

My stack trace:

Failed to handle GenericMessage [payload=byte[91], headers={simpMessageType=MESSAGE, nativeHeaders={foo=[bar], traceparent=[00-e06e35d09fda2cb7cc624c1269acb2cd-7dc5aa0ca1566892-01]}, traceparent=00-e06e35d09fda2cb7cc624c1269acb2cd-7dc5aa0ca1566892-01, id=cf214d5f-5d9e-3d78-67d7-5b83c004f1e6, contentType=application/json, timestamp=1665369946985}] to org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask@be1f38d in StompBrokerRelay[ReactorNettyTcpClient[reactor.netty.tcp.TcpClientConnect@539ba851]]; nested exception is java.lang.IllegalStateException: No header accessor (not using the SimpMessagingTemplate?): GenericMessage [payload=byte[91], headers={simpMessageType=MESSAGE, nativeHeaders={foo=[bar], traceparent=[00-e06e35d09fda2cb7cc624c1269acb2cd-7dc5aa0ca1566892-01]}, traceparent=00-e06e35d09fda2cb7cc624c1269acb2cd-7dc5aa0ca1566892-01, id=cf214d5f-5d9e-3d78-67d7-5b83c004f1e6, contentType=application/json, simpDestination=/topic/53539-resource-billing-buy-more-changing, timestamp=1665369946985}]

stackTraceElements:
        org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:153)
        org.springframework.messaging.support.ExecutorSubscribableChannel.sendInternal(ExecutorSubscribableChannel.java:100)
        org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:139)
        org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:125)
        org.springframework.messaging.simp.SimpMessagingTemplate.sendInternal(SimpMessagingTemplate.java:187)
        org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:162)
        org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:48)
        org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)

anngdinh, Oct 10 '22 04:10