akka.net
Akka.Streams: Memory Leak with circular GraphInterpreter reference
Version Information Akka.Streams 1.5.13
Describe the bug
One or more of the following reference chains form a circular reference:
GraphInterpreter <> GraphStageLogic
GraphInterpreter <> Connections <> GraphStageLogic
GraphInterpreter <> GraphInterpreterShell
As a result, the full graph and its stages show up under "dead objects" in a gcdump, and the process eventually runs out of memory (OOM).
To Reproduce I am seeing this issue with a gRPC service that streams to Akka.Remote (StreamRef), but it should already be present with a local system.
The graph and its stage instances are relatively small, but when Source.From(IEnumerable<T>) is used, the stage holds a reference to the full IEnumerable<T>.
If the enumerable contains more than 1 MB of data, the OOM is reached faster.
The following code runs without errors, but it keeps all MyRecord entries in memory until an OOM occurs.
// inside the gRPC service
public override async Task<UpdateResponse> Update(UpdateRequest request, ServerCallContext context)
{
    var records = Enumerable.Empty<MyRecord>(); // stands in for more than 1 MB of records
    var factory = _services.GetRequiredService<Akka.Actor.IActorRefFactory>();
    // defensive, dedicated materializer instance; disposing it does not fix the issue
    using var materializer = ActorMaterializer.Create(factory, factory.Materializer().Settings, "update");
    try
    {
        var offer = await _domain.Ask<MyDomainTenantMessages.UpdateStreamOffer>(
            new MyDomainTenantCommands.GetUpdateRecordStream(_domain.Tenant),
            context.CancellationToken);
        var streamTask = Source.From(records)
            .CompletionTimeout(context.Deadline - DateTime.UtcNow)
            .WatchTermination(Keep.Right)
            .To(offer.SinkRef.Sink)
            .Run(materializer);
        await streamTask.WaitAsync(context.CancellationToken);
        return new UpdateResponse
        {
            Changes = count // TODO: calculate the real change count (count is not defined in this snippet)
        };
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "update records failed");
        throw new RpcException(new Status(StatusCode.Internal, ex.Message));
    }
}
Expected behavior All instances of the Enumerable, the GraphInterpreter and the stages should be collectable by the GC after the stream has run, whether it succeeded or failed.
Actual behavior The GraphInterpreter, its stages and the full Enumerable are found under "dead objects" with circular references and are never collected (or, in some cases, only after the OOM and a successful ActorSystem termination).
Environment .NET 6.0, ubuntu-jammy, Docker Desktop and Kubernetes (k8s)
Additional context
Because the GraphInterpreter sets the GraphStageLogic.Interpreter property, I tried to unset it in TerminateStage, but this breaks a lot of assumptions and tests that expect GraphStageLogic.Interpreter to be available even after stage termination.
The one liner can be found here: https://github.com/Zetanova/akka.net/commit/9842d4d7b847cb9aa7fdabe29bb7bf5cbf3aac26
Regarding "The one liner can be found here": is that a fix for this, @Zetanova?
@Aaronontheweb No. GraphInterpreter <> Connections <> GraphStageLogic are very tightly linked. In combination with the callbacks in GraphInterpreter, the leak most likely happens through the captured closure context.
Most likely the solution would be to fix the callback registration and/or create a context instance for the shared types and release all references to it on stream completion.
It makes a big difference, and not only for performance, whether you use
something.OnComplete(() => _state.Complete())
or
something.OnComplete(s => s.Complete(), _state)
The first can create a memory leak; the second is very unlikely to.
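A minimal sketch of the difference between the two registration styles. All types here (State, Callbacks, StageLogic) are hypothetical stand-ins for illustration, not Akka.NET APIs. The point it tries to show is that a lambda reading the field _state captures `this`, so whoever stores the delegate keeps the whole owning object reachable, while the state-passing overload only stores a reference to the state object itself.

```csharp
using System;

// Hypothetical types for illustration only; none of these are Akka.NET APIs.
public sealed class State
{
    public bool Completed { get; private set; }
    public void Complete() => Completed = true;
}

public sealed class Callbacks
{
    public Action Closure { get; private set; }
    public Action<State> Callback { get; private set; }
    public State CallbackState { get; private set; }

    // Variant 1: the caller hands over a closure; whatever it captured is now reachable from here.
    public void OnComplete(Action callback) => Closure = callback;

    // Variant 2: the caller hands over a non-capturing delegate plus the state it needs.
    public void OnComplete(Action<State> callback, State state)
    {
        Callback = callback;
        CallbackState = state;
    }

    public void Complete()
    {
        Closure?.Invoke();
        Callback?.Invoke(CallbackState);
        // Drop all references once the callbacks have run.
        Closure = null;
        Callback = null;
        CallbackState = null;
    }
}

public sealed class StageLogic
{
    private readonly State _state = new State();

    // Stands in for everything else the stage logic keeps alive (buffers, enumerators, ...).
    public byte[] Payload { get; } = new byte[1024];
    public State State => _state;

    // The lambda reads the field _state, so it captures `this` (the whole StageLogic).
    public void RegisterWithClosure(Callbacks callbacks) =>
        callbacks.OnComplete(() => _state.Complete());

    // The lambda captures nothing; only the State object is handed over.
    public void RegisterWithState(Callbacks callbacks) =>
        callbacks.OnComplete(s => s.Complete(), _state);
}
```

With the closure variant, Closure.Target is the StageLogic instance, so the payload stays reachable for as long as the registration does. With the state-passing variant only the State object is referenced. Neither variant leaks if the registration is cleared on completion, which is what Complete() does in this sketch.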
Optionally, it would be good to first create a failing unit test that checks whether the memory is released.
@Aaronontheweb is it possible to add some special integration tests somewhere to test for memory leaks?
- Memory leaks could be tested with WeakReferences and GC.Collect
- Deadlock issues could be tested with a CPU core limit of 1
But both tests would have side effects on the testing system itself.
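A rough sketch of what a WeakReference-based leak check could look like. The LeakCheck helper and its method names are hypothetical, not part of Akka.NET or any test framework. The tracked object is created and used in a separate, non-inlined method so that no local variable in the calling frame keeps it alive while the GC runs.

```csharp
using System;
using System.Runtime.CompilerServices;

// Hypothetical helper for illustration; not part of Akka.NET or any test framework.
public static class LeakCheck
{
    // Create and use the object in its own stack frame, returning only a weak reference to it.
    [MethodImpl(MethodImplOptions.NoInlining)]
    private static WeakReference Track(Func<object> factory, Action<object> use)
    {
        var instance = factory();
        use(instance);
        return new WeakReference(instance);
    }

    // Returns true if the object created by `factory` is collected after `use` has run.
    public static bool IsCollectedAfter(Func<object> factory, Action<object> use)
    {
        var weak = Track(factory, use);
        for (var i = 0; i < 5 && weak.IsAlive; i++)
        {
            GC.Collect();
            GC.WaitForPendingFinalizers();
            GC.Collect();
        }
        return !weak.IsAlive;
    }
}
```

In a real test, `factory` would build the source data or graph and `use` would run the stream to completion. Forcing collections this way affects the whole test process, which is the side effect mentioned above.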
I've observed behavior like this before as well. I'll note that, in addition to the Single stage, StatefulSelectMany and IteratorAdapter have a tendency to hold on to things when they possibly shouldn't, although whether that's really an issue once this is fixed remains to be seen.
It makes a big difference not only for performance to use something.OnComplete(() => _state.Complete()) vs. something.OnComplete(s => s.Complete(), _state) The first can create a memory leak, the second very unlikely.
Yep. We probably need to add some APIs to help with this too (GraphDSL and some of the source/flow variants come to mind.)
It's the whole materialized graph, including the stage instances, that is not released.
The special thing about Source.From(records) is that it holds a reference to the full data list, and if that list is large in memory, it quickly leads to an OOM.
Regarding StatefulSelectMany: FYI, it does this by design. It's meant to hang on to data indefinitely over long periods. We wrote it for .PersistenceIds() queries in Akka.Persistence.Query.
I've tracked this down to the peculiar way ActorMaterializer.Dispose() behaves. In essence, it is not a proper Dispose but a suggestion to the materializer supervisor actor to shut down, made by sending it a PoisonPill. This means that the materializer does not shut down immediately but lingers until the PoisonPill message is processed.
Depending on how busy the supervisor actor is, it can take some time until all of the resources held by the logic inside the stream are released. This is especially true for buffered stream stages, which hold on to enumerator references even after they have completed their job.
Specific to the From stage, we can optimize this somewhat by letting the IteratorAdapter dispose of its internal IEnumerator reference as soon as it is empty.
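As a sketch of that idea, assuming an adapter that reads one element ahead: once MoveNext() returns false, the adapter disposes the enumerator and drops its reference, so the underlying collection is no longer reachable through it. ReleasingIterator<T> is a hypothetical name used for illustration; this is not the actual IteratorAdapter implementation in Akka.NET.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch; this is not Akka.NET's IteratorAdapter.
public sealed class ReleasingIterator<T>
{
    private IEnumerator<T> _enumerator;
    private T _next;
    private bool _hasNext;

    public ReleasingIterator(IEnumerator<T> enumerator)
    {
        _enumerator = enumerator ?? throw new ArgumentNullException(nameof(enumerator));
        Advance(); // read one element ahead so HasNext is known up front
    }

    public bool HasNext => _hasNext;

    // True once the underlying enumerator has been disposed and dropped.
    public bool IsReleased => _enumerator == null;

    public T Next()
    {
        if (!_hasNext) throw new InvalidOperationException("Iterator is exhausted.");
        var current = _next;
        Advance();
        return current;
    }

    private void Advance()
    {
        if (_enumerator.MoveNext())
        {
            _next = _enumerator.Current;
            _hasNext = true;
        }
        else
        {
            // Exhausted: release the enumerator (and, through it, the source collection).
            _hasNext = false;
            _next = default;
            _enumerator.Dispose();
            _enumerator = null;
        }
    }
}
```

The reference is released as soon as the last element has been handed out, independently of when the surrounding stage or materializer is eventually shut down.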