
gcpubsub broker isn't working yet - 'str' object has no attribute 'seconds'

Open balaji-v4d opened this issue 11 months ago • 3 comments

This may not be ready yet, but I couldn't resist trying rc4. If this report is premature, please close it.

$ python3 celery-rc55-test.py
Traceback (most recent call last):
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/google/protobuf/internal/well_known_types.py", line 439, in FromTimedelta
    td.seconds + td.days * _SECONDS_PER_DAY,
AttributeError: 'str' object has no attribute 'seconds'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/balaji_vinci4d_ai/celery-rc55-test.py", line 22, in <module>
    result = add.delay(4, 6)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/celery/app/task.py", line 444, in delay
    return self.apply_async(args, kwargs)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/celery/app/task.py", line 599, in apply_async
    return app.send_task(
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/celery/app/base.py", line 922, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/celery/app/amqp.py", line 523, in send_task_message
    ret = producer.publish(
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/messaging.py", line 190, in publish
    return _publish(
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/connection.py", line 556, in _ensured
    return fun(*args, **kwargs)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/messaging.py", line 208, in _publish
    maybe_declare(entity, retry=retry, **retry_policy)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/messaging.py", line 107, in maybe_declare
    return maybe_declare(entity, self.channel, retry, **retry_policy)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/common.py", line 112, in maybe_declare
    return _imaybe_declare(entity, channel, **retry_policy)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/common.py", line 167, in _imaybe_declare
    return entity.channel.connection.client.ensure(
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/connection.py", line 556, in _ensured
    return fun(*args, **kwargs)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/common.py", line 153, in _maybe_declare
    entity.declare(channel=channel)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/entity.py", line 617, in declare
    self._create_queue(nowait=nowait, channel=channel)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/entity.py", line 628, in _create_queue
    self.queue_bind(nowait=nowait, channel=channel)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/entity.py", line 672, in queue_bind
    return self.bind_to(self.exchange, self.routing_key,
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/entity.py", line 681, in bind_to
    return (channel or self.channel).queue_bind(
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/transport/virtual/base.py", line 577, in queue_bind
    self._queue_bind(exchange, *meta)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/transport/gcpubsub.py", line 244, in _queue_bind
    self._create_subscription(
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/transport/gcpubsub.py", line 315, in _create_subscription
    self.subscriber.create_subscription(
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/google/pubsub_v1/services/subscriber/client.py", line 887, in create_subscription
    request = pubsub.Subscription(request)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/proto/message.py", line 728, in __init__
    pb_value = marshal.to_proto(pb_type, value)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/proto/marshal/marshal.py", line 235, in to_proto
    pb_value = self.get_rule(proto_type=proto_type).to_proto(value)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/proto/marshal/rules/message.py", line 36, in to_proto
    return self._descriptor(**value)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/google/protobuf/internal/well_known_types.py", line 449, in _internal_assign
    self.FromTimedelta(td)
  File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/google/protobuf/internal/well_known_types.py", line 443, in FromTimedelta
    raise AttributeError(
AttributeError: Fail to convert to Duration. Expected a timedelta like object got str: 'str' object has no attribute 'seconds'
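
For anyone reading the traceback: the first exception is raised by the expression `td.seconds + td.days * _SECONDS_PER_DAY`, which assumes `td` is a `datetime.timedelta`. The sketch below is a stdlib-only stand-in for that expression, not the protobuf implementation; the function name `total_whole_seconds` and the string value `"96400s"` are hypothetical and only illustrate how a `str` reaching that line produces the same message.

```python
from datetime import timedelta

_SECONDS_PER_DAY = 24 * 3600


def total_whole_seconds(td):
    """Hypothetical stand-in for the failing expression in the traceback."""
    try:
        # Works for timedelta-like objects that expose .seconds and .days.
        return td.seconds + td.days * _SECONDS_PER_DAY
    except AttributeError as exc:
        # Re-raise with context, similar in spirit to the chained error above.
        raise AttributeError(
            f"Expected a timedelta like object got {type(td).__name__}: {exc}"
        ) from exc


# A real timedelta converts cleanly.
print(total_whole_seconds(timedelta(seconds=96400)))

# A string (assumed value, for illustration only) fails the same way.
try:
    total_whole_seconds("96400s")
except AttributeError as exc:
    print(exc)
```

Whether a given protobuf release accepts a string for a Duration field is outside what this sketch shows; it only reproduces the shape of the error.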

deps

 $ cat requirements.txt
celery==5.5.0rc4
google-cloud-pubsub>=2.18.4
google-cloud-monitoring>=2.16.0
grpcio==1.66.2

example

$ cat celery-rc55-test.py
from celery import Celery

expiration_seconds = 86400
broker_transport_options = {
        'ack_deadline_seconds': 60,
        'polling_interval': 0.3,
        'queue_name_prefix': 'kombu-',
        'expiration_seconds': 96400,
}

broker_url = 'gcpubsub://projects/v4d-dev'
app = Celery(
    "example_pubsub_app",
    broker=broker_url,
    broker_transport_options = broker_transport_options,
)

@app.task
def add(x, y):
    return x + y

result = add.delay(4, 6)

balaji-v4d avatar Dec 23 '24 06:12 balaji-v4d

@haimjether

Nusnus avatar Dec 23 '24 11:12 Nusnus

The issue seems to originate from incorrect dependency resolution of the protobuf version.

Commit https://github.com/celery/kombu/commit/e5ea1f6d58a3441c2e26e389634221aa784f7e00 addressed exactly this issue by pinning protobuf to version 4.25.5:

google-cloud-pubsub>=2.18.4
google-cloud-monitoring>=2.16.0
grpcio==1.67.0
protobuf==4.25.5 

If I use pip to install the requirements in a fresh virtual env, the correct protobuf version is installed:

.....

Downloading protobuf-4.25.5-cp37-abi3-manylinux2014_x86_64.whl (294 kB)


pip freeze |grep protobuf
protobuf==4.25.5

However, if I use uv, I have to add the --no-cache flag to make it download the correct protobuf version; otherwise I get the latest version, protobuf==5.29.2 (which is cached, broken, and causes the issue).

@balaji-v4d, can you please share steps to reproduce, or try adding the --no-cache flag?
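
A quick way to confirm which protobuf version actually ended up in the environment, independent of pip or uv, is to ask `importlib.metadata`. This is a minimal sketch: the helper names `major_version` and `check_protobuf` are made up for illustration, and the expected major version of 4 is taken from the pin mentioned above.

```python
from importlib.metadata import PackageNotFoundError, version


def major_version(version_string):
    """Return the leading integer of a version string such as '4.25.5'."""
    return int(version_string.split(".")[0])


def check_protobuf(expected_major=4):
    """Report whether the installed protobuf matches the expected major version."""
    try:
        installed = version("protobuf")
    except PackageNotFoundError:
        return "protobuf is not installed in this environment"
    if major_version(installed) != expected_major:
        return (
            f"protobuf {installed} is installed, expected {expected_major}.x; "
            "the resolver or cache may have picked a different release"
        )
    return f"protobuf {installed} matches the expected {expected_major}.x series"


# Run inside the same virtual env that runs the Celery script.
print(check_protobuf())
```

If this reports a 5.x release, reinstalling with the cache bypassed as described above is the first thing to try.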

haimjether avatar Dec 24 '24 08:12 haimjether

Status?

cclauss avatar Oct 23 '25 04:10 cclauss