smallrye-reactive-messaging
Provide Kafka transaction support
Kafka transaction support is useful for providing:
- Atomic send of multiple messages
- Atomic consume/send of two messages to provide "processor" semantics (a component that reads from an inbound channel and writes to an outbound one).
Reactive Messaging does not (yet) provide batching of messages, but it is already possible to implement a processor by annotating a method with both the @Incoming and @Outgoing annotations.
Kafka transactions could be used in this case (when both channels use the same broker).
An ideal implementation would be something like:
@Incoming("in")
@Outgoing("out")
@Transactional
public Message<?> process(Message<?> message) {
    return message; // placeholder body
}
Warning: reusing the javax.transaction.Transactional annotation, while tempting, could be a bad idea. It is often already wired up for database transactions, and interacting with a JTA transaction manager would be more difficult than providing a lightweight custom implementation.
An alternative to using a @Transactional annotation would be to create another acknowledgment mode, @Acknowledgment(TRANSACTED), or to provide an @IncomingOutgoing(incoming="in", outgoing="out", transactional=true) annotation.
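To make the second alternative more concrete, here is a minimal sketch of what a lightweight custom annotation could look like and how a framework might read it through reflection. The @IncomingOutgoing annotation, its attributes, and the Processor and describe names are all hypothetical; nothing like this exists in SmallRye Reactive Messaging as far as this issue states.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class AnnotationSketch {
    /** Hypothetical annotation from the proposal above; not part of any real API. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface IncomingOutgoing {
        String incoming();
        String outgoing();
        boolean transactional() default false;
    }

    /** Hypothetical processor bean using the proposed annotation. */
    static class Processor {
        @IncomingOutgoing(incoming = "in", outgoing = "out", transactional = true)
        public String process(String payload) {
            return payload.toUpperCase();
        }
    }

    /** Reads the annotation the way a framework could when wiring channels. */
    static String describe(Class<?> type, String methodName) {
        try {
            Method method = type.getMethod(methodName, String.class);
            IncomingOutgoing a = method.getAnnotation(IncomingOutgoing.class);
            return a.incoming() + "->" + a.outgoing()
                    + (a.transactional() ? " (transactional)" : "");
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(Processor.class, "process"));
    }
}
```

A dedicated annotation like this keeps the transactional flag separate from JTA, which is the concern raised in the warning above.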
@ozangunalp can you describe your current approach? See #1680.
The approach implemented in #1680 uses the emitter pattern for writing Kafka records inside transactions.
It defines a custom emitter called KafkaTransactions, which can be injected as you would an Emitter:
@Inject
@Channel("tx-producer")
KafkaTransactions kafkaTx;
Then to use it for sending messages inside a Kafka transaction:
@Incoming("in") // channel name assumed for illustration; the original snippet omitted it
Uni<Void> consumeAndProduce(String msgPayload) {
    return kafkaTx.withTransaction(emitter -> {
        emitter.send(KafkaRecord.of("topic1", "key", msgPayload));
        emitter.send(KafkaRecord.of("topic2", "key", msgPayload));
        emitter.send(KafkaRecord.of("topic3", "key", msgPayload));
        return Uni.createFrom().voidItem();
    });
}
The withTransaction method returns a Uni.
When subscribed (in this example, as part of the incoming channel method), it begins the transaction, calls the given function, which sends some records, and finally commits or aborts the transaction.
The given function receives a TransactionalEmitter and sends records concurrently, on the sender thread of the channel, without needing to wait for each result. The function itself returns a Uni, whose result is returned from the withTransaction method.
If an exception is thrown from the function, the passed emitter is marked for abort, or the returned Uni results in failure, the transaction is aborted. Otherwise, it is committed.
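The commit/abort rule can be illustrated with a toy in-memory model. This is only a sketch, not the implementation from #1680: it uses CompletableFuture in place of Uni, and TxSketch, ToyEmitter, and the committed list are hypothetical names. Surfacing a "marked for abort" as a failed future is this sketch's own choice, not confirmed behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Toy model of the commit/abort rule; not the SmallRye implementation. */
class TxSketch {
    /** Hypothetical stand-in for TransactionalEmitter. */
    static final class ToyEmitter {
        final List<String> pending = new ArrayList<>();
        boolean markedForAbort;

        void send(String record) { pending.add(record); }
        void markForAbort() { markedForAbort = true; }
    }

    /** Records from committed transactions, i.e. those visible to consumers. */
    final List<String> committed = new ArrayList<>();

    <R> CompletableFuture<R> withTransaction(Function<ToyEmitter, CompletableFuture<R>> work) {
        ToyEmitter emitter = new ToyEmitter(); // begin: start with an empty buffer
        CompletableFuture<R> result;
        try {
            result = work.apply(emitter);
        } catch (RuntimeException e) {
            // Abort case 1: the function threw; pending records are discarded.
            return CompletableFuture.<R>failedFuture(e);
        }
        // Abort case 2: if `result` failed, the callback below never runs,
        // so nothing is committed and the failure propagates.
        return result.thenCompose(value -> {
            if (emitter.markedForAbort) {
                // Abort case 3: the emitter was marked for abort.
                return CompletableFuture.<R>failedFuture(
                        new IllegalStateException("transaction marked for abort"));
            }
            committed.addAll(emitter.pending); // commit: publish all records together
            return CompletableFuture.completedFuture(value);
        });
    }

    public static void main(String[] args) {
        TxSketch tx = new TxSketch();
        tx.withTransaction(e -> {
            e.send("a");
            e.send("b");
            return CompletableFuture.completedFuture("ok");
        }).join();
        System.out.println(tx.committed);
    }
}
```

In this model, records only reach the committed list when none of the three abort conditions occurs, which mirrors the all-or-nothing behavior described above.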
Other variants of withTransaction also receive a Message and manage the offset commit for exactly-once processing of the given message.
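The idea behind exactly-once processing is that the produced records and the consumed offset become visible together or not at all. The following toy model sketches that idea in memory; ExactlyOnceSketch, process, and the offset bookkeeping are hypothetical and do not reflect how #1680 or the Kafka client implement it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Toy model: outputs and the consumed offset are committed as one unit. */
class ExactlyOnceSketch {
    final List<String> output = new ArrayList<>();
    long committedOffset = -1; // nothing consumed yet

    /**
     * Processes the record at the given offset.
     * Returns true if the transaction committed, false if it was skipped or aborted.
     */
    boolean process(long offset, String payload, Function<String, List<String>> work) {
        if (offset <= committedOffset) {
            return false; // already covered by a committed transaction
        }
        List<String> pending;
        try {
            pending = work.apply(payload); // work done inside the transaction
        } catch (RuntimeException e) {
            return false; // abort: no output, offset unchanged, record is redelivered
        }
        // Commit: publish outputs and advance the offset in the same step.
        output.addAll(pending);
        committedOffset = offset;
        return true;
    }

    public static void main(String[] args) {
        ExactlyOnceSketch sketch = new ExactlyOnceSketch();
        // First attempt fails, so the record is processed again.
        sketch.process(0, "x", p -> { throw new IllegalStateException("boom"); });
        sketch.process(0, "x", p -> List.of(p + "-1", p + "-2"));
        System.out.println(sketch.output + " at offset " + sketch.committedOffset);
    }
}
```

Because an aborted attempt leaves the offset untouched, a retry produces the outputs once, and a duplicate delivery after a commit produces nothing.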
Using the Emitter-like API with a function to apply transactional work allows for clear demarcation of the transaction begin and commit/abort boundaries, allowing the application code to do work while the transaction is actually in progress.
Closed by #1680