How to make solid_queue work well with fibers?
We make quite a few LLM calls in our background jobs and want to leverage fibers to use resources more efficiently, since these jobs spend most of their time waiting for HTTP responses.
I have attempted to make it work using the async gem, but it sometimes gets into a deadlock (my guess) and stops processing jobs.
# config/initializers/solid_queue.rb
require "async"

module SolidQueue
  module AsyncableWorker
    extend ActiveSupport::Concern

    def initialize(**options)
      super
      @async = options.fetch(:async, false)
      @pool = Pool.new(options[:threads], on_idle: -> { wake_up }, async: @async)
    end

    def do_start_loop
      if @async
        Async do
          super
        end
      else
        super
      end
    end
  end

  module AsyncablePool
    extend ActiveSupport::Concern

    def initialize(size, on_idle: nil, async: false)
      super(size, on_idle: on_idle)
      @async = async
    end

    def post(execution)
      if @async
        available_threads.decrement
        Async do
          wrap_in_app_executor do
            execution.perform
          rescue => error
            handle_thread_error(error)
          ensure
            available_threads.increment
            mutex.synchronize { on_idle.try(:call) if idle? }
          end
        end
      else
        super
      end
    end
  end
end

SolidQueue::Pool.class_eval do
  prepend SolidQueue::AsyncablePool
end

SolidQueue::Worker.class_eval do
  prepend SolidQueue::AsyncableWorker
end
NOTE: The workers config below has a newly introduced async option, which is what enables fiber usage in the initializer code above.
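The slot bookkeeping in the `post` override above can be looked at in isolation. What follows is only a rough sketch under loud assumptions: `SlotPool` and `resume_all` are hypothetical names, an Integer counter stands in for `available_threads`, and plain Ruby fibers resumed by hand stand in for the async gem's reactor. It shows that a slot stays taken for as long as a job's fiber is suspended, and that the `ensure` clause hands it back whether the job finishes or raises.

```ruby
# Hypothetical stand-in for the patched pool: plain Ruby fibers, no async gem,
# and an Integer counter instead of SolidQueue's available_threads.
class SlotPool
  attr_reader :available, :errors

  def initialize(size)
    @available = size
    @errors = []
    @fibers = []
  end

  # Same shape as `post` above: take a slot, run the job in a fiber, and give
  # the slot back in `ensure` whether the job succeeds or raises.
  def post(&job)
    @available -= 1
    fiber = Fiber.new do
      job.call
    rescue => error
      @errors << error
    ensure
      @available += 1
    end
    @fibers << fiber
    fiber.resume # runs until the job finishes or calls Fiber.yield
  end

  # Resume every suspended fiber once, standing in for "the response arrived".
  def resume_all
    @fibers.select(&:alive?).each(&:resume)
  end
end

pool = SlotPool.new(2)
pool.post { Fiber.yield }   # job suspends while "waiting on HTTP"
pool.post { raise "boom" }  # job fails immediately and frees its slot
available_while_waiting = pool.available
pool.resume_all             # the suspended job finishes and frees its slot
```

With a pool size of 2, one slot is still held while the first job is suspended, so the configured pool size is also the ceiling on how many jobs can be waiting on I/O at once.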
# config/solid_queue.yml
# The supervisor forks workers and dispatchers according to the configuration, controls their heartbeats, and sends them signals to stop and start them when needed.
default: &default
  # Dispatchers are in charge of selecting jobs scheduled to run in the future that are due and dispatching them,
  # which is simply moving them from the solid_queue_scheduled_executions table over to the solid_queue_ready_executions table so that workers can pick them up.
  # They also do some maintenance work related to concurrency controls.
  dispatchers:
    - polling_interval: 1 # seconds
      batch_size: 500
      concurrency_maintenance_interval: 600 # seconds before checking if blocked jobs can be unblocked
  # Workers are in charge of picking jobs ready to run from queues and processing them.
  # They work off the solid_queue_ready_executions table.
  workers:
    - queues: [critical, default, lowpriority, oneoff]
      async: <%= ENV['SOLID_QUEUE_DISABLE_ASYNC_WORKERS'] != 'true' %> # whether to use async workers (fibers) or not (threads)
      threads: 5 # thread/fiber pool max size. Also used as batch size when fetching jobs.
      processes: <%= (ENV['SOLID_QUEUE_WORKER_PROCESS_COUNT'] || Concurrent.processor_count).to_i %>
      polling_interval: 0.1 # seconds
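To double-check how the `async` line in the config above evaluates, here is a small sketch that renders a trimmed copy of the worker entry with the standard library's ERB and YAML. `TEMPLATE` and `async_flag_for` are hypothetical names used only for illustration, and the sketch assumes the file is rendered with ERB before being parsed as YAML, as the `<%= %>` tags suggest.

```ruby
require "erb"
require "yaml"

# Trimmed-down copy of the worker entry above; only the async expression
# is kept as ERB, everything else is static.
TEMPLATE = <<~YAML
  workers:
    - queues: [critical, default]
      async: <%= ENV['SOLID_QUEUE_DISABLE_ASYNC_WORKERS'] != 'true' %>
      threads: 5
YAML

# Hypothetical helper: render the template with the env var set to `value`
# (nil means unset) and return the parsed async flag of the first worker.
def async_flag_for(value)
  previous = ENV["SOLID_QUEUE_DISABLE_ASYNC_WORKERS"]
  ENV["SOLID_QUEUE_DISABLE_ASYNC_WORKERS"] = value # assigning nil deletes the variable
  YAML.safe_load(ERB.new(TEMPLATE).result).dig("workers", 0, "async")
ensure
  ENV["SOLID_QUEUE_DISABLE_ASYNC_WORKERS"] = previous
end

unset_flag    = async_flag_for(nil)
disabled_flag = async_flag_for("true")
upcase_flag   = async_flag_for("TRUE")
```

Because the comparison is against the exact string 'true', fibers are the default: the flag only turns off when the variable is set to lowercase true, and a value such as TRUE or 1 leaves async workers enabled.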
Then I start the process as usual: bundle exec rake solid_queue:start
Has anyone else attempted something similar, or does anyone know what the issue could be?
Hey @mintuhouse, thanks for writing this up. I'm not familiar with async, so after a first look, I'm not sure where the problem could be 😕
However, I have a question: what's the advantage of wrapping the worker's loop in Async if each job already runs in its own fiber when you configure solid queue workers to use multiple threads/fibers? That is, you'd be running the whole pool in a fiber, no? And that outer fiber would be unique per worker process 🤔