cats-effect
CE3 IOApp finishes early before all finalizers
This can be shown with a simple test case (several runs may be needed):
import cats.effect.std.Console
import cats.effect.{ IO, IOApp }
import cats.syntax.all._
import scala.concurrent.duration._

object GuaranteeBroken extends IOApp.Simple {
  val workers: Vector[IO[Nothing]] = Vector.tabulate(10) { i =>
    IO.never.guarantee(Console[IO].println(s"Worker #$i is done"))
  }

  val run: IO[Unit] = IO.race(IO.sleep(3.seconds), workers.parSequence_).void
}
Usually, especially when run from an IDE or under load, only around 7 finalizers run (or maybe they all run, but the stdout buffer is not flushed?).
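One way to separate the two hypotheses (finalizers being skipped vs. output lost in the buffer) is to flush explicitly inside the finalizer; a minimal sketch of an adjusted worker, assuming the same repro setup:

```scala
import cats.effect.IO
import cats.effect.std.Console
import cats.syntax.all._

// If all ten lines now appear, the finalizers were running all along
// and only the stdout buffer was being dropped at shutdown.
def worker(i: Int): IO[Nothing] =
  IO.never.guarantee(
    Console[IO].println(s"Worker #$i is done") *> IO.blocking(System.out.flush())
  )
```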
Similar code, but in the form of an Ammonite script, always runs all finalizers:
import $ivy.`org.typelevel::cats-effect:3.0.0-M5`
import cats.effect._
import cats.effect.std.Console
import cats.effect.unsafe.implicits.global
import cats.syntax.all._
import scala.concurrent.duration._
val workers = Vector.tabulate(10)(i => IO.never.guarantee(Console[IO].println(s"Worker #$i is done")))
IO.race(IO.sleep(3.seconds), workers.parSequence_).void.unsafeRunSync()
This is very, very interesting! I agree this seems like IOApp is finishing too aggressively, but at the same time I don't exactly see how that's possible, since the race itself should not have fired its continuation prior to the guarantees being run. Something's fishy here and it needs investigating. Thank you for the report!
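The semantics in question can be checked in isolation: IO.race cancels the loser and should back-pressure on its finalizers before producing a result. A minimal sketch (names here are illustrative, not from the original thread):

```scala
import cats.effect.{ IO, IOApp, Ref }
import scala.concurrent.duration._

object RaceSemantics extends IOApp.Simple {
  val run: IO[Unit] = for {
    done <- Ref[IO].of(false)
    // The loser's guarantee sets the flag; race should not
    // complete before the finalizer has run.
    _    <- IO.race(IO.sleep(100.millis), IO.never.guarantee(done.set(true)))
    seen <- done.get
    _    <- IO.println(s"finalizer observed before race returned: $seen")
  } yield ()
}
```

If race back-pressures on the loser's finalizers as expected, this should print `true`, which suggests the race itself is not the culprit.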
I am looking into this now, and I modified the code slightly:
import cats.effect.std.Console
import cats.effect.{ Deferred, IO, IOApp }
import cats.syntax.all._
import scala.concurrent.duration._

object GuaranteeBroken extends IOApp.Simple {
  def workers(latch: Deferred[IO, Unit]): Vector[IO[Nothing]] = Vector.tabulate(10) { i =>
    IO.never.guarantee(Console[IO].println(s"Worker #$i is done") *> IO.blocking(System.out.flush()) *> latch.get)
  }

  val effect = IO.uncancelable { poll =>
    for {
      latch <- Deferred[IO, Unit]
      _     <- (IO.sleep(15.seconds) *> latch.complete(())).start
      _     <- poll(workers(latch).parSequence_)
    } yield ()
  }

  val run: IO[Unit] = IO.race(IO.sleep(3.seconds), effect).void
}
This seems to be working as expected, so I think it's just a race condition with flushing System.out...
Hmm. That's a lot less worrisome, then. Flush race conditions seem totally reasonable. The blocking thing, though, makes me think a bit about the semantics of thread creation during VM shutdown. Like, if there are no threads available on that pool and we need to allocate a new one, does that work?
I don't know if you're going to like what I'm proposing but I honestly don't see another way, so here goes. I think we can set the "core" pool size of the cached thread pool to be 1. That does mean that there will always be at least 1 blocking thread idling somewhere.
This is kind of horrifying :-) Not the worst thing I suppose though
I think what I was getting at is that I don't honestly know what the JVM does with thread spawning semantics under those circumstances. Probably merits some testing.
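The thread-spawning question can be probed with a small standalone check, outside cats-effect entirely (a sketch, assuming a plain cached pool stands in for the blocking pool): register a shutdown hook that submits work to an idle cached pool, which forces it to allocate a fresh thread during VM shutdown.

```scala
import java.util.concurrent.Executors

object ShutdownSpawn {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newCachedThreadPool()
    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      // The pool has no idle threads here, so this submission
      // must allocate a new thread while the VM is shutting down.
      val f = pool.submit(new Runnable {
        def run(): Unit = println("spawned a thread during shutdown")
      })
      f.get() // block so the hook does not finish before the task runs
      pool.shutdown()
    }))
    println("main exiting")
  }
}
```

Whether the line from the hook actually appears is exactly the open question about shutdown-time thread creation; it merits testing on the target JVMs rather than assuming either way.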
Is it just me, or are we missing Console#flushOut and Console#flushErr? Wouldn't that make this easy to solve?
Even better: just flush after every call. Good enough for Console, and fs2.io.{stdout,stderr} can be used for more fine-grained control over buffering.
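A sketch of the flush-after-every-call idea as a user-land helper (printlnFlushed is a hypothetical name, not an existing Console method):

```scala
import cats.effect.IO
import cats.effect.std.Console
import cats.syntax.all._

object FlushingConsole {
  // Hypothetical helper: print a line, then flush stdout so nothing
  // is left sitting in the buffer if the JVM exits right afterwards.
  def printlnFlushed(line: String): IO[Unit] =
    Console[IO].println(line) *> IO.blocking(System.out.flush())
}
```

Baking the flush into Console itself would make the behavior automatic, at the cost of a flush per call; callers needing buffered output could drop down to fs2.io.{stdout,stderr}.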