kombu
AttributeError: 'tuple' object has no attribute 'startswith'
I get the following exception when sending a task to a non-default queue (top line of the trace is my own code):
  File "/Users/misha/pn/dbi2/dbi2/server.py", line 581, in add_input
    celery.absorb_input_file.apply_async(kwargs=task_kwargs)
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/celery/app/task.py", line 535, in apply_async
    **options
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/celery/app/base.py", line 729, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/celery/app/amqp.py", line 552, in send_task_message
    **properties
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/messaging.py", line 181, in publish
    exchange_name, declare,
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/connection.py", line 494, in _ensured
    return fun(*args, **kwargs)
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/messaging.py", line 194, in _publish
    [maybe_declare(entity) for entity in declare]
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/messaging.py", line 194, in <listcomp>
    [maybe_declare(entity) for entity in declare]
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/messaging.py", line 102, in maybe_declare
    return maybe_declare(entity, self.channel, retry, **retry_policy)
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/common.py", line 129, in maybe_declare
    return _maybe_declare(entity, declared, ident, channel, orig)
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/common.py", line 135, in _maybe_declare
    entity.declare(channel=channel)
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/entity.py", line 605, in declare
    self._create_exchange(nowait=nowait, channel=channel)
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/entity.py", line 612, in _create_exchange
    self.exchange.declare(nowait=nowait, channel=channel)
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/entity.py", line 181, in declare
    if self._can_declare():
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/entity.py", line 168, in _can_declare
    self.name and not self.name.startswith(
AttributeError: 'tuple' object has no attribute 'startswith'
Now, the code that raises the exception is:
def _can_declare(self):
    print(self.name)  # my own debugging
    return not self.no_declare and (
        self.name and not self.name.startswith(
            INTERNAL_EXCHANGE_PREFIX))
The above print function gives me:
('celery_slow',)
so it is indeed a tuple, whereas the code is expecting a string.
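For illustration only, here is a minimal sketch of one common way a value like `('celery_slow',)` can arise in plain Python: a stray trailing comma turns a string assignment into a one-element tuple. The variable names below are hypothetical, and whether the queue name in this project was actually defined this way is an assumption, not something shown in the trace.

```python
# Hypothetical definitions, not taken from the project.
queue_as_str = 'celery_slow'
queue_as_tuple = 'celery_slow',  # the trailing comma makes this a 1-tuple

print(repr(queue_as_str))    # 'celery_slow'
print(repr(queue_as_tuple))  # ('celery_slow',)

# str has .startswith(); tuple does not, which matches the AttributeError above.
print(hasattr(queue_as_str, 'startswith'))    # True
print(hasattr(queue_as_tuple, 'startswith'))  # False
```

If the queue name reaches the routing config in the second form, a `.startswith()` call on it would fail in the same way as in the trace.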
If I remove my task routing by deleting this code in my celery initialization, as described here:
app.conf.task_routes = {
    'dbi2.celery.absorb_input_file': {'queue': 'celery_slow'},
}
or, alternatively,
app.conf.task_routes = ([
    ('dbi2.celery.absorb_input_file', {'queue': 'celery_slow'}),
],)
then everything works.
How can I get my task to work with the non-default queue?
I put in some print statements at various points of the stack trace, and got this:
apply_async.options: {'queue': None, 'routing_key': None, 'exchange': None, 'priority': None, 'expires': None, 'serializer': 'json', 'delivery_mode': None, 'compression': None, 'time_limit': None, 'soft_time_limit': None, 'immediate': None, 'mandatory': None}
send_task.options: {'queue': <unbound Queue 'celery_slow', -> <unbound Exchange 'celery_slow',(direct)> -> 'celery_slow',>, 'serializer': 'json'}
send_task.route_name: None name: 'dbi2.celery.absorb_input_file' args: None kwargs: {'input_id': '5b0f7da01ea23059c974d2be', 'filepath': '/Users/misha/pn/dbi2/gitignore/tmp/Added via Web UI_LHHQTP', 'keep_file': False} task_type: <@task: dbi2.celery.absorb_input_file of tasks at 0x10ad3eb70>
Not sure if that helps.
(dbi2) sergeyich:dbi2 misha$ pip freeze | grep celery
celery==4.1.1
(dbi2) sergeyich:dbi2 misha$ pip freeze | grep kombu
kombu==4.2.1
@mpenkov you can try debugging at a higher level, perhaps from celery.app.routes, where the configured routes are set up. I had a look with your examples and they seem to work properly. My guess is that the problem is in the Celery initialization steps rather than in Kombu.
Thank you for the fast reply.
I managed to get it working by specifying queue as a keyword argument to the apply_async function. I'm happy to debug via celery.app.routes, but it's not on the stack trace. Where would I start? For reference, here's my celery init code:
def _connect_to_celery():
    broker = 'pyamqp://%s@%s' % (config.CONFIG.ampq_user, config.CONFIG.ampq_host)
    app = celery.Celery('tasks', backend='rpc://', broker=broker)
    app.conf.enable_utc = True
    app.conf.timezone = 'UTC'
    app.conf.worker_log_format = WORKER_LOG_FORMAT
    app.conf.task_routes = {
        'dbi2.celery.start_job': {'queue': SLOW_QUEUE_NAME},
        'dbi2.celery._absorb_input_file': {'queue': SLOW_QUEUE_NAME},
    }
    return app


app = _connect_to_celery()


def _absorb_input_file(input_id, filepath, keep_file=False):
    pass  # do application-specific stuff here


absorb_input_file = app.task(_absorb_input_file)
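One possible starting point, sketched here in plain Python rather than with anything from Celery itself, is to inspect the routes dict before assigning it to `app.conf.task_routes` and flag any `queue` value that is a tuple or list instead of a name. The helper `find_sequence_queues` is hypothetical, and the tuple value given to `SLOW_QUEUE_NAME` below is an assumption made for the demonstration; its real definition is not shown in this thread.

```python
def find_sequence_queues(task_routes):
    """Return {task_name: queue} for routes whose 'queue' is a tuple or list."""
    suspicious = {}
    for task_name, route in task_routes.items():
        queue = route.get('queue')
        if isinstance(queue, (tuple, list)):
            suspicious[task_name] = queue
    return suspicious


# Assumed value for the demonstration: note the trailing comma.
SLOW_QUEUE_NAME = 'celery_slow',

routes = {
    'dbi2.celery.start_job': {'queue': SLOW_QUEUE_NAME},
    'dbi2.celery._absorb_input_file': {'queue': SLOW_QUEUE_NAME},
}

# Any entry reported here would later reach kombu as a non-string name.
for task_name, queue in find_sequence_queues(routes).items():
    print('%s -> %r' % (task_name, queue))
```

An empty result would suggest the queue names are fine at configuration time and that the tuple is introduced somewhere later.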
Would love to know if this issue still persists.