
SimpleQueue.get()'s body attribute is a string when using py-amqp, but a buffer with librabbitmq

Open · edmorley opened this issue 9 years ago · 6 comments

It's my understanding that librabbitmq is meant to be a drop-in replacement for py-amqp: kombu uses librabbitmq if it is installed and otherwise falls back to py-amqp.

Using Python 2.7.11, Kombu 3.0.32 & amqp 1.4.8, I've just tried adding librabbitmq 1.6.1 to our requirements file in mozilla/treeherder#1225, but I now get test failures where there were none before: https://travis-ci.org/mozilla/treeherder/jobs/100121889#L452

=============== FAILURES ===============
____ test_job_retrigger_authorized _____

webapp = <webtest.app.TestApp object at 0x7fa9d6db18d0>
eleven_jobs_stored = None
jm = <treeherder.model.derived.jobs.JobsModel object at 0x7fa9d44d4b50>
pulse_action_consumer = <kombu.simple.SimpleQueue object at 0x7fa9d4519f50>

    def test_job_retrigger_authorized(webapp, eleven_jobs_stored, jm,
                                      pulse_action_consumer):
        """
        Validate that only authenticated users can hit this endpoint.
        """
        client = APIClient()
        email = "[email protected]"
        user = User.objects.create(username="retrigger-fail", email=email)
        client.force_authenticate(user=user)

        job = jm.get_job_list(0, 1)[0]
        job_id_list = [job["id"]]
        url = reverse("jobs-retrigger",
                      kwargs={"project": jm.project})
        client.post(url, {"job_id_list": job_id_list}, format='json')

        message = pulse_action_consumer.get(block=True, timeout=2)
>       content = json.loads(message.body)

tests/webapp/api/test_jobs_api.py:148: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/opt/python/2.7.11/lib/python2.7/json/__init__.py:339: in loads
    return _default_decoder.decode(s)
/opt/python/2.7.11/lib/python2.7/json/decoder.py:364: in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <json.decoder.JSONDecoder object at 0x7fa9dd350050>
s = <read-only buffer ptr 0x3f35557, size 216 at 0x7fa9d44b1ef0>
idx = 0
    def raw_decode(self, s, idx=0):
        """Decode a JSON document from ``s`` (a ``str`` or ``unicode``
            beginning with a JSON document) and return a 2-tuple of the Python
            representation and the index in ``s`` where the document ended.

            This can be used to decode a JSON document from a string that may
            have extraneous data at the end.

            """
        try:
>           obj, end = self.scan_once(s, idx)
E           TypeError: first argument must be a string, not buffer

/opt/python/2.7.11/lib/python2.7/json/decoder.py:380: TypeError

It seems like the APIs for py-amqp and librabbitmq are not identical after all, and maybe kombu needs to handle this somehow? (Or alternatively there's a bug in amqp or librabbitmq.)

For our test run, we configure Kombu here: https://github.com/mozilla/treeherder/blob/985cd86a6de3b33a5025b7152be4f29804456eba/tests/conftest.py#L324

And the failing test is here: https://github.com/mozilla/treeherder/blob/5bdc6ab6a8cd14e1d3907a4310bc4f5bebf9048f/tests/webapp/api/test_jobs_api.py#L131

Any ideas? Thanks :-)

edmorley, Jan 04 '16 15:01

tl;dr: the body attribute of the message returned by .get() on a SimpleQueue is of type string when using py-amqp, but once librabbitmq is available, it is of type buffer.

Is this a bug in py-amqp or librabbitmq, or in the way Kombu wraps them? (If it's in one of the libraries, I'm happy to file an issue against it.)

edmorley, Jan 04 '16 15:01

You should expect receiving a buffer from any of them, and this is good because it avoids copying the message body.
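Because the body may legitimately arrive as a zero-copy buffer, code that keeps working with the raw `.body` (like the manual `json.loads` call in the failing test above) has to normalize it first. Below is a minimal Python 3 sketch. `body_to_bytes` and `decode_json_body` are hypothetical helpers, and `memoryview` stands in for the buffer type, since Python 2's `buffer` does not exist under that name in Python 3. UTF-8 is an assumed default encoding. On Python 3, `json.loads` rejects a `memoryview` with a `TypeError`, much like the Python 2 traceback above.

```python
import json


def body_to_bytes(body):
    """Copy a bytes-like raw message body into a plain ``bytes`` object.

    Hypothetical helper: memoryview stands in for the zero-copy buffer a
    transport may hand back (Python 2 used the ``buffer`` type instead).
    """
    if isinstance(body, (bytes, bytearray, memoryview)):
        return bytes(body)  # copies the buffer contents
    raise TypeError("unsupported body type: %s" % type(body).__name__)


def decode_json_body(body, encoding="utf-8"):
    """Parse a JSON body whether it arrived as text or as a bytes-like object."""
    if isinstance(body, str):
        return json.loads(body)  # already text, parse directly
    # Assumption: non-text bodies are encoded with ``encoding`` (UTF-8 by default).
    return json.loads(body_to_bytes(body).decode(encoding))
```

With this, `decode_json_body(message.body)` behaves the same whichever client library delivered the message, at the cost of one copy of the body.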

I don't see how SimpleQueue.get would return that, since the content is automatically deserialized. Or are you receiving a string payload and deserializing it manually?

ask, Jan 04 '16 21:01

Until now, performing a .get() on the SimpleQueue object returned a message whose body attribute was a string when using py-amqp, and we deserialized that string manually. When using librabbitmq, the body attribute is of type buffer instead.

As a user, I wouldn't expect the behaviour to vary like this depending on which RabbitMQ client library is used, particularly when the Kombu docs imply that librabbitmq is a drop-in replacement.

Many thanks :-)

edmorley, Jan 04 '16 22:01

.body is the raw message body; .payload is the deserialized payload, which handles all of this for you.

ask, Jan 04 '16 23:01

Ah thank you, so we should be using .payload.

That said, is it still expected that body's type varies depending on the backend?

edmorley, Jan 05 '16 11:01

Excuse my ignorance, but I believe the problem I'm currently trying to figure out might be related to this issue. I don't really know Python beyond the basics, so correct me if I'm wrong.

I'm implementing a small client library in TypeScript to queue up Celery tasks. I also use Flower and its HTTP REST API, but I don't want Flower to be a mandatory requirement for servers to be able to queue up tasks.

I have protocol v1 working fine; it's v2 that's puking. The error message from my Celery worker is as follows:

[2018-04-01 07:16:57,280: ERROR/MainProcess] Pool callback raised exception: AttributeError("'str' object has no attribute 'get'",)
Traceback (most recent call last):
  File "/usr/home/myuser/tmp/myproject/.pyenv/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'chord'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/home/myuser/tmp/myproject/.pyenv/lib/python3.6/site-packages/billiard/pool.py", line 1747, in safe_apply_callback
    fun(*args, **kwargs)
  File "/usr/home/myuser/tmp/myproject/.pyenv/lib/python3.6/site-packages/celery/worker/request.py", line 366, in on_failure
    self.id, exc, request=self, store_result=self.store_errors,
  File "/usr/home/myuser/tmp/myproject/.pyenv/lib/python3.6/site-packages/celery/backends/base.py", line 165, in mark_as_failure
    if request.chord:
  File "/usr/home/myuser/tmp/myproject/.pyenv/lib/python3.6/site-packages/kombu/utils/objects.py", line 44, in __get__
    value = obj.__dict__[self.__name__] = self.__get(obj)
  File "/usr/home/myuser/tmp/myproject/.pyenv/lib/python3.6/site-packages/celery/worker/request.py", line 486, in chord
    return embed.get('chord')
AttributeError: 'str' object has no attribute 'get'

I double-checked the serializers and verified my messages several times. I tried both pyamqp and librabbitmq. I'm also using Python 3.6, so librabbitmq had to be built from source (although, from what I understand, pyamqp is the better option anyway). On my server, I'm using amqplib as the client library to publish messages to RabbitMQ. The headers/properties seem to go through fine; it's the body that seems to arrive as a string instead of a buffer.
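The traceback fails at `embed.get('chord')` because `embed` is a `str`. Celery's message protocol documentation describes the v2 body as a serialized tuple `(args, kwargs, embed)`, where `embed` is a mapping with `callbacks`, `errbacks`, `chain` and `chord` keys. So one thing worth checking is whether the third element of the published body is a JSON object. Below is a minimal Python sketch for validating a captured body. `build_v2_body` and `check_v2_body` are hypothetical helpers. Double-encoding `embed` (serializing it to a string before serializing the whole array) is only one assumed way to end up with a string there.

```python
import json

# Keys that Celery's protocol v2 documentation lists for the "embed" mapping.
EMBED_KEYS = ("callbacks", "errbacks", "chain", "chord")


def build_v2_body(args, kwargs):
    """Serialize a v2 task body as a JSON array [args, kwargs, embed].

    Hypothetical helper: embed must stay a JSON object inside the array,
    not a separately json.dumps-ed string.
    """
    embed = {key: None for key in EMBED_KEYS}
    return json.dumps([list(args), dict(kwargs), embed])


def check_v2_body(raw):
    """Return a list of problems in a serialized v2 body (empty if it looks OK)."""
    body = json.loads(raw)
    if not isinstance(body, list) or len(body) != 3:
        return ["body must be a 3-element array [args, kwargs, embed]"]
    args, kwargs, embed = body
    problems = []
    if not isinstance(args, list):
        problems.append("args must be an array, got %s" % type(args).__name__)
    if not isinstance(kwargs, dict):
        problems.append("kwargs must be an object, got %s" % type(kwargs).__name__)
    if not isinstance(embed, dict):
        problems.append("embed must be an object, got %s" % type(embed).__name__)
    return problems
```

Running `check_v2_body` on the exact string the TypeScript client publishes should show whether `embed` is leaving the client as an object or as a string.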

Am I missing something here? Any updates on the issue? Thanks! :)

xorander00, Apr 02 '18 06:04