
fix(python): fixed getting data from rawData to fromJSON

Open YeaMerci opened this issue 1 year ago • 7 comments

Hi! 👋 I fixed issue https://github.com/taskforcesh/bullmq/issues/2010

The fromJSON method takes a rawData argument, but if you print it out, it becomes clear that the keys and values are byte strings. That matters because the seemingly identical 'hi' and b'hi' are different keys: a bytes object never compares equal to a str, so a lookup with one will never find the other. Despite this, the method tries to read values using keys that do not exist in that form, and even when a lookup does succeed, the value is still a byte string. I believe this has been causing a lot of bugs since 2023.

I added a function that decodes byte strings and processes rawData, wrote documentation for it, refactored fromJSON and then tested it.

I really need this library right now and would rather not fork it and take that maintenance burden on myself, so please approve 😉

YeaMerci avatar Aug 08 '24 00:08 YeaMerci

hi @YeaMerci, looks like python tests are hanging

roggervalf avatar Aug 08 '24 02:08 roggervalf

@roggervalf I ran the tests locally with my modification, and this error appeared:

Traceback (most recent call last):
  File "/var/data/python/lib/python3.11/site-packages/redis/asyncio/connection.py", line 218, in __del__
    self._close()
  File "/var/data/python/lib/python3.11/site-packages/redis/asyncio/connection.py", line 225, in _close
    self._writer.close()
  File "/usr/lib/python3.11/asyncio/streams.py", line 358, in close
    return self._transport.close()
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/asyncio/selector_events.py", line 864, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/usr/lib/python3.11/asyncio/base_events.py", line 762, in call_soon
    self._check_closed()
  File "/usr/lib/python3.11/asyncio/base_events.py", line 520, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

However, I also cloned the repository without my changes and ran the tests to confirm they were passing, and I got the same error there too.

The only difference is that with my modification you can now get the data from jobs and actually process it, and the jobs are created correctly afterwards, which was not the case before. So it is something we can work with.

That said, I think the tests themselves are worth looking at. My modification does not seem to be the cause of this error; it may be something in the tests themselves.

YeaMerci avatar Aug 08 '24 11:08 YeaMerci

tests are still passing without your change in other pipelines unfortunately

roggervalf avatar Aug 09 '24 12:08 roggervalf

Not sure what this PR is fixing, @YeaMerci could you please clarify a bit?

manast avatar Aug 09 '24 13:08 manast

@manast

Not sure what this PR is fixing, @YeaMerci could you please clarify a bit?

Sure.

I was trying to use the Python version of bullmq in a work project. To start testing, I created a queue connected to a test Redis instance, added a couple of jobs, and saw that everything was working.

I then created a Worker and a test processing function whose only job was to print the job data (Job.data). I started submitting jobs, but noticed that no matter what data went in, only an empty dictionary "{}" came out.
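Roughly, the setup looked like the sketch below (a minimal reproduction based on the description above, with illustrative queue and job names; this is not the exact code from my project). Before the fix, process() printed {} no matter what data was added:

import asyncio
from asyncio import Future

from bullmq import Queue, Worker


async def process(job, token):
    # Before the fix this printed {} regardless of the data added to the queue.
    print("job.data:", job.data)


async def main():
    queue = Queue("testQueue")
    await queue.add("test-job", {"foo": "bar"}, {"removeOnComplete": False})

    worker = Worker("testQueue", process)
    processing = Future()
    worker.on("completed", lambda job, result: processing.set_result(None))
    await processing

    await worker.close(force=True)
    await queue.close()


asyncio.run(main())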

After that, I went into the source code (job.py) and looked at the fromJSON method of the Job class. That is where I saw the problem:

    @staticmethod
    def fromJSON(queue: Queue, rawData: dict, jobId: str | None = None):
        """
        Instantiates a Job from a JobJsonRaw object (coming from a deserialized JSON object)

        @param queue: the queue where the job belongs to.
        @param json: the plain object containing the job.
        @param jobId: an optional job id (overrides the id coming from the JSON object)
        """
        data = json.loads(rawData.get("data", '{}'))
        opts = optsFromJSON(json.loads(rawData.get("opts", '{}')))

        job = Job(queue, rawData.get("name"), data, opts)
        job.id = jobId or rawData.get("id", b'').decode("utf-8")

        job.progress = json.loads(rawData.get("progress",  '0'))
        job.delay = int(rawData.get("delay", "0"))
        job.timestamp = int(rawData.get("timestamp", "0"))

        if rawData.get("finishedOn"):
            job.finishedOn = int(rawData.get("finishedOn"))

        if rawData.get("processedOn"):
            job.processedOn = int(rawData.get("processedOn"))

        if rawData.get("rjk"):
            job.repeatJobKey = rawData.get("rjk")

        if rawData.get("ats"):
            job.attemptsStarted = int(rawData.get("ats"))

        job.failedReason = rawData.get("failedReason")
        job.attemptsMade = int(rawData.get("attemptsMade") or rawData.get("atm") or "0")

        returnvalue = rawData.get("returnvalue")
        if type(returnvalue) == str:
            job.returnvalue = getReturnValue(returnvalue)

        job.stacktrace = json.loads(rawData.get("stacktrace", "[]"))

        if rawData.get("parentKey"):
            job.parentKey = rawData.get("parentKey")

        if rawData.get("parent"):
            job.parent = json.loads(rawData.get("parent"))

        return job

The thing is that the Lua scripts return byte strings, so the keys and values of the rawData dictionary passed into this method are not plain strings but byte strings, yet throughout the method the code tries to read values from the dictionary as if it contained plain string keys.

In other words, you are trying to get values for keys that do not exist. Here is an example of the rawData that comes into fromJSON:

rawData = {
    b'name': b'__default__',
    b'data': b'{"foo":"bar"}',
    b'opts': b'{"delay":0,"attempts":0}',
    b'timestamp': b'1723027780902',
    b'delay': b'0',
    b'priority': b'0',
    b'ats': b'22',
    b'stalledCounter': b'21',
    b'processedOn': b'1723066399673'
}

As you can see, the keys and values of the dictionary are byte strings; you can read about what they are here.

However, for example b"data" is not equal to "data". That is:

print(b"data" == "data")
print("data" == "data")

>>> False
>>> True

Now let’s try to access rawData and get task (job) data from the queue as it is implemented in fromJSON:

import json

rawData = {
    b'name': b'__default__',
    b'data': b'{"foo":"bar"}',
    b'opts': b'{"delay":0,"attempts":0}',
    b'timestamp': b'1723027780902',
    b'delay': b'0',
    b'priority': b'0',
    b'ats': b'22',
    b'stalledCounter': b'21',
    b'processedOn': b'1723066399673'
}

data = json.loads(rawData.get("data", '{}'))

print(data)

>>> {}

Thus, the data can never be retrieved until either the keys and values are decoded, or you look up the values by the keys that actually exist (the byte keys).

That is, if we do this:

import json

rawData = {
    b'name': b'__default__',
    b'data': b'{"foo":"bar"}',
    b'opts': b'{"delay":0,"attempts":0}',
    b'timestamp': b'1723027780902',
    b'delay': b'0',
    b'priority': b'0',
    b'ats': b'22',
    b'stalledCounter': b'21',
    b'processedOn': b'1723066399673'
}

data = json.loads(rawData.get(b"data", '{}'))

print(data)

>>> {'foo': 'bar'}

So you either need to decode the byte strings so the same string keys can keep being used, or prefix the keys with b to look up the existing byte keys.

In short, because of the b prefix the keys are different objects that never compare equal, so you will never get the values. It's best to decode the byte strings to be sure this won't cause errors in the future.
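To make the decoding approach concrete, here is a minimal sketch of such a helper (illustrative only; the function actually added in this PR may be named and structured differently):

def decode_raw_data(rawData: dict) -> dict:
    """Decode byte-string keys and values returned by the Lua scripts so the
    existing str-key lookups in fromJSON keep working. Illustrative sketch only."""
    decoded = {}
    for key, value in rawData.items():
        if isinstance(key, bytes):
            key = key.decode("utf-8")
        if isinstance(value, bytes):
            value = value.decode("utf-8")
        decoded[key] = value
    return decoded


# With the helper applied first, the original lookup works again:
# data = json.loads(decode_raw_data(rawData).get("data", '{}'))  # -> {'foo': 'bar'}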

If I'm not mistaken, the Lua scripts return byte strings in this situation not only for Python clients, and that is normal behavior.

Considering that almost every value in fromJSON is read from rawData this way, I have no idea how it even worked before.

YeaMerci avatar Aug 10 '24 07:08 YeaMerci

The logic is broken. It would be a good idea to add a simple test that checks whether it is possible to get the data from a Job at all.

YeaMerci avatar Aug 10 '24 07:08 YeaMerci

tests are still passing without your change in other pipelines unfortunately

Then can you please tell me which specific test is failing?

YeaMerci avatar Aug 10 '24 11:08 YeaMerci

@roggervalf @manast Looks like I missed something:

def optsFromJSON(rawOpts: dict) -> dict:
    # opts = json.loads(rawOpts)
    opts = rawOpts

    option_entries = opts.items()

    options = {}
    for item in option_entries:
        attribute_name = item[0]
        value = item[1]
        if attribute_name in optsDecodeMap:
            options[optsDecodeMap[attribute_name]] = value
        else:
            options[attribute_name] = value

    return options

I hadn't noticed that this function does more than just pull values out of rawData (it also remaps option names via optsDecodeMap), so I had removed it by mistake.

I’ll fix it now and make a couple of changes just to be safe.

YeaMerci avatar Aug 12 '24 11:08 YeaMerci

@roggervalf @manast Now it should work.

YeaMerci avatar Aug 12 '24 13:08 YeaMerci

@roggervalf @manast Hi! I read the test logs.

Damn, I don't know how, but I missed another detail.

After my corrections there was no longer any need for the multiple ifs and the json.loads string parsing, but I had not changed all such places and left the previous logic in some of them.

Now I've changed that.

Of the 4 failing tests, 3 should now pass, because their failures were caused by returnvalue.

However, I still don't know what to do with the fourth test. It fails because the actual value of failedJob.failedReason does not match the expected value: the test expects failedReason to be wrapped in quotes, but the actual value has no quotes. Please tell me, should failedReason really be wrapped in quotes?

YeaMerci avatar Aug 13 '24 11:08 YeaMerci

To be more precise, 2 tests now pass, and for the other 2 to pass, the failedReason issue needs to be resolved.

======================================================================
FAIL: test_process_jobs_fail (tests.worker_tests.TestWorker)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/opt/hostedtoolcache/Python/3.10.14/x64/lib/python3.10/unittest/async_case.py", line 64, in _callTestMethod
    self._callMaybeAsync(method)
  File "/opt/hostedtoolcache/Python/3.10.14/x64/lib/python3.10/unittest/async_case.py", line 87, in _callMaybeAsync
    return self._asyncioTestLoop.run_until_complete(fut)
  File "/opt/hostedtoolcache/Python/3.10.14/x64/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "/opt/hostedtoolcache/Python/3.10.14/x64/lib/python3.10/unittest/async_case.py", line 101, in _asyncioLoopRunner
    ret = await awaitable
  File "/home/runner/work/bullmq/bullmq/python/tests/worker_tests.py", line 163, in test_process_jobs_fail
    self.assertEqual(failedJob.failedReason, f'"{failedReason}"')
AssertionError: 'Failed' != '"Failed"'
- Failed
+ "Failed"
? +      +
======================================================================
FAIL: test_process_job_fail_with_nan_as_return_value (tests.worker_tests.TestWorker)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/opt/hostedtoolcache/Python/3.10.14/x64/lib/python3.10/unittest/async_case.py", line 64, in _callTestMethod
    self._callMaybeAsync(method)
  File "/opt/hostedtoolcache/Python/3.10.14/x64/lib/python3.10/unittest/async_case.py", line 87, in _callMaybeAsync
    return self._asyncioTestLoop.run_until_complete(fut)
  File "/opt/hostedtoolcache/Python/3.10.14/x64/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "/opt/hostedtoolcache/Python/3.10.14/x64/lib/python3.10/unittest/async_case.py", line 101, in _asyncioLoopRunner
    ret = await awaitable
  File "/home/runner/work/bullmq/bullmq/python/tests/worker_tests.py", line 132, in test_process_job_fail_with_nan_as_return_value
    self.assertEqual(failedJob.failedReason, f'"{failedReason}"')
AssertionError: 'Out of range float values are not JSON compliant' != '"Out of range float values are not JSON compliant"'
- Out of range float values are not JSON compliant
+ "Out of range float values are not JSON compliant"
? +   

The failure message itself is correct, but the quotes are missing, so the tests fail. Either the quotes get lost somewhere and I need to find where (though I can't imagine where), or these 2 tests still contain errors:

    async def test_process_job_fail_with_nan_as_return_value(self):
        queue = Queue(queueName)
        data = {"foo": "bar"}
        job = await queue.add("test-job", data, {"removeOnComplete": False})

        failedReason = "Out of range float values are not JSON compliant"

        async def process(job: Job, token: str):
            print("Processing job", job)
            return float('nan')

        worker = Worker(queueName, process)

        processing = Future()
        worker.on("failed", lambda job, result: processing.set_result(None))
        await processing
        failedJob = await Job.fromId(queue, job.id)


        self.assertEqual(failedJob.id, job.id)
        self.assertEqual(failedJob.attemptsMade, 1)
        self.assertEqual(failedJob.data, data)
        self.assertEqual(failedJob.failedReason, f'"{failedReason}"')
        self.assertEqual(len(failedJob.stacktrace), 1)
        self.assertEqual(failedJob.returnvalue, None)
        self.assertNotEqual(failedJob.finishedOn, None)
        
        await worker.close(force=True)
        await queue.close()

    async def test_process_jobs_fail(self):
        queue = Queue(queueName)
        data = {"foo": "bar"}
        job = await queue.add("test-job", data, {"removeOnComplete": False})

        failedReason = "Failed"

        async def process(job: Job, token: str):
            print("Processing job", job)
            raise Exception(failedReason)

        worker = Worker(queueName, process)

        processing = Future()
        worker.on("failed", lambda job, result: processing.set_result(None))

        await processing

        failedJob = await Job.fromId(queue, job.id)

        self.assertEqual(failedJob.id, job.id)
        self.assertEqual(failedJob.attemptsMade, 1)
        self.assertEqual(failedJob.data, data)
        self.assertEqual(failedJob.failedReason, f'"{failedReason}"')
        self.assertEqual(len(failedJob.stacktrace), 1)
        self.assertEqual(failedJob.returnvalue, None)
        self.assertNotEqual(failedJob.finishedOn, None)

        await worker.close(force=True)
        await queue.close()

I'm interested in this line:

self.assertEqual(failedJob.failedReason, f'"{failedReason}"')

Do we really need failedReason to equal '"Some kind of fail message"' (with the surrounding quotes) rather than plain 'Some kind of fail message'?
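For reference, one possible explanation (my assumption, not something confirmed in this thread) is that the failed reason is stored JSON-encoded, in which case the surrounding quotes are part of the stored value:

import json

failedReason = "Failed"
# json.dumps of a bare string keeps the quotes as part of the stored value.
print(json.dumps(failedReason))                          # "Failed"
print(json.dumps(failedReason) == f'"{failedReason}"')   # True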

YeaMerci avatar Aug 13 '24 12:08 YeaMerci

In theory, the last edit should fix it.

YeaMerci avatar Aug 13 '24 12:08 YeaMerci

Unfortunately the python pipeline is still hanging. Did the tests run all the way through locally on your machine?

manast avatar Aug 13 '24 13:08 manast

Unfortunately the python pipeline is still hanging. Did the tests run all the way through locally on your machine?

If you look at the logs, there is steady progress. The problems with returnvalue and with the tests for processing failed jobs have been resolved.

All that remains is to sort out the last test and everything will pass.

After these improvements I would add a new test that checks that a Job exposes its data for processing, and I would add such an example to the README.

YeaMerci avatar Aug 13 '24 16:08 YeaMerci

@roggervalf @manast Hi! I think it's a victory.

Now all tests pass successfully. The bugs were mostly about decoding byte strings and about how the code was structured. Data from a job can now be received and processed directly in the worker function.

I would also add one more test and a README example.

YeaMerci avatar Aug 19 '24 19:08 YeaMerci

Well, the last test failed because I confused ats with atm, so attemptsMade was taken from ats instead of atm, but now everything is fine ;)

YeaMerci avatar Aug 19 '24 19:08 YeaMerci

Damn, I forgot the type casting. I fixed it; this time it really is all right.

YeaMerci avatar Aug 19 '24 21:08 YeaMerci

More precisely, I mean the same byte-string decoding. In the case of job_id it could be handled either with int() or with decode(); however, if decode() is called on something that is not a byte string, an error occurs, and that is what failed the tests this time.
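A quick illustration of what I mean (plain Python, nothing bullmq-specific):

raw_id = b"42"
print(int(raw_id))      # 42 - int() accepts ASCII digits in a bytes object
print(raw_id.decode())  # '42'

# str has no decode(), so calling it on an already-decoded value raises:
# "42".decode()  ->  AttributeError: 'str' object has no attribute 'decode'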

YeaMerci avatar Aug 19 '24 21:08 YeaMerci

I've added a new test. It's very simple: the point is to receive the job data in the worker function and do something with it; in my case, simply return it so the test can check it.

This test is the essence of this pull request: before, I could not access the job data inside the worker function and do anything with it, so there was no way to implement any processing logic.

In this test we create a queue, put a job in it, and create a worker with a processing function. In the processing function we return the job.data attribute. We then check that the returned object is equal to the original data.

This way we can be sure that we can access the job data directly in the worker function.

This did not work before my changes, because in the fromJSON function the code tried to read values from rawData using non-existent string keys that look equivalent to the byte keys but are not equal to them. So data was always just {}, which also caused other problems.

You don't have any tests that make sure the data in the worker function can be accessed correctly and used for something; you basically just print the job.

YeaMerci avatar Aug 20 '24 09:08 YeaMerci


    async def test_getting_job_data_in_process(self) -> None:
        queue = Queue(queueName)
        data = {"foo": "bar"}
        job = await queue.add(
            "test-job", data,
            {"removeOnComplete": False}
        )

        async def process(job: Job, token: str) -> dict:
            return job.data

        worker = Worker(queueName, process)
        processing = Future()
        worker.on(
            "completed",
            lambda job, result: processing.set_result(None)
        )

        await processing
        completedJob = await Job.fromId(queue, job.id)

        self.assertEqual(completedJob.id, job.id)
        self.assertEqual(completedJob.attemptsMade, 1)
        self.assertEqual(completedJob.data, data)
        self.assertEqual(completedJob.returnvalue, data)
        self.assertNotEqual(completedJob.finishedOn, None)

        await worker.close(force=True)
        await queue.close()

YeaMerci avatar Aug 20 '24 09:08 YeaMerci

It is quite possible that with other Redis configurations, or under your specific conditions, the values are not byte strings, but in other cases they are.

The point is that this modification does not break the existing logic and lets us solve a problem that some people do run into.

YeaMerci avatar Aug 20 '24 09:08 YeaMerci

hi @YeaMerci I searched for this kind of error and found https://redis-py.readthedocs.io/en/latest/examples/connection_examples.html#By-default-Redis-return-binary-responses,-to-decode-them-use-decode_responses=True. Could you try adding this option to your connection config? It could be that it is needed in your case.
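For example, with redis-py it looks roughly like this (a minimal sketch; how the configured client is then passed into bullmq's Queue/Worker connection options is not shown here and may differ):

import redis.asyncio as redis

# By default redis-py returns bytes; decode_responses=True makes it return str.
connection = redis.Redis(host="localhost", port=6379, decode_responses=True)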

roggervalf avatar Aug 20 '24 14:08 roggervalf

hi @YeaMerci I searched for this kind of error and found https://redis-py.readthedocs.io/en/latest/examples/connection_examples.html#By-default-Redis-return-binary-responses,-to-decode-them-use-decode_responses=True. Could you try adding this option to your connection config? It could be that it is needed in your case.

Okay, this solution probably helps, but why not make it work properly out of the box? Someone coming from TS to Python will think that your functionality simply does not work, whereas my changes make it work regardless of whether decoding is on or off, so I think my pull request should be merged.

Especially since there is no documentation or dedicated error for such cases, while in TS you just use bullmq without having to think about decoding at all.

YeaMerci avatar Aug 21 '24 09:08 YeaMerci

@roggervalf @manast The modification itself does not break anything; with it, a project can simply avoid this kind of error without any need for an investigation. In addition, I refactored it to match your style.

YeaMerci avatar Aug 21 '24 09:08 YeaMerci

At the very least, such a case should be described in the documentation, because it is a real pain to dig through the source code trying to fix a bug that is not even considered one. If there had been at least one warning, anything at all, but there wasn't.

YeaMerci avatar Aug 22 '24 13:08 YeaMerci