bullmq
[Bug]: Examples in python bullmq documentation are not working
Version
v0.5.6
Platform
Python
What happened?
I tried to run the examples at the link below:
https://docs.bullmq.io/python/introduction
The examples were not working. As an introduction, there should be ready-to-use examples.
Prerequisites

```shell
pip install redis --upgrade
pip install bullmq==v0.5.6
```
Here is an example of working Python code.
Add Job
```python
import asyncio
import redis.asyncio as redis
from bullmq import Queue

r = redis.Redis()

async def main():
    queue = Queue('my-queue')
    job = await queue.add('my-job', {'foo': 'bar'})
    job_id = job.id
    job_data = job.data
    print(f"Job ID: {job_id}")
    print(f"Job Data: {job_data}")

asyncio.run(main())
```
Get a Job
```python
from asyncio import Future, run
from bullmq import Queue, Worker, Job

async def main():
    async def process(job: Job, token: str):
        print("Processing job", job.id)
        print(job.data)
        return "done"

    while True:
        processing = Future()
        worker = Worker("my-queue", process)
        worker.on("completed", lambda job, result: processing.set_result(None))
        await processing

run(main())
```
Let me know if there are other such examples you have created, or better ways to get a job and update its status.
How to reproduce.
Try to run the examples on https://docs.bullmq.io/python/introduction
Relevant log output
No response
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
I am facing a weird issue:
- I ran the add-a-job script multiple times.
- When I ran get.py, it ran into a lock issue, as below:
```
root@13b31d743662:/usr/src/app/helper# python add.py
Job ID: 10
Job Data: {'foo': 'bar'}
root@13b31d743662:/usr/src/app/helper# python add.py
Job ID: 11
Job Data: {'foo': 'bar'}
root@13b31d743662:/usr/src/app/helper# python add.py
Job ID: 12
Job Data: {'foo': 'bar'}
root@13b31d743662:/usr/src/app/helper# python add.py
Job ID: 13
Job Data: {'foo': 'bar'}
root@13b31d743662:/usr/src/app/helper# python add.py
Job ID: 14
Job Data: {'foo': 'bar'}
root@13b31d743662:/usr/src/app/helper# python add.py
Job ID: 15
Job Data: {'foo': 'bar'}
root@13b31d743662:/usr/src/app/helper# python add.py
Job ID: 16
Job Data: {'foo': 'bar'}
root@13b31d743662:/usr/src/app/helper# python add.py
Job ID: 17
Job Data: {'foo': 'bar'}
root@13b31d743662:/usr/src/app/helper# python add.py
Job ID: 18
Job Data: {'foo': 'bar'}
root@13b31d743662:/usr/src/app/helper# python get.py
Processing job 10
{'foo': 'bar'}
Processing job 11
{'foo': 'bar'}
Processing job 12
{'foo': 'bar'}
Processing job 13
{'foo': 'bar'}
Error processing job invalid state
Error moving job to failed Missing lock for job 13.moveToFailed
Processing job 15
{'foo': 'bar'}
Processing job 14
{'foo': 'bar'}
Error processing job invalid state
Processing job 16
{'foo': 'bar'}
Error moving job to failed Missing lock for job 14.moveToFailed
Processing job 18
{'foo': 'bar'}
Processing job 17
{'foo': 'bar'}
```
@manast, can you take a look at the issue above?
Is the way the consumer is written causing the problem?
@vraj-fyno not sure what I should look for about this issue. Why don't you use the example code from the documentation instead of writing your own that does not work?
The example code does not work out of the box, so I had to write my own. The issue is that if there are multiple entries in the queue, the get-job function fails. Do you have some working examples that I can try?
BTW I had taken this code from the test file itself!
You say that this code is not working: https://docs.bullmq.io/python/introduction Then you paste some random code that is not working. 🤔
The Python version is still under heavy development, so most users using it right now are NodeJS users that need interoperability with Python. We will improve the documentation as the module gets more mature.
I have the same use case: the majority of our services in Node use bullmq, so for interoperability we need bullmq in Python too.
The code in the documentation did not even start out of the box, so I thought I could make it work using the asyncio library, and it works perfectly fine for adding items to the queue.
While consuming, the code does work (as the console output above shows), but there is an issue with job locks. I suppose it is caused by bullmq's internal implementation: if more than a certain number of jobs are queued up, it runs into a lock error.
So, long story short, I have figured out a way to send data from Python to Bull, but consuming is still a problem when there is already more data in the queue.
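For what it's worth, the consumer snippet earlier in this thread constructs a new `Worker` on every loop iteration, so several workers end up competing for the same queue, which could plausibly contribute to the lock errors. The single-worker lifecycle might be sketched as follows; `StandInWorker` is a hypothetical placeholder for bullmq's `Worker` (not the real API), used only so the snippet is self-contained and runnable:

```python
import asyncio

class StandInWorker:
    """Hypothetical stand-in for bullmq's Worker, used only to
    illustrate the lifecycle: construct the worker once, let it
    process jobs, and close it once on the way out."""

    def __init__(self, queue_name, processor):
        self.queue_name = queue_name
        self.processor = processor
        self.closed = False

    async def drain(self, jobs):
        # Feed every fake job through the single processor.
        return [await self.processor(job, "some-token") for job in jobs]

    async def close(self):
        self.closed = True

async def process(job, token):
    # In real bullmq code, job would be a Job instance with .id and .data.
    return f"done:{job}"

async def main():
    worker = StandInWorker("my-queue", process)   # created ONCE, outside any loop
    results = await worker.drain(["job-1", "job-2", "job-3"])
    await worker.close()                          # closed ONCE, at shutdown
    return results, worker.closed

results, closed = asyncio.run(main())
print(results)
print(closed)
```

The structural point is the placement: one worker per process, created before the wait loop and closed after it, rather than a fresh worker per iteration.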
If you take code from the tests it is not going to work, as those tests are written to exercise specific edge cases, so you would not write code like that in practice. I also see you are using a very old version; the latest version is https://pypi.org/project/bullmq/1.5.0/. Why don't you tell me what is happening with the example code here https://docs.bullmq.io/python/introduction, as it should work?
Hey, maybe I can weigh in on that.
In general, the "issue" with the doc code is that, if simply copy/pasted into a Python script, the script immediately exits (as expected, but this might put off less experienced devs). I guess this is not an issue in itself, as the package is under heavy development and the bullmq code should be embedded in a working application anyhow.
The second thing I found is that the `process` function nowadays requires two positional arguments: the job and a token.
The "minimal example" on my side, which seems to work is the following:
```python
from bullmq import Worker
import asyncio

async def doSomethingAsync(job, token):
    print(job.data)
    print(token)
    await asyncio.sleep(2)
    return "done"

async def process(job, token):
    # job.data will include the data added to the queue
    return await doSomethingAsync(job, token)

async def main():
    # Feel free to remove the connection parameter if your Redis runs on localhost
    worker = Worker("PROCESSFILE", process, {"connection": "rediss://<user>:<password>@<host>:<port>"})

    while True:  # Add some breaking conditions here
        await asyncio.sleep(1)

    # When there is no need to process more jobs we should close the worker
    await worker.close()

if __name__ == "__main__":
    asyncio.run(main())
```
Thanks, I improved the documentation a bit based on your feedback. However, a couple of things:
- `token` is still omitted, as it is only needed when processing jobs manually; in order not to confuse users we just omit it until we have new examples for manual jobs.
- The `await` in your `return await doSomethingAsync(job, token)` is not necessary.
The documentation is really misleading and needs more information in key areas:
- It might very well be that `token` is only needed when processing jobs manually. However, if we omit the second parameter (`token`) from the `process` function, the code simply does not work and errors with `Error processing job process() takes 1 positional argument but 2 were given`. This should absolutely be fixed in the example code.
- How do we keep the worker running? If we remove the loop, the worker will never really run or process any jobs. Is the comment false? Should we replace the loop with something else? Shouldn't the worker itself keep the process alive? The docs would benefit greatly from more information here.
If you remove the loop, then you must also add logic to close the worker when necessary, as the code in the example will just continue to worker.close() and then nothing will be processed. The loop is just there to illustrate some asynchronicity in your application. Most likely you will remove both the loop and move the worker.close() call to a different method that you call when the service is going to be shut down.
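The shutdown shape described above (no busy loop; close the worker only when the service stops) can be sketched with an `asyncio.Event`. `StandInWorker` here is a hypothetical placeholder for bullmq's `Worker` (not the real API), so the snippet is self-contained and terminates on its own:

```python
import asyncio

class StandInWorker:
    """Hypothetical stand-in for bullmq's Worker; only the
    close() lifecycle matters for this sketch."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True

async def main():
    worker = StandInWorker()
    shutdown = asyncio.Event()

    # In a real service you would set the event from a signal handler, e.g.
    #   loop.add_signal_handler(signal.SIGTERM, shutdown.set)
    # Here we trigger it after a short delay so the sketch terminates.
    asyncio.get_running_loop().call_later(0.05, shutdown.set)

    await shutdown.wait()   # keeps the process alive without a busy loop
    await worker.close()    # close the worker only when shutting down
    return worker.closed

closed = asyncio.run(main())
print(closed)
```

`Event.wait()` suspends `main()` without burning CPU, which is the usual replacement for a `while True: sleep(1)` keep-alive loop.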
Hi, I am starting to use the Python version of bullmq. I expanded on the provided example and replaced the while loop with awaiting a shutdown event. I thought other people might benefit from this change. See the PR here: https://github.com/taskforcesh/bullmq/pull/2677/files
@cagejsn thanks!
I am closing this as the examples should be correct now, if something is wrong please open a new issue.
I'm looking at this now. The documentation code as written (replacing doSomethingAsync(job, token) with a print statement, say) shows a lint error on assigning process.
The error shows up at runtime when trying to mark the job complete.
```
Argument of type "(job: Job, job_token: str) -> Coroutine[Any, Any, Future[Unknown]]" cannot be assigned to parameter "processor" of type "(Job, str) -> Future[Unknown]" in function "__init__"
  Type "(job: Job, job_token: str) -> Coroutine[Any, Any, Future[Unknown]]" is incompatible with type "(Job, str) -> Future[Unknown]"
    Function return type "Coroutine[Any, Any, Future[Unknown]]" is incompatible with type "Future[Unknown]"
      "Coroutine[Any, Any, Future[Unknown]]" is incompatible with "Future[Unknown]"
```
@SamBallantyne you need to return a future I think, so just printing to the console may not be correct.
Btw, here you can find some example code from the tests that is running correctly: https://github.com/taskforcesh/bullmq/blob/master/python/tests/worker_tests.py