fs2 icon indicating copy to clipboard operation
fs2 copied to clipboard

Topic suffers under heavy load, `Set` comparison CPU intensive

Open SystemFw opened this issue 6 years ago • 30 comments

There is a problem with Topic (and I suspect with Signal as well), probably due to how CPU intensive Set comparison is. One of my teams had problems where reasonably heavy traffic was causing thread starvation due to the CPU work needed for that comparison. I was able to fix by porting the old Topic (without PubSub), but we need to address this because it's a pretty serious regression right now. I'm trying to come up with a reproducible example, which isn't super easy.

SystemFw avatar Jan 29 '19 12:01 SystemFw

In the meantime, I can share the stack trace commonly seen in the problematic scenario

"ForkJoinPool-2-worker-6" #83 daemon prio=5 os_prio=0 tid=0x00007f56fb6d3000 nid=0x13bc runnable [0x00007f56db7fd000]
   java.lang.Thread.State: RUNNABLE
        at scala.runtime.ScalaRunTime$._hashCode(ScalaRunTime.scala:149)
        at scala.Tuple2.hashCode(Tuple2.scala:24)
        at scala.runtime.Statics.anyHash(Statics.java:122)
        at scala.collection.immutable.HashMap.elemHashCode(HashMap.scala:87)
        at scala.collection.immutable.HashMap.computeHash(HashMap.scala:96)
        at scala.collection.immutable.HashMap.contains(HashMap.scala:59)
        at scala.collection.MapLike$DefaultKeySet.contains(MapLike.scala:176)
        at scala.collection.GenSetLike.apply(GenSetLike.scala:48)
        at scala.collection.GenSetLike.apply$(GenSetLike.scala:48)
        at scala.collection.AbstractSet.apply(Set.scala:51)
        at scala.collection.immutable.MapLike$ImmutableDefaultKeySet.apply(MapLike.scala:112)
        at scala.collection.Iterator.forall(Iterator.scala:953)
        at scala.collection.Iterator.forall$(Iterator.scala:951)
        at scala.collection.AbstractIterator.forall(Iterator.scala:1429)
        at scala.collection.IterableLike.forall(IterableLike.scala:77)
        at scala.collection.IterableLike.forall$(IterableLike.scala:76)
        at scala.collection.AbstractIterable.forall(Iterable.scala:56)
        at scala.collection.GenSetLike.subsetOf(GenSetLike.scala:107)
        at scala.collection.GenSetLike.subsetOf$(GenSetLike.scala:107)
        at scala.collection.AbstractSet.subsetOf(Set.scala:51)
        at scala.collection.GenSetLike.liftedTree1$1(GenSetLike.scala:124)
        at scala.collection.GenSetLike.equals(GenSetLike.scala:124)
        at scala.collection.GenSetLike.equals$(GenSetLike.scala:119)
        at scala.collection.AbstractSet.equals(Set.scala:51)
        at fs2.concurrent.Topic$.$anonfun$apply$1(Topic.scala:89)
        at fs2.concurrent.Topic$.$anonfun$apply$1$adapted(Topic.scala:89)
        at fs2.concurrent.Topic$$$Lambda$4862/705260399.apply(Unknown Source)
        at cats.kernel.Eq$$anon$106.eqv(Eq.scala:85)
        at fs2.concurrent.PubSub$Strategy$Inspectable$$anon$7.get(PubSub.scala:662)
        at fs2.concurrent.PubSub$Strategy$Inspectable$$anon$7.get(PubSub.scala:637)
        at fs2.concurrent.PubSub$.go$1(PubSub.scala:123)
        at fs2.concurrent.PubSub$.consumeSubscribers$1(PubSub.scala:132)
        at fs2.concurrent.PubSub$.loop$1(PubSub.scala:171)
        at fs2.concurrent.PubSub$.$anonfun$apply$13(PubSub.scala:196)
        at fs2.concurrent.PubSub$$$Lambda$5107/462313517.apply(Unknown Source)
        at cats.effect.concurrent.Ref$SyncRef.spin$1(Ref.scala:249)
        at cats.effect.concurrent.Ref$SyncRef.$anonfun$modify$1(Ref.scala:253)

As the number of subscribers grows, more and more threads end up doing this most of the time, until complete starvation (in our case, kubernetes was restarting the service since the healthcheck endpoint would become unresponsive)

SystemFw avatar Jan 29 '19 12:01 SystemFw

We have the same issue in our team. In our case we have an income stream with ~ 1k elements per second and ~100 subscribers who just put every elements to the cache. Simple example can looks like:

object Example extends IOApp {
  def doSomething(x: Any) = IO(println(x))
  def arbitraryData = IO(Random.nextInt(100))

  override def run(args: List[String]): IO[ExitCode] = {
    for {
      // topic ← CustomTopic[IO, Int] <- will be used in comparison with default implementation
      topic ← fs2.concurrent.Topic[IO, Int](Random.nextInt(100))
      subscriptions = (1 to 100).map(
        i ⇒ topic.subscribe(1).collect({ case d if d == i ⇒ d }).evalMap(doSomething)
      )
      _ ← (Stream.fixedDelay(10.millis).evalMap(_ ⇒ arbitraryData).to(topic.publish) merge Stream
        .emits(subscriptions)
        .parJoinUnbounded).compile.drain
    } yield {
      ExitCode.Success
    }
  }
}

We tried to solve it by using implementation of the Topic with fs2.concurrent.Queue (not full example):

class CustomTopic[F[_], A](cache: Ref[F, List[Queue[F, A]]])(implicit C: Concurrent[F])
    extends Topic[F, A] {
  override def publish: Sink[F, A] =
    _.evalMap(publish1)

  override def publish1(a: A): F[Unit] =
    cache.get.flatMap { subscribers ⇒
      subscribers.traverse(_.enqueue1(a)).void
    }

  override def subscribe(maxQueued: Int): Stream[F, A] =
    emptyQueue(maxQueued).evalTap(q ⇒ cache.update(_ :+ q)).flatMap(_.dequeue)

  private def emptyQueue(maxQueued: Int): Stream[F, fs2.concurrent.Queue[F, A]] = {
    Stream.bracket(fs2.concurrent.Queue.bounded[F, A](maxQueued))(
      queue ⇒ cache.update(_.filter(_ ne queue))
    )
  }
}

object CustomTopic {
  def apply[F[_], A](implicit C: Concurrent[F]): F[CustomTopic[F, A]] =
    Ref.of[F, List[fs2.concurrent.Queue[F, A]]](List.empty).map(new CustomTopic(_))
}

When we use our custom topic in the example above then we have a very different picture in the profiler: Results with default topic: results with default Topic Result with our implementation: results with custom Topic

UPD: I created PR to illustrate our full solution: #1407

vladimir-popov avatar Jan 29 '19 17:01 vladimir-popov

Besides it we noticed that current implementation of the Topic in our case generates a lot of living instances of the immutable.Queue and the PubSub.Strategy.Inspectable.State: screenshot 2019-01-29 at 20 05 28

vladimir-popov avatar Jan 29 '19 17:01 vladimir-popov

@SystemFw the current implementation of topic won't perform super-well on heavy contention between busy publisher and lot of subscribers. The older implemenatiton was slightly better, specifically due the fact that each subscriber has its own queue where things were fed to, that was in fact registered with the state, and set during publish.

The observation of lot of queues and states is correct, as what you see is a lot of spins and attempts to fight for setting that single ref, which has own single state. Simply too many subscribers :-)

I think there is a workaround to reimplement Topic with slightly modified State and PubSub, again part of registration will be actually the queue of the subscriber.

Another alternatives with current implementation:

Change the topic to deeper hierarchy, i.e. 1 topic handling 16 subs, and these will handle each lets say 16 subs will give you 6 pow 16 subscribers with really low contention.

I wouldn't suggest yet to give up and go to old implementation, as that had different problems, i.e. flow control etc.

pchlupacek avatar Jan 29 '19 17:01 pchlupacek

@vladimir-popov I am not surprised, the current implementation of topic will perfrom poorly under your load. This is sort of worst case for the current implementation. However I am quite confident topic may be improved with PubSub to perform similarly like your solution.

pchlupacek avatar Jan 29 '19 17:01 pchlupacek

Also @SystemFw, @vladimir-popov perhaps if we just change the internals of how pubsub works, perhaps we get a better performance w/o changing anything in state logic. Essentially we could try instead of using the ref to store state, to use Lock, and then use guarded var for state. That will cause no waste in state calculations and could be perhaps almost in pair for good paths of pubsub.

pchlupacek avatar Jan 29 '19 17:01 pchlupacek

Actually I'm not sure the problem is contention on Ref, and I don't even know whether it's a problem with PubSub in general, necessarily. I can't be sure cause I don't have an exact reproduction yet, but the initial hypothesis is just that relying on Set comparison like the Strategy for Topic does just wastes a lot of CPU cycles.

The older implemenatiton was slightly better,

Well, the thing is that in my scenario the old implementation just works with no issues, whereas the new one chokes and causes the service to restart continuously, so that's more than slightly better :P

I wouldn't suggest yet to give up and go to old implementation, as that had different problems, i.e. flow control etc.

Well, that's a hot fix that I need to do right now, otherwise everything just chokes, but ideally I would want something based on PubSub, but more scalable, so I agree with you on that. Whether that is striping with 16 subs or something else, I guess we will have to see, but for now I just wanted to signal this, given that it's effectively a regression compared to the old implementation

SystemFw avatar Jan 29 '19 18:01 SystemFw

@SystemFw I think the issue, this demonstrates here is that you have so many concurrent attempts to CAS the State, so the overhead of this is killing the whole thing.

I hope to squeeze this over weekend to ake a look on this, but I feel these are two approach that may fix this

  • instead of using ref to hold state, we may just use it to sort of lock the subscribers, but actually computation to modify state will be always performed only once and only by winner
  • or we could sort of revert back to queue per subscriber, but sill use pub sub. That will be little more woodoo on Topic implementation, but I am quite sure performance will be almost identical to old Topic.

pchlupacek avatar Jan 30 '19 10:01 pchlupacek

@SystemFw, @vladimir-popov, I have diven a bit into it, and wrote a simple bechmark, that was run with 8 threads, 500 messages in topic and 1 to 128 subscribers.

Code is here(https://gist.github.com/pchlupacek/f033993302ee2a741a6473286306c9b3), the results are below:

[info] Benchmark                          Mode  Cnt      Score       Error  Units
[info] TopicBenchmark.topicBroadcast_1    avgt    3      1.124 ±     0.059  ms/op
[info] TopicBenchmark.topicBroadcast_4    avgt    3      5.128 ±     0.905  ms/op
[info] TopicBenchmark.topicBroadcast_8    avgt    3     20.289 ±     2.331  ms/op
[info] TopicBenchmark.topicBroadcast_16   avgt    3     27.693 ±     7.103  ms/op
[info] TopicBenchmark.topicBroadcast_32   avgt    3    200.506 ±   106.002  ms/op
[info] TopicBenchmark.topicBroadcast_64   avgt    3   1965.638 ±   316.895  ms/op
[info] TopicBenchmark.topicBroadcast_128  avgt    3  12560.988 ± 17843.409  ms/op

From the code you see there are no operations on messages, so this shall measure overhead only.

Hence this is run on 6 core / 12 hyper machine but on 8 jvm threads I would expect the sweet point to be somewhere at 8-16 subscribers. For the larger amount of subscribers, in ideal state the execution shall be something like (Nx/8)*T8, where Nx is number of subscribers to execute, and T8 is the time for 8 subscribers.

I do not understand that much the Error ranges, they seem a bit off for me, and when I compared the individual results they were usually +/- 5% range, not like the 128 shows to be more than 100%. So I suspect this will be something wrong with jmh, or my lack of understanding it.

As you see the results are somewhat expected for 1-16 subscribers, where there is just different from setting up the program and subscribers. Interesting is the difference between 8-16 subscribers, which is getting close to ideal.

Anything after that is complete disaster, and clearly demonstrates the issue with concurrency. I think it confirm my original suspicion of single point contention problem.

I will follow up with more experiments to see how we can avoid the problem.

pchlupacek avatar Feb 03 '19 15:02 pchlupacek

Thanks for looking into this @pchlupacek :)

FYI, the old implementation in the service I mentioned above is handling 1400 subs at peak load without breaking a sweat, on a 2 CPU machine which also has some blocking code on it

SystemFw avatar Feb 03 '19 15:02 SystemFw

@SystemFw I am not surprised that old implementation performs much better. I would say the only limit is the RAM, and obvioulsy more subscribers -> more time to publish one element. Problem of that implementation is that it is highly specialised and yours only control over the OOM in case of slow subscriber is to use bounded queues.

pchlupacek avatar Feb 03 '19 15:02 pchlupacek

Ok, so I have changed the PubSub to use Lock + Var and this is the result :

[info] Benchmark                          Mode  Cnt     Score    Error  Units
[info] TopicBenchmark.topicBroadcast_1    avgt    3     7.103 ±  0.588  ms/op
[info] TopicBenchmark.topicBroadcast_4    avgt    3    22.579 ±  2.968  ms/op
[info] TopicBenchmark.topicBroadcast_8    avgt    3    42.171 ±  1.685  ms/op
[info] TopicBenchmark.topicBroadcast_16   avgt    3    83.709 ± 13.964  ms/op
[info] TopicBenchmark.topicBroadcast_32   avgt    3   187.778 ± 11.095  ms/op
[info] TopicBenchmark.topicBroadcast_64   avgt    3   438.190 ± 36.365  ms/op
[info] TopicBenchmark.topicBroadcast_128  avgt    3  1192.197 ± 94.964  ms/op

Now this is much better, however still you see that we are blocked by the single contention, so that why you see almost linear time, instead of using parallelism. Also, you will see that this uses only 1.5 cores, clearly showing that the bottleneck is the single ref, however, now, there is no wasted CPU cycles, overall resulting in much better performance.

pchlupacek avatar Feb 03 '19 15:02 pchlupacek

The non lock implementation has the advantage of being...well, lock free, so I'd prefer it if possible. Are there any other strategies to improve this case, for example striping the topic as you suggested above to reduce contention?

I really like the abstraction given by PubSub, however the old Topic was both lock-free and had lower contention, so I feel that anything we do should be proven to be better than the old implementation

SystemFw avatar Feb 03 '19 16:02 SystemFw

@SystemFw this is still lock free. It uses semaphore. So it is semantically locking.

I am not sure that old topic had lower contention per se. It just distributed tasks a bit better. So in situations where you have one publishers and many subscribers it performed better.

pchlupacek avatar Feb 03 '19 16:02 pchlupacek

If it uses semaphore it's not blocking, but it's not lock-free either: if the thread holding the lock cannot proceed you get stuck all the same. More seriously, in the current cancelation model writing safe code with semaphore is hard or even impossible in some cases (that is a problem per se, but for another day).

I am not sure that old topic had lower contention per se. It just distributed tasks a bit better. So in situations where you have one publishers and many subscribers it performed better.

You're probably right, it still has a single Ref, but it's spending less time on it because then you go on the queues. The thing is that one publisher and many subscribers is like the perfect Topic use case, and we're talking an order of magnitude better, so that's great (and after all, you wrote both :P )

SystemFw avatar Feb 03 '19 16:02 SystemFw

@SystemFw yes, and I think there is a solution with this. Event with Ref implementation. I'll update you. Essentially we use PubSub to implement topic in old way :-)

pchlupacek avatar Feb 03 '19 16:02 pchlupacek

Essentially we use PubSub to implement topic in old way

that would be ideal :)

SystemFw avatar Feb 03 '19 16:02 SystemFw

@SystemFw I think current pubsub signatures doesn't allow for the more concurrent subscriber behaviour. I have some ideas on improving it.

I think, that the improved implementation with locking reduces significantly contention. However as you have pointed out still, the one element to pub/sub is taking about 2*N+1 exclusive accesses on single contention ref, whereas the former implementation had only one exclusive access, and N exclusive accesses distributed over individual subscribers, resulting in much better contention control.

I need some time to think how to improve pubsub to have same performance characteristics, whilst not losing the features of it as we have today.

pchlupacek avatar Feb 03 '19 17:02 pchlupacek

@SystemFw I went ahead a bit and just created the benchmark of the queue-based topic implementation. I took liberty of using the code submitted by @vladimir-popov, as a reference point. Alos I have added 1k, and 2k subscriber benhcmarks.

These are the results :

[info] Benchmark                           Mode  Cnt     Score      Error  Units
[info] TopicBenchmark.topicBroadcast_1     avgt    3     5.468 ±    1.231  ms/op
[info] TopicBenchmark.topicBroadcast_4     avgt    3    13.200 ±    1.766  ms/op
[info] TopicBenchmark.topicBroadcast_8     avgt    3    23.985 ±    2.859  ms/op
[info] TopicBenchmark.topicBroadcast_16    avgt    3    41.069 ±    7.160  ms/op
[info] TopicBenchmark.topicBroadcast_32    avgt    3    78.257 ±   16.757  ms/op
[info] TopicBenchmark.topicBroadcast_64    avgt    3   137.253 ±   80.961  ms/op
[info] TopicBenchmark.topicBroadcast_128   avgt    3   276.561 ±   10.986  ms/op
[info] TopicBenchmark.topicBroadcast_512   avgt    3  1198.688 ±  450.535  ms/op
[info] TopicBenchmark.topicBroadcast_1024  avgt    3  2518.192 ± 1873.308  ms/op
[info] TopicBenchmark.topicBroadcast_2048  avgt    3  5719.329 ± 1568.653  ms/op

One interesting observation I made is that neither this implementation was able to utilize cores at full. Also from the performance standpoint, it is interesting that this is not that better compare to semantically locking solution. That means there is good chance we can get in pair performance with PubSub

pchlupacek avatar Feb 10 '19 07:02 pchlupacek

update: Somehow current implementation when Semantic lock is put in place is always using single thread. This should not be the case, as the F.start is used on completion of subscriber. I am investigating this more.

pchlupacek avatar Feb 10 '19 08:02 pchlupacek

update: these are top contributors to running a lot of subscribers under pub/sub:

screenshot 2019-02-10 at 09 32 30

Neither of them is related to concurrency. My theory is that cost of looking up the subscriber status in map is extraordinaly high.

pchlupacek avatar Feb 10 '19 08:02 pchlupacek

Just a quick update on this one. I am still experimenting with the ways how to tackle on this one. It seems that i may need to touch pubsub strategy signature a bit, but not yet sure 100% how.

Not sure about urgency of this one, but I am targeting this to be resolved until next major release

In the menarime there is workaround with manual implementatio.

Let me know if that works with you.

pchlupacek avatar Mar 27 '19 09:03 pchlupacek

It does work for me, but perhaps we should make it a bit easier for other people to grab the old implementation if they stumble upon this

SystemFw avatar Mar 27 '19 09:03 SystemFw

@pchlupacek Do you think this is solvable in next week or so? We'll have a minor API breakage release in 1.1 where you could change PubSub signature if needed.

mpilquist avatar Jun 07 '19 17:06 mpilquist

It does work for me, but perhaps we should make it a bit easier for other people to grab the old implementation if they stumble upon this

yes this would be ideal. @SystemFw any pointers? I'd love to find a workaround while this gets fixed properly

epifab avatar Jan 28 '21 14:01 epifab

I think the latest commit with the old implementation is this one https://github.com/typelevel/fs2/blob/4ddd75a2dc032b7604dc1205c86d7d6adc993859/core/shared/src/main/scala/fs2/concurrent/Topic.scala

Which should be mostly source compatible. Are you hitting a problem because of Topic under load?

SystemFw avatar Jan 28 '21 14:01 SystemFw

@SystemFw awesome, thanks a lot! I wasn't 100% sure that the poor performances were indeed caused by Topic under heavy load, but I am indeed now. Blindly reverting to the old implementation made a huge impact, talking about 2x or 3x times better...

epifab avatar Jan 28 '21 15:01 epifab

actually, the performance boost is even more impressive than that. I think I can handle approximately 5x more messages. what I didn't discover yet though are the implications of this workaround

epifab avatar Jan 28 '21 16:01 epifab

FYI in fs2 3.0 we have reinstated the implementation strategy of the old version of Topic

SystemFw avatar Mar 02 '21 02:03 SystemFw

As far as I understand, this issue is now present only on series/2.5.x?

vasilmkd avatar May 15 '21 01:05 vasilmkd