Programmatically create more workers at run-time
Is there a way to programmatically create more workers at run-time? When a worker finishes its job, it simply dies. That way, I can minimize the number of jobs waiting in the queue.
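For context, what the question describes is RQ's built-in burst mode: a burst worker processes everything currently in the queue and then exits. A minimal illustration with the standard rq CLI:

$ rq worker --burst default

The same behavior is available from Python via Worker.work(burst=True), which the answers below rely on.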
Yes, you can. But because you want the workers to die, you'll have to start:
- an always-running worker, using the rq worker command, and
- on-demand workers, by saving the following code in some file, say on_demand_workers.py, and then calling $ python on_demand_workers.py.
Both of these commands should ideally be run under a task monitor (supervisor or circus); a sample supervisord entry follows the code below.
#!/usr/bin/python3
import sys
import time

from rq import Connection, Worker
from redis import Redis

redis = Redis(host='localhost')

def need_burst_workers():
    # check a database or Redis key to determine whether a burst worker army is required
    return True  # boolean

def num_burst_workers_needed():
    # check the number, maybe divide the number of pending tasks by n
    return 10  # integer

def main(qs):
    with Connection(connection=redis):
        if need_burst_workers():
            [Worker(qs).work(burst=True) for i in range(num_burst_workers_needed())]
        else:
            time.sleep(10)  # in seconds

if __name__ == '__main__':
    qs = sys.argv[1:] or ['default']
    main(qs)
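As for the task monitor, a minimal supervisord entry for the on-demand script might look like the following sketch (the program name and paths are placeholders). Because main() returns after a single pass, autorestart is what keeps the script re-checking the queue:

[program:on_demand_workers]
command=/usr/bin/python3 /path/to/on_demand_workers.py default
autostart=true
autorestart=true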
Hope this helps. Thank you.
Worker.work is a blocking call, so this code creates the ten workers sequentially, one after another. Wrap the work method call in multiprocessing at least.
@proofit404: I didn't realize this while answering. Thanks for pointing it out.
The updated main function is:
import multiprocessing

def main(qs):
    with Connection(connection=redis):
        if need_burst_workers():
            [multiprocessing.Process(target=Worker(qs).work, kwargs={'burst': True}).start() for i in range(num_burst_workers_needed())]
        else:
            time.sleep(10)  # in seconds
Update 0: Using a for loop instead of a list comprehension, as suggested by @proofit404:
import multiprocessing

def main(qs):
    with Connection(connection=redis):
        if need_burst_workers():
            for i in range(num_burst_workers_needed()):
                multiprocessing.Process(target=Worker(qs).work, kwargs={'burst': True}).start()
        else:
            time.sleep(10)  # in seconds
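One detail worth noting about both versions: main returns as soon as the processes are started, without waiting for them. If the parent should only exit after the burst workers have drained the queue (useful when a task monitor restarts it), keep references to the processes and join them. A minimal sketch of that variant (it assumes the fork start method, since passing a bound Worker method as the target runs into the pickling issue discussed further down):

def main(qs):
    with Connection(connection=redis):
        if need_burst_workers():
            processes = [
                multiprocessing.Process(target=Worker(qs).work, kwargs={'burst': True})
                for i in range(num_burst_workers_needed())
            ]
            for p in processes:
                p.start()
            for p in processes:
                p.join()  # block until every burst worker has drained the queue and exited
        else:
            time.sleep(10)  # in seconds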
Source: Stack Overflow
@shivekkhurana looks correct.
I prefer to use for loops to perform actions (like process creation) and list comprehensions to build data structures. So in this case I suggest you use a for loop.
import multiprocessing

def main(qs):
    with Connection(connection=redis):
        if need_burst_workers():
            for i in range(num_burst_workers_needed()):
                multiprocessing.Process(target=Worker(qs).work, kwargs={'burst': True}).start()
        else:
            time.sleep(10)  # in seconds
When I run this code snippet in Python 3.10, I get the following error:
TypeError: cannot pickle '_thread.lock' object
I believe this is because of limitations in pickle in newer versions of Python (https://github.com/rq/rq/issues/1413#issuecomment-785014951). What is the latest way to programmatically create more workers using multiprocessing?
In case it helps someone else down the road, here's how I've been able to programmatically create more workers with multiprocessing in Python 3.10.
import multiprocessing

def start_worker(queues):
    # import and connect inside the child process, so nothing
    # unpicklable has to cross the process boundary
    from rq import Connection, Worker
    with Connection():
        worker = Worker(queues)
        worker.work()

def main(queues):
    multiprocessing.Process(target=start_worker, args=(queues,)).start()
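The reason this avoids the pickling error is that only the queue names (plain strings) cross the process boundary; the Connection and the Worker are created inside the child process. A usage sketch, assuming a queue named 'default':

if __name__ == '__main__':
    main(['default'])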
Alternatively, I've figured out a hacky way to make a Redis instance pickleable by creating a wrapper class that stores the metadata used to instantiate Redis, and deletes/reinitializes it when it is pickled/unpickled.
from redis import Redis

class PickleableRedis(Redis):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.redis_args = args
        self.redis_kwargs = kwargs

    def __getstate__(self):
        # called when pickled: keep only the constructor arguments
        return {'redis_args': self.redis_args, 'redis_kwargs': self.redis_kwargs}

    def __setstate__(self, state):
        # called when unpickled: rebuild the client from the stored arguments
        self.__dict__.update(state)
        super().__init__(*self.redis_args, **self.redis_kwargs)
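A quick round-trip check of the idea (host and port are placeholders, and ping() assumes a reachable Redis server):

import pickle

r = PickleableRedis(host='localhost', port=6379)
r2 = pickle.loads(pickle.dumps(r))  # __setstate__ rebuilds the client from the stored args
print(r2.ping())

Since the unpickled client is reconstructed from its constructor arguments, an instance like this can be handed to a Worker running in a spawned child process.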
This is a won't-fix: it can be done manually (there are a couple of examples in here), or organized with an orchestrator (e.g. K8s).
For anyone who needs to start the rq worker from a Python script and is getting this error:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
The issue is that the new process needs to be started from the main process/module (read more). This might help:
import multiprocessing

from redis import Redis

redis = Redis(host='localhost')

def start_worker(queues):
    from rq import Connection, Worker
    with Connection(connection=redis):
        worker = Worker(queues)
        worker.work()

if __name__ == "__main__":
    queues = ['default']
    # num_burst_workers_needed() as defined earlier in the thread
    for i in range(num_burst_workers_needed()):
        multiprocessing.Process(target=start_worker, args=(queues,)).start()