[Bug] No clean way to cancel an activity and wait until it's cancelled
What are you really trying to do?
Start an activity from a workflow, cancel it, wait for clean cancellation acknowledgement, exit the workflow.
Describe the bug
There appear to be several issues.
- If you call `handle.cancel()` on an activity handle from the workflow, the `CancelledError` will only be raised in the activity after an `activity.heartbeat()` call, followed by an `await something()` call. Otherwise the activity itself doesn't seem to receive `CancelledError`. This kinda sucks, both because you need to call `heartbeat()` for cancellation requests to get through, and because long-running coros do not get interrupted correctly. I don't understand the underlying implementation, but if the reason we need to `heartbeat()` is that workers don't want to poll for cancellation, still, when you call `activity.heartbeat()`, the client library could check for cancellation and immediately call `task.cancel()` on the task running the activity, no? Currently the work-around seems to be to litter activity code with `asyncio.sleep(0.1)`.
- From the workflow code, if you want to cleanly wait for the activity cancellation, you have to catch `ActivityError`, which is a broader exception type than I actually want to catch (presumably there are other `ActivityError`s which I don't want to catch).
- If you use `WAIT_CANCELLATION_COMPLETED` and try to cancel an activity that's currently awaiting a long-running coro (like `asyncio.sleep(10)`), the activity won't receive a `CancelledError` until it sends a heartbeat. It can't send one until the coro it's waiting on finishes, so your workflow won't finish until the activity's long-running coro finishes.
Minimal Reproduction
https://github.com/andmis/snippets/tree/temporal-python-sdk-cannot-cleanly-cancel-activities
Using `python run_workflow.py` with no args, the workflow exits promptly, but the activity never receives `CancelledError`. It runs to completion, and we get warning log spam:
2024-12-09 19:44:06.116611+00:00 (Workflow) Starting, use_sleep=False use_wait=False
2024-12-09 11:44:06.124587 (Activity) Running sandbox_activity
2024-12-09 19:44:07.130422+00:00 (Workflow) Cancelling activity
2024-12-09 19:44:07.130422+00:00 (Workflow) Activity cancelled
2024-12-09 19:44:07.130422+00:00 (Workflow) Exiting
2024-12-09 11:44:12.868185 (Activity) Completing sandbox_activity
2024-12-09T19:44:12.881460Z WARN temporal_sdk_core::worker::activities: Activity not found on completion. This may happen if the activity has already been cancelled but completed anyway. task_token=TaskToken(CiRhOTc2ZWZkMy1iM2NiLTQwZmMtOWYzZi1jNjk0MzU2NjMzN2ESEHNhbmRib3gtd29ya2Zsb3caJDVhMjAyNWI3LWRhY2MtNDg4OC04NjA5LWI3NGQ3MzI2MzA3YyAFKAEyATFCEHNhbmRib3hfYWN0aXZpdHlKCAgBEIeKQBgB) details=Status { code: NotFound, message: "workflow execution already completed", details: b"\x08\x05\x12$workflow execution already completed\x1aB\n@type.googleapis.com/temporal.api.errordetails.v1.NotFoundFailure", metadata: MetadataMap { headers: {"content-type": "application/grpc"} }, source: None }
Using `python run_workflow.py -w`, the workflow waits until the activity's long-running sleep finishes, despite the activity being cancelled (note the timestamps). The activity also completes rather than being cancelled:
2024-12-09 19:45:15.119427+00:00 (Workflow) Starting, use_sleep=False use_wait=True
2024-12-09 11:45:15.127826 (Activity) Running sandbox_activity
2024-12-09 19:45:16.130733+00:00 (Workflow) Cancelling activity
2024-12-09 11:45:24.371849 (Activity) Completing sandbox_activity
2024-12-09 19:45:24.384356+00:00 (Workflow) Exiting
Using `python run_workflow.py -w -s`, the workflow waits for the long-running coro in the activity to finish, which is bad. The activity does cancel rather than complete, but having to use `-s` sucks:
2024-12-09 19:47:00.283375+00:00 (Workflow) Starting, use_sleep=True use_wait=True
2024-12-09 11:47:00.291593 (Activity) Running sandbox_activity
2024-12-09 19:47:01.301447+00:00 (Workflow) Cancelling activity
2024-12-09 11:47:06.547612 (Activity) Cancelling sandbox_activity
2024-12-09 19:47:06.551643+00:00 (Workflow) Activity cancelled
2024-12-09 19:47:06.551643+00:00 (Workflow) Exiting
Using `python run_workflow.py -s`, the workflow exits cleanly and promptly (since we aren't using `WAIT_CANCELLATION_COMPLETED`), and the activity cancels rather than completing. However, the activity still waits for the long-running coro to finish before it cancels:
2024-12-09 19:48:40.102363+00:00 (Workflow) Starting, use_sleep=True use_wait=False
2024-12-09 11:48:40.110083 (Activity) Running sandbox_activity
2024-12-09 19:48:41.121725+00:00 (Workflow) Cancelling activity
2024-12-09 19:48:41.121725+00:00 (Workflow) Activity cancelled
2024-12-09 19:48:41.121725+00:00 (Workflow) Exiting
2024-12-09 11:48:47.629316 (Activity) Cancelling sandbox_activity
Environment/Versions
- OS and processor: macOS, M1
- Temporal version: 1.1.2
- Python SDK version: 1.8.0
- Are you using Docker or Kubernetes or building Temporal from source? No
Sorry if this report is a bit garbled -- I'd say the real issue is "there does not seem to be a clean, blessed way of doing this".
> If you call `handle.cancel()` on an activity handle from the workflow, the `CancelledError` will only be raised in the activity after an `activity.heartbeat()` call, followed by an `await something()` call. Otherwise the activity itself doesn't seem to receive `CancelledError`.
This is how Python asyncio cancellation works: a cancel can only be delivered at an `await` point. The snippet you show completes the activity immediately after calling the non-blocking heartbeat. So I would not expect it to raise `CancelledError`, because it has already completed.
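The following is a minimal pure-asyncio sketch of "only at an await point" (no Temporal involved; `worker` and `main` are illustrative names). The cancel is requested while the coroutine is stuck in blocking code, but the `CancelledError` only shows up at the next `await`:

```python
import asyncio
import time


async def worker(log: list) -> None:
    log.append("start")
    # Blocking call: the event loop cannot run anything (including the
    # scheduled cancel) until this returns.
    time.sleep(0.2)
    log.append("after blocking work")
    try:
        # First await point: the pending cancellation is delivered here.
        await asyncio.sleep(1)
        log.append("not reached")
    except asyncio.CancelledError:
        log.append("cancelled at await")
        raise


async def main() -> tuple:
    log: list = []
    task = asyncio.create_task(worker(log))
    # Request cancellation 50 ms from now, i.e. while worker is blocked.
    asyncio.get_running_loop().call_later(0.05, task.cancel)
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log, task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(main()))
```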
> From the workflow code, if you want to cleanly wait for the activity cancellation, you have to catch `ActivityError`, which is a broader exception type than I actually want to catch (presumably there are other `ActivityError`s which I don't want to catch).
Use `__cause__` (aliased as `cause`) to differentiate. When the activity was cancelled, the `ActivityError`'s cause is a `CancelledError`.
> the activity won't receive a `CancelledError` until it sends a heartbeat, which it can't do until the coro it's waiting on finishes, so your workflow won't finish until the activity's long-running coro finishes.
This is the nature of heartbeating. The server only relays cancellation to the worker in response to a heartbeat, so if you're not heartbeating, you're not receiving cancellation. You can run heartbeating in a background task, or you can sleep for short periods and heartbeat every so often. Also, heartbeats are throttled to 0.8 times the heartbeat timeout anyway. You are not setting a heartbeat timeout, so the throttling falls back to a really high default.
See:
- https://docs.temporal.io/develop/python/failure-detection#activity-heartbeats
- https://docs.temporal.io/encyclopedia/detecting-activity-failures#activity-heartbeat
- https://github.com/temporalio/sdk-python?tab=readme-ov-file#heartbeating-and-cancellation
etc., to better understand how heartbeating and activity cancellation work together.
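Here is one way to follow the "heartbeat in a background task" suggestion, sketched with plain asyncio so it runs standalone. `run_with_heartbeats` is a hypothetical helper, not part of the SDK. Inside a real activity you would pass `activity.heartbeat` as the `heartbeat` callback and choose an `interval` comfortably below the activity's `heartbeat_timeout`. The `demo` function fakes the SDK's side by cancelling the task on the third heartbeat; that stand-in is an assumption for illustration only.

```python
import asyncio
from typing import Any, Awaitable, Callable, TypeVar

T = TypeVar("T")


async def run_with_heartbeats(
    work: Awaitable[T],
    heartbeat: Callable[[], Any],
    interval: float,
) -> T:
    """Await `work` in the current task while a background task calls
    `heartbeat()` every `interval` seconds."""

    async def beat_forever() -> None:
        while True:
            heartbeat()
            await asyncio.sleep(interval)

    beater = asyncio.create_task(beat_forever())
    try:
        # Awaited in the caller's task, so cancelling that task interrupts it.
        return await work
    finally:
        # Stop heartbeating however `work` ended (result, error, or cancel).
        beater.cancel()
        try:
            await beater
        except asyncio.CancelledError:
            pass


async def demo() -> tuple:
    """Stand-in for the SDK: cancel the running task on the 3rd heartbeat."""
    beats: list = []
    task = asyncio.current_task()

    def fake_heartbeat() -> None:
        beats.append(1)
        if len(beats) == 3:
            task.cancel()  # hypothetical: pretend the heartbeat saw a cancel

    try:
        await run_with_heartbeats(asyncio.sleep(10), fake_heartbeat, 0.01)
        return "completed", len(beats)
    except asyncio.CancelledError:
        return "cancelled", len(beats)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The background task keeps heartbeats flowing while `work` is parked in a long `await`, which is what gives a cancellation request a chance to reach the worker. The 10-second sleep is interrupted as soon as the task is cancelled.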
edit: ignore this post, i misunderstood something above.
> This is how Python asyncio cancellation works: a cancel can only be delivered at an `await` point. The snippet you show completes the activity immediately after calling the non-blocking heartbeat. So I would not expect it to raise `CancelledError`, because it has already completed.
It's possible for task.cancel() to interrupt a coro such as sleep:
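Here is a minimal pure-asyncio sketch of that claim (`long_running` is an illustrative name). Cancelling a task that is parked in `asyncio.sleep(10)` interrupts the sleep right away rather than after ten seconds:

```python
import asyncio
import time


async def long_running(log: list) -> None:
    try:
        await asyncio.sleep(10)  # stands in for a long-running coro
        log.append("completed")
    except asyncio.CancelledError:
        log.append("cancelled")
        raise


async def main() -> tuple:
    log: list = []
    start = time.monotonic()
    task = asyncio.create_task(long_running(log))
    await asyncio.sleep(0.1)  # give the task time to enter the sleep
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log, task.cancelled(), time.monotonic() - start


if __name__ == "__main__":
    print(asyncio.run(main()))
```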
> It's possible for `task.cancel()` to interrupt a coro such as `sleep`:
Right, if `use_sleep` is set, that makes sense; it was just a bit strange to see an activity that could heartbeat and then complete immediately. But regardless, to the later point: in this case you are choosing to sleep without heartbeating. That means the worker never even gets notified, while sleeping, that the activity is being cancelled. You should heartbeat while sleeping if you want to cancel while sleeping.
IIUC, the point of the activity issuing a heartbeat is to let the server know it's still healthy. It sounds like your advice is that I could create a separate `asyncio.Task` and heartbeat from that task, and then I would get a `CancelledError` interrupting my long-running coro.
So IIUC, `activity.heartbeat()` basically means `send_heartbeat_to_server_and_check_for_cancellation()`. Is that right?
I'd suggest noting the exact semantics here in the docs.
These docs make it sound like you can just call `handle.cancel()` and that will work: https://python.temporal.io/temporalio.workflow.ActivityHandle.html
It sounds like the semantics are something like this; I'd appreciate clarification as well:
- call `handle.cancel()`
- from your activity code, call `activity.heartbeat()`
- after `handle.cancel()` followed by `activity.heartbeat()`, we will call `task.cancel()` on the root `asyncio.Task` running your activity
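The three steps above can be illustrated with a toy model in plain asyncio. `ToyActivityContext` and its `cancel_requested` flag are hypothetical stand-ins for the server-side cancellation request and for `activity.heartbeat()`. This is not how the SDK is implemented. The model only shows the ordering: the cancel reaches the activity's task through a heartbeat and then lands at the next `await`. That matches the heartbeat-then-await behavior described at the top of this issue.

```python
import asyncio
from typing import Optional


class ToyActivityContext:
    """Toy stand-in for the worker side; NOT the Temporal SDK's implementation."""

    def __init__(self) -> None:
        self.cancel_requested = False  # set by the "workflow" (step 1)
        self.task: Optional[asyncio.Task] = None  # task running the activity

    def heartbeat(self) -> None:
        # Steps 2 and 3: a heartbeat is where a pending cancel is discovered,
        # and the task running the activity is then cancelled.
        if self.cancel_requested and self.task is not None:
            self.task.cancel()


async def activity_body(ctx: ToyActivityContext, log: list) -> None:
    try:
        for _ in range(100):
            ctx.heartbeat()
            await asyncio.sleep(0.01)  # the cancel lands at this await
        log.append("completed")
    except asyncio.CancelledError:
        log.append("cancelled")
        raise


async def main() -> list:
    ctx = ToyActivityContext()
    log: list = []
    ctx.task = asyncio.create_task(activity_body(ctx, log))
    await asyncio.sleep(0.05)
    ctx.cancel_requested = True  # step 1: the workflow calls handle.cancel()
    try:
        await ctx.task
    except asyncio.CancelledError:
        pass
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```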
> these docs make it sound like you can just `handle.cancel()` and that will work
In Python asyncio, calling cancel on a task doesn't always cancel it immediately (or at all); it's up to the implementation how to handle it. Some tasks may swallow the cancel, some shield, and some may do cleanup first. In our case, we communicate it to a server. But it does work in the sense that the request is sent to the server and recorded in history (assuming the activity is not complete); it's just not necessarily processed immediately. We will see about clarifying in the docs that Python asyncio Task cancellation is actually a cancellation request, as is explicit Temporal workflow activity cancellation in all Temporal languages.
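Plain asyncio shows the same "cancel is a request" behavior. A task can run cleanup and then honor the cancel, or it can swallow the `CancelledError` and finish normally. A minimal sketch (function names are illustrative):

```python
import asyncio


async def cleans_up(log: list) -> None:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        log.append("cleanup")  # do teardown, then honor the cancel
        raise


async def swallows(log: list) -> str:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        log.append("ignored cancel")  # swallow it: the task finishes normally
    return "finished anyway"


async def main() -> tuple:
    log: list = []
    a = asyncio.create_task(cleans_up(log))
    b = asyncio.create_task(swallows(log))
    await asyncio.sleep(0.01)  # let both tasks reach their sleeps
    a.cancel()
    b.cancel()
    results = await asyncio.gather(a, b, return_exceptions=True)
    return log, a.cancelled(), b.cancelled(), results[1]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Swallowing cancellation the way `swallows` does is usually a bug. It is shown only to illustrate that `cancel()` is a request the task may or may not honor.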
> after `handle.cancel()` followed by `activity.heartbeat()`, we will call `task.cancel()` on the root `asyncio.Task` running your activity
Yes, though there are of course caveats concerning heartbeat throttling, so it's not always immediate upon heartbeating.
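The throttling mentioned above can be modelled as plain arithmetic. This is a rough sketch. It assumes the rule stated earlier in this thread (throttle at 0.8 times the heartbeat timeout), plus a cap and a fallback for when no heartbeat timeout is set. The 60 s and 30 s defaults are assumed to mirror the Python `Worker` options `max_heartbeat_throttle_interval` and `default_heartbeat_throttle_interval`; check the `Worker` docs for your SDK version.

```python
from datetime import timedelta
from typing import Optional


def heartbeat_throttle_interval(
    heartbeat_timeout: Optional[timedelta],
    # Assumed defaults; see the lead-in above.
    max_interval: timedelta = timedelta(seconds=60),
    default_interval: timedelta = timedelta(seconds=30),
) -> timedelta:
    """Approximate minimum spacing between heartbeats actually sent to the server."""
    if heartbeat_timeout is None:
        return default_interval
    return min(heartbeat_timeout * 0.8, max_interval)


if __name__ == "__main__":
    print(heartbeat_throttle_interval(timedelta(seconds=10)))  # 0:00:08
```

Under this model, with `heartbeat_timeout=timedelta(seconds=10)` the worker talks to the server roughly every 8 seconds at most. A cancellation request could therefore lag by up to about that long before the activity sees it.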
It looks like activity handles extend `asyncio.Task`. So can one use `workflow.wait_condition` until `activity_handle.canceled()` is True? Hmm, that did not work for us, but checking `activity_handle.done()` worked.
> So can one use `workflow.wait_condition` until `activity_handle.canceled()` is True?
The proper way is to wait on the activity task/handle itself, catching the ActivityError and checking that the cause is CancelledError, though yes done() may work. As for cancelled helpers on activity and child workflow tasks, I opened #810 on that.
The way we went about this is the following:
```python
try:
    await workflow.execute_xxx(...)
except exceptions.ActivityError as e:
    if e.retry_state == exceptions.RetryState.CANCEL_REQUESTED:
        handle_explicit_cancellation_teardown()
```
Or, for a full activity example (the heartbeat util is from https://github.com/temporalio/samples-python/blob/main/custom_decorator/activity_utils.py):
```python
import asyncio
import datetime as dt

from temporalio import activity, exceptions, workflow

# Local module holding the heartbeat helper from the samples-python link above
import heartbeat_utils


class HelloGoodbyeActivities:
    def __init__(self):
        pass

    @activity.defn
    @heartbeat_utils.periodic_heartbeater
    async def say_hello(self):
        """Demo activity for saying hello."""
        activity.logger.info("activity=say_hello fired")
        for _ in range(5):
            activity.logger.info("Hello!")
            await asyncio.sleep(10)

    @activity.defn
    async def say_goodbye(self):
        """Demo activity for saying goodbye."""
        activity.logger.info("Goodbye!")

    @classmethod
    async def hello_goodbye(cls):
        try:
            await workflow.execute_activity_method(
                cls.say_hello,
                start_to_close_timeout=dt.timedelta(minutes=1),
                heartbeat_timeout=dt.timedelta(seconds=10),
            )
            await workflow.execute_activity_method(
                cls.say_goodbye, start_to_close_timeout=dt.timedelta(seconds=10)
            )
        except exceptions.ActivityError as e:
            if e.retry_state == exceptions.RetryState.CANCEL_REQUESTED:
                workflow.logger.exception(
                    f"Workflow canceled: {type(e.cause)}({e}), {e.retry_state=}"
                )
                # Still run the goodbye (teardown) activity after a cancel
                await workflow.execute_activity_method(
                    cls.say_goodbye, start_to_close_timeout=dt.timedelta(seconds=10)
                )
            else:
                workflow.logger.exception(
                    f"Workflow failed: {e.cause} {type(e.cause)}({e}), {e.retry_state=}"
                )
                raise
        except Exception as e:
            workflow.logger.exception(f"Unknown exception: {e}, {e.__dict__}")


@workflow.defn
class SayHello:
    @workflow.run
    async def run(self) -> None:
        return await HelloGoodbyeActivities.hello_goodbye()
```
This example would always get to the say_goodbye activity, no matter when it was cancelled (even after a worker restart).