marten icon indicating copy to clipboard operation
marten copied to clipboard

MultiStreamProjection - as async projection does not execute apply method when a lot of pending events

Open czadokonradnetigate opened this issue 1 year ago • 5 comments

I encountered an issue with MultiStreamProjection

I have projection defined like this

public class MyViewProjection: MultiStreamProjection<MyView, Guid>
{
    public MyViewProjection()
    {
        Identity<ParentCreated>(e => e.Id);
        Identity<ChildCreatedEvent>(e => e.ParentId);
        Identity<ChildSomeEvent1>(e => e.ParentId);
    }

    public MyView Create(ParentCreated e) => new() { Id = e.Id, TenantId = e.TenantId.ToString()};
    
    public void Apply(ChildCreatedEvent _, MyView view) => view.IncSomeCounter();

    public void Apply(ChildSomeEvent1 _, MyView view) => view.IncSomeOtherCounter();

}

View

public sealed record MyView
{
    public Guid Id { get; set; }
    public string TenantId { get; set; } = null!;
    
    public int SomeCounter { get; set; }
    public int SomeOtherCounter { get; set; }

    public void IncSomeCounter()
    {
         SomeCounter++;
    }

    public void IncSomeOtherCounter()
    {
         SomeOtherCounter++;
    }
}

It is defined as an async projection.


For<MyView>()
    .Index(z => z.Id)
    .MultiTenanted();

options.Events.AddEventType<ParentCreated>();
options.Events.AddEventType<ChildCreatedEvent>();
options.Events.AddEventType<ChildSomeEvent1>();

options.Projections.Add(new MyViewProjection(), ProjectionLifecycle.Async);

It works fine for most of times, when events per SaveChangesAsync are about 100-300 it always properly stores counters values. Issues are happening once 1000 or more child entities are created, causing 1000 or more events to be inserted at once. In such cases Apply(ChildCreatedEvent _, MyView view) is not being invoked and counters are not incremented.

In mt_event_progression projection (MyView:All) last_seq_id matches HighWaterMark last_seq_id No errors are thrown and it logs that event batches were successfully updated

{"Timestamp": 12/29/2023 6:36:40 PM, "CategoryName": Marten.Events.Daemon.AsyncProjectionHostedService, "LogLevel": Information,"FormattedMessage": Shard 'MyView:All': Executed updates for Event range of 'Identity: MyView:All', 0 to 1, "Exception":  }
{"Timestamp":"2023-12-29T19:17:07.5460649Z","TraceFlags":0,"CategoryName":"Marten.Events.Daemon.AsyncProjectionHostedService","LogLevel":"Information","EventId":{"Id":0,"Name":null},"FormattedMessage":"Shard \u0027MyView:All\u0027: Executed updates for Event range of \u0027Identity: MyView:All\u0027, 1 to 501","Body":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}","Attributes":[{"Key":"ProjectionShardIdentity","Value":"MyView:All"},{"Key":"Range","Value":{"ShardName":{"ProjectionName":"MyView","Key":"All","Identity":"MyView:All"},"SequenceFloor":1,"SequenceCeiling":501,"Events":[],"Size":0}},{"Key":"{OriginalFormat}","Value":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}"}],"Exception":null,"Scope":{}}
{"Timestamp":"2023-12-29T19:17:07.5486838Z","TraceFlags":0,"CategoryName":"Marten.Events.Daemon.AsyncProjectionHostedService","LogLevel":"Information","EventId":{"Id":0,"Name":null},"FormattedMessage":"Shard \u0027MyView:All\u0027: Executed updates for Event range of \u0027Identity: MyView:All\u0027, 501 to 1001","Body":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}","Attributes":[{"Key":"ProjectionShardIdentity","Value":"MyView:All"},{"Key":"Range","Value":{"ShardName":{"ProjectionName":"MyView","Key":"All","Identity":"MyView:All"},"SequenceFloor":501,"SequenceCeiling":1001,"Events":[],"Size":0}},{"Key":"{OriginalFormat}","Value":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}"}],"Exception":null,"Scope":{}}
{"Timestamp":"2023-12-29T19:17:07.5506023Z","TraceFlags":0,"CategoryName":"Marten.Events.Daemon.AsyncProjectionHostedService","LogLevel":"Information","EventId":{"Id":0,"Name":null},"FormattedMessage":"Shard \u0027MyView:All\u0027: Executed updates for Event range of \u0027Identity: MyView:All\u0027, 1001 to 1501","Body":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}","Attributes":[{"Key":"ProjectionShardIdentity","Value":"MyView:All"},{"Key":"Range","Value":{"ShardName":{"ProjectionName":"MyView","Key":"All","Identity":"MyView:All"},"SequenceFloor":1001,"SequenceCeiling":1501,"Events":[],"Size":0}},{"Key":"{OriginalFormat}","Value":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}"}],"Exception":null,"Scope":{}}
{"Timestamp":"2023-12-29T19:17:07.5524935Z","TraceFlags":0,"CategoryName":"Marten.Events.Daemon.AsyncProjectionHostedService","LogLevel":"Information","EventId":{"Id":0,"Name":null},"FormattedMessage":"Shard \u0027MyView:All\u0027: Executed updates for Event range of \u0027Identity: MyView:All\u0027, 1501 to 2001","Body":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}","Attributes":[{"Key":"ProjectionShardIdentity","Value":"MyView:All"},{"Key":"Range","Value":{"ShardName":{"ProjectionName":"MyView","Key":"All","Identity":"MyView:All"},"SequenceFloor":1501,"SequenceCeiling":2001,"Events":[],"Size":0}},{"Key":"{OriginalFormat}","Value":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}"}],"Exception":null,"Scope":{}}
{"Timestamp":"2023-12-29T19:17:07.5545154Z","TraceFlags":0,"CategoryName":"Marten.Events.Daemon.AsyncProjectionHostedService","LogLevel":"Information","EventId":{"Id":0,"Name":null},"FormattedMessage":"Shard \u0027MyView:All\u0027: Executed updates for Event range of \u0027Identity: MyView:All\u0027, 2001 to 2002","Body":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}","Attributes":[{"Key":"ProjectionShardIdentity","Value":"MyView:All"},{"Key":"Range","Value":{"ShardName":{"ProjectionName":"MyView","Key":"All","Identity":"MyView:All"},"SequenceFloor":2001,"SequenceCeiling":2002,"Events":[],"Size":0}},{"Key":"{OriginalFormat}","Value":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}"}],"Exception":null,"Scope":{}}

Marten version: 6.4.1 .NET version: .NET 8

Or maybe I am missing something?

czadokonradnetigate avatar Dec 29 '23 19:12 czadokonradnetigate

I am seeing the exact same behavior. Funny enough I'm also trying to count instances of streams :D

ElanHasson avatar Jan 12 '24 06:01 ElanHasson

I found a fix.

You should remove the void return type and return your model.

ElanHasson avatar Jan 12 '24 07:01 ElanHasson

I found a fix.

You should remove the void return type and return your model.

Thanks. I will try this out :) Weird that it works for Inline projections without any issues and even in docs sample it returns void

czadokonradnetigate avatar Jan 15 '24 08:01 czadokonradnetigate

@czadokonradnetigate ya, can't speak to that, I just know I tried like 10 permutations before I got it

ElanHasson avatar Jan 16 '24 02:01 ElanHasson

Seeing the same kind of behavior with async lifetime event projections. Document creation doesn't happen when pushing a lot of events at a time.

koenmetsu avatar Jul 09 '24 11:07 koenmetsu