
[Bug]: Examples in python bullmq documentation are not working

Open vraj-fyno opened this issue 2 years ago • 12 comments

Version

v0.5.6

Platform

Python

What happened?

I tried to run the examples on the below link

https://docs.bullmq.io/python/introduction

The examples do not work. As an introduction, there should be ready-to-use examples.

Prerequisite

pip install redis --upgrade
pip install bullmq==v0.5.6

Here is an example of the working python code

Add Job

import asyncio

from bullmq import Queue

async def main():
    queue = Queue('my-queue')
    job = await queue.add('my-job', {'foo': 'bar'})
    print(f"Job ID: {job.id}")
    print(f"Job Data: {job.data}")
    # Close the queue connection when done
    await queue.close()

asyncio.run(main())

Get a Job

from asyncio import Future, run
from bullmq import Queue, Worker, Job


async def main():
    async def process(job: Job, token: str):
        print("Processing job", job.id)
        print(job.data)
        return "done"

    while True:
        processing = Future()
        worker = Worker("my-queue", process)
        worker.on("completed", lambda job, result: processing.set_result(None))

        await processing

run(main())

Let me know if there are other such examples you have created, or if there are better ways to get a job and update its status.

How to reproduce.

Try to run the examples on https://docs.bullmq.io/python/introduction

Relevant log output

No response

Code of Conduct

  • [X] I agree to follow this project's Code of Conduct

vraj-fyno avatar Jun 23 '23 16:06 vraj-fyno

I am facing a weird issue:

  • I ran add.py multiple times.
  • When I then ran get.py, it hit a lock issue, as shown below:
root@13b31d743662:/usr/src/app/helper# python add.py 
Job ID: 10
Job Data: {'foo': 'bar'}
root@13b31d743662:/usr/src/app/helper# python add.py 
Job ID: 11
Job Data: {'foo': 'bar'}
root@13b31d743662:/usr/src/app/helper# python add.py 
Job ID: 12
Job Data: {'foo': 'bar'}
root@13b31d743662:/usr/src/app/helper# python add.py 
Job ID: 13
Job Data: {'foo': 'bar'}
root@13b31d743662:/usr/src/app/helper# python add.py 
Job ID: 14
Job Data: {'foo': 'bar'}
root@13b31d743662:/usr/src/app/helper# python add.py 
Job ID: 15
Job Data: {'foo': 'bar'}
root@13b31d743662:/usr/src/app/helper# python add.py 
Job ID: 16
Job Data: {'foo': 'bar'}
root@13b31d743662:/usr/src/app/helper# python add.py 
Job ID: 17
Job Data: {'foo': 'bar'}
root@13b31d743662:/usr/src/app/helper# python add.py 
Job ID: 18
Job Data: {'foo': 'bar'}
root@13b31d743662:/usr/src/app/helper# python get.py 
Processing job 10
{'foo': 'bar'}
Processing job 11
{'foo': 'bar'}
Processing job 12
{'foo': 'bar'}
Processing job 13
{'foo': 'bar'}
Error processing job invalid state
Error moving job to failed Missing lock for job 13.moveToFailed
Processing job 15
{'foo': 'bar'}
Processing job 14
{'foo': 'bar'}
Error processing job invalid state
Processing job 16
{'foo': 'bar'}
Error moving job to failed Missing lock for job 14.moveToFailed
Processing job 18
{'foo': 'bar'}
Processing job 17
{'foo': 'bar'}

vraj-fyno avatar Jun 29 '23 17:06 vraj-fyno

@manast can you take a look at the issue above?

Is the way the consumer is written causing the problem?

vraj-fyno avatar Jul 05 '23 04:07 vraj-fyno

@vraj-fyno not sure what I should look for about this issue. Why don't you use the example code from the documentation instead of writing your own that does not work?

manast avatar Jul 05 '23 09:07 manast

The example code does not work out of the box, so I had to write my own code; the issue is that if there are multiple entries in the queue, the get-job script fails. Do you have some working examples I can try?

BTW I had taken this code from the test file itself!

vraj-fyno avatar Jul 05 '23 09:07 vraj-fyno

You say that this code is not working: https://docs.bullmq.io/python/introduction Then you paste some random code that is not working. 🤔

manast avatar Jul 05 '23 09:07 manast

The Python version is still under heavy development, so most users using it right now are NodeJS users that need interoperability with Python. We will improve the documentation as the module gets more mature.

manast avatar Jul 05 '23 09:07 manast

I also came with the same use case: the majority of our services in Node use BullMQ, so for interoperability we need BullMQ in Python too.

Since the code in the documentation did not even start out of the box, I thought I could make it work using the asyncio library, and it works perfectly fine for adding items to the queue.

On the consuming side, although the code works (see the console output above), there is an issue with job locks. I suppose it is caused by BullMQ's internal implementation: if more than a certain number of jobs are queued up, it runs into a lock error.

So, long story short, I have figured out a way to send data from Python to Bull, but consuming is still a problem if there is more data already in the queue.

vraj-fyno avatar Jul 05 '23 09:07 vraj-fyno

If you take code from the tests it is not going to work, as those tests are written to exercise specific edge cases, so you would not write code like that in practice. I also see you are using a very old version; the latest is https://pypi.org/project/bullmq/1.5.0/. Why don't you tell me what is happening with the example code here https://docs.bullmq.io/python/introduction, as it should work?

manast avatar Jul 05 '23 10:07 manast

Hey, maybe I can weigh in on that.

In general, the "issue" with the doc code is that, if simply copy-pasted into a Python script, the script immediately exits (well... as expected, but it might put off less experienced devs). I guess this is not an issue in itself, as the package is in heavy development and the bullmq code should be embedded in a working application anyhow.

The second thing I found is that the "process" function nowadays requires two positional arguments: the job and a token.

The "minimal example" on my side, which seems to work, is the following:

from bullmq import Worker
import asyncio


async def doSomethingAsync(job, token):
    print(job.data)
    print(token)
    await asyncio.sleep(2)
    return "done"

async def process(job, token):
    # job.data will include the data added to the queue
    return await doSomethingAsync(job, token)

async def main():
    # Feel free to remove the connection parameter, if your redis runs on localhost
    worker = Worker("PROCESSFILE", process, {"connection": "rediss://<user>:<password>@<host>:<port>"})

    while True: # Add some breaking conditions here
        await asyncio.sleep(1)

    # When no need to process more jobs we should close the worker
    await worker.close()

if __name__ == "__main__":
    asyncio.run(main())

andnig avatar Aug 08 '23 06:08 andnig

Thanks, I improved the documentation a bit based on your feedback. However a couple of things:

  • token is still omitted as it is only needed when processing jobs manually, so in order to not confuse users we just omit it until we have new examples for manual jobs.
  • The await in your return await doSomethingAsync(job, token) is not necessary.
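Whether the trailing await matters depends on how the worker consumes the processor's return value; in plain asyncio, though, the two forms are not equivalent, which may be worth knowing. A self-contained illustration (no bullmq involved):

```python
import asyncio

async def do_something_async():
    return "done"

async def returns_value():
    # Awaiting here means the caller receives the string itself.
    return await do_something_async()

async def returns_coroutine():
    # Without the await, the caller receives an unawaited coroutine object.
    return do_something_async()

async def main():
    value = await returns_value()
    pending = await returns_coroutine()
    print(type(value).__name__)    # str
    print(type(pending).__name__)  # coroutine
    # The caller must await a second time to get the actual result.
    print(await pending)           # done

asyncio.run(main())
```

So dropping the await only works if the caller (here, the worker) awaits whatever the processor returns, as the comment above suggests it does.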

manast avatar Aug 11 '23 09:08 manast

The documentation is really misleading and needs more information in key areas:

  • It might very well be that token is only needed when processing jobs manually. However, if we omit the second parameter (token) from the process function, the code simply does not work and errors with Error processing job process() takes 1 positional argument but 2 were given. This should absolutely be fixed in the example code.
  • How do we keep the worker running?

If we remove the loop, the worker will never really run or process any jobs. Is the comment false? Should we replace the loop with something else? Shouldn't the worker itself keep the process alive? The docs would benefit greatly from more information here.
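The first bullet can be reproduced in plain Python, independent of bullmq. Here `invoke` is a hypothetical stand-in for how the worker calls the processor: it always passes two positional arguments.

```python
# Hypothetical stand-in for the worker's call site: bullmq invokes
# the processor with two positional arguments (job, token).
def invoke(processor):
    job = {"data": {"foo": "bar"}}
    token = "token-1"
    return processor(job, token)

def process_without_token(job):        # signature missing the token
    return job["data"]

def process_with_token(job, token):    # matches what the worker passes
    return job["data"]

try:
    invoke(process_without_token)
except TypeError as err:
    print(err)  # process_without_token() takes 1 positional argument but 2 were given

print(invoke(process_with_token))  # {'foo': 'bar'}
```

This is why omitting the token parameter fails at runtime even if the example code never uses it.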

CodingMeSwiftly avatar Sep 27 '23 10:09 CodingMeSwiftly

If you remove the loop then you must also add logic to close the worker when necessary, as the code in the example will just continue to worker.close() and then nothing will be processed. The loop is just there to illustrate some asynchronicity in your application. Most likely you will remove both the loop and move worker.close() to a different method that you call when the service is going to be shut down.
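That suggestion (drop the illustrative loop, keep the process alive until shutdown is requested, then close the worker) can be sketched with an asyncio.Event. The Worker lines are commented out so the sketch runs without a Redis server:

```python
import asyncio

async def main():
    shutdown = asyncio.Event()
    loop = asyncio.get_running_loop()

    # In a real service the event would be set from a signal handler, e.g.:
    #   loop.add_signal_handler(signal.SIGTERM, shutdown.set)
    # For this self-contained sketch, a timer requests shutdown instead.
    loop.call_later(0.1, shutdown.set)

    # worker = Worker("my-queue", process)  # worker processes jobs in the background
    await shutdown.wait()   # keeps the process alive until shutdown is requested
    # await worker.close()  # then close the worker cleanly before exiting
    return "closed"

print(asyncio.run(main()))  # closed
```

The event replaces the sleep loop: the coroutine blocks cheaply on `shutdown.wait()` instead of polling, and the close logic runs exactly once on the way out.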

manast avatar Sep 29 '23 13:09 manast

Hi, I am starting to use the Python version of bullmq. I expanded on the provided example and replaced the while loop with awaiting a shutdown event. I thought other people might benefit from this change. See the PR here: https://github.com/taskforcesh/bullmq/pull/2677/files

cagejsn avatar Jul 28 '24 21:07 cagejsn

@cagejsn thanks!

manast avatar Jul 28 '24 21:07 manast

I am closing this as the examples should be correct now; if something is still wrong, please open a new issue.

manast avatar Aug 09 '24 13:08 manast

I'm looking at this now. The documentation code as written (replacing doSomethingAsync(job, token) with a print statement, say) shows a lint error on assigning process.

The error shows up at runtime when trying to mark the job complete.

Argument of type "(job: Job, job_token: str) -> Coroutine[Any, Any, Future[Unknown]]" cannot be assigned to parameter "processor" of type "(Job, str) -> Future[Unknown]" in function "__init__"
  Type "(job: Job, job_token: str) -> Coroutine[Any, Any, Future[Unknown]]" is incompatible with type "(Job, str) -> Future[Unknown]"
    Function return type "Coroutine[Any, Any, Future[Unknown]]" is incompatible with type "Future[Unknown]"
      "Coroutine[Any, Any, Future[Unknown]]" is incompatible with "Future[Unknown]"

SamBallantyne avatar Aug 16 '24 04:08 SamBallantyne

@SamBallantyne you need to return a future I think, so just printing to the console may not be correct.
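If the type hints really demand a Future, a coroutine can be wrapped into one with asyncio.ensure_future. This is a plain-asyncio sketch; whether bullmq's Worker actually requires this (rather than just awaiting the coroutine) is not verified here:

```python
import asyncio

async def process(job, token):
    # Placeholder body; the real processor would do the work here.
    return "done"

async def main():
    # asyncio.ensure_future wraps the coroutine in a Task, which is a
    # Future subclass, so it satisfies a parameter typed as Future.
    fut = asyncio.ensure_future(process({"foo": "bar"}, "token-1"))
    assert isinstance(fut, asyncio.Future)
    return await fut

print(asyncio.run(main()))  # done
```

Either way, the lint error above is about the declared type of the processor parameter, not about what the code does at runtime.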

manast avatar Aug 16 '24 15:08 manast

Btw, here you can find some example code from the tests that is running correctly: https://github.com/taskforcesh/bullmq/blob/master/python/tests/worker_tests.py

manast avatar Aug 16 '24 15:08 manast