
Fiber starvation when sharing a connection pool

Open brendar opened this issue 4 years ago • 9 comments

Certain access patterns, pool sizes, and numbers of fibers will result in fiber starvation when sharing a connection pool. The starvation occurs when the pool size is smaller than the number of fibers, and the access pattern of using pooled resources and performing IO causes a waiting fiber to only be resumed at times when a pooled resource is unavailable, so it must wait again. The sequence continues until some timeout is reached.

Reproduction script

worker-1 is always signaled when the resource is released after "Sleep with resource #1", but is resumed during "Sleep with resource #2", when the resource has already been re-acquired, so it is never able to make progress. worker-0 and worker-2 make progress by being resumed during the "Sleep without resource" step.

require 'async'
require 'colorize'
require_relative 'resource_pool'

POOL = ResourcePool.new(pool_size: 1, timeout: 0.1)
WORKER_COUNT = 3
MAX_TEST_DURATION = 2.0
LOG_COLORS = [:light_blue, :light_magenta, :light_green, :light_red, :light_cyan, :light_yellow,
              :blue, :magenta, :green, :red, :cyan, :yellow]

class Logger
  def self.debug(message)
    task = Async::Task.current
    fiber = Fiber.current
    color = Thread.current[:log_color]
    puts "[#{Time.now}] #{task} on Fiber 0x#{fiber.object_id.to_s(16)}: #{message}".colorize(color)
  end
end

Async do
  clock = Async::Clock.new
  clock.start!

  WORKER_COUNT.times do |n|
    Async(annotation: "worker-#{n}") do
      Thread.current[:log_color] = LOG_COLORS[n]

      begin
        while clock.total < MAX_TEST_DURATION do
          POOL.with_resource do
            Logger.debug('Sleep with resource #1')
            sleep(0.001) # simulates a DB call
          end

          POOL.with_resource do
            Logger.debug('Sleep with resource #2')
            sleep(0.001) # simulates a DB call
          end

          Logger.debug('Sleep without resource')
          sleep(0.001) # simulates some other IO
        end
      rescue ResourcePool::TimeoutError => e
        Logger.debug("Timed out. Aborting test after #{clock.total} seconds")
        puts "#{e.class} #{e.message}"
        puts e.backtrace
        STDOUT.flush
        Kernel.exit!
      end
    end
  end
end

resource_pool.rb

# Uses the same acquire/release flow as Sequel::ThreadedConnectionPool
class ResourcePool
  class TimeoutError < StandardError; end

  def initialize(pool_size:, timeout:)
    @available_resources = pool_size.times.map { |n| "resource-#{n}" }
    @timeout = timeout
    @mutex = Mutex.new
    @waiter = ConditionVariable.new
  end

  def with_resource
    resource = acquire
    yield resource
  ensure
    if resource
      release(resource)
    end
  end

  private

  def acquire
    if resource = sync_next_available
      Logger.debug('Pool: Acquired resource without waiting')
      return resource
    end

    timeout = @timeout
    start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

    @mutex.synchronize do
      Logger.debug('Pool: Waiting')
      @waiter.wait(@mutex, timeout)
      if resource = next_available
        Logger.debug('Pool: Acquired resource after waiting')
        return resource
      end
    end

    until resource = sync_next_available
      elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
      
      if elapsed > timeout
        raise TimeoutError, "Unable to acquire resource after #{elapsed} seconds"
      end

      # We get here when the resource was released and this fiber was unblocked by the signal,
      # but the resource was immediately re-acquired by the fiber that sent the signal before
      # this fiber could be resumed. Effectively a race condition.
      @mutex.synchronize do
        Logger.debug('Pool: Woken by signal but resource unavailable. Waiting again.')
        @waiter.wait(@mutex, timeout - elapsed)
        if resource = next_available
          Logger.debug('Pool: Acquired resource after multiple waits')
          return resource
        end
      end
    end

    Logger.debug('Pool: Acquired resource after waiting')
    resource
  end

  def release(resource)
    @mutex.synchronize do
      @available_resources << resource
      Logger.debug('Pool: Released resource. Signaling.')
      @waiter.signal
    end
  end

  def sync_next_available
    @mutex.synchronize do
      next_available
    end
  end

  def next_available
    @available_resources.pop
  end
end

My environment and output

$ uname -a
Linux toppy 4.15.0-135-generic #139-Ubuntu SMP Mon Jan 18 17:38:24 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

$ ruby -v
ruby 3.0.0p0 (2020-12-25 revision 95aff21468) [x86_64-linux]

$ gem list --exact async
*** LOCAL GEMS ***
async (1.28.5)

$ ruby async_resource_pool_test.rb # line added between fibers for clarity
[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Pool: Acquired resource without waiting
[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Sleep with resource #1

[2021-01-31 14:33:32 -0500] #<Async::Task:0x78 worker-1 (running)> on Fiber 0x8c: Pool: Waiting

[2021-01-31 14:33:32 -0500] #<Async::Task:0xb4 worker-2 (running)> on Fiber 0xc8: Pool: Waiting

[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Pool: Released resource. Signaling.
[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Pool: Acquired resource without waiting
[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Sleep with resource #2

[2021-01-31 14:33:32 -0500] #<Async::Task:0x78 worker-1 (running)> on Fiber 0x8c: Pool: Woken by signal but resource unavailable. Waiting again.

[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Pool: Released resource. Signaling.
[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Sleep without resource

[2021-01-31 14:33:32 -0500] #<Async::Task:0xb4 worker-2 (running)> on Fiber 0xc8: Pool: Acquired resource after waiting
[2021-01-31 14:33:32 -0500] #<Async::Task:0xb4 worker-2 (running)> on Fiber 0xc8: Sleep with resource #1

[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Pool: Waiting

[2021-01-31 14:33:32 -0500] #<Async::Task:0xb4 worker-2 (running)> on Fiber 0xc8: Pool: Released resource. Signaling.
[2021-01-31 14:33:32 -0500] #<Async::Task:0xb4 worker-2 (running)> on Fiber 0xc8: Pool: Acquired resource without waiting
[2021-01-31 14:33:32 -0500] #<Async::Task:0xb4 worker-2 (running)> on Fiber 0xc8: Sleep with resource #2

[2021-01-31 14:33:32 -0500] #<Async::Task:0x78 worker-1 (running)> on Fiber 0x8c: Pool: Woken by signal but resource unavailable. Waiting again.

[2021-01-31 14:33:32 -0500] #<Async::Task:0xb4 worker-2 (running)> on Fiber 0xc8: Pool: Released resource. Signaling.
[2021-01-31 14:33:32 -0500] #<Async::Task:0xb4 worker-2 (running)> on Fiber 0xc8: Sleep without resource

[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Pool: Acquired resource after waiting
[2021-01-31 14:33:32 -0500] #<Async::Task:0x3c worker-0 (running)> on Fiber 0x50: Sleep with resource #1

... sequence continues until ...

[2021-01-31 14:33:32 -0500] #<Async::Task:0x78 worker-1 (running)> on Fiber 0x8c: Timed out. Aborting test after 0.10122425699955784 seconds
ResourcePool::TimeoutError Unable to acquire resource after 0.1010006020005676 seconds
/home/brendan/projects/async-experiments/resource_pool.rb:44:in `acquire'
/home/brendan/projects/async-experiments/resource_pool.rb:12:in `with_resource'
async_resource_pool_test.rb:30:in `block (3 levels) in <main>'
/home/brendan/.rvm/gems/ruby-3.0.0/gems/async-1.28.5/lib/async/task.rb:265:in `block in make_fiber'

Note that in about 1 run out of 10, the script will run for longer than 0.1 seconds, so far always timing out at 0.2 seconds, but could presumably run for longer. I believe this occurs when the wait timeout triggers at a time that causes worker-1 to be resumed when the resource is available during the "Sleep without resource" step. The system falls back into a starvation pattern after that though.

I'm interested to hear your thoughts on what should be done to avoid this. Is this something that ruby or async should prevent? Or should a fiber-safe resource pool be written differently from a thread-safe resource pool? (e.g. explicitly yielding after releasing the resource if others are waiting)
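A minimal sketch of the last option above, yielding after release only when others are waiting. It is not part of the reproduction: the class name YieldingPool and the @waiting counter are assumptions made for illustration, and sleep(0) stands in for whatever yield primitive the scheduler offers. It should make an immediate re-acquire by the releasing fiber less likely, but it does not by itself guarantee fairness.

```ruby
# Hypothetical sketch: a pool that counts callers blocked in acquire and
# yields after release only when at least one caller is waiting.
class YieldingPool
  class TimeoutError < StandardError; end

  def initialize(pool_size:, timeout:)
    @available = Array.new(pool_size) { |n| "resource-#{n}" }
    @timeout = timeout
    @mutex = Mutex.new
    @waiter = ConditionVariable.new
    @waiting = 0 # number of callers currently blocked in acquire
  end

  def with_resource
    resource = acquire
    yield resource
  ensure
    release(resource) if resource
  end

  private

  def acquire
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @timeout

    @mutex.synchronize do
      loop do
        resource = @available.pop
        return resource if resource

        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        if remaining <= 0
          raise TimeoutError, "Unable to acquire resource within #{@timeout} seconds"
        end

        @waiting += 1
        begin
          @waiter.wait(@mutex, remaining)
        ensure
          @waiting -= 1 # the mutex is held again once wait returns
        end
      end
    end
  end

  def release(resource)
    others_waiting = @mutex.synchronize do
      @available << resource
      @waiter.signal
      @waiting > 0
    end

    # Yield outside the mutex so a signaled waiter gets a chance to run
    # before this caller tries to acquire again.
    sleep(0) if others_waiting
  end
end
```

When nobody is waiting, release returns without yielding, so an uncontended pool pays no extra scheduling cost.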

brendar, Feb 01 '21 16:02

I'm just trying to understand the extent of the issue.

Are you saying there are some cases where the fiber is not fairly scheduled? i.e. in some cases, if two fibers are waiting on the same queue, only one fiber ever makes progress?

ioquatix, Feb 07 '21 04:02

Okay, I could reproduce the issue... let me consider what is going on.

ioquatix, Feb 07 '21 05:02

I tried adding this to the scheduler:

unblocked.sort_by!{rand}

But it turns out there is never more than one fiber getting unblocked at a time. It seems like the sequence of events always produces a predictable deadlock.

ioquatix, Feb 07 '21 05:02

  def release(resource)
    @mutex.synchronize do
      @available_resources << resource
      Logger.debug('Pool: Released resource. Signaling.')
      @waiter.signal
    end

    sleep(0)
  end

This seems to mitigate the problem by ensuring that the task that releases the pool resource doesn't immediately reacquire it.

ioquatix, Feb 07 '21 05:02

My guess is that mutex.unlock is not actually very fair - it's immediately scheduling the next available task. Due to the order of operations, it's likely that we end up with a predictable order.

I would say that in order for this to work more fairly, we should check the implementation of mutex.unlock. Maybe we need to do random order, reverse order or some other order.

A resource pool should technically wake up the task that has been waiting the longest if timeouts are being used.
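One way to sketch "wake the task that has been waiting the longest" is a FIFO hand-off, where release gives the resource directly to the oldest waiter instead of returning it to the shared list. This is an illustration under assumptions, not what Sequel or Async do: HandoffPool, Waiter, and the per-waiter condition variables are hypothetical names, and the sketch does not handle a waiter being killed after a resource was handed to it.

```ruby
# Hypothetical sketch: release hands the resource to the longest-waiting
# caller, so the releasing caller cannot take it back ahead of the queue.
class HandoffPool
  class TimeoutError < StandardError; end

  # One entry per blocked caller; `resource` is filled in by release.
  Waiter = Struct.new(:condition, :resource)

  def initialize(pool_size:, timeout:)
    @available = Array.new(pool_size) { |n| "resource-#{n}" }
    @timeout = timeout
    @mutex = Mutex.new
    @waiters = [] # oldest waiter first
  end

  def with_resource
    resource = acquire
    yield resource
  ensure
    release(resource) if resource
  end

  private

  def acquire
    @mutex.synchronize do
      # @available is only non-empty when nobody is queued, so taking a
      # free resource here never jumps ahead of an existing waiter.
      resource = @available.pop
      return resource if resource

      waiter = Waiter.new(ConditionVariable.new, nil)
      @waiters << waiter
      deadline = now + @timeout

      until waiter.resource
        remaining = deadline - now
        if remaining <= 0
          @waiters.delete_if { |w| w.equal?(waiter) }
          raise TimeoutError, "Unable to acquire resource within #{@timeout} seconds"
        end
        waiter.condition.wait(@mutex, remaining)
      end

      waiter.resource
    end
  end

  def release(resource)
    @mutex.synchronize do
      if (waiter = @waiters.shift)
        waiter.resource = resource # reserved for the oldest waiter
        waiter.condition.signal
      else
        @available << resource
      end
    end
  end

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
```

Because the resource is reserved for a specific waiter before the signal is sent, a wake-up in this design always finds a resource, whichever fiber or thread happens to run first afterwards.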

ioquatix, Feb 07 '21 05:02

@ko1 do you have any ideas/feedback?

ioquatix, Feb 07 '21 05:02

> My guess is that mutex.unlock is not actually very fair - it's immediately scheduling the next available task. Due to the order of operations, it's likely that we end up with a predictable order.
>
> I would say that in order for this to work more fairly, we should check the implementation of mutex.unlock. Maybe we need to do random order, reverse order or some other order.

I think that's roughly accurate, but it might be ConditionVariable#signal that would need to be randomized. ConditionVariable#signal seems to always unblock the fiber that's been waiting the longest, which seems like the fairest thing to do, but in this scenario the fiber can't make progress after being resumed, so it turns out to be unfair overall. Randomizing which fiber is unblocked by ConditionVariable#signal might solve the problem, but it could also make other fiber use cases less fair.

> A resource pool should technically wake up the task that has been waiting the longest if timeouts are being used.

Agreed. I also eventually discovered the sleep(0) solution you posted above, and in some ways that seems preferable to randomizing fiber unblocking/resumption. Presumably it could be achieved via some clearer means like Fiber.yield_and_reschedule or something like that, but yielding if other fibers are waiting for the resource seems like the best solution to this problem.

This appears to be what happens in the crystal-db connection pool: https://github.com/crystal-lang/crystal-db/blob/v0.10.0/src/db/pool.cr#L157. There, I believe @availability_channel.send nil will cause the calling fiber to yield and be rescheduled.

The difficulty with having the pool sleep/yield explicitly is that it would require changes to existing resource pool implementations. That might be acceptable though since the change would be minimal.

If requiring changes to existing resource pool implementations is undesirable, would it be possible for Ruby to automatically yield a fiber upon ConditionVariable#signal if others are waiting? It might need to delay the yield until after the fiber has unlocked the mutex, but that might be doable. Admittedly I haven't fully thought this through, but perhaps it would work like this:

  def release(resource)
    @mutex.synchronize do
      @available_resources << resource
      Logger.debug('Pool: Released resource. Signaling.')
      @waiter.signal # Ruby marks this fiber as "should yield after releasing @mutex"
    end # Ruby automatically yields and reschedules this fiber

    # sleep(0) # No longer necessary
  end

brendar, Feb 10 '21 15:02

Thanks for the discussion. That makes sense.

My gut feeling is, @waiter.signal should be more fair, i.e. the implementation could probably do a better job of picking which fiber/thread to resume.

> but in this scenario the fiber can't make progress after being resumed

Why is that? Is it because the mutex is locked and we signal it within the synchronize block?

ioquatix, Feb 11 '21 00:02

The mutex is only locked during ResourcePool#acquire/release, so it is unlocked during "Sleep with resource". This allows the starved fiber (worker-1) to be resumed, but it finds that @available_resources is empty because the resource is still checked out (by the fiber performing "Sleep with resource"). It prints "Pool: Woken by signal but resource unavailable. Waiting again." and calls @waiter.wait.

So it was resumed, but it wasn't able to make progress as there was no resource available.
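The interleaving can be illustrated with two plain fibers that are resumed by hand. This is only a simulation under assumptions: the manual resume calls stand in for the scheduler, a bare array stands in for the pool, and the names holder and waiter are made up for the example. It shows the waiter being woken after a release and still finding the pool empty, because the releasing fiber took the resource back before yielding.

```ruby
available = ["resource-0"]
log = []

# Plays the role of the fiber doing "Sleep with resource #1" and "#2".
holder = Fiber.new do
  resource = available.pop
  log << "holder: acquired (#1)"
  Fiber.yield                 # simulated IO while holding the resource

  available << resource       # release; the waiter would be signaled here
  resource = available.pop    # re-acquire before the waiter gets to run
  log << "holder: re-acquired (#2)"
  Fiber.yield                 # simulated IO while holding the resource

  available << resource
  log << "holder: released"
end

# Plays the role of the starved fiber (worker-1 in the output above).
waiter = Fiber.new do
  until (resource = available.pop)
    log << "waiter: resource unavailable, waiting"
    Fiber.yield
  end
  log << "waiter: acquired"
  available << resource
end

# Hand-driven schedule: the waiter only runs after the holder yields.
holder.resume
waiter.resume
holder.resume
waiter.resume
holder.resume
waiter.resume

puts log
```

In this simulation the waiter succeeds only on its third attempt, once the holder has released without immediately re-acquiring, which corresponds to the "Sleep without resource" step in the reproduction.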

brendar, Feb 11 '21 03:02

I'm investigating this problem again.

I don't think it's a problem we can solve in Async itself, so I'm going to close this issue. But I think we should open an issue at <bugs.ruby-lang.org>.

I thought about the problem a bit more.

The problem is that the "signal the oldest" doesn't work when the oldest thread/fiber/thing is consistently receiving spurious signals.

To test this hypothesis, I implemented this in CRuby and added wakeup_one_random:

static void
wakeup_one_random(struct ccan_list_head *head)
{
    struct sync_waiter *cur = 0, *next;
    size_t size = 0;

    ccan_list_for_each_safe(head, cur, next, node) {
        size++;
    }

    if (size == 0) return;

    size_t index = rand() % size;

    ccan_list_for_each_safe(head, cur, next, node) {
        if (index-- == 0) {
            ccan_list_del_init(&cur->node);

            if (cur->th->status != THREAD_KILLED) {
                if (cur->th->scheduler != Qnil && cur->fiber) {
                    rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
                }
                else {
                    rb_threadptr_interrupt(cur->th);
                    cur->th->status = THREAD_RUNNABLE;
                }
            }

            return;
        }
    }
}

// ...

/*
 * Document-method: Thread::ConditionVariable#signal
 *
 * Wakes up the first thread in line waiting for this lock.
 */

static VALUE
rb_condvar_signal(VALUE self)
{
    struct rb_condvar *cv = condvar_ptr(self);
    wakeup_one_random(&cv->waitq);
    return self;
}

This solved the problem - I hope the reason is obvious but let me know and I'll explain it if not.

I don't think Async can solve this problem because it's to do with Ruby's scheduling.

The short term solution is to introduce sleep(0) as discussed as this allows a different fiber to run before trying to re-acquire the resource.

ioquatix, Jun 07 '23 09:06

I created a pure Ruby (no Async) reproduction using Threads:

https://github.com/ioquatix/ruby-condition-variable-timeout

Here is the bug report: https://bugs.ruby-lang.org/issues/19717.

ioquatix, Jun 07 '23 10:06