AsyncMachine with queued transitions breaks when transition is cancelled
Hello,
First off, thanks for the very helpful library :)
I seem to have found a bug specifically relating to AsyncMachines with queued transitions enabled.
Describe the bug When using an AsyncMachine with queued transitions, in case a transition is cancelled, the state machine stops processing triggers completely.
This seems to be related to the fact that if the processing of an enqueued transition raises a BaseException (in particular asyncio.CancelledError) here, the queue is not cleared properly. Any later transitions are then only enqueued, but processing will never be resumed.
I don't know if this issue affects other queued implementations, or only the AsyncMachine one.
Minimal working example
import asyncio
import logging
from transitions.extensions.asyncio import AsyncMachine
logging.basicConfig(level=logging.DEBUG)
# logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger(__name__)
class MyMachine(AsyncMachine):
STATES = [
'A',
'B',
'C',
]
TRANSITIONS = [
dict(
trigger='A_to_B',
source='A',
after='do_something',
dest='B',
),
dict(
trigger='B_to_C',
source='B',
after='do_something',
dest='C',
),
dict(
trigger='reset',
source='*',
after='do_something',
dest='A',
),
]
def __init__(self):
super().__init__(
states=self.STATES,
transitions=self.TRANSITIONS,
initial='A',
auto_transitions=False,
queued=True, # <== this triggers the bug
)
async def do_something(self):
_logger.info('Do something from state %s', self.state)
await asyncio.sleep(1)
_logger.info('Do something from state %s finished', self.state)
machine = MyMachine()
loop = None
async def aio_main():
global loop
loop = asyncio.get_event_loop()
try:
a_to_b = asyncio.create_task(machine.A_to_B())
await asyncio.sleep(0.5)
a_to_b.cancel()
await machine.B_to_C() # Does not do anything
await asyncio.sleep(2)
await machine.reset() # Does not do anything
# Expected result
assert machine.state == "A"
except asyncio.CancelledError:
_logger.info('Got cancelled')
if __name__ == '__main__':
asyncio.run(aio_main(), debug=True)
Expected behavior The state machine should continue processing future triggers.