nats-server
JetStream: transactional batch publish messages
Feature Request
A new publish style, analogous to fetch for pull-based consumers, allowing multiple messages to be published to a stream with a single call, where all succeed or fail together (effectively a transaction). Messages can be for multiple subjects in the same stream.
Use Case:
There are cases where multiple messages must all publish successfully to ensure consistent state in the system. An example using ORDERS would be an incoming order that will be fulfilled by multiple locations and has its line items split on publish. A single order may then result in multiple messages with a format such as:
ORDERS.{locationId}.{itemId}
If there is a connection failure in the middle of publishing the order's messages, the system's state is now inconsistent (a partial order is in the stream). The ideal way of handling this would be to publish all of these messages in a single batch that succeeds or fails as a whole. Note that messages should not be available to consumers until the batch has succeeded.
Another scenario where this may be useful is allowing clients to buffer messages before sending, i.e. bulk insert.
Proposed Change:
Using the Go client as an example, something along the lines of:
```go
type batch struct {
	subj    string
	payload [][]byte
}

// Note: only the final parameter can be variadic in Go,
// so batches is a slice here rather than ...batch.
PublishBatch(batches []batch, opts ...PubOpt) (*PubAck, error)
```
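As a purely illustrative usage sketch of this proposed (non-existent) API, where the ORDERS subjects and the `item1`/`item2` payload variables are hypothetical:

```go
// Hypothetical call: either both line-item messages are persisted
// to the ORDERS stream, or neither is.
ack, err := js.PublishBatch([]batch{
	{subj: "ORDERS.loc1.item1", payload: [][]byte{item1}},
	{subj: "ORDERS.loc2.item2", payload: [][]byte{item2}},
})
if err != nil {
	// Nothing from the batch was persisted; safe to retry as a whole.
	log.Fatal(err)
}
fmt.Println("batch committed, last stream seq:", ack.Sequence)
```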
Who Benefits From The Change(s)?
- Applications requiring transaction-based publishing of events
- Applications with high publish rates of small messages
Alternative Approaches
- An application could attempt a rollback on connection failure mid-publish (i.e. delete the events for the partial order's line items). However, since the connection was lost, the deletes may reach the stream a significant time afterwards, which is problematic if downstream systems act immediately on the initial line-item publish. Additionally, if the upstream application dies mid-publish, it may be difficult to determine that the order was only partially published. The rollback can effectively be handled via distributed transactions, but these are quite complex to implement correctly, and it is much simpler to catch the failure on the initial publish.
- An application could publish all the line items as a single "batch" message, however this removes the ability to direct traffic to consumers based on subject, removing a core benefit of JetStream in this scenario.
For higher speed publishing you can do async publishing like the following in Go. Other clients are adding this as well.
```go
for i := 1; i <= 1_000_000; i++ {
	// Fire and forget; the PubAcks are tracked internally.
	js.PublishAsync(fmt.Sprintf("orders.%d", i), []byte("OK"))
}
// Block until every outstanding PubAck has been processed.
<-js.PublishAsyncComplete()
```
Full TX semantics, which a proper batch would require, would need quite a bit of work.
There may be ways to explore that could accomplish these goals with the system today.
I think this is a bit dangerous. The existing semantics, where a publish to a stream can validate that no new messages were persisted in between, may already create issues in busy streams. If we tried to make multi-message transactions, we would add a lot of potential for deadlocking and similar problems.
I would rather receive an order and publish the whole order as one msg on a stream. If I then need it to be split up into multiple streams, I'd have a listener on the whole-order stream that would (with dedup/idempotency) publish each part to its own topic as per the original request above and then ack the whole-order msg. Maybe even publish an 'order processed' msg on its own topic if there may be consumers that cannot start until all parts are written.
This would, as I understand it, do the same thing while not requiring any transaction (and is also fairly easy to scale).
just my .02 euro
T
The comment on preventing new messages before the batch is processed and how that might impact a busy stream is something I hadn't fully considered.
Currently I am implementing the following (a rough sketch follows the list):
- Each part is published with a header containing the order id
- Downstream systems receive (without acking) their parts from separate subjects, grouping by the order-id header (extending AckWait via AckProgress)
- Once the entire order is successfully pushed to the stream an order.created event is published signalling the downstream systems to process the parts, and ack them.
- If order sending is interrupted, order.deleted is sent on reconnect and the client system receives an error for the order submission
- Downstream systems clean up the waiting parts when order.deleted is received
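For illustration, a rough Go sketch of this flow using the nats.go client. The `Order-Id` header name and the `publishOrder` helper are made up for the example:

```go
import "github.com/nats-io/nats.go"

// publishOrder publishes each part with the order id in a header and
// signals completion with order.created. On failure it publishes
// order.deleted (once reconnected) so downstream systems clean up.
func publishOrder(js nats.JetStreamContext, orderID string, parts map[string][]byte) error {
	for subj, payload := range parts { // subj e.g. ORDERS.{locationId}.{itemId}
		m := nats.NewMsg(subj)
		m.Header.Set("Order-Id", orderID)
		m.Data = payload
		if _, err := js.PublishMsg(m); err != nil {
			// Partial order in the stream: tell downstream systems
			// to drop the parts they are holding for this order.
			js.Publish("order.deleted", []byte(orderID))
			return err
		}
	}
	// Every part is stored; downstream systems may process and ack.
	_, err := js.Publish("order.created", []byte(orderID))
	return err
}
```

On the consuming side, each waiting part can call msg.InProgress() to keep extending its AckWait until order.created or order.deleted arrives.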
We need this
We need this
Need what?
We need this
Need what?
This is a feature request, right?
Yes, but I am asking which specific parts? Batch, local TX, cross-asset TX, etc.
Batch sending of multiple messages. They might be for the same subject or different subjects in one stream. It's important to have all-or-nothing behavior, so that either all messages in a batch are sent or none. It doesn't matter whether they can be interleaved with other messages sent at the same time.
This is our use case:
> Additionally, if the upstream application dies mid-publish, it may be difficult to determine that the order was only partially published. The rollback can effectively be handled via distributed transactions, but these are quite complex to implement correctly, and it is much simpler to catch the failure on the initial publish.
Also, we already have to use this approach for better performance:
> An application could publish all the line items as a single "batch" message, however this removes the ability to direct traffic to consumers based on subject, removing a core benefit of JetStream in this scenario.
Having built-in batch functionality would allow us to use multiple subjects without losing performance.
For performance we have async publish, which asynchronously waits for the PubAcks; you might want to try that for better perf if that is a concern, since normal js Publish is a sync RPC by default.
We can't use async publish because the ordering of messages from the same producer is important. Message order within the same batch should be preserved (of course, only if they belong to the same stream). The ordering of multiple batches sent from the same producer should be preserved too. However, another producer might insert its messages anywhere in between, which is OK.
Async publishes preserve order.
I'm not sure about that. If you call send on a TCP socket with async IO in .NET, is there a guarantee that another send call immediately after this will be delivered after the first buffer, not before?
Async in this case means we do not wait in place for the PubAck but track them and process them asynchronously. We can limit how many are allowed to be outstanding.
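For reference, a minimal Go sketch of that model, assuming an established connection `nc` and the usual nats.go imports; the subject and the limit of 256 are arbitrary:

```go
// Bound the number of in-flight PubAcks; PublishAsync blocks once
// the limit is reached, until earlier acks come back.
js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
if err != nil {
	log.Fatal(err)
}
for i := 1; i <= 1000; i++ {
	if _, err := js.PublishAsync(fmt.Sprintf("orders.%d", i), []byte("OK")); err != nil {
		log.Fatal(err)
	}
}
// Messages go out on one connection in call order; only the acks
// are handled asynchronously.
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
	log.Fatal("timed out waiting for PubAcks")
}
```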
If the order is only important per subject, sender (and batch), why not send all that data in the same message? Sort and group by subject, and then send a msg that will either be sent or not.
Combined with a batch message containing all expected msgs to be sent, where this msg could be acked/removed when all parts were acked by the relevant stream(s), this should give the same behavior lock-free. The sender will, when starting, re-receive full batch msgs, and can read the streams to validate whether each part is there or still needs to be published.
This could be parallel for the happy flow if it's time critical, while offering an eventually consistent guarantee with minimal scaling limitations.
- The order is important per stream, but there could be different subjects in the same stream.
- I can't send data to the same subject because I have different consumers which should not receive things they don't need. Filtering on their side is not an option because the traffic is very noticeable and such CPU usage is not acceptable.
That's not what I meant.
Don't filter on their side. But if order is only relevant per subject, publish one msg per subject with all msgs for that subject in the right order inside.
Maybe if you give a concrete example?
> publish one msg per subject with all msgs for that subject in the right order inside
@tpihl If I do it this way, I can't guarantee that multiple related messages are sent to their subjects. It's possible that the app dies after sending to one subject and before sending to another. But I need the all-or-nothing semantics.
Imagine you transfer 20 USD to your friend Bob. Your balance and Bob's balance are processed on different nodes. The system sends to one node (subject 1): "subtract 20 USD from tpihl", and to another node (subject 2): "add 20 USD to Bob". If it dies in between, then Bob doesn't receive your money even though it's subtracted from your balance. If each node listens to both subjects, it causes too much resource usage (imagine 100 nodes filtering their related messages from a common stream).
Instead all related messages could be sent to NATS as a single batch, even messages with different subjects.
What I tried to explain before is that your case should be one event (the transfer) with a from, to, and amount.
One optimization I'm assuming you do is to keep some kind of persistent balance. And that's reasonable, even banks and bookkeeping do it, but it can always be recreated by rereading all events (the balance is a snapshot in time, and when all events are read, that time is now).
To make my example more similar to yours, imagine we have different banks, and the banks cannot (and should not) read each other's streams, so we must make multiple writes;
well, start just as above with one event. My bank records that event, and when aggregating my transactions it will find the -$20. The "to" is an external entity in another bank, so currently those $20 are in my bank's deposit for Bob's bank.
My bank would then run a process subscribing to transactions from/to Bob's bank, and on receiving my transfer it would write a new message in Bob's bank's stream with from: mybank.tpihl and to: Bob with $20. So whatever holds a snapshot of Bob's account will now receive this msg and Bob can spend his money.
No transaction, no messy locks, just three processes listening to each other. And if one is down, things are delayed a bit, that's all. Each of the processes must be idempotent and handle acks and other things, so it's not simpler or easier than a database transaction, just very different.
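A small Go sketch of that relay step, with hypothetical stream/subject names, assuming the relay's `js` context can read my bank's stream and publish into Bob's bank's stream. The Nats-Msg-Id header is JetStream's built-in dedup mechanism, which is what makes the republish idempotent:

```go
// Relay transfers addressed to Bob's bank into Bob's bank's stream.
sub, _ := js.PullSubscribe("mybank.transfers.bobsbank", "relay")
for {
	msgs, err := sub.Fetch(10)
	if err != nil {
		continue // e.g. fetch timeout; just retry
	}
	for _, m := range msgs {
		meta, _ := m.Metadata()
		out := nats.NewMsg("bobsbank.transfers.in")
		out.Data = m.Data
		// Derive a stable dedup id from the source stream sequence so
		// a crash between publish and ack cannot double-credit Bob.
		out.Header.Set("Nats-Msg-Id", fmt.Sprintf("relay-%d", meta.Sequence.Stream))
		if _, err := js.PublishMsg(out); err == nil {
			m.Ack()
		}
	}
}
```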
But that's the cost of asynchronous and loosely coupled event-based solutions.
@tpihl thanks, that might work. Anyway, if I'm forced to batch messages myself, it's harder to use some JetStream features like deduplication (because there are multiple logical messages in a single NATS message) and manual acks (because I have to ack the whole batch instead of a single logical message). And it's impossible to publish to multiple subjects atomically - yes, I can work around it with "chaining" like in your solution, but in some cases it would be much easier (and have better latency) to just publish to multiple subjects in a way that consumers don't need to know anything about each other.
I think this feature is still necessary.
Just a question: if it's the same work needed to support your use case, why do you think latency will be smaller if it's solved inside NATS JetStream?
I'm more afraid that making this complex behavior part of standard JetStream will also make all simple operations slower, since they will have to take transactions into account. And since I have 99% simple operations, I myself prefer those to be simple and fast while accepting that I have to do the few complex things myself.
I fear that we're diving into feature-complexity hell one small step at a time. My view of JetStream's edge over more complex engines is its simplicity. Less simplicity, and maybe it's better to use Kafka instead.
@tpihl Latency is smaller because processing is started by the two banks simultaneously. In your approach, the message is sent to the second bank only after the first bank receives it.
You mean the latency induced by 2 steps vs latency induced by locking, committing, rollbacks and similar?
Ofc you're right. If we only consider one msg with two destination subjects, this is slower, since there is a publish, a subscribe reception, two publishes, and one ack involved before the real consumers' subscriptions get their own msgs,
while with a transaction it would just be two publishes and one commit with either an error or an ack.
But on the server side we must now hold a lot more stuff in memory, and since we're in "generic world" we must now do ACID (ref https://en.wikipedia.org/wiki/ACID).
Another way for you could be to write your stuff to an ACID (SQL) database and then read it to send messages. Because that's where we might end up if we continue to add complexity here.
I think the checks already in place to fail a publish to a stream based on the expected last message are great. And async publish is also great for optimizing when there is no need for a rollback of the writes that did succeed. My maybe humbler opinion is that more complex cases should either be decomposed (as I tried to give examples for) or solved outside the messaging/streaming solution.
/T
In our system there is no need to do rollbacks and locking.
The latency of send -> receive on bank1 -> send to subject 2 -> receive on bank2 is higher than send (2 messages, atomic) -> simultaneously receive on bank1 and bank2. In the second example, bank2 doesn't need to wait for bank1 to receive the message.
So then you don't need to batch send, just send them async. The rollback/lock was for the "all or nothing", and if that's not needed, there's no issue. Or do a saga pattern.
> No transaction, no messy locks, just three processes listening to each other. And if one is down, things are delayed a bit, that's all. Each of the processes must be idempotent and handle acks and other things, so it's not simpler or easier than a database transaction, just very different.
@tpihl This is something I have been going back and forth on: whether to design for the expectation that multiple events could be returned from, say, a command handler, or to model it such that there is always a primary event that is produced and written to a stream, and then there are simply (not necessarily simple) listeners that react to that event and publish any relevant follow-up events.
It has sort of always bothered me that a single command could result in multiple events being produced. It feels like a short circuit of event X being produced that would lead to the condition of event Y. For example, a command to move a chess piece that would result in checkmate. One could model this as "piece-moved" and then an assertion on the move to determine "checkmate" or "game-finished". Instead, the command results in the moved piece, and the evaluation post-move determines the state of the game and any implicit result from that.
I absolutely agree that transactions, as well as exactly once, could simplify some scenarios, but at what cost? Because the more complexity you add, the more corner-cases will appear.
Moving a chess piece should not be considered an event, because the move will be undone if it was illegal (moving into check). And since we're talking CQRS:
why does everyone assume there is any correlation between a command and a "main" event derived from that command? I've seen frameworks on GitHub that assume an event is the command.
A command is a wish for something, right? It may result in an event stating it succeeded, or that some entity properties changed. Or many events over time (the usual example is a travel agency and a trip with taxi, hotel, flight, and future cancellations and so on). Or an event stating it failed. And possibly several commands to other services. There is no sync to real events; they happened, they are facts, and they are not conditional on anything else. If multiple events happened, they should be documented as they happen (i.e. not based on a return value).
I think I need another example to understand your scenario @bruth