
Regression in 5.5.0rc1: sqlalchemy create_engine issue

Open woutdenolf opened this issue 4 months ago • 0 comments

When using kombu 5.5.0rc1 through Celery with an SQLite backend, sending a task fails with the following traceback:

  File ".../site-packages/ewoksjob/client/celery/tasks.py", line 17, in execute_graph
    return send_task("ewoksjob.apps.ewoks.execute_graph", **kw)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../site-packages/celery/local.py", line 182, in __call__
    return self._get_current_object()(*a, **kw)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../site-packages/celery/app/base.py", line 892, in send_task
    amqp.send_task_message(P, name, message, **options)
  File ".../site-packages/celery/app/amqp.py", line 522, in send_task_message
    ret = producer.publish(
          ^^^^^^^^^^^^^^^^^
  File ".../site-packages/kombu/messaging.py", line 187, in publish
    return _publish(
           ^^^^^^^^^
  File ".../site-packages/kombu/connection.py", line 556, in _ensured
    return fun(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^
  File ".../site-packages/kombu/messaging.py", line 203, in _publish
    [maybe_declare(entity) for entity in declare]
  File ".../site-packages/kombu/messaging.py", line 203, in <listcomp>
    [maybe_declare(entity) for entity in declare]
     ^^^^^^^^^^^^^^^^^^^^^
  File ".../site-packages/kombu/messaging.py", line 107, in maybe_declare
    return maybe_declare(entity, self.channel, retry, **retry_policy)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../site-packages/kombu/common.py", line 113, in maybe_declare
    return _maybe_declare(entity, channel)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../site-packages/kombu/common.py", line 153, in _maybe_declare
    entity.declare(channel=channel)
  File ".../site-packages/kombu/entity.py", line 617, in declare
    self._create_queue(nowait=nowait, channel=channel)
  File ".../site-packages/kombu/entity.py", line 626, in _create_queue
    self.queue_declare(nowait=nowait, passive=False, channel=channel)
  File ".../site-packages/kombu/entity.py", line 655, in queue_declare
    ret = channel.queue_declare(
          ^^^^^^^^^^^^^^^^^^^^^^
  File ".../site-packages/kombu/transport/virtual/base.py", line 537, in queue_declare
    self._new_queue(queue, **kwargs)
  File ".../site-packages/kombu/transport/sqlalchemy/__init__.py", line 162, in _new_queue
    self._get_or_create(queue)
  File ".../site-packages/kombu/transport/sqlalchemy/__init__.py", line 141, in _get_or_create
    obj = self.session.query(self.queue_cls) \
          ^^^^^^^^^^^^
  File ".../site-packages/kombu/transport/sqlalchemy/__init__.py", line 136, in session
    _, Session = self._open()
                 ^^^^^^^^^^^^
  File ".../site-packages/kombu/transport/sqlalchemy/__init__.py", line 126, in _open
    engine = self._engine_from_config()
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../site-packages/kombu/transport/sqlalchemy/__init__.py", line 115, in _engine_from_config
    return create_engine(conninfo.hostname, **transport_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<string>", line 2, in create_engine
  File ".../site-packages/sqlalchemy/util/deprecations.py", line 375, in warned
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File ".../site-packages/sqlalchemy/engine/create.py", line 632, in create_engine
    raise TypeError(
TypeError: Invalid argument(s) 'max_retries','interval_start','interval_max','interval_step' sent to create_engine(), using configuration SQLiteDialect_pysqlite/NullPool/Engine.  Please check that the keyword arguments are appropriate for this combination of components.

In this part of the code, transport_options used to be empty, but it now contains options that create_engine does not expect:

# kombu/transport/sqlalchemy/__init__.py

    def _engine_from_config(self):
        conninfo = self.connection.client
        transport_options = conninfo.transport_options.copy()
        transport_options.pop('queue_tablename', None)
        transport_options.pop('message_tablename', None)
        return create_engine(conninfo.hostname, **transport_options)

Value of transport_options passed to create_engine in each version:

  • 5.4.2: transport_options={}
  • 5.5.0rc1: transport_options={'interval_max': 1, 'interval_start': 0, 'interval_step': 0.2, 'max_retries': 3}

Commit https://github.com/celery/kombu/pull/2148/commits/d3f06b9f5cbe87f5f9802a6b4ae02171deb70e7b from PR https://github.com/celery/kombu/pull/2148 causes the extra keys to appear in transport_options.
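
One possible direction, as a minimal sketch only: drop the retry-policy keys alongside the table-name keys before the remaining options are forwarded to create_engine. The helper name engine_kwargs and the two key sets are hypothetical and not part of kombu; the key names are taken from the TypeError message and the snippet above. I have not checked whether other new defaults would also need filtering.

```python
# Hypothetical helper, NOT kombu's actual code: it only illustrates
# filtering transport_options before they reach create_engine().

# Keys reported as invalid in the TypeError (assumed to be retry-policy defaults).
RETRY_POLICY_KEYS = frozenset(
    {"max_retries", "interval_start", "interval_max", "interval_step"}
)
# Keys that _engine_from_config() already pops today.
TABLENAME_KEYS = frozenset({"queue_tablename", "message_tablename"})


def engine_kwargs(transport_options):
    """Return a new dict with only the options meant for create_engine()."""
    excluded = RETRY_POLICY_KEYS | TABLENAME_KEYS
    return {
        key: value
        for key, value in transport_options.items()
        if key not in excluded
    }


# transport_options as observed with 5.5.0rc1
options_5_5_0rc1 = {
    "interval_max": 1,
    "interval_start": 0,
    "interval_step": 0.2,
    "max_retries": 3,
}

# Nothing unexpected is left over for create_engine().
print(engine_kwargs(options_5_5_0rc1))
```

With something like this, _engine_from_config could call create_engine(conninfo.hostname, **engine_kwargs(conninfo.transport_options)), and genuine engine options such as echo would still pass through unchanged.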

@thedrow Could you have a look?

woutdenolf, Oct 16 '24 09:10