rq
Worker concurrency?
It'd be nice if there were a built-in mechanism to use multiprocessing, eventlet, or gevent to have rqworker handle n tasks concurrently.
I don't think it would complicate things.
Yes, it would be very nice feature.
@kennethreitz That's definitely a plan. Are you suggesting to replace the whole pop-then-fork model inside the worker? gevent definitely is promising. Any suggestions on supporting either multiprocessing or gevent?
Since multiprocessing is included in python maybe it should be preferred.
Both have their areas of usefulness. Roughly, multiprocessing is better at CPU-bound jobs, while gevent will profit from I/O heavy jobs. Building a clean API (so only little documentation is required) is the hardest part.
Multiprocessing is a good default.
It should be pretty straightforward, I think. Celery and gunicorn both abstract it to simple configuration strings: 'gevent' and 'eventlet'.
Please consider that multiprocessing is not functional on BSD platforms. If you go down that route, you can fall back to threading (and be stuck with the GIL) like this:
https://github.com/hcarvalhoalves/duplicity/commit/cdc35e5169db4e773af3fc7e8d8de5e2407f26b5
Multiprocessing works fine on Windows.
Currently working on a gevent worker. Other async libs shouldn't be too difficult either once this is done.
When will this be available? Is there a beta version that can be tried?
It's in my fork for now https://github.com/jgelens/rq/ Please check the examples dir.
Thanks. I went through the code and examples. I have a few questions.
- How do I select gevent-type workers when starting rqworker? I didn't see any command-line options to switch to gevent.
- From worker.py it seems the slaves are using a semaphore (_slave_semaphore). Why is this required? Is it only to get the running state of greenlets?
- Is there a plan to support eventlet as well?
- Will this be merged into the official repository?
- bin/rqworker doesn't have an option for this (yet). For now import the GeventWorker instead of Worker to use Gevent.
- The semaphore is indeed only there to get the running state. Do you know an easier or better way?
- No plan currently, but it's easy to implement once the GeventWorker is done.
- I think so, @nvie will decide once the gevent worker is fully implemented.
As greenlets use cooperative (non-preemptive) scheduling, I think a simple integer increment/decrement should do for the state use case.
Oh yes, this will be pulled into the main repo—definitely! I still need to take a look at it, though, but I've been too busy lately.
Thanks, any time lines planned for this feature ?
Nothing planned. If I don't see any problems with it for future development and it works reliably, I pull. As said, I simply haven't had the time to play with it, yet.
I ran some tests using the existing Worker and GeventWorker, and found some impressive results. My workers are running a simple database update to a MongoDB replica set, and for 1000 iterations the completion time went from ~3 minutes to ~8 seconds.
Any plans to pull this in sometime in the near future?
I acknowledge that @jgelens' work on the gevent workers is really interesting, and I would love to have it available in RQ. However, I'm still a bit hesitant to pull, since the implementation currently isn't really DRY and if changes are made to the core worker logic, the changes have to be made to both the naïve and the gevent worker implementations. The end result is increased complexity and I wouldn't be confident anymore to make changes to the worker core.
I still would love to pull the concurrent worker stuff in, but I think the core of the worker implementation needs some layer of abstraction (think BaseWorker), which both the Worker and the GeventWorker could subclass. My gut says that if we come up with the right abstraction, both subclasses could have a really simple implementation and big changes would only have to happen to BaseWorker.
To summarise, I think Jeffrey's work is excellent, and it's definitely not his fault that I'm not pulling yet. It's just that the current RQ implementation needs some refactoring in order to support gevent elegantly.
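To make the BaseWorker idea concrete, here is a minimal sketch and not RQ's actual code. The dequeue loop lives once in the base class and subclasses only decide how a single job is executed. BaseWorker is the name floated above; InlineWorker, ThreadWorker, the in-memory deque standing in for Redis, and the (func, args) job shape are all assumptions made for illustration.

```python
import threading
from abc import ABC, abstractmethod
from collections import deque


class BaseWorker(ABC):
    """Owns the dequeue loop; subclasses only choose how a job is run."""

    def __init__(self, jobs):
        # Assumption: an in-memory deque stands in for the Redis queues.
        self.jobs = deque(jobs)

    def dequeue(self):
        return self.jobs.popleft() if self.jobs else None

    def work(self):
        """Burst-style loop: stop as soon as the queue is empty."""
        results = []
        while True:
            job = self.dequeue()
            if job is None:
                break
            results.append(self.execute(job))
        return results

    @abstractmethod
    def execute(self, job):
        """Run one (func, args) job and return its result."""


class InlineWorker(BaseWorker):
    # Hypothetical simplest subclass: run the job in the current thread.
    def execute(self, job):
        func, args = job
        return func(*args)


class ThreadWorker(BaseWorker):
    # Hypothetical stand-in for a gevent-backed subclass: one thread per job.
    def execute(self, job):
        func, args = job
        box = []
        thread = threading.Thread(target=lambda: box.append(func(*args)))
        thread.start()
        thread.join()
        return box[0]
```

With this shape, a change to the loop (for example, how jobs are fetched) touches only BaseWorker, which is the property asked for above.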
We currently already have regular and horse type workers. Instead of making a BaseWorker, I think we should make a new Horse class which handles job dequeues and executions; this can then be subclassed to make other types of horses.
Very rough pseudo code to demonstrate what I have in mind (don't even know if this makes sense):
class Horse(object):
    def fetch_job(self):
        return Queue.dequeue_any(self.queues, blocking=True,
                                 connection=self.connection)

    def perform(self, job):
        result = job.perform()
        return result

    def work(self):
        while True:  # This loop is for regular worker, a Gevent one would obviously be different
            job = self.fetch_job()
            result = self.perform(job)


class Worker(object):
    def dispatch_horses(self, type, *args, **kwargs):
        # start horses here
        Horse(*args, **kwargs).work()

    def work(self, burst=False, type=None, *args, **kwargs):
        # ...
        self.dispatch_horses(type, *args, **kwargs)
        # ...
So essentially Worker would just be in charge of starting, monitoring and stopping the Horses (so perhaps the terminology could be changed here, since a worker is typically in charge of executing the jobs).
Thoughts?
Thanks for your thoughts, @selwin!
Currently, the worker is responsible for dequeueing and unpickling the job function and the horse is responsible for execution (in an isolated context). The GeventWorker uses a different approach: the worker only spawns slaves and just waits until they're all finished. The slaves themselves are responsible for both dequeueing/unpickling and execution. The latter is not due to limitations of gevent, however. It's just implemented that way.
I've stated this before: The main reason for the current implementation has to do with stability. When you spawn a child process (with fork(), or multiprocessing, or whatever) you get an isolated execution context, which has a few nice benefits. Some of which are:
- If a process crashes (by a segfault in a C module for example), only the child crashes;
- Additionally, the worker will always be responsive and can easily kill the child after a time out;
- Also, memory leaks caused in the child can never affect the main worker. The child is killed after every job, so memory should never grow, even when running rqworker for long periods of time.
These characteristics I'm not willing to let go.
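The three benefits listed above can be illustrated with a small fork-per-job helper. This is a sketch under assumptions, not RQ's worker code: run_isolated and its status strings are hypothetical, and it relies on os.fork(), so it is POSIX only.

```python
import os
import signal
import time


def run_isolated(func, timeout=5.0, poll=0.01):
    """Run func() in a forked child and report how the child ended.

    Hypothetical helper; returns "ok", "failed", "crashed" or "timeout".
    """
    pid = os.fork()
    if pid == 0:
        # Child: execute the job, then exit without returning to the caller.
        code = 1
        try:
            func()
            code = 0
        finally:
            os._exit(code)

    # Parent: stays responsive by polling instead of blocking on the child.
    deadline = time.monotonic() + timeout
    while True:
        done, status = os.waitpid(pid, os.WNOHANG)
        if done:
            break
        if time.monotonic() > deadline:
            os.kill(pid, signal.SIGKILL)  # enforce the job timeout
            os.waitpid(pid, 0)            # reap the killed child
            return "timeout"
        time.sleep(poll)

    if os.WIFSIGNALED(status):
        return "crashed"  # e.g. a segfault in a C extension
    return "ok" if os.WEXITSTATUS(status) == 0 else "failed"
```

Because every job runs in a fresh child that exits afterwards, memory leaked by a job is released with that child and does not accumulate in the parent.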
Therefore, I like it that Worker is responsible for the dequeueing loop. I could maybe live with a Horse that's responsible for that, but the main dequeue/unpickle/execute loop should not be fully inside the Horse, as it loses execution isolation (see the third benefit above). Two possible implementation scenarios as I see it:
- Add another level of "spawning" under the Horse, to provide execution isolation;
- Keep the Horse responsible for what it's doing right now: merely (isolated) execution of jobs. The Worker then stays responsible for collecting work and managing its horses, only now it manages potentially many of them.
My gut feeling says the latter approach is simpler and feels more natural to me. What we need to change, however, is that it should not make the assumption of a single horse and instead support spawning/controlling multiple horses. Besides the aspect of supporting multiple horses, we should also delegate the actual creation and managing of them to a subclass, which knows about the implementation details for doing so (e.g. fork(), multiprocessing, gevent).
What we might need is a pool of horses (?), which can be size=1 for the default (forking) worker. The Worker loop could then be:
while True:
    result = Queue.dequeue_any(self.queues, wait_for_job=True, ...)
    ...
    job, queue = result
    self.spawn_horse(job)
The spawn_horse() method would then spawn a horse to execute the given job, using a free pool slot. The nice thing about pools is that they are auto-managed (idle pool members are reused, and dying pool members are replaced).
Provided we use pool semantics to dispatch jobs to horses, and even use that for the simplest case (our current single-horse-by-forking), we have an abstraction that could work for every type of concurrency implementation. Threading, multiprocessing and gevent all support the pool model in some way or the other.
The thing that worries me most is how we can keep the signal handling / abortion behaviour the same. My experiences with multiprocessing pools are that there's little control over accessing/killing members explicitly from the managing process. I might need to familiarise myself a bit more with this and see how we would keep bullet 2 in practice. I'm not sure how compatible gevent is with respect to this, too.
More thoughts?
Oh, wait a minute. I suddenly remember something.
When I first wrote the worker implementation, I did consider supporting a pool for multiple members. This is what was so nasty about it:
- gevent.pool.spawn() typically blocks when there is no free slot. By that time, work has been read from Redis already, so if you stop the worker while spawn() is blocking, you lose a job;
- multiprocessing.apply_async() never blocks, and instead buffers work in its internal queue. In effect, there's a chance that all work is read from Redis and gets queued up in the worker process instead. Again, you lose work if the worker is then killed (before the work is carried out).
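The buffering behaviour of apply_async() can be observed with a few lines of standard-library code. This sketch uses multiprocessing.pool.ThreadPool, which inherits apply_async() from multiprocessing.pool.Pool, purely to keep the demonstration self-contained. The helper name submit_all and the Event used to hold jobs back are assumptions for illustration.

```python
import threading
from multiprocessing.pool import ThreadPool


def submit_all(n_tasks=5, pool_size=1):
    """Submit more jobs than there are pool members and see what happens."""
    release = threading.Event()

    def job(i):
        release.wait(timeout=5)  # hold every job until the demo releases it
        return i

    pool = ThreadPool(pool_size)
    try:
        # Every call returns immediately, even though the single pool
        # member is busy: the surplus jobs sit in the pool's internal queue.
        handles = [pool.apply_async(job, (i,)) for i in range(n_tasks)]
        finished_before_release = sum(h.ready() for h in handles)
        release.set()
        results = [h.get(timeout=5) for h in handles]
    finally:
        pool.close()
        pool.join()
    return finished_before_release, results
```

If the jobs had been popped from Redis one by one before each apply_async() call, all of them would now live only in the worker's memory, which is the job-loss risk described in the second bullet.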
Thinking about this again, we might need another pool primitive that works with both fork(), multiprocessing, threading, and gevent.
What we might need is something like this:
- We should be able to "claim" pool members before reading any work from Redis. Those members are spawned (started or reused), initialised and ready to perform work immediately when handed. When no free slots are available, claim() should be blocking;
- Pool members can be asked to die gracefully after finishing their current job.
- There's direct access to the pool members' PIDs, in order to forcefully stop their execution (by killing their processes). I'm not sure how this even works with gevent, though. @jgelens, can you shed some light on this? Is force-killing even possible for gevent "children"?
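A rough sketch of the first two requirements, assuming threads as the pool members so the example stays within the standard library. ClaimingPool, its methods, the work() driver and the None sentinel are hypothetical names, and queue.Queue stands in for Redis. The third requirement (force-killing by PID) has no equivalent for threads and is not covered.

```python
import queue
import threading


class ClaimingPool:
    """Hypothetical pool: a slot must be claimed before work is fetched."""

    def __init__(self, size):
        self._slots = threading.BoundedSemaphore(size)
        self._threads = []

    def claim(self):
        self._slots.acquire()  # blocks while every slot is busy

    def release(self):
        self._slots.release()

    def run(self, func, *args):
        """Run func in a claimed slot; the slot is freed when func ends."""
        def target():
            try:
                func(*args)
            finally:
                self.release()

        thread = threading.Thread(target=target)
        self._threads.append(thread)
        thread.start()

    def join(self):
        """Graceful stop: let members finish their current job."""
        for thread in self._threads:
            thread.join()


def work(pool, source, handler):
    while True:
        pool.claim()        # slot first ...
        job = source.get()  # ... only then take work off the queue
        if job is None:     # sentinel (assumption): stop the loop
            pool.release()
            break
        pool.run(handler, job)
    pool.join()
```

Since a job is only removed from the queue once a slot is guaranteed, stopping the worker while claim() blocks cannot lose work.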
@nvie Killing greenlets is certainly possible. I'll look into the discussion above later this week and post my thoughts.
I think making Worker responsible for dequeueing jobs and passing them on to horses is complicated for the reasons @nvie stated above: it needs to know whether there are any horses available for work before running dequeue, or you'll risk losing jobs.
With regards to execution isolation, I think it just needs to be implemented within each horse's work method. So for example (again, only for rough illustration purposes, please be gentle ;)
class Horse(object):
    def warm_shutdown(self):
        # Warm shutdown logic here
        pass

    def cold_shutdown(self):
        # Cold shutdown logic
        pass

    def fetch_job(self):
        return Queue.dequeue_any(self.queues, blocking=True, connection=self.connection)

    def perform(self, job):
        result = None
        try:
            result = job.perform()
            job.status = 'finished'
        except Exception:
            job.status = 'failed'
            # handle error
        return result

    def work(self):
        while True:  # This loop is for regular worker, a Gevent one would obviously be different
            job = self.fetch_job()
            result = self.perform(job)


class ForkingHorse(Horse):
    def fork_and_perform_job(self, job):
        # some forking action here, calling self.perform(job)
        pass

    def work(self):
        while True:  # This loop is for regular worker, a Gevent one would obviously be different
            job = self.fetch_job()
            result = self.fork_and_perform_job(job)


class GeventHorse(Horse):
    def __init__(self, *args, **kwargs):
        self.slaves = kwargs.pop('slaves', 1)
        self.slave_workers = []
        self.slave_counter = self.slaves
        super(GeventHorse, self).__init__(*args, **kwargs)

    def work(self, burst=False):
        # Gevent work logic here
        pass


class Worker(object):
    def warm_shutdown(self):
        self.horse.warm_shutdown()

    def cold_shutdown(self):
        self.horse.cold_shutdown()

    def dispatch_horses(self, type, *args, **kwargs):
        # start horses here
        Horse(*args, **kwargs).work()

    def work(self, burst=False, type=None, *args, **kwargs):
        # ...
        self.dispatch_horses(type, *args, **kwargs)
        # ...
Thoughts?
Why not have the GeventHorse extend Greenlet, so that you can closely manage each process?
class GeventHorse(Horse, Greenlet):
    def __init__(self, *args, **kwargs):
        super(GeventHorse, self).__init__(*args, **kwargs)

    def warm_shutdown(self):
        # wait for this greenlet process to complete
        self.join()

    def cold_shutdown(self):
        # Kill this greenlet process
        self.kill()

    # Horse execute
    def execute(self, burst=False):
        self.start()

    # Greenlet _run
    def _run(self):
        # Gevent work logic here
        pass
@selwin, I agree that dequeueing jobs from within the horse makes the problem much simpler. So it might not be as bad an idea as I originally thought.
gevent provides great primitives for implementing pools with blocking spawn() calls, which is exactly what we need.
However, after playing a bit with multiprocessing.Pool, I found my gut feeling to be correct: multiprocessing.Pool does not block on apply_async when all children are already busy. Instead, it queues up work internally. See this Gist for a demonstration.
We need to either monkey patch multiprocessing.Pool, subclass it to fix this behaviour, or manually come up with something similar to the way gevent works on this. FYI: I came across this SO thread that discusses something similar. The solution would be so simple, if only multiprocessing would support passing through the maxsize of its internal queue in the Pool() constructor...
@nicholasaiello I don't have much experience using gevent so yes, it could probably be much improved when we get around to implementing it. Your input will be much appreciated :)
@nvie I think as long as we keep the dequeue and perform within the same process (or child processes), everything should fall into place quite elegantly. I haven't had time to play with real code but I think something like this would work for horses with multiple slaves:
from multiprocessing import Pool


def dequeue_and_perform_loop(horse, slave):
    while True:
        job = Queue.dequeue_any(horse.queues, blocking=True, connection=horse.connection)
        horse.perform_job(job)


class MultiProcessingHorse(Horse):
    def __init__(self, num_processes=1, *args, **kwargs):
        self.slaves = []  # init slaves here
        self.pool = Pool(processes=num_processes)

    def work(self, *args, **kwargs):
        for slave in self.slaves:
            self.pool.apply_async(dequeue_and_perform_loop, [self, slave])
I'll try to play around with this over the weekend.
OK. This is what I'm thinking about currently:
The main worker loop could be reduced to this:
def work(self):
    """Forever, keep spawning children (this is like how a "pre-fork" server works)."""
    while True:
        self.spawn_child()
The function spawn_child() should be implemented in concrete subclasses, one for each supported concurrency technique. It must:
1. Claim an execution slot, or block until such a free slot is available;
2. Create a second unit of execution (process/greenlet/thread) and do all the fetch+execute work in there. When done, release the slot from that thread/process/greenlet.
Point 2 is simple. Thought should be given to how we can ensure point 1. The blocking behaviour is absolutely crucial since work() will be stuck in an endless loop otherwise.
The gevent implementation is fairly simple: using gevent.pool and pool.spawn, it natively has the desired blocking spawn behaviour already.
The forking worker implementation could use a semaphore to guard the forking. What's extra nice about this is that we gain the ability to fork multiple times, instead of the current one horse limit. The current RQ (< 0.4) implementation is comparable to a forking worker with a semaphore value of 1 (a simple lock).
For a sample implementation, see: https://gist.github.com/3541415
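Independently of the linked gist (whose contents are not reproduced here), the semaphore-guarded forking idea might look roughly like the sketch below. Everything in it is an assumption made for illustration: the class name ForkingWorkerSketch, the SimpleQueue objects standing in for Redis, jobs being plain integers that perform() squares, and a work() method that spawns one child per queued job so the demonstration terminates instead of looping forever. It needs a POSIX system with the "fork" start method.

```python
import multiprocessing
import os

# Assumption: POSIX system where the "fork" start method is available.
ctx = multiprocessing.get_context("fork")


class ForkingWorkerSketch:
    def __init__(self, jobs, num_processes=2):
        self.slots = ctx.BoundedSemaphore(num_processes)
        self.queue = ctx.SimpleQueue()    # stand-in for the Redis queue
        self.results = ctx.SimpleQueue()  # lets the demo observe outcomes
        self.pending = len(jobs)
        for job in jobs:
            self.queue.put(job)

    def perform(self, job):
        return job * job  # placeholder for real job execution

    def spawn_child(self):
        self.slots.acquire()  # point 1: claim a slot, blocking when full
        pid = os.fork()
        if pid == 0:
            # Point 2: the child does all the fetch+execute work itself.
            status = 1
            try:
                job = self.queue.get()
                self.results.put(self.perform(job))
                status = 0
            finally:
                self.slots.release()  # free the slot from inside the child
                os._exit(status)
        return pid

    def work(self):
        # Burst-style variant of the endless loop: one child per queued job.
        pids = [self.spawn_child() for _ in range(self.pending)]
        for pid in pids:
            os.waitpid(pid, 0)
        return sorted(self.results.get() for _ in pids)
```

With num_processes=1 the semaphore degenerates into a simple lock, which corresponds to the single-horse behaviour described above.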
I'm fairly certain that a ThreadingWorker or MultiprocessingWorker would be easy to do, too.
Now, all that remains is how to fit in the signal handling stuff in there, so we keep the current behaviour no matter what concurrency mechanism is used.
But... I'd say: :sparkles: progress :sparkles:!
@nvie I took a stab at porting ForkingWorker from your gist to rq. It's still in a pretty rough state and doesn't actually perform jobs yet, but here's what I found from this quick experiment (feel free to correct me if I'm wrong):
- Worker subclasses need to implement a few methods like handle_cold_shutdown and is_busy to handle SIGINT
- Graceful termination of a worker instance when a job is in progress is an issue. Unlike the original Worker implementation, the parent process doesn't wait for the child process to finish before doing another BLPOP. Perhaps we can tweak the request_stop method to loop indefinitely and check whether all children are finished before terminating.
Here's the source code https://github.com/selwin/rq/blob/new-worker/rq/new_worker.py and here's how to run it:
from rq.queue import Queue
from rq.new_worker import ForkingWorker
from redis import Redis
redis = Redis()
queue = Queue('default', connection=redis)
worker = ForkingWorker(num_processes=4, queues=[queue], connection=redis)
worker.work()
I'd appreciate any feedback.
@nvie I just read your comment again for the second time and it seems like I misunderstood how you want it to be implemented. Now that I finally understand, I think the way you suggested would be simpler to implement.
I'm going to be looking at this really soon! (Fixing some production issues now, first.)