Cancellation of the handlers set to the "ready" event of an asyncio component is not properly handled

Open rdebroiz opened this issue 5 years ago • 15 comments

After cancelling the handlers associated with the "ready" event, a _GatheringFuture exception holding the CancelledError is raised but never retrieved.

@comp.on_ready
async def go(*args, **kwargs):
    while True:
        await asyncio.sleep(1)

run(comp)

gives, after the user initiated a shutdown:

2019-02-22T16:54:47 connecting once using transport type "websocket" over endpoint "tcp"
2019-02-22T16:54:50 Shutting down due to SIGINT
2019-02-22T16:54:50 _GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError
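
The state reported in the log can be reproduced with plain asyncio, without autobahn. The following is a minimal sketch, assuming that the handlers are awaited through something equivalent to asyncio.gather() with its default return_exceptions=False; the names sleep_forever and reproduce are illustrative only. When the child task is cancelled, the gathering future is not itself cancelled: it finishes with a CancelledError stored as its exception, and asyncio logs the "never retrieved" message if nobody reads it.

```python
import asyncio


async def sleep_forever():
    # Stand-in for a handler that never returns on its own.
    while True:
        await asyncio.sleep(1)


async def reproduce():
    child = asyncio.ensure_future(sleep_forever())
    outer = asyncio.gather(child)   # default: return_exceptions=False
    await asyncio.sleep(0)          # let the child start running
    child.cancel()                  # what a shutdown does to the handler
    await asyncio.wait([child])     # wait until the cancellation has landed
    await asyncio.sleep(0)
    # Reading outer.exception() marks the exception as retrieved,
    # so this sketch itself does not trigger the log message.
    return outer.done(), outer.cancelled(), type(outer.exception()).__name__


if __name__ == "__main__":
    print(asyncio.run(reproduce()))
```

If the last line of reproduce() did not call outer.exception(), the gathering future would be garbage collected with an unread exception, which is the situation the log above describes.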

~~Just await the handlers solves the issue:~~

# in autobahn.util.ObservableMixin.fire
for handler in self._listener
    future = txaio.as_future(
    
    def consume_result(fut):
        return fut.result()
    
    future.add_callbacks(consume_result)
    res.append(future)

~~Related to #1000~~

rdebroiz avatar Feb 22 '19 16:02 rdebroiz

Actually, I made a mistake and typed add_callbacks instead of add_done_callback. This raised an Exception, which prevented the gather of all the handlers at the end of the function from being called.

Ironically what I did by mistake is exactly what I thought was the problem.

I don't understand what causes the _GatheringFuture exception not to be handled. It seems to be specific to the "ready" event.

rdebroiz avatar Feb 26 '19 11:02 rdebroiz

Looks like the problem is in the on_* handlers callback chain in autobahn.wamp.protocol.ApplicationSession, and in the differences in behaviour between Twisted's Deferred and asyncio's Future objects, which txaio can't entirely abstract away.

Deferreds allow switching back and forth between the callback and errback chains, so any Exception raised later in the chain from user code is taken care of by the current code.

Once a Future is done with a successful result, as happens here https://github.com/crossbario/autobahn-python/blob/master/autobahn/wamp/protocol.py#L514, only the callback chain on this Future will run. An Exception raised later in the chain when firing the 'ready' event https://github.com/crossbario/autobahn-python/blob/master/autobahn/wamp/protocol.py#L535 will not be handled by the next errback, so the Exception from the GatheringFuture is never retrieved.
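
The difference described above can be illustrated with plain asyncio. This is only a sketch of the general Future behaviour, not of txaio's or autobahn's internals, and the callback names are made up. Every done-callback on a Future receives the same finished Future, and whatever a callback returns is discarded. A Deferred, by contrast, would pass the first callback's return value (or failure) on to the next link of the chain.

```python
import asyncio


async def show_callback_independence():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    seen = []

    def first(f):
        seen.append(("first", f.result()))
        return "from first"  # asyncio ignores this return value

    def second(f):
        # Still sees the original result, not "from first".
        seen.append(("second", f.result()))

    fut.add_done_callback(first)
    fut.add_done_callback(second)
    fut.set_result("joined")
    await asyncio.sleep(0)  # give the loop a chance to run the callbacks
    return seen


if __name__ == "__main__":
    print(asyncio.run(show_callback_independence()))
```

Because nothing is chained, a new Future created inside one callback has to have its own error handling attached; no later callback on the original Future will see its outcome.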

sprocket-9 avatar Feb 27 '19 18:02 sprocket-9

Thanks for your clarifications.

So if I understand correctly, a solution could be to add a callback directly to the future returned by self.fire('ready', self) to handle its errors?

something more or less like:

# this instance is now "ready"...
def create_ready_future(_):
    fut = self.fire('ready', self)
    if txaio.using_asyncio:
        def handle_errors(f):
            try:
                return f.result()
            except asyncio.CancelledError:
                pass  # maybe handle it as any other exception
            except Exception as e:
                self._swallow_error(txaio.create_failure(e), "While notifying 'ready'")
        fut.add_done_callback(handle_errors)
    return fut

txaio.add_callbacks(
    d,
    create_ready_future,
    None
)

rdebroiz avatar Feb 28 '19 12:02 rdebroiz

It's not just the 'ready' event but the whole callback chain that needs to be broken up. Using _swallow_error in the middle of an errback chain causes twisted and asyncio to follow different paths in the chain. I've made an attempt to work around this problem: https://github.com/spr0cketeer/autobahn-python/tree/session-lifecycle-futures-mod

There may well be a better way than this, as I found it quite tricky to come up with a working solution. Possibly the answer is an alternative approach to error handling that doesn't use _swallow_error, since using it goes against the warning in txaio around errbacks https://txaio.readthedocs.io/en/latest/api.html#txaio.add_callbacks. To be fair, its docstring does come with its own warning, "Note that it cancels the error, so use with care!", but this only applies to twisted.

Anyway, please try the fix out and see if it solves your problem. If it does I'll make it a PR.

sprocket-9 avatar Mar 06 '19 02:03 sprocket-9

Hmm "different paths for Twisted vs asyncio" sounds bad. Are you saying that the Autobahn _swallow_error helper doesn't cancel the error in asyncio?

Also, yeah _swallow_error is intended to be an internal helper hence the _ prefix .. it's sort of the "last resort" thing. My intent there was that errors that otherwise would be completely unhandled are at least logged. That is, I don't see anything else that Autobahn can do here (and the "real" solution is that the user-code in question should be handling the error sensibly, if there's something better than "log it and carry on" to be done).

meejah avatar Mar 06 '19 19:03 meejah

@spr0cketeer that branch looks like it moves a chunk of code around too, can you point me to the relevant changes..?

meejah avatar Mar 06 '19 19:03 meejah

@spr0cketeer I tested your branch; it does not solve the problem, the _GatheringFuture exceptions are still not handled. But the behavior is now the same for both join and ready events.

@meejah (maybe not really related, and I'm not sure a separate issue is needed): when one stops a component, the callback attached to component.start is called as soon as the handlers attached to the leave event have completed, without waiting for the ones attached to the disconnect event. This might be a problem: if you want to properly stop the components on shutdown, for instance, there is no way to guarantee that the disconnect handlers have completed before you stop the loop.

Is this the expected behavior, or is it a bug in the asyncio callback chain?

rdebroiz avatar Mar 07 '19 12:03 rdebroiz

@meejah:

> Hmm "different paths for Twisted vs asyncio" sounds bad. Are you saying that the Autobahn _swallow_error helper doesn't cancel the error in asyncio?

Yes - try raising an Exception in a user-code on_join handler and you'll see it logged 3 times by _swallow_error in the errback chain.

> @spr0cketeer that branch looks like it moves a chunk of code around too, can you point me to the relevant changes..?

It's a rewrite of this: https://github.com/crossbario/autobahn-python/blob/master/autobahn/wamp/protocol.py#L469-L550

To this: https://github.com/spr0cketeer/autobahn-python/blob/session-lifecycle-futures-mod/autobahn/wamp/protocol.py#L472-L570

sprocket-9 avatar Mar 07 '19 17:03 sprocket-9

@rdebroiz:

> @spr0cketeer I tested your branch; it does not solve the problem, the _GatheringFuture exceptions are still not handled. But the behavior is now the same for both join and ready events.

Hmm, weird, I'm not seeing the _GatheringFuture problem with my fix. Are you sure you've checked out the session-lifecycle-futures-mod branch in my repo? A common mistake I often make is cloning the repo but forgetting to switch to the branch containing the actual fix!

If you are in the correct branch, then let me know your test for showing it so I can reproduce. Mine is raising an Exception in the on_join handler and pressing ctrl-c.

sprocket-9 avatar Mar 07 '19 17:03 sprocket-9

@rdebroiz:

> If you are in the correct branch, then let me know your test for showing it so I can reproduce. Mine is raising an Exception in the on_join handler and pressing ctrl-c.

Actually, in the master branch, the message "ERROR:asyncio:_GatheringFuture exception was never retrieved" only shows up if I raise the Exception in on_ready, not in on_join.

sprocket-9 avatar Mar 07 '19 17:03 sprocket-9

@spr0cketeer yes, on session-lifecycle-futures-mod the message now shows up for both on_ready and on_join (I'm using Python 3.6, btw).

I test it by running an infinite loop in either on_ready or on_join, then pressing ctrl+c:

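import asyncio
from autobahn.asyncio.component import Component, run

comp = Component(
    transports="my_transport",  # placeholder transport
    realm="my_realm",
)

@comp.on_ready  # or comp.on_join
async def sleep_forever(*args, **kwargs):
    # never returns, so the handler is still pending at shutdown
    while True:
        print("ZZZzzz")
        await asyncio.sleep(1)

run([comp])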

gives

2019-03-08T10:36:40 connecting once using transport type "websocket" over endpoint "tcp"
ZZZzzz
2019-03-08T10:36:42 Shutting down due to SIGINT
2019-03-08T10:36:42 _GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError

rdebroiz avatar Mar 08 '19 09:03 rdebroiz

I played a little bit with the code at https://github.com/spr0cketeer/autobahn-python/blob/session-lifecycle-futures-mod/autobahn/asyncio/component.py#L353-L373

and I noticed that if you yield to the asyncio loop before closing it, the warnings do not show up anymore. For example, if you replace the whole nicely_exit with:

    async def nicely_exit(signal):
        log.info("Shutting down due to {signal}", signal=signal)

        tasks = asyncio.Task.all_tasks()
        tasks.remove(asyncio.Task.current_task())
        for task in tasks:
            task.cancel()

        try:
            await asyncio.gather(*tasks)
        except asyncio.CancelledError:
            log.debug("All task cancelled")
        except Exception:
            log.error(txaio.failure_format_traceback(txaio.create_failure()))
        finally:
            await asyncio.sleep(0)  # yield to the loop one last time before stopping it
        loop.stop()

you can test it here: https://github.com/rdebroiz/autobahn-python/tree/session-lifecycle-futures-mod
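
The idea behind this shutdown routine (cancel every other task, then wait for all of them so that no outcome is left unread) can be tried in isolation. The sketch below is an illustration under assumptions, not autobahn's code: it uses asyncio.all_tasks() and asyncio.current_task(), which are available from Python 3.7 (the thread uses the 3.6 spellings on asyncio.Task), and it passes return_exceptions=True so that cancellations are returned as values instead of being raised. The names cancel_others and demo_shutdown are hypothetical.

```python
import asyncio


async def sleep_forever():
    while True:
        await asyncio.sleep(1)


async def cancel_others():
    # Cancel every task on the running loop except the caller.
    current = asyncio.current_task()
    others = [t for t in asyncio.all_tasks() if t is not current]
    for task in others:
        task.cancel()
    # Waiting with return_exceptions=True collects each outcome,
    # so no exception is left unretrieved.
    return await asyncio.gather(*others, return_exceptions=True)


async def demo_shutdown():
    workers = [asyncio.ensure_future(sleep_forever()) for _ in range(2)]
    await asyncio.sleep(0)  # let the workers start
    outcomes = await cancel_others()
    return [type(o).__name__ for o in outcomes], [w.cancelled() for w in workers]


if __name__ == "__main__":
    print(asyncio.run(demo_shutdown()))
```

With this variant there is no need for a try/except around the gather, since a cancelled task shows up as a CancelledError instance in the returned list.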

rdebroiz avatar Mar 08 '19 10:03 rdebroiz

Ahhhh so the reason you're still seeing the error and I'm not is you're using run([comp]) and I've been using comp.start() and managing the loop myself. I'm effectively doing this with my test component:

loop = asyncio.get_event_loop()
fut = comp.start(loop=loop)
loop.run_until_complete(fut)

So with the fix in place and using start(), I'm no longer seeing any of the GatheringFuture error messages when raising an Exception in any on_* handlers. I think I've also managed to achieve a common code path for asyncio and twisted components, which wasn't there before because _swallow_error, used in the middle of an errback chain, does not cancel errors for asyncio components. So that's one thing.

As for the bug you're seeing now, I'm not sure if it's due to how run() does things, how your test component is working, or if it's actually a different bug outwith the Session lifecycle handling and more to do with how AB handles cancelling running tasks when it receives SIGINT. I've not had time to try run()ing components yet, or your fix for graceful shutdown, so take what I say now with a big pinch of salt as it's not been thought through or tested!

I don't know if your actual app component does something similar to your test component, but I think that, the way your test component is written, the gathering future AB uses in fire() to wait for all user-side on_* handlers to finish will never be done, due to the while loop in your on_ready coro. Whether AB should gracefully handle this kind of scenario is another matter, and from what you say you've found a way to do this with your fix, but maybe it's a bad example of how to do things in an on_ready handler? Using loop.call_soon(my_setup_function) in your on_ready handler would allow AB's gathering future to finish and the session to be established.

Not sure. Maybe a better test of what the Session code in AB appears to have been designed for is how it copes with Exceptions raised in user-side on_* handlers. Can you try some tests with master and then with my fix branch, doing something like raise Exception("An on_join/ready exception") in your handlers, and see what results you get? Keep using run(); don't change to start() for these tests. This will at least test whether the Session handling is fixed or needs more work. If it's fixed, then the graceful shutdown is something else to be looked at.

sprocket-9 avatar Mar 08 '19 21:03 sprocket-9

Tried running your test Component with asyncio in debug mode: export PYTHONASYNCIODEBUG=1. This shows the GatheringFuture whose exception was never retrieved when ctrl-c is pressed: it is the GatheringFuture returned by self.fire('ready'), which is never done because your on_ready handler uses a while loop and so never finishes. I don't really think this is a bug as such, or that AB can handle it any better, as you really want to let AB's Session handling code complete.
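
Besides the environment variable, asyncio's debug mode can also be switched on from code, which can be handy when the environment is hard to control. A minimal sketch, assuming Python 3.7+ and a loop started with asyncio.run(); the coroutine name report_debug is illustrative. Whether this is convenient with autobahn's run() depends on how the loop is created, which is not shown here.

```python
import asyncio


async def report_debug():
    # get_debug() reflects whether the running loop is in debug mode.
    return asyncio.get_running_loop().get_debug()


if __name__ == "__main__":
    # debug=True has the same effect on this loop as PYTHONASYNCIODEBUG=1
    print(asyncio.run(report_debug(), debug=True))
```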

To allow the GatheringFuture to finish and prevent any error messages on ctrl-c you could alter your on_ready handler to create a task for your long running code:

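import asyncio
from autobahn.asyncio.component import Component, run

comp = Component(
    transports="my_transport",  # placeholder transport
    realm="my_realm",
)

@comp.on_ready  # or comp.on_join
async def ready_handler(*args, **kwargs):
    # schedule the long-running code and return immediately
    asyncio.ensure_future(sleep_forever())

async def sleep_forever():
    while True:
        print("ZZZzzz")
        await asyncio.sleep(1)

run([comp])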

This should work on the master branch. Can you give it a try and see what results you get?
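
The effect of this pattern can be checked without autobahn. The sketch below is a plain-asyncio approximation in which asyncio.gather() stands in for whatever waits on the handlers; the lists background and ticks are illustrative helpers that are not part of the suggestion above. The handler returns at once, so the gathering future completes, while the spawned task keeps running until it is cancelled explicitly.

```python
import asyncio


async def sleep_forever(ticks):
    while True:
        ticks.append("ZZZzzz")
        await asyncio.sleep(0.01)


async def ready_handler(background, ticks):
    # Keep a reference so the task can be cancelled cleanly later.
    background.append(asyncio.ensure_future(sleep_forever(ticks)))


async def demo_background_task():
    background, ticks = [], []
    gathered = asyncio.gather(ready_handler(background, ticks))
    await asyncio.wait_for(gathered, timeout=1)  # completes almost immediately
    handler_done = gathered.done()
    still_running = not background[0].done()
    # Clean shutdown of the background task.
    background[0].cancel()
    await asyncio.gather(*background, return_exceptions=True)
    return handler_done, still_running


if __name__ == "__main__":
    print(asyncio.run(demo_background_task()))
```

Holding on to the task object is a deliberate choice here: it gives the shutdown path something to cancel and await.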

In trying to recreate your problem, my test Component used start() and raised explicit Exceptions in on_* handlers to ensure fire()'s GatheringFuture failed. This uncovered a bug in the way the Session handling code works with asyncio vs twisted. So my fix branch is still needed (or something like it), but it's fixing a different "bug" to what you're seeing.

sprocket-9 avatar Mar 09 '19 17:03 sprocket-9

> This shows the GatheringFuture whose exception was never retrieved when ctrl-c is pressed: it is the GatheringFuture returned by self.fire('ready'), which is never done because your on_ready handler uses a while loop and so never finishes. I don't really think this is a bug as such, or that AB can handle it any better, as you really want to let AB's Session handling code complete.

IMO the gather task created by self.fire('ready') should be "done" once the on_ready handlers have either finished successfully, raised an exception, or been cancelled.
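
For reference, plain asyncio already offers this "done in all three cases" behaviour through gather(..., return_exceptions=True). The following sketch only illustrates that semantics; it is an assumption that something similar could be applied inside fire(), and the handler names are made up.

```python
import asyncio


async def finishes():
    return "ok"


async def fails():
    raise ValueError("boom")


async def gets_cancelled():
    await asyncio.sleep(10)


async def gather_all_outcomes():
    tasks = [asyncio.ensure_future(c)
             for c in (finishes(), fails(), gets_cancelled())]
    outer = asyncio.gather(*tasks, return_exceptions=True)
    await asyncio.sleep(0)   # let every handler start
    tasks[2].cancel()        # simulate a shutdown hitting the slow handler
    results = await outer    # completes normally with one entry per handler
    return [r if isinstance(r, str) else type(r).__name__ for r in results]


if __name__ == "__main__":
    print(asyncio.run(gather_all_outcomes()))
```

The caller then decides what to do with each entry, for example logging the exceptions and ignoring the cancellations.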

Let's imagine another scenario (more like a "real use-case") without an infinite loop, which may better reflect the problem:

@comp.on_join
async def on_join(session, details):
    # result = await session.call("get.important.thing")
    # the previous call is mandatory and will take a while; we fake it with asyncio.sleep()
    await asyncio.sleep(10)

    # do something with result  

    print("on_join done")

If on_join is cancelled (you pressed ctrl+c) before the 10 seconds have elapsed (before the RPC call has completed), the exception of the resulting GatheringFuture won't be retrieved.

In this case you can't wrap it in a task and let asyncio schedule it, because you want to await it and do something with the result later. Plus, this is in on_join and has to be done in order to fire the on_ready event.

rdebroiz avatar Mar 11 '19 09:03 rdebroiz