Reduce memory usage when a large number of GraphStageLogic instances are involved
Reported by @queimadus in https://github.com/apache/pekko/discussions/1566. Refs: https://github.com/apache/pekko/pull/1623
The problem can be reproduced with the code below:
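```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import org.apache.pekko.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration.Duration

object PekkoQuickstart extends App {
  private implicit val system: ActorSystem = ActorSystem()

  // One sub-stream: 1,000,000 elements, split into a 50,000-element prefix and a tail,
  // then re-joined with flatMapConcat + concatLazy.
  // Note: 'a' * 400000 is Char arithmetic and evaluates to an Int, so each element
  // is a one-byte ByteString (Integral overload of ByteString.apply), not 400 KB.
  val s = Source
    .repeat(())
    .map(_ => ByteString('a' * 400000))
    .take(1000000)
    .prefixAndTail(50000)
    .flatMapConcat { case (prefix, tail) => Source(prefix).concatLazy(tail) }

  // Lazily concatenate 30,000 copies of the sub-stream.
  val r = Source.empty
    .concatAllLazy(List.tabulate(30000)(_ => s): _*)
    .runWith(Sink.ignore)

  Await.result(r, Duration.Inf)
  println(r.value)
  system.terminate() // let the JVM exit once the stream has finished

  // Alternative reproductions (onComplete needs an implicit ExecutionContext,
  // e.g. `import system.dispatcher`):
  // Source
  //   .repeat(s)
  //   .take(30000)
  //   .flatMapConcat(x => x)
  //   .runWith(Sink.ignore)
  //   .onComplete(println(_))
  // Source.empty
  //   .concatAllLazy(List.tabulate(30000)(_ => Source.lazySource(() => s)): _*)
  //   .runWith(Sink.ignore).onComplete(println(_))
}
```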
Taking a heap dump while this runs gives the following:

[heap dump screenshot]
To fix the problem, I think we need to clean up a sub-graph's logics once it is done, but the current code keeps them around because the materializer needs them to produce a snapshot. In other words, we need to recycle GraphStageLogic instances, which requires more changes in the interpreter.

As the heap dump shows, there are 30000 logics, brought in by concatAllLazy. Recycling each logic once it has completed would reduce memory (and then, maybe, it would no longer show up in the snapshot?).
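To make the recycling idea concrete, here is a minimal toy model in plain Scala with no Pekko dependency. ToyLogic, ToyInterpreter and ToyDemo are hypothetical names, not Pekko internals: one interpreter keeps every logic it has ever started, the other drops its reference once a logic completes, so completed logics become eligible for garbage collection.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a GraphStageLogic together with the state it holds.
final class ToyLogic(val id: Int, val state: Array[Byte])

// Hypothetical toy interpreter; this is NOT Pekko's GraphInterpreter.
final class ToyInterpreter(releaseOnComplete: Boolean) {
  private val logics = mutable.HashMap.empty[Int, ToyLogic]
  private var nextId = 0

  def start(stateSize: Int): Int = {
    val id = nextId
    nextId += 1
    logics(id) = new ToyLogic(id, new Array[Byte](stateSize))
    id
  }

  // Removing the entry drops the only strong reference, so the logic can be collected.
  def complete(id: Int): Unit =
    if (releaseOnComplete) logics -= id

  def retained: Int = logics.size
}

object ToyDemo {
  // Runs n sub-graphs one after another, loosely like concatAllLazy over n sources.
  def run(n: Int, releaseOnComplete: Boolean): Int = {
    val interpreter = new ToyInterpreter(releaseOnComplete)
    (0 until n).foreach { _ =>
      val id = interpreter.start(stateSize = 16)
      interpreter.complete(id)
    }
    interpreter.retained
  }
}
```

With releaseOnComplete = false all 30000 toy logics stay reachable, mirroring the heap dump; with true none are retained once their sub-graph finishes. In the real interpreter the hard part is that the snapshot still expects those logics to exist.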
Another problem is debugging support. For that, we could let GraphStageLogic extend GraphStageLogicInfo (or something similar) and, once a logic is done, swap the GraphStageLogic for a lightweight GraphStageLogicInfo implementation.
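A minimal sketch of the GraphStageLogicInfo idea, assuming the snapshot only needs a logic's index and label. GraphStageLogicInfo is the name proposed above; ToyStageLogic, CompletedLogicInfo and ToyShell are hypothetical stand-ins for GraphStageLogic and the interpreter shell, not existing Pekko types.

```scala
// Hypothetical: the info view a snapshot would read (name proposed in this issue).
trait GraphStageLogicInfo {
  def index: Int
  def label: String
}

// Hypothetical live logic: holds heavy per-stage state and also exposes the info view.
final class ToyStageLogic(val index: Int, val label: String) extends GraphStageLogicInfo {
  val buffer: Array[Byte] = new Array[Byte](1024) // placeholder for per-stage state
}

// Hypothetical lightweight record kept after completion, for snapshots/debugging only.
final case class CompletedLogicInfo(index: Int, label: String) extends GraphStageLogicInfo

final class ToyShell(n: Int) {
  private val slots: Array[GraphStageLogicInfo] =
    Array.tabulate[GraphStageLogicInfo](n)(i => new ToyStageLogic(i, s"stage-$i"))

  // On completion, swap the live logic for its info-only counterpart,
  // so the live logic (and its buffer) is no longer referenced from this shell.
  def onLogicCompleted(i: Int): Unit = slots(i) match {
    case live: ToyStageLogic => slots(i) = CompletedLogicInfo(live.index, live.label)
    case _                   => () // already completed
  }

  // The snapshot only uses the GraphStageLogicInfo view, so it works for both kinds.
  def snapshot: Seq[String] = slots.toIndexedSeq.map(info => s"${info.index}:${info.label}")

  def liveCount: Int = slots.count(_.isInstanceOf[ToyStageLogic])
}
```

The snapshot output is unchanged after a swap, while the heavy state of completed logics is dropped. Whether Pekko's real snapshot can be served from such a reduced view is the open question here.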