kombu icon indicating copy to clipboard operation
kombu copied to clipboard

a callback function when Consumer.callbacks raises an exception

Open ttddyy opened this issue 11 years ago • 6 comments

Added on_callback_error on Consumer. The registered function is invoked when one of the consumer callback functions have failed(throw exception). When True is returned from the function, it will continue the rest of consumer callback functions. Otherwise, stop and raise the originally thrown exception.

I think with this global error function, queue consumer applications can cleanly separate main logic and error handling logic.

ttddyy avatar May 22 '13 00:05 ttddyy

Thanks! I wonder if it would be nicer to have it attached to the callback (so to speak), something like:

 callbacks=[promise(process_message, errback)]

There's already an amqp.utils.promise object that works this way. It's a little weird though since promises usually imply the value will be calculated once. But promises will be used more in kombu soon (for async support) and it may make sense to do it this way.

ask avatar May 29 '13 15:05 ask

Do you mean the callback list is a mixed list that contains regular callback functions and promise instances. Regular functions will be called on receive() and promise instances will be called when exception is thrown from regular functions? Or, the entire callbacks will become asynchronous calls?

If latter:

in current master, callbacks is synchronous like this:

Consumer#receive() has

[callback(body, message) for callback in callbacks]

so probably, async version, it'll change to

for callback in callbacks:
    p = promise(callback, body, message)
    # start p function asynchronously.  ex: async_manager.start(p)

Since callback-promises run asynchronously, the consumer code cannot catch the exceptions raised by promises unless it manages async calls itself. I think the async-manager needs to handle them if we want to implement global error handling.

Is there any code already checked in that has async support? Just wondering how it looks like.

Thanks,

ttddyy avatar May 29 '13 22:05 ttddyy

Evaluating a promise is implemented something like this:

class promise:
    ...
    def __call__(self, *args, **kwargs):
        try:
            self.value = self.fun(*self.args + args, **dict(self.kwargs, **kwargs))
            self.ready = True
            return self.value
        except Exception as exc:
            self.ready = self.failed = True
            self.value = exc
            self.errback(exc)

So it's impartial to temporality, everything is contained within the promise.

ask avatar Jun 25 '13 10:06 ask

There are no concrete examples of async use yet.

While there are details that cannot be hooked into, the consumer API is asynchronous already. E.g. Celery is consuming messages from the py-amqp, librabbitmq and redis transports in an event loop. The API to get this set up with an event loop is internal and experimental though, work should be made to support the new async API in Py3 (tulip).

The producer part of the API, and declaring entities is not asynchronous yet, and the API does not yet support callbacks. My plan is to support a callback-based API by using promises for this as well.

ask avatar Jun 25 '13 10:06 ask

Looks like this issue uses Milestone that is already release, propose to update to appropriate.

wowkin2 avatar Dec 03 '19 11:12 wowkin2

thanks

auvipy avatar Dec 03 '19 11:12 auvipy