nats-server
(2.12) Initial atomic batch publish
This PR implements an initial version for atomic/batch publishing, as described in this ADR.
In general:
- A client can now be implemented (given the current limitations listed below) to use the new batch API. For an example, see TestJetStreamAtomicPublish.
- The stream config field for AllowAtomicPublish is added, support for batch headers Nats-Batch-Id, Nats-Batch-Sequence, Nats-Batch-Commit, feature versioning, and support for batching as per the ADR.
- Dedupe state (mset.ddmap, mset.ddarr, mset.ddindex) is now guarded by a separate lock (mset.ddMu). This is required because we need to keep holding mset.clMu while accessing the dedupe state, and we can't use mset.mu as that would be a lock order violation. (Usage added to locksordering.txt)
- All clustered header checks (like Nats-Expected-* and Nats-Msg-Id) prior to proposal have been moved to jetstream_batching.go, such that they can be used to do checks for all messages in the batch prior to accepting it fully.
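To make the batch headers concrete, here is a minimal sketch of the header set a client might attach to each message of a batch. The header names come from the description above. The values are assumptions, not confirmed by this PR text: sequences are taken to start at 1, and the commit marker on the final message is taken to be "1". The helper batchHeaders is hypothetical and only builds plain maps.

```go
package main

import (
	"fmt"
	"strconv"
)

// Header names as listed in the PR description.
const (
	hdrBatchID     = "Nats-Batch-Id"
	hdrBatchSeq    = "Nats-Batch-Sequence"
	hdrBatchCommit = "Nats-Batch-Commit"
)

// batchHeaders returns one header map per message for a batch of n messages.
// Assumptions (not taken from the PR): sequences start at 1, and the commit
// marker on the last message carries the value "1".
func batchHeaders(batchID string, n int) []map[string]string {
	out := make([]map[string]string, 0, n)
	for i := 1; i <= n; i++ {
		h := map[string]string{
			hdrBatchID:  batchID,
			hdrBatchSeq: strconv.Itoa(i),
		}
		if i == n {
			// Only the final message of the batch carries the commit header.
			h[hdrBatchCommit] = "1"
		}
		out = append(out, h)
	}
	return out
}

func main() {
	for _, h := range batchHeaders("example-batch", 3) {
		fmt.Println(h)
	}
}
```

The stream would additionally need AllowAtomicPublish enabled in its config for the server to accept such messages.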
Implementation:
- Client publishes batches as per the ADR.
- Server receives these, and:
  - Stores each message in the batch into a StreamStore, currently memstore only.
  - Once Nats-Batch-Commit is received, we loop through all messages and do the required header checks prior to proposing.
- Server rejects the batch if gaps are detected, or any required header checks fail.
- Otherwise, Server proposes append entries consisting of the batched entries.
- Followers check that they've received all proposed entries containing the batch. If and only if the full batch is received, they can simply apply all entries and don't need to do further header checks.
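The leader-side flow above (stage, detect gaps, check everything on commit, then propose) can be sketched as follows. This is a simplified illustration, not the code in jetstream_batching.go: the types stagedMsg and batch, the error value, and the check callback are all hypothetical, and a plain slice stands in for the StreamStore.

```go
package main

import (
	"errors"
	"fmt"
)

// stagedMsg is a hypothetical stand-in for a message held until commit.
type stagedMsg struct {
	Subject string
	Seq     uint64 // value of Nats-Batch-Sequence
	Commit  bool   // true if Nats-Batch-Commit is set
}

var errGap = errors.New("batch rejected: gap in batch sequence")

// batch stages messages in memory (the PR uses a StreamStore for this).
type batch struct {
	msgs []stagedMsg
}

// add stages one message. It rejects the whole batch if the sequence skips.
// The returned bool reports whether the commit message has been staged.
func (b *batch) add(m stagedMsg) (bool, error) {
	if want := uint64(len(b.msgs)) + 1; m.Seq != want {
		b.msgs = nil
		return false, errGap
	}
	b.msgs = append(b.msgs, m)
	return m.Commit, nil
}

// commit runs the header check on every staged message before anything is
// proposed. Either all entries are returned for proposal, or none are.
func (b *batch) commit(check func(stagedMsg) error) ([]stagedMsg, error) {
	entries := b.msgs
	b.msgs = nil
	for _, m := range entries {
		if err := check(m); err != nil {
			return nil, fmt.Errorf("batch rejected at sequence %d: %w", m.Seq, err)
		}
	}
	return entries, nil
}

func main() {
	b := &batch{}
	b.add(stagedMsg{Subject: "orders.1", Seq: 1})
	ready, _ := b.add(stagedMsg{Subject: "orders.2", Seq: 2, Commit: true})
	if ready {
		entries, err := b.commit(func(stagedMsg) error { return nil })
		fmt.Println(len(entries), err)
	}
}
```

Running all checks before proposing is what lets followers apply a fully received batch without repeating them.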
Atomic publish is not complete after this PR. Importantly, it has the following limitations that will need to be fixed in separate PRs:
- https://github.com/nats-io/nats-server/issues/6974
- https://github.com/nats-io/nats-server/issues/6975
- https://github.com/nats-io/nats-server/issues/6976
- https://github.com/nats-io/nats-server/issues/6977
- https://github.com/nats-io/nats-server/issues/6978
- https://github.com/nats-io/nats-server/issues/6979
- https://github.com/nats-io/nats-server/issues/6980
- etc.
Relates to https://github.com/nats-io/nats-server/issues/6549
Signed-off-by: Maurice van Veen [email protected]
Headers like Nats-Expected-Last-Sequence and Nats-Expected-Last-Msg-Id should be rejected. For example, given these headers a batch would fail: if a batch starts and anything other than the first message contains these headers, or if between our batch start and commit a new publish or batch was admitted in. We also don't know with absolute certainty what the sequence or message ID of any proposed but not yet applied message is. Usage of these headers should be rejected so we can ensure batches don't need to be continuously retried.
I don't think this is right. If you accept these headers and check them against the pre-batch state before committing the batch, while the lock is held, it all makes perfect sense.
I might want to put messages into a batch for many subjects and be sure that none of those subjects had updates while the batch was moving from client to servers. It then makes sense to express these limits and to verify them pre-batch. The batch will always be consistent, so the headers don't need to reference into the batch, but they totally can reference pre-batch state, and that makes these headers do the right thing.
Restricting the headers to the first message only doesn't make sense: since the batch can have many subjects, the first one isn't useful.
if you accept these headers and check them against the pre-batch state before committing the batch and while the lock is held it all makes perfect sense.
This is where the problem lies. We can't deny the batch up front, even with locks held, because all the replicas do the last sequence and last msg ID checks on their own. So with the current code we simply can't deny it up front from the leader. Maybe we could make that work, but I highly doubt it because we can't guarantee what the last sequence is going to be until it is applied on a replica. I don't have an answer for this issue yet, but I'd rather have us initially reject the headers altogether if we don't have a fix, instead of partially applying a batch (what would happen now).
Restricting the headers to first message only doesn't make sense since the batch can have many subjects, first one isnt useful.
Headers are NOT restricted to the first message only. I meant that the last sequence and last msg ID checks only really work if set on the first message. For example, if you have a stream with 5 messages and you send the following batch:
- msg1 no headers, msg2 expected last seq must be 5 => this batch will fail, because the last sequence after applying msg1 will be 6
- msg1 expected last seq must be 5, msg2 expected last seq must be 5 => this batch will fail, because the last sequence is upped to 6, so the consistency check would fail on msg 2
Last msg ID doesn't make sense to me for anything but the first message: if you use last msg ID on all of them, you will error if one of them was a duplicate, since that message didn't need to be applied. Last seq does work if used for more than only the first message, but only if it increases for every single message. Even then it would be tricky, because if you use a msg ID for deduplication and the message is a duplicate, the whole batch will fail as well.
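The two failing examples above, and the pre-batch alternative suggested earlier in the thread, can be compared with a small simulation. This is only a model of the semantics under discussion, not server code: batchMsg, checkSequential, and checkPreBatch are hypothetical names, and an ExpectedLastSeq of 0 is used here to mean "header not set".

```go
package main

import "fmt"

// batchMsg models only the Nats-Expected-Last-Sequence header.
// In this sketch, 0 means the header is not set.
type batchMsg struct {
	ExpectedLastSeq uint64
}

// checkSequential validates each message against the last sequence as it
// advances through the batch, which is how the examples above fail.
func checkSequential(lastSeq uint64, msgs []batchMsg) error {
	for i, m := range msgs {
		if m.ExpectedLastSeq != 0 && m.ExpectedLastSeq != lastSeq {
			return fmt.Errorf("msg%d: expected last seq %d, got %d", i+1, m.ExpectedLastSeq, lastSeq)
		}
		lastSeq++ // applying this message bumps the stream's last sequence
	}
	return nil
}

// checkPreBatch validates every message against the state as it was before
// the batch, the alternative semantics proposed in the thread.
func checkPreBatch(lastSeq uint64, msgs []batchMsg) error {
	for i, m := range msgs {
		if m.ExpectedLastSeq != 0 && m.ExpectedLastSeq != lastSeq {
			return fmt.Errorf("msg%d: expected last seq %d, got %d", i+1, m.ExpectedLastSeq, lastSeq)
		}
	}
	return nil
}

func main() {
	// Stream holds 5 messages; msg1 has no header, msg2 expects last seq 5.
	msgs := []batchMsg{{}, {ExpectedLastSeq: 5}}
	fmt.Println("sequential:", checkSequential(5, msgs))
	fmt.Println("pre-batch: ", checkPreBatch(5, msgs))
}
```

Under the sequential model, a batch only passes if each expected value increases by one per message, matching the caveat above.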
The current PR has no opinions about these specific headers, to be clear. But we need to do something with them: either reject them or fix the problem. Rejecting is easy for 2.12; fixing the problem could be tricky.
OK, we definitely need to think about these. Agreed, for now we can reject them, but I think we will need something here, as atomic publishes seem quite likely to want to express consistency.
- msg1 no headers, msg2 expected last seq must be 5 => this batch will fail, because the last sequence after applying msg1 will be 6
Indeed, hence me saying the state as it was before the batch landed. Conceptually that makes complete sense, but I hear you about the implementation issues.
natscli main can create and edit these streams and @ploubser did some extensive end to end testing and found it to be working well for R>1
Because all the replicas do the last sequence and last msg ID checks on their own.
@MauriceVanVeen why doesn't just the leader do this?
I think we will need some thing here as atomic publishes seem quite likely to want to express consistency
Indeed, this is important for the event sourcing use case, specifically, Nats-Expected-Last-Subject-Sequence and the fairly new Nats-Expected-Last-Subject-Sequence-Subject, e.g.
PUB orders.123.order-placed
Nats-Expected-Last-Subject-Sequence-Subject: orders.123.*
Nats-Expected-Last-Subject-Sequence: 4
In this case, if there was a batch, let's say 3 messages with Sequence-Subject of orders.123.*, beyond the first message, the sequence number check should not be required (although in this case, it would be monotonic by 1).
For the case of a batch where the Sequence-Subject is not specified, for each message with a different subject, the last sequence should be checked.
So the checks are relative to the unique subjects in the batch (whether Sequence-Subject or the default case of the message subject).
For Nats-Expected-Last-Sequence and Nats-Expected-Last-Msg-Id this is a stream-level sequence check, so any concurrent commits would fail the batch.
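One possible reading of the per-subject semantics described above is sketched below: the check runs once per unique key (the Sequence-Subject if set, otherwise the message subject) against pre-batch state, and later messages with the same key are not re-checked. This models the suggestion only, not what the server does. The names subjMsg and checkPerSubject are hypothetical, and the pre-batch state is looked up by literal key, so wildcard matching for values like orders.123.* is not implemented.

```go
package main

import "fmt"

// subjMsg models a batched message with the per-subject consistency headers.
type subjMsg struct {
	Subject     string
	SeqSubject  string // Nats-Expected-Last-Subject-Sequence-Subject, "" if unset
	HasExpected bool   // whether Nats-Expected-Last-Subject-Sequence is set
	Expected    uint64
}

// checkPerSubject checks the first message per unique key against the
// pre-batch last sequence for that key. preBatch is a hypothetical lookup
// keyed by the literal key string (no wildcard resolution).
func checkPerSubject(preBatch map[string]uint64, msgs []subjMsg) error {
	checked := make(map[string]bool)
	for i, m := range msgs {
		if !m.HasExpected {
			continue
		}
		key := m.SeqSubject
		if key == "" {
			key = m.Subject
		}
		if checked[key] {
			continue // beyond the first message per key, no further check
		}
		checked[key] = true
		if got := preBatch[key]; got != m.Expected {
			return fmt.Errorf("msg%d: wrong last sequence for %q: %d", i+1, key, got)
		}
	}
	return nil
}

func main() {
	pre := map[string]uint64{"orders.123.*": 4}
	msgs := []subjMsg{
		{Subject: "orders.123.order-placed", SeqSubject: "orders.123.*", HasExpected: true, Expected: 4},
		{Subject: "orders.123.order-paid", SeqSubject: "orders.123.*", HasExpected: true, Expected: 4},
	}
	fmt.Println(checkPerSubject(pre, msgs))
}
```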
@bruth, to clarify because there are many moving pieces to the whole batching story. (And I'm trying to keep this PR as small and as reviewable as possible as well)
The Nats-Expected-Last-Subject-Sequence and Nats-Expected-Last-Subject-Sequence-Subject are already fully supported with just this PR. I believe that already covers the majority of the event sourcing use case you mentioned?
*There's one side-note I'll address at the bottom of this message, before the TL;DR.
For Nats-Expected-Last-Sequence and Nats-Expected-Last-Msg-Id this is a stream-level sequence check, so any concurrent commits would fail the batch.
... why doesn't just the leader do this?
Correct, it's specifically these two headers that the leader does not check, and all replicas check on their own. Why the leader doesn't do this is twofold. One is simply for historic reasons I think, it was never necessary, but for batching it will be important to have the leader do this check and subsequently fail the batch if there's a mismatch. The other issue is that we can't 100% guarantee if the checks work correctly if the leader does do the check (can we really 100% be sure the last sequence/msgId work even with multiple inflight proposals, etc.).
But maybe I'm just overly protective :sweat_smile:, i.e. we couldn't guarantee this before in previous server versions but now with all the Raft fixes we can. And I'll just need to go through the code again and confirm if the leader says "YES" to accept and propose the message, that the replica will also always say "YES" to applying the message.
I'll adjust the PR description to reflect this is a current limitation, and we shouldn't reject those headers but work on a fix for a subsequent PR. :+1: (assuming we can ensure the right guarantees, but I'm thinking it should be possible and just need to confirm)
Side-note
Side-note for the Nats-Expected-Last-Sequence and Nats-Expected-Last-Subject-Sequence headers. Like you specify here (if I understood you correctly):
beyond the first message, the sequence number check should not be required (although in this case, it would be monotonic by 1).
There are currently also some limitations that block any changes to a message when Nats-Expected-Last-Subject-Sequence is used. Once you publish one message under subject A, if you try to do another such consistency check while the first message is still actively being proposed and not applied yet, it will fail, prompting you to retry.
This is what will currently prevent you from having a batch that uses the same Nats-Expected-Last-Subject-Sequence-Subject or message subject for multiple messages. This error was added because this was one way we could desync for the KV use case (or for event sourcing, etc.), so it can't simply be removed.
I believe the fix that's required for this will be very similar to the one that's needed for Nats-Expected-Last-Sequence to work and be checked by the leader. I'll work on a subsequent PR once that's figured out and works.
(This will probably also fix another issue where we could desync if you publish to subject A both with and without the expected last subject sequence check, have server restarts, catchups, and message deletions prior to catchup)
TL;DR
- Trying to keep this PR contained. There's still loads of work to do, so want to make sure we agree on the API and commit to fixing the listed (or to be discovered) current limitations in separate PRs. To not let this PR explode in terms of content and discussion (and make all of them easier to review and merge).
- Nats-Expected-Last-Sequence and Nats-Expected-Last-Msg-Id currently don't properly fail the batch if the check is invalid (because the replica, not the leader, does these checks). This will need to be fixed in a separate PR.
- If a batch consists of multiple messages using the same Nats-Expected-Last-Subject-Sequence-Subject or message subject, then the batch will fail instead of pass, even if the consistency checks should be successful. This will also need to be fixed in a separate PR. Probably the fix will be very similar to the one needed to fix Nats-Expected-Last-Sequence usage, so that'll likely be the same PR instead of separate ones.
Have created sub-issues for the current set of limitations, so they are already tracked.
Okay to merge as-is, and start work on the remaining items in separate PRs?
Wrote the simplest possible API for atomic batch publish in Orbit.go, if someone would like to take it for a spin.
Returns a PubAck just like jetstream.PublishMsg(ctx, msg), and takes a list of messages as part of the batch:
pubAck, err := jetstreamext.AtomicBatchPublish(ctx, nc, msgs)
Side-note: I can imagine an alternative API for this could be to not specifically pass the messages in a single method, but instead call the method for each individual message (similar to PublishAsync, but then with the new batch semantics). The client could then send the batch automatically and transparently when a configurable threshold is reached, either a timeout or max messages in the batch. Along with a setting like "max batches inflight", it could be set to "1" if you'd want to do very fast guaranteed ordered publishing into the stream (what PublishAsync can't currently guarantee).
I would imagine the use case you mention will be more popular vs staging all the messages in a slice/array first. Be interested in what others think as well.
I agree with generalizing it as the core API and then a convenience function that takes a slice as a one-shot.
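As a rough illustration of that shape (a per-message core API plus a one-shot convenience over a slice), here is a minimal sketch. None of these names exist in Orbit.go or nats.go: Msg, Batcher, NewBatcher, and PublishAll are hypothetical, and the send callback stands in for whatever actually performs the atomic batch publish. Only the max-messages threshold is modelled; the timeout and "max batches inflight" settings mentioned above are left out.

```go
package main

import "fmt"

// Msg is a hypothetical stand-in for a client message type.
type Msg struct {
	Subject string
	Data    []byte
}

// Batcher collects messages and sends them as one batch once maxMsgs is
// reached. send is a hypothetical hook for the atomic batch publish.
type Batcher struct {
	maxMsgs int
	pending []Msg
	send    func([]Msg) error
}

// NewBatcher returns a Batcher that flushes after maxMsgs messages (min 1).
func NewBatcher(maxMsgs int, send func([]Msg) error) *Batcher {
	if maxMsgs < 1 {
		maxMsgs = 1
	}
	return &Batcher{maxMsgs: maxMsgs, send: send}
}

// Add queues one message and flushes automatically at the threshold.
func (b *Batcher) Add(m Msg) error {
	b.pending = append(b.pending, m)
	if len(b.pending) >= b.maxMsgs {
		return b.Flush()
	}
	return nil
}

// Flush sends whatever is pending as a single batch.
func (b *Batcher) Flush() error {
	if len(b.pending) == 0 {
		return nil
	}
	batch := b.pending
	b.pending = nil
	return b.send(batch)
}

// PublishAll is the one-shot convenience: msgs go out as exactly one batch.
func PublishAll(msgs []Msg, send func([]Msg) error) error {
	b := NewBatcher(len(msgs)+1, send) // threshold is never hit by Add
	for _, m := range msgs {
		if err := b.Add(m); err != nil {
			return err
		}
	}
	return b.Flush()
}

func main() {
	send := func(ms []Msg) error {
		fmt.Printf("sending batch of %d\n", len(ms))
		return nil
	}
	b := NewBatcher(2, send)
	for i := 0; i < 5; i++ {
		b.Add(Msg{Subject: "orders.new"})
	}
	b.Flush()
}
```

Building the slice-based call on top of the per-message core keeps a single code path for batching, which is the layering suggested above.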