cats-effect icon indicating copy to clipboard operation
cats-effect copied to clipboard

Fix #4382: Fix thread leak in WSTP by replacing LinkedTransferQueue with SynchronousQueue and ConcurrentHashMap

Open pantShrey opened this issue 7 months ago • 3 comments
trafficstars

Description: This PR fixes a thread leak in the Work Stealing Thread Pool (Issue #4382), previously caused by FIFO-based reuse of cached blocker threads (introduced in #4295). This FIFO behavior could prevent older cached threads from timing out and exiting during periods of low load, when the number of cached threads exceeds the number of blocking tasks. The implemented solution transitions to a LIFO-like reuse strategy for cached blocker threads. This is achieved by using a SynchronousQueue for state hand-off between threads (leveraging its typically LIFO behavior for waiting pollers in non-fair mode) and a ConcurrentHashMap for tracking these blocker threads. Changes:

  1. SynchronousQueue<TransferState> for State Hand-off:
    • WorkerThreads becoming blockers offer their TransferState to this pool-level queue.
    • Cached (previously blocker) WorkerThreads poll this queue to acquire new state. In its non-fair implementation, the JDK's SynchronousQueue typically services waiting pollers in a LIFO manner, prioritizing more recently cached threads for reuse.
    • If an offer isn't immediately taken, the thread preparing to block spawns a new replacement worker to maintain active pool size.
  2. ConcurrentHashMap<WorkerThread, Boolean> for Tracking:
    • All threads that enter the blocker/cached state are now tracked in this map.
  3. WorkerThread Blocker Logic Update:
    • After its blocking call, a blocker thread attempts to poll the SynchronousQueue for new TransferState with a timeout (runtimeBlockingExpiration).
    • Successful polling leads to re-initialization as an active worker. If the poll times out, the thread terminates and is removed from tracking.
  4. Shutdown Logic Modification:
    • WorkStealingThreadPool.shutdown() now iterates and interrupts all threads currently tracked in the blockerThreads map.

pantShrey avatar Apr 19 '25 19:04 pantShrey