Draft: FoldStreamAsync

Open thinkbeforecoding opened this issue 4 years ago • 10 comments

This is a proposal for a FoldStreamAsync method on the .NET client. The goal is to make it easy to both fold the stream and get the last event number. If you don't need the last event number, it is not too complicated using Linq.Async:

client.ReadAsync(...).SelectMany(e => deserialize(e).ToAsyncEnumerable()).AggregateAsync(seed, aggregator);

The reason for the SelectMany is that, this way, the deserializer can return an empty list to ignore an event, or a list with multiple values when a stored event would be better split into several events. In most cases the list contains a single element. Note that a check on the client.ReadAsync result is necessary to avoid an exception when the stream is not found.

When appending an event to the stream, an expected version is required, and it should be the version of the last event. In the code above, this means returning the version number along with the results of the deserializer, then maintaining the last version in the aggregator. The starting version also has to be fed in along with the seed (this is not necessarily totally valid C# syntax):

   client.ReadAsync(..version..)
       .SelectMany(e => deserialize(e).Select(v => (v, e.Event.EventNumber)).ToAsyncEnumerable())
       .AggregateAsync((version, seed), (acc, x) => (x.Item2, aggregator(acc.Item2, x.Item1)))

This is doable but tedious, and it introduces several intermediate enumerables and tuples that add GC pressure.

This PR proposes an implementation of FoldAsync that integrates this recurring pattern more directly.

 client.FoldAsync(deserialize, aggregator, streamname, revision, seed);
  • The deserializer is a ResolvedEvent -> IEnumerable<E>.
  • The aggregator is a (T,E) -> T.
  • The seed is a T.

The result is a structure that contains the final value of the aggregator and the revision of the last event.

If the stream is empty or not found, it returns the seed with revision None or the input revision.
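To make the proposed shape concrete, here is a minimal sketch of the fold over a plain IAsyncEnumerable. It is not the PR's implementation: StoredEvent, FoldResult<T>, FoldSketch and Demo are hypothetical stand-ins that avoid any dependency on the client, and the revision is modelled as a nullable ulong where null plays the role of None.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a stored event: its revision and a raw payload.
public sealed record StoredEvent(ulong Revision, string Data);

// Hypothetical result shape: the final state and the revision of the last
// event read (null when nothing was read and no input revision was given).
public sealed record FoldResult<T>(ulong? Revision, T Value);

public static class FoldSketch
{
    public static async Task<FoldResult<T>> FoldAsync<T, E>(
        IAsyncEnumerable<StoredEvent> events,
        Func<StoredEvent, IEnumerable<E>> deserialize,
        Func<T, E, T> aggregator,
        ulong? revision,
        T seed)
    {
        var state = seed;
        var last = revision;
        await foreach (var stored in events)
        {
            // A stored event may yield zero, one, or several domain events.
            foreach (var e in deserialize(stored))
                state = aggregator(state, e);

            // Track the revision even when the deserializer ignored the event.
            last = stored.Revision;
        }
        return new FoldResult<T>(last, state);
    }
}

public static class Demo
{
    // Three stored events; the middle one is ignored by the deserializer below.
    public static async IAsyncEnumerable<StoredEvent> Events()
    {
        await Task.Yield();
        yield return new StoredEvent(0, "1");
        yield return new StoredEvent(1, "skip");
        yield return new StoredEvent(2, "2");
    }

    public static IEnumerable<int> Deserialize(StoredEvent e) =>
        e.Data == "skip" ? Array.Empty<int>() : new[] { int.Parse(e.Data) };
}
```

Folding Demo.Events() with Demo.Deserialize, a summing aggregator, a null revision, and seed 0 sums the two non-ignored events while still reporting the revision of the last stored event. No intermediate tuples or enumerables are allocated per event beyond what the deserializer itself returns.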

thinkbeforecoding avatar Nov 28 '20 09:11 thinkbeforecoding

There is of course more work to do on this PR (tests, documentation, I'm not even sure it's correctly using the gRPC API), but the idea is to start a discussion, especially about the ordering of parameters (there are good reasons for ordering them differently), about the deserializer (returning an IEnumerable or not), perf, etc.

(and of course take time to answer this on your work time. Sorry for posting this during the weekend)

thinkbeforecoding avatar Nov 28 '20 11:11 thinkbeforecoding

(It seems the builds failed for another reason.)

thinkbeforecoding avatar Nov 30 '20 09:11 thinkbeforecoding

About the argument order, for now I chose the following:

client.FoldAsync(deserialize, aggregator, streamname, revision, seed);

The reason is that deserialize and aggregator are the most stable arguments. After that, the streamname usually changes, but for a single stream we have many revisions. The revision and seed go together, as in the FoldResult: it is a revision and a state.

But to look more like the Linq Aggregate method, we could group (aggregator, seed) and (streamname, revision), one part being the fold part, the other the collection part.

thinkbeforecoding avatar Dec 01 '20 09:12 thinkbeforecoding

Last thing I need to be validated:

The function returns the Revision of the last Event read in the stream, or None if the stream is not found.

This is what is expected by AppendToStreamAsync expectedRevision, right ?

thinkbeforecoding avatar Dec 01 '20 09:12 thinkbeforecoding

Something else... Let's say I used Fold to compute the current state and got the last Event revision in the result (FoldResult<T>). From there, after some time, I want to make sure that my state is up to date, but without reloading everything. I can just fold from result.Value. But if I pass result.Revision as a parameter to Fold, it will fold the last event of the previous call again...

Would you recommend a NextRevision property on FoldResult to easily pass it to fold again? (It would return Start when result.Revision is None.)
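As a sketch of what that property could compute, assuming the revision is modelled as a nullable ulong (null standing in for None) and that Start corresponds to position 0; ResumeSketch and NextRevision are hypothetical names, not part of the client:

```csharp
public static class ResumeSketch
{
    // Position to read from on the next fold so that the last event of the
    // previous fold is not applied a second time.
    public static ulong NextRevision(ulong? lastRevision) =>
        lastRevision is ulong r ? r + 1 : 0UL;
}
```

With this, a result whose Revision is None resumes from 0, and a result whose last event was at revision 4 resumes from 5.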

thinkbeforecoding avatar Dec 01 '20 09:12 thinkbeforecoding

Now with better documentation, and unit tests.

thinkbeforecoding avatar Dec 11 '20 11:12 thinkbeforecoding

And this time, everything is green 🎉

thinkbeforecoding avatar Dec 11 '20 11:12 thinkbeforecoding

To be consistent, FoldResult.Revision would ideally always be equal to the StreamPosition of the last event. This works as expected for an empty stream and when events are returned, but it is not really possible yet when the input revision is at the end of the stream and no event gets returned. The current implementation does a -1 on the input revision, but it's definitely a hack. Could I have access to the last Event revision in this case? For instance, when the stream doesn't exist, a single element indicating it's not found is returned. Would it be possible in this case for a similar element to be returned indicating the last revision in the stream?

thinkbeforecoding avatar Dec 17 '20 12:12 thinkbeforecoding

This is mainly due to the way expectedVersion is understood by AppendToStream. In AppendToStream, the expectedVersion is the position of the current last event in the stream. The return value indicates the new position. But this position cannot be used in a snapshot, because reading from this position returns the last event, which has already been taken into account. So in the snapshot, the version should be the position of the last event + 1. This way, when reading a snapshot that is up to date, the read/fold should return no events. The problem then is that the expected version to use is... the version in the snapshot - 1. We could avoid this complexity if expectedVersion was not the position of the last event, but the position where we want to append: for an empty stream it would be 0; for a stream with a single event at position 0, it would be 1. The function AppendToStream would then return the number where we should append next. Of course, AppendToStream would reject writes to streams where this version is not exactly the next one.
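The arithmetic of the two conventions can be written out as a small sketch. VersionConventions and both method names are hypothetical, versions are plain ulong values, and null stands in for "no stream yet"; nothing here is the client's API.

```csharp
public static class VersionConventions
{
    // Current convention: expectedVersion is the position of the last event.
    // A snapshot stores (last position + 1), so the caller has to subtract 1,
    // and a snapshot version of 0 means there is no last event at all.
    public static ulong? ExpectedFromSnapshotCurrent(ulong snapshotVersion) =>
        snapshotVersion == 0 ? (ulong?)null : snapshotVersion - 1;

    // Suggested convention: expectedVersion is the position to append at,
    // which is exactly what the snapshot already stores.
    public static ulong ExpectedFromSnapshotSuggested(ulong snapshotVersion) =>
        snapshotVersion;
}
```

For a stream with a single event at position 0, the snapshot version is 1: the current convention needs expected version 0, while the suggested convention would pass 1 through unchanged.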

thinkbeforecoding avatar Dec 17 '20 12:12 thinkbeforecoding

After discussing with the team it looks like we'll need to make a proto change to make this work.

thefringeninja avatar Jan 08 '21 12:01 thefringeninja