Akka.Persistence: Persist async overload

Open christallire opened this issue 3 years ago • 6 comments

Is your feature request related to a problem? Please describe. It's kind of hard to use an async delegate with Persist.

Describe the solution you'd like: Add a Func<TEvent, Task> handler overload.

Describe alternatives you've considered: To use an async method within Persist(...), you have to use PipeTo(...) or send a message to Self. Neither behaves the way Persist is designed to, because PipeTo or a new message does not prevent stashed messages from being dequeued. (Correct me if I'm wrong.)

To get the designed behavior (the stashing), you have to either block the thread until the task finishes (which might lead to thread pool exhaustion) or add a custom priority mailbox just to manage the coroutine. That adds another layer of complexity and is not very straightforward.

Additional context: I've recently done a lot of implementation work with this and found room for improvement. I would be very grateful if there's another workaround for this.

christallire avatar Jun 03 '21 04:06 christallire

Can you share some code?

ismaelhamed avatar Jun 03 '21 11:06 ismaelhamed

here you go: https://gist.github.com/christallire/269fc4ba25c73be1df53b983f6924244

christallire avatar Jun 10 '21 15:06 christallire

So, these two scenarios are actually behaving like they should; it is the tests' expectations that are wrong.

Blocking output should be:

[akka://persist-async/user/$b] 1
[akka://persist-async/user/$b] 2
[akka://persist-async/user/$b] 3
[akka://persist-async/user/$b] 4

whereas the default one should have this other output instead:

[akka://persist-async/user/$b] 1
[akka://persist-async/user/$b] 2
[akka://persist-async/user/$b] 4 
[akka://persist-async/user/$b] 3

The explanation for this is that actors are re-entrant: while the TestActor is waiting for the NextCoroutineCommand message, it is free to keep processing other messages in its mailbox (LastCommand in this case), which is why "4" is produced before "3".

In fact, even if you use async/await instead of PipeTo (anti-pattern):

Persist("test", async _ =>
{
    Context.Sender.Tell("2");

    var sender = Context.Sender; // prevents no active ActorContext exception
    await Task.Delay(5000).ConfigureAwait(false);
    sender.Tell("3");
});

... you still get the same result:

[akka://persist-async/user/$b] 1
[akka://persist-async/user/$b] 2
[akka://persist-async/user/$b] 4
[akka://persist-async/user/$b] 3

The correct way to solve this in Akka is by using stashing and behaviors. Modify your OnTestCommand method to look like this:

private Task OnTestCommand(TestCommand arg)
{
    Context.Sender.Tell("1");

    Persist("test", _ =>
    {
        Context.Sender.Tell("2");
        Task.Delay(5000).PipeTo(Context.Self, Context.Sender, // preserve sender
            () => new NextCoroutineCommand());
    });

    Context.BecomeStacked(msg =>
    {
        switch (msg)
        {
            case NextCoroutineCommand _:
                NextCoroutine();
                Stash.UnstashAll();
                Context.UnbecomeStacked();
                break;
            default:
                Stash.Stash();
                break;
        }
    });
    
    return Task.CompletedTask;
}

Now the output is what's expected, and without blocking:

[akka://persist-async/user/$b] 1
[akka://persist-async/user/$b] 2
[akka://persist-async/user/$b] 3
[akka://persist-async/user/$b] 4

ismaelhamed avatar Jun 12 '21 08:06 ismaelhamed

@ismaelhamed yes, your code is what I meant. We need to stash everything to achieve what I want, so why don't we simply support an async delegate in Persist(...) instead of stashing everything?

Persist("test", async (s) =>
{
	Context.Sender.Tell("2");

	await Task.Delay(5000);
	NextCoroutine();
});

Don't you think this is much simpler and more intuitive compared to your code?

christallire avatar Jun 13 '21 02:06 christallire

But there's more to it than just having an async callback.

IMHO, the ~~whole ReceiveActor~~ ReceiveAsync feels like ~~is~~ an anti-pattern: a concession to newcomers trying to bend Akka instead of embracing it, because they're having a hard time wrapping their heads around the actor model in the first place.

And like any good anti-pattern, it will leak everywhere: in this case, an asynchronous persist callback would eventually leak into the UntypedActors too.

Anyway, if you insist on going down that road, the only real magic happening inside the *Async methods in the ReceiveActor is that the handlers are wrapped using ActorTaskScheduler.RunTask, which will suspend the actor's mailbox until the task returns. And since that API is public, you can use it yourself:

private Task OnTestCommand(TestCommand arg)
{
    Context.Sender.Tell("1");

    Persist("test", _ =>
    {
        RunTask(async () =>
        {
            Context.Sender.Tell("2");

            var sender = Context.Sender; // prevents no active ActorContext exception
            await Task.Delay(5000).ConfigureAwait(false);    
            sender.Tell("3");
        });
    });

    // NOTICE: anything that you put past this line will be executed right away,
    // because the Persist method returns immediately

    return Task.CompletedTask;
}

This will give you the output you expected:

[akka://persist-async/user/$b] 1
[akka://persist-async/user/$b] 2
[akka://persist-async/user/$b] 3
[akka://persist-async/user/$b] 4

At this point your CommandAsync handler actually looks like this:

RunTask(() => // this first RunTask is free, thanks to CommandAsync/ReceiveAsync
{
    Context.Sender.Tell("1");

    Persist("test", _ =>
    {
        RunTask(async () =>
        {
            Context.Sender.Tell("2");

            var sender = Context.Sender; // prevents no active ActorContext exception
            await Task.Delay(5000).ConfigureAwait(false);    
            sender.Tell("3");
        });
    });

    // NOTICE: anything that you put past this will be executed right away,
    // because the Persist method returns immediately

    return Task.CompletedTask;
});

But now you have some nested RunTasks. Actually, the one way I see to fix this would be to also have a Persist method that returns a Task, so that it could be awaited from the CommandAsync handler itself. Talk about leaking...

cc @Aaronontheweb

ismaelhamed avatar Jun 13 '21 08:06 ismaelhamed

Do you mean ReceiveActor is an anti-pattern compared to akka-typed? Anyway, thanks for your workaround.

I am also suggesting adding an async callback overload. The approach in https://github.com/akkadotnet/akka.net/issues/2197 seems to be better.

It would be a solution for all of it: awaitable persist, the callback, and the leaking.

christallire avatar Jun 14 '21 02:06 christallire