
Implement metrics for external queue

Open Atharva-Kanherkar opened this issue 8 months ago • 24 comments

WorkStealingThreadPool Metrics Implementation

This PR implements metrics for tracking singleton and batch task submissions to the external queue in WorkStealingThreadPool, as described in issue #4269.

Changes Made:

  1. Added Atomic Counters to WorkStealingThreadPool:

    • singletonsSubmittedCount - tracks total singleton task submissions
    • singletonsPresentCount - tracks current singleton tasks in the queue
    • batchesSubmittedCount - tracks total batch task submissions
    • batchesPresentCount - tracks current batch tasks in the queue
  2. Updated Submission Logic:

    • Changed submitToExternalQueue() to increment counters when tasks are added
    • Changed pollTask() to decrement present counters when tasks are polled
  3. Added Accessor Methods:

    • getSingletonsSubmittedCount() - returns total singleton submissions
    • getSingletonsPresentCount() - returns current singletons in queue
    • getBatchesSubmittedCount() - returns total batch submissions
    • getBatchesPresentCount() - returns current batches in queue
    • logQueueMetrics() - Helper for logging all metrics
  4. Added Verification Test: I wrote a test (which I am not entirely sure about, but it did pass):

    • Submits singleton tasks and verifies the counter increases
    • Triggers batch creation through local queue overflow
    • Verifies both types of metrics track correctly
  • Singleton submissions: Created 10,000 tasks → Counter increased by exactly 10,000 ✓
  • Batch submissions: Generated queue overflow → Counter increased by 2,053 ✓

This suggests the implementation works as intended. The code also compiles without errors, at least on my machine :) The test is included in the PR for reference, showing how to access these metrics from client code.
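For readers who want a feel for the counter scheme described above, here is a minimal, self-contained sketch. The method names mirror the PR description, but the class itself is hypothetical and the surrounding pool plumbing is elided:

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical sketch of the counters described above; not the actual
// WorkStealingThreadPool internals.
final class ExternalQueueCounters {
  private[this] val singletonsSubmitted = new AtomicLong(0)
  private[this] val singletonsPresent   = new AtomicLong(0)
  private[this] val batchesSubmitted    = new AtomicLong(0)
  private[this] val batchesPresent      = new AtomicLong(0)

  // Called from the submission path when a single task is enqueued.
  def recordSingletonSubmitted(): Unit = {
    singletonsSubmitted.incrementAndGet()
    singletonsPresent.incrementAndGet()
    ()
  }

  // Called from the submission path when a batch is enqueued.
  def recordBatchSubmitted(): Unit = {
    batchesSubmitted.incrementAndGet()
    batchesPresent.incrementAndGet()
    ()
  }

  // Called from the poll path when an element is removed.
  def recordSingletonPolled(): Unit = { singletonsPresent.decrementAndGet(); () }
  def recordBatchPolled(): Unit = { batchesPresent.decrementAndGet(); () }

  def getSingletonsSubmittedCount(): Long = singletonsSubmitted.get()
  def getSingletonsPresentCount(): Long = singletonsPresent.get()
  def getBatchesSubmittedCount(): Long = batchesSubmitted.get()
  def getBatchesPresentCount(): Long = batchesPresent.get()
}
```

The key property the test above checks is that the "submitted" counters only ever grow, while the "present" counters go up and down with the queue's contents.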

Atharva-Kanherkar avatar Mar 05 '25 08:03 Atharva-Kanherkar

Hey @armanbilge! Please ignore some of the previous commits, as I got myself into a bit of git command hell :p Anyway, the PR is now ready for review. Please check only the most recent commit; it clearly shows the code I have changed. Thanks :)

Atharva-Kanherkar avatar Mar 05 '25 11:03 Atharva-Kanherkar

Thanks for your work on this!

May I suggest an alternative approach? I think we should make ScalQueue no longer generic. Internally, it can be backed by queues of AnyRef. Then, we can adjust its API to be:

def offer(a: Runnable, random: ThreadLocalRandom): Unit
def offer(a: Array[Runnable], random: ThreadLocalRandom): Unit

Then, we can keep the counters inside of ScalQueue, and increment the appropriate counters in the specific offer methods.

poll will have to return AnyRef and use isInstanceOf checks to decrement the appropriate counter.
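A rough, single-queue sketch of this suggestion follows. The real ScalQueue stripes work across several underlying queues, so everything here besides the offer/poll shapes is illustrative:

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, ThreadLocalRandom}
import java.util.concurrent.atomic.AtomicLong

// Illustrative single-queue model of the proposed non-generic design.
final class MetricsQueueSketch {
  // Heterogeneous storage: holds both Runnable and Array[Runnable].
  private[this] val queue = new ConcurrentLinkedQueue[AnyRef]

  private[this] val singletonsPresent = new AtomicLong(0)
  private[this] val batchesPresent    = new AtomicLong(0)

  // Overload for singleton tasks; increments the singleton counter.
  def offer(a: Runnable, random: ThreadLocalRandom): Unit = {
    queue.offer(a)
    singletonsPresent.incrementAndGet()
    ()
  }

  // Overload for batches; increments the batch counter.
  def offer(as: Array[Runnable], random: ThreadLocalRandom): Unit = {
    queue.offer(as)
    batchesPresent.incrementAndGet()
    ()
  }

  // Returns AnyRef; an isInstanceOf check picks which counter to decrement.
  def poll(random: ThreadLocalRandom): AnyRef = {
    val element = queue.poll()
    if (element.isInstanceOf[Array[Runnable]]) batchesPresent.decrementAndGet()
    else if (element ne null) singletonsPresent.decrementAndGet()
    element
  }

  def singletons(): Long = singletonsPresent.get()
  def batches(): Long = batchesPresent.get()
}
```

Because array types are reified on the JVM, the isInstanceOf[Array[Runnable]] check in poll is a real runtime check, not erased, which is what makes this dispatch safe without a generic type parameter.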

For sure. Give me a day, I will look into it!

Atharva-Kanherkar avatar Mar 06 '25 20:03 Atharva-Kanherkar

Hey @armanbilge :) I have added the requested changes. Please review! If there is anything to fix again, please let me know!

Atharva-Kanherkar avatar Mar 07 '25 21:03 Atharva-Kanherkar

Thanks for making those changes! It looks like the code doesn't compile currently.

armanbilge avatar Mar 07 '25 23:03 armanbilge

Thanks for making those changes! It looks like the code doesn't compile currently.

Yes! I am sorry, I actually fell asleep after pushing the changes and forgot to fix the issues. I will do it right away! ~Atharva

Atharva-Kanherkar avatar Mar 08 '25 07:03 Atharva-Kanherkar

@armanbilge Actually, there were so many references to and interdependencies on ScalQueue being generic; that is why this happened. It seems a lot of code was written with the assumption that it is generic. However, I'll fix it soon :)

Atharva-Kanherkar avatar Mar 08 '25 07:03 Atharva-Kanherkar

Update: I believe the code now works as intended! There are still warnings and formatting issues, which I can fix promptly, but the functionality is working fine now.

Atharva-Kanherkar avatar Mar 08 '25 08:03 Atharva-Kanherkar

Let's change this to a ScalQueue[Runnable], I think that will help us keep track of singletons vs batches.

https://github.com/typelevel/cats-effect/blob/7102a235fd8beeeb8b13131552706c4127b0fb7a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala#L125-L126

With this change, we'll also need to update related parts of the code:

In the liveTraces() method (line ~541) and in the stealFromOtherWorkerThread method (line ~317). Is it okay if I change these methods too? For the record, I have changed it to Runnable.

Atharva-Kanherkar avatar Mar 10 '25 06:03 Atharva-Kanherkar

Is it okay if I change these methods too? For the record, I have changed it to Runnable.

Yes, that makes sense!

armanbilge avatar Mar 10 '25 07:03 armanbilge

Hi @armanbilge,

I've implemented the changes you suggested:

  1. Added a new ExternalQueueMetrics trait in the metrics package
  2. Added the externalQueue property to WorkStealingThreadPoolMetrics
  3. Removed the metrics methods from WorkStealingThreadPool
  4. Updated offerAll in ScalQueue to track its elements as individual submissions
  5. Removed the unused methods (getPool(), offerBatchToExternalQueue, etc.)

However, I'm running into some type mismatch errors:

  1. When trying to change externalQueue from ScalQueue[AnyRef] to ScalQueue[Runnable]: [error] found: cats.effect.unsafe.ScalQueue[Runnable] [error] required: cats.effect.unsafe.ScalQueue[AnyRef]
  2. In liveTraces() method when processing elements from the queue: [error] found: AnyRef [error] required: Runnable

I see two possible solutions:

  1. Keep externalQueue as ScalQueue[AnyRef] and continue to do type checking
  2. Make ScalQueue covariant in its type parameter (define as ScalQueue[+A <: AnyRef])

Which approach would you prefer? The covariance change would be larger but could simplify the code overall.

Thank you!
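To make option 2 concrete, here is a toy illustration of why the assignment fails under invariance and would compile under covariance. The class names are made up; ScalQueue itself is invariant, which is what produces the mismatch:

```scala
// Hypothetical stand-ins for an invariant vs covariant queue type.
class InvariantQueue[A <: AnyRef]
class CovariantQueue[+A <: AnyRef]

// With invariance this would NOT compile:
// def widenInvariant(q: InvariantQueue[Runnable]): InvariantQueue[AnyRef] = q

// With covariance the same widening is accepted:
def widen(q: CovariantQueue[Runnable]): CovariantQueue[AnyRef] = q

// The catch: a covariant +A cannot appear as a plain method parameter, so
// `def offer(a: A): Unit` would be rejected on CovariantQueue with
// "covariant type A occurs in contravariant position". That tradeoff is part
// of why simply changing the concrete element type everywhere can be simpler.
```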

Atharva-Kanherkar avatar Mar 10 '25 07:03 Atharva-Kanherkar

I think we should use ScalQueue[Runnable], but we don't need to make any covariance changes. In all the places where there is a type mismatch, that means we should change the type :) if that still doesn't make sense, push your code up and I can help you work through the errors.

armanbilge avatar Mar 10 '25 07:03 armanbilge

I think we should use ScalQueue[Runnable], but we don't need to make any covariance changes. In all the places where there is a type mismatch, that means we should change the type :) if that still doesn't make sense, push your code up and I can help you work through the errors.

I'll push my code up because this got a little (only a little! ;)) confusing.

Atharva-Kanherkar avatar Mar 10 '25 08:03 Atharva-Kanherkar

@Atharva-Kanherkar I think you are getting compile errors, because you didn't change it in WorkerThread.

https://github.com/typelevel/cats-effect/blob/1f084bf6aaafd179fe64b4d004521c91e81e2eb4/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala#L54

You may need to change it in other places as well.

armanbilge avatar Mar 10 '25 18:03 armanbilge

@Atharva-Kanherkar I think you are getting compile errors, because you didn't change it in WorkerThread.

https://github.com/typelevel/cats-effect/blob/1f084bf6aaafd179fe64b4d004521c91e81e2eb4/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala#L54

You may need to change it in other places as well.

Yes, but this would require changing many other parts of the code, including method signatures and usage patterns, and could introduce more bugs. Are you sure we should proceed with this? The existing codebase uses ScalQueue[AnyRef] consistently throughout, so we can't change just one part to use ScalQueue[Runnable] without changing all the related parts, which would be a much larger and riskier change. I don't have a problem doing that, just hoping to get a green flag on this one :)

Atharva-Kanherkar avatar Mar 10 '25 19:03 Atharva-Kanherkar

Yes, please go ahead and make the change 🙂

armanbilge avatar Mar 10 '25 21:03 armanbilge

@armanbilge I've completed the changes to make ScalQueue non-generic with a type-safe API that distinguishes between singleton and batch tasks:

  1. Removed the type parameter from ScalQueue, making it non-generic

    • Changed from ScalQueue[A <: AnyRef] to just ScalQueue
  2. Updated internal implementation to use AnyRef for heterogeneous storage:

    • private[this] val queues: Array[ConcurrentLinkedQueue[AnyRef]]
  3. Created a type-safe public API with specialized methods:

    • offer(a: Runnable, random: ThreadLocalRandom): Unit - for singleton tasks
    • offerBatch(batch: Array[Runnable], random: ThreadLocalRandom): Unit - for batch tasks
    • offerAll(as: Array[Runnable], random: ThreadLocalRandom): Unit - for multiple singletons
  4. Added metrics tracking in appropriate methods:

    • In offer(): increment singletonsSubmittedCount & singletonsPresentCount
    • In offerBatch(): increment batchesSubmittedCount & batchesPresentCount
    • In poll(): decrement the appropriate counter based on element type
  5. Updated all references to ScalQueue throughout the codebase:

    • WorkStealingThreadPool.scala
    • WorkerThread.scala
    • LocalQueue.scala
    • ScalQueueBenchmark.scala
    • WorkStealingThreadPoolMetrics.scala
  6. Added proper type checking and casting in code that uses ScalQueue:

    • element.asInstanceOf[Runnable] for singleton tasks
    • element.asInstanceOf[Array[Runnable]] for batch tasks

This keeps all the performance-focused design choices intact while improving type safety at the API level. 🚀 Let me know if you have any thoughts! If I have forgotten something or something is wrong, please let me know!
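Point 6 above, the consumer-side dispatch on a poll() that returns AnyRef, can be sketched like this (the dispatch helper and its shape are hypothetical, for illustration only):

```scala
// Illustrative consumer-side dispatch for a poll() that returns AnyRef.
// Returns how many tasks were handed off, so the effect is observable.
def dispatch(element: AnyRef, runTask: Runnable => Unit): Int =
  element match {
    case batch: Array[Runnable] => // batch tasks (reified array check)
      batch.foreach(runTask)
      batch.length
    case task: Runnable =>         // singleton task
      runTask(task)
      1
    case null =>                   // queue was empty
      0
  }
```

A pattern match like this is just sugared isInstanceOf/asInstanceOf, so it keeps the zero-allocation property: the elements are stored and retrieved directly, without Either or other wrapper types.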

Atharva-Kanherkar avatar Mar 12 '25 09:03 Atharva-Kanherkar

As I run into issues, I'd like to document them for future reference.

The errors specifically mention two methods:

  • offer(java.lang.Object, ThreadLocalRandom) changed to offer(java.lang.Runnable, ThreadLocalRandom)
  • offerAll(Array[java.lang.Object], ThreadLocalRandom) changed to offerAll(Array[java.lang.Runnable], ThreadLocalRandom)

Why I think these changes are important:

  • Type Safety: The original design used ScalQueue[AnyRef], which wasn't type-safe. By making the API methods explicitly accept Runnable and Array[Runnable], we've improved type safety.

  • Clear API Distinction: The changes make it explicit what types of elements can be enqueued:

offer(Runnable, ...) - for individual tasks
offerBatch(Array[Runnable], ...) - for batch tasks

  • Performance Focus: As @armanbilge said, in Cats Effect's internal runtime, we often make 'non-idiomatic' choices to optimize for performance. The new design maintains this by:
  1. Using AnyRef internally for heterogeneous storage (avoiding wrappers)
  2. Keeping direct access to elements without additional indirection
  3. Avoiding Either or wrapper types that would require extra allocations

Atharva-Kanherkar avatar Mar 12 '25 11:03 Atharva-Kanherkar

@armanbilge Does it seem okay now? The CI is green; tell me if you need any more changes! :)

Atharva-Kanherkar avatar Mar 18 '25 18:03 Atharva-Kanherkar

Hello @armanbilge ! I have added the changes! Please review :)

Atharva-Kanherkar avatar Apr 03 '25 11:04 Atharva-Kanherkar

Hey @armanbilge! Just wanted to follow up on this PR — marked it as ready a couple days ago. No rush at all, but when you get a chance, your review would be super appreciated! 🙌

Atharva-Kanherkar avatar Apr 10 '25 20:04 Atharva-Kanherkar

Hey @armanbilge! Do you need anything else from my side? :) Happy to help.

Atharva-Kanherkar avatar Apr 13 '25 19:04 Atharva-Kanherkar

@djspiewak do you have any performance concerns here, should we run some benchmarks?

armanbilge avatar Apr 14 '25 16:04 armanbilge

I would definitely like to run some benchmarks on this one. I'll review the diff more closely though.

djspiewak avatar Apr 14 '25 16:04 djspiewak

Hi @armanbilge @djspiewak! Hope you're doing well. Just wanted to follow up on this—any updates by chance?

Atharva-Kanherkar avatar Apr 20 '25 09:04 Atharva-Kanherkar

Finally got to the benchmarks. Looks like there is a notable impact. Let's see if we can do some micro-optimization to gain back those millis.

Before

[info] Benchmark                                             (cpuTokens)   (size)   Mode  Cnt    Score   Error    Units
[info] ParallelBenchmark.parTraverse                               10000     1000  thrpt   10  288.943 ± 1.853    ops/s
[info] ParallelBenchmark.traverse                                  10000     1000  thrpt   10   48.077 ± 0.019    ops/s
[info] WorkStealingBenchmark.alloc                                   N/A  1000000  thrpt   10    4.473 ± 0.033  ops/min
[info] WorkStealingBenchmark.manyThreadsSchedulingBenchmark          N/A  1000000  thrpt   10    6.820 ± 0.147  ops/min
[info] WorkStealingBenchmark.runnableScheduling                      N/A  1000000  thrpt   10  700.437 ± 3.968  ops/min
[info] WorkStealingBenchmark.runnableSchedulingScalaGlobal           N/A  1000000  thrpt   10  597.162 ± 0.807  ops/min
[info] WorkStealingBenchmark.scheduling                              N/A  1000000  thrpt   10    8.120 ± 0.346  ops/min

After

[info] Benchmark                                             (cpuTokens)   (size)   Mode  Cnt    Score   Error    Units
[info] ParallelBenchmark.parTraverse                               10000     1000  thrpt   10  289.627 ± 1.154    ops/s
[info] ParallelBenchmark.traverse                                  10000     1000  thrpt   10   48.061 ± 0.012    ops/s
[info] WorkStealingBenchmark.alloc                                   N/A  1000000  thrpt   10    4.427 ± 0.009  ops/min
[info] WorkStealingBenchmark.manyThreadsSchedulingBenchmark          N/A  1000000  thrpt   10    6.713 ± 0.132  ops/min
[info] WorkStealingBenchmark.runnableScheduling                      N/A  1000000  thrpt   10  626.559 ± 3.414  ops/min
[info] WorkStealingBenchmark.runnableSchedulingScalaGlobal           N/A  1000000  thrpt   10  579.224 ± 2.218  ops/min
[info] WorkStealingBenchmark.scheduling                              N/A  1000000  thrpt   10    7.908 ± 0.388  ops/min

djspiewak avatar Jul 23 '25 21:07 djspiewak

@djspiewak thank you so much for taking your time to review this!

Atharva-Kanherkar avatar Jul 23 '25 21:07 Atharva-Kanherkar

Finally got to the benchmarks. Looks like there is a notable impact. Let's see if we can do some micro-optimization to gain back those millis.


I see. Could you please elaborate a bit on how we could do that?

Atharva-Kanherkar avatar Jul 23 '25 21:07 Atharva-Kanherkar

No magic bullet really! We just have to experiment and see. The most expensive things you added have to do with the AtomicLongs, so I would start by commenting out the mutations in the hot path and see if that changes the benchmark results.

Or rather, I would start by trying to replicate my benchmark results locally. benchmarks / Jmh / run -f 1 -wi 10 -i 10 cats.effect.benchmarks.WorkStealingBenchmark is the relevant one. See if you can reproduce the statistically significant difference before and after your change, then see if you can manipulate it by commenting out lines. If we can figure out what's especially expensive, we can start thinking of creative ways to do it differently so it's less expensive!

djspiewak avatar Jul 23 '25 22:07 djspiewak