
Possible Issue with Timeout in Cats-Effect 3.6.x which resulted in Http4s Connection leaks

Open scottmartin-jh opened this issue 3 months ago • 12 comments

Summary

After upgrading to Cats Effect 3.6.x we've observed apparent client connection leaks in our http4s applications. Initial experimentation suggests that this change to timeout is a contributing factor: https://github.com/typelevel/cats-effect/pull/4059.

Reproducing

Cats Effect version: 3.6.3

This gist has a somewhat minimal reproducer. It requires http4s and an arbitrary web server to throw a lot of requests at (here's the basic one we used). The gist uses Blaze as the client for ease of demonstration, but we've experienced the same symptoms using Ember as well.

The setup involves a client middleware that times out requests exceeding a configurable duration; historically this was implemented using timeout directly. Since upgrading to CE 3.6.x, this approach appears to leak connections. Both reverting to Cats Effect 3.5.7 and re-implementing the timeout using race directly resolve the issue. The withCETimeoutIncludingSleep variant, which sleeps for any period after the timeout, also seems to avoid the leak.
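For reference, here is a sketch (not taken from the gist) of the race-based middleware variant described above. The name withRaceTimeout and the http4s version pin are illustrative assumptions; the smoke test runs against an in-memory HttpApp so no real server is needed.

```scala
//> using scala "2.13.15"
//> using dep "org.typelevel::cats-effect:3.6.3"
//> using dep "org.http4s::http4s-client:0.23.30"

import cats.syntax.all._
import cats.effect.syntax.all._
import cats.effect.{IO, Resource}
import cats.effect.unsafe.implicits.global
import org.http4s.{HttpApp, Request, Response, Status}
import org.http4s.client.Client
import scala.concurrent.duration._
import java.util.concurrent.TimeoutException

// Sketch of the race-based timeout middleware (the workaround described
// above): race the request's Resource against a sleep instead of using
// the timeout combinator.
def withRaceTimeout(client: Client[IO], timeout: FiniteDuration): Client[IO] =
  Client[IO] { (req: Request[IO]) =>
    client
      .run(req)
      .race(Resource.sleep[IO](timeout))
      .flatMap {
        case Left(resp) => Resource.pure[IO, Response[IO]](resp)
        case Right(_) => // the sleep won: the request took too long
          Resource.raiseError[IO, Response[IO], Throwable](
            new TimeoutException(timeout.toString))
      }
  }

// quick smoke test against an in-memory app that responds immediately,
// so the request always wins the race
val fastClient =
  Client.fromHttpApp(HttpApp[IO](_ => IO.pure(Response[IO](Status.Ok))))
val status = withRaceTimeout(fastClient, 1.second)
  .run(Request[IO]())
  .use(resp => IO.pure(resp.status))
  .unsafeRunSync()
println(if (status == Status.Ok) "got Ok response" else s"unexpected: $status")
```

This mirrors the withTimeoutRace shape from the reproducer below, just lifted to a Client middleware.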

Expected Behavior

All requests should complete without overflowing the wait queue, since the number of concurrent requests (10) is less than the max connections plus wait queue size (10 + 3).

Actual Behavior

The client using the middleware with timeout directly throws a WaitQueueFull exception. The other client middlewares succeed.

scottmartin-jh avatar Sep 18 '25 13:09 scottmartin-jh

This sounds a bit like https://github.com/typelevel/fs2/issues/3590 but not quite the same. Will take a look at the reproducer.

djspiewak avatar Sep 18 '25 13:09 djspiewak

Quick question, does the following also work?

def withCETimeoutIncludingPure(client: Client[IO], timeout: Duration): Client[IO] = Client {
  (req: Request[IO]) =>
    client.run(req).timeout(timeout).flatTap(_ => Resource.pure(()))
}

djspiewak avatar Sep 23 '25 20:09 djspiewak

For me, this does not work; I'm still getting the WaitQueueFull failures.

samspills avatar Sep 23 '25 20:09 samspills

My theory is that this is #3111, which was fixed in #3226 by overriding race in Resource. Which was fine, as long as timeout used race. However, since #4059 it doesn't; it now uses racePair directly, which for Resource is the default GenConcurrent#racePair, which uses start. And Resource#start has this unintuitive behavior described by @djspiewak here: https://github.com/typelevel/cats-effect/pull/3226#pullrequestreview-1229683885.

If what I wrote above is right, I fear the next step is also overriding timeout in Resource...
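To illustrate what such an override might do, here is a hypothetical race-based timeout for Resource. This is a sketch only, not the actual cats-effect implementation, and the name raceTimeout is made up; the check at the end demonstrates that with the race-based version the finalizer runs before use_ returns.

```scala
//> using scala "2.13.15"
//> using dep "org.typelevel::cats-effect:3.6.3"

import cats.syntax.all._
import cats.effect.syntax.all._
import cats.effect.{IO, Resource}
import cats.effect.unsafe.implicits.global
import scala.concurrent.duration._
import java.util.concurrent.TimeoutException

// Hypothetical sketch of a race-based timeout for Resource. Because
// Resource overrides race to keep finalizers in scope, the acquired
// resource is released before the surrounding scope closes.
def raceTimeout[A](res: Resource[IO, A], d: FiniteDuration): Resource[IO, A] =
  res.race(Resource.sleep[IO](d)).flatMap {
    case Left(a) => Resource.pure[IO, A](a)
    case Right(_) =>
      Resource.raiseError[IO, A, Throwable](new TimeoutException(d.toString))
  }

// check: the finalizer (ref.set(false)) must have run by the time use_ returns
val leaked = IO
  .ref(false)
  .flatMap { ref =>
    val res = Resource.make(ref.set(true))(_ => ref.set(false))
    raceTimeout(res, 1.hour).use_ *> ref.get
  }
  .unsafeRunSync()
println(if (leaked) "resource leaked" else "resource was released")
```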

durban avatar Sep 23 '25 23:09 durban

Ooooooh this is interesting. It also makes sense then that the downstream timeout would get the release to happen, since that runs through a full allocated cycle/re-wrap due to the fiber involvement. This feels like a very plausible explanation to me.

I really wish I could think of a more general answer than overriding timeout. I think the problem here is that Resource (sadly) admits a range of valid semantics, but the very reasonable user expectation is only a subset of those possibilities.

djspiewak avatar Sep 24 '25 17:09 djspiewak

I have further minimized the reproducer to remove the dependencies on http4s and fs2. Instead of a client, we have a resource that decrements a ref on acquire and increments it on release. The ref is initialized to 10, and we only run 10 at a time, so it should never go negative. The bug is that it does go negative, so we're not releasing the resource before moving on to running more.

  1. I implemented the racePair version of timeout in place, and instead of racing against a sleep we're now racing against a Resource.never, which means the timeout will never trigger. The bug persists, so the issue isn't isolated to the process of timing out.
  2. The List.fill is still necessary. A list of 100 doesn't reliably reproduce the failure, but a list of 5000 does.
//> using scala "2.13.15"
//> using dep "org.typelevel::cats-core:2.13.0"
//> using dep "org.typelevel::cats-effect:3.6.3"
//> using plugin org.typelevel:::kind-projector:0.13.3

import cats.syntax.all._
import cats.effect.syntax.all._
import cats.effect.kernel.GenSpawn
import cats.effect.kernel.Ref
import cats.effect.IO
import cats.effect.kernel.Resource
import cats.effect.unsafe.implicits.global
import scala.concurrent.duration.{Duration, DurationInt}

def withTimeoutRace(incRes: Resource[IO, Unit], timeout: Duration): Resource[IO, Unit] = {
  incRes
    .race(Resource.sleep(timeout))
    .flatMap {
      case Left(resp) => Resource.pure(resp)
      case Right(_) => Resource.raiseError[IO, Unit, Throwable](new RuntimeException(timeout.toString))
    }
}

def withTimeoutRacePair(incRes: Resource[IO, Unit], timeout: Duration): Resource[IO, Unit] = {
  Resource.uncancelable[IO, Unit] { poll =>
    val x = GenSpawn.apply[Resource[IO, *], Throwable]
    // note that we are racing against a Resource.never, so our resource will always win
    poll(x.racePair(incRes, Resource.never)) flatMap {
      case Left((oc, f)) => f.cancel *> oc.embed(poll(Resource.canceled) *> Resource.never)
      case Right((f, _)) =>
        f.cancel *> f.join.flatMap { oc =>
          oc.fold(
            Resource.raiseError[IO, Unit, Throwable](new Throwable("blerf fallback")),
            Resource.raiseError[IO, Unit, Throwable],
            identity
          )
        }
    }
  }
}

def withTimeoutCE(incRes: Resource[IO, Unit], timeout: Duration): Resource[IO, Unit] = {
  incRes.timeout(timeout)
}

val counterRef = Ref[IO].of(10)
// decrement the ref on acquire, increment on release
// if we only run this resource 10 at a time, then we should never hit the exception
def decAndIncResource(ref: Ref[IO, Int]): Resource[IO, Unit] =
  Resource.make(
    ref.updateAndGet(_ - 1)
      .map(_ >= 0)
      .ifM(IO.unit, IO.raiseError(new RuntimeException("tried to use ref more than 10 times without closing")))
  )(_ => ref.update(_ + 1))

counterRef
  .map{ ref => decAndIncResource(ref) }
  .toResource
  .use { decAndIncRes =>

    // succeeds
    //val timeoutResource = withTimeoutRace(decAndIncRes, 30.seconds)

    // failing
    val timeoutResource = withTimeoutRacePair(decAndIncRes, 30.seconds)
    //val timeoutResource = withTimeoutCE(decAndIncRes, 30.seconds)

    List.fill(5000)(IO.unit)
      .parTraverseN(10)(_ => timeoutResource.use_)
      .flatMap(_ => IO.println("completed successfully"))
  }
  .unsafeRunSync()

samspills avatar Sep 25 '25 15:09 samspills

I golfed it down even more. tl;dr: after applying the timeout combinator to Resource, there is a race condition where it might not actually release before use returns.

//> using dep org.typelevel::cats-effect::3.6.3

import cats.effect.*
import cats.effect.syntax.all.*
import scala.concurrent.duration.*

object Bug extends IOApp.Simple {

  def run = IO.ref(false).flatMap { ref =>
    val res = Resource.make(ref.set(true))(_ => ref.set(false))
    val timedRes = res.timeout(1.hour)
    val test = timedRes.use_ *>
      ref.get.ifM(IO.raiseError(new Exception("not released")), IO.unit)
    test.replicateA_(10000)
  }

}

armanbilge avatar Sep 25 '25 16:09 armanbilge

Another minimization, replacing timeout with racePair against a never. This usually takes a few iterations to fail, so it is a race condition.

Note: the following minimization doesn't cancel the "never" fiber that always loses the race; it simply discards it, so I'm not sure whether this is correct. Still, it is not obvious to me why resource release of the winning fiber would depend on handling of the losing fiber, but perhaps this relates to the "unintuitive" behavior @durban was referencing above?

//> using dep org.typelevel::cats-effect::3.6.3
//> using option -Xkind-projector

import cats.effect.*

object Bug extends IOApp.Simple {

  def run = IO.ref(false).flatMap { ref =>
    val res = Resource.make(ref.set(true))(_ => ref.set(false))
    val racedRes = Spawn[Resource[IO, *]].racePair(res, Resource.never)
    val test = racedRes.use_ *>
      ref.get.ifM(IO.raiseError(new Exception("not released")), IO.unit)
    
    def go(i: Int): IO[Nothing] =
      test.onError(_ => IO.println(s"failed on iteration $i")) >> go(i + 1)

    go(0)
  }

}

armanbilge avatar Sep 25 '25 17:09 armanbilge

This feels like a bug in the Resource fiber implementation tbh. The semantics are that the finalizer of res should have been carried through the race into racedRes, so I would have expected that ref would still be true within the use, but the test here is checking whether it is true after the use_, when I would absolutely have expected it to be finalized.

djspiewak avatar Sep 25 '25 19:09 djspiewak

I would absolutely have expected it to be finalized

Me too. But are we sure this is not just the expected semantics of start? Specifically case (1)(a): the scope has terminated (after the use_), but because the fiber is incomplete, it will run the finalizer (ref.set(false)) when it completes (concurrently to the termination of the scope). Which I think is exactly the "unintuitive" behavior @durban was referencing in https://github.com/typelevel/cats-effect/issues/4489#issuecomment-3325843860.

https://github.com/typelevel/cats-effect/blob/54e35fbb1b14a4c81d8edfd671053af0f4c182ae/kernel/shared/src/main/scala/cats/effect/kernel/Resource.scala#L610-L621

armanbilge avatar Sep 25 '25 20:09 armanbilge

In the above snippet, "fiber" is scoped to just Resource. If you use the resource, you're basically terminating the fiber from this point of view. So, precisely because of 1.a, I would expect the ref to be set to false in the guarantee for the use_ effect.

djspiewak avatar Sep 25 '25 20:09 djspiewak

@armanbilge

...but because the fiber is incomplete, it will run the finalizer (ref.set(false)) when it completes (concurrently to the termination of the scope). Which I think is exactly the "unintuitive" behavior @durban was referencing

Yeah.

But also, I think there is an actual bug (i.e., not just "unintuitive", but a bug) in Resource#start, where the finalizer never runs (i.e., not just late, but actually never). In particular, when finalizeOuter runs after the guarantee, but before the flatMap (of poll(this.allocated)). I believe this small modification of @armanbilge's latest minimization shows this bug:

//> using dep org.typelevel::cats-effect::3.6.3
//> using option -Xkind-projector

import cats.effect.*
import scala.concurrent.duration.*

object Bug extends IOApp.Simple {

  def run = IO.ref(false).flatMap { ref =>

    def res(d: Deferred[IO, Unit]) = Resource.make(ref.set(true))(_ => ref.set(false) *> d.complete(()).void)
    def racedRes(d: Deferred[IO, Unit]) = Spawn[Resource[IO, *]].racePair(res(d), Resource.never)
    def test(d: Deferred[IO, Unit]) = racedRes(d).use_ *>
      ref.get.ifM(d.get, IO.unit)

    def go(i: Int): IO[Nothing] = IO.deferred[Unit].flatMap { d =>
      test(d).onError(_ => IO.println(s"failed on iteration $i")) >> IO.println(i) >> go(i + 1)
    }

    go(0)
  }

}

It hangs (possibly after printing some numbers). If the finalizer was just late, I think it should never hang.

So, to summarize, I think there are two things: an "unintuitive behavior" and an "actual bug". I think we can fix the bug (and we should), but I also think that won't fix the original issue.

(For future reference, I believe this was the latest commit modifying start: https://github.com/typelevel/cats-effect/commit/43690c295d9221eaf930195edc5d43b093807318)

durban avatar Sep 25 '25 23:09 durban