
Celery and RQ support

Open jfhbrook opened this issue 5 years ago • 4 comments

pyee 6.0.0 has a way of supporting fairly arbitrary backends. That means we could make an event emitter that hooks onto a celery app. Maybe like:

from celery import Celery
from pyee import CeleryEventEmitter

app = Celery('events', broker='pyamqp://guest@localhost//')

# Creates a unique @app.task that receives emit calls for this
# EE from celery and executes standard emit behavior
ee = CeleryEventEmitter(app=app)

# Register handlers like normal
@ee.on('data')
def handler(data):
    print(data)

then you can spin up workers like normal:

celery -A example_app worker --loglevel=info

and emit from your main program:

from example_app import ee

# Calls our generated celery task with the payload;
# actual emit code happens on servers
ee.emit('data', dict(some='data'))

See: http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#application
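
Under the hood it might look something like this. A rough, untested sketch: the class internals and the name kwarg are made up, and the task name would have to match between the client and the worker, same as the RQ case below.

from pyee import BaseEventEmitter


class CeleryEventEmitter(BaseEventEmitter):
    def __init__(self, app, name='pyee.emit'):
        super().__init__()

        # The task name has to agree between the client and the
        # worker, so it probably can't be fully autogenerated
        @app.task(name=name)
        def emit_task(event, *args, **kwargs):
            # Runs on the worker: standard synchronous emit
            self._emit_local(event, *args, **kwargs)

        self._emit_task = emit_task

    def _emit_local(self, event, *args, **kwargs):
        # The stock BaseEventEmitter emit behavior
        super().emit(event, *args, **kwargs)

    def emit(self, event, *args, **kwargs):
        # On the client, enqueue instead of firing handlers
        # locally; args and kwargs have to be serializable
        self._emit_task.delay(event, *args, **kwargs)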

jfhbrook avatar Apr 13 '19 04:04 jfhbrook

Likely in this case there would be no special error handling, since celery doesn't by default try to do anything smart with errors either.

jfhbrook avatar Apr 13 '19 04:04 jfhbrook

RQ is a little weirder:

http://python-rq.org/

from pyee import RQEventEmitter

# Sets a (name, BaseEventEmitter()) key/val pair in a
# shared dict namespace. The worker and the
# client have to agree on these names, so no
# autogeneration on the fly unfortunately.
ee = RQEventEmitter(name='a_unique_deterministic_name')

# Registers the event with the underlying base EE
@ee.on('data')
def handler(data):
    print(data)

Then you spin up your rq worker:

rq worker

and then call the EE in your app:

from redis import Redis
from rq import Queue

from sample_app import ee

# You only need to set up a queue object if you're emitting,
# so in this example we do it after ee is assembled
ee.queue = Queue(connection=Redis())

# Enqueues a global event handler function (pyee._rq.emit or
# something) with the name of the EE, the name of the event,
# and the payload. On the worker, the underlying base EE gets
# called.
ee.emit('data', dict(some='data'))
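
For reference, a rough, untested sketch of how the shared-dict approach might hang together (_EMITTERS and _emit are placeholder names for the module-level registry and the global handler function):

from pyee import BaseEventEmitter

# Shared namespace of (name, BaseEventEmitter()) pairs; the
# worker and the client both populate it at import time
_EMITTERS = {}


def _emit(name, event, *args, **kwargs):
    # Global function enqueued by RQEventEmitter.emit; runs on
    # the worker and fires handlers on the underlying base EE
    _EMITTERS[name].emit(event, *args, **kwargs)


class RQEventEmitter:
    def __init__(self, name, queue=None):
        self.name = name
        self.queue = queue
        self._underlying = _EMITTERS.setdefault(name, BaseEventEmitter())

    def on(self, event, f=None):
        # Handlers go straight onto the underlying base EE
        return self._underlying.on(event, f)

    def emit(self, event, *args, **kwargs):
        # Enqueues _emit with the EE's name, the event name and
        # the payload (kwargs that collide with enqueue's own
        # options would need care)
        self.queue.enqueue(_emit, self.name, event, *args, **kwargs)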

RQ seems to allow returning results but doesn't require us to do so.

Error handling is a little involved: http://python-rq.org/docs/exceptions/. I think the best bet is going to be no special behavior, meaning that because handlers run synchronously by default, raised exceptions will trigger RQ's standard DLQ behavior.

jfhbrook avatar Apr 13 '19 04:04 jfhbrook

Fun fact, Celery supports not just rabbit but also redis, SQS and *checks notes* zookeeper

http://docs.celeryproject.org/en/latest/getting-started/brokers/

jfhbrook avatar Apr 13 '19 18:04 jfhbrook

One edge case here is what happens if a worker emits an event on its own emitter.

I would hope that celery handles this - that if you call app.handler on the worker it gets inserted into the queue like everything else. I would hope!

I don't think RQ has any special handling here, but we can say that if the queue is set then we submit to the queue, and otherwise either raise or emit on the underlying base EE (configurable?).
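
Extending the rough RQ sketch above (reusing _EMITTERS, _emit and BaseEventEmitter from it), the dispatch in emit might look like this; the fallback kwarg is made up:

class RQEventEmitter:
    def __init__(self, name, queue=None, fallback='raise'):
        self.name = name
        self.queue = queue
        self.fallback = fallback
        self._underlying = _EMITTERS.setdefault(name, BaseEventEmitter())

    def emit(self, event, *args, **kwargs):
        if self.queue is not None:
            # Normal case: submit to the queue
            self.queue.enqueue(_emit, self.name, event, *args, **kwargs)
        elif self.fallback == 'local':
            # Configurable: fire handlers synchronously on the
            # underlying base EE
            self._underlying.emit(event, *args, **kwargs)
        else:
            # Default: refuse loudly rather than drop the event
            raise RuntimeError('no queue configured for ' + self.name)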

jfhbrook avatar Apr 13 '19 18:04 jfhbrook