
JSON decode error "Expecting value: line 1 column 1 (char 0)"

Open rdauncey53 opened this issue 2 years ago • 8 comments

I'm using the ConsumerMixin class to consume JSON-formatted messages that appear in the filesystem of the container running the script. It works fine until a message appears that, for whatever reason, is empty, not in JSON format, or corrupted. When that happens, the script fails with an Expecting value: line 1 column 1 (char 0) error.

This appears to happen when the whole message is first interpreted, not when the message body is decoded: I have tried overriding the on_decode_error method, and it is never called.

It would be really useful to have some handling for this, as the script exiting on this error is quite a big issue.

rdauncey53 avatar May 23 '22 08:05 rdauncey53


I've noticed that there is no test in test_serialisation.py that tests for handling of an empty string (there is one that tests for handling of a None value, but not "").

If I run kombu.utils.json.loads(""), I get the following error:

Traceback (most recent call last):
  File "./test.py", line 3, in <module>
    loads("")
  File "/home/developer/kombu/kombu/utils/json.py", line 97, in loads
    return _loads(s, object_hook=object_hook)
  File "/usr/lib/python3.8/json/__init__.py", line 370, in loads
    return cls(**kw).decode(s)
  File "/usr/lib/python3.8/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib/python3.8/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

This matches what we have seen above: for whatever reason, empty message files are appearing (which we are looking into), and the library fails when it tries to decode them rather than handling the error.
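As a minimal illustration of the failure mode and one way a caller might guard against it, the sketch below uses the standard library json module rather than kombu.utils.json (assumed here to raise the same exception for an empty string, as the traceback above suggests). The helper name safe_loads is hypothetical and is not part of kombu.

```python
import json


def safe_loads(raw, default=None):
    """Decode JSON text, returning `default` for empty or malformed input.

    Hypothetical helper for illustration only; not part of kombu.
    """
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        # Raised for "", truncated documents, and other invalid JSON text.
        return default


print(safe_loads('{"alert": "disk full"}'))  # prints {'alert': 'disk full'}
print(safe_loads(""))                        # prints None
```

Returning a default hides the problem from the caller, so in practice it may be preferable to log the bad payload or move the file aside before continuing.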

rdauncey53 avatar May 24 '22 08:05 rdauncey53

The error makes sense, since an empty string is not a valid JSON value. We should improve our error handling in this case. Care to contribute a test case to our test suite?

thedrow avatar Jun 13 '22 09:06 thedrow

Hi @thedrow,

There are two linked issues here. First, the producer occasionally fails to write the message, for whatever reason, and leaves an empty file behind. After a fair amount of debugging, I can say this happens somewhere in the producer.publish call, maybe when it loses connection (although I'm unclear what losing connection to a filesystem looks like). Second, the empty file then causes errors when the consumer tries to decode the message.

Error handling would be really good here. Given that adding a test case is a couple of lines of code to add to the test suite, should this not be added at the same time as the error handling is implemented? If I add this test case now the tests will just fail.
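A test along these lines might look like the sketch below. It uses the standard library json.loads as a stand-in for kombu.utils.json.loads (an assumption made so the example runs without kombu), and the class and method names are hypothetical. It only pins down the current behaviour, namely that an empty string raises JSONDecodeError; a test for graceful handling would instead assert whatever behaviour the maintainers settle on.

```python
import json
import unittest


class EmptyPayloadTest(unittest.TestCase):
    """Hypothetical test case; names are illustrative, not from kombu."""

    def test_empty_string_raises_decode_error(self):
        with self.assertRaises(json.JSONDecodeError) as ctx:
            json.loads("")
        # The message matches the error reported in this issue.
        self.assertIn("Expecting value", str(ctx.exception))
```

Saved to a file, this can be run with python -m unittest.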

rdauncey53 avatar Jun 13 '22 11:06 rdauncey53

If there isn't any error handling for this case, I'd like to see why first. It would also be useful to see the code which reproduces such a situation with the filesystem message broker.

thedrow avatar Jun 13 '22 12:06 thedrow

Hi @thedrow (sorry for the slow response, I have been away),

We use this library in conjunction with an alerting tool. When an alert is received, a post_receive method is called, which has the following block inside:

        test_queue = kombu.Queue("test", routing_key="test")
        self.producer.publish(
            body,
            declare=[test_queue],
            retry=True,
            serializer="json",
            exchange=test_queue.exchange,
            routing_key=test_queue.routing_key,
        )
        LOG.info("Successfully sent message to Filesystem Queue")

Here body is a string containing the alert body.

The __init__ function of the class has the following:

        transport_options = {
            "data_folder_out": "<OUT_FOLDER>",
            "data_folder_in": "<IN_FOLDER>",
        }
        self.connection = kombu.Connection(
            "filesystem://", transport_options=transport_options
        )
        try:
            self.connection.connect()
        except Exception as e:
            LOG.error("Failed to connect to File Queue")
            raise RuntimeError

        self.channel = self.connection.default_channel
        self.producer = Producer(channel=self.channel)

(I've replaced the in and out folder paths for confidentiality reasons)

The other important thing to note is that the process that runs here is multithreaded, so multiple threads may be calling the post_receive method simultaneously. Could this be an issue?
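If concurrent publishing does turn out to matter, one common precaution is to serialize calls that share a producer behind a lock. The sketch below makes no claim about kombu's thread safety: LockedPublisher is a hypothetical wrapper, and FakeProducer stands in for the real producer so the example runs without a broker.

```python
import threading


class LockedPublisher:
    """Hypothetical wrapper that lets only one thread publish at a time."""

    def __init__(self, producer):
        self._producer = producer
        self._lock = threading.Lock()

    def publish(self, body, **kwargs):
        # Only one thread at a time may be inside the producer's publish().
        with self._lock:
            return self._producer.publish(body, **kwargs)


class FakeProducer:
    """Stand-in for a real producer; records what was published."""

    def __init__(self):
        self.sent = []

    def publish(self, body, **kwargs):
        self.sent.append(body)


publisher = LockedPublisher(FakeProducer())
threads = [
    threading.Thread(target=publisher.publish, args=(f"alert {i}",))
    for i in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

An alternative with the same effect would be to give each thread its own connection and producer, at the cost of more open resources.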

I'm happy to create an MR with a new test case if that would be helpful.

rdauncey53 avatar Jun 27 '22 15:06 rdauncey53

"I'm happy to create an MR with a new test case if that would be helpful."

That will be most welcome.

auvipy avatar Jun 28 '22 11:06 auvipy

I ran into this issue recently. After a lot of debugging, I may have found a way to fix it. The kombu version is v4.5.0, and the relevant file is filesystem.py.
f.open() creates the message file in the working directory immediately, so another process consuming the message may operate on the file before the producer has written any content to it. Two added lines seem to fix this. First, add f.flush() after line 75, f.write(str_to_bytes(dumps(payload))). Second, add import time;time.sleep(1) before line 100, try. @rdauncey53 @thedrow I am new to Python, so please tell me whether this works correctly. Thanks!
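A different approach to the same race, which is not the change proposed above and is not claimed to be what kombu's filesystem transport does, is to write the message to a temporary file and rename it into place only once it is complete, so a reader never sees a partially written file. A minimal sketch, assuming the temporary file and the destination are on the same filesystem; write_message_atomically and the .tmp suffix are hypothetical.

```python
import json
import os
import tempfile


def write_message_atomically(directory, filename, payload):
    """Write `payload` as JSON so readers only ever see a complete file."""
    # Create the temporary file in the target directory so the final
    # rename stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(payload, f)
            f.flush()
            os.fsync(f.fileno())  # push the content to disk before renaming
        final_path = os.path.join(directory, filename)
        os.replace(tmp_path, final_path)  # rename into place in one step
    except BaseException:
        # Do not leave a stray temporary file behind on failure.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    return final_path


with tempfile.TemporaryDirectory() as folder:
    path = write_message_atomically(folder, "example.msg", {"body": "alert"})
    with open(path, encoding="utf-8") as f:
        print(json.load(f))  # prints {'body': 'alert'}
```

With this scheme a consumer would need to skip files carrying the temporary suffix, and no fixed sleep is required on the reading side.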

withnate1216 avatar May 24 '23 11:05 withnate1216