
Cosmos: Provide guidance, examples and/or a pit of success regarding reading your writes in handlers

Open • bartelink opened this issue 3 years ago • 1 comment

A CFP (Change Feed Processor) consumer based on dotnet new proReactor currently has the following key ingredients:

  • the Propulsion Cosmos Source uses the V2 CFP lib to read from the ChangeFeed (internally using a V2 DocumentClient). Each set of changes received is the set of documents added/updated since the last poll (the source holds the continuation token and supplies it with each poll)
  • these are parsed by the ingester (which, in CFP parlance, is a Change Feed Observer) into event messages, each bearing an Index, which are then passed to a Scheduler
  • the scheduler passes a contiguous span of events for a given stream to a handler function. Because handlers get re-invoked if there are transient exceptions (and reading is async and independent of the processing), it's frequently the case that a handler will be supplied a sequence of buffered events (even outside of catchup scenarios)
  • the handler then decides what to do and conveys back to the scheduler which events in the span have been processed. The result it emits drives deduplication (the ingester drops events that have already been confirmed as processed, so they don't even need to be passed to the scheduler) and/or marks progress (checkpointing) on the source via a SpanResult
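To make that flow concrete, here is a toy Python model of the ingest/schedule/handle loop. It is a sketch, not Propulsion's F# API: `Scheduler`, `ingest`, `dispatch` and the `AllProcessed`/`OverrideWritePosition` classes are hypothetical stand-ins for the concepts named above, and the model assumes a stream's write position only ever moves forward.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple, Union


# Hypothetical Python stand-ins for the two SpanResult cases named in this issue
@dataclass(frozen=True)
class AllProcessed:
    pass


@dataclass(frozen=True)
class OverrideWritePosition:
    index: int


SpanResult = Union[AllProcessed, OverrideWritePosition]
Event = Tuple[int, str]  # (index within the stream, payload)
Handler = Callable[[str, List[Event]], SpanResult]


class Scheduler:
    def __init__(self, handler: Handler) -> None:
        self.handler = handler
        self.write_pos: Dict[str, int] = {}  # next index not yet confirmed as handled
        self.buffered: Dict[str, Dict[int, str]] = {}

    def ingest(self, stream: str, index: int, payload: str) -> bool:
        """Ingester side: drop events already confirmed as handled."""
        if index < self.write_pos.get(stream, 0):
            return False
        self.buffered.setdefault(stream, {})[index] = payload
        return True

    def _span(self, stream: str) -> List[Event]:
        # Contiguous run of buffered events, starting at the lowest buffered index
        buf = self.buffered.get(stream, {})
        span: List[Event] = []
        i = min(buf) if buf else 0
        while i in buf:
            span.append((i, buf[i]))
            i += 1
        return span

    def dispatch(self, stream: str) -> bool:
        """Hand the buffered span to the handler; False means 'try again later'."""
        span = self._span(stream)
        if not span:
            return False
        try:
            res = self.handler(stream, span)
        except Exception:
            return False  # span stays buffered; the retry may receive a longer span
        new_pos = span[-1][0] + 1 if isinstance(res, AllProcessed) else res.index
        pos = max(self.write_pos.get(stream, 0), new_pos)
        self.write_pos[stream] = pos
        buf = self.buffered[stream]
        for i in [i for i in buf if i < pos]:
            del buf[i]  # also drops buffered events an override skipped past
        return True
```

Note how a transient handler failure leaves the span buffered, so if more events for that stream arrive in the meantime, the retry is handed a longer span; that is the effect described in the third bullet.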

Reading your writes

A frequent need in a handler that processes events by going back to the source (e.g. to read the overall state of the stream) is to be able to read your writes, i.e. to be sure that what it reads includes at least the event that prompted the invocation.

In a typical subscription scenario using ESDB, some ways to guarantee that a read will include the event that prompted the handler invocation are:

  • subscribe from either Leader or Follower node
  • do all reads from the Leader node, so the state read will always be as new as or newer than the observed event

While there are equivalent strong consistency models in CosmosDB that could theoretically be utilized to afford such a topology, those come with severe penalties in the context of a georedundant system and hence in practice are not used. Equinox in particular is designed to function with only the guarantees afforded by Session Consistency mode. The rest of this discussion will not consider any other models.

More details/background/related issues:

  • Equinox: Umbrella issue re session token support https://github.com/jet/equinox/issues/192
  • Cosmos V3 lib: Question-issue with lots of good examples and/or authoritative answers on how consistency model works https://github.com/Azure/azure-cosmos-dotnet-v3/issues/1945

Approaches when using Equinox.Cosmos with Propulsion.Cosmos

Session consistency in Cosmos is based on a session token: the session token can be passed from context to context and is used to wait for writes to percolate, so that you can read your writes [that you performed elsewhere in your notional session]. In the normal case, the CosmosClient tracks the session tokens coming back in responses (especially from writes, but also from reads) and makes sure to retain the newest each time, so that subsequent requests observe consistent state.

The issue with an out-of-the-box dotnet new proReactor is that the CosmosClient used to receive the query response from the ChangeFeed is not the same one the handler uses to re-read the state from the originating logical partition. That is how you end up not seeing a write that the CFP has clearly shown to have occurred.

Using the normal building blocks/idioms that the Cosmos SDK and CosmosDB offer for operating in Session Consistency mode, the valid solutions are one of:

  1. obtain the session token from the CFP read and propagate it to the handler so it can apply it on its CosmosClient (I'm not sure whether the CFP libs even expose it; for V3 I'm sure they'll take a PR if needed, once the blocker preventing our move to V3 is resolved)
  2. share the session by using the same DocumentClient for both the handler's reads and the CFP reads (see the cited Equinox issue and PRs)
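A toy Python model of why the handler misses the write, and of how both options fix it. This is not the Cosmos SDK: `Partition`, `Client` and `observe` are hypothetical, and a real session token (a string in Cosmos) is reduced here to a single log sequence number (LSN).

```python
from typing import List


class Partition:
    """Toy logical partition: a write log plus one read replica that lags behind."""

    def __init__(self) -> None:
        self.log: List[str] = []  # committed writes; the LSN of log[i] is i + 1
        self.replica_lsn = 0      # how much of the log the replica has applied

    def write(self, event: str) -> int:
        self.log.append(event)
        return len(self.log)      # LSN of this write, surfaced as the "session token"

    def read(self, min_lsn: int) -> List[str]:
        # Session rule: never serve a read from state older than the caller's token.
        # Here the replica simply catches up; this does not model how Cosmos does it.
        if self.replica_lsn < min_lsn:
            self.replica_lsn = min_lsn
        return self.log[: self.replica_lsn]


class Client:
    """Toy client that keeps the newest session token it has seen."""

    def __init__(self, partition: Partition) -> None:
        self.partition = partition
        self.session_token = 0

    def observe(self, token: int) -> None:
        self.session_token = max(self.session_token, token)

    def write(self, event: str) -> int:
        token = self.partition.write(event)
        self.observe(token)
        return token

    def read(self) -> List[str]:
        return self.partition.read(self.session_token)
```

In the model, after a writer `Client` appends three events and a `feed` client calls `feed.observe(writer.session_token)` (standing in for the token arriving with a change feed batch), a separate `handler` client with no token is served by the lagging replica and reads `[]`. After `handler.observe(feed.session_token)` (option 1) its read includes all three events; option 2 corresponds to doing the handler's reads through `feed` itself.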

Workaround: having the handler logic recognize the prompting event is not reflected in what it reads and trigger a retry

One approach one might take is to say "Look, let's solve this without infrastructural support". The Simplest Thing That Could Possibly Work is to:

  1. re-read the aggregate
  2. if it does not see the anticipated state, throw an exception so that the handler does not mark the prompting event as processed (triggering a retry, which will eventually observe the required synced state)
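A minimal sketch of that workaround in toy Python. The order-approval domain, `NotYetVisible` and `run_with_retries` are hypothetical; `run_with_retries` merely stands in for the scheduler re-invoking a handler that threw.

```python
from typing import Callable, List, Tuple


class NotYetVisible(Exception):
    """Hypothetical: raised so the prompting event is not marked as processed."""


def fold(events: List[str]) -> str:
    # Toy aggregate whose state is just the latest event type
    return events[-1] if events else "initial"


def handle_approved(load: Callable[[], List[str]]) -> str:
    state = fold(load())       # 1. re-read the aggregate
    if state != "approved":    # 2. anticipated state not visible yet: force a retry
        raise NotYetVisible(state)
    return "shipping requested"


def run_with_retries(handler: Callable[[Callable[[], List[str]]], str],
                     load: Callable[[], List[str]],
                     max_attempts: int = 5) -> Tuple[str, int]:
    # Stand-in for the scheduler re-invoking a handler that threw
    for attempt in range(1, max_attempts + 1):
        try:
            return handler(load), attempt
        except NotYetVisible:
            continue
    raise RuntimeError("prompting event never became visible")
```

With a lagging read that returns `["created"]` twice before returning `["created", "approved"]`, the handler throws twice and succeeds on the third attempt.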

The problem with such an approach is that you can end up in all sorts of scenarios where you have to do one of:

  1. ensure there is only one base state for a given handler and reject loaded/folded state that's clearly incomplete relative to that
  2. (if you don't have exactly one valid trigger state to look for) risk seeing an event that's part of state X+1 but loading state X, and then saying "event X+1 is handled"

Specifically with respect to problem 2, the issue is that required reactions can end up being skipped. You might say "OK, well then don't do that", but IMO it's often something a dev is not going to realise while writing the handler.
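The failure mode in problem 2 fits in a few lines of toy Python (the items domain and `react_to_items` are hypothetical). The handler is prompted by the event at index 1, but its read only reflects index 0; because that earlier state is still a valid state for it, it acts and reports the prompting event as handled:

```python
from typing import List, Set


def react_to_items(prompt_index: int, visible: List[str], reacted: Set[str]) -> int:
    """Hypothetical state-based handler; returns the write position it reports."""
    if not visible:            # the only staleness check: "some valid state was loaded"
        raise RuntimeError("nothing loaded yet; retry")
    for item in visible:       # react to everything in the loaded state
        reacted.add(item)
    return prompt_index + 1    # "all processed": the prompting event counts as handled
```

Called as `react_to_items(1, ["a"], reacted)`, it returns 2 and records only `"a"`, so the reaction to `"b"` is lost for good. Reporting `len(visible)` instead (the position just past what was actually read) would yield 1 and keep event 1 pending, which is the idea behind OverrideWritePosition.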

Recommended Solutions

There are 2 clean and correct ways to resolve those forces in my book:

  1. use the Consistency Model: use Session Tokens to provide a read your writes guarantee (or, in this instance, "the state of the stream, guaranteed to include the prompting event that's demonstrably been read")

  2. use Propulsion's SpanResult.OverrideWritePosition: have the handler grab the Index of the stream as it reads and return SpanResult.OverrideWritePosition (res.Index+1)

    In other words, if event 5 triggers an action and the processing sees only up to 4, the handler says "I know the next work needs to start from Index 5", as opposed to "all done" (SpanResult.AllProcessed), which means "5 is done, don't call me until you have event 6".

    This also covers the case where handling event 5 sees [and handles] event 6 as part of the same work, and hence yields OverrideWritePosition 7, which causes event 6 to be dropped either a) immediately, if the CFP reader has already read it while the handler was running (frequently what happens), or b) when the batch containing that event arrives from the CFP feed

    In some cases, this technique can provide huge benefits in catchup scenarios, i.e. the minute event 0 is observed, the handler says "done, we are at 2000 now", and the next 1999 events get dropped without even invoking the handler.

    I'd venture that this is the most generally applicable technique in Propulsion; this strategy is employed by all handlers in https://github.com/jet/dotnet-templates
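The three scenarios above, modelled in toy Python. The names are hypothetical: `handle` reports `len(events)`, i.e. the last index read + 1, as the counterpart of `OverrideWritePosition (res.Index+1)`, and `Dedup` stands in for the ingester dropping already-handled events.

```python
from typing import Dict, List


def handle(stream_events: List[str]) -> int:
    """Hypothetical handler: act on whatever was read, then report the
    position just past the last event observed (last index + 1)."""
    last_index = len(stream_events) - 1
    # ... act on the folded state here ...
    return last_index + 1


class Dedup:
    """Stand-in for the ingester: drops events below the reported write position."""

    def __init__(self) -> None:
        self.write_pos: Dict[str, int] = {}

    def accept(self, stream: str, index: int) -> bool:
        return index >= self.write_pos.get(stream, 0)

    def record(self, stream: str, pos: int) -> None:
        # Assumption of this sketch: the position never moves backwards
        self.write_pos[stream] = max(self.write_pos.get(stream, 0), pos)
```

For example, a read containing indices 0..4 reports 5, so event 5 stays pending; a read of 0..6 reports 7, so `accept("s", 6)` is `False`; and on catchup, a read of 0..1999 means indices 1..1999 (1999 events) are dropped without a handler call.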

bartelink • Feb 18 '21 11:02

And be careful when combining AllowStale with OverrideWritePosition: that's a recipe for a stuck handler...

bartelink • May 13 '21 11:05
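One way the stuck handler can arise, as a toy Python model. It assumes AllowStale means "serve cached state without a round trip to the store"; `CachingReader` and `drive` are hypothetical. If the cache predates the prompting event, every invocation reports the same position, so the event never counts as handled:

```python
from typing import List, Optional


class CachingReader:
    """Toy cache in front of a store (hypothetical; not Equinox's API)."""

    def __init__(self, store: List[str]) -> None:
        self.store = store
        self.cached: Optional[List[str]] = None

    def load(self, allow_stale: bool) -> List[str]:
        if allow_stale and self.cached is not None:
            return self.cached            # no round trip: may be arbitrarily old
        self.cached = list(self.store)    # round trip refreshes the cache
        return self.cached


def drive(reader: CachingReader, prompt_index: int, allow_stale: bool,
          max_attempts: int = 10) -> int:
    """Re-invoke the handler until its reported position passes the prompting
    event; returns the attempt that succeeded, or -1 if it never progressed."""
    for attempt in range(1, max_attempts + 1):
        pos = len(reader.load(allow_stale))  # OverrideWritePosition(last index + 1)
        if pos > prompt_index:
            return attempt
    return -1
```

In the model, priming the cache with indices 0..4 and then appending event 5 to the store makes `drive(reader, 5, allow_stale=True)` exhaust its attempts and return -1, while a fresh read (`allow_stale=False`) succeeds on the first attempt.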