cats-effect
Implement metrics for external queue
WorkStealingThreadPool Metrics Implementation
This PR implements metrics for tracking singleton and batch task submissions to the external queue in `WorkStealingThreadPool`, as described in issue #4269.
Changes Made:
- Added atomic counters to `WorkStealingThreadPool`:
  - `singletonsSubmittedCount` - tracks total singleton task submissions
  - `singletonsPresentCount` - tracks current singleton tasks in the queue
  - `batchesSubmittedCount` - tracks total batch task submissions
  - `batchesPresentCount` - tracks current batch tasks in the queue
- Updated submission logic:
  - Changed `submitToExternalQueue()` to increment counters when tasks are added
  - Changed `pollTask()` to decrement the "present" counters when tasks are polled
- Added accessor methods:
  - `getSingletonsSubmittedCount()` - returns total singleton submissions
  - `getSingletonsPresentCount()` - returns current singletons in the queue
  - `getBatchesSubmittedCount()` - returns total batch submissions
  - `getBatchesPresentCount()` - returns current batches in the queue
  - `logQueueMetrics()` - helper for logging all metrics
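The submitted/present counter pairs described above can be sketched as follows. This is a hypothetical stand-in class (names and shape assumed, not the actual PR code): the "submitted" counter only ever grows, while the "present" counter grows on offer and shrinks on poll.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for one submitted/present counter pair:
// "submitted" is monotonic, "present" tracks the current queue contents.
final class QueueCounters {
  private val submitted = new AtomicLong(0L)
  private val present   = new AtomicLong(0L)

  // Called when a task is added to the external queue.
  def onOffer(): Unit = {
    submitted.incrementAndGet()
    present.incrementAndGet()
  }

  // Called when a task is removed from the external queue.
  def onPoll(): Unit = present.decrementAndGet()

  def submittedCount: Long = submitted.get()
  def presentCount: Long = present.get()
}
```

With this shape, 5 offers followed by 3 polls leaves `submittedCount` at 5 and `presentCount` at 2, which matches the behavior the test below checks for.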
- Added a verification test (which I'm not fully sure about, but it did work):
  - Submits singleton tasks and verifies the counter increases
  - Triggers batch creation through local queue overflow
  - Verifies both types of metrics track correctly
  - Singleton submissions: created 10,000 tasks → counter increased by exactly 10,000 ✓
  - Batch submissions: generated queue overflow → counter increased by 2,053 ✓

This suggests the implementation works as intended. The code also compiles without any errors, at least in my terminal :) The test is included in the PR for reference, showing how to access these metrics from client code.
Hey @armanbilge! Please ignore some of the previous commits, as I got myself into a bit of git command hell :p Anyway, the PR is now ready for review. Please only check the most recent commit; it clearly shows what code I have changed. Thanks :)
Thanks for your work on this!
May I suggest an alternative approach? I think we should make `ScalQueue` no longer generic. Internally, it can be backed by queues of `AnyRef`. Then, we can adjust its API to be:

```scala
def offer(a: Runnable, random: ThreadLocalRandom): Unit
def offer(a: Array[Runnable], random: ThreadLocalRandom): Unit
```

Then, we can keep the counters inside of `ScalQueue`, and increment the appropriate counters in the specific `offer` methods. `poll` will have to return `AnyRef` and use `isInstanceOf` checks to decrement the appropriate counter.
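A rough sketch of this suggested shape, simplified to a single backing queue (the real `ScalQueue` stripes across several `ConcurrentLinkedQueue`s and takes a `ThreadLocalRandom` to pick one); the counter names here are assumptions:

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicLong

// Simplified sketch of a non-generic queue: one backing queue of AnyRef,
// with per-kind counters updated in the specialized offer overloads.
final class SketchQueue {
  private val underlying = new ConcurrentLinkedQueue[AnyRef]
  val singletonsPresent = new AtomicLong(0L)
  val batchesPresent    = new AtomicLong(0L)

  // Singleton tasks go through this overload.
  def offer(a: Runnable): Unit = {
    singletonsPresent.incrementAndGet()
    underlying.offer(a)
  }

  // Batches go through this overload.
  def offer(as: Array[Runnable]): Unit = {
    batchesPresent.incrementAndGet()
    underlying.offer(as)
  }

  // poll returns AnyRef; an isInstanceOf check decides which
  // "present" counter to decrement, as suggested above.
  def poll(): AnyRef = {
    val e = underlying.poll()
    if (e != null) {
      if (e.isInstanceOf[Runnable]) singletonsPresent.decrementAndGet()
      else batchesPresent.decrementAndGet()
    }
    e
  }
}
```

Note that the runtime check is unambiguous because an `Array[Runnable]` is itself not a `Runnable`, so the two element kinds can never be confused.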
For sure. Give me a day, I will look into it!
Hey @armanbilge :) I have added the requested changes. Please review! If there is anything to fix again, please let me know!
Thanks for making those changes! It looks like the code doesn't compile currently.
Yes! I am sorry, I actually slept after pushing the changes and forgot to fix the issues. I will do it right away! ~Atharva
@armanbilge It turns out there were so many references and interdependencies relying on `ScalQueue` being generic, which is why this happened. A lot of the code was written with the assumption that it is generic. However, I'll fix it soon :)
Update :
The code now works as we wanted it to! There are still warnings and formatting issues, which can be fixed promptly, but the functionality is working fine now.
Let's change this to a `ScalQueue[Runnable]`, I think that will help us keep track of singletons vs batches.

https://github.com/typelevel/cats-effect/blob/7102a235fd8beeeb8b13131552706c4127b0fb7a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala#L125-L126
With this change, we'll also need to update related parts of the code:
In the `liveTraces()` method (line ~541):
In the `stealFromOtherWorkerThread` method (line ~317):
Is it okay if I change these methods too? For the record, I have changed it to `Runnable`.
Yes, that makes sense!
Hi @armanbilge,
I've implemented the changes you suggested:
- Added a new `ExternalQueueMetrics` trait in the metrics package
- Added the `externalQueue` property to `WorkStealingThreadPoolMetrics`
- Removed the metrics methods from `WorkStealingThreadPool`
- Updated `offerAll` in `ScalQueue` to track as individual submissions
- Removed the unused methods (`getPool()`, `offerBatchToExternalQueue`, etc.)
However, I'm running into some type mismatch errors:
- When trying to change `externalQueue` from `ScalQueue[AnyRef]` to `ScalQueue[Runnable]`:

  ```
  [error] found:    cats.effect.unsafe.ScalQueue[Runnable]
  [error] required: cats.effect.unsafe.ScalQueue[AnyRef]
  ```

- In the `liveTraces()` method, when processing elements from the queue:

  ```
  [error] found:    AnyRef
  [error] required: Runnable
  ```
I see two possible solutions:
- Keep `externalQueue` as `ScalQueue[AnyRef]` and continue to do type checking
- Make `ScalQueue` covariant in its type parameter (define it as `ScalQueue[+A <: AnyRef]`)
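To illustrate the trade-off, here is a toy example (hypothetical `Box` classes, not from the codebase) showing why the invariant version produces the "found/required" mismatch reported above, and why a covariant parameter would accept it:

```scala
// Hypothetical toy classes: Scala type parameters are invariant by
// default, like ScalQueue's current type parameter.
final class InvariantBox[A <: AnyRef]
final class CovariantBox[+A <: AnyRef]

object VarianceDemo {
  def main(args: Array[String]): Unit = {
    // val bad: InvariantBox[AnyRef] = new InvariantBox[Runnable]
    //   => error: found InvariantBox[Runnable], required InvariantBox[AnyRef]
    //   (the same shape as the ScalQueue mismatch above)

    // With covariance, CovariantBox[Runnable] <: CovariantBox[AnyRef],
    // so the assignment compiles:
    val ok: CovariantBox[AnyRef] = new CovariantBox[Runnable]
    println(ok.getClass.getSimpleName)
  }
}
```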
Which approach would you prefer? The covariance change would be larger but could simplify the code overall.
Thank you!
I think we should use `ScalQueue[Runnable]`, but we don't need to make any covariance changes. In all the places where there is a type mismatch, that means we should change the type :) If that still doesn't make sense, push your code up and I can help you work through the errors.
I'll push my code up, because this got a little (just a little ;) confusing.
@Atharva-Kanherkar I think you are getting compile errors because you didn't change it in `WorkerThread`.
https://github.com/typelevel/cats-effect/blob/1f084bf6aaafd179fe64b4d004521c91e81e2eb4/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala#L54
You may need to change it in other places as well.
Yes, but this would require changing many other parts of the code, including method signatures and usage patterns, and could introduce more bugs.
Are you sure we want to proceed with this?
The existing codebase uses `ScalQueue[AnyRef]` consistently throughout, so we can't change just one part to use `ScalQueue[Runnable]` without changing all the related parts, which would be a much larger and riskier change.
I don't have a problem doing that, just hoping to get a green flag on this one :)
Yes, please go ahead and make the change 🙂
@armanbilge I've completed the changes to make `ScalQueue` non-generic, with a type-safe API that distinguishes between singleton and batch tasks:

- Removed the type parameter from `ScalQueue`, making it non-generic
  - Changed from `ScalQueue[A <: AnyRef]` to just `ScalQueue`
- Updated the internal implementation to use `AnyRef` for heterogeneous storage:
  - `private[this] val queues: Array[ConcurrentLinkedQueue[AnyRef]]`
- Created a type-safe public API with specialized methods:
  - `offer(a: Runnable, random: ThreadLocalRandom): Unit` - for singleton tasks
  - `offerBatch(batch: Array[Runnable], random: ThreadLocalRandom): Unit` - for batch tasks
  - `offerAll(as: Array[Runnable], random: ThreadLocalRandom): Unit` - for multiple singletons
- Added metrics tracking in the appropriate methods:
  - In `offer()`: increment `singletonsSubmittedCount` & `singletonsPresentCount`
  - In `offerBatch()`: increment `batchesSubmittedCount` & `batchesPresentCount`
  - In `poll()`: decrement the appropriate counter based on element type
- Updated all references to `ScalQueue` throughout the codebase: `WorkStealingThreadPool.scala`, `WorkerThread.scala`, `LocalQueue.scala`, `ScalQueueBenchmark.scala`, `WorkStealingThreadPoolMetrics.scala`
- Added proper type checking and casting in code that uses `ScalQueue`:
  - `element.asInstanceOf[Runnable]` for singleton tasks
  - `element.asInstanceOf[Array[Runnable]]` for batch tasks
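The consumer-side dispatch described in that last point can be sketched as follows (simplified, names hypothetical; a pattern match is shown, which compiles to the same `isInstanceOf`/`asInstanceOf` checks):

```scala
// Simplified consumer-side dispatch: after a non-generic queue hands
// back an AnyRef, the caller inspects the runtime type to decide
// whether it holds a single task or a batch.
object DispatchSketch {
  // Runs the polled element and returns how many tasks were executed.
  def handle(element: AnyRef): Int =
    element match {
      case r: Runnable =>
        r.run()
        1 // a single task
      case batch: Array[Runnable] =>
        batch.foreach(_.run())
        batch.length // the whole batch
      case _ =>
        0 // e.g. a null from an empty queue
    }
}
```

The match works because JVM arrays are reified: `Array[Runnable]` carries its component type at runtime, so the two cases can never overlap.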
This keeps all the performance-focused design choices intact while improving type safety at the API level. 🚀 Let me know if you have any thoughts! If I have forgotten something or something is wrong, please let me know!
As I am running into these issues, I'd like to document them for future ease.
The errors specifically mention two methods:
- `offer(java.lang.Object, ThreadLocalRandom)` changed to `offer(java.lang.Runnable, ThreadLocalRandom)`
- `offerAll(Array[java.lang.Object], ThreadLocalRandom)` changed to `offerAll(Array[java.lang.Runnable], ThreadLocalRandom)`
Why I think these changes are important:

- Type safety: The original design used `ScalQueue[AnyRef]`, which wasn't type-safe. By making the API methods explicitly accept `Runnable` and `Array[Runnable]`, we've improved type safety.
- Clear API distinction: The changes make it explicit what types of elements can be enqueued:
  - `offer(Runnable, ...)` - for individual tasks
  - `offerBatch(Array[Runnable], ...)` - for batch tasks
- Performance focus: As @armanbilge said, in Cats Effect's internal runtime we often make "non-idiomatic" choices to optimize for performance. The new design maintains this by:
  - Using `AnyRef` internally for heterogeneous storage (avoiding wrappers)
  - Keeping direct access to elements without additional indirection
  - Avoiding `Either` or wrapper types that would require extra allocations
@armanbilge Does it seem okay now? The CI is green. Tell me if you need any more changes! :)
Hello @armanbilge ! I have added the changes! Please review :)
Hey @armanbilge! Just wanted to follow up on this PR — marked it as ready a couple days ago. No rush at all, but when you get a chance, your review would be super appreciated! 🙌
Hey @armanbilge! Do you need anything else from my side? :) Happy to help.
@djspiewak do you have any performance concerns here, should we run some benchmarks?
I would definitely like to run some benchmarks on this one. I'll review the diff more closely though.
Hi @armanbilge @djspiewak! Hope you're doing well. Just wanted to follow up on this—any updates by chance?
Finally got to the benchmarks. Looks like there is a notable impact. Let's see if we can do some micro-optimization to gain back those millis.
Before
[info] Benchmark (cpuTokens) (size) Mode Cnt Score Error Units
[info] ParallelBenchmark.parTraverse 10000 1000 thrpt 10 288.943 ± 1.853 ops/s
[info] ParallelBenchmark.traverse 10000 1000 thrpt 10 48.077 ± 0.019 ops/s
[info] WorkStealingBenchmark.alloc N/A 1000000 thrpt 10 4.473 ± 0.033 ops/min
[info] WorkStealingBenchmark.manyThreadsSchedulingBenchmark N/A 1000000 thrpt 10 6.820 ± 0.147 ops/min
[info] WorkStealingBenchmark.runnableScheduling N/A 1000000 thrpt 10 700.437 ± 3.968 ops/min
[info] WorkStealingBenchmark.runnableSchedulingScalaGlobal N/A 1000000 thrpt 10 597.162 ± 0.807 ops/min
[info] WorkStealingBenchmark.scheduling N/A 1000000 thrpt 10 8.120 ± 0.346 ops/min
After
[info] Benchmark (cpuTokens) (size) Mode Cnt Score Error Units
[info] ParallelBenchmark.parTraverse 10000 1000 thrpt 10 289.627 ± 1.154 ops/s
[info] ParallelBenchmark.traverse 10000 1000 thrpt 10 48.061 ± 0.012 ops/s
[info] WorkStealingBenchmark.alloc N/A 1000000 thrpt 10 4.427 ± 0.009 ops/min
[info] WorkStealingBenchmark.manyThreadsSchedulingBenchmark N/A 1000000 thrpt 10 6.713 ± 0.132 ops/min
[info] WorkStealingBenchmark.runnableScheduling N/A 1000000 thrpt 10 626.559 ± 3.414 ops/min
[info] WorkStealingBenchmark.runnableSchedulingScalaGlobal N/A 1000000 thrpt 10 579.224 ± 2.218 ops/min
[info] WorkStealingBenchmark.scheduling N/A 1000000 thrpt 10 7.908 ± 0.388 ops/min
@djspiewak thank you so much for taking your time to review this!
> Finally got to the benchmarks. Looks like there is a notable impact. Let's see if we can do some micro-optimization to gain back those millis.
I see. Could you please elaborate a little on how we could do that?
No magic bullet really! We just have to experiment and see. The most expensive things you added have to do with the `AtomicLong`s, so I would start by commenting out the mutations in the hot path and seeing if that changes the benchmark results.
Or rather, I would start by trying to replicate my benchmark results locally. `benchmarks / Jmh / run -f 1 -wi 10 -i 10 cats.effect.benchmarks.WorkStealingBenchmark` is the relevant one. See if you can reproduce the statistically significant difference before and after your change, then see if you can manipulate it by commenting out lines. If we can figure out what's especially expensive, we can start thinking of creative ways to do it differently so it's less expensive!
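One commonly tried micro-optimization for write-hot counters, purely a sketch of the kind of experiment being suggested (not something this thread settled on), is replacing `AtomicLong` with `java.util.concurrent.atomic.LongAdder`, which makes contended increments cheaper at the cost of more expensive reads:

```scala
import java.util.concurrent.atomic.LongAdder

// LongAdder spreads contended increments across internal cells, so the
// hot-path increment() is cheaper than AtomicLong.incrementAndGet()
// under contention. sum() is more expensive, but metrics reads are
// rare compared to offers/polls, so the trade-off can be favorable.
final class AdderCounter {
  private val adder = new LongAdder

  def increment(): Unit = adder.increment()
  def decrement(): Unit = adder.decrement()

  // Snapshot of the current value; not atomic with concurrent updates.
  def current: Long = adder.sum()
}
```

Whether this actually recovers the lost throughput here would have to be confirmed with the `WorkStealingBenchmark` run above.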