txaio
txaio copied to clipboard
aio._AsyncioApi.resolve() must care for cancelled future
When setting the result for a Future object with Future.set_result() for a cancelled future we get an InvalidStateError. So I would suggest a small addition in the resolve() method:
def resolve(self, future, result=None):
if not future.cancelled():
future.set_result(result)
I have run into the same problem today. I think it is very severe! Basically if a wamp call is performed, and the outer task is cancelled, txaio raises an InvalidState exception which brings the connection down.
I cannot see why cancelling a task while it "awaits" a wamp call is an uncommon scenario. Below is an example which triggers the behavior.
The suggestion above or a try-except block seem to work fine in my tests
import asyncio
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
class ServerComponent(ApplicationSession):
async def onJoin(self, details):
async def long_op():
print("long_op")
await asyncio.sleep(5)
await self.register(long_op, "com.myapp.long_op")
class ClientComponent(ApplicationSession):
async def _do(self):
try:
# this call will take a while
res = await self.call("com.myapp.long_op")
except asyncio.CancelledError as err:
# so far so good, the task is cancelled correctly, but InvalidStateError occurs as soon as "long_op" returns
print(repr(err))
async def onJoin(self, details):
f = asyncio.ensure_future(self._do())
# cancel the task before long_op returns
asyncio.get_event_loop().call_later(2, f.cancel)
def onDisconnect(self):
pass
if __name__ == "__main__":
realm = "motesque"
client_runner = ApplicationRunner("ws://127.0.0.1:9090/ws", realm)
server_runner = ApplicationRunner("ws://127.0.0.1:9090/ws", realm)
async def run():
await server_runner.run(ServerComponent, start_loop=False)
await client_runner.run(ClientComponent, start_loop=False)
for i in range(0, 10):
print("loop")
await asyncio.sleep(1)
asyncio.get_event_loop().run_until_complete(run())