
Programmatically create more workers at run-time

sivabudh opened this issue 9 years ago • 6 comments

Is there a way to programmatically create more workers at run-time? When the worker finishes the job, it simply dies. In this way, I can minimize the number of jobs waiting in the queue.

sivabudh avatar Apr 07 '16 13:04 sivabudh

Yes, you can. But because you want the worker to die, you'll have to start

  • an always-running worker using the rq worker command, and
  • on-demand workers, by saving the following code in some file, say on_demand_workers.py, and then calling $ python on_demand_workers.py

Both commands should ideally be run under a task monitor (supervisor or circus).

#!/usr/bin/python3

import sys
import time
from rq import Connection, Worker
from redis import Redis

redis = Redis(host='localhost')

def need_burst_workers():
    # check database or redis key to determine whether burst worker army is required
    return True #boolean

def num_burst_workers_needed():
    # check the number, maybe divide the number of pending tasks by n
    return 10 #integer

def main(qs):
    with Connection(connection=redis):
        if need_burst_workers():
            [Worker(qs).work(burst=True) for i in range(num_burst_workers_needed())]
        else:
            time.sleep(10) #in seconds

if __name__ == '__main__':
    qs = sys.argv[1:] or ['default']
    main(qs)
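
For concreteness, here's one way the two stubs above could be filled in by checking the queue depth. This is just a sketch: the 'default' queue name, the threshold of 20 pending jobs, and the 5-jobs-per-worker ratio are all arbitrary choices.

from rq import Queue

def need_burst_workers():
    # spin up extra workers once the backlog crosses a threshold
    return Queue('default', connection=redis).count > 20

def num_burst_workers_needed():
    # roughly one burst worker per 5 pending jobs, at least one
    return max(1, Queue('default', connection=redis).count // 5)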

Hope this helps. Thank you.

shivekkhurana avatar Aug 23 '16 06:08 shivekkhurana

Worker.work is a blocking call. This code creates ten workers sequentially. Wrap the work method call in multiprocessing at least.

proofit404 avatar Aug 23 '16 07:08 proofit404

@proofit404 : I didn't realize this while answering. Thanks for pointing this out.

The updated main function is:

import multiprocessing

def main(qs):
    with Connection(connection=redis):
        if need_burst_workers():
            [multiprocessing.Process(target=Worker(qs).work, kwargs={'burst': True}).start() for i in range(num_burst_workers_needed())]
        else:
            time.sleep(10) #in seconds

Update 0: Using a for loop instead of a list comprehension, as suggested by @proofit404:

import multiprocessing

def main(qs):
    with Connection(connection=redis):
        if need_burst_workers():
            for i in range(num_burst_workers_needed()):
                multiprocessing.Process(target=Worker(qs).work, kwargs={'burst': True}).start()
        else:
            time.sleep(10) #in seconds

Source: Stack Overflow
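
If you also want main() to block until all burst workers drain the queue, you could keep the Process handles and join them. A sketch, assuming the fork start method (the Linux default), so the Worker instances don't need to be pickled:

def main(qs):
    with Connection(connection=redis):
        if need_burst_workers():
            procs = []
            for i in range(num_burst_workers_needed()):
                p = multiprocessing.Process(target=Worker(qs).work, kwargs={'burst': True})
                p.start()
                procs.append(p)
            for p in procs:
                p.join()  # wait for every burst worker to drain the queue and exit
        else:
            time.sleep(10)  # in seconds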

shivekkhurana avatar Aug 23 '16 08:08 shivekkhurana

@shivekkhurana looks correct.

I prefer to use for loops to perform actions (like process creation) and list comprehensions to build data structures, so for this case I suggest using a for loop.

proofit404 avatar Aug 23 '16 09:08 proofit404

import multiprocessing

def main(qs):
    with Connection(connection=redis):
        if need_burst_workers():
            for i in range(num_burst_workers_needed()):
                multiprocessing.Process(target=Worker(qs).work, kwargs={'burst': True}).start()
        else:
            time.sleep(10) #in seconds

When I run this code snippet in Python 3.10, I get the following error:

TypeError: cannot pickle '_thread.lock' object

I believe this is because of limitations in pickle in newer versions of Python (https://github.com/rq/rq/issues/1413#issuecomment-785014951). What is the latest way to programmatically create more workers using multiprocessing?

e13h avatar Apr 17 '22 06:04 e13h

In case it helps someone else down the road, here's how I've been able to programmatically create more workers with multiprocessing in Python 3.10.

import multiprocessing

def start_worker(queues):
    from rq import Connection, Worker

    with Connection():
        worker = Worker(queues)
        worker.work()

def main(queues):
    multiprocessing.Process(target=start_worker, args=(queues,)).start()

Alternatively, I've figured out a hacky way to make a Redis instance pickleable by creating a wrapper class that stores the metadata used to instantiate Redis, serializes only that metadata when pickled, and reinitializes the client from it when unpickled.

from redis import Redis

class PickleableRedis(Redis):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.redis_args = args
        self.redis_kwargs = kwargs

    def __getstate__(self):
        # Called when pickled
        state = {'redis_args': self.redis_args, 'redis_kwargs': self.redis_kwargs}
        return state

    def __setstate__(self, state):
        # Called when unpickled
        self.__dict__.update(state)
        super().__init__(*self.redis_args, **self.redis_kwargs)
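
A quick round-trip to illustrate, assuming a local Redis server is running:

import pickle

conn = PickleableRedis(host='localhost', port=6379)
payload = pickle.dumps(conn)      # only the constructor args/kwargs are serialized
restored = pickle.loads(payload)  # __setstate__ re-runs Redis.__init__ in the new process
assert restored.ping()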

e13h avatar May 12 '22 20:05 e13h

This is a won't-fix: it can be done manually (there are a couple of examples in this thread), or organized with an orchestrator (e.g. Kubernetes).

ccrvlh avatar Jan 27 '23 01:01 ccrvlh

For anyone who needs to start the rq worker from a Python script and is getting this error:

 An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

The issue is that child processes must be created from the main module (see the programming guidelines in the Python multiprocessing docs). This might help:

import multiprocessing

from redis import Redis

redis = Redis(host='localhost')

def start_worker(queues):
    from rq import Connection, Worker

    with Connection(connection=redis):
        worker = Worker(queues)
        worker.work()

if __name__ == "__main__":
    queues = ['default']
    for i in range(num_burst_workers_needed()):  # as defined earlier in the thread
        # args must be a tuple, hence the trailing comma
        multiprocessing.Process(target=start_worker, args=(queues,)).start()

mechaadi avatar Feb 13 '23 07:02 mechaadi