cats-effect

Proposal: IO#raceMany that accepts collection of IO[T], returns winner and cancels others

Open • Adam-Kurzawa opened this issue 6 years ago • 14 comments

I've been considering a race implementation for a collection of IOs. The concept itself is similar to IO#race, but extended to many IOs. The input is a collection of IO[T]. The return value is the race winner (the first to complete), wrapped as IO[Either[Throwable, T]], while the other IOs are canceled.

I've pushed an example to my repo; just keep in mind this is a quick-and-dirty effort of one evening ;) It's based on the original IO#race code.

https://github.com/Adam-Kurzawa/cats-race-many/blob/master/src/main/scala/cats/effect/internals/IORaceCollection.scala
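For comparison, here is a minimal sketch of the proposed signature written against the public CE3 API rather than the internals used in the linked repository. `RaceMany.raceMany` is a hypothetical name, not a cats-effect method. It folds the collection with `race` and uses `attempt` to surface the winner's error, so whichever IO finishes first (with a value or an error) decides the result and the rest are canceled.

```scala
import cats.data.NonEmptyList
import cats.effect.IO

object RaceMany {
  // Hypothetical combinator, not part of the cats-effect API.
  // Every IO in the list runs concurrently; the first one to finish
  // (with a value or an error) wins and all the others are canceled.
  def raceMany[T](ios: NonEmptyList[IO[T]]): IO[Either[Throwable, T]] =
    ios.tail
      .foldLeft(ios.head)((acc, next) => acc.race(next).map(_.merge))
      // race re-raises the winner's error; attempt turns it into a Left
      .attempt
}
```

With this shape a fast failure beats a slower success, which matches the "first completed" wording of the proposal.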

Adam-Kurzawa, Apr 24 '19 21:04

Hmm, what would this be useful for? The "cancel everything" semantic is pretty blunt.

djspiewak, May 26 '19 03:05

I think this could easily be done using something like foldMapK and a MonoidK instance that races, similar to what's discussed in #405.
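A rough sketch of that idea in CE3 terms, assuming no racing instance for IO is in scope: the hypothetical `Racing` wrapper gets a `Monoid` whose `combine` is `race` and whose `empty` is `IO.never` (a racer that can never win), and `foldMap` then races the whole list. Such an instance only looks lawful if wall-clock time is ignored.

```scala
import cats.Monoid
import cats.effect.IO
import cats.syntax.all._

// Hypothetical newtype so that a racing Monoid does not clash with IO's own instances.
final case class Racing[A](io: IO[A])

object Racing {
  // combine = race, empty = IO.never; only "lawful" if timing is ignored.
  implicit def racingMonoid[A]: Monoid[Racing[A]] = new Monoid[Racing[A]] {
    def empty: Racing[A] = Racing(IO.never[A])
    def combine(x: Racing[A], y: Racing[A]): Racing[A] =
      Racing(x.io.race(y.io).map(_.merge))
  }
}

object RaceViaFoldMap {
  // Races every IO in the list; an empty list never completes.
  def raceAll[A](ios: List[IO[A]]): IO[A] = ios.foldMap(Racing(_)).io
}
```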

LukaJCB, May 26 '19 15:05

Exactly. The only advantage to putting it on IO would be for optimization reasons.

djspiewak, May 26 '19 19:05

Here is some more context behind this idea. I have seen code that depended on a racing scenario: depending on the config environment, it made 2 to 4 parallel calls to a cache to mitigate network latency spikes. Later the same approach was built into an HTTP request mechanism to handle one troublesome service. It was implemented by folding a list of IOs with IO.race, like:

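```scala
// Race the accumulated IO against the next one; merge collapses Either[A, A] to A.
list.foldLeft(io)((racingAcc, nextInList) => IO.race(racingAcc, nextInList).map(_.merge))
```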

It worked, but with some minor issues when used for racing HTTP requests. Also, the whole IO-folding code was ugly when combined with exception handling and logging (the excerpt above only shows the concept). I tried to refactor it but didn't end up with anything better than the fold. After that I experimented with the Cats Effect internals and came up with this idea.

  1. @LukaJCB I will have a look into this. Maybe that is the solution after all.
  2. I think generalisation is a good thing. Why are we limiting racing to just 2 elements? Why not 3? IMHO racing a collection makes the Cats Effect API more elegant and doesn't limit users' creativity.
  3. If this low-level race turns out to be more performant than some folding/flatMapping magic, then hell yeah!
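The cache scenario described above (the same call fired 2 to 4 times, first answer wins) can be sketched with the same fold. `Hedging.hedged` and its `copies` parameter are hypothetical names, not cats-effect API. All copies start at once and, as with plain `race`, a fast failure beats a slower success, so the exception handling and logging mentioned above would still have to be layered on top.

```scala
import cats.effect.IO

object Hedging {
  // Hypothetical helper: runs `copies` instances of the same call concurrently
  // and returns whichever finishes first; the remaining copies are canceled.
  // The require check runs eagerly, when hedged is called, not inside the IO.
  def hedged[A](copies: Int)(call: IO[A]): IO[A] = {
    require(copies >= 1, "need at least one copy")
    List.fill(copies)(call).reduce((a, b) => a.race(b).map(_.merge))
  }
}
```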

Adam-Kurzawa, May 26 '19 21:05

@Adam-Kurzawa Did you have a chance to look at the SemigroupK solution? I can definitely see value in what you're proposing here (only two years later!).

djspiewak, Feb 06 '21 04:02

@djspiewak In the end I haven't tried it, although I still think raceMany would be a nice addition to the IO API. What has changed regarding your interest in it, though?

Adam-Kurzawa, Feb 06 '21 13:02

I followed the threads; it looked like the MonoidK instance was added but then removed.

I'm not sure what the SemiGroupK solution is referring to (is that the same idea as MonadK?), but I couldn't find any simple way of doing this with ce3.

timbertson, Mar 18 '22 11:03

@timbertson hmm, maybe like this?

```scala
import cats.effect._
import cats.syntax.all._

List(IO(1), IO(2), IO(3)).reduce(_.race(_).map(_.merge))
```

https://scastie.scala-lang.org/1rQyz1fuTpSkSIX65zARpg

armanbilge, Mar 18 '22 11:03

> I'm not sure what the SemiGroupK solution is referring to (is that the same idea as MonadK?), but I couldn't find any simple way of doing this with ce3.

Wow, that was a trip down memory lane…

Okay, so the MonoidK instance for Fiber was unlawful for two reasons, one of which is fundamental and the other of which is simply a function of CE2. The latter reason (the lack of a reasonable empty) is now resolved because of IO.canceled, though it still isn't quite as simple as racing join and calling it a day. The fundamental unlawfulness is much more subtle, and basically boils down to the fact that the laws ignore wall-clock time, and thus anything involving race is unlawful by definition. This is kind of a fascinating consequence because it extends well beyond race itself (e.g. the functor laws no longer hold for any program which is later composed into a race). Ultimately, this is just a consequence of the definition of concurrency itself, and there's not a lot we can really do about it. Term substitution is a lovely calculus, but it breaks down in cases like this, taking all of our laws with it.

With all that said though, manipulating Fiber directly is definitely the wrong approach. Something like what @armanbilge posted feels a lot better and doesn't have any of the resource leak issues, and the ease of doing something like this makes me pretty leery about building a general combinator into the API.
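A small illustration of the point about `IO.canceled`, as a sketch assuming CE3's `race` semantics: when one side of a `race` cancels itself, the race waits for the other side instead of failing, so as far as the result value is concerned `IO.canceled` behaves like a neutral element. (If both sides cancel, the race itself ends up canceled.) `CanceledAsEmpty` is just an illustrative name.

```scala
import cats.effect.IO

object CanceledAsEmpty {
  // The self-canceling side drops out of the race and the other side's result is kept.
  val leftCanceled: IO[Either[Unit, Int]]  = IO.race(IO.canceled, IO.pure(1))
  val rightCanceled: IO[Either[Int, Unit]] = IO.race(IO.pure(1), IO.canceled)
}
```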

djspiewak, Mar 18 '22 14:03

Thanks for the responses. I was worried that the reduce idea would only race the first two initially, but then I remembered how IO works ;)

I still think it'd be a nice addition to the API since it's conceptually similar to race, and although it's not super complex I wouldn't say the implementation above is obvious. I'm porting from Monix where it's called raceMany, but Monix is admittedly more liberal with adding convenience methods, so it's up to you.
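To make the laziness point concrete: `reduce` only builds a description of one nested race, and when it runs, every element is started concurrently. In this sketch (the sleep durations are arbitrary) the race of sleeps of 1s, 1s and 100ms completes in roughly 100ms, well under the 1s it would take if the first two had to finish racing before the third one started.

```scala
import cats.effect.IO
import scala.concurrent.duration._

object ReduceStartsAll {
  // reduce builds one nested race; nothing runs until the IO is executed,
  // and then all three sleeps are in flight at the same time.
  val raced: IO[(FiniteDuration, Int)] =
    List(IO.sleep(1.second).as(1), IO.sleep(1.second).as(2), IO.sleep(100.millis).as(3))
      .reduce(_.race(_).map(_.merge))
      .timed
}
```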

timbertson, Mar 19 '22 03:03

We have the methods parSequence and parTraverse; I feel like a "raceTraverse" could fit in alongside them.
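One possible shape for that, as a hedged sketch: `raceTraverse` is a hypothetical name and signature, not an existing cats-effect method. Like `parTraverse` it maps each element to an effect, but it keeps only the first result and cancels the rest; taking a `NonEmptyList` sidesteps the question of what racing zero effects should mean.

```scala
import cats.data.NonEmptyList
import cats.effect.IO

object RaceTraverse {
  // Hypothetical: map each element to an IO, run them all concurrently and
  // return the first to finish, canceling the others.
  def raceTraverse[A, B](as: NonEmptyList[A])(f: A => IO[B]): IO[B] =
    as.tail.foldLeft(f(as.head))((acc, a) => acc.race(f(a)).map(_.merge))
}
```

Under this shape, a matching `raceSequence` would simply be `raceTraverse(ios)(identity)`.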

armanbilge, Mar 19 '22 04:03

I like this idea because of the (I think common?) case of having a solution space where there is one solution (or where you want any valid solution). Say, finding a prime.

Each worker may or may not find a prime. You want to stop all workers once one succeeds. It is possible that none succeed. You may or may not have a timeout.

odenzo, Jun 23 '22 00:06

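```scala
import cats.data.NonEmptyList
import cats.effect.IO
import scala.concurrent.duration.FiniteDuration

def parCollectFirstOf[T](
    timeout: FiniteDuration,
    workers: NonEmptyList[IO[Option[T]]]
): IO[T] = {
  // A worker that finds nothing is turned into IO.never, so it can never win a race.
  val onlyWorkers: NonEmptyList[IO[T]] = workers.map { ioOpt =>
    ioOpt.flatMap {
      case None        => IO.never
      case Some(value) => IO.pure(value)
    }
  }
  // Fold the workers into nested pairwise races: the first value wins, the losers are canceled.
  val racePairs: IO[T] = onlyWorkers.length match {
    case 1 => onlyWorkers.head
    case _ =>
      onlyWorkers.tail.foldLeft(onlyWorkers.head)((a: IO[T], b: IO[T]) =>
        a.race(b).map(or => or.fold(identity, identity))
      )
  }
  // Fails with a TimeoutException if no worker finds a value in time.
  racePairs.timeout(timeout)
}
```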

That is my approach. The cancelation is odd: instead of issuing N cancels at once, it works its way down the nested races, depending on which sub-race completes.

This assumes that any error in any fibre means life is bad and they all get killed, and it leaves finished fibres that found no answer hanging around in an IO.never.

So I think a dedicated parCollectFirst could be implemented to run faster, and could actually let the fibres that return None exit. The above is good enough for me, since the worker threads are very heavy.
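A sketch of such a dedicated combinator, under the assumption that "no worker found anything" should come back as `None` rather than hanging. `parCollectFirst` is a hypothetical name. Workers that return `None` simply finish; the first `Some` is recorded in a `Deferred`, and racing on that `Deferred` cancels the remaining workers. An error in any worker still fails the whole thing, as in the version above, and a timeout can be added by the caller with `.timeout`.

```scala
import cats.data.NonEmptyList
import cats.effect.IO
import cats.syntax.all._

object CollectFirst {
  // Hypothetical combinator: runs all workers in parallel and returns the first
  // Some, canceling the others; returns None if every worker returns None.
  def parCollectFirst[T](workers: NonEmptyList[IO[Option[T]]]): IO[Option[T]] =
    IO.deferred[Option[T]].flatMap { result =>
      // Each worker publishes a Some; a None just lets that worker finish.
      // complete does nothing (and returns false) once a value has been set.
      val runAll: IO[Unit] = workers.parTraverse_ { worker =>
        worker.flatMap {
          case found @ Some(_) => result.complete(found).void
          case None            => IO.unit
        }
      }
      // If all workers finish empty-handed, publish None ourselves.
      val allDone: IO[Option[T]] = runAll *> result.complete(None) *> result.get
      // Whichever side finishes first wins; the loser (possibly runAll) is canceled.
      IO.race(result.get, allDone).map(_.merge)
    }
}
```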

odenzo, Jun 23 '22 00:06