
[Bug] cancelled timer callback causes asyncio.exceptions.InvalidStateError

Open dandavison opened this issue 10 months ago • 1 comments

The workflow below succeeds, but the worker logs an exception. Here the exception does not cause a workflow or workflow task failure, but we should nevertheless prevent it. What happens is:

  1. An asyncio.Task is created, which blocks on a timer.
  2. The task is cancelled, which throws asyncio.CancelledError into the coroutine, and cancels the sleep future.
  3. But SDK internals have set a callback that attempts to resolve the future. This callback still fires when the timer is due, even though the future has been cancelled in the interim, and raises asyncio.exceptions.InvalidStateError. See:

https://github.com/temporalio/sdk-python/blob/49ca10e413ba67e1adfeed3b5577cb4f5b007e54/temporalio/worker/_workflow_instance.py#L1453-L1457
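The three steps above can be reproduced with plain asyncio, outside of Temporal. This is a minimal sketch, not the SDK's code: `loop.call_later` stands in for the SDK's timer, and `reproduce` and `errors` are illustrative names. A custom loop exception handler records the error that the default handler would otherwise only log.

```python
import asyncio


async def reproduce():
    loop = asyncio.get_running_loop()
    errors = []

    # Record exceptions raised inside loop callbacks instead of only logging them.
    loop.set_exception_handler(
        lambda _loop, context: errors.append(type(context.get("exception")).__name__)
    )

    fut = loop.create_future()
    # Stand-in for the timer callback: it resolves the future unconditionally.
    loop.call_later(0.01, lambda: fut.set_result(None))

    # Stand-in for task cancellation reaching the sleep future before the timer fires.
    fut.cancel()

    # Give the timer callback time to run against the already-cancelled future.
    await asyncio.sleep(0.05)
    return errors


errors = asyncio.run(reproduce())
print(errors)
```

The recorded exception type is `InvalidStateError`, the same error that appears in the worker log.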

🔴 caught asyncio.CancelledError when sleeping in task
Exception in callback _WorkflowInstanceImpl.workflow_sleep.<locals>.<lambda>() at /Users/dan/src/temporalio/sdk-python/temporalio/worker/_workflow_instance.py:1456
handle: <_TimerHandle when=1741139892.760706 _WorkflowInstanceImpl.workflow_sleep.<locals>.<lambda>() at /Users/dan/src/temporalio/sdk-python/temporalio/worker/_workflow_instance.py:1456>
Traceback (most recent call last):
  File "/Users/dan/.local/share/uv/python/cpython-3.13.1-macos-aarch64-none/lib/python3.13/asyncio/events.py", line 89, in _run
    self._context.run(self._callback, *self._args)
    ~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/dan/src/temporalio/sdk-python/temporalio/worker/_workflow_instance.py", line 1456, in <lambda>
    lambda: fut.set_result(None),
            ~~~~~~~~~~~~~~^^^^^^
asyncio.exceptions.InvalidStateError: invalid state
Result: Hello, World!

import asyncio
import uuid

from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker


@workflow.defn
class SayHello:
    async def _my_task(self) -> None:
        try:
            await workflow.sleep(6, summary="my task")
        except asyncio.CancelledError:
            print("🔴 caught asyncio.CancelledError when sleeping in task")

    @workflow.run
    async def run(self) -> str:
        task = asyncio.create_task(self._my_task())
        await workflow.sleep(2, summary="let the loop start")

        try:
            task.cancel()
            await task
        except BaseException:
            assert False, "unreachable"

        await workflow.sleep(15)
        return "Hello, World!"


async def main():
    client = await Client.connect("localhost:7233")
    task_queue = "timer-callback-exception-task-queue"
    async with Worker(
        client,
        task_queue=task_queue,
        workflows=[SayHello],
    ):
        result = await client.execute_workflow(
            SayHello.run,
            id=str(uuid.uuid4()),
            task_queue=task_queue,
        )
        print(f"Result: {result}")


if __name__ == "__main__":
    asyncio.run(main())

dandavison, Mar 05 '25 02:03

:+1: Should just not call set_result on an already-done future (and find where else we might be doing that). This is probably just us (me) not properly accounting for the fact that calling set_result on an already-done asyncio.Future is not a no-op in Python (as it is in other languages, and as it was for the concurrent.futures.Future result setter until 3.8).

cretz, Mar 07 '25 13:03
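The suggestion above, not calling set_result on a future that is already done, can be sketched in plain asyncio as follows. This is an illustration under assumptions, not the actual SDK change: `resolve_if_pending` is a hypothetical helper, and `loop.call_later` again stands in for the SDK's timer.

```python
import asyncio


def resolve_if_pending(fut):
    # Hypothetical helper: only resolve futures that are still pending.
    # done() is True for cancelled futures too, so they are skipped.
    if not fut.done():
        fut.set_result(None)


async def demo():
    loop = asyncio.get_running_loop()
    errors = []
    # Record any exception raised inside a loop callback.
    loop.set_exception_handler(
        lambda _loop, context: errors.append(context.get("exception"))
    )

    cancelled = loop.create_future()
    pending = loop.create_future()
    loop.call_later(0.01, resolve_if_pending, cancelled)
    loop.call_later(0.01, resolve_if_pending, pending)

    # Cancel one future before its timer fires; leave the other pending.
    cancelled.cancel()

    await asyncio.sleep(0.05)
    resolved = pending.done() and pending.result() is None
    return errors, cancelled.cancelled(), resolved


guarded_errors, still_cancelled, pending_resolved = asyncio.run(demo())
print(guarded_errors, still_cancelled, pending_resolved)
```

With the guard in place, the cancelled future stays cancelled, the pending future is resolved as usual, and no callback exception reaches the loop's exception handler.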