py-amqp

If a malformed message gets into Rabbit, the Celery worker fails to start

Open djsmith42 opened this issue 10 years ago • 9 comments

If a publisher manages to publish a corrupted message that cannot be decoded as UTF-8 into Rabbit, the Celery worker will fail to start. I propose that it should instead leave the message unacknowledged, log the error, and continue working on other messages.

Here's an example stack trace when this happens:

[2015-10-17 23:34:57,419: ERROR/MainProcess] Unrecoverable error: UnicodeDecodeError('utf8', 'qr=-1&qf=[date:between:2015-09-01,2015-09-
Traceback (most recent call last):
  File "venv/local/lib/python2.7/site-packages/celery/worker/__init__.py", line 206, in start
    self.blueprint.start(self)
  File "venv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "venv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 374, in start
    return self.obj.start()
  File "venv/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 278, in start
    blueprint.start(self)
  File "venv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "venv/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 821, in start
    c.loop(*c.loop_args())
  File "venv/local/lib/python2.7/site-packages/celery/worker/loops.py", line 70, in asynloop
    next(loop)
  File "venv/local/lib/python2.7/site-packages/kombu/async/hub.py", line 272, in create_loop
    item()
  File "venv/local/lib/python2.7/site-packages/amqp/utils.py", line 42, in __call__
    self.set_error_state(exc)
  File "venv/local/lib/python2.7/site-packages/amqp/utils.py", line 39, in __call__
    **dict(self.kwargs, **kwargs) if self.kwargs else kwargs
  File "venv/local/lib/python2.7/site-packages/kombu/transport/base.py", line 144, in _read
    drain_events(timeout=0)
  File "venv/local/lib/python2.7/site-packages/amqp/connection.py", line 302, in drain_events
    chanmap, None, timeout=timeout,
  File "venv/local/lib/python2.7/site-packages/amqp/connection.py", line 365, in _wait_multiple
    channel, method_sig, args, content = read_timeout(timeout)
  File "venv/local/lib/python2.7/site-packages/amqp/connection.py", line 336, in read_timeout
    return self.method_reader.read_method()
  File "venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 186, in read_method
    self._next_method()
  File "venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 126, in _next_method
    self._process_content_header(channel, payload)
  File "venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 154, in _process_content_header
    partial.add_header(payload)
  File "venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 54, in add_header
    self.msg._load_properties(payload[12:])
  File "venv/local/lib/python2.7/site-packages/amqp/serialization.py", line 476, in _load_properties
    d[key] = getattr(r, 'read_' + proptype)()
  File "venv/local/lib/python2.7/site-packages/amqp/serialization.py", line 141, in read_table
    val = table_data.read_item()
  File "venv/local/lib/python2.7/site-packages/amqp/serialization.py", line 191, in read_item
    val = self.read_table()  # recurse
  File "venv/local/lib/python2.7/site-packages/amqp/serialization.py", line 141, in read_table
    val = table_data.read_item()
  File "venv/local/lib/python2.7/site-packages/amqp/serialization.py", line 150, in read_item
    val = self.read_longstr()
  File "venv/local/lib/python2.7/site-packages/amqp/serialization.py", line 131, in read_longstr
    return self.input.read(slen).decode('utf-8')
  File "venv/lib/python2.7/encodings/utf_8.py", line 16, in decode
    return codecs.utf_8_decode(input, errors, True)
UnicodeDecodeError: 'utf8' codec can't decode byte 0x96 in position 80: invalid start byte

Versions:

Celery: 3.1.16, Kombu: 3.0.23

Using pyamqp://

djsmith42, Oct 21 '15

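It cannot leave the message unacknowledged, since unacknowledged messages count against the prefetch limit: each bad message would occupy one of the worker's prefetch slots for as long as the connection stays open.

The only other option is to log the event and discard the message; requeueing it would cause an endless redelivery loop.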

ask, Dec 11 '15

@ask That works for me.

djsmith42, Dec 12 '15

Correction: this bug is actually in amqp/serialization.py, which is part of a separate project (py-amqp). The version I have (1.4.9) assumes that all header values are UTF-8 encoded, in this code:

    def read_longstr(self):
        """Read a string that's up to 2**32 bytes.

        The encoding isn't specified in the AMQP spec, so
        assume it's utf-8

        """
        self.bitcount = self.bits = 0
        slen = unpack('>I', self.input.read(4))[0]
        return self.input.read(slen).decode('utf-8')

But that is not a safe assumption, because header values are unconstrained; they could be binary data, for example. That is why the UnicodeDecodeError is thrown.

I see that amqp/serialization.py is very different on master. Does the current code still have this limitation?
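As a rough illustration of an alternative, here is a hypothetical Python 3 variant of the `read_longstr` shown above. It is not py-amqp's code: `read_longstr_tolerant` and `encode_longstr` are names made up for this sketch, and falling back to raw bytes is only one possible policy. It reads the same wire format (a 4-byte big-endian length followed by that many bytes) but returns the undecoded bytes instead of raising when the value is not valid UTF-8.

```python
import io
from struct import pack, unpack


def encode_longstr(value: bytes) -> bytes:
    """Encode an AMQP long string: 4-byte big-endian length, then the raw bytes."""
    return pack('>I', len(value)) + value


def read_longstr_tolerant(stream: io.BytesIO):
    """Hypothetical read_longstr: try UTF-8, fall back to the raw bytes.

    A sketch only, not py-amqp's actual implementation.
    """
    slen = unpack('>I', stream.read(4))[0]
    raw = stream.read(slen)
    try:
        return raw.decode('utf-8')
    except UnicodeDecodeError:
        # The value is opaque on the wire; hand it back undecoded
        # instead of letting the error abort frame parsing.
        return raw


print(read_longstr_tolerant(io.BytesIO(encode_longstr(b'hello'))))  # hello
print(read_longstr_tolerant(io.BytesIO(encode_longstr(b'\x8d'))))   # b'\x8d'
```

The trade-off is that callers must accept a header value that may be either `str` or `bytes`, rather than having the reader guess an encoding.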

djsmith42, Jan 23 '16

Here's a minimal snippet that can repro this crash and put your Celery worker into an unrecoverable loop:

    my_task.apply_async(args=['test'], headers={'foo': '\x8d'})

The '\x8d' value is not valid UTF-8 (under Python 2, '\x8d' is a one-byte str), so when the worker reads the header table it raises UnicodeDecodeError. That takes down the Celery worker and prevents it from coming back up:

[2016-01-23 04:15:51,355: ERROR/MainProcess celery.worker] Unrecoverable error: UnicodeDecodeError('utf8', '\x8d', 0, 1, 'invalid start byte')
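Why this particular value fails: 0x8D is a UTF-8 continuation byte, so it cannot begin a character. The Python 3 sketch below uses `b'\x8d'` to stand in for Python 2's one-byte `'\x8d'` and shows the strict decode failing. It also shows that the Python 3 *text* string `'\x8d'` (U+008D) is a different value that encodes to valid UTF-8, so, assuming the client encodes text headers as UTF-8, it would not trigger this decode error on its own.

```python
# The single byte 0x8D is a UTF-8 continuation byte (10xxxxxx),
# so a strict decode fails immediately.
raw = b'\x8d'
reason = None
try:
    raw.decode('utf-8')
except UnicodeDecodeError as exc:
    reason = exc.reason
print(reason)  # invalid start byte

# The text string '\x8d' (U+008D) encodes to two valid UTF-8 bytes
# and round-trips without error.
encoded = '\x8d'.encode('utf-8')
print(encoded)                            # b'\xc2\x8d'
print(encoded.decode('utf-8') == '\x8d')  # True
```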

I would love to submit a PR to fix this, but I'm not sure which version of py-amqp to work on. What do you recommend?

Once I know which version to target, I plan to modify the py-amqp serialization module to ignore header values that cannot be decoded as UTF-8.
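A minimal sketch of the "ignore undecodable header values" idea, applied to a whole AMQP field table rather than a single string. Everything here is hypothetical (`encode_table`, `read_table_skip_bad`), handles only long-string (`'S'`) values, and is not the code from the eventual PR; it only illustrates the control flow: decode each value as UTF-8, and log and drop the ones that fail instead of aborting the whole table.

```python
import logging
from struct import pack, unpack_from

logger = logging.getLogger(__name__)


def encode_table(items):
    """Encode {str: bytes} as a field table holding only 'S' (long string) values."""
    body = b''
    for key, value in items.items():
        kb = key.encode('utf-8')
        # shortstr key, type octet 'S', then 4-byte length + raw value bytes
        body += pack('B', len(kb)) + kb + b'S' + pack('>I', len(value)) + value
    # The table itself is prefixed with its total byte length
    return pack('>I', len(body)) + body


def read_table_skip_bad(buf):
    """Hypothetical parser: keep UTF-8 'S' values, log and drop undecodable ones."""
    (tlen,) = unpack_from('>I', buf, 0)
    offset, end = 4, 4 + tlen
    result = {}
    while offset < end:
        klen = buf[offset]  # indexing bytes gives an int in Python 3
        key = buf[offset + 1:offset + 1 + klen].decode('utf-8')
        offset += 1 + klen
        ftype = buf[offset:offset + 1]
        offset += 1
        if ftype != b'S':
            raise ValueError('this sketch only handles long-string values')
        (slen,) = unpack_from('>I', buf, offset)
        offset += 4
        raw = buf[offset:offset + slen]
        offset += slen  # advance past the value even if it gets dropped
        try:
            result[key] = raw.decode('utf-8')
        except UnicodeDecodeError:
            logger.warning('dropping non-UTF-8 header %r', key)
    return result


table = encode_table({'foo': b'\x8d', 'task': b'my_task'})
print(read_table_skip_bad(table))  # {'task': 'my_task'}
```

Dropping the value keeps the worker alive but silently loses data, which is why the sketch logs each skipped key.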

djsmith42, Jan 23 '16

Never mind, I found the 1.4 branch. Here's my PR: https://github.com/celery/py-amqp/pull/78

djsmith42, Jan 23 '16

Hi! I'm having the same issue with Celery 4.1.0 and kombu 4.1.0. Any ideas?

danhenriquesc, Jan 08 '18

Can you cleanly apply the patch to py-amqp?

auvipy, Jan 15 '18

The bug is still present. It does not need more feedback, just a simple fix...

vazir, Oct 09 '18

Could you tell us which versions of Celery you are using when you hit this issue?

auvipy, Oct 13 '18