marten
MultiStreamProjection registered as an async projection does not execute Apply methods when there are many pending events
I encountered an issue with MultiStreamProjection.
I have a projection defined like this:
public class MyViewProjection : MultiStreamProjection<MyView, Guid>
{
    public MyViewProjection()
    {
        Identity<ParentCreated>(e => e.Id);
        Identity<ChildCreatedEvent>(e => e.ParentId);
        Identity<ChildSomeEvent1>(e => e.ParentId);
    }

    public MyView Create(ParentCreated e) => new() { Id = e.Id, TenantId = e.TenantId.ToString() };
    public void Apply(ChildCreatedEvent _, MyView view) => view.IncSomeCounter();
    public void Apply(ChildSomeEvent1 _, MyView view) => view.IncSomeOtherCounter();
}
View:
public sealed record MyView
{
    public Guid Id { get; set; }
    public string TenantId { get; set; } = null!;
    public int SomeCounter { get; set; }
    public int SomeOtherCounter { get; set; }

    public void IncSomeCounter()
    {
        SomeCounter++;
    }

    public void IncSomeOtherCounter()
    {
        SomeOtherCounter++;
    }
}
It is defined as an async projection:
For<MyView>()
    .Index(z => z.Id)
    .MultiTenanted();
options.Events.AddEventType<ParentCreated>();
options.Events.AddEventType<ChildCreatedEvent>();
options.Events.AddEventType<ChildSomeEvent1>();
options.Projections.Add(new MyViewProjection(), ProjectionLifecycle.Async);
It works fine most of the time: when a single SaveChangesAsync call appends about 100-300 events, the counter values are always stored correctly.
The problem appears once 1000 or more child entities are created, causing 1000 or more events to be inserted at once.
In such cases Apply(ChildCreatedEvent _, MyView view) is not invoked and the counters are not incremented.
In mt_event_progression, the last_seq_id of the projection (MyView:All) matches the last_seq_id of HighWaterMark.
No errors are thrown, and the logs report that the event batches were executed successfully:
{"Timestamp": 12/29/2023 6:36:40 PM, "CategoryName": Marten.Events.Daemon.AsyncProjectionHostedService, "LogLevel": Information,"FormattedMessage": Shard 'MyView:All': Executed updates for Event range of 'Identity: MyView:All', 0 to 1, "Exception": }
{"Timestamp":"2023-12-29T19:17:07.5460649Z","TraceFlags":0,"CategoryName":"Marten.Events.Daemon.AsyncProjectionHostedService","LogLevel":"Information","EventId":{"Id":0,"Name":null},"FormattedMessage":"Shard \u0027MyView:All\u0027: Executed updates for Event range of \u0027Identity: MyView:All\u0027, 1 to 501","Body":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}","Attributes":[{"Key":"ProjectionShardIdentity","Value":"MyView:All"},{"Key":"Range","Value":{"ShardName":{"ProjectionName":"MyView","Key":"All","Identity":"MyView:All"},"SequenceFloor":1,"SequenceCeiling":501,"Events":[],"Size":0}},{"Key":"{OriginalFormat}","Value":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}"}],"Exception":null,"Scope":{}}
{"Timestamp":"2023-12-29T19:17:07.5486838Z","TraceFlags":0,"CategoryName":"Marten.Events.Daemon.AsyncProjectionHostedService","LogLevel":"Information","EventId":{"Id":0,"Name":null},"FormattedMessage":"Shard \u0027MyView:All\u0027: Executed updates for Event range of \u0027Identity: MyView:All\u0027, 501 to 1001","Body":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}","Attributes":[{"Key":"ProjectionShardIdentity","Value":"MyView:All"},{"Key":"Range","Value":{"ShardName":{"ProjectionName":"MyView","Key":"All","Identity":"MyView:All"},"SequenceFloor":501,"SequenceCeiling":1001,"Events":[],"Size":0}},{"Key":"{OriginalFormat}","Value":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}"}],"Exception":null,"Scope":{}}
{"Timestamp":"2023-12-29T19:17:07.5506023Z","TraceFlags":0,"CategoryName":"Marten.Events.Daemon.AsyncProjectionHostedService","LogLevel":"Information","EventId":{"Id":0,"Name":null},"FormattedMessage":"Shard \u0027MyView:All\u0027: Executed updates for Event range of \u0027Identity: MyView:All\u0027, 1001 to 1501","Body":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}","Attributes":[{"Key":"ProjectionShardIdentity","Value":"MyView:All"},{"Key":"Range","Value":{"ShardName":{"ProjectionName":"MyView","Key":"All","Identity":"MyView:All"},"SequenceFloor":1001,"SequenceCeiling":1501,"Events":[],"Size":0}},{"Key":"{OriginalFormat}","Value":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}"}],"Exception":null,"Scope":{}}
{"Timestamp":"2023-12-29T19:17:07.5524935Z","TraceFlags":0,"CategoryName":"Marten.Events.Daemon.AsyncProjectionHostedService","LogLevel":"Information","EventId":{"Id":0,"Name":null},"FormattedMessage":"Shard \u0027MyView:All\u0027: Executed updates for Event range of \u0027Identity: MyView:All\u0027, 1501 to 2001","Body":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}","Attributes":[{"Key":"ProjectionShardIdentity","Value":"MyView:All"},{"Key":"Range","Value":{"ShardName":{"ProjectionName":"MyView","Key":"All","Identity":"MyView:All"},"SequenceFloor":1501,"SequenceCeiling":2001,"Events":[],"Size":0}},{"Key":"{OriginalFormat}","Value":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}"}],"Exception":null,"Scope":{}}
{"Timestamp":"2023-12-29T19:17:07.5545154Z","TraceFlags":0,"CategoryName":"Marten.Events.Daemon.AsyncProjectionHostedService","LogLevel":"Information","EventId":{"Id":0,"Name":null},"FormattedMessage":"Shard \u0027MyView:All\u0027: Executed updates for Event range of \u0027Identity: MyView:All\u0027, 2001 to 2002","Body":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}","Attributes":[{"Key":"ProjectionShardIdentity","Value":"MyView:All"},{"Key":"Range","Value":{"ShardName":{"ProjectionName":"MyView","Key":"All","Identity":"MyView:All"},"SequenceFloor":2001,"SequenceCeiling":2002,"Events":[],"Size":0}},{"Key":"{OriginalFormat}","Value":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}"}],"Exception":null,"Scope":{}}
Marten version: 6.4.1
.NET version: .NET 8
Or maybe I am missing something?
I am seeing the exact same behavior. Funny enough I'm also trying to count instances of streams :D
I found a fix.
You should remove the void return type and return your model.
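For anyone skimming, here is a rough sketch of what that change could look like applied to the projection from the original post. It assumes the Marten NuGet package is referenced, keeps the IncSomeCounter and IncSomeOtherCounter helpers on the view, and uses made-up shapes for the three event types because the thread never shows them. It only illustrates the signature change. Why the void version fails under the async daemon is not established in this thread.

```csharp
using System;
using Marten.Events.Projections;

// Hypothetical event shapes: the thread does not show these types,
// so the property types below are assumptions for illustration only.
public record ParentCreated(Guid Id, Guid TenantId);
public record ChildCreatedEvent(Guid ParentId);
public record ChildSomeEvent1(Guid ParentId);

// Same view as in the original post.
public sealed record MyView
{
    public Guid Id { get; set; }
    public string TenantId { get; set; } = null!;
    public int SomeCounter { get; set; }
    public int SomeOtherCounter { get; set; }

    public void IncSomeCounter() => SomeCounter++;
    public void IncSomeOtherCounter() => SomeOtherCounter++;
}

public class MyViewProjection : MultiStreamProjection<MyView, Guid>
{
    public MyViewProjection()
    {
        // Route every event to the view document keyed by the parent id.
        Identity<ParentCreated>(e => e.Id);
        Identity<ChildCreatedEvent>(e => e.ParentId);
        Identity<ChildSomeEvent1>(e => e.ParentId);
    }

    public MyView Create(ParentCreated e) =>
        new() { Id = e.Id, TenantId = e.TenantId.ToString() };

    // Changed from "void Apply(...)": mutate the view, then return it.
    public MyView Apply(ChildCreatedEvent _, MyView view)
    {
        view.IncSomeCounter();
        return view;
    }

    // Same change for the second child event.
    public MyView Apply(ChildSomeEvent1 _, MyView view)
    {
        view.IncSomeOtherCounter();
        return view;
    }
}
```

The registration with ProjectionLifecycle.Async stays exactly as in the original post; only the Apply signatures differ.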
Thanks. I will try this out :)
Weird that it works for Inline projections without any issues, and even the docs sample returns void.
@czadokonradnetigate ya, can't speak to that, I just know I tried like 10 permutations before I got it
Seeing the same kind of behavior with async lifetime event projections. Document creation doesn't happen when pushing a lot of events at a time.