scala-async icon indicating copy to clipboard operation
scala-async copied to clipboard

Is async macro leaking?

Open hurlebouc opened this issue 4 years ago • 11 comments

Hi,

when you run code

object Main {

implicit val syncExecutor : ExecutionContext = new ExecutionContext {
    override def execute(runnable: Runnable): Unit = runnable.run()

    override def reportFailure(cause: Throwable): Unit = throw cause
  }

  def stop : Unit = {
    Thread.sleep(60000)
    println("stop")
  }

  class Witness

  def main(args: Array[String]): Unit = {
    async{
      await(Future{new Witness})
      stop
    }
  }
}

and then, during the sleep, you take a heap dump with VisualVM, you see that there is still an object of type Witness in the heap. Is this normal?

hurlebouc avatar May 06 '21 11:05 hurlebouc

@hurlebouc While I realize that this is just a reproducer, that syncExecutor implementation is completely broken. I think you can just replace all of it with ExecutionContext.parasitic and make the reproducer shorter in the process.

viktorklang avatar May 06 '21 11:05 viktorklang

The syncExecutor runs futures in the current thread. It's by design.

hurlebouc avatar May 06 '21 12:05 hurlebouc

@hurlebouc parasitic is a correct implementation of a current thread ExecutionContext. The syncExecutor above has several problems (execute will lead to stack issues and reportError is not supposed to throw exceptions.)

viktorklang avatar May 06 '21 12:05 viktorklang

Ok. @viktorklang : do you think this is related with the leaking problem?

hurlebouc avatar May 06 '21 14:05 hurlebouc

@hurlebouc No idea, but it's a bit of a two-for-one to reduce the size of the reproducer AND help people avoid problems with their sync ExecutionContexts. :)

viktorklang avatar May 06 '21 15:05 viktorklang

@viktorklang Thank you for the advice.

But event replacing syncExecutor by parasitic leads to the same problem. My hypothesis is that line await(Future{new Witness}) is converted to line val tmp = await(Future{new Witness}) by ANF transformation, which results in memory leaks because tmp is not freed during execution of stop.

hurlebouc avatar May 06 '21 19:05 hurlebouc

This is the translation:

         final class stateMachine$async extends scala.async.FutureStateMachine {
            def <init>(): stateMachine$async = {
              stateMachine$async.super.<init>(ec);
              ()
            };
            override def apply(tr$async: scala.util.Try): Unit = while$(){
              try {
                stateMachine$async.this.state() match {
                  case 0 => {
                    val awaitable$async: scala.concurrent.Future = scala.concurrent.Future.apply({
                      final <artifact> def $anonfun$apply(): scala.async.Leak.Witness = new scala.async.Leak.Witness();
                      (() => $anonfun$apply())
                    }, ec);
                    tr$async = stateMachine$async.this.getCompleted(awaitable$async);
                    stateMachine$async.this.state_=(1);
                    if (null.!=(tr$async))
                      while$()
                    else
                      {
                        stateMachine$async.this.onComplete(awaitable$async);
                        return ()
                      }
                  }
                  case 1 => {
                    <synthetic> val await$1: Object = {
                      val tryGetResult$async: Object = stateMachine$async.this.tryGet(tr$async);
                      if (stateMachine$async.this.==(tryGetResult$async))
                        return ()
                      else
                        tryGetResult$async.$asInstanceOf[Object]()
                    };
                    await$1;
                    Leak.this.stop();
                    stateMachine$async.this.completeSuccess(scala.runtime.BoxedUnit.UNIT);
                    return ()
                  }
                  case _ => throw new IllegalStateException(java.lang.String.valueOf(stateMachine$async.this.state()))
                }
              } catch {
                case (throwable$async @ (_: Throwable)) => {
                  stateMachine$async.this.completeFailure(throwable$async);
                  return ()
                }
              };
              while$()
            };
            override <bridge> <artifact> def apply(v1: Object): Object = {
              stateMachine$async.this.apply(v1.$asInstanceOf[scala.util.Try]());
              scala.runtime.BoxedUnit.UNIT
            }
          };

Your analysis is correct: the ANF transform introduces <synthetic> val await$1: Object = { to hold the result of the future, even though you don't use this subsequently.

The workaround would be to manually discard the result of the future await(Future{new Witness; ()}). It would be preferable to the ANF transform smarter to avoid the problem but I don't see a straighforward way to implement it yet.

retronym avatar Jul 12 '21 04:07 retronym

You would have the same issue using map or flatMap and an executor that doesn't bounce the stack frame:

object Main {
  def stop: Unit = {
    Thread.sleep(60000)
    println("stop")
  }

  class Witness

  def main(args: Array[String]): Unit = {
    Future(new Witness).map(_ => stop)
    ()
  }
}

You'll see the same issue above. The problem isn't really the async macro. The problem is the fact that Future memoizes its results. As long as a Future is on the heap, its results are (once evaluated).

You're basically asking Future to behave a bit like IO. For example, if we port your example to Cats Effect:

import cats.effect._
import cats.effect.cps._

import scala.concurrent.duration._

object Main extends IOApp.Simple {

  val stop: IO[Unit] =
    IO.sleep(1.minute) >> IO.println("stop")

  class Witness

  val run = 
    async[IO] {
      IO(new Witness).await
      stop.await
    }
}

If you run the above and take a heap dump during stop, you'll find no Witness anywhere. And just to prove nothing strange is going on, we can still run the Witness creation through Future and observe the same effect:

import cats.effect._
import cats.effect.cps._

import scala.concurrent.duration._

object Main extends IOApp.Simple {

  val stop: IO[Unit] =
    IO.sleep(1.minute) >> IO.println("stop")

  class Witness

  val run = 
    async[IO] {
      IO.fromFuture(IO(Future(new Witness))).await
      stop.await
    }
}

Same idea. Since Future retains its results, anything that retains the future also retains the results. IO doesn't retain its results, so it doesn't exhibit the same issue. The async macro itself has no real impact one way or another aside from obscuring the val which is holding the Future (which would otherwise be more apparent in a direct construction).

djspiewak avatar Jul 22 '21 01:07 djspiewak

The PR turns the capturing val into a var, and if we see the await$1 all by its lonesome in "statement position", we know it wasn't "consumed" by an enclosing expression or definition (which possibly would have to be a val or a method arg); at that point, null out the var.

At stage or round N of the state machine, we receive the result of the F at N-1, so presumably there is no live reference to the F.

F itself should be off the hook.

som-snytt avatar May 21 '24 00:05 som-snytt