Add FIFO queue of waiters
Under the current design, resources are kept in a `TVar` pointing to a list of resources. If a thread attempts to acquire a resource and finds the list empty, it calls `retry`, adding itself as a waiter on the resource list `TVar`. When that `TVar` is later modified, all waiters are woken and free to retry their STM transaction. This is problematic when many threads are waiting on a resource: all of them are woken by a thread that puts a resource back, but only one of the woken threads will acquire it and the rest will fail their validation step. All of these wake-ups and retries create overhead.
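To make the wake-up problem concrete, here is a minimal sketch of the retry-based scheme, assuming a pool that is nothing more than a `TVar` holding a list of free resources (the real pool tracks more state than this):

```haskell
import Control.Concurrent.STM

-- Simplified stand-in for the pool: just a TVar of free resources.
newtype SimplePool a = SimplePool (TVar [a])

-- Take a resource, blocking via retry while the list is empty. Every thread
-- blocked here is registered as a waiter on the same TVar, so a single put
-- wakes all of them and only one wins the subsequent race.
takeSimple :: SimplePool a -> STM a
takeSimple (SimplePool var) = do
  rs <- readTVar var
  case rs of
    []       -> retry
    (r:rest) -> writeTVar var rest >> pure r

-- Return a resource, which wakes every thread currently retrying in takeSimple.
putSimple :: SimplePool a -> a -> STM ()
putSimple (SimplePool var) r = modifyTVar' var (r:)
```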
To measure this overhead I created a benchmark that forks some number of threads, each of which executes the following loop 10 times: acquire a PostgreSQL connection from a pool and execute a simple query that takes a fixed amount of time (using `pg_sleep`). The main thread waits for all forked threads to complete this work. Benchmark output names look like `t=<int>/c=<int>/d=<int>/s=<int>`, where `t` is the number of threads forked, `c` is the maximum number of connections the pool allows, `d` is the PostgreSQL query delay in microseconds, and `s` is the number of stripes that `c` is spread over. I am running these benchmarks with the following flags: `--regress=mutatorCpuSeconds:iters +RTS -N20 -A128m -qn2 -I0 -T`.
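For reference, the benchmarked workload has roughly this shape (a sketch only, not the actual benchmark code: `runWorkload` is a hypothetical name, `threadDelay` stands in for the `pg_sleep` query, and the criterion setup and regression plumbing are omitted):

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (replicateConcurrently_)
import Control.Monad (replicateM_)
import Data.Pool (Pool, withResource)

-- t threads, each of which acquires a resource from the pool 10 times and
-- holds it for d microseconds (standing in for the pg_sleep query).
runWorkload :: Int -> Int -> Pool conn -> IO ()
runWorkload t d pool =
  replicateConcurrently_ t $
    replicateM_ 10 $
      withResource pool $ \_conn ->
        threadDelay d
```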
Looking at the output:
```
benchmarking t=500/c=25/d=2000/s=1 resource-pool
time                 6.158 s    (5.315 s .. 6.560 s)
                     0.998 R²   (0.995 R² .. 1.000 R²)
mean                 5.832 s    (5.631 s .. 5.970 s)
std dev              198.1 ms   (81.93 ms .. 272.8 ms)
mutatorCpuSeconds:   0.997 R²   (0.994 R² .. 1.000 R²)
  iters              109.085    (93.416 .. NaN)
  y                  -13.266    (-36.762 .. 9.157)
variance introduced by outliers: 19% (moderately inflated)

benchmarking t=500/c=25/d=2000/s=5 resource-pool
time                 2.637 s    (2.602 s .. 2.681 s)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 2.740 s    (2.703 s .. 2.791 s)
std dev              58.93 ms   (20.19 ms .. 78.20 ms)
mutatorCpuSeconds:   1.000 R²   (1.000 R² .. 1.000 R²)
  iters              41.539     (40.277 .. NaN)
  y                  2.886      (NaN .. 7.416)
variance introduced by outliers: 19% (moderately inflated)

benchmarking t=500/c=25/d=2000/s=25 resource-pool
time                 1.074 s    (1.037 s .. 1.129 s)
                     1.000 R²   (0.999 R² .. 1.000 R²)
mean                 1.077 s    (1.065 s .. 1.084 s)
std dev              11.47 ms   (5.179 ms .. 14.38 ms)
mutatorCpuSeconds:   0.999 R²   (0.997 R² .. 1.000 R²)
  iters              11.292     (10.340 .. 12.223)
  y                  -0.243     (-2.736 .. 1.030)
variance introduced by outliers: 19% (moderately inflated)
```
We see 109.085 seconds of CPU time consumed when we use one stripe. We can lower this by increasing the number of stripes, since fewer threads are woken on each put. With 25 stripes of 1 connection we reduce the CPU time consumed to 11.292 seconds, and the running time drops from 6.158 seconds to 1.074 seconds. Of course, this benchmark unfairly favors high striping since each thread does exactly the same amount of work. In an actual application with uneven work across threads, high striping has undesirable consequences: one stripe can be exhausted while another still has idle resources.
It is clear that performance improves if we wake fewer threads, and ideally we would wake at most one thread when a resource is returned. To this end, I propose that we add a FIFO queue of `TMVar (Entry a)` to the pool. When a thread attempts to acquire a resource, it first checks the available resource list. If that list is empty, the thread adds an empty `TMVar` to the waiter queue and waits on it. When returning a resource to the pool, the returning thread first checks whether there are any waiters in the queue, and if so hands the resource straight to the first waiter, waking only one thread.
There is a complication, though: a waiting thread might be killed, and if so we don't want to put a resource into its `TMVar`. To avoid this issue, the queue implemented in this PR allows removing any entry from the queue. Any thread that enqueues a `TMVar` and blocks on reading it first installs an exception handler that removes its entry from the queue and, if a resource has already been put into its `TMVar`, returns that resource to the pool.
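To make the idea concrete, here is a hedged sketch of both the waiter-queue take/put and the exception handling, assuming a simplified pool where the waiter queue is a plain list of `TMVar`s and the entries are just the resources themselves (the actual PR uses a dedicated queue type storing `Entry a` and supporting removal of arbitrary entries):

```haskell
import Control.Concurrent.STM
import Control.Exception (mask_, onException)

-- Simplified pool: a list of free resources plus a FIFO list of waiters.
data SketchPool a = SketchPool
  { available :: TVar [a]
  , waiters   :: TVar [TMVar a]
  }

-- Take a free resource if one exists; otherwise enqueue an empty TMVar and
-- hand it back so the caller can block on it outside this transaction.
takeOrEnqueue :: SketchPool a -> STM (Either (TMVar a) a)
takeOrEnqueue pool = do
  rs <- readTVar (available pool)
  case rs of
    (r:rest) -> writeTVar (available pool) rest >> pure (Right r)
    []       -> do
      slot <- newEmptyTMVar
      modifyTVar' (waiters pool) (++ [slot])  -- O(n) append; a real queue would do better
      pure (Left slot)

-- Return a resource: hand it directly to the oldest waiter if there is one,
-- waking only that thread; otherwise push it onto the free list.
putBack :: SketchPool a -> a -> STM ()
putBack pool r = do
  ws <- readTVar (waiters pool)
  case ws of
    (w:rest) -> writeTVar (waiters pool) rest >> putTMVar w r
    []       -> modifyTVar' (available pool) (r:)

-- Acquire a resource, cleaning up if we are killed while waiting: remove our
-- slot from the queue and, if a resource was already delivered to it, return
-- that resource to the pool instead of leaking it.
acquireSketch :: SketchPool a -> IO a
acquireSketch pool = mask_ $ do
  e <- atomically (takeOrEnqueue pool)
  case e of
    Right r   -> pure r
    Left slot ->
      atomically (takeTMVar slot) `onException` atomically (cleanup slot)
  where
    cleanup slot = do
      modifyTVar' (waiters pool) (filter (/= slot))  -- TMVar has an Eq instance
      delivered <- tryTakeTMVar slot
      case delivered of
        Just r  -> putBack pool r
        Nothing -> pure ()
```

Only the thread whose `TMVar` gets filled is woken, and because the cleanup runs in a single STM transaction, a resource that races with the waiter's cancellation is handed back to the pool rather than lost.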
The benchmark shows promising results for this change:
```
benchmarking t=500/c=25/d=2000/s=1 resource-pool-patched
time                 469.3 ms   (468.9 ms .. 469.7 ms)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 469.1 ms   (468.9 ms .. 469.2 ms)
std dev              158.2 μs   (62.94 μs .. 213.0 μs)
mutatorCpuSeconds:   1.000 R²   (1.000 R² .. 1.000 R²)
  iters              0.148      (0.146 .. 0.153)
  y                  -9.138e-4  (-7.374e-3 .. 6.396e-3)
variance introduced by outliers: 19% (moderately inflated)
```
Here the benchmark is run against this PR with a single stripe: the running time is less than half that of the 25-stripe pool above, while the CPU time consumed is less than 2% of that of the 25-stripe pool. Indeed, with this PR I have not found a benchmark configuration where striping is beneficial.
The full results may be found here and the html output may be found here.
I'll also note two other desirable properties that we get from this waiter queue:
- Fairer resource distribution: once a thread is enqueued in the waiter queue, no later thread can "cut in line" and acquire a resource before it
- If the waiter queue is in use then putting and taking resources are not conflicting transactions, so concurrent execution of a take and put will not invalidate either (assuming there is more than one waiter in the queue)
@tstat just FYI, I merged this branch into my fork via this PR, alongside other long-standing PRs: one, two
If you're interested, I'd appreciate it if you could review the validity of the merge, as I had to resolve conflicts on a few lines because of the changes introduced by the other two PRs.
Since the repo currently doesn't have public tests, I ran the resulting master branch against my internal project, and it worked without visible issues, at least.
https://hackage.haskell.org/package/resource-pool-fork-avanov-0.2.4.0
@tstat Thanks a lot for this PR!
What do you think about limiting the size of the waiter queue? I was thinking that could serve as a way to apply backpressure and prevent the queue from getting too big.
Have you tried using `QSem` from `Control.Concurrent.QSem` to let threads through instead? IIUC it should have the same result, but with fewer changes.
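Roughly, the idea would be something like this (a sketch with hypothetical names; `runQuery` stands for whatever the application does with the connection):

```haskell
import Control.Concurrent.QSem (QSem, newQSem, signalQSem, waitQSem)
import Control.Exception (bracket_)
import Data.Pool (Pool, withResource)

-- Gate pool access with a semaphore sized to the connection limit, so at most
-- that many threads ever contend on the pool's internal TVar at once.
-- Create it next to the pool, e.g.: sem <- newQSem maxConnections
gated :: QSem -> IO a -> IO a
gated sem = bracket_ (waitQSem sem) (signalQSem sem)

-- Hypothetical usage: wrap every acquisition in the gate.
useConnection :: QSem -> Pool conn -> (conn -> IO a) -> IO a
useConnection sem pool runQuery =
  gated sem (withResource pool runQuery)
```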
I think this PR could be closed, as this package has changed hands and been re-implemented here: https://github.com/scrive/pool
I'll also note that the new implementation bears a lot of resemblance to this PR, though it is based on a queue of MVars rather than TVars; perhaps some attribution is in order? :)
@mitchellwrosen the current implementation is based on `QSem` from `base`, not on this PR. I don't even fully understand the code from this PR.