smallrye-reactive-messaging
smallrye-reactive-messaging copied to clipboard
JMS publisher and subscriber in one JMSContext
Hi, would it be possible to add functionality for JMS publisher and subscriber to use the same JMSContext? I would like to use the SESSION_TRANSACTED.
I created my "test app" and extend the incoming/outgoing channel with "transaction-group". If they are in the same group, they will share the JMSContext. The JmsSink does the commit. Example: https://github.com/andrejpetras/quarkus-jms-stream/blob/master/impl/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java
That sounds a great addition!
What we can do is pass the context in the (incoming) message metadata. So, if we get the metadata in the outbound connector (JmsSink), we can use this context and commit. WDYT?
Does this depend on the @Acknowledgment? I am not sure if my test was correct but I have a method without @Acknowledgment and first was ack on incoming message and then commit in JmsSink (JMS publisher). This is why I do it in the JmsSink and there is JMSContext.
That's a good question - should it be committed in the acknowledgment callback?
Let's imagine I receive a JMS message. The connector creates the Reactive Messaging Message with an acknowledgment callback acknowledging the JMS message. Should the commit happen at the same time?
This callback is invoked when the message gets acknowledged. This can be triggered by the outbound connector (JMS or other), or earlier depending on the acknowledgment strategy.
In my case I would like to have one acknowledgment after received message and sent all out-going messages.
- receive message - consumer.receive()
- send messages - producer.send(...)
- commit
I am not sure if this is possible with current specification of the @Acknowledgment.
I did my manual tests:
public Message<String> test1(Message<String> input) {
return Message.of("RESULT " + input.getPayload());
}
- INCOMING MESSAGE COMMIT/ACK
- JMS-SINK COMMIT/ACK
public PublisherBuilder<Message<String>> message(Message<String> input) {
String data = input.getPayload();
return ReactiveStreams.of(Message.of("RESULT_1 " + data), Message.of("RESULT_2 " + data));
}
- INCOMING MESSAGE COMMIT/ACK
- PRODUCER SEND
- JMS-SINK COMMIT/ACK
- PRODUCER SEND
- JMS-SINK COMMIT/ACK
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public PublisherBuilder<Message<String>> message(Message<String> input) {
String data = input.getPayload();
return ReactiveStreams.of(Message.of("RESULT_1 " + data), Message.of("RESULT_2 " + data))
.onComplete(() -> {
System.out.println("ON_COMPLETE");
});
}
- INCOMING MESSAGE COMMIT/ACK
- PRODUCER SEND null
- JMS-SINK COMMIT/ACK
- PRODUCER SEND null
- JMS-SINK COMMIT/ACK
- ON_COMPLETE
Would it be possible to have for MANUAL @Acknowledgment the context.commit() in the onComplete method and no incoming and outgoing messages ack ?
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public PublisherBuilder<Message<String>> message(Message<String> input) {
String data = input.getPayload();
return ReactiveStreams.of(Message.of("RESULT_1 " + data), Message.of("RESULT_2 " + data))
.onComplete(() -> {
context.commit();
});
}
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public PublisherBuilder<Message<String>> message(Message<String> input) {
String data = input.getPayload();
return ReactiveStreams.of(Message.of("RESULT_1 " + data), Message.of("RESULT_2 " + data))
.onComplete(() -> {
// input.commit();
// input.ack() -> JMSContext.commit()
input.ack();
});
}
In the end, I had this solution
@Incoming("input")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<Void> message(IncomingJmsTxMessage<Data> input) {
try {
...
// optional send message
input.send(Message.of("Output"));
// session commit
return input.ack();
} catch (Exception ex) {
// session rollback
return input.rollback();
}
}
This extension is based on the smallrye-reactive-messaging-jms version 2.0.0
https://github.com/lorislab/quarkus-reactive-jms-tx
Nice! And Yes SmallRye Reactive Messaging is bringing a few more things around acknowledgment (and we have more to come).
Note that you can still receive a regular message and extract the "TX part" in specific metadata.
BTW, I would love to have this here, so if you like to contribute this feature, feel free to open a PR.