spring-amqp icon indicating copy to clipboard operation
spring-amqp copied to clipboard

Initial Reactive Client Support

Open garyrussell opened this issue 6 years ago • 19 comments

  • Map to/from Spring AMQP Message abstractions
  • Add support for existing payload conversion

PoC based on SampleSender and SampleReceiver

public class SampleSender {

	private static final Logger LOGGER = LoggerFactory.getLogger(SampleSender.class);

	private final Sender sender;

	private final SpringReactorMessageConverter converter = new SpringReactorMessageConverter();

	public SampleSender() {
		this.sender = RabbitFlux.createSender();
	}

	public void send(String queue, int count, CountDownLatch latch) {
		Flux<OutboundMessageResult> confirmations = sender.sendWithPublishConfirms(Flux.range(1, count)
				.map(i -> this.converter.toOutbound("", queue, "Message_" + i))); // <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

		sender.declareQueue(QueueSpecification.queue(queue))
				.thenMany(confirmations)
				.doOnError(e -> LOGGER.error("Send failed", e))
				.subscribe(r -> {
					if (r.isAck()) {
						LOGGER.info("Message {} sent successfully", new String(r.getOutboundMessage().getBody()));
						latch.countDown();
					}
				});
	}

	public void close() {
		this.sender.close();
	}

}
public class SampleReceiver {

	private static final Logger LOGGER = LoggerFactory.getLogger(SampleReceiver.class);

	private final Receiver receiver;

	private final Sender sender;

	private final SpringReactorMessageConverter converter = new SpringReactorMessageConverter();

	public SampleReceiver() {
		this.receiver = RabbitFlux.createReceiver();
		this.sender = RabbitFlux.createSender();
	}

	public Disposable consume(String queue, CountDownLatch latch) {
		Mono<AMQP.Queue.DeclareOk> queueDeclaration = sender.declareQueue(QueueSpecification.queue(queue));
		Flux<Delivery> messages = receiver.consumeAutoAck(queue);
		return queueDeclaration.thenMany(messages)
				.map(d -> this.converter.deliveryToObject(d)) // <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
				.subscribe(m -> {
					LOGGER.info("Received message {}", m.toString());
					latch.countDown();
		});
	}

	public void close() {
		this.sender.close();
		this.receiver.close();
	}

}
public class SpringReactorMessageConverter {

	private final MessagePropertiesConverter propertiesConverter;

	private final MessageConverter converter;

	public SpringReactorMessageConverter() {
		this(new DefaultMessagePropertiesConverter(), new SimpleMessageConverter());
	}

	public SpringReactorMessageConverter(MessagePropertiesConverter propertiesConverter, MessageConverter converter) {
		this.propertiesConverter = propertiesConverter;
		this.converter = converter;
	}

	public Message deliveryToMessage(Delivery delivery) {
		MessageProperties messageProperties = this.propertiesConverter.toMessageProperties(delivery.getProperties(),
				delivery.getEnvelope(), StandardCharsets.UTF_8.name());
		return new Message(delivery.getBody(), messageProperties);
	}

	public Object deliveryToObject(Delivery delivery) {
		return this.converter.fromMessage(deliveryToMessage(delivery));
	}

	public OutboundMessage toOutbound(String exchange, String routingKey, Object object) {
		Message message = this.converter.toMessage(object, new MessageProperties());
		return new OutboundMessage(exchange, routingKey,
				this.propertiesConverter.fromMessageProperties(message.getMessageProperties(),
						StandardCharsets.UTF_8.name()),
				message.getBody());
	}

}

cc/ @artembilan @acogoluegnes - very raw at this stage.

garyrussell avatar Jan 17 '19 22:01 garyrussell

I think this.converter.toOutbound() should produce a Mono<OutboundMessage> do not have it in a blocking state, but just let it to be evaluated later on demand during subscription. Looks like the same is applied for the deliveryToMessage() and deliveryToObject. I mean must be a Mono<> return type.

The sender.declareQueue() confuses me a little bid. As long as I know Spring AMQP it never was a producer responsibility to worry about the queue. It just sends to an exchange with the routing key. We always declare queues on the listener side.

Plus the way we define a receiver in the SampleReceiver is a bit confusing: looks like we always have to go over a sender to really consume message...

I may miss something though. That's why such a rough nit-pick review.

artembilan avatar Jan 22 '19 23:01 artembilan

Is reactive client support going to be available soon?

aup-1 avatar Nov 20 '19 16:11 aup-1

It is not currently being worked on; contributions are welcome.

garyrussell avatar Nov 20 '19 16:11 garyrussell

I understand that SB team is working on greater things. These are not trivial changes for me to contribute either. I am trying to understand the big picture. For reactive streams, are you suggesting that users should use reactor-rabbitmq, and support for streams is not in future milestones? I happen to read this comment, hence asking this question: https://stackoverflow.com/questions/49662157/using-spring-amqp-consumer-in-spring-webflux#comment86378895_49675970

aup-1 avatar Jan 28 '20 20:01 aup-1

@artembilan @garyrussell any comments?
I tagged your names just in case if this did not come in your radar

aup-1 avatar Jan 30 '20 14:01 aup-1

Hi @vavrfam !

Thank you for your interest in the feature. Yes, plans are build something in Spring AMQP based on the reactor-rabbitmq. And yes such a feature definitely can go into the next minor 2.3 version. Tell us, please, what else you would like to hear from us!

artembilan avatar Jan 30 '20 14:01 artembilan

And contributions are welcome 😄

garyrussell avatar Jan 30 '20 14:01 garyrussell

Thank you, appreciate your response. My question was more along if this reactive client feature is in the roadmap for spring-amqp project. If the answer is no, is it the expectation that users are supposed to reactor-rabbitmq for streaming? If you think this feature is small enough for people like me to contribute, would you be able to spend time with me for the design discussions, to talk through the implementation? Please let me know. Depending on the effort, I need to decide if my personal time is enough for this , or I need my organization support.

aup-1 avatar Jan 30 '20 19:01 aup-1

This has not been a priority; I felt my PoC didn't really add much value over the reactive client (aside from reusing the message converters).

We haven't had the bandwidth to spend much time on this; we need to revisit the reactor-rabbitmq to see if there is any real value spring-amqp can add to it.

garyrussell avatar Jan 30 '20 19:01 garyrussell

In our project, we rely heavily on the implementation RabbitListenerAnnotationBeanPostProcessor#processAmqpListener that decides which bean methods is responsible for processing the request. Current spring-amqp implementation works for RPC perfectly. Ideally, we would need similar support for streams, so that spring-amqp can support listener methods that has return type of Flux/Mono. Hope I am making sense.

aup-1 avatar Jan 30 '20 19:01 aup-1

We already have support for Mono<?> return types.

I am not sure what returning a Flux<?> from an RPC server means.

garyrussell avatar Jan 30 '20 19:01 garyrussell

For example (hypothetical example), my request contains a details of a huge file, I am expecting a response in chunks. If I get a Flux<> as return type , I could subscribe to it, and download the whole file.
Yes, we already using AsyncRabbitTemplate which is working nicely for Mono.

aup-1 avatar Jan 30 '20 19:01 aup-1

That implies some kind of multiplexing since RabbitMQ deals with discrete messages.

We could possibly add something like that to the existing infrastructure, at least on the reply side; probably not on the sending side, though.

garyrussell avatar Jan 30 '20 20:01 garyrussell

After some more thought, I don't think t would be too difficult to add a wrapper around the rabbit template; something like

Flux<Message> sendAndReceive(Flux<Message>)

Where send a message for each flux element and a "final" message when the Flux completes and use the "final" message to complete the "remote" flux (and similarly with the reply Flux).

Bear in mind though, that the underlying amqp-client is not reactive and can block - I don't know if/how reactor-rabbitmq solves that.

garyrussell avatar Jan 30 '20 21:01 garyrussell

Hmm... We are not gaining anything if client is not reactive right.

aup-1 avatar Jan 30 '20 22:01 aup-1

I haven't looked lately, but I seem to recall the rabbit team were going to look at adding back-pressure somehow.

garyrussell avatar Jan 31 '20 16:01 garyrussell

Hello! I wanted to ask, what is the status of reactive support in spring-amqp? Will it be released soon or at least took in a roadmap? By the way, i've used a lot of spring-amqp and now started to use reactive-rabbit. On my opinion it looks great but I have a question. Can I mix this two approaches? As Example, use sender and amqp admin from spring-amqp and receiver from reactive-rabbit and other combination? Or it's not a good way?

Nokinori avatar Apr 17 '20 08:04 Nokinori

@Nokinori We just haven't had time to work on it; maybe later this year.

Contributions are welcome!

Yes, you can use both libraries in the same application.

garyrussell avatar Apr 17 '20 13:04 garyrussell

@garyrussell Thanks for you answer! I investigated the source code more detailed. I think, I'll try to handle an implementation. I'm going to prepare a simple prototype first so you'll be able to check it. After that we'll discuss next steps.

Nokinori avatar Apr 22 '20 09:04 Nokinori