Possible issue with timeout in Cats Effect 3.6.x resulting in http4s connection leaks
Summary
After upgrading to Cats Effect 3.6.x we've observed apparent client connection leaks in our http4s applications. Initial experimentation points to this change to timeout as a contributing factor: https://github.com/typelevel/cats-effect/pull/4059.
Reproducing
Cats Effect version: 3.6.3
This gist has a somewhat minimal reproducer. It does require http4s and an arbitrary web server to throw a lot of requests at (here's the basic one we used). The gist uses Blaze as the client for ease of demonstration, but we've experienced the same symptoms with Ember as well.
The setup involves a client middleware that times out requests exceeding a configurable duration; historically this was implemented using timeout directly. Since upgrading to CE 3.6.x, this approach appears to cause leaked connections. Both reverting to Cats Effect 3.5.7 and re-implementing the timeout using race directly resolve the issue. The withCETimeoutIncludingSleep variant, which sleeps for any period after the timeout, seems to avoid the leak as well.
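For reference, here is a rough sketch of what the middleware looks like, reconstructed from the description above (assuming http4s's Client is on the classpath; the name withCETimeout for the direct-timeout variant is a guess, and the sleep variant shows one possible reading of withCETimeoutIncludingSleep, not the exact gist code):

import cats.syntax.all._
import cats.effect.IO
import cats.effect.syntax.all._
import cats.effect.kernel.Resource
import org.http4s.Request
import org.http4s.client.Client
import scala.concurrent.duration._

// Sketch of the middleware as described: time out any request exceeding `timeout`.
// This is the variant that appears to leak connections on CE 3.6.x.
def withCETimeout(client: Client[IO], timeout: FiniteDuration): Client[IO] =
  Client { (req: Request[IO]) =>
    client.run(req).timeout(timeout)
  }

// One possible reading of the withCETimeoutIncludingSleep variant mentioned above:
// sleep for some (arbitrarily small) period after applying the timeout.
// As described above, this seems to avoid the leak.
def withCETimeoutIncludingSleep(client: Client[IO], timeout: FiniteDuration): Client[IO] =
  Client { (req: Request[IO]) =>
    client.run(req).timeout(timeout).flatTap(_ => Resource.sleep[IO](1.nanosecond))
  }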
Expected Behavior
All requests should complete without overflowing the wait queue, since the number of concurrent requests (10) is less than max connections + wait queue size (10 + 3).
Actual Behavior
The client using the middleware with timeout directly throws a WaitQueueFull exception. The other client middlewares succeed.
This sounds a bit like https://github.com/typelevel/fs2/issues/3590 but not quite the same. Will take a look at the reproducer.
Quick question, does the following also work?
def withCETimeoutIncludingPure(client: Client[IO], timeout: Duration): Client[IO] = Client {
  (req: Request[IO]) =>
    client.run(req).timeout(timeout).flatTap(_ => Resource.pure(()))
}
For me, this does not work; I'm still getting the WaitQueueFullFailures.
My theory is that this is #3111, which was fixed in #3226 by overriding race in Resource. That was fine as long as timeout used race. However, since #4059 it no longer does; it now uses racePair directly, which for Resource is the default GenConcurrent#racePair, which uses start. And Resource#start has the unintuitive behavior described by @djspiewak here: https://github.com/typelevel/cats-effect/pull/3226#pullrequestreview-1229683885.
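To illustrate the difference, here is a rough, simplified sketch of the two shapes (not the actual cats-effect code; names are mine): the pre-#4059 timeout was essentially a race against a sleep, while the post-#4059 version is built on racePair.

import cats.syntax.all._
import cats.effect.kernel.GenTemporal
import java.util.concurrent.TimeoutException
import scala.concurrent.duration.FiniteDuration

// Simplified, race-based shape (roughly what timeout looked like before #4059).
// For Resource this goes through the overridden race from #3226.
def timeoutViaRace[F[_], A](fa: F[A], d: FiniteDuration)(implicit F: GenTemporal[F, Throwable]): F[A] =
  F.race(fa, F.sleep(d)).flatMap {
    case Left(a)  => F.pure(a)
    case Right(_) => F.raiseError[A](new TimeoutException(d.toString))
  }

// Simplified, racePair-based shape (roughly the post-#4059 structure). For Resource this
// falls back to the default GenConcurrent#racePair, which is implemented via start.
def timeoutViaRacePair[F[_], A](fa: F[A], d: FiniteDuration)(implicit F: GenTemporal[F, Throwable]): F[A] =
  F.uncancelable { poll =>
    poll(F.racePair(fa, F.sleep(d))).flatMap {
      case Left((oc, sleeper)) => sleeper.cancel *> oc.embed(poll(F.canceled) *> F.never)
      case Right((runner, _))  => runner.cancel *> F.raiseError[A](new TimeoutException(d.toString))
    }
  }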
If what I wrote above is right, I fear the next step is also overriding timeout in Resource...
Ooooooh this is interesting. It also makes sense then that the downstream timeout would get the release to happen, since that runs through a full allocated cycle/re-wrap due to the fiber involvement. This feels like a very plausible explanation to me.
I really wish I could think of a more general answer than overriding timeout. I think the problem here is that Resource (sadly) admits a range of valid semantics, but the very reasonable user expectation is only a subset of those possibilities.
I have further minimized the reproducer to remove the dependencies on http4s and fs2. Instead of a client, we have a resource that decrements a ref on acquire and increments it on release. The ref is initialized to 10, and we only run 10 at a time, so we should never go negative. The bug is that we do go negative, so we're not releasing the ref resource before moving on to running more of them.
- I implemented the racePair version of timeout in place, and instead of racing against a sleep we're now racing against a Resource.never, which means the timeout will never trigger. The bug persists, so the issue isn't isolated to the process of timing out.
- The List.fill is still necessary. A list of 100 doesn't reliably reproduce the failure, but a list of 5000 does.
//> using scala "2.13.15"
//> using dep "org.typelevel::cats-core:2.13.0"
//> using dep "org.typelevel::cats-effect:3.6.3"
//> using plugin org.typelevel:::kind-projector:0.13.3

import cats.syntax.all._
import cats.effect.syntax.all._
import cats.effect.kernel.GenSpawn
import cats.effect.kernel.Ref
import cats.effect.IO
import cats.effect.kernel.Resource
import cats.effect.unsafe.implicits.global
import scala.concurrent.duration.{DurationInt, FiniteDuration}

def withTimeoutRace(incRes: Resource[IO, Unit], timeout: FiniteDuration): Resource[IO, Unit] =
  incRes
    .race(Resource.sleep(timeout))
    .flatMap {
      case Left(resp) => Resource.pure(resp)
      case Right(_)   => Resource.raiseError[IO, Unit, Throwable](new RuntimeException(timeout.toString))
    }

def withTimeoutRacePair(incRes: Resource[IO, Unit], timeout: FiniteDuration): Resource[IO, Unit] =
  Resource.uncancelable[IO, Unit] { poll =>
    val x = GenSpawn.apply[Resource[IO, *], Throwable]
    // note that we are racing against a Resource.never, so our resource will always win
    poll(x.racePair(incRes, Resource.never)).flatMap {
      case Left((oc, f)) =>
        f.cancel *> oc.embed(poll(Resource.canceled) *> Resource.never)
      case Right((f, _)) =>
        f.cancel *> f.join.flatMap { oc =>
          oc.fold(
            Resource.raiseError[IO, Unit, Throwable](new Throwable("blerf fallback")),
            Resource.raiseError[IO, Unit, Throwable],
            identity
          )
        }
    }
  }

def withTimeoutCE(incRes: Resource[IO, Unit], timeout: FiniteDuration): Resource[IO, Unit] =
  incRes.timeout(timeout)

val counterRef = Ref[IO].of(10)

// decrement the ref on acquire, increment on release
// if we only run this resource 10 at a time, then we should never hit the exception
def decAndIncResource(ref: Ref[IO, Int]): Resource[IO, Unit] =
  Resource.make(
    ref.updateAndGet(_ - 1)
      .map(_ >= 0)
      .ifM(IO.unit, IO.raiseError(new RuntimeException("tried to use ref more than 10 times without closing")))
  )(_ => ref.update(_ + 1))

counterRef
  .map { ref => decAndIncResource(ref) }
  .toResource
  .use { decAndIncRes =>
    // succeeds
    //val timeoutResource = withTimeoutRace(decAndIncRes, 30.seconds)
    // failing
    val timeoutResource = withTimeoutRacePair(decAndIncRes, 30.seconds)
    //val timeoutResource = withTimeoutCE(decAndIncRes, 30.seconds)
    List.fill(5000)(IO.unit)
      .parTraverseN(10)(_ => timeoutResource.use_)
      .flatMap(_ => IO.println("completed successfully"))
  }
  .unsafeRunSync()
I golfed it down even more. tl;dr: after applying the timeout combinator to Resource, there is a race condition where it might not actually release before use returns.
//> using dep org.typelevel::cats-effect::3.6.3

import cats.effect.*
import cats.effect.syntax.all.*
import cats.syntax.all.*
import scala.concurrent.duration.*

object Bug extends IOApp.Simple {

  def run = IO.ref(false).flatMap { ref =>
    // flips a flag on acquire, flips it back on release
    val res = Resource.make(ref.set(true))(_ => ref.set(false))
    val timedRes = res.timeout(1.hour)
    // if the resource were released before use_ returned, the flag would be false here
    val test = timedRes.use_ *>
      ref.get.ifM(IO.raiseError(new Exception("not released")), IO.unit)
    test.replicateA_(10000)
  }
}
Another minimization, replacing timeout with racePair against a never. This usually takes a few iterations to fail, so it is a race condition.
Note: the following minimization doesn't cancel the "never" fiber that always loses the race; it simply discards it, so I'm not sure whether this is correct. Still, it is not obvious to me why the resource release of the winning fiber should depend on handling of the losing fiber, but perhaps this relates to the "unintuitive" behavior that @durban was referencing above?
//> using dep org.typelevel::cats-effect::3.6.3
//> using option -Xkind-projector

import cats.effect.*
import cats.syntax.all.*

object Bug extends IOApp.Simple {

  def run = IO.ref(false).flatMap { ref =>
    val res = Resource.make(ref.set(true))(_ => ref.set(false))
    val racedRes = Spawn[Resource[IO, *]].racePair(res, Resource.never)
    val test = racedRes.use_ *>
      ref.get.ifM(IO.raiseError(new Exception("not released")), IO.unit)
    def go(i: Int): IO[Nothing] =
      test.onError(_ => IO.println(s"failed on iteration $i")) >> go(i + 1)
    go(0)
  }
}
This feels like a bug in the Resource fiber implementation tbh. The semantics are that the finalizer of res should have been carried through the race into racedRes, so I would have expected that ref would still be true within the use, but the test here is checking whether it is true after the use_, when I would absolutely have expected it to be finalized.
I would absolutely have expected it to be finalized
Me too. But are we sure this is not just the expected semantics of start? Specifically case (1)(a): the scope has terminated (after the use_), but because the fiber is incomplete, it will run the finalizer (ref.set(false)) when it completes, concurrently with the termination of the scope. I think this is exactly the "unintuitive" behavior @durban was referencing in https://github.com/typelevel/cats-effect/issues/4489#issuecomment-3325843860.
https://github.com/typelevel/cats-effect/blob/54e35fbb1b14a4c81d8edfd671053af0f4c182ae/kernel/shared/src/main/scala/cats/effect/kernel/Resource.scala#L610-L621
In the snippet above, "fiber" is scoped to just the Resource. If you call use, you're basically terminating the fiber from this point of view. So precisely because of 1.a, I would expect the ref to be set to false in the guarantee for the use_ effect.
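As a standalone illustration of the scoping being debated (hypothetical, not from the thread; the object name is mine): the snippet below starts a Resource fiber, gives it a moment to acquire, lets the surrounding scope terminate, and then observes when the release becomes visible. Which interpretation above holds determines what the first print shows.

//> using dep org.typelevel::cats-effect::3.6.3

import cats.effect.*
import cats.effect.syntax.all.*
import scala.concurrent.duration.*

object StartScope extends IOApp.Simple {

  def run = IO.ref(false).flatMap { ref =>
    val res = Resource.make(ref.set(true))(_ => ref.set(false))
    // start `res` as a Resource fiber, give it a moment to acquire, then let the scope end
    val scoped = res.start.evalMap(_ => IO.sleep(10.millis))
    scoped.use_ *>
      // observe whether the flag has already been reset right after the scope terminates,
      // and again after a delay
      ref.get.flatMap(held => IO.println(s"flag still set right after use_? $held")) *>
      IO.sleep(100.millis) *>
      ref.get.flatMap(held => IO.println(s"flag still set 100ms later? $held"))
  }
}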
@armanbilge
...but because the fiber is incomplete, it will run the finalizer (ref.set(false)) when it completes (concurrently to the termination of the scope). Which I think is exactly the "unintuitive" behavior @durban was referencing
Yeah.
But also, I think there is an actual bug (i.e., not just "unintuitive", but a bug) in Resource#start, where the finalizer never runs (i.e., not just late, but actually never). In particular, when finalizeOuter runs after the guarantee, but before the flatMap (of poll(this.allocated)). I believe this small modification of @armanbilge's latest minimization shows this bug:
//> using dep org.typelevel::cats-effect::3.6.3
//> using option -Xkind-projector

import cats.effect.*
import cats.syntax.all.*
import scala.concurrent.duration.*

object Bug extends IOApp.Simple {

  def run = IO.ref(false).flatMap { ref =>
    def res(d: Deferred[IO, Unit]) = Resource.make(ref.set(true))(_ => ref.set(false) *> d.complete(()).void)
    def racedRes(d: Deferred[IO, Unit]) = Spawn[Resource[IO, *]].racePair(res(d), Resource.never)
    def test(d: Deferred[IO, Unit]) = racedRes(d).use_ *>
      ref.get.ifM(d.get, IO.unit)
    def go(i: Int): IO[Nothing] = IO.deferred[Unit].flatMap { d =>
      test(d).onError(_ => IO.println(s"failed on iteration $i")) >> IO.println(i) >> go(i + 1)
    }
    go(0)
  }
}
It hangs (possibly after printing some numbers). If the finalizer were just late, I think it should never hang.
So, to summarize, I think there are two things here: an "unintuitive behavior" and an "actual bug". I think we can fix the bug (and we should), but I also think that won't fix the original issue.
(For future reference, I believe this was the latest commit modifying start: https://github.com/typelevel/cats-effect/commit/43690c295d9221eaf930195edc5d43b093807318)