
Overhead in parEvalMap?

Open balthz opened this issue 4 years ago • 10 comments

First of all, thanks for a great library: FS2 is a pleasure to use!

I'm using parEvalMap and see around 15% overhead. I understand that the expressiveness of FS2 comes at a perf cost, but CPU time costs money :-) and I'm wondering whether there's some low-hanging fruit one could pick to improve perf?

Here's my test:

import cats.effect.ExitCode
import cats.effect.IO
import cats.effect.IOApp
import cats.implicits.catsSyntaxApplicative
import fs2.Stream

object TestAppCE3 extends IOApp {
  implicit class RichIO[T](val io: IO[T]) extends AnyVal {
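    // Time io's wall-clock duration: acquire the start time, run io, and print
    // the elapsed time in the release action (so it prints even if io fails).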
    def timed(msg: String) =
      IO(System.nanoTime()).bracket(_ => io)(start =>
        IO {
          val elapsed = System.nanoTime() - start
          println(s"$msg elapsed: $elapsed ns")
        }
      )
  }

  override def run(args: List[String]) = {
    val action: IO[Unit] = Stream
      .range[IO](1, 10000) // ~10k tasks (9,999), each taking 1ms
      .evalMap(f(_)) // … so in an (unrealistically) ideal world this would take 10s
      .compile
      .drain
    action
      .timed("Iteration")
      .replicateA(20)
      .as(ExitCode.Success)
  }

  def f(i: Int): IO[Int] = IO { wait1ms(); i }

  private def wait1ms() = {
    val t = System.nanoTime() + 1000000
    while (System.nanoTime() < t) {}
  }
}

I get

Iteration elapsed: 10093772531 ns
Iteration elapsed: 10030164115 ns
Iteration elapsed: 10022108168 ns
Iteration elapsed: 10022818254 ns
…

which points to a very low overhead of <1%.
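
(The ideal time is 9,999 tasks × 1 ms ≈ 10.0 s, so the measured ~10.02 s corresponds to an overhead of roughly 0.2-0.3%.)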

Switching val action to:

    val action: IO[Unit] = Stream
      .range[IO](1, 40000) // 40k tasks, each taking 1ms, requiring ~40s of CPU …
      .parEvalMap(4)(f(_)) // … so running on 4 threads should in an (unrealistically) ideal world take 10s
      .compile
      .drain

yields

Iteration elapsed: 12789891357 ns
Iteration elapsed: 12005666107 ns
Iteration elapsed: 11930729719 ns
Iteration elapsed: 11646465369 ns
Iteration elapsed: 11579067252 ns
Iteration elapsed: 11610247786 ns
Iteration elapsed: 11586968552 ns
Iteration elapsed: 11644422875 ns
Iteration elapsed: 11661371395 ns
Iteration elapsed: 11571651559 ns
Iteration elapsed: 11639190254 ns
…

(and parEvalMapUnordered is similar). Here, the overhead is more pronounced.
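
(The ideal time here is 39,999 tasks × 1 ms / 4 threads ≈ 10.0 s, so the steady-state measurements of ~11.6 s correspond to an overhead of roughly 15-16%.)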

This is using FS2 3.0.0-M8 together with the latest Cats Effect (i.e., I forced the dependency to 3.0.0-RC2).

+--- co.fs2:fs2-core_2.12:3.0.0-M8
|    +--- org.scala-lang:scala-library:2.12.13
|    +--- org.typelevel:cats-core_2.12:2.4.1 -> 2.4.2
|    |    +--- org.scala-lang:scala-library:2.12.13
|    |    +--- org.typelevel:cats-kernel_2.12:2.4.2
|    |    |    \--- org.scala-lang:scala-library:2.12.13
|    |    \--- org.typelevel:simulacrum-scalafix-annotations_2.12:0.5.4
|    |         \--- org.scala-lang:scala-library:2.12.13
|    +--- org.typelevel:cats-effect_2.12:3.0.0-RC1 -> 3.0.0-RC2
|    |    +--- org.scala-lang:scala-library:2.12.13
|    |    +--- org.typelevel:cats-effect-kernel_2.12:3.0.0-RC2
|    |    |    +--- org.scala-lang:scala-library:2.12.13
|    |    |    \--- org.typelevel:cats-core_2.12:2.4.2 (*)
|    |    \--- org.typelevel:cats-effect-std_2.12:3.0.0-RC2
|    |         +--- org.scala-lang:scala-library:2.12.13
|    |         \--- org.typelevel:cats-effect-kernel_2.12:3.0.0-RC2 (*)
|    \--- org.scodec:scodec-bits_2.12:1.1.23
|         \--- org.scala-lang:scala-library:2.12.11 -> 2.12.13

Comparison to FS2 2.x: I've also run the same code with FS2 2.5.0 (Cats Effect 2.3.1). The numbers aren't much different (actually, slightly better):

# evalMap
Iteration elapsed: 10091977558 ns
Iteration elapsed: 10032869267 ns
Iteration elapsed: 10025386044 ns
Iteration elapsed: 10031929952 ns
Iteration elapsed: 10032904025 ns
Iteration elapsed: 10031275881 ns
Iteration elapsed: 10028500724 ns
Iteration elapsed: 10028133010 ns
…

# parEvalMap
Iteration elapsed: 12341162718 ns
Iteration elapsed: 11320087636 ns
Iteration elapsed: 11306991261 ns
Iteration elapsed: 11371308569 ns
Iteration elapsed: 11356323834 ns
Iteration elapsed: 11417376151 ns
Iteration elapsed: 11347617323 ns
…

# parEvalMapUnordered
Iteration elapsed: 12001404854 ns
Iteration elapsed: 11297600868 ns
Iteration elapsed: 11340505464 ns
Iteration elapsed: 11322217518 ns
Iteration elapsed: 11272687681 ns
Iteration elapsed: 11255546589 ns
Iteration elapsed: 11265575877 ns

To the extreme: to measure the "pure" overhead, I tried:

  override def run(args: List[String]) = {
    val action: IO[Unit] = Stream
      .range[IO](1, 80000)
      .parEvalMap(4)(f(_))
      .compile
      .drain
    action
      .timed("Iteration4")
      .replicateA(20)
      .as(ExitCode.Success)
  }

  def f(i: Int): IO[Int] = IO.pure(i)

which gives

Iteration4 elapsed: 6060436201 ns
Iteration4 elapsed: 5080852565 ns
Iteration4 elapsed: 4549954372 ns
Iteration4 elapsed: 4603239151 ns
Iteration4 elapsed: 4643495467 ns
Iteration4 elapsed: 4583655952 ns
Iteration4 elapsed: 4609358231 ns
Iteration4 elapsed: 4508897732 ns

which amounts to 4508.9 ms × 4 threads / 80,000 ≈ 0.225 ms of overhead per element.

balthz avatar Feb 24 '21 00:02 balthz

CC @djspiewak in case you're looking for some data on Cats Effect 3.0.0-RC2

balthz avatar Feb 24 '21 00:02 balthz

This is very interesting! I'd like to better understand what Fs2 is doing in this scenario.

djspiewak avatar Feb 24 '21 23:02 djspiewak

A lot of stuff. Fundamentally, the underlying implementation is parJoin, which solves a more complex problem: lazily generating the computations to run concurrently, and streaming the results of those (themselves streaming) computations. That concern is not present in parEvalMap, but the implementation doesn't know that.

That being said, I suspect there is some relatively low-hanging fruit to be had with a more specialised implementation. The reason parJoin tends to be used directly is that it takes care of complex scoping concerns which have been hardened by years of bug fixing.
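
For illustration, here is a minimal sketch of how an unordered parEvalMap can be expressed through parJoin (assuming fs2 3.x; parEvalMapUnorderedSketch is a hypothetical helper for this discussion, not the library's actual source). It shows the indirection being described: every element gets wrapped in its own inner stream.

import cats.effect.Concurrent
import fs2.Stream

// Each element becomes a one-element inner stream; parJoin then runs up to
// `n` of these inner streams concurrently. Every element therefore pays the
// cost of opening, interpreting, and closing an inner stream and its scope.
def parEvalMapUnorderedSketch[F[_]: Concurrent, O, O2](n: Int)(f: O => F[O2])(
    in: Stream[F, O]
): Stream[F, O2] =
  in.map(o => Stream.eval(f(o))).parJoin(n)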

SystemFw avatar Feb 24 '21 23:02 SystemFw

What would a more specialised implementation consist of? A bounded queue into which the executing items are inserted, and from which they are extracted in the order they complete?

Looking at the code of parJoin, which I think is the longest method in Stream, I suppose you could make a specialised copy for the case where, rather than a Stream[F, Stream[F, O]], there is only a single F[O] to join for each element. The output from parJoin could also be discarded.
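
For illustration, a minimal sketch of that bounded-queue idea (Cats Effect 3 APIs; parEvalMapViaQueue is a hypothetical name, and it omits the fiber supervision and error/cancellation handling a production implementation would need):

import cats.effect.{Concurrent, Deferred}
import cats.effect.std.Queue
import cats.syntax.all._
import fs2.Stream

def parEvalMapViaQueue[F[_], O, O2](maxConcurrent: Int)(f: O => F[O2])(
    in: Stream[F, O]
)(implicit F: Concurrent[F]): Stream[F, O2] =
  Stream
    .eval(Queue.bounded[F, Option[Deferred[F, Either[Throwable, O2]]]](maxConcurrent))
    .flatMap { q =>
      // Producer: allocate a Deferred per element, enqueue it (blocking once
      // maxConcurrent results are pending), then evaluate f on a background fiber.
      val produce =
        in.evalMap { o =>
          Deferred[F, Either[Throwable, O2]].flatMap { d =>
            q.offer(Some(d)) *> F.start(f(o).attempt.flatMap(d.complete).void).void
          }
        }.drain ++ Stream.exec(q.offer(None))
      // Consumer: await the Deferreds in enqueue order, so output order matches
      // input order even though the effects may complete out of order.
      Stream.fromQueueNoneTerminated(q).evalMap(_.get).rethrow.concurrently(produce)
    }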

diesalbla avatar Feb 25 '21 00:02 diesalbla

I'm not quite ready to get to that level of detail, but yes: trying to remove the indirection that comes from treating those F as Streams. I'd be interested to hear from Daniel, in case he has some specific insight about the perf results here.

SystemFw avatar Feb 25 '21 00:02 SystemFw

Would an async-profiler flame graph help here? And if so, for both (old and new) Cats Effect versions or just the latest? If it helps, I'm happy to work on this (running on a Mac); let me know.

balthz avatar Feb 25 '21 02:02 balthz

trying to remove the indirection that comes from treating those F as Streams.

Out of curiosity, would flatMap, e.g. (s: Stream[IO, Seq[A]]).flatMap(seq => Stream.emits(seq)), benefit from an explicit optimisation for Seq as well? Or is the overhead there small? Intuitively, it seems that not having to deal with possible cancellation etc. of the inner streams would allow for more speed.

balthz avatar Feb 25 '21 03:02 balthz

@balthz The Stream.emits method generates a stream with a single chunk that contains all the elements of the given sequence. Cancellation can only happen between chunks.

So, in your example, the resulting stream would have as many chunks as there were elements of type Seq[A] in the stream s. If you would prefer to improve on that, and keep one output chunk per chunk of s, you should do:

// Flatten each chunk of Seq[A] into a single Chunk[A]
def flattenChunk[A](ch: Chunk[Seq[A]]): Chunk[A] = ch.flatMap(seq => Chunk.iterable(seq))
(s: Stream[IO, Seq[A]]).mapChunks(flattenChunk)
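
Since mapChunks is a pure chunk-to-chunk rewrite, this flattening introduces no effects or inner streams, so there is nothing extra for the interpreter to schedule or cancel.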

diesalbla avatar Feb 25 '21 04:02 diesalbla

Thank you for the detailed explanation, @diesalbla!

balthz avatar Feb 25 '21 04:02 balthz

parEvalMap & parEvalMapUnordered were reimplemented, with 5x and 4x performance boosts respectively. I'm not sure whether we should port them to the 2.x branch: the parEvalMap* methods contain cancellation logic, and I don't know the differences between CE2/CE3 cancellation, so backporting may introduce hard-to-track bugs.

nikiforo avatar Dec 08 '21 09:12 nikiforo