
How to make solid_queue work well with fibers?

Open mintuhouse opened this issue 1 year ago • 1 comments

We make quite a few LLM calls in our background jobs and want to leverage fibers to efficiently use the resources as these jobs spend most of their time waiting for HTTP responses.

I have attempted to make it work using the async gem, but it sometimes seems to get into a deadlock (my guess) and stops processing jobs.

# config/initializers/solid_queue.rb

require "async"

module SolidQueue
  module AsyncableWorker
    extend ActiveSupport::Concern

    def initialize(**options)
      super
      @async = options.fetch(:async, false)
      @pool = Pool.new(options[:threads], on_idle: -> { wake_up }, async: @async)
    end

    def do_start_loop
      if @async
        Async do
          super
        end
      else
        super
      end
    end
  end

  module AsyncablePool
    extend ActiveSupport::Concern

    def initialize(size, on_idle: nil, async: false)
      super(size, on_idle: on_idle)
      @async = async
    end

    def post(execution)
      if @async
        available_threads.decrement
        Async do
          wrap_in_app_executor do
            execution.perform
          rescue => error
            handle_thread_error(error)
          ensure
            available_threads.increment
            mutex.synchronize { on_idle.try(:call) if idle? }
          end
        end
      else
        super
      end
    end
  end
end

SolidQueue::Pool.class_eval do
  prepend SolidQueue::AsyncablePool
end

SolidQueue::Worker.class_eval do
  prepend SolidQueue::AsyncableWorker
end
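
For readers who do not know the pool internals, here is a minimal sketch of the slot accounting that the async branch of post relies on: claim a slot before the fiber starts, release it in ensure, and fire the idle callback once every slot is free. SlotPool, errors and idle_calls are hypothetical names made up for this illustration. It uses plain Ruby fibers rather than the async gem, and it is not SolidQueue's actual implementation.

```ruby
# Hypothetical stand-in for the pool's slot accounting; not SolidQueue code.
class SlotPool
  attr_reader :available, :idle_calls, :errors

  def initialize(size)
    @size = size
    @available = size
    @idle_calls = 0
    @errors = []
  end

  def idle?
    @available == @size
  end

  # Claims a slot, runs the job in a fiber, and releases the slot
  # when the job finishes or raises.
  def post(&job)
    @available -= 1
    fiber = Fiber.new do
      begin
        job.call
      rescue => error
        @errors << error.message
      ensure
        @available += 1
        @idle_calls += 1 if idle? # stands in for the on_idle callback
      end
    end
    fiber.resume
    fiber
  end
end

pool = SlotPool.new(2)
waiting = pool.post { Fiber.yield } # simulates a job parked on I/O
pool.post { raise "boom" }          # a failing job still releases its slot
after_failure = pool.available      # the parked job still holds one slot
waiting.resume                      # the parked job finishes
after_resume = pool.available       # both slots are free again
```

The point of the sketch is that a parked fiber keeps its slot until it is resumed. If nothing ever resumes it, the pool never reports idle again, which is one way a worker could appear to stop processing jobs. Whether that is what happens in the code above is a guess, not a confirmed diagnosis.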

NOTE: The initializer above reads a newly introduced async option on workers, which switches job execution from threads to fibers.

# config/solid_queue.yml

# The supervisor forks workers and dispatchers according to the configuration, controls their heartbeats, and sends them signals to stop and start them when needed.
default: &default
  # Dispatchers are in charge of selecting jobs scheduled to run in the future that are due and dispatching them,
  # which is simply moving them from the solid_queue_scheduled_executions table over to the solid_queue_ready_executions table so that workers can pick them up.
  # They also do some maintenance work related to concurrency controls.
  dispatchers:
    - polling_interval: 1 # seconds
      batch_size: 500
      concurrency_maintenance_interval: 600 # seconds before checking if blocked jobs can be unblocked
  # Workers are in charge of picking jobs ready to run from queues and processing them.
  # They work off the solid_queue_ready_executions table.
  workers:
    - queues: [critical, default, lowpriority, oneoff]
      async: <%= ENV['SOLID_QUEUE_DISABLE_ASYNC_WORKERS'] != 'true' %> # whether to use async workers (fibers) or not (threads)
      threads: 5 # thread/fiber pool max size. Also used as batch size when fetching jobs.
      processes: <%= (ENV['SOLID_QUEUE_WORKER_PROCESS_COUNT'] || Concurrent.processor_count).to_i %>
      polling_interval: 0.1 # seconds
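
As a side check on the async flag, the following sketch shows how the ERB expression on the async line resolves for a few environment values. It assumes the file is rendered with ERB and then parsed as YAML, which the ERB tags in the config suggest. async_enabled? and TEMPLATE are hypothetical helpers for this illustration only.

```ruby
require "erb"
require "yaml"

# Same expression as the async line of the config, reduced to a single key.
TEMPLATE = "async: <%= ENV['SOLID_QUEUE_DISABLE_ASYNC_WORKERS'] != 'true' %>\n"

# Renders the template with the variable set to env_value (nil means unset)
# and returns the parsed boolean. The previous value is restored afterwards.
def async_enabled?(env_value)
  previous = ENV["SOLID_QUEUE_DISABLE_ASYNC_WORKERS"]
  ENV["SOLID_QUEUE_DISABLE_ASYNC_WORKERS"] = env_value
  YAML.safe_load(ERB.new(TEMPLATE).result)["async"]
ensure
  ENV["SOLID_QUEUE_DISABLE_ASYNC_WORKERS"] = previous
end

unset_result = async_enabled?(nil)    # variable not set
exact_result = async_enabled?("true") # exact lowercase match
upper_result = async_enabled?("TRUE") # comparison is case-sensitive
```

Under this reading, fibers are on by default, and only the exact string true turns them off. Values such as TRUE or 1 leave the async path enabled.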

Then start the process as usual with bundle exec rake solid_queue:start


Has anyone else attempted something similar, or does anyone know what the issue could be?

mintuhouse, Feb 09 '24 17:02

Hey @mintuhouse, thanks for writing this up. I'm not familiar with async, so after a first look, I'm not sure where the problem could be 😕

However, I have a question: what's the advantage of wrapping the worker's loop in Async if each job already runs in its own fiber when you configure Solid Queue workers to use multiple threads/fibers? That is, you'd be running the whole pool in a fiber, no? And each fiber would be unique per worker process 🤔

rosa, Feb 13 '24 09:02