smallrye-reactive-messaging

Provides Kafka Transaction support

Open loicmathieu opened this issue 4 years ago • 3 comments

Kafka Transaction support is useful to provide:

  • Atomic send of multiple messages
  • Atomic consume / send of two messages, to provide "processor" semantics (a component that reads from an inbound channel and writes to an outbound one).

Reactive Messaging doesn't provide batching of messages (yet), but it's already possible to implement a processor by annotating a method with both the @Incoming and @Outgoing annotations.

Kafka transactions could be used in this case (when both channels use the same broker).

Ideal implementation would be something like:

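@Incoming("in")
@Outgoing("out")
@Transactional
public Message process(Message message) {
  // proposed semantics: the consume and the send happen in one Kafka transaction
  return message;
}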

Warning: reusing the javax.transaction.Transactional annotation, while tempting, could be a bad idea. It is often already wired up for database transactions, and interacting with a JTA transaction manager would be more difficult than providing a lightweight custom implementation.
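
For context, this is roughly what a lightweight, non-JTA implementation would have to set on the underlying Kafka clients. It is only a sketch built with java.util.Properties: the configuration keys are standard Kafka client settings, but the class name, method names and the values passed in are made up for illustration, and nothing here is taken from the smallrye-reactive-messaging code base.

```java
import java.util.Properties;

// Illustrative only: class and method names are hypothetical.
public class TransactionalClientConfigSketch {

    /** Producer settings needed before a producer can begin a Kafka transaction. */
    static Properties producerProps(String bootstrapServers, String transactionalId) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        // A stable transactional.id is what makes the producer transactional.
        p.setProperty("transactional.id", transactionalId);
        // Transactions require idempotence and acks=all; stated explicitly here.
        p.setProperty("enable.idempotence", "true");
        p.setProperty("acks", "all");
        return p;
    }

    /** Consumer settings for the "consume" half of a consume / send processor. */
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("group.id", groupId);
        // Only read records from committed transactions.
        p.setProperty("isolation.level", "read_committed");
        // Offsets are meant to be committed through the transaction, not automatically.
        p.setProperty("enable.auto.commit", "false");
        return p;
    }
}
```

Both sets of properties point at the same bootstrap servers, which matches the "same broker" condition above: a Kafka transaction cannot span two clusters.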

loicmathieu, Jan 19 '21 15:01

An alternative to a @Transactional annotation would be to create another acknowledgement mode, @Acknowledgment(TRANSACTED), or to provide an @IncomingOutgoing(incoming="in", outgoing="out", transactional=true) annotation.

loicmathieu, Jan 28 '21 09:01

@ozangunalp can you describe your current approach? See #1680.

cescoffier, Apr 06 '22 10:04

The approach implemented in #1680 uses the emitter pattern for writing Kafka records inside transactions. It defines a custom emitter called KafkaTransactions, which can be injected as you'd do for an Emitter:

@Inject
@Channel("tx-producer")
KafkaTransactions kafkaTx;

Then to use it for sending messages inside a Kafka transaction:

@Incoming("in") // "in" is a placeholder channel name
Uni<Void> consumeAndProduce(String msgPayload) {
  return kafkaTx.withTransaction(emitter -> {
    // all three records are sent within the same Kafka transaction
    emitter.send(KafkaRecord.of("topic1", "key", msgPayload));
    emitter.send(KafkaRecord.of("topic2", "key", msgPayload));
    emitter.send(KafkaRecord.of("topic3", "key", msgPayload));
    return Uni.createFrom().voidItem();
  });
}

The withTransaction method returns a Uni.

When subscribed (in this example, as part of the incoming channel method), it begins the transaction, calls the given function, which sends some records, and finally commits or aborts the transaction.

The given function receives a TransactionalEmitter and sends records concurrently, on the sender thread of the channel, without needing to wait for the result. The function itself returns a Uni, whose result is returned from the withTransaction method. The transaction is aborted if an exception is thrown from the function, if the passed emitter is marked for abort, or if the returned Uni results in failure. Otherwise, it is committed. Other variants of withTransaction receive a Message and manage the offset commit, for exactly-once processing of the given message.

Using the Emitter-like API with a function to apply transactional work gives a clear demarcation of the transaction begin and commit/abort boundaries, and lets the application code do work while the transaction is actually in progress.
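
The commit/abort rules described above can be modelled in a few lines of plain Java. This is not the code from #1680: it is a simplified sketch that uses CompletableFuture in place of Uni, runs eagerly instead of on subscription, and replaces the Kafka producer with an in-memory list. SketchEmitter, markForAbort and the committed list are hypothetical names chosen for the illustration, and failing the returned future when the emitter is marked for abort is a choice made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Illustrative model only; none of these types belong to smallrye-reactive-messaging.
public class TransactionBoundarySketch {

    /** Hypothetical stand-in for the emitter handed to the function. */
    static final class SketchEmitter {
        private final List<String> buffered = new ArrayList<>();
        private boolean markedForAbort;

        void send(String record) { buffered.add(record); }

        void markForAbort() { markedForAbort = true; }
    }

    /** Records that became visible because their transaction committed. */
    static final List<String> committed = new ArrayList<>();

    static <R> CompletableFuture<R> withTransaction(
            Function<SketchEmitter, CompletableFuture<R>> work) {
        SketchEmitter emitter = new SketchEmitter(); // "begin": start with an empty buffer
        CompletableFuture<R> result;
        try {
            result = work.apply(emitter);
        } catch (RuntimeException e) {
            // Exception thrown from the function: abort, nothing is committed.
            return CompletableFuture.failedFuture(e);
        }
        // If the returned future fails, thenApply is skipped and the failure
        // propagates, so the buffered records are dropped (abort).
        return result.thenApply(value -> {
            if (emitter.markedForAbort) {
                // Sketch-specific choice: surface the abort as a failure.
                throw new IllegalStateException("transaction marked for abort");
            }
            committed.addAll(emitter.buffered); // "commit": publish everything at once
            return value;
        });
    }
}
```

In this model, a function that sends two records and returns a completed future makes both records visible together, while a function that throws, calls markForAbort, or returns a failed future leaves the committed list unchanged.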

ozangunalp, Apr 06 '22 18:04

Closed by #1680

ozangunalp, Nov 09 '22 14:11