TimeoutError: Deadline exceeded on activities taking greating than 120 seconds
Here's the stack trace.
2021-05-13 13:22:37,579 | ERROR | retry.py:retry_loop:29 | run failed: Deadline exceeded, retrying in 3 seconds
Traceback (most recent call last):
File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/grpclib/client.py", line 360, in recv_initial_metadata
headers = await self._stream.recv_headers()
File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/grpclib/protocol.py", line 349, in recv_headers
await self.headers_received.wait()
File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/asyncio/locks.py", line 309, in wait
await fut
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/temporal/retry.py", line 17, in retry_loop
await fp(*args, **kwargs)
File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/temporal/decision_loop.py", line 1083, in run
decision_task: PollWorkflowTaskQueueResponse = await self.poll()
File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/temporal/decision_loop.py", line 1165, in poll
task = await self.service.poll_workflow_task_queue(
File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/temporal/api/workflowservice/v1.py", line 828, in poll_workflow_task_queue
return await self._unary_unary(
File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/betterproto/__init__.py", line 1133, in _unary_unary
response = await stream.recv_message()
File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/grpclib/client.py", line 408, in recv_message
await self.recv_initial_metadata()
File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/grpclib/client.py", line 380, in recv_initial_metadata
self.initial_metadata = im
File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/grpclib/utils.py", line 70, in __exit__
raise self._error
asyncio.exceptions.TimeoutError: Deadline exceeded
Here's what I've uncovered:
The python-sdk starts up two parallel threads:
- One to handle workflows (runs workflow logic and determines which activity needs to run)
- One to handle activities (runs activity code)
Both threads use the same grpc channel to communicate with the temporal server with a timeout set to 120s. Concurrent RPC calls are supported according to the grpclib docs: https://grpclib.readthedocs.io/en/latest/client.html
The workflow thread polls the workflow task queue and the activity thread polls the activity task queue. Both take 60 seconds before continuing the while loop to poll again if nothing is returned. When the activity thread receives something on the activity task queue, it starts running the activity code. Meanwhile, the workflow thread is in the middle of polling the workflow task queue.
What I'm noticing is that the workflow poll request is "blocked" and doesn't return like it usually would after 60 seconds. The workflow poll request doesn't complete until the activity in the other thread is finished. If an activity takes long enough to complete, the workflow poll request can take more than 120 seconds (note the timeout mentioned earlier) causing a deadline exceeded error.
Solution:
- Figure out why the workflow poll request is "stuck" and can't complete while an activity is running in the other thread.
A couple of temporary workaround:
- Set the timeout to the max time that we expect activities could take. I don't know if there are consequences to this.
OR
- Not worry about the deadline exceeded error because when it happens, it immediately continues the while loop and polls the workflow task queue again.
- The downside to this is having this error clutter the worker logs
Hi @CGreenburg, I'm not able to reproduce this with the code below. Could you share some sample code that produces that error on your side.
import asyncio
import logging
from datetime import timedelta
from temporal.activity_method import activity_method
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient
logging.basicConfig(level=logging.INFO)
TASK_QUEUE = "HelloWorld"
NAMESPACE = "default"
# Activities Interface
class GreetingActivities:
@activity_method(task_queue=TASK_QUEUE, schedule_to_close_timeout=timedelta(seconds=500))
async def compose_greeting(self, greeting: str, name: str) -> str:
raise NotImplementedError
# Activities Implementation
class GreetingActivitiesImpl:
async def compose_greeting(self, greeting: str, name: str) -> str:
await asyncio.sleep(200)
return greeting + " " + name
# Workflow Interface
class GreetingWorkflow:
@workflow_method(task_queue=TASK_QUEUE)
async def get_greeting(self, name: str) -> str:
raise NotImplementedError
# Workflow Implementation
class GreetingWorkflowImpl(GreetingWorkflow):
def __init__(self):
self.greeting_activities: GreetingActivities = Workflow.new_activity_stub(GreetingActivities)
async def get_greeting(self, name):
return await self.greeting_activities.compose_greeting("Hello!", name)
async def worker_main():
client = WorkflowClient.new_client(namespace=NAMESPACE)
factory = WorkerFactory(client, NAMESPACE)
worker = factory.new_worker(TASK_QUEUE)
worker.register_activities_implementation(GreetingActivitiesImpl(), "GreetingActivities")
worker.register_workflow_implementation_type(GreetingWorkflowImpl)
factory.start()
print("Worker started")
if __name__ == '__main__':
loop = asyncio.get_event_loop()
asyncio.ensure_future(worker_main())
loop.run_forever()
We migrated a pyspark/pandas application to a workflow activity and it takes longer than 2 minutes. That's where we've seen the deadline exceeded exception. I've been replicating it using time.sleep(150).
You might want to look into https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor if your activity is CPU bound or if the APIs that you're using aren't async friendly.
There was a thread about this in the past where I proposed creating an event loop per worker (running in its own thread) so that the activity code is free to block as it pleases but they feedback I got previously was that the library should do the bare minimum and allow the user to decide how to handle blocking calls.
https://community.temporal.io/t/timeline-for-python-client-support/223/26
If you're able to, I would run my workflow and activities in separate workers.
You can also try something like this:
import asyncio
import logging
from datetime import timedelta
from threading import Thread
from temporal.activity_method import activity_method
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient
import time
logging.basicConfig(level=logging.INFO)
TASK_QUEUE = "HelloWorld"
NAMESPACE = "default"
# Activities Interface
class GreetingActivities:
@activity_method(task_queue=TASK_QUEUE, schedule_to_close_timeout=timedelta(seconds=500))
async def compose_greeting(self, greeting: str, name: str) -> str:
raise NotImplementedError
# Activities Implementation
class GreetingActivitiesImpl:
async def compose_greeting(self, greeting: str, name: str) -> str:
time.sleep(150)
return greeting + " " + name
# Workflow Interface
class GreetingWorkflow:
@workflow_method(task_queue=TASK_QUEUE)
async def get_greeting(self, name: str) -> str:
raise NotImplementedError
# Workflow Implementation
class GreetingWorkflowImpl(GreetingWorkflow):
def __init__(self):
self.greeting_activities: GreetingActivities = Workflow.new_activity_stub(GreetingActivities)
async def get_greeting(self, name):
return await self.greeting_activities.compose_greeting("Hello!", name)
async def worker_main(activities=False, workflows=False):
client = WorkflowClient.new_client(namespace=NAMESPACE)
factory = WorkerFactory(client, NAMESPACE)
worker = factory.new_worker(TASK_QUEUE)
if activities:
worker.register_activities_implementation(GreetingActivitiesImpl(), "GreetingActivities")
if workflows:
worker.register_workflow_implementation_type(GreetingWorkflowImpl)
factory.start()
print("Worker started")
def thread1():
asyncio.set_event_loop(asyncio.new_event_loop())
loop = asyncio.get_event_loop()
asyncio.ensure_future(worker_main(activities=True, workflows=False))
loop.run_forever()
def thread2():
asyncio.set_event_loop(asyncio.new_event_loop())
loop = asyncio.get_event_loop()
asyncio.ensure_future(worker_main(activities=False, workflows=True))
loop.run_forever()
if __name__ == '__main__':
t1 = Thread(target=thread1)
t2 = Thread(target=thread2)
t1.start()
t2.start()
t1.join()
t2.join()
Thanks for the suggestions @firdaus!
In your thread example, it's okay that the two workers use the same task queue? They don't need to be different task queues?
It's fine because there are separate GRPC calls behind the scenes for polling for workflow tasks and activity tasks so a workflow worker will never get the work of an activity worker.