kombu
kombu copied to clipboard
JSON decode error "Expecting value: line 1 column 1 (char 0)"
I'm using the ConsumerMixin
class to interact with json
formatted messages appearing in the filesystem of the container that is running the script. It works fine until a message that, for whatever reason, appears that is empty / not in JSON format / has been corrupted. When that happens, the script fails with an Expecting value: line 1 column 1 (char 0)
error.
This appears to be happening at the level of trying to interpret the whole message initially, not decoding the message body, as I have tried overriding the on_decode_error
method and it never gets as far as calling this.
It would be really useful to have some handling for this, as the script exiting on this error is quite a big issue.
Hey @rdauncey53 :wave:, Thank you for opening an issue. We will get back to you as soon as we can. Also, check out our Open Collective and consider backing us - every little helps!
We also offer priority support for our sponsors. If you require immediate assistance please consider sponsoring us.
I've noticed that there is no test in test_serialisation.py
that tests for handling of an empty string (there is one that tests for handling of a None
value, but not ""
).
If I run kombu.utils.json.loads("")
, I get the following error:
Traceback (most recent call last):
File "./test.py", line 3, in <module>
loads("")
File "/home/developer/kombu/kombu/utils/json.py", line 97, in loads
return _loads(s, object_hook=object_hook)
File "/usr/lib/python3.8/json/__init__.py", line 370, in loads
return cls(**kw).decode(s)
File "/usr/lib/python3.8/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/lib/python3.8/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
This matches the expectation given what we have seen above - for whatever reason, we have empty message files appearing (which we are looking into) and the library is failing when it tries to decode rather than being able to handle the error.
The error makes sense since an empty JSON string is an invalid JSON value. We should improve our error handling in this case. Care to contribute a test case to our test suite?
Hi @thedrow,
There are two linked issues here. The producer seems to, for whatever reason, occasionally fail to write the message and instead leaves an empty file behind (having done a bunch of debugging work here, this is happening somewhere in the producer.publish
call, maybe when it loses connection? Although I'm unclear what losing connection to a filesystem looks like...), which then threw up issues when the consumer tried to decode the message.
Error handling would be really good here. Given that adding a test case is a couple of lines of code to add to the test suite, should this not be added at the same time as the error handling is implemented? If I add this test case now the tests will just fail.
If there isn't any error handling for this case, I'd like to see why first. It would also be useful to see the code which reproduces such a situation with the filesystem message broker.
Hi @thedrow (sorry for the slow response, I have been away),
We use this library in conjunction with an alerting tool. When an alert is received, a post_receive
method is called, which has the following block inside:
test_queue = kombu.Queue("test", routing_key="test")
self.producer.publish(
body,
declare=[test_queue],
retry=True,
serializer="json",
exchange=test_queue.exchange,
routing_key=test_queue.routing_key,
)
LOG.info(f"Successfully sent message to Filesystem Queue")
Here body
is a string containing the alert body.
The __init__
function of the class has the following:
transport_options = {
"data_folder_out": "<OUT_FOLDER>",
"data_folder_in": "<IN_FOLDER>",
}
self.connection = kombu.Connection(
"filesystem://", transport_options=transport_options
)
try:
self.connection.connect()
except Exception as e:
LOG.error("Failed to connect to File Queue")
raise RuntimeError
self.channel = self.connection.default_channel
self.producer = Producer(channel=self.channel)
(I've replaced the in and out folder paths for confidentiality reasons)
The other important thing to note is that the process that runs here is multithreaded, so multiple threads may be called the post_receive
method simultaneously. Could this be an issue?
I'm happy to create an MR with a new test case if that would be helpful.
I'm happy to create an MR with a new test case if that would be helpful.
that will be most welcome
Recently, I meet this issue. After a lot debug work , I may get a way to fix this issue. The version of kombu is v4.5.0. Let's see this file filesystem.py.
f.open()
will immediately create the msg file in workdir , Another process which consume this msg may operate this file before producer write content to this msg file.
We need add two line to fix this issue. First add the code f.flush()
after line 75 f.write(str_to_bytes(dumps(payload)))
; second add the code import time;time.sleep(1)
before line 100 try
.
@rdauncey53 @thedrow I am a new pythoner, please tell me if this work correctly,thanks!