kombu icon indicating copy to clipboard operation
kombu copied to clipboard

Request: Accept full range of azure storage queue connection parameters

Open cbcoutinho opened this issue 4 years ago • 6 comments

I'd like to test the azure storage queue transport locally using the Azure Storage emulator, Azurite [link], which allows you to host a version of the blob and queue storage service locally using a docker image running on localhost accessible via http.

Kombu should be able to connect to it; however, the connection url it accepts doesn't make room for any other connection parameters than account_name and access_key, limiting you to e.g. the default endpoint (core.windows.net) and transport scheme (https).

Here is the full range accepted by QueueService:

QueueService(account_name=None, account_key=None, sas_token=None, is_emulated=False, protocol='https', endpoint_suffix='core.windows.net', request_session=None, connection_string=None, socket_timeout=None)

Kombu just looks at the account_name and account_key based and uses the defaults for the rest:

https://kombu.readthedocs.io/en/latest/reference/kombu.transport.azurestoragequeues.html

At a minimum, by supporting connection_string, you could use the QueueService itself to parse everything for you. After a brief look into the source, I wasn't able to quickly determine where this change would need to be made - maybe in azurestoragequeue.py or its base classvirtual.Channel?

I'd be happy to make a PR if I knew where to look.

cbcoutinho avatar Apr 13 '20 14:04 cbcoutinho

After looking at this further, the kombu library seems to be working with the old version of the azure storage queue library. The newest version of the SDK (^12.0) has apparently refactored the QueueService class into the QueueServiceClient class, which only supports authenticating via a connection string rather than account name/access key:

link to python SDK docs

cbcoutinho avatar Apr 13 '20 20:04 cbcoutinho

new SDK is incorporated in master, you could try

auvipy avatar Mar 24 '21 04:03 auvipy

I tried the latest release candidate which has the implementation of azure storage queue version 12. Therefore I changed from: broker = f"azurestoragequeues://:{access_key}@{blobstorage_name}" to: broker = f"azurestoragequeues://:{access_key}@https://{blobstorage_name}.queue.core.windows.net/" but now I am getting the following error:

Traceback (most recent call last):
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/kombu/transport/virtual/base.py", line 937, in create_channel
    return self._avail_channels.pop()
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/azure/storage/queue/_shared/authentication.py", line 118, in _add_authorization_header
    signature = sign_string(self.account_key, string_to_sign)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/azure/storage/queue/_shared/__init__.py", line 47, in sign_string
    key = decode_base64_to_bytes(key)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/azure/storage/queue/_shared/__init__.py", line 37, in decode_base64_to_bytes
    return base64.b64decode(data)
  File "/usr/local/lib/python3.8/base64.py", line 87, in b64decode
    return binascii.a2b_base64(s)
binascii.Error: Incorrect padding

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/celery/worker/worker.py", line 203, in start
    self.blueprint.start(self)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/celery/bootsteps.py", line 365, in start
    return self.obj.start()
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 332, in start
    blueprint.start(self)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/celery/worker/consumer/connection.py", line 21, in start
    c.connection = c.connect()
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 428, in connect
    conn = self.connection_for_read(heartbeat=self.amqheartbeat)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 434, in connection_for_read
    return self.ensure_connected(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 460, in ensure_connected
    conn = conn.ensure_connection(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/kombu/connection.py", line 396, in ensure_connection
    self._ensure_connection(*args, **kwargs)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/kombu/connection.py", line 448, in _ensure_connection
    return retry_over_time(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/kombu/utils/functional.py", line 314, in retry_over_time
    return fun(*args, **kwargs)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/kombu/connection.py", line 897, in _connection_factory
    self._connection = self._establish_connection()
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/kombu/connection.py", line 827, in _establish_connection
    conn = self.transport.establish_connection()
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/kombu/transport/virtual/base.py", line 961, in establish_connection
    self._avail_channels.append(self.create_channel(self))
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/kombu/transport/virtual/base.py", line 939, in create_channel
    channel = self.Channel(connection)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/kombu/transport/azurestoragequeues.py", line 79, in __init__
    for queue in self.queue_service.list_queues():
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/azure/core/paging.py", line 128, in __next__
    return next(self._page_iterator)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/azure/core/paging.py", line 76, in __next__
    self._response = self._get_next(self.continuation_token)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/azure/storage/queue/_models.py", line 364, in _get_next_cb
    process_storage_error(error)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/azure/storage/queue/_shared/response_handlers.py", line 90, in process_storage_error
    raise storage_error
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/azure/storage/queue/_models.py", line 358, in _get_next_cb
    return self._command(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/azure/core/tracing/decorator.py", line 78, in wrapper_use_tracer
    return func(*args, **kwargs)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/azure/storage/queue/_generated/operations/_service_operations.py", line 551, in list_queues_segment
    pipeline_response = self._client._pipeline.run(  # pylint: disable=protected-access
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 211, in run
    return first_node.send(pipeline_request)  # type: ignore
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 71, in send
    response = self.next.send(request)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 71, in send
    response = self.next.send(request)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 71, in send
    response = self.next.send(request)
  [Previous line repeated 2 more times]
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/azure/core/pipeline/policies/_redirect.py", line 158, in send
    response = self.next.send(request)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 71, in send
    response = self.next.send(request)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/azure/storage/queue/_shared/policies.py", line 535, in send
    raise err
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/azure/storage/queue/_shared/policies.py", line 509, in send
    response = self.next.send(request)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 71, in send
    response = self.next.send(request)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 71, in send
    response = self.next.send(request)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/azure/core/pipeline/_base.py", line 69, in send
    _await_result(self._policy.on_request, request)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/azure/core/pipeline/_tools.py", line 34, in await_result
    result = func(*args, **kwargs)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/azure/storage/queue/_shared/authentication.py", line 141, in on_request
    self._add_authorization_header(request, string_to_sign)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/azure/storage/queue/_shared/authentication.py", line 124, in _add_authorization_header
    raise _wrap_exception(ex, AzureSigningError)
azure.storage.queue._shared.authentication.AzureSigningError: Incorrect padding

Naeemedmo avatar Jul 11 '22 08:07 Naeemedmo

Hey @Naeemedmo, can you try with broker = f"azurestoragequeues://{access_key}@https://{blobstorage_name}.queue.core.windows.net/" (note the removed : before the access_key)

jonasmiederer avatar Jul 11 '22 10:07 jonasmiederer

Hey @jonasmiederer, I tried that and unfortunately the issue persists.

Naeemedmo avatar Jul 11 '22 11:07 Naeemedmo

I have just tried out v5.3.0a1 using the updated azurestoragequeues transport and a local Azurite instance running in docker. The updated transport works as expected when the account url is http://localhost:10001/devstoreaccount1. However, my celery worker fails to start when running inside a docker-compose configuration where the account url becomes http://azurite:10001/devstoreaccount1, where azurite is the name of the Azurite container instance. This appears to be related to an issue with the Azure python SDK: https://github.com/Azure/azure-sdk-for-python/issues/19202. Might it be worth considering the suggested workaround in that issue thread?

davidbossanyi avatar Jul 11 '22 21:07 davidbossanyi