cats-effect icon indicating copy to clipboard operation
cats-effect copied to clipboard

parTraverseN performance problems in presence of errors or other cancellations

Open nviliunov-evolution-throwaway opened this issue 5 months ago • 14 comments

Quick reproduction

val v = Vector.fill(400_000)(0)
v.parTraverseN(10) { i =>
  IO.sleep(10.seconds) >> IO.raiseError(new RuntimeException)
}.void

This will most likely cause 100% CPU consumption after the initial wait 10 seconds and soon after that starvation warnings.

Explanation

parTraverseN is defined as follows:

MiniSemaphore[F](n).flatMap { sem => ta.parTraverse { a => sem.withPermit(f(a)) } }

Meaning it creates a MiniSemaphore and then starts a proper parTraverse. parTraverse launches a fiber for each element of the 400k Vector (which is ok) with sem.withPermit(f(a)). Inside the implementation of MiniSemaphore and withPermit we see that: 1) it has a Ref-based state with a Queue inside; 2) it has a cleanup routine that is executed when the fiber is cancelled. The implementation of Queue is two linked lists. Unpleasant, but not the worst. The implementation of Ref (class SyncRef) implements modify like this:

  def modify[B](f: A => (A, B)): F[B] = {
    @tailrec
    def spin: B = {
      val c = ar.get
      val (u, b) = f(c)
      if (!ar.compareAndSet(c, u)) spin
      else b
    }
    F.delay(spin)
  }

This is a CAS-loop: it gets the current value, calculate the next value using the provided function, sets the variable to the calcualted value if the variable hasn’t been changed since then. If the variable has been changed, the compareAndSet fails and the loop continues until morale improves. cleanup is called when the fiber is cancelled. Coincidentally, whenever one of fibers fails with an error, the rest of them get cancelled by parTraverse. After that we have 399999 fibers trying to call cleanup on a single ref using all available CPUs. The cleanup function is trying to filter out a single element out of the queue, given linked lists, this could be quite bad.

Potential fix

In my opinion, the best fix would be to somehow limit the number of fiber that exist concurrently to N. This means we have to forgo the "fairness" property that is stated in the documentation, but not provided in practice as discussed in https://github.com/typelevel/cats-effect/issues/4262. Limiting the number of fibers would also be good on the memory consumption, since they are not free, and in our particular case 400k fibers easily take additional ~500 MiB of RAM.

If you'd prefer not to do that, replacing Queue in MiniSemaphore with a different data structure would work. I have reports that Set works better, maybe Vector will, too. This doesn't solve the contention problem, of course, only makes it not so hard on the CPUs.

Magnificent issue description.

This makes total sense. I need to think a bit about it but I agree with the diagnosis. The solution is indeed quite simple but it could have some other consequences.

djspiewak avatar Jul 08 '25 22:07 djspiewak

I think limiting the number of fibers to N makes sense. And I think we can preserve (or rather, improve) fairness even with that. I've started working on something like that a while ago. (I think I've stopped because what I had caused performance regressions in some cases on the parTraverseN benchmark.)

durban avatar Jul 09 '25 00:07 durban

@SystemFw if you have a bit of time and interest, would you mind swinging by and dropping some wisdom on us?

djspiewak avatar Jul 09 '25 23:07 djspiewak

Interestingly, the following behaves somewhat reasonably:

  @Benchmark
  def parTraverseNCancel(): Unit = {
    val e = new RuntimeException
    val test = 1.to(size * 10).toList.parTraverseN(size / 100) { _ =>
      IO.sleep(100.millis) *> IO.raiseError(e)
    }

    test.attempt.void.unsafeRunSync()
  }

This gives me about 2 ops/s, which is at least in the ballpark (and identical to if I just do 1.to(size)). However, an extra order of magnitude size and suddenly everything falls off a cliff to 0.015 ops/s. My snap guess is that we're filling up our internal queues at this breakpoint, which results in a huge amount of churn and life sucks. It would be nice to optimize this degradation a bit better, but pathological scheduler cases aside, this certainly feels like something we can and should fix.

djspiewak avatar Jul 09 '25 23:07 djspiewak

@durban Are you thinking of replacing MiniSemaphore with MiniQueue? Because that's kind of where my head is at right now. It does result in an annoying double-traverse (one to deconstruct, one to reconstruct) due to the generality of the method, which is somewhat annoying. I'm curious if you have a better solution in mind.

djspiewak avatar Jul 09 '25 23:07 djspiewak

@djspiewak Kind of... I've found my old branch: https://github.com/typelevel/cats-effect/compare/series/3.x...durban:cats-effect:parTraverseN_3 (I'm still reading my code, and trying to remember what I was thinking 2 years(!) ago. It's possibly more complicated than necessary... but the main idea was to obtain "fairness" by scheduling the tasks in a pseudorandom order.) And yes, double-traverse, essentially (+ shuffling because "fairness").

durban avatar Jul 10 '25 00:07 durban

I have to say this implementation of parTraverseN always gave me some pause, even though I can see the rationale for it as well. Imho a better starting point is something that reduces contention in the first place, e.g splitting in chunks, then doing parTraverse on each chunk. Obvs that suffers from head of line blocking if a task is slow in the current chunk, but I think we can play with that, e.g. marking how many tasks are finished to pull new ones, and/or building chunks via random selection instead of sequentially. Essentially making parTraverseN more in charge of the scheduling rather than spawning everything and letting the runtime deal with it. Perhaps that's not functionally different than the MiniQueue idea that's already been floated though.

SystemFw avatar Jul 10 '25 00:07 SystemFw

I think the current implementation also has head of line blocking issues, since the permit is acquired before the effect is scheduled. Really the only thing which is parallelized in a non-blocked fashion is the pure stuff, but arguably that's unintuitive. I imagine if you asked most users whether the pure part of their parTraverseN bypasses the semaphore, they would absolutely not answer in the affirmative.

I don't see a way around the second traverse sadly. The biggest question I think we have to answer is whether it's better to have a small set of long lived workers (thus imposing rate limiting) pulling from an unbounded queue or a large set of short lived workers pulling from a bounded queue. The former has the advantage of being doable without a custom data structure, while the latter would require a MiniQueue.

djspiewak avatar Jul 10 '25 02:07 djspiewak

I had another idea: what about keeping MiniSemaphore but not using withPermit? Rather, just >> release. Ultimately, fibers are contending in order to cleanup the permit count of a semaphore that no one is going to use anymore since they are all getting cancelled anyway if one has failed or interruption has hit. We'd need to also modify acquire potentially so that fibers blocked on it don't finalize.

This wouldn't limit the memory consumption of having a lot of idle fibers, of course.

SystemFw avatar Jul 10 '25 06:07 SystemFw

Might be really tricky to get the release to happen only in the case of self cancelation and not when externally canceled

djspiewak avatar Jul 10 '25 13:07 djspiewak

@durban I gave it some thought, and I don't think we need to randomize the execution order. Instead, we should probably just document that our scheduling is biased according to the traverse order. Additionally, we should mention that traverse must have a consistent order from run to run, otherwise we'll end up with reordered results (at least for parTraverseN; obviously it doesn't matter for parTraverseN_). This is interestingly subtle because it's technically possible for traverse to have an entirely nondeterministic visitation order from run to run while still being pure.

But basically, I think every user's mental model is very likely already biased in traversal order, and no one should be surprised if that remains the case. So in other words, I suspect no one really understood what we meant by "fairness" in the docs, and in any case we never had it, so there's no point trying to stick to it.

I think the simplest answer here is probably to do a MiniQueue. I'm also very tempted to override parTraverseN in asyncForIO simply because we have access to a much more efficient Queue. That will also require some overriding in the transformer instances of GenConcurrent, but it shouldn't be too bad.

djspiewak avatar Jul 10 '25 15:07 djspiewak

Okay I ended up borrowing @SystemFw's idea (which was a much smarter idea than I realized when I first read it). Heading in this direction:

  def parTraverseN[T[_]: Traverse, A, B](n: Int)(ta: T[A])(f: A => F[B]): F[T[B]] = {
    require(n >= 1, s"Concurrency limit should be at least 1, was: $n")

    implicit val F: GenConcurrent[F, E] = this

    // TODO we need to write a test for error cancelation
    F.ref[Set[Fiber[F, ?, ?]]](Set()) flatMap { supervision =>
      MiniSemaphore[F](n) flatMap { sem =>
        val results = ta traverse { a =>
          sem.acquire >> F.uncancelable { poll =>
            f(a).guarantee(sem.release).start map { fiber =>
              supervision.update(_ + fiber) *>
                poll(fiber.joinWithNever)
                  .onCancel(fiber.cancel)
                  .guarantee(supervision.update(_ - fiber))
            }
          }
        }

        results.flatMap(_.sequence) guaranteeCase {
          case Outcome.Succeeded(_) => F.unit
          // has to be done in parallel to avoid head of line issues
          case _ => supervision.get.flatMap(_.toList.parTraverse_(_.cancel))
        }
      }
    }
  }

And then parTraverseN_, which is interestingly quite different:

  def parTraverseN_[T[_]: Foldable, A, B](n: Int)(ta: T[A])(f: A => F[B]): F[Unit] = {
    require(n >= 1, s"Concurrency limit should be at least 1, was: $n")

    implicit val F: GenConcurrent[F, E] = this

    // TODO we need to write a test for error cancelation
    F.ref[List[Fiber[F, ?, ?]]](Nil) flatMap { supervision =>
      MiniSemaphore[F](n) flatMap { sem =>
        // TODO this seems promising. we just need to sequence the errors/self-cancelation later
        val startAll = ta.foldLeftM(()) { (_, a) =>
          F.uncancelable { _ =>
            sem.acquire >> f(a).guarantee(sem.release).start flatMap { fiber =>
              // supervision is handled very differently here: we never remove from the set
              supervision.update(fiber :: _)
            }
          }
        }

        // we block until it's all done by acquiring all the permits
        startAll.onCancel(supervision.get.flatMap(_.parTraverse_(_.cancel))) *>
          sem.acquire.replicateA_(n)
      }
    }
  }

The latter isn't quite done yet because errors and self-cancelation don't propagate out. Additionally, we apparently don't have any tests at all for error/cancelation propagation. Additionally additionally, the tests we have can be simplified. But it's headed in this direction at least.

djspiewak avatar Jul 10 '25 20:07 djspiewak

@djspiewak In my old branch (see above) I've added some tests for parTraverseN; feel free to use them, if they seem useful.

durban avatar Jul 10 '25 20:07 durban

Update on this quickly: I have a mostly working reimplementation of parTraverseN and parTraverseN_ (btw we should add Void variants to be more uniform with Cats) on a local branch. There's still more work to do, particularly in precisely the error cascading interrupt case, but it's heading in the right direction. @durban's unit tests are really useful, not only because they beef up a lot of the direct semantic testing for errors and self-cancelation, but they also have some subtle framing which verifies that we don't suffer from head-of-line blocking, so I have a lot more confidence in this implementation.

djspiewak avatar Jul 17 '25 16:07 djspiewak