Ability to reset consumer start sequence
Feature Request
Use Case:
The motivation comes from event sourcing-style use cases, but can apply to general sourcing of a stream into some materialized form.
Event sourcing is a persistence model where events are the source of truth and all state materializations are derived. What this means for NATS is that consumers are created and named corresponding to these materializations. Additionally, it is common for these consumers to target only a subset of a stream (with a subject filter applied).
For one-off cases, an ephemeral consumer works great. For larger or persisted materializations, a durable will be used. Since these materialized views can be thrown away and rebuilt after bugs are found or materialization logic changes, it is common for a consumer's start sequence to need to be reset to the initial message so the stream can be replayed from the start to rebuild the view.
The start sequence may need to be reset to the original first event, or if snapshots are being used, it may need to be reset to an arbitrary sequence relative to that snapshot.
Proposed Change:
The ability to dynamically reset/change the start sequence on a consumer.
Who Benefits From The Change(s)?
Anyone wanting to reset the sequence on a consumer (event sourcing or not).
Alternative Approaches
Today, the consumer start sequence cannot be changed, so to achieve this behavior, the consumer has to be deleted and then recreated with the same name.
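For concreteness, the workaround looks roughly like this with the Go client. This is a hedged sketch only: the stream and consumer names are made up, and it assumes a reachable NATS server.

```go
package main

import (
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()
	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// 1. Capture the existing config so it can be faithfully restored.
	info, err := js.ConsumerInfo("ORDERS", "materializer")
	if err != nil {
		log.Fatal(err)
	}
	cfg := info.Config

	// 2. Delete the durable. From here until AddConsumer succeeds, the
	// consumer is missing: bound subscribers and interest-based retention
	// are both exposed during this window.
	if err := js.DeleteConsumer("ORDERS", "materializer"); err != nil {
		log.Fatal(err)
	}

	// 3. Recreate it with the same config but a new start position.
	cfg.DeliverPolicy = nats.DeliverByStartSequencePolicy
	cfg.OptStartSeq = 1 // or a snapshot's sequence
	if _, err := js.AddConsumer("ORDERS", &cfg); err != nil {
		log.Fatal(err)
	}
}
```

The window between steps 2 and 3 is exactly the risk discussed later in this thread.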
Why is delete->create not a good answer here?
The feature request is actually not resetting the start sequence; what you're asking for is entirely resetting the consumer to fresh (optionally with a new start configuration).
This is because the various start options are used only once (at start). So updating them makes no sense - once the consumer exists those settings are never used again.
So to change them, conceptually, you need to entirely reset the consumer and go back to the beginning, blowing away all of its state. Recreating the consumer might be a better signal there.
The event sourcing callout is definitely high on my priority list. In event sourcing, I may need to do ad hoc replays at any given time. It could be because I'm migrating schema, or because I'm recovering from data loss, or because I've updated aggregate or projector code. In all of those cases I need to be able to tell any given component to start again from a given point (usually start, but as mentioned, arbitrary start points are useful for snapshots, etc). I usually use server-managed consumers to do this in non-NATS environments (lookin' at you, Kafka).
Whether or not I'm using event sourcing, the biggest problem with today's workaround (delete and recreate) is that durable consumers have meaningful configuration. If I want to delete and recreate one, I have to be sure that I copy the old configuration properly. I can definitely envision messing up the config backup/restore operation and wrecking an environment. Delete and recreate is not something anyone feels comfortable doing in a production environment, no matter how safe you tell them it is.
In large distributed systems, deleting a durable consumer and recreating can potentially break or interrupt running code. If something is attempting to query consumer metadata during the recreation, it will either fail to get data, or it could get the wrong metadata.
So, the operations I want on the durable consumer are to set the last read position to a) the beginning, b) a specific time, c) a specific index, or d) the end.
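For reference, those four positions already exist as one-shot start options at consumer creation time in the Go client. A configuration sketch (field names from nats.go; the concrete values are made up):

```go
// a) the beginning
a := nats.ConsumerConfig{DeliverPolicy: nats.DeliverAllPolicy}

// b) a specific time
t := time.Now().Add(-1 * time.Hour)
b := nats.ConsumerConfig{DeliverPolicy: nats.DeliverByStartTimePolicy, OptStartTime: &t}

// c) a specific index (stream sequence; 42 is a made-up value)
c := nats.ConsumerConfig{DeliverPolicy: nats.DeliverByStartSequencePolicy, OptStartSeq: 42}

// d) the end (only messages arriving from now on)
d := nats.ConsumerConfig{DeliverPolicy: nats.DeliverNewPolicy}
```

The feature request is, in effect, to re-apply one of these after the consumer already exists.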
@ripienaar Yes, understood that all the state will need to be blown away and reset. This is from the perspective of UX and (possibly) continuity of configuration, or of subscribers bound to that consumer as well.
Why is delete->create not a good answer here?
Because this is super dangerous to do to a running system. That consumer is -missing- during this operation, which can bust all kinds of things. It's also highly error prone. Further, if I want to reset my start position to something other than the beginning of time, I can't do that with delete/recreate. If I want to wind back to a specific snapshot, that's not possible.
You can of course go back to a specific point in time with a delete/create since you can give it any specific point you wish.
I suspect, though not 100% sure, doing this will require client changes as well, since some current client operations will not appreciate sequences going backwards.
Anyway, to be clear, not against the idea, just wanted to get some more background information.
Wouldn't deleting and recreating a durable consumer with an earlier index while clients are connected also result in the same issue (backwards sequence)?
Yeah, I think the biggest one that might be exposed to that would be ordered consumers - they are ephemeral anyway.
Point is this would be quite a change; all client authors would need to go and verify assumptions at the very least.
In large distributed systems, deleting a durable consumer and recreating can potentially break or interrupt running code. If something is attempting to query consumer metadata during the recreation, it will either fail to get data, or it could get the wrong metadata.
Just wanted to comment on this part here. You should expect that consumer info or other metadata fetches can fail at any time. There are many scenarios that can interrupt them. If your aim is to write JS client code that will not fail, you need a vast amount of retries and backoffs in your code, as these kinds of things can happen at any time.
Just taking some notes here:
Not sure how much of https://developers.eventstore.com/clients/grpc/persistent-subscriptions.html#acknowledgements would apply, but it's worth bringing up. I think:
- Unknown: The client does not know what action to take. Let the server decide.
- Park: Park the message and do not resend; put it on a poison queue.
- Retry: Explicitly retry the message.
- Skip: Skip this message; do not resend and do not put it on the poison queue.
From: @bruth
The Unknown ack is interesting... need to read more about that. The description isn't clear on the semantics. Park would be a kick over to a DLQ. Retry is the default behavior, but can be controlled with Nak or NakWithDelay by the client. And Skip is equivalent to the Term message ack.
For reference, in the context of Go, https://pkg.go.dev/github.com/nats-io/nats.go#hdr-Types_of_Acknowledgements
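A minimal sketch of how those actions could map onto nats.go message acks. The `handle` function and its `action` strings are hypothetical dispatch for illustration; NATS has no built-in poison queue, so Park would need your own DLQ publish before Term.

```go
package main

import (
	"time"

	"github.com/nats-io/nats.go"
)

// handle maps the EventStore-style actions above onto JetStream acks.
// msg must be a message received from a JetStream subscription.
func handle(msg *nats.Msg, action string) error {
	switch action {
	case "retry": // Retry: redeliver as soon as possible...
		return msg.Nak()
	case "retry-later": // ...or after a delay
		return msg.NakWithDelay(time.Minute)
	case "skip": // Skip is equivalent to Term: never redeliver
		return msg.Term()
	case "busy": // not in the EventStore list: reset the ack-wait timer
		return msg.InProgress()
	default: // processed successfully
		return msg.Ack()
	}
}

func main() {}
```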
Sorry if I'm jumping in and have misunderstood everything. I'm also heavy into event sourcing, but this topic confuses me.
My (durable) consumers are usually not shared in event sourcing. I have some shared ones for work fan-out, but those aren't aggregates/projections.
I'm always creating my consumer in the same process that needs it (usually as a first step, and if it's removed I'll exit/restart to make sure it's recreated the same way every time).
And if I need to reread something, or make a short-lived "consume with a good subject filter" to directly aggregate those msgs and maybe emit a new msg (with constraints that nothing relevant changed between aggregation and publish), I will use an ephemeral consumer just for that situation. And if I'm just aggregating a read view, I use ordered delivery (it's magical, but maybe only available in Go).
So I'm really interested: in what cases do you use the same durable consumer for multiple concurrent processes (the only situation where there would be a danger in dropping/creating the consumer, as I understand it)?
/T
If you are using a push durable consumer, then wouldn't the client applications subscribing to that consumer be unbothered when the consumer is deleted and re-created?
In any case, I'm not sure there's that much difference between deleting and re-creating the consumer versus keeping the consumer and resetting its state, because I doubt the client libraries actually monitor the subject to know when the consumer gets deleted or created. From the client library's point of view, it doesn't notice the difference between the consumer being deleted and recreated and the consumer being reset: either way, it won't notice (push consumer), or you just have to call Fetch again (pull consumer, which will then send a new request on $JS...next, the same subject as before).
If each "receiver" has and handles its own consumer, then there are no side effects. And maybe someone looking at the implementation can compare the resources needed for one consumer per receiver vs. sharing a consumer and still getting one message each. But so far I have used one consumer per receiver and it works. And I think it's a better separation of concerns and decoupling:
share by communicating instead of communicating by sharing. I may be a minimalist sharer, trying to share nothing between components.
I have been running EventStore for several years now and have regenerated full projections several times.
In the end, the server-side consumer does not matter for the projection; it is important to store the seq-number inside the projection itself.
An easy and good option for an ACID DB would be a table with a row for the projection and a column holding the last applied seq-number.
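A minimal sketch of that idea, using an in-memory struct as a stand-in for the DB row (all names are hypothetical; a real implementation would update both columns in one SQL transaction):

```go
package main

import (
	"fmt"
	"sync"
)

// Projection is a hypothetical in-memory stand-in for the table described
// above: one row per projection, holding the materialized value and the
// last applied stream sequence. In a real ACID database both fields would
// be written in a single transaction.
type Projection struct {
	mu      sync.Mutex
	Balance int64  // the materialized value (e.g. an account balance)
	LastSeq uint64 // last stream sequence applied to this projection
}

// Apply updates the value and the sequence number together, and skips
// messages at or below LastSeq so redeliveries are idempotent.
func (p *Projection) Apply(seq uint64, delta int64) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if seq <= p.LastSeq {
		return false // already applied; redelivery is a no-op
	}
	p.Balance += delta
	p.LastSeq = seq
	return true
}

func main() {
	p := &Projection{}
	p.Apply(1, 100)
	p.Apply(2, -30)
	p.Apply(2, -30) // redelivery of seq 2 is ignored
	fmt.Println(p.Balance, p.LastSeq) // 70 2
}
```

Because the value and the sequence move together, a replay from any earlier sequence converges on the same balance.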
If I need to store this kind of information without using durable consumers, I will add another stream with max msgs per subject 1 and discard old, and then emit a new msg with how far I am in the stream when I am acking the msgs. That way I can get the last msg to find where to start the ephemeral consumer.
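That checkpoint stream could be declared roughly like this with the Go client. A sketch only: the stream/subject names are made up, and it assumes a reachable server.

```go
package main

import (
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()
	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// A last-value "checkpoint" stream: one message kept per subject.
	if _, err := js.AddStream(&nats.StreamConfig{
		Name:              "CHECKPOINTS",
		Subjects:          []string{"checkpoint.>"},
		MaxMsgsPerSubject: 1,               // keep only the latest checkpoint
		Discard:           nats.DiscardOld, // drop the previous one
	}); err != nil {
		log.Fatal(err)
	}

	// After acking a message, record how far this consumer got.
	js.Publish("checkpoint.order-projector", []byte("1042"))

	// On restart, read it back to choose the ephemeral consumer's start sequence.
	if raw, err := js.GetLastMsg("CHECKPOINTS", "checkpoint.order-projector"); err == nil {
		log.Printf("resume after stream seq %s", raw.Data)
	}
}
```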
Why is delete->create not a good answer here?
If the stream has interest-based retention, then deleting and recreating the consumer may cause us to lose messages by signalling no interest. I do think having the ability to reset consumers would be great to have and it's much easier for a restarting application to reason about that given that redeliveries are extremely hard to get right.
@tpihl Sorry for my late reply. Holding a projection's last-seq-nr in NATS as well will not work properly in some failure situations.
The ACID DB guarantees that the write of the data records and the update of the last-seq-nr is an atomic process, aka a transaction;
without it, any aggregate value will get out of sync on infrastructure failure.
@Zetanova,
If you look inside a ACID transaction in any database you will find some kind of saga-ish pattern and compensatory actions if things didn’t work out.
When an ACID database restarts, it will look for transactions that have logged that they started but didn’t log they finished.
So you can do the same thing with the tracking stream; there will be some more overhead, but it stays decoupled. Emit an event stating that your client started to process event x, process event x (including publishing other data), then emit that you completed event x.
When starting up, read the tracking stream; if it says "starting", then reprocess that event but eliminate outputs that already exist, and then issue the completed tracking event.
You will of course open yourself up to race conditions if you run the same consuming logic in multiple parallel processes. That is why I normally assume that I may have already processed part of the events after the tracked seq no. Idempotent design is important in at-least-once delivery.
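The started/completed recovery scan described above can be sketched like this (all names are hypothetical):

```go
package main

import "fmt"

// Checkpoint models the per-event tracking records described above:
// "started x" followed (hopefully) by "completed x".
type Checkpoint struct {
	Seq  uint64
	Done bool
}

// recoverSeq scans the tracking records in order and returns the sequence
// to resume from: an event that was started but never completed must be
// reprocessed (idempotently); otherwise resume after the last completed one.
func recoverSeq(track []Checkpoint) uint64 {
	var resume uint64 = 1
	for _, c := range track {
		if c.Done {
			resume = c.Seq + 1
		} else {
			return c.Seq // started but not completed: reprocess it
		}
	}
	return resume
}

func main() {
	// Event 3 was started but the process crashed before completing it.
	track := []Checkpoint{{1, true}, {2, true}, {3, false}}
	fmt.Println(recoverSeq(track)) // 3
}
```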
I know there are some write-ups somewhere here on how to achieve exactly-once delivery; maybe that is what you are after in some other way?
@tpihl If double processing/delivery of an aggregate value is not important for the business domain, your approach will work fine. But if you need an exact aggregate value, like an account balance, you need to track the processed seq-nr inside the projection itself. Even if you track the last-seq-nr externally with a processing/completed event pair, you will have no guarantee that the last-seq-nr gets persisted after the aggregate value inside the projection is updated.
@Zetanova, I agree. But why not include (in the msg or in a header) what max seqno(s) were processed when creating this aggregated record?
When the aggregator restarts, it will use the last aggregated msg to get the start point for each ephemeral consumer.
However, this topic was about how to reset a consumer, and I think we've left that domain here.
@tpihl What you are proposing can be resolved with distributed transactions, and that would not be a good idea in the first place. Besides that, NATS does not support them and should not.
@neilalexander The redeliveries are in seq order. But you can include a guard for it in your processor: Assert(lastSeq + 1 == curSeq). Besides this, a good restart pattern is two-phase processing. On startup the processor first queries the latest-seq-nr and sets its state to catchedUp = false, then processes messages until the latest-seq-nr is reached. After that, catchedUp = true and it processes all new incoming messages. catchedUp can also be emitted as an internal process event or as a flag/lock on a resource.
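A minimal sketch of that gap guard and the two-phase catch-up (the names are illustrative, not a NATS API):

```go
package main

import "fmt"

// processor sketches the restart pattern described above: a guard asserting
// contiguous sequences, and a catchedUp flag (spelling kept from the
// discussion) that flips once the startup-time head sequence is reached.
type processor struct {
	lastSeq   uint64
	headSeq   uint64 // latest stream sequence queried at startup
	catchedUp bool
}

func (p *processor) handle(seq uint64) error {
	if seq != p.lastSeq+1 {
		return fmt.Errorf("sequence gap: expected %d, got %d", p.lastSeq+1, seq)
	}
	p.lastSeq = seq
	if !p.catchedUp && seq >= p.headSeq {
		p.catchedUp = true // replay finished; now in live tailing mode
	}
	return nil
}

func main() {
	p := &processor{headSeq: 2}
	for seq := uint64(1); seq <= 3; seq++ {
		if err := p.handle(seq); err != nil {
			fmt.Println(err)
		}
	}
	fmt.Println(p.catchedUp) // true
}
```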
@Zetanova, no need for a distributed transaction. Store the seq no in the aggregate (in the data or a header).