AttributeError: 'tuple' object has no attribute 'startswith'

Open mpenkov opened this issue 7 years ago • 5 comments

I get the following exception when sending a task to a non-default queue (top line of the trace is my own code):

  File "/Users/misha/pn/dbi2/dbi2/server.py", line 581, in add_input
    celery.absorb_input_file.apply_async(kwargs=task_kwargs)
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/celery/app/task.py", line 535, in apply_async
    **options
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/celery/app/base.py", line 729, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/celery/app/amqp.py", line 552, in send_task_message
    **properties
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/messaging.py", line 181, in publish
    exchange_name, declare,
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/connection.py", line 494, in _ensured
    return fun(*args, **kwargs)
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/messaging.py", line 194, in _publish
    [maybe_declare(entity) for entity in declare]
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/messaging.py", line 194, in <listcomp>
    [maybe_declare(entity) for entity in declare]
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/messaging.py", line 102, in maybe_declare
    return maybe_declare(entity, self.channel, retry, **retry_policy)
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/common.py", line 129, in maybe_declare
    return _maybe_declare(entity, declared, ident, channel, orig)
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/common.py", line 135, in _maybe_declare
    entity.declare(channel=channel)
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/entity.py", line 605, in declare
    self._create_exchange(nowait=nowait, channel=channel)
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/entity.py", line 612, in _create_exchange
    self.exchange.declare(nowait=nowait, channel=channel)
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/entity.py", line 181, in declare
    if self._can_declare():
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/entity.py", line 168, in _can_declare
    self.name and not self.name.startswith(
AttributeError: 'tuple' object has no attribute 'startswith'

Now, the code that raises the exception is:

    def _can_declare(self):
        print(self.name)  # my own debugging
        return not self.no_declare and (
            self.name and not self.name.startswith(
                INTERNAL_EXCHANGE_PREFIX))

The above print function gives me:

('celery_slow',)

so it is indeed a tuple, whereas the code is expecting a string.
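
To illustrate the failure outside of kombu, here is a minimal sketch that mirrors the shape of the check above. The `can_declare` helper and the prefix value are stand-ins written for this example, not kombu's actual code. The trailing comma is only one assumed way a one-element tuple like `('celery_slow',)` could arise; nothing in this report confirms that it is the cause here.

```python
# Stand-in for kombu's INTERNAL_EXCHANGE_PREFIX; the value is an assumption.
INTERNAL_EXCHANGE_PREFIX = ('amq.',)


def can_declare(name, no_declare=False):
    """Hypothetical helper with the same shape as the check quoted above."""
    return not no_declare and bool(
        name and not name.startswith(INTERNAL_EXCHANGE_PREFIX))


queue_as_str = 'celery_slow'
queue_as_tuple = 'celery_slow',  # a stray trailing comma makes a 1-tuple

# A str has .startswith, so the check completes normally.
print(can_declare(queue_as_str))

# A tuple does not, so the same check raises AttributeError.
try:
    can_declare(queue_as_tuple)
except AttributeError as exc:
    error_message = str(exc)
    print(error_message)
```

If the name reaching `_can_declare` is a tuple, the first place to look is wherever the queue name is defined or passed in.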

If I remove my task routing by deleting this code in my celery initialization, as described here:

    app.conf.task_routes = {
        'dbi2.celery.absorb_input_file': {'queue': 'celery_slow'},
    }

or, alternatively,

    app.conf.task_routes = ([
        ('dbi2.celery.absorb_input_file', {'queue': 'celery_slow'}),
    ],)

then everything works.

How can I get my task to work with the non-default queue?

mpenkov avatar May 31 '18 04:05 mpenkov

I put in some print statements at various points of the stack trace, and got this:

apply_async.options: {'queue': None, 'routing_key': None, 'exchange': None, 'priority': None, 'expires': None, 'serializer': 'json', 'delivery_mode': None, 'compression': None, 'time_limit': None, 'soft_time_limit': None, 'immediate': None, 'mandatory': None}
send_task.options: {'queue': <unbound Queue 'celery_slow', -> <unbound Exchange 'celery_slow',(direct)> -> 'celery_slow',>, 'serializer': 'json'}
send_task.route_name: None name: 'dbi2.celery.absorb_input_file' args: None kwargs: {'input_id': '5b0f7da01ea23059c974d2be', 'filepath': '/Users/misha/pn/dbi2/gitignore/tmp/Added via Web UI_LHHQTP', 'keep_file': False} task_type: <@task: dbi2.celery.absorb_input_file of tasks at 0x10ad3eb70>

Not sure if that helps.

mpenkov avatar May 31 '18 04:05 mpenkov

(dbi2) sergeyich:dbi2 misha$ pip freeze | grep celery
celery==4.1.1
(dbi2) sergeyich:dbi2 misha$ pip freeze | grep kombu
kombu==4.2.1

mpenkov avatar May 31 '18 04:05 mpenkov

@mpenkov you can try debugging at a higher level, perhaps starting from celery.app.routes, where the configured routes are set up. I tried your examples and they seem to work properly, so my guess is that the problem is in the Celery initialization steps rather than in Kombu.

georgepsarakis avatar May 31 '18 05:05 georgepsarakis

Thank you for the fast reply.

I managed to get it working by specifying queue as a keyword argument to the apply_async function. I'm happy to debug via celery.app.routes, but it's not on the stack trace. Where would I start? For reference, here's my celery init code:

def _connect_to_celery():
    broker = 'pyamqp://%s@%s' % (config.CONFIG.ampq_user, config.CONFIG.ampq_host)
    app = celery.Celery('tasks', backend='rpc://', broker=broker)
    app.conf.enable_utc = True
    app.conf.timezone = 'UTC'
    app.conf.worker_log_format = WORKER_LOG_FORMAT
    app.conf.task_routes = {
        'dbi2.celery.start_job': {'queue': SLOW_QUEUE_NAME},
        'dbi2.celery._absorb_input_file': {'queue': SLOW_QUEUE_NAME},
    }
    return app

app = _connect_to_celery()

def _absorb_input_file(input_id, filepath, keep_file=False):
    pass # do application-specific stuff here

absorb_input_file = app.task(_absorb_input_file)
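
The definition of SLOW_QUEUE_NAME is not shown above, so one quick sanity check is to scan the routes for queue values that are tuples or lists instead of strings. The sketch below is plain Python with no Celery import. `find_sequence_queues` is a hypothetical helper, and the trailing comma on `SLOW_QUEUE_NAME` is put there deliberately for illustration; it is an assumption, not the confirmed cause.

```python
def find_sequence_queues(task_routes):
    """Return {task_name: queue_value} for routes whose 'queue' is a tuple or list."""
    suspicious = {}
    for task_name, route in task_routes.items():
        queue = route.get('queue')
        if isinstance(queue, (tuple, list)):
            suspicious[task_name] = queue
    return suspicious


# Hypothetical definition: the trailing comma turns the name into a 1-tuple.
SLOW_QUEUE_NAME = 'celery_slow',

routes = {
    'dbi2.celery.start_job': {'queue': SLOW_QUEUE_NAME},
    'dbi2.celery._absorb_input_file': {'queue': SLOW_QUEUE_NAME},
}

problems = find_sequence_queues(routes)
for task_name, queue in problems.items():
    print('%s routes to a non-string queue: %r' % (task_name, queue))
```

Running a check like this against `app.conf.task_routes` right after it is assigned would show whether the tuple is already present in the configuration or is introduced later.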

mpenkov avatar May 31 '18 05:05 mpenkov

Would love to know if this issue still persists.

auvipy avatar Apr 17 '22 07:04 auvipy