[Bug] cancelled timer callback causes asyncio.exceptions.InvalidStateError
The workflow below succeeds, but an exception occurs and is logged by the worker. In the example below this exception does not cause a workflow or workflow task failure, but we should nevertheless prevent it. What happens is:
- An
asyncio.Taskis created, which blocks on a timer. - The task is cancelled, which throws
asyncio.CancelledErrorinto the coroutine, and cancels thesleepfuture. - But SDK internals have set a callback that attempts to resolve the future. This callback still fires at the set timer time, despite the future having been cancelled in the interim, causing
asyncio.exceptions.InvalidStateError. See
https://github.com/temporalio/sdk-python/blob/49ca10e413ba67e1adfeed3b5577cb4f5b007e54/temporalio/worker/_workflow_instance.py#L1453-L1457
🔴 caught asyncio.CancelledError when sleeping in task
Exception in callback _WorkflowInstanceImpl.workflow_sleep.<locals>.<lambda>() at /Users/dan/src/temporalio/sdk-python/temporalio/worker/_workflow_instance.py:1456
handle: <_TimerHandle when=1741139892.760706 _WorkflowInstanceImpl.workflow_sleep.<locals>.<lambda>() at /Users/dan/src/temporalio/sdk-python/temporalio/worker/_workflow_instance.py:1456>
Traceback (most recent call last):
File "/Users/dan/.local/share/uv/python/cpython-3.13.1-macos-aarch64-none/lib/python3.13/asyncio/events.py", line 89, in _run
self._context.run(self._callback, *self._args)
~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/dan/src/temporalio/sdk-python/temporalio/worker/_workflow_instance.py", line 1456, in <lambda>
lambda: fut.set_result(None),
~~~~~~~~~~~~~~^^^^^^
asyncio.exceptions.InvalidStateError: invalid state
Result: Hello, World!
import asyncio
import uuid
from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker
@workflow.defn
class SayHello:
async def _my_task(self) -> None:
try:
await workflow.sleep(6, summary="my task")
except asyncio.CancelledError:
print("🔴 caught asyncio.CancelledError when sleeping in task")
@workflow.run
async def run(self) -> str:
task = asyncio.create_task(self._my_task())
await workflow.sleep(2, summary="let the loop start")
try:
task.cancel()
await task
except BaseException:
assert False, "unreachable"
await workflow.sleep(15)
return "Hello, World!"
async def main():
client = await Client.connect("localhost:7233")
task_queue = "timer-callback-exception-task-queue"
async with Worker(
client,
task_queue=task_queue,
workflows=[SayHello],
):
result = await client.execute_workflow(
SayHello.run,
id=str(uuid.uuid4()),
task_queue=task_queue,
)
print(f"Result: {result}")
if __name__ == "__main__":
asyncio.run(main())
:+1: Should just not call set_result on an already done future (and find where else we might be doing that). This is probably just us (me) not properly thinking about the fact that in Python's asyncio.Future set-result-if-already-done is not a no-op (as it is in other langs and as concurrent.futures.Future result setter was until 3.8).