Add callbacks for sent task ack, sent task and reshape API as retrieving result from AsyncResult.get() (fix #38)

Open jimhorng opened this issue 10 years ago • 1 comments

This is a fairly large single commit, because the original small commits made merge conflicts hard to resolve while creating this PR.

This PR gives users three options for receiving callbacks from asynchronous send-task operations:

  1. After the task is sent (fixes #38)
  2. After the task is sent and ack-ed (fixes #38)
  3. Matching the original Celery flow: task.apply_async() yields the AsyncResult first, then AsyncResult.get() retrieves the actual task result, both in Tornado's asynchronous fashion
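
The three options can be pictured with a toy, stdlib-only model of the callback flow. `FakeTask` and `FakeAsyncResult` are hypothetical stand-ins, not tcelery or Celery classes, and the callbacks fire synchronously here, whereas a real non-blocking producer would invoke them later from the Tornado IOLoop.

```python
# Toy model of the callback flow. FakeTask and FakeAsyncResult are
# hypothetical stand-ins, not part of tcelery or Celery.

class FakeAsyncResult:
    def __init__(self, value):
        self._value = value

    def get(self, callback):
        # Option 3: deliver the actual task result through a callback.
        callback(self._value)


class FakeTask:
    def __init__(self, func):
        self._func = func

    def apply_async(self, args=(), callback=None):
        # Options 1 and 2: the callback fires once the task message has been
        # sent (or sent and ack-ed, depending on broker settings). It receives
        # an AsyncResult-like handle, not the task result itself.
        async_result = FakeAsyncResult(self._func(*args))
        if callback is not None:
            callback(async_result)


events = []
echo = FakeTask(lambda msg: msg)


def on_async_result(async_result):
    events.append("sent")
    async_result.get(callback=on_actual_result)


def on_actual_result(result):
    events.append(result)


echo.apply_async(args=["Hello world!"], callback=on_async_result)
```

A caller that only needs options 1 or 2 would stop in `on_async_result` and never call `get()`.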

Usage

Calling Celery tasks (with a return value) from a Tornado RequestHandler: ::

from tornado import gen, web
import tcelery, tasks

tcelery.setup_nonblocking_producer()

class AsyncHandler(web.RequestHandler):
    @web.asynchronous
    def get(self):
        tasks.echo.apply_async(args=['Hello world!'], callback=self.on_async_result)

    def on_async_result(self, async_result):
        async_result.get(callback=self.on_actual_result)

    def on_actual_result(self, result):
        self.write(str(result))
        self.finish()

With the generator-based interface: ::

class GenAsyncHandler(web.RequestHandler):
    @web.asynchronous
    @gen.coroutine
    def get(self):
        async_result = yield gen.Task(tasks.sleep.apply_async, args=[3])
        result = yield gen.Task(async_result.get)
        self.write(str(result))
        self.finish()
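
For readers unfamiliar with `gen.Task`: it calls the wrapped function with an extra `callback` keyword argument and resumes the coroutine with whatever that callback receives. A rough stdlib analogue using `asyncio` is sketched below. `callback_to_future` and `fake_apply_async` are hypothetical names for illustration only, not tcelery or Tornado APIs.

```python
import asyncio


def callback_to_future(func, *args, **kwargs):
    """Call func(..., callback=...) and return a future for the callback's argument."""
    future = asyncio.get_running_loop().create_future()
    func(*args, callback=future.set_result, **kwargs)
    return future


def fake_apply_async(args, callback):
    # Hypothetical stand-in for a callback-style send: it reports back
    # on a later loop iteration instead of returning a value.
    asyncio.get_running_loop().call_soon(callback, "echo: " + args[0])


async def handler():
    # Reads like the generator-based handler: wait, then use the value.
    return await callback_to_future(fake_apply_async, args=["Hello world!"])


result = asyncio.run(handler())
```

This is why the callback-style and generator-style handlers in this PR can share one underlying API: the second is the first with the callback supplied by an adapter.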

Calling Celery tasks (no return value) from a Tornado RequestHandler: ::

@web.asynchronous
def get(self):
    tasks.echo.apply_async(args=['Hello world!'], callback=self.on_async_result)

def on_async_result(self, async_result):
    self.write("task sent") # ack-ed if BROKER_TRANSPORT_OPTIONS: {'confirm_publish': True}
    self.finish()

With the generator-based interface: ::

@web.asynchronous
@gen.coroutine
def get(self):
    yield gen.Task(tasks.sleep.apply_async, args=[3])
    self.write("task sent") # ack-ed if BROKER_TRANSPORT_OPTIONS: {'confirm_publish': True}
    self.finish()

See the updated README.rst for API usage details.
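
The `confirm_publish` option mentioned in the code comments above would be set in the Celery configuration. A minimal sketch, assuming the old-style uppercase setting names used in this PR and an AMQP broker; the broker URL is a placeholder, not a value from this project:

```python
# celeryconfig.py (sketch; the broker URL below is a placeholder)
BROKER_URL = "amqp://guest:guest@localhost:5672//"

# Ask the AMQP transport to wait for publisher confirms. Per the comments
# in the usage examples, the "task sent" callback then also implies the
# broker has ack-ed the message.
BROKER_TRANSPORT_OPTIONS = {"confirm_publish": True}
```

Without this option, the callback signals only that the message was sent.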

Functional quality

  • All functional tests pass, including those for the amqp and redis backends
  • Works with a connection pool of two or more connections
  • Works in reconnect scenarios, including features such as waiting for the publish ack, etc.
  • If no backend is configured, it behaves the same as the original Celery, which uses DisabledBackend

jimhorng, Oct 22 '14 18:10

@mher, any review comments on this PR? Thanks :)

jimhorng, Apr 17 '15 21:04