aio._AsyncioApi.resolve() must handle cancelled futures

Open · blinkingmatt opened this issue 7 years ago · 1 comment

Calling Future.set_result() on a cancelled future raises an InvalidStateError. I would suggest a small addition to the resolve() method:

def resolve(self, future, result=None):
    if not future.cancelled():
        future.set_result(result)
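
For illustration, the failure and the guard can be reproduced with plain asyncio, without txaio or a WAMP router. This is only a minimal sketch: `resolve` here is a standalone function mirroring the suggestion above, not txaio's actual method, and `main` is a hypothetical driver.

```python
import asyncio


def resolve(future, result=None):
    # Standalone copy of the guard suggested above (not txaio's real method).
    if not future.cancelled():
        future.set_result(result)


async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    fut.cancel()  # the future is now in the cancelled state

    # Unguarded: set_result() on a cancelled future raises InvalidStateError.
    try:
        fut.set_result(42)
        raised = False
    except asyncio.InvalidStateError:
        raised = True

    # Guarded: the call is silently skipped and the future stays cancelled.
    resolve(fut, 42)
    return raised, fut.cancelled()


print(asyncio.run(main()))
```

The guard only covers cancellation; a future that already holds a result or an exception would still raise on a second set_result().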

blinkingmatt avatar Jan 19 '18 15:01 blinkingmatt

I ran into the same problem today, and I think it is severe. If a WAMP call is in progress and the outer task is cancelled, txaio raises an InvalidStateError, which brings the connection down.

I do not see why cancelling a task while it awaits a WAMP call would be an uncommon scenario. Below is an example that triggers the behavior.

Both the suggestion above and a try-except block seem to work fine in my tests.

import asyncio
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner

class ServerComponent(ApplicationSession):
    async def onJoin(self, details):
        async def long_op():
            print("long_op")
            await asyncio.sleep(5)
        await self.register(long_op, "com.myapp.long_op")

class ClientComponent(ApplicationSession):
    async def _do(self):
        try:
            # this call will take a while
            res = await self.call("com.myapp.long_op")
        except asyncio.CancelledError as err:
            # so far so good, the task is cancelled correctly, but InvalidStateError occurs as soon as "long_op" returns
            print(repr(err))

    async def onJoin(self, details):
        f = asyncio.ensure_future(self._do())
        # cancel the task before long_op returns
        asyncio.get_event_loop().call_later(2, f.cancel)

    def onDisconnect(self):
        pass

if __name__ == "__main__":
    realm = "motesque"
    client_runner = ApplicationRunner("ws://127.0.0.1:9090/ws", realm)
    server_runner = ApplicationRunner("ws://127.0.0.1:9090/ws", realm)

    async def run():
        await server_runner.run(ServerComponent, start_loop=False)
        await client_runner.run(ClientComponent, start_loop=False)
        for i in range(0, 10):
            print("loop")
            await asyncio.sleep(1)

    asyncio.get_event_loop().run_until_complete(run())
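
The try-except alternative mentioned above could look roughly like the following. This is a sketch using only asyncio; `resolve_safely` and `demo` are hypothetical names for illustration and are not part of txaio. Unlike the cancelled() check, it also tolerates a future that has already been resolved.

```python
import asyncio


def resolve_safely(future, result=None):
    # Hypothetical helper: try to set the result and report whether it worked.
    try:
        future.set_result(result)
        return True
    except asyncio.InvalidStateError:
        # The future was cancelled or already completed; nothing left to do.
        return False


async def demo():
    loop = asyncio.get_running_loop()

    cancelled = loop.create_future()
    cancelled.cancel()

    done = loop.create_future()
    done.set_result("first")

    pending = loop.create_future()

    return (
        resolve_safely(cancelled, 1),    # skipped: future was cancelled
        resolve_safely(done, "second"),  # skipped: result already set
        resolve_safely(pending, 3),      # succeeds: future was still pending
        done.result(),                   # the original result is kept
        pending.result(),
    )


print(asyncio.run(demo()))
```

Whether silently dropping a late result is acceptable depends on the caller; for a cancelled WAMP call it matches the behavior requested in this issue.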

tlangmo avatar Aug 21 '18 16:08 tlangmo