akka.net
akka.net copied to clipboard
Akka.Persistence: Persist async overload
Is your feature request related to a problem? Please describe. It's kinda hard to use async delegate with persist
Describe the solution you'd like add a Func<TEvent, Task> handler overload
Describe alternatives you've considered
In order to use async method within Persist(...) you must have to use PipeTo(...) or send a message to self but it cannot be used the way it's designed since PipeTo
or a new message not prevent stashed messages from dequeuing. (correct me If I'm wrong).
In order to achieve the way it's designed (the stash thing), you have to either block the thread until the task is finished (might lead to thread pool exhaustion..) or add a custom priority mailbox just to manage the coroutine. it adds another layer of complexity and not very straightforward
Additional context I've recently done a lot of implementation with this and found room for improvement. I would very grateful if there's another workaround for this
Can you share some code?
here you go: https://gist.github.com/christallire/269fc4ba25c73be1df53b983f6924244
So, these two scenarios are actually behaving like they should, it is the tests expectations that are wrong.
Blocking output should be:
[akka://persist-async/user/$b] 1
[akka://persist-async/user/$b] 2
[akka://persist-async/user/$b] 3
[akka://persist-async/user/$b] 4
whereas the default one should have this other output instead:
[akka://persist-async/user/$b] 1
[akka://persist-async/user/$b] 2
[akka://persist-async/user/$b] 4
[akka://persist-async/user/$b] 3
The explanation for this is that actors are re-entrant: while the TestActor
is waiting for the message NextCoroutineCommand
it is free to keep processing other messages in its mailbox (LastCommand
in this case), hence why "4" will be produced before "3".
In fact, even if you use async/await
instead of PipeTo
(anti-pattern):
Persist("test", async _ =>
{
Context.Sender.Tell("2");
var sender = Context.Sender; // prevents no active ActorContext exception
await Task.Delay(5000).ConfigureAwait(false);
sender.Tell("3");
});
... you still get the same result:
[akka://persist-async/user/$b] 1
[akka://persist-async/user/$b] 2
[akka://persist-async/user/$b] 4
[akka://persist-async/user/$b] 3
The correct way to solve this in Akka is by using stashing and behaviors. Modify your OnTestCommand
method to look like this:
private Task OnTestCommand(TestCommand arg)
{
Context.Sender.Tell("1");
Persist("test", _ =>
{
Context.Sender.Tell("2");
Task.Delay(5000).PipeTo(Context.Self, Context.Sender, // preserve sender
() => new NextCoroutineCommand());
});
Context.BecomeStacked(msg =>
{
switch (msg)
{
case NextCoroutineCommand _:
NextCoroutine();
Stash.UnstashAll();
Context.UnbecomeStacked();
break;
default:
Stash.Stash();
break;
}
});
return Task.CompletedTask;
}
Now the output is what it's expected, and without blocking:
[akka://persist-async/user/$b] 1
[akka://persist-async/user/$b] 2
[akka://persist-async/user/$b] 3
[akka://persist-async/user/$b] 4
@ismaelhamed yes, your code is what I meant. we need to stash everything to achieve what I want and why don't we just simply support async delegate in persist(...) instead of stash everything?
Persist("test", async (s) =>
{
Context.Sender.Tell("2");
await Task.Delay(5000);
NextCoroutine();
});
Don't you think this is much simpler and intuitive compared to your code?
But there's more to it than just having an async callback.
IMHO, the ~~whole ReceiveActor
~~ ReceiveAsync
feels like ~~is~~ an anti-pattern: a concession to newcomers trying to bend Akka instead of embracing it, because they're having a hard time wrapping their heads around the actor model in the first place.
And like any good anti-pattern it will leak everywhere, in this case an asynchronous persist callback would eventually leak into the UntypedActors
too.
Anyway, if you insist in going down that road, the only real magic happening inside the *Async
methods in the ReceiveActor
is that the handlers are wrapped using the ActorTaskScheduler.RunTask
, which will suspend the actor's mailbox until the task returns. And since that API is public you can use it yourself:
private Task OnTestCommand(TestCommand arg)
{
Context.Sender.Tell("1");
Persist("test", _ =>
{
RunTask(async () =>
{
Context.Sender.Tell("2");
var sender = Context.Sender; // prevents no active ActorContext exception
await Task.Delay(5000).ConfigureAwait(false);
sender.Tell("3");
});
});
// NOTICE: anything that you put past this line will be executed right away,
// because the Persist method returns immediately
return Task.CompletedTask;
}
This will give you the output you expected:
[akka://persist-async/user/$b] 1
[akka://persist-async/user/$b] 2
[akka://persist-async/user/$b] 3
[akka://persist-async/user/$b] 4
At this point your CommandAsync
handler actually looks like this:
RunTask(() => // this first RunTask is free, thanks to CommandAsync/ReceiveAsync
{
Context.Sender.Tell("1");
Persist("test", _ =>
{
RunTask(async () =>
{
Context.Sender.Tell("2");
var sender = Context.Sender; // prevents no active ActorContext exception
await Task.Delay(5000).ConfigureAwait(false);
sender.Tell("3");
});
});
// NOTICE: anything that you put past this will be executed right away,
// because the Persist method returns immediately
return Task.CompletedTask;
}
But now you have some nested RunTask
s. Actually, the one way I see to fix this would be to also have a Persist
method that returned a Task
so that it could be awaited from the CommandAsync
itself. Talking about leaking...
cc @Aaronontheweb
Do you mean ReceiveActor
an anti-pattern compared to akka-typed?
Anyways, thanks for your workaround.
I am also suggesting to add async callback overload, https://github.com/akkadotnet/akka.net/issues/2197 this approach seems to be better.
solution for all awaitable persist, callback and leaking.