
Sporadic `DeadlineExceeded` gRPC errors when queuing tasks to Cloud Tasks

Open · smallwat3r opened this issue 7 months ago • 2 comments

Determine this is the right repository

  • [x] I determined this is the correct repository in which to report this bug.

Summary of the issue

Context

We have a service that uses Cloud Tasks to dispatch and run long-running tasks. It had been running well for months, but since the 15th of May we started noticing sporadic DeadlineExceeded errors raised from gRPC. We have not changed anything in our system or in the way we enqueue tasks.

The proportion of tasks failing with DeadlineExceeded is around 20%.

After seeing these new errors, we decided to add support for retries and a timeout when creating tasks; here is a configuration similar to the one we used:

import requests
from google.api_core import exceptions
from google.api_core.retry import Retry, if_exception_type
from google.auth import exceptions as auth_exceptions

retry = Retry(
    predicate=if_exception_type(
        exceptions.TooManyRequests,
        exceptions.ServiceUnavailable,
        requests.exceptions.ConnectionError,
        requests.exceptions.ChunkedEncodingError,
        auth_exceptions.TransportError,
        exceptions.DeadlineExceeded,  # the exception we started noticing
    ),
    initial=1.0,
    maximum=2.0,
    multiplier=2.0,
    timeout=15.0,  # how long to keep retrying
)
client.create_task(
    request={"parent": parent, "task": task}, timeout=5.0, retry=retry
)
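
(For reference: with initial=1.0, multiplier=2.0 and maximum=2.0, api_core sleeps a jittered delay of at most ~1 s before the first retry and at most ~2 s between subsequent attempts, so the 15-second budget leaves room for several attempts.)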

We saw the failing tasks being retried, but none of them were successfully queued before the retry budget ran out and DeadlineExceeded was re-raised.

The ones that get queued successfully complete very quickly, so the configured timeouts should have been enough.

After these failed attempts, the only way for us to make it work, and to ensure 100% of our tasks are successfully put on the queue, was to switch the transport protocol from gRPC to HTTP (REST).

from google.cloud.tasks_v2 import CloudTasksClient
from google.cloud.tasks_v2.services.cloud_tasks.transports import CloudTasksRestTransport

client = CloudTasksClient(
    transport=CloudTasksRestTransport(credentials=...)
)
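
For reference, the generated client can also select this transport by name; a minimal sketch, assuming the installed google-cloud-tasks version ships the REST transport:

from google.cloud import tasks_v2

# Selecting the REST transport by name lets the client construct the
# CloudTasksRestTransport itself using default credentials.
client = tasks_v2.CloudTasksClient(transport="rest")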

Is there anything that we are missing, or has anything changed in the way the library uses gRPC that would explain why we started noticing this behaviour?

Expected Behavior: All our tasks should be queued successfully when retrying on sporadic DeadlineExceeded errors.

Actual Behavior: We are getting sporadic DeadlineExceeded errors, and the task never gets queued, even on retry.

API client name and version

google-cloud-tasks 2.19.2

Reproduction steps: code

This is some pseudo-code using the gRPC transport, from before we switched to HTTP. Note that the DeadlineExceeded exception was only raised on ~20% of the tasks we queued; the rest were queued successfully.

file: main.py

from google.api_core import exceptions
from google.api_core.retry import Retry, if_exception_type
from google.cloud.tasks_v2 import CloudTasksClient

client = CloudTasksClient()
retry = Retry(
    predicate=if_exception_type(
        exceptions.DeadlineExceeded,  # the exception we started noticing
    ),
    initial=1.0,
    maximum=2.0,
    multiplier=2.0,
    timeout=15.0,  # how long to keep retrying
)
client.create_task(
    request={"parent": parent, "task": task}, timeout=5.0, retry=retry
)

Reproduction steps: supporting files

No response

Reproduction steps: actual results

No response

Reproduction steps: expected results

No response

OS & version + platform

Debian Bookworm (12) on App Engine flex

Python environment

Python 3.12.10

Python dependencies

Here is the list of our dependencies related to gRPC:

grpc-google-iam-v1                       0.14.2
grpcio                                   1.71.0
grpcio-status                            1.62.3

Additional context

Here is a sample of the traceback of the actual exception we've been seeing:

  File "/opt/.venv/lib/python3.12/site-packages/grpc/_channel.py", line 1006, in _end_unary_response_blocking
    raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-05-21T14:14:37.173146012+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/.venv/lib/python3.12/site-packages/google/api_core/retry/retry_unary.py", line 144, in retry_target
    result = target()
             ^^^^^^^^
  File "/opt/.venv/lib/python3.12/site-packages/google/api_core/timeout.py", line 130, in func_with_timeout
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/.venv/lib/python3.12/site-packages/google/api_core/grpc_helpers.py", line 78, in error_remapped_callable
    raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.DeadlineExceeded: 504 Deadline Exceeded

smallwat3r · May 21 '25 19:05

Hi @smallwat3r,

I believe this is working as intended if the request takes longer than 5 seconds. In the code below, timeout=5.0 will trigger DeadlineExceeded if the request does not complete within 5 seconds. As per the documentation, timeout is the timeout for the request, and DEADLINE_EXCEEDED is raised once this time elapses. From the gRPC documentation: if a server has gone past the deadline when processing a request, the client gives up and fails the RPC with the DEADLINE_EXCEEDED status.

https://github.com/googleapis/google-cloud-python/blob/0e1d6b09c6b901f137665a73ddc0a34d46d807ab/packages/google-cloud-tasks/google/cloud/tasks_v2/services/cloud_tasks/client.py#L2509

client.create_task(
    request={"parent": parent, "task": task}, timeout=5.0, retry=retry
)
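
To spell out how the two deadlines interact (an annotated sketch using the same names as above; this reflects my reading of the api_core behaviour): timeout=5.0 bounds each individual RPC attempt, while the retry object's own timeout bounds the whole retry loop.

client.create_task(
    request={"parent": parent, "task": task},
    timeout=5.0,  # per-attempt deadline: each CreateTask RPC fails with
                  # DEADLINE_EXCEEDED if it has not completed in 5 seconds
    retry=retry,  # retry.timeout (15.0 above) caps the total time spent
                  # across all attempts before the error is re-raised
)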

Regrettably, it is not possible to retry the DEADLINE_EXCEEDED error using the retry argument of create_task. Instead, I would recommend configuring retry settings for a Queue via its retry_config. Do you have a retry configuration set up already? If not, you can use update_queue to configure this. As an example, to set a maximum retry duration of 15 seconds, add the following configuration to your queue.

from google.cloud.tasks_v2 import RetryConfig
from google.protobuf import duration_pb2

some_queue = client.get_queue(name=f"projects/{project}/locations/{location}/queues/{my_queue}")
retry_duration = duration_pb2.Duration()
retry_duration.FromSeconds(15)  # for example, 15 seconds
some_queue.retry_config = RetryConfig(max_attempts=20, max_retry_duration=retry_duration)
client.update_queue(queue=some_queue)
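
A quick way to confirm the new settings took effect (a sketch reusing the client and queue from above):

updated = client.get_queue(name=some_queue.name)
print(updated.retry_config)  # expect max_attempts: 20 and a 15s max_retry_duration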

You can also set a deadline or timeout for a specific task via the dispatch_deadline argument.

As an example, see the code below, which is a modified version of this sample code:

import datetime
import json
from typing import Dict, Optional
import uuid

from google.cloud import tasks_v2
from google.cloud.tasks_v2 import RetryConfig
from google.protobuf import duration_pb2, timestamp_pb2


def create_http_task(
    project: str,
    location: str,
    queue: str,
    url: str,
    json_payload: Dict,
    scheduled_seconds_from_now: Optional[int] = None,
    task_id: Optional[str] = None,
    deadline_in_seconds: Optional[int] = None,
) -> tasks_v2.Task:
    """Create an HTTP POST task with a JSON payload.
    Args:
        project: The project ID where the queue is located.
        location: The location where the queue is located.
        queue: The ID of the queue to add the task to.
        url: The target URL of the task.
        json_payload: The JSON payload to send.
        scheduled_seconds_from_now: Seconds from now to schedule the task for.
        task_id: ID to use for the newly created task.
        deadline_in_seconds: The deadline in seconds for task.
    Returns:
        The newly created task.
    """

    # Create a client.
    client = tasks_v2.CloudTasksClient()

    # Construct the task.
    task = tasks_v2.Task(
        http_request=tasks_v2.HttpRequest(
            http_method=tasks_v2.HttpMethod.POST,
            url=url,
            headers={"Content-type": "application/json"},
            body=json.dumps(json_payload).encode(),
        ),
        name=(
            client.task_path(project, location, queue, task_id)
            if task_id is not None
            else None
        ),
    )

    # Convert "seconds from now" to an absolute Protobuf Timestamp
    if scheduled_seconds_from_now is not None:
        timestamp = timestamp_pb2.Timestamp()
        timestamp.FromDatetime(
            datetime.datetime.now(datetime.timezone.utc)  # utcnow() is deprecated in 3.12
            + datetime.timedelta(seconds=scheduled_seconds_from_now)
        )
        task.schedule_time = timestamp

    # Convert "deadline in seconds" to a Protobuf Duration
    if deadline_in_seconds is not None:
        duration = duration_pb2.Duration()
        duration.FromSeconds(deadline_in_seconds)
        task.dispatch_deadline = duration

    # Use the client to send a CreateTaskRequest.
    return client.create_task(
        tasks_v2.CreateTaskRequest(
            # The queue to add the task to
            parent=client.queue_path(project, location, queue),
            # The task itself
            task=task,
        )
    )


# populate the project, location and queue name here
project = "<REDACTED_PROJECT>"
location = "us-central1" # as an example
my_queue = "my-queue" + uuid.uuid4().hex  # as an example

# create client
client = tasks_v2.CloudTasksClient()

# create queue
queue = client.create_queue(
    tasks_v2.CreateQueueRequest(
        parent=client.common_location_path(project, location),
        queue=tasks_v2.Queue(name=client.queue_path(project, location, my_queue)),
    )
)

# As an example, get the queue that we just created
some_queue = client.get_queue(
    name=f"projects/{project}/locations/{location}/queues/{my_queue}"
)

# As an example, create a retry config for the queue
retry_duration = duration_pb2.Duration()
retry_duration.FromSeconds(60)  # 60 seconds
some_queue.retry_config = RetryConfig(
    max_attempts=20, max_retry_duration=retry_duration
)

# As an example, update the queue
client.update_queue(queue=some_queue)

# Create a task with a 15-second dispatch deadline
create_http_task(
    project,
    location,
    my_queue,
    "https://example.com/task_handler",
    json_payload={"greeting": "hola"},
    scheduled_seconds_from_now=180,
    task_id=uuid.uuid4().hex,
    deadline_in_seconds=15,
)

Please don't hesitate to reach out if you have additional questions.

parthea · May 23 '25 18:05

Hi @parthea ,

Thank you for looking into this, and for the very detailed reply, this is much appreciated.

I should have added more context about the things we have tried. Instead of setting the timeout to 5 seconds, we also tried using the default timeout (omitting the parameter when calling create_task) and increasing it to 30 seconds. Our original setup (when we first noticed the issues) used the default timeout and no retry handler. Note that we had used this config for a long time before we started noticing the DeadlineExceeded issues a few days ago.

client.create_task(request={"parent": parent, "task": task})
client.create_task(request={"parent": parent, "task": task}, retry=retry, timeout=30)

With all the different new configs we tested, we experienced the same issue. We stuck to a 5-second timeout as we thought it should still be enough time for the gRPC request to complete and queue the task to Cloud Tasks. This is also the timeout we are now using with the REST transport client, and all tasks are queued successfully, although we're not sure why this no longer works reliably over gRPC.

Please correct me if I'm wrong, but adding a retry config to our queue would not help in our case, as we are not seeing the timeouts when the tasks run, but before that, when enqueuing them to Cloud Tasks (so they can run async). This means the tasks we tried to create using the default gRPC client never made it to Cloud Tasks.
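
To illustrate the distinction as I understand it, reusing names from the snippets above:

# Client-side: the retry argument guards the CreateTask RPC itself,
# i.e. the enqueue step that is failing for us.
client.create_task(request={"parent": parent, "task": task}, retry=retry)

# Queue-side: RetryConfig only governs re-dispatch of tasks that are
# already in the queue; it cannot help if CreateTask never succeeds.
some_queue.retry_config = RetryConfig(max_attempts=20, max_retry_duration=retry_duration)
client.update_queue(queue=some_queue)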

smallwat3r · May 25 '25 15:05