spring-amqp
spring-amqp copied to clipboard
Initial Reactive Client Support
- Map to/from Spring AMQP
Message
abstractions - Add support for existing payload conversion
PoC based on SampleSender
and SampleReceiver
public class SampleSender {
private static final Logger LOGGER = LoggerFactory.getLogger(SampleSender.class);
private final Sender sender;
private final SpringReactorMessageConverter converter = new SpringReactorMessageConverter();
public SampleSender() {
this.sender = RabbitFlux.createSender();
}
public void send(String queue, int count, CountDownLatch latch) {
Flux<OutboundMessageResult> confirmations = sender.sendWithPublishConfirms(Flux.range(1, count)
.map(i -> this.converter.toOutbound("", queue, "Message_" + i))); // <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
sender.declareQueue(QueueSpecification.queue(queue))
.thenMany(confirmations)
.doOnError(e -> LOGGER.error("Send failed", e))
.subscribe(r -> {
if (r.isAck()) {
LOGGER.info("Message {} sent successfully", new String(r.getOutboundMessage().getBody()));
latch.countDown();
}
});
}
public void close() {
this.sender.close();
}
}
public class SampleReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(SampleReceiver.class);
private final Receiver receiver;
private final Sender sender;
private final SpringReactorMessageConverter converter = new SpringReactorMessageConverter();
public SampleReceiver() {
this.receiver = RabbitFlux.createReceiver();
this.sender = RabbitFlux.createSender();
}
public Disposable consume(String queue, CountDownLatch latch) {
Mono<AMQP.Queue.DeclareOk> queueDeclaration = sender.declareQueue(QueueSpecification.queue(queue));
Flux<Delivery> messages = receiver.consumeAutoAck(queue);
return queueDeclaration.thenMany(messages)
.map(d -> this.converter.deliveryToObject(d)) // <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
.subscribe(m -> {
LOGGER.info("Received message {}", m.toString());
latch.countDown();
});
}
public void close() {
this.sender.close();
this.receiver.close();
}
}
public class SpringReactorMessageConverter {
private final MessagePropertiesConverter propertiesConverter;
private final MessageConverter converter;
public SpringReactorMessageConverter() {
this(new DefaultMessagePropertiesConverter(), new SimpleMessageConverter());
}
public SpringReactorMessageConverter(MessagePropertiesConverter propertiesConverter, MessageConverter converter) {
this.propertiesConverter = propertiesConverter;
this.converter = converter;
}
public Message deliveryToMessage(Delivery delivery) {
MessageProperties messageProperties = this.propertiesConverter.toMessageProperties(delivery.getProperties(),
delivery.getEnvelope(), StandardCharsets.UTF_8.name());
return new Message(delivery.getBody(), messageProperties);
}
public Object deliveryToObject(Delivery delivery) {
return this.converter.fromMessage(deliveryToMessage(delivery));
}
public OutboundMessage toOutbound(String exchange, String routingKey, Object object) {
Message message = this.converter.toMessage(object, new MessageProperties());
return new OutboundMessage(exchange, routingKey,
this.propertiesConverter.fromMessageProperties(message.getMessageProperties(),
StandardCharsets.UTF_8.name()),
message.getBody());
}
}
cc/ @artembilan @acogoluegnes - very raw at this stage.
I think this.converter.toOutbound()
should produce a Mono<OutboundMessage>
do not have it in a blocking state, but just let it to be evaluated later on demand during subscription.
Looks like the same is applied for the deliveryToMessage()
and deliveryToObject
. I mean must be a Mono<>
return type.
The sender.declareQueue()
confuses me a little bid. As long as I know Spring AMQP it never was a producer responsibility to worry about the queue. It just sends to an exchange with the routing key.
We always declare queues on the listener side.
Plus the way we define a receiver in the SampleReceiver
is a bit confusing: looks like we always have to go over a sender
to really consume message...
I may miss something though. That's why such a rough nit-pick review.
Is reactive client support going to be available soon?
It is not currently being worked on; contributions are welcome.
I understand that SB team is working on greater things. These are not trivial changes for me to contribute either.
I am trying to understand the big picture. For reactive streams, are you suggesting that users should use reactor-rabbitmq
, and support for streams is not in future milestones? I happen to read this comment, hence asking this question: https://stackoverflow.com/questions/49662157/using-spring-amqp-consumer-in-spring-webflux#comment86378895_49675970
@artembilan @garyrussell any comments?
I tagged your names just in case if this did not come in your radar
Hi @vavrfam !
Thank you for your interest in the feature. Yes, plans are build something in Spring AMQP based on the reactor-rabbitmq
. And yes such a feature definitely can go into the next minor 2.3
version.
Tell us, please, what else you would like to hear from us!
And contributions are welcome 😄
Thank you, appreciate your response.
My question was more along if this reactive client feature is in the roadmap for spring-amqp
project. If the answer is no, is it the expectation that users are supposed to reactor-rabbitmq
for streaming? If you think this feature is small enough for people like me to contribute, would you be able to spend time with me for the design discussions, to talk through the implementation? Please let me know. Depending on the effort, I need to decide if my personal time is enough for this , or I need my organization support.
This has not been a priority; I felt my PoC didn't really add much value over the reactive client (aside from reusing the message converters).
We haven't had the bandwidth to spend much time on this; we need to revisit the reactor-rabbitmq to see if there is any real value spring-amqp can add to it.
In our project, we rely heavily on the implementation RabbitListenerAnnotationBeanPostProcessor#processAmqpListener that decides which bean methods is responsible for processing the request. Current spring-amqp
implementation works for RPC perfectly. Ideally, we would need similar support for streams, so that spring-amqp
can support listener methods that has return type of Flux/Mono. Hope I am making sense.
We already have support for Mono<?>
return types.
I am not sure what returning a Flux<?>
from an RPC server means.
For example (hypothetical example), my request contains a details of a huge file, I am expecting a response in chunks. If I get a Flux<> as return type , I could subscribe to it, and download the whole file.
Yes, we already using AsyncRabbitTemplate
which is working nicely for Mono.
That implies some kind of multiplexing since RabbitMQ deals with discrete messages.
We could possibly add something like that to the existing infrastructure, at least on the reply side; probably not on the sending side, though.
After some more thought, I don't think t would be too difficult to add a wrapper around the rabbit template; something like
Flux<Message> sendAndReceive(Flux<Message>)
Where send a message for each flux element and a "final" message when the Flux completes and use the "final" message to complete the "remote" flux (and similarly with the reply Flux).
Bear in mind though, that the underlying amqp-client is not reactive and can block - I don't know if/how reactor-rabbitmq
solves that.
Hmm... We are not gaining anything if client is not reactive right.
I haven't looked lately, but I seem to recall the rabbit team were going to look at adding back-pressure somehow.
Hello! I wanted to ask, what is the status of reactive support in spring-amqp? Will it be released soon or at least took in a roadmap? By the way, i've used a lot of spring-amqp and now started to use reactive-rabbit. On my opinion it looks great but I have a question. Can I mix this two approaches? As Example, use sender and amqp admin from spring-amqp and receiver from reactive-rabbit and other combination? Or it's not a good way?
@Nokinori We just haven't had time to work on it; maybe later this year.
Contributions are welcome!
Yes, you can use both libraries in the same application.
@garyrussell Thanks for you answer! I investigated the source code more detailed. I think, I'll try to handle an implementation. I'm going to prepare a simple prototype first so you'll be able to check it. After that we'll discuss next steps.