Worker concurrency?

Open kennethreitz opened this issue 12 years ago • 74 comments

It'd be nice if there were a built-in mechanism to use multiprocessing, eventlet, or gevent to have rqworker handle n tasks concurrently.

I don't think it would complicate things.

kennethreitz avatar Mar 28 '12 19:03 kennethreitz

Yes, it would be a very nice feature.

pcdinh avatar Mar 29 '12 09:03 pcdinh

@kennethreitz That's definitely a plan. Are you suggesting to replace the whole pop-then-fork model inside the worker? gevent definitely is promising. Any suggestions on supporting either multiprocessing or gevent?

nvie avatar Mar 29 '12 13:03 nvie

Since multiprocessing is included in Python, maybe it should be preferred.

ibeex avatar Mar 29 '12 14:03 ibeex

Both have their areas of usefulness. Roughly, multiprocessing is better at CPU-bound jobs, while gevent will profit from I/O heavy jobs. Building a clean API (so only little documentation is required) is the hardest part.

nvie avatar Mar 29 '12 14:03 nvie

Multiprocessing is a good default.

It should be pretty straightforward, I think. Celery and gunicorn both abstract it to simple configuration strings: 'gevent' and 'eventlet'.

kennethreitz avatar Mar 29 '12 15:03 kennethreitz

Please consider that multiprocessing is not functional on BSD platforms. If you go down that route, you can fall back to threading (and be stuck with the GIL) like this:

https://github.com/hcarvalhoalves/duplicity/commit/cdc35e5169db4e773af3fc7e8d8de5e2407f26b5
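
A minimal sketch of such a fallback, not taken from the linked commit. It relies on the documented behaviour that importing `multiprocessing.synchronize` raises `ImportError` on hosts without a working shared semaphore implementation. The names `HAVE_MULTIPROCESSING` and `make_unit` are hypothetical.

```python
import threading

try:
    import multiprocessing
    # Raises ImportError on platforms without functioning shared semaphores.
    import multiprocessing.synchronize
    HAVE_MULTIPROCESSING = True
except ImportError:
    HAVE_MULTIPROCESSING = False


def make_unit(target, args=()):
    """Return an unstarted Process when available, otherwise a Thread.

    Both expose start() and join(), so the caller does not need to care
    which one it got. With the Thread fallback, CPU-bound jobs stay
    serialised by the GIL.
    """
    if HAVE_MULTIPROCESSING:
        return multiprocessing.Process(target=target, args=args)
    return threading.Thread(target=target, args=args)
```

The caller would use it as `unit = make_unit(job_func, (arg,))`, followed by `unit.start()` and `unit.join()`.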

hcarvalhoalves avatar Mar 29 '12 16:03 hcarvalhoalves

Multiprocessing works fine on Windows.

kennethreitz avatar Mar 29 '12 16:03 kennethreitz

Currently working on a gevent worker. Other async libs shouldn't be too difficult either once this is done.

jgelens avatar May 20 '12 23:05 jgelens

When will this be available? Is there a beta version that can be tried?

anoopk6 avatar May 28 '12 07:05 anoopk6

It's in my fork for now https://github.com/jgelens/rq/ Please check the examples dir.

jgelens avatar May 28 '12 12:05 jgelens

Thanks. I went through the code and examples. I have a few questions.

  • How do I select gevent-type workers when starting rqworker? I didn't see any command-line options to switch to gevent.
  • From worker.py it seems the slaves are using a semaphore (_slave_semaphore). Why is this required? Is it only to get the running state of the greenlets?
  • Is there a plan to support eventlet as well?
  • Will this be merged into the official repo?

anoopk6 avatar May 28 '12 13:05 anoopk6

  1. bin/rqworker doesn't have an option for this (yet). For now import the GeventWorker instead of Worker to use Gevent.
  2. The semaphore is indeed only there to get the running state. Do you know an easier or better way?
  3. No plan currently, but it's easy to implement once the GeventWorker is done.
  4. I think so, @nvie will decide once the gevent worker is fully implemented.

jgelens avatar May 28 '12 13:05 jgelens

Since greenlets use cooperative (non-preemptive) scheduling, I think a simple integer increment/decrement should do for the state use case.
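
A minimal sketch of that counter idea, assuming jobs run as greenlets that only switch at explicit yield points, so the increment and decrement cannot be interrupted. `BusyCounter` and `run_job` are hypothetical names, not part of RQ or the fork; with preemptive threads a lock would still be needed.

```python
class BusyCounter:
    """Tracks how many jobs are currently running with a plain integer.

    Assumption: cooperative scheduling, so no task switch can happen in
    the middle of `+= 1` or `-= 1`.
    """

    def __init__(self):
        self.running = 0

    def __enter__(self):
        self.running += 1
        return self

    def __exit__(self, exc_type, exc, tb):
        self.running -= 1
        return False  # do not swallow exceptions raised by the job

    def is_busy(self):
        return self.running > 0


def run_job(counter, func, *args):
    # Hypothetical helper: wrapping each job keeps the counter accurate
    # even when the job raises.
    with counter:
        return func(*args)
```

The worker can then poll `counter.is_busy()` to decide whether any slave is still executing a job.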

anoopk6 avatar May 28 '12 14:05 anoopk6

Oh yes, this will be pulled into the main repo—definitely! I still need to take a look at it, though, but I've been too busy lately.

nvie avatar May 28 '12 14:05 nvie

Thanks. Is there a timeline planned for this feature?

anoopk6 avatar May 28 '12 14:05 anoopk6

Nothing planned. If I don't see any problems with it for future development and it works reliably, I'll pull it. As I said, I simply haven't had the time to play with it yet.

nvie avatar May 28 '12 14:05 nvie

I ran some tests using the existing Worker and GeventWorker, and found some impressive results. My workers are running a simple database update to a MongoDB replica set, and for 1000 iterations the completion time went from ~3 minutes to ~8 seconds.

Any plans to pull this in sometime in the near future?

nicholasaiello avatar Aug 23 '12 19:08 nicholasaiello

I acknowledge that @jgelens' work on the gevent workers is really interesting, and I would love to have it available in RQ. However, I'm still a bit hesitant to pull, since the implementation currently isn't really DRY and if changes are made to the core worker logic, the changes have to be made to both the naïve and the gevent worker implementations. The end result is increased complexity and I wouldn't be confident anymore to make changes to the worker core.

I would still love to pull the concurrent worker stuff in, but I think the core of the worker implementation needs some layer of abstraction (think BaseWorker), which both the Worker and the GeventWorker could subclass. My gut says that if we come up with the right abstraction, both subclasses could have a really simple implementation and big changes would only have to happen to BaseWorker.

To summarise, I think Jeffrey's work is excellent, and it's definitely not his fault that I'm not pulling yet. It's just that the current RQ implementation needs some refactoring in order to support gevent elegantly.

nvie avatar Aug 27 '12 13:08 nvie

We already have regular and horse type workers. Instead of making a BaseWorker, I think we should make a new Horse class which handles dequeueing and executing jobs. This can then be subclassed to make other types of horses.

Very rough pseudo code to demonstrate what I have in mind (don't even know if this makes sense):


class Horse(object):

    def fetch_job(self):
        return Queue.dequeue_any(self.queues, blocking=True,
                    connection=self.connection)

    def perform(self, job):
        result = job.perform()
        return result

    def work(self):
        while True: # This loop is for regular worker, a Gevent one would obviously be different
            job = self.fetch_job()
            result = self.perform(job)


class Worker(object):

    def dispatch_horses(self, type, *args, **kwargs):
        # start horses here
        Horse(*args, **kwargs).work()

    def work(self, burst=False, type=None, *args, **kwargs):
        # ...
        self.dispatch_horses(type=type, *args, **kwargs)
        # ...

So essentially Worker would just be in charge of starting, monitoring and stopping the Horses (so perhaps the terminology could be changed here, since a worker is typically in charge of executing the jobs).

Thoughts?

selwin avatar Aug 28 '12 01:08 selwin

Thanks for your thoughts, @selwin!

Currently, the worker is responsible for dequeueing and unpickling the job function and the horse is responsible for execution (in an isolated context). The GeventWorker uses a different approach: the worker only spawns slaves and just waits until they're all finished. The slaves themselves are responsible for both dequeueing/unpickling and execution. The latter is not due to limitations of gevent, however. It's just implemented that way.

I've stated this before: The main reason for the current implementation has to do with stability. When you spawn a child process (with fork(), or multiprocessing, or whatever) you get an isolated execution context, which has a few nice benefits. Some of which are:

  1. If a process crashes (by a segfault in a C module for example), only the child crashes;
  2. Additionally, the worker will always be responsive and can easily kill the child after a time out;
  3. Also, memory leaks caused in the child can never affect the main worker. The child is killed after every job, so memory should never grow, even when running rqworker for long periods of time.

These characteristics I'm not willing to let go.
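
The three properties above can be illustrated with a small POSIX-only sketch built on `os.fork()`. This is not RQ's actual worker code; `run_isolated` is a hypothetical helper, and the polling interval is an arbitrary choice.

```python
import os
import signal
import time


def run_isolated(func, timeout):
    """Run func() in a forked child and return 'ok', 'failed' or 'timeout'."""
    pid = os.fork()
    if pid == 0:
        # Child: run the job, then exit immediately without running the
        # parent's cleanup handlers. Any leak or crash dies with the child.
        try:
            func()
            os._exit(0)
        except BaseException:
            os._exit(1)

    # Parent: stays responsive by polling instead of blocking forever.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        done, status = os.waitpid(pid, os.WNOHANG)
        if done:
            ok = os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0
            return "ok" if ok else "failed"
        time.sleep(0.01)

    # The job overran its time limit: kill the child and reap it.
    os.kill(pid, signal.SIGKILL)
    os.waitpid(pid, 0)
    return "timeout"
```

A child killed by a signal (a segfault, for instance) is reported as `'failed'` because `os.WIFEXITED` is false for it, while the parent keeps running.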

Therefore, I like it that Worker is responsible for the dequeueing loop. I could maybe live with a Horse that's responsible for that, but the main dequeue/unpickle/execute loop should not be fully inside the Horse, as it loses execution isolation (see 3). Two possible implementation scenarios as I see it:

  • Add another level of "spawning" under the Horse, to provide execution isolation;
  • Keep the Horse responsible for what it's doing right now: merely (isolated) execution of jobs. The Worker then stays responsible for collecting work and managing its horses, only now it manages potentially many of them.

My gut feeling says the latter approach is simpler and feels more natural to me. What we need to change, however, is that it should not make the assumption of a single horse and instead support spawning/controlling multiple horses. Besides the aspect of supporting multiple horses, we should also delegate the actual creation and managing of them to a subclass, which knows about the implementation details for doing so (e.g. fork(), multiprocessing, gevent).

What we might need is a pool of horses (?), which can be size=1 for the default (forking) worker. The Worker loop could then be:

while True:
    result = Queue.dequeue_any(self.queues, wait_for_job=True, ...)
    ...
    job, queue = result
    self.spawn_horse(job)

The spawn_horse() method would then spawn a horse to execute the given job, using a free pool slot. The nice thing about pools is that they are auto-managed (idle pool members are reused, and dying pool members are replaced).

Provided we use pool semantics to dispatch jobs to horses, and even use that for the simplest case (our current single-horse-by-forking), we have an abstraction that could work for every type of concurrency implementation. Threading, multiprocessing and gevent all support the pool model in some way or the other.

The thing that worries me most is how we can keep the signal handling / abortion behaviour the same. My experiences with multiprocessing pools are that there's little control over accessing/killing members explicitly from the managing process. I might need to familiarise myself a bit more with this and see how we would keep bullet 2 in practice. I'm not sure how compatible gevent is with respect to this, too.

More thoughts?

nvie avatar Aug 28 '12 08:08 nvie

Oh, wait a minute. I suddenly remember something.

When I first wrote the worker implementation, I did consider supporting a pool for multiple members. This is what was so nasty about it:

  • gevent.pool.spawn() typically blocks when there is no free slot—by that time, work has been read from Redis already, so if you stop the worker while spawn() is blocking, you lose a job;
  • multiprocessing.apply_async() never blocks, and instead buffers work in its internal queue—in effect, there's a chance that all work is read from Redis and gets queued up in the worker process instead. Again, you lose work if the worker is then killed (before the work is carried out).
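
The buffering behaviour in the second bullet can be observed with a short sketch. To keep it self-contained it uses `multiprocessing.pool.ThreadPool`, which exposes the same `apply_async()` interface as `multiprocessing.Pool`; that substitution and the name `demo_buffering` are assumptions of this example.

```python
import threading
from multiprocessing.pool import ThreadPool


def demo_buffering(num_jobs=5):
    """Submit num_jobs tasks to a one-worker pool; return how many are pending."""
    release = threading.Event()
    pool = ThreadPool(processes=1)

    # Every call returns immediately, although the single worker is stuck
    # on the first task. The remaining tasks sit in the pool's own queue.
    results = [pool.apply_async(release.wait) for _ in range(num_jobs)]
    pending = sum(1 for r in results if not r.ready())

    # Let the tasks finish and shut the pool down cleanly.
    release.set()
    for r in results:
        r.get(timeout=5)
    pool.close()
    pool.join()
    return pending
```

If each submitted task had been popped from Redis first, all of them would now live only in the worker's memory, which is exactly the loss scenario described above.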

Thinking about this again, we might need another pool primitive that works with both fork(), multiprocessing, threading, and gevent.

What we might need is something like this:

  • We should be able to "claim" pool members before reading any work from Redis. Those members are spawned (started or reused), initialised and ready to perform work immediately when handed. When no free slots are available, claim() should be blocking;
  • Pool members can be asked to die gracefully after finishing their current job.
  • There's direct access to the pool member's PIDs, in order to forcefully stop their execution (by killing their processes). I'm not sure how this even works with gevent, though. @jgelens, can you shed some light on this? Is force-killing even possible for gevent "children"?
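
A rough sketch of the "claim before reading work" idea from the first bullet. Threads stand in for processes or greenlets, and a `queue.Queue` stands in for Redis, so PID access and force-killing are not covered here. `SlotPool` and `work` are hypothetical names.

```python
import queue
import threading


class SlotPool:
    """Hypothetical pool primitive: claim a slot first, hand over work later."""

    def __init__(self, size):
        self._slots = threading.BoundedSemaphore(size)
        self._threads = []

    def claim(self):
        # Blocks while every slot is busy, so no job is fetched too early.
        self._slots.acquire()

    def unclaim(self):
        # Give back a claimed slot that turned out not to be needed.
        self._slots.release()

    def run(self, func, *args):
        # Call only after claim(); the slot is released when func ends.
        def target():
            try:
                func(*args)
            finally:
                self._slots.release()

        thread = threading.Thread(target=target)
        self._threads.append(thread)
        thread.start()

    def join(self):
        for thread in self._threads:
            thread.join()


def work(pool, jobs, handle):
    """Drain `jobs` through the pool without ever holding an unassigned job."""
    while True:
        pool.claim()                 # 1. reserve capacity
        try:
            job = jobs.get_nowait()  # 2. only now take work off the queue
        except queue.Empty:
            pool.unclaim()
            break
        pool.run(handle, job)        # 3. hand it to the claimed slot
    pool.join()
```

Because the dequeue happens only after a slot is reserved, stopping the loop while `claim()` blocks cannot lose a job.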

nvie avatar Aug 28 '12 08:08 nvie

@nvie Killing greenlets is certainly possible. I'll look into the discussion above later this week and post my thoughts.

jgelens avatar Aug 28 '12 09:08 jgelens

I think making Worker responsible for dequeueing jobs and passing them on to horses is complicated for the reasons @nvie stated above: it needs to know whether any horses are available for work before running dequeue, or you risk losing jobs.

With regard to execution isolation, I think it just needs to be implemented within each horse's work method. So, for example (again, only for rough illustration purposes, please be gentle ;)

class Horse(object):

    def warm_shutdown(self):
        # Warm shutdown logic here
        pass

    def cold_shutdown(self):
        # Cold shutdown logic
        pass

    def fetch_job(self):
        return Queue.dequeue_any(self.queues, blocking=True, connection=self.connection)

    def perform(self, job):
        result = None  # stays None if the job fails
        try:
            result = job.perform()
            job.status = 'finished'
        except Exception:
            job.status = 'failed'
            # handle error
        return result

    def work(self):
        while True: # This loop is for regular worker, a Gevent one would obviously be different
            job = self.fetch_job()
            result = self.perform(job)

class ForkingHorse(Horse):

    def fork_and_perform_job(self, job):
        # some forking action here, calling self.perform(job)
        pass

    def work(self):
        while True: # Same loop as the regular horse, but each job runs in a forked child
            job = self.fetch_job()
            result = self.fork_and_perform_job(job)

class GeventHorse(Horse):

    def __init__(self, *args, **kwargs):
        self.slaves = kwargs.pop('slaves', 1)
        self.slave_workers = []
        self.slave_counter = self.slaves
        super(GeventHorse, self).__init__(*args, **kwargs)

    def work(self, burst=False):
        # Gevent work logic here
        pass

class Worker(object):

    def warm_shutdown(self):
        self.horse.warm_shutdown()

    def cold_shutdown(self):
        self.horse.cold_shutdown()

    def dispatch_horses(self, type, *args, **kwargs):
        # start horses here
        Horse(*args, **kwargs).work()

    def work(self, burst=False, type=None, *args, **kwargs):
        # ...
        self.dispatch_horses(type=type, *args, **kwargs)
        # ...

Thoughts?

selwin avatar Aug 28 '12 11:08 selwin

Why not have the GeventHorse extend Greenlet, so that you can closely manage each process?

class GeventHorse(Horse, Greenlet):

    def __init__(self, *args, **kwargs):
        super(GeventHorse, self).__init__(*args, **kwargs)

    def warm_shutdown(self):
        # wait for this greenlet process to complete
        self.join()

    def cold_shutdown(self):
        # Kill this greenlet process
        self.kill()

    # Horse execute
    def execute(self, burst=False):
        self.start()

    # Greenlet _run
    def _run(self):
        # Gevent work logic here
        pass

nicholasaiello avatar Aug 28 '12 13:08 nicholasaiello

@selwin, I agree that dequeueing jobs from within the horse makes the problem much simpler. So it might not be as bad an idea as I originally thought.

gevent provides great primitives for implementing pools with blocking spawn() calls, which is exactly what we need.

However, after playing a bit with multiprocessing.Pool, I found my gut feeling to be correct: multiprocessing.Pool does not block on apply_async when all children are already busy. Instead, it queues up work internally. See this Gist for a demonstration.

We need to either monkey patch multiprocessing.Pool, subclass it to fix this behaviour, or manually come up with something similar to the way gevent works on this. FYI: I came across this SO thread that discusses something similar. The solution would be so simple, if only multiprocessing would support passing through the maxsize of its internal queue in the Pool() constructor...
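
One way to get the blocking behaviour without monkey patching is a thin wrapper that guards `apply_async()` with a semaphore. This is only a sketch under assumptions: `BlockingPool` is a hypothetical name, and `multiprocessing.pool.ThreadPool` is used in place of `multiprocessing.Pool` (same `apply_async()` signature) so the example has no pickling requirements.

```python
import threading
from multiprocessing.pool import ThreadPool


class BlockingPool:
    """Wraps a pool so that apply_async() blocks while all workers are busy."""

    def __init__(self, size):
        self._pool = ThreadPool(processes=size)
        self._free = threading.BoundedSemaphore(size)

    def apply_async(self, func, args=()):
        # Wait for a free worker before handing over any work.
        self._free.acquire()

        def done(_result_or_error):
            # Runs when the task finishes, successfully or not.
            self._free.release()

        try:
            return self._pool.apply_async(
                func, args, callback=done, error_callback=done)
        except BaseException:
            # Submission itself failed, so give the slot back.
            self._free.release()
            raise

    def close(self):
        self._pool.close()
        self._pool.join()
```

With this wrapper the caller can pop a job from Redis only after `apply_async()` would no longer block, for example by submitting a function that does the dequeueing itself.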

nvie avatar Aug 28 '12 22:08 nvie

@nicholasaiello I don't have much experience using gevent so yes, it could probably be much improved when we get around to implementing it. Your input will be much appreciated :)

@nvie I think as long as we keep the dequeue and perform steps within the same process (or child processes), everything should fall into place quite elegantly. I haven't had time to play with real code, but I think something like this would work for horses with multiple slaves:

def dequeue_and_perform_loop(horse, slave):
    while True:
        job = Queue.dequeue_any(horse.queues, blocking=True, connection=horse.connection)
        horse.perform_job(job)


class MultiProcessingHorse(Horse):

    def __init__(self, num_processes=1, *args, **kwargs):
        self.slaves = ...  # init slaves
        self.pool = Pool(processes=num_processes)

    def work(self, *args, **kwargs):
        for slave in self.slaves:
            self.pool.apply_async(dequeue_and_perform_loop, [self, slave])

I'll try to play around with this over the weekend.

selwin avatar Aug 29 '12 01:08 selwin

OK. This is what I'm thinking about currently:

The main worker loop could be reduced to this:

def work(self):
    """Forever, keep spawning children (this is like how a "pre-fork" server works)."""
    while True:
        self.spawn_child()

The function spawn_child() should be implemented in concrete subclasses, one for each supported concurrency technique. It must:

  1. Claim an execution slot, or block until such a free slot is available;
  2. Create a second unit of execution (process/greenlet/thread) and do all the fetch+execute work in there. When done, release the slot from that thread/process/greenlet.

Point 2 is simple. Thought should be given to how we can assure point 1. The blocking behaviour is absolutely crucial since work() will be stuck in an endless loop otherwise.

The gevent implementation is fairly simple: using gevent.pool, and pool.spawn, it natively has the desired blocking spawn behaviour already.

The forking worker implementation could use a semaphore to guard the forking. What's extra nice about this is that we gain the ability to fork multiple times, instead of the current one horse limit. The current RQ (< 0.4) implementation is comparable to a forking worker with a semaphore value of 1 (a simple lock).

For a sample implementation, see: https://gist.github.com/3541415

I'm fairly certain that a ThreadingWorker or MultiprocessingWorker would be easy to do, too.

Now, all that remains is how to fit in the signal handling stuff in there, so we keep the current behaviour no matter what concurrency mechanism is used.

But... I'd say: :sparkles: progress :sparkles:!

nvie avatar Aug 30 '12 21:08 nvie

@nvie I took a stab at porting ForkingWorker from your gist to rq. It's still in a pretty rough state and doesn't actually perform jobs yet, but here's what I found from this quick experiment (feel free to correct me if I'm wrong):

  1. Worker subclasses need to implement a few methods, such as handle_cold_shutdown and is_busy, to handle SIGINT.
  2. Gracefully terminating a worker instance while a job is in progress is an issue. Unlike the original Worker implementation, the parent process doesn't wait for the child process to finish before doing another BLPOP. Perhaps we can tweak the request_stop method to loop indefinitely and check whether all children are finished before terminating.

Here's the source code https://github.com/selwin/rq/blob/new-worker/rq/new_worker.py and here's how to run it:

from rq.queue import Queue
from rq.new_worker import ForkingWorker
from redis import Redis

redis = Redis()
queue = Queue('default', connection=redis)
worker = ForkingWorker(num_processes=4, queues=[queue], connection=redis)
worker.work()

I'd appreciate any feedback.

selwin avatar Jan 18 '13 12:01 selwin

@nvie I just reread your comment and it seems I misunderstood how you want it to be implemented. Now that I finally understand, I think the way you suggested would be simpler to implement.

selwin avatar Jan 18 '13 14:01 selwin

I'm going to be looking at this really soon! (Fixing some production issues now, first.)

nvie avatar Jan 18 '13 15:01 nvie