spring-cloud-stream
Request-Reply in Spring Cloud Stream
Does any capability of sending a request and receiving a reply exist in Spring Cloud Stream? Something that is similar to a Gateway in Spring Integration.
Or is Spring Cloud Stream just for one direction messaging?
Seconding this. Can anyone who has done it provide a sample?
Currently, Spring Cloud Stream has no dedicated support for request/reply interaction. If necessary, this can be achieved by using a Spring Integration Gateway.
There are already some community requests and ideas on the matter: https://stackoverflow.com/questions/47800497/how-can-messaginggateway-be-configured-with-spring-cloud-stream-messagechannels.
I've put together a simple PoC showing how it can work without any modifications to the framework:
@EnableBinding({ Processor.class, CloudStreamGatewayApplication.GatewayChannels.class })
@SpringBootApplication
public class CloudStreamGatewayApplication {

    interface GatewayChannels {

        String REQUEST = "request";

        @Output(REQUEST)
        MessageChannel request();

        String REPLY = "reply";

        @Input(REPLY)
        SubscribableChannel reply();

    }

    private static final String ENRICH = "enrich";

    @MessagingGateway
    public interface StreamGateway {

        @Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.REPLY)
        String process(String payload);

    }

    @Bean
    public IntegrationFlow headerEnricherFlow() {
        return IntegrationFlows.from(ENRICH)
                .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
                .channel(GatewayChannels.REQUEST)
                .get();
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<?> process(Message<String> request) {
        return MessageBuilder.withPayload(request.getPayload().toUpperCase())
                .copyHeaders(request.getHeaders())
                .build();
    }

    public static void main(String[] args) {
        ConfigurableApplicationContext applicationContext =
                SpringApplication.run(CloudStreamGatewayApplication.class, args);
        StreamGateway gateway = applicationContext.getBean(StreamGateway.class);
        String result = gateway.process("foo");
        System.out.println(result);
    }

}
The application properties:
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: requests
        output:
          destination: replies
        request:
          destination: requests
        reply:
          destination: replies
So the idea is to send a message to one bound destination from the @MessagingGateway and wait for the reply on another bound destination. Those two are simply the INPUT and OUTPUT of the processor on the other side.
As for an out-of-the-box solution, it seems enough to include in some ChannelInterceptor, for the outbound part, the logic to convert the TemporaryReplyChannel to its string representation via the existing HeaderChannelRegistry. On the consumer side, copy the appropriate replyChannel and errorChannel headers from the request message to the reply message when we process @SendTo (@ServiceActivator already does that automatically).
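The mechanism above can be sketched without any Spring classes at all. The following is a minimal, framework-free illustration of the idea: a map stands in for HeaderChannelRegistry, and plain header maps stand in for messages. The names (HeaderChannelSketch, toStringHeader, resolveReplyChannel) are hypothetical, not part of any framework API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Framework-free sketch of the header-channel-to-string idea.
public class HeaderChannelSketch {

    // Stand-in for HeaderChannelRegistry: maps a generated id to the
    // live reply-channel object held in this JVM.
    static final Map<String, Object> REGISTRY = new ConcurrentHashMap<>();
    static final AtomicLong SEQUENCE = new AtomicLong();

    // Outbound side: replace the in-memory channel object with a String id
    // so the header survives serialization over the broker.
    static Map<String, Object> toStringHeader(Map<String, Object> headers) {
        Map<String, Object> out = new HashMap<>(headers);
        Object channel = out.get("replyChannel");
        if (channel != null && !(channel instanceof String)) {
            String id = "channel:" + SEQUENCE.incrementAndGet();
            REGISTRY.put(id, channel);
            out.put("replyChannel", id);
        }
        return out;
    }

    // Inbound side: resolve the echoed String id back to the original channel.
    static Object resolveReplyChannel(Map<String, Object> replyHeaders) {
        return REGISTRY.get(replyHeaders.get("replyChannel"));
    }
}
```

The consumer side's only obligation in this sketch is to echo the String header back unchanged, which is exactly what copyHeaders(request.getHeaders()) achieves in the PoC above.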
Any other thoughts are welcome!
@artembilan Does this example use some kind of correlationId?
If not, what would you suggest when dealing with multiple concurrent requests/responses?
The correlation is provided by this:
.enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
Each request gets a dedicated replyChannel which is stored in a header; the enricher stores the channel in a HeaderChannelRegistry and changes the header to a String value. When the reply is received, the gateway resolves the string back to the actual reply channel for this request. For this reason, the server side must echo back the replyChannel header. Hence .copyHeaders(request.getHeaders()).
This works out of the box with the Rabbit binder. When using the Kafka binder, you must add the replyChannel header to the binder's headers property (or use the kafka11 binder, or 2.0.M4, which uses kafka 1.0).
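For the Kafka binder case, the headers property mentioned above would look something like the following. This is a sketch based on the binder's headers list property; verify the exact property shape against the binder version you are running:

```yaml
spring:
  cloud:
    stream:
      kafka:
        binder:
          # headers to transport with each record (pre-native-headers binders)
          headers:
            - replyChannel
            - errorChannel
```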
@artembilan @garyrussell I know this has been targeted as a possible future feature, but I have a question about Artem's solution in the meantime. In Artem's comments within the solution to my original post, he stated:
Only the problem here that your producer application must be as single consumer in the group for the AccountChannels.ACCOUNT_CREATED - we have to ensure that only one instance in the cloud is operating at a time. Just because only one instance has that TemporaryReplyChannel in its memory.
Any ideas on how this solution could possibly work in a clustered environment where more than one consumer in the group can exist? I am using NGINX to load balance across multiple servers, and ideally I'd like to have the consumer running on each server in a horizontal scaling configuration. True to Artem's comment, when I run the consumer on each server, my test cases fail. When I only run the consumer on one of the servers (and thus only one is available within the cloud via Eureka), the test cases work. However, this isn't a resilient enough configuration considering server and/or microservice failures.
Hi,
I am using version 2.0.0.RELEASE of the Kafka binder and streams, like so:
plugins {
    id 'java'
}

group 'com.projectdrgn'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile "org.springframework.boot:spring-boot-starter-webflux:2.0.1.RELEASE"
    compile "org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:2.0.0.RELEASE"
    compile "org.springframework.cloud:spring-cloud-stream-binder-kafka:2.0.0.RELEASE"
    testCompile group: 'junit', name: 'junit', version: '4.12'
}
I copied Artem's sample plus the application properties directly into my IntelliJ project, and it works with just one exception: instead of FOO I receive a byte array rendered as the string 70,79,79. Anything I am missing?
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.handler.annotation.SendTo;
@EnableBinding({ Processor.class, CloudStreamGatewayApplication.GatewayChannels.class })
@SpringBootApplication
public class CloudStreamGatewayApplication {

    interface GatewayChannels {

        String REQUEST = "request";

        @Output(REQUEST)
        MessageChannel request();

        String REPLY = "reply";

        @Input(REPLY)
        SubscribableChannel reply();

    }

    private static final String ENRICH = "enrich";

    @MessagingGateway
    public interface StreamGateway {

        @Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.REPLY)
        String process(String payload);

    }

    @Bean
    public IntegrationFlow headerEnricherFlow() {
        return IntegrationFlows.from(ENRICH)
                .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
                .channel(GatewayChannels.REQUEST)
                .get();
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<?> process(Message<String> request) {
        return MessageBuilder.withPayload(request.getPayload().toUpperCase())
                .copyHeaders(request.getHeaders())
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        ConfigurableApplicationContext applicationContext =
                SpringApplication.run(CloudStreamGatewayApplication.class, args);
        StreamGateway gateway = applicationContext.getBean(StreamGateway.class);
        Thread.sleep(5000);
        while (true) {
            String result = gateway.process("foo");
            System.out.println(result);
            Thread.sleep(1000);
        }
    }

}
application.yml
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: requests
        output:
          destination: replies
        request:
          destination: requests
        reply:
          destination: replies
Thanks, Stefan
@stebart,
Please, follow this fix: https://github.com/titoc/rest-gateway/pull/1
@kabennett Sorry for the delay.
Any ideas on how this solution could possibly work in a clustered environment where more than one consumer in the group can exist?
It's not a problem on the consumer side; one of the instances will handle the request and send the reply. The issue is getting the reply back to the right client-side instance.
The simplest solution for that is to have a dedicated reply topic for each instance and use dynamic reply routing via a BinderAwareChannelResolver, as is used in the router app.
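A minimal sketch of the per-instance reply-topic idea, in configuration terms: each client instance binds its reply input to a destination suffixed with something unique to that instance, and the request carries that destination name so the server can route the reply dynamically. The INSTANCE_INDEX variable here is hypothetical; any per-instance value (hostname, UUID, Cloud Foundry instance index) works:

```yaml
spring:
  cloud:
    stream:
      bindings:
        request:
          destination: requests
        reply:
          # one reply topic per client instance, e.g. replies-0, replies-1, ...
          destination: replies-${INSTANCE_INDEX:0}
```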
I copied Artem's sample + application properties directly into my intellij. And it works with just one exception. Instead of FOO I receive a byte array encoded into a string saying 70,79,79. Anything I am missing?
Having the same problem myself
As I said in the mentioned PR, you need to have this in the gateway configuration:

@ServiceActivator(inputChannel = GatewayChannels.TO_UPPERCASE_REPLY)
public MyMessage getReply(MyMessage myMessage) {
    return myMessage;
}

The gateway interface must be declared without a replyChannel:

@MessagingGateway
public interface StreamGateway {

    @Gateway(requestChannel = ENRICH)
    String process(String payload);

}

This way Spring Cloud Stream will perform its content-negotiation logic and convert the incoming bytes to the expected type with the built-in message converters. Since the service activator is configured without an outputChannel, the replyChannel header is going to be selected and the gateway will receive the desired reply.
@garyrussell
It's not a problem on the consumer side; one of the instances will handle the request and send the reply. The issue is getting the reply back to the right client-side instance.
The simplest solution for that is to have a dedicated reply topic for each instance and use dynamic reply routing via a BinderAwareChannelResolver, as is used in the router app.
Anything in the Greenwich release that assists in implementing this?
Greenwich? That is Spring Cloud. I think you meant Fishtown. Outside of what's available in SI, no, there is nothing in Fishtown.
Hi, I was trying to accomplish something similar, although it is HTTP-specific and not a generic request/reply mechanism out of the box.
- I used AsyncContext from Servlet 3.1.
- The AsyncContext returned from every request is stored in a ConcurrentMap with a key calculated from System.identityHashCode(request), where request is the HttpServletRequest. That key becomes a correlationId that is stored in the headers of the Message<?>, and it is the responsibility of the component receiving the Message<?> to put the same id back in the Message<?> it sends back to the origin.
- As it uses the AsyncContext, the thread which handled the request does not block. The thread is reattached when AsyncContextServletMessagingGateway retrieves a returned message, finds the AsyncContext with the matching correlationId, and calls dispatch() on it.
The original idea came from Jetty's Continuations, which has been deprecated in favor of the native AsyncContext. Therefore, it is nothing new and has been used in all kinds of projects, including CometD.
Here is the entire code so far I am using for my PoC. I hope this will give some hint to someone who is thinking of doing request/reply pattern(of HTTP) using Spring Cloud Stream.
https://gist.github.com/hanishi/e2f2e83a168e192f681f3a85379019d6
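The correlation bookkeeping described above can be reduced to a small framework-free sketch. This is not code from the gist; the class and method names (CorrelationSketch, register, complete) are hypothetical, and a CompletableFuture stands in for the parked AsyncContext:

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Framework-free sketch of the correlation pattern: park each request
// under a generated correlationId, complete it when the reply arrives.
public class CorrelationSketch {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Client side: park the future under a fresh correlationId and return
    // the id so the caller can attach it to the outgoing message headers.
    public String register(CompletableFuture<String> future) {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, future);
        return correlationId;
    }

    // Reply side: the consumer must echo the correlationId back; here we
    // look up the parked future and complete it, releasing the caller.
    public boolean complete(String correlationId, String payload) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false; // unknown, duplicate, or timed-out request
        }
        return future.complete(payload);
    }
}
```

In the servlet variant, completing the future corresponds to finding the parked AsyncContext and calling dispatch() on it, so the container thread is only re-engaged once the reply is actually present.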
AsyncContext can also make HttpRequestHandlingMessagingGateway non-blocking, which is a good thing. @artembilan You could probably make the AsyncContextServletMessagingGateway approach more portable, so the mechanism can be used in both Spring Integration and Spring Cloud Stream.
@kabennett Sorry for the delay.
Any ideas on how this solution could possibly work in a clustered environment where more than one consumer in the group can exist?
It's not a problem on the consumer side; one of the instances will handle the request and send the reply. The issue is getting the reply back to the right client-side instance.
The simplest solution for that is to have a dedicated reply topic for each instance and use dynamic reply routing via a
BinderAwareChannelResolver
as is used in the router app.
Hi do you know if there is any solution for this issue now? Thanks.
An alternative to Gary's solution: I'm using a filtering mechanism. For example:
@MessagingGateway
public interface StreamGateway {

    String ENRICH = "enrich";
    String FILTER = "filter";

    @Gateway(requestChannel = ENRICH, replyChannel = FILTER)
    Mono<byte[]> sendAndReceive(Message<String> message);
    // or: byte[] sendSynchronousEvent(Message<String> message);

}

@Configuration
public class BeanContainer {

    public static final UUID instanceUUID = UUID.randomUUID();

    @Bean
    public IntegrationFlow headerEnricherFlow() {
        return IntegrationFlows.from(StreamGateway.ENRICH)
                .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
                .enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header("instanceId", instanceUUID))
                .channel(Source.OUTPUT)
                .get();
    }

    @Bean
    public IntegrationFlow replyFiltererFlow() {
        return IntegrationFlows.from(Sink.INPUT)
                .filter(Message.class, message -> instanceUUID.equals(message.getHeaders().get("instanceId")))
                .channel(StreamGateway.FILTER)
                .get();
    }

}
It produces a warning, and maybe it's not best practice, but it works for multiple producer instances.
Explanation: when the consumer replies back and there are multiple producers, every producer will receive the reply message. That is the problem this code block solves: if you set a dedicated instance id in the message header, you can filter reply messages by instance id.
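The instance-id filter above boils down to a one-line predicate. Here is a framework-free sketch of just that predicate over plain header maps (the class and method names are hypothetical; in the flow above this is the lambda passed to .filter(...)):

```java
import java.util.Map;
import java.util.UUID;
import java.util.function.Predicate;

// Framework-free sketch of the instance-id filter: every instance tags
// outgoing requests with its own UUID and accepts only replies that
// carry the same UUID back in the headers.
public class InstanceFilterSketch {

    public static final UUID INSTANCE_ID = UUID.randomUUID();

    // Accepts a reply only if its instanceId header matches this instance.
    public static Predicate<Map<String, Object>> ownRepliesOnly() {
        return headers -> INSTANCE_ID.equals(headers.get("instanceId"));
    }
}
```

Note that every instance still receives and deserializes every reply; the filter only discards the ones that don't match, which is the efficiency concern raised below.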
Hi, @burakhelvaci I have done a sample project using your approach and seems to work well. see https://github.com/cjrequena/spring-integration-gateway-kafka-sample
IntegrationFlows.from(streamName)
        ...
        .channel(streamName)
You make a trap for yourself here: essentially, you loop messages from the channel back to itself. It might work sometimes in your target solution and sometimes not, because such a channel declaration gives us a DirectChannel instance with a UnicastingDispatcher; if there are several subscribers to such a channel, the messages are handed to them in round-robin fashion. I'm not sure what your logic is, but the gateway's input must be different from the output of the target binding, so you don't loop accidentally.
Also keep in mind that filtering by instance id works, but it is not an efficient solution, since all your instances consume the same message from the Kafka topic. Moreover, it only works when all your instances have different groups, to achieve true publish-subscribe for reply delivery.
I don't have a perfect answer for how it should be, which might be a reason why we still don't have request-reply as an out-of-the-box solution.
Thanks for your comments, I see your point about that is not the best solution in terms of efficiency.
This would be an exciting feature. Do we know what is planned?
Thanks @cjrequena and @artembilan. I implemented something similar for RabbitMQ with a mix of spring-cloud-stream and Spring Integration. It has two producers and a consumer, and it is working fine. Here is the sample: https://github.com/5aab/request-reply-pattern
Here is a fully working example of request-reply with Spring Cloud Stream and Spring Integration. A REST service sends the received data to an EDA service (which upper-cases the received String) by doing request-reply through the "example.request-reply" exchange.
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.responses.ApiResponses;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.function.Function;
@RequestMapping(path = "/api", produces = MediaType.TEXT_PLAIN_VALUE)
@RestController
@Slf4j
public class ClientController {

    @Autowired
    private Function<String, String> convertSendAndReceive;

    @Operation(summary = "do request-reply", description = "send message to ", tags = {"SCS"})
    @ApiResponses(value = {
            @ApiResponse(responseCode = "200", description = "successful operation")
    })
    @PostMapping(value = "/sendToRabbit", consumes = MediaType.TEXT_PLAIN_VALUE)
    ResponseEntity<String> sendToRabbit(@RequestBody String message) {
        try {
            // apply the autowired request-reply function
            String response = convertSendAndReceive.apply(message);
            log.info("response : " + response);
            return new ResponseEntity<>(response, HttpStatus.OK);
        } catch (Exception exception) {
            log.error(exception.getMessage());
            return new ResponseEntity<>(exception.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR);
        }
    }

}
//
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import java.util.function.Consumer;
@Configuration
public class ResponseConfiguration {

    @Bean
    public Consumer<Message<String>> respond(MessageHandler amqpOutboundEndpoint) {
        return amqpOutboundEndpoint::handleMessage;
    }

    @Bean
    public MessageHandler amqpOutboundEndpoint(AmqpTemplate amqpTemplate) {
        AmqpOutboundEndpoint amqpOutboundEndpoint = new AmqpOutboundEndpoint(amqpTemplate);
        amqpOutboundEndpoint.setRoutingKeyExpressionString("headers['" + AmqpHeaders.REPLY_TO + "']");
        return amqpOutboundEndpoint;
    }

}
//
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.function.Function;
@Configuration
public class ServerConfiguration {

    @Bean
    public Function<String, String> uppercase() {
        return String::toUpperCase;
    }

}
//
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.util.function.Function;
@Configuration
public class ClientConfiguration {

    private static final String EXCHANGE_NAME = "EXCHANGE_NAME";

    private String DEFAULT_EXCHANGE_NAME = "example.request-reply";

    @Bean
    @DependsOn("amqpOutbound")
    public <T, R> Function<T, R> convertSendAndReceive(Function<Message<T>, R> sendToRabbitFunction) {
        return message -> {
            Message<T> msg = MessageBuilder.withPayload(message)
                    .setHeader(EXCHANGE_NAME, DEFAULT_EXCHANGE_NAME)
                    .build();
            return sendToRabbitFunction.apply(msg);
        };
    }

    @Bean
    public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate) {
        return IntegrationFlows.from(Function.class, gateway -> gateway.beanName("sendToRabbitFunction"))
                .handle(Amqp.outboundGateway(amqpTemplate)
                        .exchangeNameExpression("headers['" + EXCHANGE_NAME + "']"))
                .get();
    }

}
//yml config
spring:
  cloud:
    stream:
      function:
        definition: uppercase|respond
      rabbit:
        bindings:
          uppercaserespond-in-0:
            consumer:
              queueNameGroupOnly: true
              exchangeType: topic
              autoBindDlq: true
      bindings:
        uppercaserespond-in-0:
          group: uppercase
          destination: request-reply
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <artifactId>spring-cloud-stream-request-reply</artifactId>
    <groupId>demo</groupId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <spring-cloud.version>2020.0.1</spring-cloud.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- OpenAPI -->
        <dependency>
            <groupId>org.springdoc</groupId>
            <artifactId>springdoc-openapi-ui</artifactId>
            <version>1.4.8</version>
        </dependency>
        <!-- workaround openapi-core -->
        <dependency>
            <groupId>io.github.classgraph</groupId>
            <artifactId>classgraph</artifactId>
            <version>4.8.44</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>