Add AbortSignal to Job for Worker Graceful Shutdowns

Open dillonstreator opened this issue 11 months ago • 3 comments

Description:

It would be beneficial if the Worker provided an AbortSignal tied to the Worker#close() method, allowing tasks to gracefully finish or abort when the worker is shutting down. For backward compatibility, the AbortSignal could be added as a new field on the Job object passed to the worker’s callback function.

Use Case:

This feature aligns with the graceful shutdown guidance in the BullMQ documentation, enhancing its implementation by providing a more robust mechanism to handle long-running tasks during shutdown. It is particularly useful when:

• Workers are performing tasks that involve external I/O (e.g., HTTP requests, database queries) and need to cancel operations gracefully.
• CPU-bound tasks need a mechanism to periodically check for shutdown signals to exit cleanly.

Proposed API:

The AbortSignal would be added as a field on the Job object. Two examples are provided below to demonstrate its usage:

  1. Handling an HTTP Request with fetch:

The AbortSignal can be passed directly into APIs like fetch that support operation cancellation:

const worker = new Worker('queueName', async (job) => {
  try {
    const response = await fetch('https://example.com/api', { signal: job.signal });
    // ...
  } catch (error) { /* ... */ }
}, { connection: redisConnection });

  2. CPU-bound Task with Periodic Aborted Checks:

For long-running, CPU-bound tasks, you can periodically check job.signal.aborted to exit gracefully:

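const worker = new Worker('queueName', async (job) => {
  try {
    let result = 0;
    for (let i = 0; i < 1e9; i++) {
      if (i % 1e6 === 0) {
        // Yield to the event loop so that a worker.close() call can run;
        // a fully synchronous loop would never observe the abort.
        await new Promise((resolve) => setImmediate(resolve));
        if (job.signal.aborted) {
          console.log('Aborting computation...');
          break;
        }
      }
      result += i; // Simulate computation
    }
    // ...
  } catch (error) { /* ... */ }
}, { connection: redisConnection });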

When worker.close() is called, the signal on the job will be aborted:

await worker.close(); // Signals abort to all in-progress jobs
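
Until something like `job.signal` exists, a similar effect can probably be approximated in user land by sharing one AbortController between the processor and the shutdown path. The sketch below is only an illustration under assumptions: `createShutdownSignal`, `wrap`, and `shutdown` are hypothetical names, not BullMQ API, and the only thing assumed about the worker is that it exposes an async `close()` method.

```javascript
// Hypothetical user-land helper; nothing here is part of BullMQ.
function createShutdownSignal() {
  const controller = new AbortController();
  return {
    signal: controller.signal,
    // Wrap a processor so it receives the shared signal as a second argument.
    wrap: (processor) => (job) => processor(job, controller.signal),
    // Abort first so in-progress jobs can bail out, then close the worker.
    shutdown: async (worker) => {
      controller.abort(new Error('worker is closing'));
      await worker.close();
    },
  };
}

// Possible usage (assumes a BullMQ Worker and an existing redisConnection):
//
//   const shutdownSignal = createShutdownSignal();
//   const worker = new Worker('queueName', shutdownSignal.wrap(async (job, signal) => {
//     const response = await fetch('https://example.com/api', { signal });
//     // ...
//   }), { connection: redisConnection });
//
//   process.on('SIGTERM', () => shutdownSignal.shutdown(worker));
```

Unlike the proposal, this signal is shared by every job on the worker rather than attached to each Job, and it only fires when `shutdown()` is used in place of calling `worker.close()` directly.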

Advantages:

• Backward Compatibility: Adding the signal field to Job ensures existing worker callback implementations remain unaffected.
• Enhances BullMQ’s support for graceful shutdowns.
• Aligns with modern async patterns, enabling integration with APIs like fetch and efficient task management in CPU-intensive operations.

Why AbortSignal?

The AbortSignal API is widely adopted in modern JavaScript for managing asynchronous control flow, operation cancellation, and cleanup. Examples of its adoption include:

• The native fetch API for canceling HTTP requests.
• Database libraries (e.g., MongoDB, Sequelize) for canceling queries.

By introducing AbortSignal, BullMQ would align with this growing standard, making it more developer-friendly and versatile.
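
As an illustration of the cancellation-and-cleanup pattern mentioned above, here is a minimal sketch of a delay that can be interrupted by any AbortSignal. `abortableDelay` is a hypothetical helper written for this example, not something BullMQ provides; it relies only on the standard `AbortSignal` interface (`aborted`, `reason`, and the `abort` event).

```javascript
// Hypothetical helper: resolves after `ms`, or rejects as soon as `signal` aborts.
function abortableDelay(ms, signal) {
  return new Promise((resolve, reject) => {
    if (signal.aborted) {
      reject(signal.reason);
      return;
    }
    const onAbort = () => {
      clearTimeout(timer); // cleanup: do not leave the timer pending
      reject(signal.reason);
    };
    const timer = setTimeout(() => {
      signal.removeEventListener('abort', onAbort); // cleanup: drop the listener
      resolve();
    }, ms);
    signal.addEventListener('abort', onAbort, { once: true });
  });
}
```

With the proposed API, a processor could presumably call something like `await abortableDelay(5000, job.signal)` between retries and have the wait end as soon as the worker starts closing.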

dillonstreator avatar Jan 17 '25 01:01 dillonstreator

This would be an awesome addition! It could complement the stalled check very well: once a job is discarded due to a stall, it would be helpful to have a signal that notifies the worker to abort the job it is currently processing.

tonivj5 avatar Jul 04 '25 15:07 tonivj5

@tonivj5 it will not work for stalled jobs. Normally a job gets stalled because the worker processing it is not able to update the lock on the job, so it is another worker that actually marks the job as stalled, and thus the abort signal would be of no use in this case.

manast avatar Jul 07 '25 09:07 manast

@manast thanks for the correction! 🙇🏻 Anyway, could it be used to notify the worker (at some point, when the worker isn't responsive enough) to stop working on a job?

tonivj5 avatar Jul 07 '25 10:07 tonivj5