Overhead in parEvalMap?
First of all, thanks for a great library: FS2 is a pleasure to use!
I'm using parEvalMap and see around 15% overhead. I understand that the expressiveness of FS2 comes at a performance cost, but CPU time costs money :-) and I'm wondering whether there are some cherries one could pick to improve performance?
Here's my test:
import cats.effect.ExitCode
import cats.effect.IO
import cats.effect.IOApp
import cats.implicits.catsSyntaxApplicative
import fs2.Stream

object TestAppCE3 extends IOApp {

  implicit class RichIO[T](val io: IO[T]) extends AnyVal {
    def timed(msg: String) =
      IO(System.nanoTime()).bracket(_ => io)(start =>
        IO {
          val elapsed = System.nanoTime() - start
          println(s"$msg elapsed: $elapsed ns")
        }
      )
  }

  override def run(args: List[String]) = {
    val action: IO[Unit] = Stream
      .range[IO](1, 10000) // 10k tasks, each taking 1ms
      .evalMap(f(_))       // … so in an (unrealistically) ideal world this would take 10s
      .compile
      .drain
    action
      .timed("Iteration")
      .replicateA(20)
      .as(ExitCode.Success)
  }

  def f(i: Int): IO[Int] = IO { wait1ms(); i }

  private def wait1ms() = {
    val t = System.nanoTime() + 1000000
    while (System.nanoTime() < t) {}
  }
}
I get
Iteration elapsed: 10093772531 ns
Iteration elapsed: 10030164115 ns
Iteration elapsed: 10022108168 ns
Iteration elapsed: 10022818254 ns
…
which points to a very low overhead of <1%.
Switching val action to:
val action: IO[Unit] = Stream
  .range[IO](1, 40000)  // 40k tasks, each taking 1ms, require 40s of CPU …
  .parEvalMap(4)(f(_))  // … so running on 4 threads should in an (unrealistically) ideal world take 10s
  .compile
  .drain
yields
Iteration elapsed: 12789891357 ns
Iteration elapsed: 12005666107 ns
Iteration elapsed: 11930729719 ns
Iteration elapsed: 11646465369 ns
Iteration elapsed: 11579067252 ns
Iteration elapsed: 11610247786 ns
Iteration elapsed: 11586968552 ns
Iteration elapsed: 11644422875 ns
Iteration elapsed: 11661371395 ns
Iteration elapsed: 11571651559 ns
Iteration elapsed: 11639190254 ns
…
(parEvalMapUnordered behaves similarly.) Here, the overhead is more pronounced: around 15% over the ideal 10s.
This is using FS2 3.0.0-M8 together with the latest Cats Effect (i.e., I forced the dependency to 3.0.0-RC2).
+--- co.fs2:fs2-core_2.12:3.0.0-M8
| +--- org.scala-lang:scala-library:2.12.13
| +--- org.typelevel:cats-core_2.12:2.4.1 -> 2.4.2
| | +--- org.scala-lang:scala-library:2.12.13
| | +--- org.typelevel:cats-kernel_2.12:2.4.2
| | | \--- org.scala-lang:scala-library:2.12.13
| | \--- org.typelevel:simulacrum-scalafix-annotations_2.12:0.5.4
| | \--- org.scala-lang:scala-library:2.12.13
| +--- org.typelevel:cats-effect_2.12:3.0.0-RC1 -> 3.0.0-RC2
| | +--- org.scala-lang:scala-library:2.12.13
| | +--- org.typelevel:cats-effect-kernel_2.12:3.0.0-RC2
| | | +--- org.scala-lang:scala-library:2.12.13
| | | \--- org.typelevel:cats-core_2.12:2.4.2 (*)
| | \--- org.typelevel:cats-effect-std_2.12:3.0.0-RC2
| | +--- org.scala-lang:scala-library:2.12.13
| | \--- org.typelevel:cats-effect-kernel_2.12:3.0.0-RC2 (*)
| \--- org.scodec:scodec-bits_2.12:1.1.23
| \--- org.scala-lang:scala-library:2.12.11 -> 2.12.13
Comparison to FS2 2.5.0
I've also run the same code with FS2 2.5.0 (Cats Effect 2.3.1). The numbers aren't much different (actually, slightly better):
# evalMap
Iteration elapsed: 10091977558 ns
Iteration elapsed: 10032869267 ns
Iteration elapsed: 10025386044 ns
Iteration elapsed: 10031929952 ns
Iteration elapsed: 10032904025 ns
Iteration elapsed: 10031275881 ns
Iteration elapsed: 10028500724 ns
Iteration elapsed: 10028133010 ns
…
# parEvalMap
Iteration elapsed: 12341162718 ns
Iteration elapsed: 11320087636 ns
Iteration elapsed: 11306991261 ns
Iteration elapsed: 11371308569 ns
Iteration elapsed: 11356323834 ns
Iteration elapsed: 11417376151 ns
Iteration elapsed: 11347617323 ns
…
# parEvalMapUnordered
Iteration elapsed: 12001404854 ns
Iteration elapsed: 11297600868 ns
Iteration elapsed: 11340505464 ns
Iteration elapsed: 11322217518 ns
Iteration elapsed: 11272687681 ns
Iteration elapsed: 11255546589 ns
Iteration elapsed: 11265575877 ns
To the extreme
To measure the "pure" overhead, I tried:
override def run(args: List[String]) = {
  val action: IO[Unit] = Stream
    .range[IO](1, 80000)
    .parEvalMap(4)(f(_))
    .compile
    .drain
  action
    .timed("Iteration4")
    .replicateA(20)
    .as(ExitCode.Success)
}

def f(i: Int): IO[Int] = IO.pure(i)
which gives
Iteration4 elapsed: 6060436201 ns
Iteration4 elapsed: 5080852565 ns
Iteration4 elapsed: 4549954372 ns
Iteration4 elapsed: 4603239151 ns
Iteration4 elapsed: 4643495467 ns
Iteration4 elapsed: 4583655952 ns
Iteration4 elapsed: 4609358231 ns
Iteration4 elapsed: 4508897732 ns
which amounts to 4508.89ms * 4 threads / 80000 = 0.225ms overhead per element.
CC @djspiewak in case you're looking for some data on Cats Effect 3.0.0-RC2
This is very interesting! I'd like to better understand what Fs2 is doing in this scenario.
A lot of stuff. Fundamentally, the underlying implementation is parJoin, which solves a more complex problem: lazily generating the computations to run concurrently, and streaming the results of those (themselves streaming) computations. That concern is not present in parEvalMap, but the shared implementation doesn't know that.
That being said, I suspect there is some relatively low-hanging fruit to be had with a more specialised implementation. The reason parJoin tends to be used directly is that it takes care of complex scoping concerns which have been hardened by years of fixing bugs with it.
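For a sense of the shape being described (a sketch of my own, not the actual fs2 source; without extra machinery to restore input order, it is closer to parEvalMapUnordered than to parEvalMap):

import cats.effect.IO
import fs2.Stream

// Sketch: each element becomes a one-effect inner stream, and parJoin runs
// up to n of them concurrently. All of parJoin's generic machinery (scopes,
// interruption, merging inner *streams*) is paid even though each inner
// stream emits exactly one value.
def parEvalMapUnorderedSketch[A, B](n: Int)(f: A => IO[B])(s: Stream[IO, A]): Stream[IO, B] =
  s.map(a => Stream.eval(f(a))).parJoin(n)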
What would a more specialised implementation consist of? A bounded queue into which the items being executed are inserted, and from which they are extracted in the order they come out?
Looking at the code of parJoin, which I think is the longest method in Stream, I suppose you could make a specialised copy for the case where, rather than a Stream[F, Stream[F, O]], you only have one F[O] to join per element. Also, the output from parJoin could be discarded.
I'm not quite ready to get to that level of detail, but yes: trying to remove the indirection that comes from treating those F as Streams. Interested to hear from Daniel, if he has some specific insight about the perf results here.
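For concreteness, a rough sketch of that queue-based idea (hypothetical code, assuming CE3's cats.effect.std.Queue; error propagation and cancellation of in-flight fibers are elided, so this is not a drop-in replacement):

import cats.effect.IO
import cats.effect.std.Queue
import fs2.Stream

// Sketch: start each effect as a fiber and push its join into a bounded
// queue; the bound keeps roughly n effects in flight, and awaiting the
// joins in enqueue order preserves input ordering. A None sentinel
// terminates the output side.
def parEvalMapSketch[A, B](n: Int)(f: A => IO[B])(s: Stream[IO, A]): Stream[IO, B] =
  Stream.eval(Queue.bounded[IO, Option[IO[B]]](n)).flatMap { q =>
    val produce =
      s.evalMap(a => f(a).start.flatMap(fib => q.offer(Some(fib.joinWithNever)))) ++
        Stream.eval(q.offer(None))
    Stream.fromQueueNoneTerminated(q).evalMap(identity).concurrently(produce)
  }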
Would an async-profiler flame graph help here? And if so, for both (old and new) Cats Effect, or just the latest? If it helps, I'm happy to work on this (running on a Mac), let me know.
yeah trying to remove the indirection that comes from treating those F as Streams.
Out of curiosity, would flatMap, e.g. (s: Stream[IO, Seq[A]]).flatMap(seq => Stream.emits(seq)), benefit from an explicit optimisation for Seq as well? Or is the overhead there small? Intuitively, it seems that not having to deal with possible cancellation etc. of the inner streams would allow for more speed.
@balthz The Stream.emits method generates a stream with a single chunk that holds all the elements of the given sequence. Cancellation can only happen between chunks.
So, in your example, your result stream would have as many chunks as there were elements of type Seq[A] in the stream s. If you would prefer to improve on that, and keep one chunk in the result per chunk in s, you could do:
def flattenChunk[A](ch: Chunk[Seq[A]]): Chunk[A] =
  Chunk.iterable(ch.iterator.flatMap(_.iterator).toIterable)

(s: Stream[IO, Seq[A]]).mapChunks(flattenChunk)
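As a quick illustration of the difference (a toy example of mine; the expected chunk structure is shown in comments):

import fs2.{Chunk, Stream}

val s = Stream(Seq(1, 2), Seq(3, 4, 5)) // a single chunk holding two Seqs

// flatMap + emits: one output chunk per Seq
s.flatMap(seq => Stream.emits(seq)).chunks.toList
// List(Chunk(1, 2), Chunk(3, 4, 5))

// mapChunks: one output chunk per input chunk
s.mapChunks(ch => Chunk.iterable(ch.iterator.flatMap(_.iterator).toIterable)).chunks.toList
// List(Chunk(1, 2, 3, 4, 5))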
Thank you for the detailed explanation, @diesalbla!
parEvalMap & parEvalMapUnordered were reimplemented, with 5x and 4x performance boosts respectively.
I'm not sure whether we should port them to the 2.x branch: the parEvalMap* methods rely on cancellation, and I don't know the differences between CE2 and CE3 cancellation semantics, so backporting may introduce hard-to-track bugs.