tornado-celery
Add callbacks for task sent and task sent-and-acked, and reshape the API so results are retrieved via AsyncResult.get() (fix #38)
This is a fairly large commit, because the smaller commits made merge conflicts hard to resolve while preparing this PR.
Gives the user 3 options for callbacks on asynchronous send-task operations:
- After the task is sent (fix #38)
- After the task is sent and ack-ed (fix #38)
- Matching original Celery behavior: call task.apply_async() to get the AsyncResult first, then AsyncResult.get() to get the actual task result in Tornado's asynchronous fashion
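The third option chains two callbacks. The following is a minimal sketch of that control flow in plain Python. `FakeTask`, `FakeAsyncResult` and `run_two_step` are hypothetical stand-ins written for illustration. They are not the tcelery classes, and they invoke callbacks immediately instead of going through a broker or the IOLoop.

```python
# Stand-ins for illustration only; these are NOT the tcelery classes.
class FakeAsyncResult:
    def __init__(self, task_id, value):
        self.task_id = task_id
        self._value = value

    def get(self, callback):
        # A real implementation would wait on the result backend
        # without blocking; here the value is handed over at once.
        callback(self._value)


class FakeTask:
    def apply_async(self, args, callback):
        # A real producer would call back once the message is sent
        # (and ack-ed, if publisher confirms are enabled).
        callback(FakeAsyncResult("hypothetical-id", args[0]))


def run_two_step(task, args):
    collected = []

    def on_async_result(async_result):
        # Step 1: the task was sent; now request the actual result.
        async_result.get(callback=on_actual_result)

    def on_actual_result(result):
        # Step 2: the task's return value is available.
        collected.append(result)

    task.apply_async(args=args, callback=on_async_result)
    return collected


print(run_two_step(FakeTask(), ["Hello world!"]))
```

The first callback only signals that the message left the producer. The task's return value arrives in the second callback.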
Usage
Calling Celery tasks (with a return value) from a Tornado RequestHandler: ::

    from tornado import gen, web
    import tcelery, tasks

    tcelery.setup_nonblocking_producer()

    class AsyncHandler(web.RequestHandler):
        @web.asynchronous
        def get(self):
            tasks.echo.apply_async(args=['Hello world!'], callback=self.on_async_result)

        def on_async_result(self, async_result):
            # The task has been sent; now wait for its result.
            async_result.get(callback=self.on_actual_result)

        def on_actual_result(self, result):
            self.write(str(result))
            self.finish()

With the generator-based interface: ::

    class GenAsyncHandler(web.RequestHandler):
        @web.asynchronous
        @gen.coroutine
        def get(self):
            async_result = yield gen.Task(tasks.sleep.apply_async, args=[3])
            result = yield gen.Task(async_result.get)
            self.write(str(result))
            self.finish()

Calling Celery tasks (no return value) from a Tornado RequestHandler: ::

    @web.asynchronous
    def get(self):
        tasks.echo.apply_async(args=['Hello world!'], callback=self.on_async_result)

    def on_async_result(self, async_result):
        # Also ack-ed if BROKER_TRANSPORT_OPTIONS is {'confirm_publish': True}
        self.write("task sent")
        self.finish()

With the generator-based interface: ::

    @web.asynchronous
    @gen.coroutine
    def get(self):
        yield gen.Task(tasks.sleep.apply_async, args=[3])
        # Also ack-ed if BROKER_TRANSPORT_OPTIONS is {'confirm_publish': True}
        self.write("task sent")
        self.finish()

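The comments in the snippets above refer to the `confirm_publish` transport option. As a rough sketch, it could be enabled in the Celery configuration module like this. The file name `celeryconfig.py` is an assumption; use whichever module your app loads its settings from.

```python
# celeryconfig.py (hypothetical file name)
# Ask the amqp transport to wait for broker publisher confirms, so the
# "task sent" callback fires only after the broker has ack-ed the message.
BROKER_TRANSPORT_OPTIONS = {'confirm_publish': True}
```

Without this option, the callback indicates only that the message was sent.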
See the updated README.rst for API usage details.
Functional quality
- All functional tests pass, including those for the amqp and redis backends
- Works with a connection pool where connections >= 2
- Works in reconnect scenarios, including features such as waiting for the publish ack
- If no backend is configured, behaves the same as original Celery, which uses DisabledBackend
@mher any review comments on this PR? Thanks :)