rq
Custom gevent worker class
Hello!
Seeing that new_workers was inactive, I started hacking on a gevent worker class a while ago. Now that custom worker classes have landed, maybe this could be a first, simple way of bringing better concurrency to rq workers?
Here is what I currently have working:
```python
import signal

import gevent
import gevent.pool
from rq import Queue, Worker
from rq.exceptions import DequeueTimeout
from rq.worker import StopRequested


class WorkerGevent(Worker):

    def get_ident(self):
        # Identify the running greenlet rather than the OS thread.
        return id(gevent.getcurrent())

    def __init__(self, *nargs, **kwargs):
        # The pool size used to come from the command line
        # (int(args.processes)); accept it as a keyword argument instead.
        self.gevent_pool = gevent.pool.Pool(int(kwargs.pop('pool_size', 20)))
        Worker.__init__(self, *nargs, **kwargs)

    def _install_signal_handlers(self):
        """Installs signal handlers for handling SIGINT and SIGTERM
        gracefully.
        """

        def request_force_stop():
            """Terminates the application (cold shutdown).
            """
            self.log.warning('Cold shut down.')
            self.gevent_pool.kill()
            raise SystemExit()

        def request_stop():
            """Stops the current worker loop but waits for running greenlets
            to end gracefully (warm shutdown).
            """
            # A second signal escalates to a cold shutdown.
            gevent.signal(signal.SIGINT, request_force_stop)
            gevent.signal(signal.SIGTERM, request_force_stop)

            msg = 'Warm shut down requested.'
            self.log.warning(msg)

            # If shutdown is requested in the middle of a job, wait until
            # it finishes before shutting down
            self.log.debug('Stopping after all greenlets are finished. '
                           'Press Ctrl+C again for a cold shutdown.')
            self._stopped = True
            self.gevent_pool.join()
            raise StopRequested()

        gevent.signal(signal.SIGINT, request_stop)
        gevent.signal(signal.SIGTERM, request_stop)

    def fork_and_perform_job(self, job):
        """Spawns a gevent greenlet to perform the actual work.
        """
        self.gevent_pool.spawn(self.perform_job, job)

    def dequeue_job_and_maintain_ttl(self, timeout):
        while True:
            # Don't take a job off the queue until the pool has a free slot.
            while not self.gevent_pool.free_count() > 0:
                gevent.sleep(0.1)

            try:
                return Queue.dequeue_any(self.queues, timeout,
                                         connection=self.connection)
            except DequeueTimeout:
                pass

            self.log.debug('Sending heartbeat to prevent worker timeout.')
            self.connection.expire(self.key, self.default_worker_ttl)
```
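The key trick in dequeue_job_and_maintain_ttl above is to wait for a free slot in the pool before taking a job off the queue. Here is a minimal sketch of that pattern that swaps gevent for the standard library so it runs without extra dependencies: threads stand in for greenlets, and threading.BoundedSemaphore stands in for gevent.pool.Pool.free_count(). The dispatch() function and the in-memory queue are hypothetical, not part of rq.

```python
import queue
import threading
import time


def dispatch(jobs, pool_size, perform_job):
    """Run perform_job(job) for each job in `jobs` (a queue.Queue), with at
    most `pool_size` jobs running at once. Returns once all jobs are done."""
    slots = threading.BoundedSemaphore(pool_size)
    running = []

    def run(job):
        try:
            perform_job(job)
        finally:
            slots.release()  # job finished: its slot is free again

    while True:
        # Reserve a slot before dequeuing, like the free_count() loop above,
        # so no job leaves the queue while nothing is free to run it.
        slots.acquire()
        try:
            job = jobs.get_nowait()
        except queue.Empty:
            slots.release()  # nothing to run: return the reserved slot
            break
        t = threading.Thread(target=run, args=(job,))
        t.start()
        running.append(t)

    for t in running:  # wait for in-flight jobs before returning
        t.join()


# Usage: ten short jobs through a pool of three, tracking peak concurrency.
lock = threading.Lock()
state = {'active': 0, 'peak': 0}
done = []


def fake_job(n):
    with lock:
        state['active'] += 1
        state['peak'] = max(state['peak'], state['active'])
    time.sleep(0.01)
    with lock:
        state['active'] -= 1
        done.append(n)


jobs = queue.Queue()
for n in range(10):
    jobs.put(n)
dispatch(jobs, pool_size=3, perform_job=fake_job)
print(sorted(done), state['peak'])
```

Reserving the slot first means that a worker which is already saturated leaves jobs in Redis for other workers instead of holding one it cannot start yet.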
I forgot: something else I had trouble with was death_penalty_after(). It's not easily overridable, and I had to override the whole perform_job() method just to swap in the gevent implementation. Would it be possible to make it an instance method of Worker?
Here's my gevent version:
```python
import gevent
import rq.timeouts


class gevent_death_penalty_after(object):
    def __init__(self, timeout):
        self._timeout = timeout

    def __enter__(self):
        self.setup_death_penalty()

    def __exit__(self, type, value, traceback):
        # Always cancel immediately, since we're done
        try:
            self.cancel_death_penalty()
        except rq.timeouts.JobTimeoutException:
            # Weird case: we're done with the with body, but now the alarm is
            # fired. We may safely ignore this situation and consider the
            # body done.
            pass

        # __exit__ may return True to suppress further exception handling. We
        # don't want to suppress any exceptions here, since all errors should
        # just pass through, JobTimeoutException being handled normally to the
        # invoking context.
        return False

    def setup_death_penalty(self):
        """Starts a gevent.Timeout that raises a JobTimeoutException in the
        current greenlet after the timeout amount (expressed in seconds).
        """
        self.gevent_timeout = gevent.Timeout(
            self._timeout,
            rq.timeouts.JobTimeoutException(
                'Gevent Job exceeded maximum timeout value (%d seconds).'
                % self._timeout))
        self.gevent_timeout.start()

    def cancel_death_penalty(self):
        """Cancels the pending gevent.Timeout.
        """
        self.gevent_timeout.cancel()
```
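What is being asked for here is a hook: if perform_job looks the timeout class up on the worker instead of calling death_penalty_after directly, a subclass can swap in the gevent version without overriding perform_job. Below is a minimal, self-contained sketch of that design. It uses hypothetical names (MiniWorker, DeathPenaltyBase, AlarmDeathPenalty, RecordingDeathPenalty) and a local JobTimeoutException rather than rq's own classes. AlarmDeathPenalty follows the SIGALRM approach described in the original docstrings and, like any signal-based timer, only works on Unix in the main thread.

```python
import signal
import time


class JobTimeoutException(Exception):
    """Local stand-in for rq.timeouts.JobTimeoutException."""


class DeathPenaltyBase(object):
    """Context manager; subclasses only implement the two hooks."""

    def __init__(self, timeout):
        self._timeout = timeout

    def __enter__(self):
        self.setup_death_penalty()

    def __exit__(self, exc_type, exc_value, tb):
        try:
            self.cancel_death_penalty()
        except JobTimeoutException:
            pass  # timer fired just after the body finished: treat as done
        return False  # let exceptions from the body propagate

    def setup_death_penalty(self):
        raise NotImplementedError

    def cancel_death_penalty(self):
        raise NotImplementedError


class AlarmDeathPenalty(DeathPenaltyBase):
    """SIGALRM-based timeout (Unix only, main thread only)."""

    def setup_death_penalty(self):
        def handler(signum, frame):
            raise JobTimeoutException(
                'Job exceeded maximum timeout value (%s seconds).' % self._timeout)
        signal.signal(signal.SIGALRM, handler)
        signal.setitimer(signal.ITIMER_REAL, self._timeout)

    def cancel_death_penalty(self):
        signal.setitimer(signal.ITIMER_REAL, 0)  # disarm the timer
        signal.signal(signal.SIGALRM, signal.SIG_DFL)


class MiniWorker(object):
    # The hook: perform_job reads the class from self, so subclasses can
    # change the timeout mechanism without overriding perform_job.
    death_penalty_class = AlarmDeathPenalty

    def perform_job(self, func, timeout):
        with self.death_penalty_class(timeout):
            return func()


class RecordingDeathPenalty(DeathPenaltyBase):
    """Stand-in for a gevent-based penalty: just records the hook calls."""
    log = []  # shared by all instances, for the demo only

    def setup_death_penalty(self):
        self.log.append(('setup', self._timeout))

    def cancel_death_penalty(self):
        self.log.append(('cancel', self._timeout))


class RecordingWorker(MiniWorker):
    death_penalty_class = RecordingDeathPenalty  # the only change needed


# Usage
print(MiniWorker().perform_job(lambda: 42, timeout=1))
try:
    MiniWorker().perform_job(lambda: time.sleep(1), timeout=0.05)
except JobTimeoutException as exc:
    print('timed out:', exc)
print(RecordingWorker().perform_job(lambda: 'ok', timeout=3), RecordingDeathPenalty.log)
```

A process-wide SIGALRM can only time one job at a time, which is presumably why a per-greenlet gevent.Timeout is needed once several jobs run concurrently in one process.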
Adding the necessary hooks to make implementing custom worker classes easier is definitely welcome.
May I suggest renaming the methods to setup_job_timeout and cancel_job_timeout, though?
We also need to find a more generic name for Worker.fork_and_perform_job.
@sylvinus, thanks a ton for putting this wrapper together! While using it, I had a problem running it in burst mode under py.test: the work() method would exit before all the child greenlet jobs had finished. To handle this case, the following modification to your dequeue_job_and_maintain_ttl() worked for me:
```python
def dequeue_job_and_maintain_ttl(self, timeout):
    while True:
        while not self.gevent_pool.free_count() > 0:
            gevent.sleep(0.1)

        try:
            job = Queue.dequeue_any(self.queues, timeout,
                                    connection=self.connection)
            # make sure all child jobs finish if queue is empty in burst
            # mode
            if job is None and timeout is None:
                self.gevent_pool.join()
            return job
        except DequeueTimeout:
            pass

        self.log.debug('Sending heartbeat to prevent worker timeout.')
        self.connection.expire(self.key, self.default_worker_ttl)
```
Hope someone finds it useful, and it's a case to keep in mind before merging the gevent worker.
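The failure mode described above comes from execute_job returning as soon as the greenlet is spawned: in burst mode the worker calls dequeue_job_and_maintain_ttl with timeout=None, and the first None result ends work(). Below is a small stdlib-only model of that loop. It assumes threads in place of greenlets and a Python list in place of the Redis queue. BurstWorkerSketch and its methods are hypothetical stand-ins that only mirror the names of rq's Worker methods.

```python
import threading
import time


class BurstWorkerSketch(object):
    """Hypothetical model of the worker loop: threads stand in for greenlets
    and a list stands in for the Redis queue. Not rq's actual API."""

    def __init__(self, jobs):
        self.jobs = list(jobs)
        self.pool = []       # in-flight "greenlets"
        self.finished = []

    def perform_job(self, job):
        time.sleep(0.01)     # pretend to do some work
        self.finished.append(job)

    def execute_job(self, job):
        # Like GeventWorker.execute_job: start the job and return at once.
        t = threading.Thread(target=self.perform_job, args=(job,))
        t.start()
        self.pool.append(t)

    def dequeue_job_and_maintain_ttl(self, timeout):
        job = self.jobs.pop(0) if self.jobs else None
        if job is None and timeout is None:
            # Burst mode, empty queue: wait for every in-flight job before
            # reporting "no more work". Without this join, work() would
            # return while jobs are still running.
            for t in self.pool:
                t.join()
        return job

    def work(self):
        # Burst mode: timeout=None, and the loop stops at the first None.
        while True:
            job = self.dequeue_job_and_maintain_ttl(timeout=None)
            if job is None:
                break
            self.execute_job(job)


worker = BurstWorkerSketch(range(5))
worker.work()
print(sorted(worker.finished))
```

Putting the join in the dequeue step, rather than in work(), keeps the change local to the gevent subclass, which matches how the fix above is written.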
Here is an updated version of the worker above (written against rq==0.4.6):

```python
from __future__ import absolute_import

import signal

import gevent
import gevent.pool

from rq import Worker
from rq.timeouts import BaseDeathPenalty, JobTimeoutException
from rq.worker import StopRequested
from rq.exceptions import DequeueTimeout


class GeventDeathPenalty(BaseDeathPenalty):
    def setup_death_penalty(self):
        exception = JobTimeoutException(
            'Gevent Job exceeded maximum timeout value (%d seconds).'
            % self._timeout)
        self.gevent_timeout = gevent.Timeout(self._timeout, exception)
        self.gevent_timeout.start()

    def cancel_death_penalty(self):
        self.gevent_timeout.cancel()


class GeventWorker(Worker):
    death_penalty_class = GeventDeathPenalty

    def __init__(self, queues, default_result_ttl=None, pool_size=20, **kwargs):
        self.gevent_pool = gevent.pool.Pool(pool_size)
        # Forward any other Worker options (connection, name, ...) unchanged.
        super(GeventWorker, self).__init__(
            queues, default_result_ttl=default_result_ttl, **kwargs)

    def get_ident(self):
        return id(gevent.getcurrent())

    def _install_signal_handlers(self):
        def request_force_stop():
            self.log.warning('Cold shut down.')
            self.gevent_pool.kill()
            raise SystemExit()

        def request_stop():
            gevent.signal(signal.SIGINT, request_force_stop)
            gevent.signal(signal.SIGTERM, request_force_stop)

            self.log.warning('Warm shut down requested.')
            self.log.debug('Stopping after all greenlets are finished. '
                           'Press Ctrl+C again for a cold shutdown.')

            self._stopped = True
            self.gevent_pool.join()

            raise StopRequested()

        gevent.signal(signal.SIGINT, request_stop)
        gevent.signal(signal.SIGTERM, request_stop)

    def execute_job(self, job):
        self.gevent_pool.spawn(self.perform_job, job)

    def dequeue_job_and_maintain_ttl(self, timeout):
        result = None
        while True:
            self.heartbeat()

            while not self.gevent_pool.free_count() > 0:
                gevent.sleep(0.1)

            try:
                result = self.queue_class.dequeue_any(
                    self.queues, timeout, connection=self.connection)
                # Burst mode with an empty queue: let running greenlets finish.
                if result is None and timeout is None:
                    self.gevent_pool.join()
                break
            except DequeueTimeout:
                pass

        self.heartbeat()
        return result
```
@jhorman I've created gists here. You can fork them or open your own PR, and we could discuss the code and its improvements there.
@lechup @jhorman Thanks for hacking on this. The gist worked for me as expected, but it doesn't seem to quit properly on Ctrl+C:
```
^C09:52:02 Warm shut down requested.
09:52:02 Stopping after all greenlets are finished. Press Ctrl+C again for a cold shutdown.
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/gevent/hub.py", line 133, in handle
    self.handler(*self.args, **self.kwargs)
  File "Worker.py", line 79, in request_stop
    raise StopRequested()
StopRequested
```
Thanks in advance. Any help with this would be appreciated!
I suggest commenting on the gist page rather than here. There are two versions of it (my fork and jhorman's version). He also explained here why exactly Ctrl+C doesn't work.
OK, will keep that in mind, @lechup.
I made some modifications based on @lechup's and @jhorman's versions and packaged the result. Here is the repo: https://github.com/zhangliyong/rq-gevent-worker
It's great to see all this. Perhaps we should document these alternative worker packages on http://python-rq.org. What do you think, @nvie?
I looked briefly at @zhangliyong's package, and what stood out to me is that worker.set_current_job_id is overridden to do nothing. I created a ticket to address this here: https://github.com/nvie/rq/issues/392
Thanks @selwin. I'm still working on it and currently test it by hand, since it's not easy to unit test gevent code. It works well in my project now, except that the worker process sometimes crashes because of the jobs; I use supervisor to restart it automatically.
Hi, could this GeventWorker become an official feature in rq?
That would be great indeed. I found this: https://github.com/zhangliyong/rq-gevent-worker but I'm not very comfortable adding another library when I'm trying to get away from complexity. It would be great to have this built in.
@selwin, I suggest closing this in favor of https://github.com/rq/rq/issues/45 (or the other way round).
Closing this in favor of #45