`OutOfMemoryError` not propagated when `IO` originates from `CompletableFuture`
In our application, an OutOfMemoryError raised inside a fiber created from a CompletableFuture does not crash the JVM process. Instead, the error is caught and returned as a failed fiber outcome.
This differs from the behavior when the same error is thrown directly from an IO, where the error bubbles up and terminates the process as expected.
Example:
```scala
import cats.effect._
import cats.implicits._
import java.util.concurrent.{CompletableFuture, Executor, Executors}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt

object Main extends IOApp {
  override def run(args: List[String]): IO[ExitCode] =
    Resource.fromAutoCloseable(IO(Executors.newFixedThreadPool(1))).use { executor =>
      for {
        pingFiber <- pingIO.start
        // _ <- boomFromIO.start // -> crashes the app (expected)
        _ <- boomFromCompletableFuture(ExecutionContext.fromExecutor(executor)).start // -> does NOT crash the app
        _ <- pingFiber.join
      } yield ExitCode.Success
    }

  private val pingIO =
    (IO.println("ping") *> IO.sleep(1.seconds)).foreverM

  private def boomFromIO: IO[Unit] = IO {
    println("Waiting 2 seconds before boom...")
    Thread.sleep(2000)
    println("Gonna boom!")
    throw new OutOfMemoryError("Boom!")
  }

  private def boomFromCompletableFuture(executor: Executor): IO[Unit] =
    IO.fromCompletableFuture(IO(CompletableFuture.runAsync(() => {
      println("Waiting 2 seconds before boom...")
      Thread.sleep(2000)
      println("Gonna boom!")
      throw new OutOfMemoryError("Boom!")
    }, executor))).void
}
```
Analysis:
The difference seems to come from IO.fromCompletableFuture, which relies on CompletableFuture.handle. Since handle catches all Throwable, the OutOfMemoryError ends up wrapped in the failed outcome instead of escaping and crashing the process.
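The part of this analysis that concerns the JDK can be checked without cats-effect at all. The following is a minimal sketch (the object name `HandleSeesFatal` is made up for illustration): `runAsync` catches every `Throwable` thrown by the task, so `handle` receives the `OutOfMemoryError` as an ordinary value, wrapped in a `CompletionException`. The unwrapping step mirrors the `CompletionException` case in `fromCompletableFuture`, and `NonFatal` confirms the cause is classified as fatal.

```scala
import java.util.concurrent.{CompletableFuture, CompletionException, Executors, TimeUnit}
import java.util.function.BiFunction
import scala.util.control.NonFatal

object HandleSeesFatal {
  // Returns whatever Throwable `handle` is given when the task throws an OutOfMemoryError.
  def observedByHandle(): Throwable = {
    val executor = Executors.newSingleThreadExecutor()
    try {
      // runAsync catches every Throwable from the task (Errors included)
      // and completes the future exceptionally instead of letting it escape.
      val task: Runnable = () => throw new OutOfMemoryError("Boom!")
      val cf: CompletableFuture[Void] = CompletableFuture.runAsync(task, executor)

      // handle runs for success and failure alike; the failure arrives as a plain argument.
      val capture = new BiFunction[Void, Throwable, Throwable] {
        def apply(ignored: Void, error: Throwable): Throwable = error
      }
      cf.handle[Throwable](capture).get(5, TimeUnit.SECONDS)
    } finally executor.shutdown()
  }

  def main(args: Array[String]): Unit = {
    val seen = observedByHandle()
    // Same unwrapping as the CompletionException case in fromCompletableFuture.
    val cause = seen match {
      case e: CompletionException if e.getCause ne null => e.getCause
      case other                                        => other
    }
    println(seen.getClass.getName) // java.util.concurrent.CompletionException
    println(cause)                 // java.lang.OutOfMemoryError: Boom!
    println(NonFatal(cause))       // false
  }
}
```

Nothing in this path rethrows the error; it is up to whoever consumes the `handle` callback to decide what a fatal cause means.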
Question:
Is this the intended behavior? If not, should fromCompletableFuture avoid intercepting fatal errors like OutOfMemoryError to align with how IO behaves?
Notes:
I experimented by modifying the implementation to re-surface fatal errors in onError, and in that case the OutOfMemoryError bubbled up as expected:
```scala
def fromCompletableFuture[F[_], A](fut: F[CompletableFuture[A]])(implicit F: Async[F]): F[A] = F.cont {
  new Cont[F, A, A] {
    def apply[G[_]](implicit G: MonadCancelThrow[G]): (Either[Throwable, A] => Unit, G[A], F ~> G) => G[A] = {
      (resume, get, lift) =>
        G.uncancelable { poll =>
          G.flatMap(poll(lift(fut))) { cf =>
            val go = F.delay {
              cf.handle[Unit] {
                case (a, null) => resume(Right(a))
                case (_, NonFatal(t)) =>
                  resume(Left(t match {
                    case e: CompletionException if e.getCause ne null => e.getCause
                    case _ => t
                  }))
              }
            }
            val await = G.onCancel(
              poll(get.onError(_ => G.unit)), // re-surface the OutOfMemoryError to main IO
              // if cannot cancel, fallback to get
              G.ifM(lift(F.delay(cf.cancel(true))))(G.unit, G.void(get))
            )
            G.productR(lift(go))(await)
          }
        }
    }
  }
}
```
I don't think that modified `fromCompletableFuture` is necessary. This seems to work fine: `IO.fromCompletableFuture(...).onError(_ => IO.unit)`.
The fact that an `.onError(_ => IO.unit)` changes behavior seems like a bug though.
It is also not `fromCompletableFuture`-specific; it is rather `async`-specific:
```scala
IO.async_ { cb => cb(Left(new OutOfMemoryError("Boom!"))) }
```
This has the same behavior (and it is "fixed" by `.onError(_ => IO.unit)` similarly).
@durban Thanks for your response! Do you have any insight into the mechanism in `onError` that causes the error to propagate?
From what I can see, both the `Cont` → `resume` → `Left` path and the `onError` implementation end with a `raiseError`, so I don't see how they end up behaving differently.
For Cont: https://github.com/typelevel/cats-effect/blob/series/3.x/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala#L327
For onError: https://github.com/typelevel/cats-effect/blob/fd2f83f633df6d552a30525111b8b1228a0385d2/core/shared/src/main/scala/cats/effect/IO.scala#L606
@tpetillot IO doesn't use defaultCont (it has its own cont), so the first code you've linked doesn't run in this case.
Right! I dug a bit more into this, and I think I understand why `onError` re-surfaces the fatal error.
`IO.onError` relies on `IO.handleErrorWith`, which leads to the following check on the exception, where fatal failures are handled:
https://github.com/typelevel/cats-effect/blob/fd2f83f633df6d552a30525111b8b1228a0385d2/core/shared/src/main/scala/cats/effect/IOFiber.scala#L1306-L1315
@durban does this make sense? Before I draft a proposal, I’d like to clarify what we see as the correct behavior.
No, that `f` in the code you linked is the lambda passed to `handleErrorWith`, which does not throw a fatal (or otherwise) exception. The `.onError(_ => IO.unit)` thing "works" because it catches, then re-raises the exception. That it is fatal is detected because of the re-raising (the `*> IO.raiseError(t)` part).
To be clear, the behavior is still not entirely correct with the `.onError(_ => IO.unit)` thing. That's just a possibly useful workaround until the bug is fixed. (Not correct, because the fatal error is detected late, after the `IO.unit` is executed.)
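The mechanism described above (fatal errors recognised only when a `raiseError` node runs, so a catch-and-reraise surfaces them, but only after the handler has executed) can be illustrated with a toy interpreter. This is a simplified model under stated assumptions, not cats-effect's `IOFiber`: every name here (`ToyFiber`, `AsyncResult`, `FatalCrash`, ...) is hypothetical, `FatalCrash` stands in for "the process terminates", and `onError` is modelled as `handleErrorWith(t => f(t) *> raiseError(t))`.

```scala
import scala.util.control.NonFatal

// Toy model only: NOT cats-effect's run loop.
object ToyFiber {
  sealed trait Prog
  final case class Pure(value: Any) extends Prog
  final case class Delay(thunk: () => Any) extends Prog
  final case class Raise(error: Throwable) extends Prog
  final case class AsyncResult(result: Either[Throwable, Any]) extends Prog
  final case class FlatMap(source: Prog, f: Any => Prog) extends Prog
  final case class HandleErrorWith(source: Prog, handler: Throwable => Prog) extends Prog

  sealed trait Outcome
  final case class Succeeded(value: Any) extends Outcome
  final case class Errored(error: Throwable) extends Outcome
  // Stands in for "the fatal error escapes and the process terminates".
  final case class FatalCrash(error: Throwable) extends Outcome

  def run(prog: Prog): Outcome = prog match {
    case Pure(a) => Succeeded(a)
    case Delay(thunk) =>
      try Succeeded(thunk())
      catch {
        case NonFatal(t)  => Errored(t)
        case t: Throwable => FatalCrash(t) // thrown while running: detected immediately
      }
    case Raise(t) =>
      // The fatal check happens when the raise node itself is executed.
      if (NonFatal(t)) Errored(t) else FatalCrash(t)
    case AsyncResult(Right(a)) => Succeeded(a)
    case AsyncResult(Left(t)) =>
      // Modelled bug: an error handed to the callback is adopted without a fatal check.
      Errored(t)
    case FlatMap(source, f) =>
      run(source) match {
        case Succeeded(a) => run(f(a))
        case failed       => failed
      }
    case HandleErrorWith(source, handler) =>
      run(source) match {
        case Errored(t) => run(handler(t))
        case other      => other
      }
  }

  // Models `fa.onError(f)` as `fa.handleErrorWith(t => f(t) *> raiseError(t))`.
  def onError(fa: Prog, f: Throwable => Prog): Prog =
    HandleErrorWith(fa, t => FlatMap(f(t), _ => Raise(t)))

  def main(args: Array[String]): Unit = {
    val boom = AsyncResult(Left(new OutOfMemoryError("Boom!")))
    println(run(boom)) // Errored(java.lang.OutOfMemoryError: Boom!)
    val withWorkaround = onError(boom, _ => Delay(() => println("handler ran")))
    // Prints "handler ran" first, then FatalCrash(java.lang.OutOfMemoryError: Boom!)
    println(run(withWorkaround))
  }
}
```

In this model the workaround turns `Errored` into `FatalCrash`, yet the handler still runs first, which is the "detected late" caveat above.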
Thanks for the additional insights @durban. Does this happen to be how the error propagation works?
How the error comes about:
- `CompletableFuture.handle()` catches ALL exceptions, including fatal ones like `OutOfMemoryError`
- `IO.async_` wraps callback registration in `IO.delay()`, which also catches fatal errors
- This prevents fatal errors from crashing the JVM immediately

How the workaround works:
- `onError(_ => IO.unit)` catches the wrapped fatal error
- Executes `IO.unit` (does nothing)
- Re-raises the original error with `*> IO.raiseError(t)`
- The re-raised error then gets detected as fatal and crashes the JVM
- But this happens late, after the `IO.unit` executes

Possible solution:
- Detect fatal errors immediately when they occur
- Re-throw them right away instead of wrapping them
Would that mean checking for fatal errors in both `fromCompletableFuture` and `async_` and re-throwing them immediately?
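The "detect immediately" idea can be sketched as a check applied at the moment a callback result is delivered. This is only an illustration of the proposal, assuming the callback has some way to escalate a fatal error: `CheckedDelivery`, `deliverChecked`, `resume` and `onFatal` are hypothetical names, not cats-effect API, and where `onFatal` would be wired in inside the runtime is left open.

```scala
import scala.util.control.NonFatal

// Hypothetical helper, not part of cats-effect: classify a callback result
// as soon as it is delivered instead of deferring the fatal check.
object CheckedDelivery {
  def deliverChecked[A](
      result: Either[Throwable, A],
      resume: Either[Throwable, A] => Unit,
      onFatal: Throwable => Unit
  ): Unit =
    result match {
      case Left(t) if !NonFatal(t) => onFatal(t)     // e.g. OutOfMemoryError: escalate right away
      case other                   => resume(other)  // successes and ordinary failures are unchanged
    }
}
```

The design choice is simply to move the `NonFatal` test from "when the error is later re-raised" to "when the result first arrives", so no user code runs in between.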
It looks like the problem happens with both mechanisms independently catching fatal errors:
- `CompletableFuture.handle()` catches fatal errors from the `CompletableFuture`
- `IO.delay()` catches fatal errors during callback registration in `async_`
- `raiseError` fatal propagation: https://github.com/typelevel/cats-effect/blob/series/3.x/core/shared/src/main/scala/cats/effect/IOFiber.scala#L267
- `IO.cont`: I'm still struggling to trace the exact path the exception takes:
  - IO.scala#L1548
  - IOFiber.scala#L681
  - IOFiber.scala#L124
  - IOFiber.scala#L1410
  - IOFiber.scala#L1303 (probably; I'm not sure, as I don't see `conts` being updated from `IOCont` evaluation 🤷)
  - IOFiber.scala#L1498 = the `Outcome.Errored(t)` we observed
Should `failed` start with a check for fatal errors and call `onFatalFailure(t)`, dropping the call to `handleErrorWithK` (the catch would then simply call `failed`)?
@durban wdyt?
@tpetillot I'm sorry, I don't understand your last sentence.
In general, the approach in other (correct) cases is to detect the fatal error as soon as it occurs. One exception to this is `raiseError`: it is possible to create it with a fatal error, which is then detected when the `raiseError` node is executed. I'm pretty sure this is intentional. A tricky thing with `cont` is that the callback might be called with something that might not be used. (Multiple results, and also `cont` can complete synchronously.)
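The "might not be used" point can be made concrete with a toy one-shot callback. This sketch assumes first-result-wins semantics (later invocations are ignored); `OneShot` and its methods are hypothetical and only model the idea, not how `cont` is implemented. Under that assumption, a fatal error passed in a later, ignored invocation is never observed by the waiting side, so tying the fatal check only to the consumed result would miss it.

```scala
import java.util.concurrent.atomic.AtomicReference

// Toy model: the first callback invocation wins, later ones are silently dropped.
final class OneShot[A] {
  private val slot = new AtomicReference[Option[Either[Throwable, A]]](None)

  // Returns true only if this invocation's result is the one that will be used.
  def callback(result: Either[Throwable, A]): Boolean =
    slot.compareAndSet(None, Some(result))

  // What the waiting side eventually observes.
  def observed: Option[Either[Throwable, A]] = slot.get()
}

object OneShotDemo {
  def main(args: Array[String]): Unit = {
    val cb = new OneShot[Int]
    println(cb.callback(Right(42)))                           // true
    println(cb.callback(Left(new OutOfMemoryError("Boom!")))) // false: dropped
    println(cb.observed)                                      // Some(Right(42))
  }
}
```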