async
Fiber starvation when sharing a connection pool
Certain access patterns, pool sizes, and numbers of fibers will result in fiber starvation when sharing a connection pool. The starvation occurs when the pool size is smaller than the number of fibers, and the access pattern of using pooled resources and performing IO causes a waiting fiber to only be resumed at times when a pooled resource is unavailable, so it must wait again. The sequence continues until some timeout is reached.
Reproduction script
worker-1 is always signaled when the resource is released after "Sleep with resource #1", but is resumed during "Sleep with resource #2", when the resource has already been re-acquired, so it is never able to make progress. worker-0 and worker-2 make progress by being resumed during the "Sleep without resource" step.
require 'async'
require 'colorize'
require_relative 'resource_pool'

POOL = ResourcePool.new(pool_size: 1, timeout: 0.1)
WORKER_COUNT = 3
MAX_TEST_DURATION = 2.0
LOG_COLORS = [:light_blue, :light_magenta, :light_green, :light_red, :light_cyan, :light_yellow,
              :blue, :magenta, :green, :red, :cyan, :yellow]

class Logger
  def self.debug(message)
    task = Async::Task.current
    fiber = Fiber.current
    color = Thread.current[:log_color]
    puts "[#{Time.now}] #{task} on Fiber 0x#{fiber.object_id.to_s(16)}: #{message}".colorize(color)
  end
end

Async do
  clock = Async::Clock.new
  clock.start!
  WORKER_COUNT.times do |n|
    Async(annotation: "worker-#{n}") do
      Thread.current[:log_color] = LOG_COLORS[n]
      begin
        while clock.total < MAX_TEST_DURATION do
          POOL.with_resource do
            Logger.debug('Sleep with resource #1')
            sleep(0.001) # simulates a DB call
          end
          POOL.with_resource do
            Logger.debug('Sleep with resource #2')
            sleep(0.001) # simulates a DB call
          end
          Logger.debug('Sleep without resource')
          sleep(0.001) # simulates some other IO
        end
      rescue ResourcePool::TimeoutError => e
        Logger.debug("Timed out. Aborting test after #{clock.total} seconds")
        puts "#{e.class} #{e.message}"
        puts e.backtrace
        STDOUT.flush
        Kernel.exit!
      end
    end
  end
end
resource_pool.rb
# Uses the same acquire/release flow as Sequel::ThreadedConnectionPool
class ResourcePool
  class TimeoutError < StandardError; end

  def initialize(pool_size:, timeout:)
    @available_resources = pool_size.times.map { |n| "resource-#{n}" }
    @timeout = timeout
    @mutex = Mutex.new
    @waiter = ConditionVariable.new
  end

  def with_resource
    resource = acquire
    yield resource
  ensure
    if resource
      release(resource)
    end
  end

  private

  def acquire
    if resource = sync_next_available
      Logger.debug('Pool: Acquired resource without waiting')
      return resource
    end

    timeout = @timeout
    start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    @mutex.synchronize do
      Logger.debug('Pool: Waiting')
      @waiter.wait(@mutex, timeout)
      if resource = next_available
        Logger.debug('Pool: Acquired resource after waiting')
        return resource
      end
    end

    until resource = sync_next_available
      elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
      if elapsed > timeout
        raise TimeoutError, "Unable to acquire resource after #{elapsed} seconds"
      end

      # We get here when the resource was released and this fiber was unblocked by the signal,
      # but the resource was immediately re-acquired by the fiber that sent the signal before
      # this fiber could be resumed. Effectively a race condition.
      @mutex.synchronize do
        Logger.debug('Pool: Woken by signal but resource unavailable. Waiting again.')
        @waiter.wait(@mutex, timeout - elapsed)
        if resource = next_available
          Logger.debug('Pool: Acquired resource after multiple waits')
          return resource
        end
      end
    end

    Logger.debug('Pool: Acquired resource after waiting')
    resource
  end

  def release(resource)
    @mutex.synchronize do
      @available_resources << resource
      Logger.debug('Pool: Released resource. Signaling.')
      @waiter.signal
    end
  end

  def sync_next_available
    @mutex.synchronize do
      next_available
    end
  end

  def next_available
    @available_resources.pop
  end
end
My environment and output
$ uname -a
Linux toppy 4.15.0-135-generic #139-Ubuntu SMP Mon Jan 18 17:38:24 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux
$ ruby -v
ruby 3.0.0p0 (2020-12-25 revision 95aff21468) [x86_64-linux]
$ gem list --exact async
*** LOCAL GEMS ***
async (1.28.5)
$ ruby async_resource_pool_test.rb # line added between fibers for clarity
[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Pool: Acquired resource without waiting
[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Sleep with resource #1
[2021-01-31 14:33:32 -0500] #<Async::Task:0x78 worker-1 (running)> on Fiber 0x8c: Pool: Waiting
[2021-01-31 14:33:32 -0500] #<Async::Task:0xb4 worker-2 (running)> on Fiber 0xc8: Pool: Waiting
[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Pool: Released resource. Signaling.
[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Pool: Acquired resource without waiting
[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Sleep with resource #2
[2021-01-31 14:33:32 -0500] #<Async::Task:0x78 worker-1 (running)> on Fiber 0x8c: Pool: Woken by signal but resource unavailable. Waiting again.
[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Pool: Released resource. Signaling.
[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Sleep without resource
[2021-01-31 14:33:32 -0500] #<Async::Task:0xb4 worker-2 (running)> on Fiber 0xc8: Pool: Acquired resource after waiting
[2021-01-31 14:33:32 -0500] #<Async::Task:0xb4 worker-2 (running)> on Fiber 0xc8: Sleep with resource #1
[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Pool: Waiting
[2021-01-31 14:33:32 -0500] #<Async::Task:0xb4 worker-2 (running)> on Fiber 0xc8: Pool: Released resource. Signaling.
[2021-01-31 14:33:32 -0500] #<Async::Task:0xb4 worker-2 (running)> on Fiber 0xc8: Pool: Acquired resource without waiting
[2021-01-31 14:33:32 -0500] #<Async::Task:0xb4 worker-2 (running)> on Fiber 0xc8: Sleep with resource #2
[2021-01-31 14:33:32 -0500] #<Async::Task:0x78 worker-1 (running)> on Fiber 0x8c: Pool: Woken by signal but resource unavailable. Waiting again.
[2021-01-31 14:33:32 -0500] #<Async::Task:0xb4 worker-2 (running)> on Fiber 0xc8: Pool: Released resource. Signaling.
[2021-01-31 14:33:32 -0500] #<Async::Task:0xb4 worker-2 (running)> on Fiber 0xc8: Sleep without resource
[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Pool: Acquired resource after waiting
[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Sleep with resource #1
... sequence continues until ...
[2021-01-31 14:33:32 -0500] #<Async::Task:0x78 worker-1 (running)> on Fiber 0x8c: Timed out. Aborting test after 0.10122425699955784 seconds
ResourcePool::TimeoutError Unable to acquire resource after 0.1010006020005676 seconds
/home/brendan/projects/async-experiments/resource_pool.rb:44:in `acquire'
/home/brendan/projects/async-experiments/resource_pool.rb:12:in `with_resource'
async_resource_pool_test.rb:30:in `block (3 levels) in <main>'
/home/brendan/.rvm/gems/ruby-3.0.0/gems/async-1.28.5/lib/async/task.rb:265:in `block in make_fiber'
Note that in about 1 run out of 10, the script will run for longer than 0.1 seconds, so far always timing out at 0.2 seconds, but could presumably run for longer. I believe this occurs when the wait timeout triggers at a time that causes worker-1 to be resumed when the resource is available during the "Sleep without resource" step. The system falls back into a starvation pattern after that though.
I'm interested to hear your thoughts on what should be done to avoid this. Is this something that ruby or async should prevent? Or should a fiber-safe resource pool be written differently from a thread-safe resource pool? (e.g. explicitly yielding after releasing the resource if others are waiting)
I'm just trying to understand the extent of the issue.
Are you saying there are some cases where the fiber is not fairly scheduled? i.e. in some cases, if two fibers are waiting on the same queue, only one fiber ever makes progress?
Okay, I could reproduce the issue... let me consider what is going on.
I tried adding this to the scheduler:
unblocked.sort_by!{rand}
But it turns out there is never more than one fiber getting unblocked at a time. It seems like the sequence of events always produces a predictable deadlock.
def release(resource)
  @mutex.synchronize do
    @available_resources << resource
    Logger.debug('Pool: Released resource. Signaling.')
    @waiter.signal
  end
  sleep(0)
end
This seems to mitigate the problem by ensuring that the task that releases the pool resource doesn't immediately reacquire it.
My guess is that mutex.unlock is not actually very fair: it immediately schedules the next available task, so, due to the order of operations, we likely end up with a predictable order.
I would say that in order for this to work more fairly, we should check the implementation of mutex.unlock. Maybe we need to do random order, reverse order or some other order.
A resource pool should technically wake up the task that has been waiting the longest if timeouts are being used.
@ko1 do you have any ideas/feedback?
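Waking the longest waiter only helps if that waiter can actually take the resource once it runs. One way to guarantee that is a "direct handoff": instead of pushing the released resource back onto a shared list and signaling, `release` assigns it to the oldest waiter, so the releasing fiber or thread cannot re-acquire it first. The following is a minimal sketch under that assumption, not the design of Async, Sequel, or the `ResourcePool` above; `HandoffPool` and its `Waiter` struct are hypothetical names. It relies only on Ruby's `Mutex` and `ConditionVariable`, which are fiber-scheduler aware from Ruby 3.0.

```ruby
# Hypothetical FIFO "direct handoff" pool (illustration only).
class HandoffPool
  class TimeoutError < StandardError; end

  # One record per blocked acquirer; `resource` is filled in by #release.
  Waiter = Struct.new(:condition, :resource)

  def initialize(pool_size:, timeout:)
    @available = Array.new(pool_size) { |n| "resource-#{n}" }
    @waiters = [] # FIFO queue of Waiter records, oldest first
    @timeout = timeout
    @mutex = Mutex.new
  end

  def with_resource
    resource = acquire
    yield resource
  ensure
    release(resource) if resource
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  def acquire
    deadline = now + @timeout
    @mutex.synchronize do
      # Fast path: @available is only non-empty when nobody is queued.
      return @available.pop unless @available.empty?

      waiter = Waiter.new(ConditionVariable.new, nil)
      @waiters << waiter
      # Loop to tolerate spurious wakeups; a handed-off resource wins over a timeout.
      until waiter.resource
        remaining = deadline - now
        if remaining <= 0
          @waiters.delete_if { |w| w.equal?(waiter) }
          raise TimeoutError, "Unable to acquire resource after #{@timeout} seconds"
        end
        waiter.condition.wait(@mutex, remaining)
      end
      waiter.resource
    end
  end

  def release(resource)
    @mutex.synchronize do
      if (waiter = @waiters.shift)
        # Hand the resource straight to the oldest waiter; it never becomes
        # "available", so the releaser cannot barge ahead and re-acquire it.
        waiter.resource = resource
        waiter.condition.signal
      else
        @available.push(resource)
      end
    end
  end
end
```

The trade-off is strict FIFO ordering: a task that has just released a resource can no longer reuse it immediately, which may cost some throughput compared with the barging behavior, in exchange for bounded waits.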
> My guess is that mutex.unlock is not actually very fair: it immediately schedules the next available task, so, due to the order of operations, we likely end up with a predictable order.
> I would say that in order for this to work more fairly, we should check the implementation of mutex.unlock. Maybe we need to do random order, reverse order or some other order.
I think that's roughly accurate, but it might be ConditionVariable#signal that needs to be randomized. ConditionVariable#signal seems to always unblock the fiber that has been waiting the longest, which seems like the fairest thing to do, but in this scenario the fiber can't make progress after being resumed, so it turns out to be unfair overall. Randomizing which fiber is unblocked by ConditionVariable#signal might solve the problem, but it could also make other fiber use cases less fair.
> A resource pool should technically wake up the task that has been waiting the longest if timeouts are being used.
Agreed. I also eventually discovered the sleep(0) solution you posted above, and in some ways that seems preferable to randomizing fiber unblocking/resumption. Presumably it could be achieved via some clearer means like Fiber.yield_and_reschedule or something like that, but yielding if other fibers are waiting for the resource seems like the best solution to this problem.
This appears to be what happens in the crystal-db connection pool: https://github.com/crystal-lang/crystal-db/blob/v0.10.0/src/db/pool.cr#L157
Where I believe @availability_channel.send nil will cause the calling fiber to yield and be rescheduled.
The difficulty with having the pool sleep/yield explicitly is that it would require changes to existing resource pool implementations. That might be acceptable though since the change would be minimal.
If requiring changes to existing resource pool implementations is undesirable, would it be possible for Ruby to automatically yield a fiber upon ConditionVariable#signal if others are waiting? It might need to delay the yield until after the fiber has unlocked the mutex, but that might be doable. Admittedly I haven't fully thought this through, but perhaps it would work like this:
def release(resource)
  @mutex.synchronize do
    @available_resources << resource
    Logger.debug('Pool: Released resource. Signaling.')
    @waiter.signal # Ruby marks this fiber as "should yield after releasing @mutex"
  end # Ruby automatically yields and reschedules this fiber
  # sleep(0) # No longer necessary
end
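Without such a change in Ruby, the explicit `sleep(0)` mitigation can at least be kept inside the pool and limited to the contended case. The sketch below assumes a hypothetical `YieldingPool` class with a `@waiting` counter (not an existing API). It yields only when some caller is blocked in `acquire`: through `sleep(0)` when a fiber scheduler such as Async is installed (that is where `Kernel#sleep` is routed to the scheduler), and through `Thread.pass` otherwise. This makes it much less likely that the releaser re-acquires the resource before the woken waiter runs, but unlike a direct handoff it does not strictly guarantee it.

```ruby
# Hypothetical variant of ResourcePool that yields after a contended release.
class YieldingPool
  class TimeoutError < StandardError; end

  def initialize(pool_size:, timeout:)
    @available = Array.new(pool_size) { |n| "resource-#{n}" }
    @timeout = timeout
    @mutex = Mutex.new
    @waiter = ConditionVariable.new
    @waiting = 0 # number of callers currently blocked in #acquire
  end

  def with_resource
    resource = acquire
    yield resource
  ensure
    release(resource) if resource
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  def acquire
    deadline = now + @timeout
    @mutex.synchronize do
      loop do
        resource = @available.pop
        return resource if resource

        remaining = deadline - now
        raise TimeoutError, "Unable to acquire resource after #{@timeout} seconds" if remaining <= 0

        @waiting += 1
        begin
          @waiter.wait(@mutex, remaining)
        ensure
          @waiting -= 1 # the mutex is held again here, even on timeout
        end
      end
    end
  end

  def release(resource)
    contended = @mutex.synchronize do
      @available.push(resource)
      @waiter.signal
      @waiting > 0
    end
    return unless contended

    # Give the signaled waiter a chance to run before this caller can re-acquire.
    Fiber.scheduler ? sleep(0) : Thread.pass
  end
end
```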
Thanks for the discussion. That makes sense.
My gut feeling is, @waiter.signal should be more fair, i.e. the implementation could probably do a better job of picking which fiber/thread to resume.
> but in this scenario the fiber can't make progress after being resumed
Why is that? Is it because the mutex is locked and we signal it within the synchronize block?
The mutex is only locked during ResourcePool#acquire/release, so it is unlocked during "Sleep with resource". This allows the starved fiber (worker-1) to be resumed, but it finds that @available_resources is empty because the resource is still checked out (by the fiber performing "Sleep with resource"). It prints "Pool: Woken by signal but resource unavailable. Waiting again." and calls @waiter.wait.
So it was resumed, but it wasn't able to make progress as there was no resource available.
I'm investigating this problem again.
I don't think it's a problem we can solve in Async itself, so I'm going to close this issue. But I think we should open an issue at <bugs.ruby-lang.org>.
I thought about the problem a bit more.
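The problem is that the "signal the oldest waiter" strategy doesn't work when the oldest thread/fiber/thing is consistently receiving spurious signals, i.e. it is only ever woken at moments when the resource has already been re-acquired.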
To test this hypothesis, I implemented this in CRuby and added wakeup_one_random:
static void
wakeup_one_random(struct ccan_list_head *head)
{
    struct sync_waiter *cur = 0, *next;
    size_t size = 0;

    ccan_list_for_each_safe(head, cur, next, node) {
        size++;
    }

    if (size == 0) return;

    size_t index = rand() % size;

    ccan_list_for_each_safe(head, cur, next, node) {
        if (index-- == 0) {
            ccan_list_del_init(&cur->node);

            if (cur->th->status != THREAD_KILLED) {
                if (cur->th->scheduler != Qnil && cur->fiber) {
                    rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
                }
                else {
                    rb_threadptr_interrupt(cur->th);
                    cur->th->status = THREAD_RUNNABLE;
                }
            }

            return;
        }
    }
}

// ...

/*
 * Document-method: Thread::ConditionVariable#signal
 *
 * Wakes up the first thread in line waiting for this lock.
 */
static VALUE
rb_condvar_signal(VALUE self)
{
    struct rb_condvar *cv = condvar_ptr(self);
    wakeup_one_random(&cv->waitq);
    return self;
}
This solved the problem - I hope the reason is obvious but let me know and I'll explain it if not.
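To experiment with the same idea without rebuilding CRuby, the random wakeup can be approximated in plain Ruby by giving each waiter its own `ConditionVariable` and choosing one at random in `#signal`. This is a sketch under stated assumptions: `RandomConditionVariable` (and its `waiting` helper) is a hypothetical class, not part of Ruby or Async, and unlike the built-in `ConditionVariable` it requires callers to hold the mutex when calling `#signal` or `#broadcast`, which `ResourcePool#release` already does. The intent is only to break the fixed "oldest waiter first" order described above.

```ruby
# Hypothetical condition variable whose #signal wakes a randomly chosen waiter.
# Callers must hold the associated mutex for #wait, #signal and #broadcast.
class RandomConditionVariable
  def initialize
    @waiters = [] # one private ConditionVariable per blocked caller
  end

  def wait(mutex, timeout = nil)
    condition = ConditionVariable.new
    @waiters << condition
    condition.wait(mutex, timeout)
  ensure
    # The mutex is held again here; drop our entry if we timed out
    # (a no-op if #signal already removed it).
    @waiters.delete_if { |c| c.equal?(condition) }
  end

  def signal
    @waiters.delete_at(rand(@waiters.size)).signal unless @waiters.empty?
    self
  end

  def broadcast
    @waiters.each(&:signal)
    @waiters.clear
    self
  end

  def waiting
    @waiters.size
  end
end
```

Swapping `@waiter = ConditionVariable.new` for `RandomConditionVariable.new` in `ResourcePool` would be one way to try the idea; whether random selection is fairer in general is the open question raised earlier in this thread.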
I don't think Async can solve this problem because it's to do with Ruby's scheduling.
The short term solution is to introduce sleep(0) as discussed as this allows a different fiber to run before trying to re-acquire the resource.
I created a pure Ruby (no Async) reproduction using Threads:
https://github.com/ioquatix/ruby-condition-variable-timeout
Here is the bug report: https://bugs.ruby-lang.org/issues/19717.