
[Python] Worker Initialized but Not Processing Jobs

Open akramhecini opened this issue 9 months ago • 3 comments

I have implemented a Flask API with a BullMQ Worker to handle report generation. Although the job is successfully added to the queue and the worker appears to be initialized correctly ("Worker is ready and listening for jobs." is logged), the report generation function is never executed.

Relevant Code

The key parts of the code are as follows:

  • Adding the job to the queue in the Flask API:
@app.route('/generate-report', methods=['POST'])
async def generate_report():
    data = request.get_json()
    survey_id = data.get("id")

    if not survey_id:
        return jsonify({"error": "ID missing"}), 400
    job = await queue.add("generate_report", {"survey_id": survey_id})
    logger.info(f"Added job to queue with ID: {job.id}")

    return jsonify({"message": "Task in progress", "task_id": job.id})
  • Job processing by the Worker:
async def process_job(job):
    survey_id = job.data.get("survey_id")
    logger.info(f"Processing job with survey ID: {survey_id}")
    return await generate_report_task(survey_id)
  • Worker Initialization:
async def main_worker():
    try:
        logger.info("Initializing worker...")
        worker = Worker("generate_reports_queue", process_job)
        logger.info("Worker is ready and listening for jobs.")
        await asyncio.Future()  # Keep the worker running
    except Exception as e:
        logger.error(f"Worker error: {e}", exc_info=True)

Expected Behavior

I expected the generate_report_task function to be called and the report to be generated once the job is picked up by the worker.

Observed Behavior

  • The job is successfully added to the queue.
  • The message "Worker is ready and listening for jobs." appears in the logs.
  • However, the generate_report_task function is never called.

Technical Details

  • Python 3.11
  • Running on WSL2

Request for Help

I believe the worker is not correctly picking up jobs from the queue. Any suggestions or troubleshooting tips would be greatly appreciated. Thank you in advance for your help!
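
If it helps with troubleshooting, here is how I can inspect the queue state in Redis directly; a minimal check with redis-py, assuming the default "bull" key prefix and a local Redis on the default port:

import redis  # redis-py; assumed to be available alongside bullmq

r = redis.Redis(host="localhost", port=6379, db=0)
# BullMQ keeps waiting and active job IDs in plain Redis lists
print("waiting:", r.llen("bull:generate_reports_queue:wait"))
print("active:", r.llen("bull:generate_reports_queue:active"))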

akramhecini avatar Mar 10 '25 14:03 akramhecini

Where is the code that instantiates the queue instance?

manast avatar Mar 10 '25 14:03 manast

> Where is the code that instantiates the queue instance?

At the top of the code, as a global variable.

Here's the complete code:

from flask import Flask, request, jsonify
from bullmq import Queue, Worker
import asyncio
from skezia_tools import *
import signal
import sys
import logging
import threading

app = Flask(__name__)
queue = Queue("generate_reports_queue")

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ========================== #
#  Report Generation Function
# ========================== #

async def generate_report_task(survey_id):
    try:
        logger.info(f"Starting report generation for survey ID: {survey_id}")
        responses, questions = fetch_responses(survey_id)
        data_df = responses_json_to_df(responses, "definition")
        survey_data = bdv_json_to_df(questions["definition"].iloc[0])

        output_path = f"output_{survey_id}.docx"

        decision_maker_with_report_streaming(
            data=data_df,
            df_bbl=survey_data,
            template="Template_SKEZIA.docx",
            bucket_name="reu-data",
            key=output_path
        )

        logger.info(f"Report generated successfully for survey ID: {survey_id}")
        return {"message": "Rapport généré avec succès", "file_path": output_path}

    except Exception as e:
        logger.error(f"Error generating report for survey ID {survey_id}: {str(e)}")
        return {"error": str(e)}

# ========================== #
#  Flask API
# ========================== #

@app.route('/generate-report', methods=['POST'])
async def generate_report():
    data = request.get_json()
    survey_id = data.get("id")

    if not survey_id:
        return jsonify({"error": "ID manquant"}), 400
    job = await queue.add("generate_report", {"survey_id": survey_id})
    logger.info(f"Added job to queue with ID: {job.id}")

    return jsonify({"message": "Tâche en cours", "task_id": job.id})

# ========================== #
#  BullMQ Worker
# ========================== #

async def process_job(job):
    survey_id = job.data.get("survey_id")
    logger.info(f"Processing job with survey ID: {survey_id}")
    return await generate_report_task(survey_id)

async def main_worker():
    try:
        logger.info("Initializing worker...")
        worker = Worker("generate_reports_queue", process_job)
        logger.info("Worker is ready and listening for jobs.")
        await asyncio.Future()
    except Exception as e:
        logger.error(f"Worker error: {e}", exc_info=True)

def run_worker():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(main_worker())

# Handle graceful shutdown
def shutdown(sig, frame):
    logger.info("Shutting down gracefully...")
    sys.exit(0)

signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)

if __name__ == "__main__":
    worker_thread = threading.Thread(target=run_worker, daemon=True)
    worker_thread.start()
    app.run(debug=True, use_reloader=False)

akramhecini avatar Mar 12 '25 12:03 akramhecini

As I am not a master in Python, I got the following answer from Grok:

Recommended Solution Path

Based on the analysis, the most likely issue is that the worker isn’t connecting to the same Redis instance as the queue, or the threading setup is causing the worker to fail silently. Here’s a streamlined approach:

  1. Add Redis Configuration: Update your code to explicitly specify the Redis connection:

redis_config = {"host": "localhost", "port": 6379, "db": 0}
queue = Queue("generate_reports_queue", connection=redis_config)
worker = Worker("generate_reports_queue", process_job, connection=redis_config)
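
Since the suspicion above is a Redis mismatch (for example between WSL2 and the Windows host), a quick connectivity sanity check is also possible; a sketch using redis-py, which is an assumption and not part of the original snippets:

import redis  # redis-py; hypothetical check, not from the original code

r = redis.Redis(host="localhost", port=6379, db=0)
print(r.ping())    # True if this process can reach the Redis server
print(r.dbsize())  # becomes non-zero once queue keys have been written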
  2. Test Worker in Main Thread: Temporarily run:

if __name__ == "__main__":
    asyncio.run(main_worker())
  3. Add a job from another script and check logs.
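
A minimal sketch of such a script, reusing the queue name from the thread (queue.close() is assumed to exist on the Python client):

import asyncio
from bullmq import Queue

async def main():
    # Same queue name as the Flask app, so the worker should pick this job up
    queue = Queue("generate_reports_queue")
    job = await queue.add("generate_report", {"survey_id": "test-123"})
    print(f"Added test job {job.id}")
    await queue.close()  # assumed close() method for a clean exit

asyncio.run(main())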

  4. Switch to Multiprocessing if Needed: If threading is the issue, use:

from multiprocessing import Process

def run_worker():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(main_worker())

if __name__ == "__main__":
    worker_process = Process(target=run_worker)
    worker_process.start()
    app.run(debug=True, use_reloader=False)
    worker_process.join()
  5. Add Debug Route: Implement /queue-status to monitor jobs:

@app.route('/queue-status', methods=['GET'])
async def queue_status():
    jobs = await queue.getJobs(['waiting', 'active', 'completed', 'failed'])
    return jsonify([job.as_json() for job in jobs])
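
If getJobs / as_json turn out not to be available in the Python client (the snippet above comes from Grok, so the exact method names are unverified), a cruder status check is to read the underlying Redis keys directly. A sketch assuming the default "bull" key prefix and redis-py; the /queue-status-raw route name is made up for illustration:

@app.route('/queue-status-raw', methods=['GET'])
def queue_status_raw():
    import redis  # redis-py; assumed available
    r = redis.Redis(host="localhost", port=6379, db=0)
    prefix = "bull:generate_reports_queue"
    return jsonify({
        "waiting": r.llen(f"{prefix}:wait"),          # list of waiting job IDs
        "active": r.llen(f"{prefix}:active"),         # list of active job IDs
        "completed": r.zcard(f"{prefix}:completed"),  # sorted set of completed jobs
        "failed": r.zcard(f"{prefix}:failed"),        # sorted set of failed jobs
    })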

manast avatar Mar 14 '25 15:03 manast