smallrye-reactive-messaging

JMS publisher and subscriber in one JMSContext

Open andrejpetras opened this issue 5 years ago • 8 comments

Hi, would it be possible to add functionality that lets a JMS publisher and subscriber share the same JMSContext? I would like to use SESSION_TRANSACTED mode.

I created a "test app" and extended the incoming/outgoing channel configuration with a "transaction-group" attribute. Channels in the same group share one JMSContext, and the JmsSink does the commit. Example: https://github.com/andrejpetras/quarkus-jms-stream/blob/master/impl/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java
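
A minimal sketch of the transaction-group idea, assuming a hypothetical registry that creates one context per group name and hands the same instance to every channel in that group. `GroupContext` is a stand-in for a transacted `JMSContext` and `TransactionGroupRegistry` is an illustrative name; neither is taken from the linked connector.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical registry: channels that name the same group share one context.
class TransactionGroupRegistry {
    private final Map<String, GroupContext> contexts = new ConcurrentHashMap<>();
    private final Supplier<GroupContext> factory;

    TransactionGroupRegistry(Supplier<GroupContext> factory) {
        this.factory = factory;
    }

    // Creates the context on first use, then returns the same instance.
    GroupContext contextFor(String group) {
        return contexts.computeIfAbsent(group, g -> factory.get());
    }

    public static void main(String[] args) {
        TransactionGroupRegistry registry = new TransactionGroupRegistry(GroupContext::new);
        GroupContext incoming = registry.contextFor("orders"); // incoming channel
        GroupContext outgoing = registry.contextFor("orders"); // outgoing channel, same group
        outgoing.commit(); // the sink commits on the shared context
        System.out.println("shared: " + (incoming == outgoing)
                + ", commits: " + incoming.commitCount());
    }
}

// Stand-in for a transacted JMSContext; not the real JMS API.
class GroupContext {
    private int commits;

    void commit() { commits++; }

    int commitCount() { return commits; }
}
```

A JMS session is meant to be used by one thread at a time, so a real implementation would also need to make sure the consumer and producer sharing a context do not use it concurrently.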

andrejpetras avatar Mar 31 '20 20:03 andrejpetras

That sounds like a great addition!

What we can do is pass the context in the (incoming) message metadata. So, if we get the metadata in the outbound connector (JmsSink), we can use this context and commit. WDYT?
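
A rough sketch of that idea, using simplified stand-in types rather than the real Reactive Messaging `Message` and `Metadata` classes. `SessionHandle`, `MetaMessage`, and `MetadataSink` are hypothetical names; the point is only that the sink looks up the context in the metadata and commits on it if it is present.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical outbound side: commits on the context found in the metadata.
class MetadataSink {
    // Returns true when a session was found in the metadata and committed.
    static boolean send(MetaMessage<String> message) {
        Optional<SessionHandle> session = message.getMetadata(SessionHandle.class);
        // A real sink would hand the payload to a JMS producer here.
        session.ifPresent(SessionHandle::commit);
        return session.isPresent();
    }

    public static void main(String[] args) {
        SessionHandle session = new SessionHandle();
        MetaMessage<String> in = MetaMessage.of("hello", session);
        // Derive the outgoing message from the incoming one to keep the metadata.
        MetaMessage<String> out = in.withPayload("RESULT " + in.getPayload());
        System.out.println("committed: " + send(out));
    }
}

// Stand-in for a transacted JMSContext; not the real JMS API.
class SessionHandle {
    private boolean committed;

    void commit() { committed = true; }

    boolean isCommitted() { return committed; }
}

// Simplified message: a payload plus metadata objects keyed by their class.
class MetaMessage<T> {
    private final T payload;
    private final Map<Class<?>, Object> metadata;

    private MetaMessage(T payload, Map<Class<?>, Object> metadata) {
        this.payload = payload;
        this.metadata = metadata;
    }

    static <T> MetaMessage<T> of(T payload, Object... metadata) {
        Map<Class<?>, Object> byType = new HashMap<>();
        for (Object item : metadata) {
            byType.put(item.getClass(), item);
        }
        return new MetaMessage<>(payload, byType);
    }

    T getPayload() { return payload; }

    <M> Optional<M> getMetadata(Class<M> type) {
        return Optional.ofNullable(type.cast(metadata.get(type)));
    }

    // New payload, same metadata, so the context travels with the message.
    <R> MetaMessage<R> withPayload(R newPayload) {
        return new MetaMessage<>(newPayload, metadata);
    }
}
```

In this sketch the context only reaches the sink if the processing method derives its output from the incoming message; a message built from scratch carries no metadata, and the sink then has nothing to commit.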

cescoffier avatar Apr 01 '20 06:04 cescoffier

Does this depend on @Acknowledgment? I am not sure my test was correct, but I have a method without @Acknowledgment, and the incoming message was acked first, followed by the commit in the JmsSink (JMS publisher). That is why I do the commit in the JmsSink, where the JMSContext is available.

andrejpetras avatar Apr 01 '20 07:04 andrejpetras

That's a good question - should it be committed in the acknowledgment callback?

Let's imagine I receive a JMS message. The connector creates the Reactive Messaging Message with an acknowledgment callback acknowledging the JMS message. Should the commit happen at the same time?

This callback is invoked when the message gets acknowledged. This can be triggered by the outbound connector (JMS or other), or earlier depending on the acknowledgment strategy.
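
To make the question concrete, here is a small sketch in which the acknowledgment callback itself performs the commit. The types are simplified stand-ins (`AckMessage` and `CountingSession` are hypothetical, not the connector's classes); the callback shape, a supplier of a `CompletionStage<Void>`, is an assumption chosen to resemble how a message ack is usually expressed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

class TransactedAckDemo {
    // Builds an incoming message whose ack callback commits the session.
    static <T> AckMessage<T> incoming(T payload, CountingSession session) {
        return new AckMessage<T>(payload, () -> {
            session.commit();
            return CompletableFuture.completedFuture(null);
        });
    }

    public static void main(String[] args) {
        CountingSession session = new CountingSession();
        AckMessage<String> message = incoming("hello", session);
        // Whoever acknowledges the message (the sink, or the framework
        // earlier in the pipeline) triggers the commit at that moment.
        message.ack().toCompletableFuture().join();
        System.out.println("commits: " + session.commitCount());
    }
}

// Stand-in for a transacted JMSContext; not the real JMS API.
class CountingSession {
    private int commits;

    void commit() { commits++; }

    int commitCount() { return commits; }
}

// Simplified message carrying a payload and an acknowledgment callback.
class AckMessage<T> {
    private final T payload;
    private final Supplier<CompletionStage<Void>> ack;

    AckMessage(T payload, Supplier<CompletionStage<Void>> ack) {
        this.payload = payload;
        this.ack = ack;
    }

    T getPayload() { return payload; }

    CompletionStage<Void> ack() { return ack.get(); }
}
```

With this shape, the commit happens whenever the message is acknowledged, so an acknowledgment strategy that acks before the outgoing messages are sent would commit too early.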

cescoffier avatar Apr 01 '20 08:04 cescoffier

In my case, I would like a single acknowledgment after the message has been received and all outgoing messages have been sent.

  1. receive message - consumer.receive()
  2. send messages - producer.send(...)
  3. commit

I am not sure whether this is possible with the current specification of @Acknowledgment.
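
The three steps above can be sketched with an in-memory stand-in for a transacted session, where sends stay pending until the commit and a rollback discards them. `BufferedTxSession` and `ReceiveSendCommit` are hypothetical names, and the `failMidway` flag only simulates a processing error.

```java
import java.util.ArrayList;
import java.util.List;

class ReceiveSendCommit {
    // Step 1 is the received input; step 2 sends the outputs; step 3 commits once.
    static void process(BufferedTxSession session, String input, boolean failMidway) {
        try {
            session.send("RESULT_1 " + input);
            if (failMidway) {
                throw new IllegalStateException("simulated processing failure");
            }
            session.send("RESULT_2 " + input);
            session.commit();
        } catch (RuntimeException e) {
            // Nothing sent in this unit of work becomes visible.
            session.rollback();
        }
    }

    public static void main(String[] args) {
        BufferedTxSession session = new BufferedTxSession();
        process(session, "a", false);
        process(session, "b", true);
        System.out.println(session.delivered());
    }
}

// In-memory stand-in for a transacted session; not the real JMS API.
class BufferedTxSession {
    private final List<String> pending = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>();

    void send(String body) { pending.add(body); }

    void commit() {
        delivered.addAll(pending);
        pending.clear();
    }

    void rollback() { pending.clear(); }

    List<String> delivered() { return delivered; }
}
```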

I ran some manual tests:

public Message<String> test1(Message<String> input) {
    return Message.of("RESULT " + input.getPayload());
}

Observed order:
  1. INCOMING MESSAGE COMMIT/ACK
  2. JMS-SINK COMMIT/ACK

public PublisherBuilder<Message<String>> message(Message<String> input) {
    String data = input.getPayload();
    return ReactiveStreams.of(Message.of("RESULT_1 " + data), Message.of("RESULT_2 " + data));
}

Observed order:
  1. INCOMING MESSAGE COMMIT/ACK
  2. PRODUCER SEND
  3. JMS-SINK COMMIT/ACK
  4. PRODUCER SEND
  5. JMS-SINK COMMIT/ACK

@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public PublisherBuilder<Message<String>> message(Message<String> input) {
    String data = input.getPayload();
    return ReactiveStreams.of(Message.of("RESULT_1 " + data), Message.of("RESULT_2 " + data))
            .onComplete(() -> {
                System.out.println("ON_COMPLETE");
            });
}

Observed order:
  1. INCOMING MESSAGE COMMIT/ACK
  2. PRODUCER SEND null
  3. JMS-SINK COMMIT/ACK
  4. PRODUCER SEND null
  5. JMS-SINK COMMIT/ACK
  6. ON_COMPLETE

Would it be possible, for the MANUAL @Acknowledgment strategy, to call context.commit() in the onComplete method and skip the acks of the incoming and outgoing messages?

@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public PublisherBuilder<Message<String>> message(Message<String> input) {
    String data = input.getPayload();
    return ReactiveStreams.of(Message.of("RESULT_1 " + data), Message.of("RESULT_2 " + data))
            .onComplete(() -> {
                context.commit();
            });
}

andrejpetras avatar Apr 01 '20 20:04 andrejpetras

@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public PublisherBuilder<Message<String>> message(Message<String> input) {
    String data = input.getPayload();
    return ReactiveStreams.of(Message.of("RESULT_1 " + data), Message.of("RESULT_2 " + data))
            .onComplete(() -> {
                // input.commit();
                // input.ack() -> JMSContext.commit()
                input.ack();
            });
}

andrejpetras avatar Apr 02 '20 12:04 andrejpetras

In the end, I arrived at this solution:

@Incoming("input")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<Void> message(IncomingJmsTxMessage<Data> input) {
    try {
        ...
        // optional send message
        input.send(Message.of("Output"));

        // session commit
        return input.ack();
    } catch (Exception ex) {
        // session rollback
        return input.rollback();
    }
}

This extension is based on smallrye-reactive-messaging-jms version 2.0.0: https://github.com/lorislab/quarkus-reactive-jms-tx

andrejpetras avatar Apr 15 '20 08:04 andrejpetras

Nice! And yes, SmallRye Reactive Messaging is bringing a few more things around acknowledgment (with more to come).

Note that you can still receive a regular message and extract the "TX part" in specific metadata.

cescoffier avatar Apr 15 '20 08:04 cescoffier

BTW, I would love to have this here, so if you would like to contribute this feature, feel free to open a PR.

cescoffier avatar Apr 15 '20 08:04 cescoffier