kombu
kombu copied to clipboard
gcpubsub broker isn't working yet - 'str' object has no attribute 'seconds'
May be not ready yet, but i couldn't stop trying rc4, if this is premature close the bug.
$ python3 celery-rc55-test.py [28/1603]
Traceback (most recent call last):
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/google/protobuf/internal/well_known_types.py", line 439, in FromTimedelta
td.seconds + td.days * _SECONDS_PER_DAY,
AttributeError: 'str' object has no attribute 'seconds'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/balaji_vinci4d_ai/celery-rc55-test.py", line 22, in <module>
result = add.delay(4, 6)
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/celery/app/task.py", line 444, in delay
return self.apply_async(args, kwargs)
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/celery/app/task.py", line 599, in apply_async
return app.send_task(
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/celery/app/base.py", line 922, in send_task
amqp.send_task_message(P, name, message, **options)
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/celery/app/amqp.py", line 523, in send_task_message
ret = producer.publish(
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/messaging.py", line 190, in publish
return _publish(
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/connection.py", line 556, in _ensured
return fun(*args, **kwargs)
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/messaging.py", line 208, in _publish
maybe_declare(entity, retry=retry, **retry_policy)
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/messaging.py", line 107, in maybe_declare
return maybe_declare(entity, self.channel, retry, **retry_policy)
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/common.py", line 112, in maybe_declare
return _imaybe_declare(entity, channel, **retry_policy)
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/common.py", line 167, in _imaybe_declare
return entity.channel.connection.client.ensure(
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/connection.py", line 556, in _ensured
return fun(*args, **kwargs)
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/common.py", line 153, in _maybe_declare
entity.declare(channel=channel)
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/entity.py", line 617, in declare
self._create_queue(nowait=nowait, channel=channel)
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/entity.py", line 628, in _create_queue
self.queue_bind(nowait=nowait, channel=channel)
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/entity.py", line 672, in queue_bind
return self.bind_to(self.exchange, self.routing_key,
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/entity.py", line 681, in bind_to
return (channel or self.channel).queue_bind(
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/transport/virtual/base.py", line 577, in queue_bind
self._queue_bind(exchange, *meta)
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/transport/gcpubsub.py", line 244, in _queue_bind
self._create_subscription(
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/kombu/transport/gcpubsub.py", line 315, in _create_subscription
self.subscriber.create_subscription(
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/google/pubsub_v1/services/subscriber/client.py", line 887, in create_subscription
request = pubsub.Subscription(request)
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/proto/message.py", line 728, in __init__
pb_value = marshal.to_proto(pb_type, value)
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/proto/marshal/marshal.py", line 235, in to_proto
pb_value = self.get_rule(proto_type=proto_type).to_proto(value)
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/proto/marshal/rules/message.py", line 36, in to_proto
return self._descriptor(**value)
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/google/protobuf/internal/well_known_types.py", line 449, in _internal_assign
self.FromTimedelta(td)
File "/home/balaji_vinci4d_ai/celery/lib/python3.10/site-packages/google/protobuf/internal/well_known_types.py", line 443, in FromTimedelta
raise AttributeError(
AttributeError: Fail to convert to Duration. Expected a timedelta like object got str: 'str' object has no attribute 'seconds'
deps
$ cat requirements.txt
celery==5.5.0rc4
google-cloud-pubsub>=2.18.4
google-cloud-monitoring>=2.16.0
grpcio==1.66.2
example
$ cat celery-rc55-test.py
from celery import Celery
expiration_seconds = 86400
broker_transport_options = {
'ack_deadline_seconds': 60,
'polling_interval': 0.3,
'queue_name_prefix': 'kombu-',
'expiration_seconds': 96400,
}
broker_url = 'gcpubsub://projects/v4d-dev'
app = Celery(
"example_pubsub_app",
broker=broker_url,
broker_transport_options = broker_transport_options,
)
@app.task
def add(x, y):
return x + y
result = add.delay(4, 6)
@haimjether
The issue's origin seem to be incorrect dependency resolution of protobuf version.
commit https://github.com/celery/kombu/commit/e5ea1f6d58a3441c2e26e389634221aa784f7e00 was exactly for that issue: pin protobuf to a specific 4.25.5 version
google-cloud-pubsub>=2.18.4
google-cloud-monitoring>=2.16.0
grpcio==1.67.0
protobuf==4.25.5
If I use pip to install the requirements in a fresh virtual env, the protobuf version correctly installed:
.....
Downloading protobuf-4.25.5-cp37-abi3-manylinux2014_x86_64.whl (294 kB)
pip freeze |grep protobuf
protobuf==4.25.5
However if I use uv then I have to add the --no-cache to make it download the correct protobuf version,
otherwise I get the latest protobuf==5.29.2 version (which is cached, broken and causes the issue)
@balaji-v4d , can you please share steps to recreate or try adding the --no-cache flag ?
Status?