celery

Revoking / removing tasks from the queue while workers are offline

Open urzbs opened this issue 11 months ago • 8 comments

Issue Description

Overview

I'm currently utilizing Celery within a distributed system to handle various tasks across remote workers. However, there are instances where these workers may be offline due to reasons such as cloud maintenance or other unforeseen circumstances. My primary system acts as a task producer, enqueuing tasks into different Celery queues.

Problem Statement

In certain scenarios, it becomes necessary to terminate or remove specific tasks from execution entirely. To achieve this, I'm employing the following method:

app.control.revoke(task_id, terminate=True)

This successfully terminates the process and marks the task state as "REVOKED".

Issue

The challenge arises when the worker consuming the task's queue is offline. In that case, the broadcast message sent by .revoke() is lost. Consequently, even though the tasks have been revoked, they are still executed once the workers restart.
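For context, revoke() is a broadcast control command: each worker that is online when it is sent records the id in its own in-memory revoked set, and a worker that was offline never hears about it. One way to detect this from the producer side is to ask for replies. The sketch below assumes `app` is a Celery app and relies on `app.control.revoke` forwarding `reply` and `timeout` to the underlying broadcast; `revoke_and_collect` is a hypothetical helper name.

```python
from typing import Any, Set


def revoke_and_collect(app: Any, task_id: str, timeout: float = 1.0) -> Set[str]:
    """Broadcast a revoke and return the names of workers that acknowledged it.

    Assumption: `app` is a Celery app; `reply=True` and `timeout` are passed
    through `app.control.revoke` to the broadcast call.
    """
    replies = app.control.revoke(
        task_id, terminate=True, reply=True, timeout=timeout
    ) or []
    # Each reply is a dict mapping a worker hostname to that worker's response.
    return {hostname for reply in replies for hostname in reply}
```

An empty set means no worker acknowledged within the timeout, so the message is probably still waiting in the broker. A non-empty set only proves that some worker recorded the revoke; if the worker serving this task's queue is the one that is offline, the task can still run after it restarts.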

Attempted Solution

I've consulted the Celery documentation on persistent revokes and followed the instructions outlined here: Celery Documentation - Persistent Revokes. However, the problem persists: the .state file is written by the worker, so when the worker is offline it never receives the revoke and nothing gets recorded.

Request for Assistance

I'm seeking guidance on ensuring that revoked tasks are never executed, regardless of the worker's online/offline status. Specifically:

  • Is there a method to purge a specific task from the Redis broker or mark it as "REVOKED" even when the worker is offline?
  • Are there alternative strategies or configurations within Celery that can address this issue effectively?
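One alternative that does not depend on any worker being online is a tombstone check: the producer records the revoked id in a store it can always write to, and the task checks that store before doing any work. Below is a minimal, hypothetical sketch of the idea in plain Python; `revoked_ids` stands in for something like a Redis set (written with SADD, read with SISMEMBER), and in a real bound Celery task the id would come from `self.request.id` rather than an explicit argument.

```python
import functools
from typing import Any, Callable, Container, Optional


def skip_if_revoked(revoked_ids: Container[str]) -> Callable:
    """Hypothetical decorator: skip the task body if its id has been tombstoned.

    `revoked_ids` is any container supporting `in`; in practice it would be a
    shared store the producer writes to even while all workers are offline.
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(task_id: str, *args: Any, **kwargs: Any) -> Optional[Any]:
            # Check the shared store at execution time, not at enqueue time.
            if task_id in revoked_ids:
                return None  # tombstoned: return without doing any work
            return func(task_id, *args, **kwargs)
        return wrapper
    return decorator


# Usage: the producer "revokes" by adding the id to the shared store.
revoked = {"abc"}


@skip_if_revoked(revoked)
def double(task_id: str, x: int) -> int:
    return x * 2
```

The trade-off is that the message is still delivered and consumed; the task simply returns immediately. The store also needs some cleanup policy so it does not grow forever.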

Your insights and assistance in resolving this matter are greatly appreciated.

urzbs • Mar 04 '24

I'm also looking for a solution to this problem!

Right now, my best bet is to manually access the queue in Redis, and to delete the tasks from the queue. This is neither desirable nor pretty, so any alternative would be appreciated!

alexpotv • Mar 17 '24

Also hitting this issue. These semantics were quite surprising.

shcheklein • Jul 10 '24

Curious to know: what's your opinion on the best workaround for this?

alexpotv • Jul 11 '24

@alexpotv I don't know yet, tbh. I wasn't able to find anything quickly. Some ideas to explore: check whether it's possible to drop it directly on the broker (via some internal API), and check whether it's possible to dynamically register a queue on the worker and purge it at the end.

Have you found any approach to this so far?

shcheklein • Jul 11 '24

For now, I'm deleting it directly from the broker (the Redis queue). In my case, I already had a connection to the Redis instance for something else in my FastAPI app, but otherwise it would have been an additional dependency.

Thanks for the info!

alexpotv • Jul 11 '24

@alexpotv do you have some code showing exactly how you do this? I also have a Redis connection.

shcheklein • Jul 11 '24

@shcheklein Here's how I did it using FastAPI:

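import json

from redis import Redis
from redis.lock import Lock as RedisLock

# `router`, `celery_app` and `CouldNotStopTaskException` are defined elsewhere in the app.


@router.post("/{task_id}/stop")
async def stop_task(task_id: str, redis_client: Redis, redis_lock: RedisLock):
    # Only one stop request edits the queue at a time.
    if not redis_lock.acquire(blocking_timeout=3):
        raise CouldNotStopTaskException()

    try:
        # Reaches workers that are online (and kills the task if it is running).
        celery_app.control.revoke(task_id, terminate=True)
        # Covers offline workers: delete the message still waiting in the broker.
        _remove_task_from_redis_queue(task_id, redis_client)
    finally:
        redis_lock.release()


def _remove_task_from_redis_queue(task_id: str, redis_client: Redis):
    # The default Celery queue on Redis is a plain list named 'celery'.
    queue = redis_client.lrange('celery', 0, -1)
    for task_json in queue:
        task = json.loads(task_json)
        # Task messages carry the task id in their headers.
        headers = task.get('headers') or {}
        if headers.get('id') == task_id:
            # Remove exactly this serialized message from the list.
            redis_client.lrem('celery', 1, task_json)
            break

    # Also drop any stored result for the task.
    redis_client.delete(f"celery-task-meta-{task_id}")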

Please note that this is not meant to be copy/pasted, but just shows how I did it. Also, redis_lock and redis_client should be resolved as dependencies in the endpoint parameters.

alexpotv • Jul 11 '24

@shcheklein I just found out from this StackOverflow answer that it's possible to use the Celery app instance itself to access the Redis instance, without the need for an external Redis client. Thought I would share!
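The linked answer is not reproduced here, but a sketch of the idea might look like the following, assuming the Redis transport, where kombu's Redis channel exposes its redis-py client as `channel.client`, so the queue can be edited over the app's own broker connection. `remove_queued_task` is a hypothetical helper, and the matching logic mirrors the code above (task id in `headers['id']`).

```python
import json
from typing import Any


def remove_queued_task(app: Any, task_id: str, queue: str = "celery") -> int:
    """Hypothetical helper: delete a still-queued task via the app's broker connection.

    Assumes the Redis transport: the queue is a Redis list of JSON envelopes,
    and kombu's Redis channel exposes the redis-py client as `.client`.
    Returns the number of messages removed (0 or 1).
    """
    with app.connection_for_write() as conn:
        client = conn.default_channel.client
        for raw in client.lrange(queue, 0, -1):
            headers = json.loads(raw).get("headers") or {}
            if headers.get("id") == task_id:
                # LREM deletes by value; it returns 0 if a worker popped the
                # message between the LRANGE above and this call.
                return client.lrem(queue, 1, raw)
    return 0
```

Unlike the endpoint above, this sketch does not delete the `celery-task-meta-<id>` result key; add that if you also want the stored result gone.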

alexpotv • Aug 15 '24