celery-singleton icon indicating copy to clipboard operation
celery-singleton copied to clipboard

Option to release lock on task acceptance

Open smcoll opened this issue 7 years ago • 8 comments

My app watches for data changes, and then schedules a singleton task for cache regeneration. The singleton is very helpful, because i'm not queueing the same task multiple times if data changes in succession; any additional data changes will be reflected whenever the task is finally run.

However, i'm dealing with a race condition: if a task has been accepted, and data has changed after the task has accessed the database but before the task is completed (which is when the lock is released), the latest changes will not be reflected in the result- and any attempts to re-schedule the task during that time will fail because the lock still exists. i think that in my case, i need the lock to be released when the task is accepted, not when it's finished. Sure, i will have a case where the same task is being run simultaneously, but the first task's results are already going to be stale before the task is completed anyway.

Celery deals with these different scenarios with acks_late, so maybe one approach is to release the lock in conjunction with task acknowledgement. Otherwise, an additional setting is desirable, so i can have the lock released during the early acknowledgment, rather than after the task is completed.

smcoll avatar Aug 01 '17 18:08 smcoll

Hmmm, I'll have to think about it. There are bound to be a bunch of different scenarios where you want the lock released early.

I'm thinking a convenient way to release a specific lock would be the way to go. Then you could do it on any signal or wherever is appropriate. That would make things most flexible, rather than maintaining release_on_x options for every possible case.

Right now I think the only easy way to release the lock on demand is from within the task itself, by calling self.release_lock(*task_args, **task_kwargs)
Ideally you should be able to release the lock with just the task ID cause that's usually readily available in any celery signals.

steinitzu avatar Aug 01 '17 19:08 steinitzu

For the time being, i'm doing this in my project: [EDIT: this doesn't work]

class EarlySingleton(Singleton):
    """ Implementation of the celery Singleton which releases the lock when the
    task has begun executing, rather than after success/failure.
    """
    abstract = True

    def apply(self, args=None, kwargs=None,
              link=None, link_error=None,
              task_id=None, retries=None, throw=None,
              logfile=None, loglevel=None, headers=None, **options):
        self.release_lock(*args, **kwargs)
        return super(EarlySingleton, self).apply(
            args, kwargs, link, link_error, task_id, retries, throw, logfile,
            loglevel, headers, **options
        )

    def on_failure(self, *args, **kwargs):
        pass

    def on_success(self, *args, **kwargs):
        pass

~Seems like releasing the lock is a bit unreliable, though- not yet sure why.~

Your idea about signals is interesting. i suppose you're thinking that some tasks might need the lock to be released early, and others late.

smcoll avatar Aug 01 '17 21:08 smcoll

Is apply ever called? I think it's mainly for testing, but I could be wrong. Looks like it's called by apply_async when always_eager is set http://docs.celeryproject.org/en/latest/_modules/celery/app/task.html#Task.apply_async
Not sure if it's used by the worker, if it is and lock isn't always released then you may have stumbled upon a bug :/

I'm pretty sure you could use the prerun hook and do something like this

@task_prerun.connect
def delete_lock(task=None, args=None, kwargs=None, **more_signal_kwargs):
    args = args or []
    kwargs = kwargs or {}
    if task.name == 'task_that_should_unlock':
        task.release_lock(*args, **kwargs)

Or delete the lock in your run method:

@celery_app.task(binds=True, base=EarlySingleton)
def sometask(self, *args, **kwargs):
    self.release_lock(*args, **kwargs)
    ... 

Task ID verification in Singleton.release_lock might be a good idea so it doesn't delete another task's lock, then you could safely delete locks whenever without having to subclass.

steinitzu avatar Aug 02 '17 02:08 steinitzu

@steinitzu you're right; i made a bad assumption with that apply usage. Your latter example works just fine in my case.

Since aquire_lock takes a task id, release_lock might as well use the same. Is it necessary to use the args/kwargs in release_lock?

smcoll avatar Aug 02 '17 14:08 smcoll

Yeah, there's no way for release_lock to know which key to delete from redis without computing the hash. The locks are stored in key value pairs of LOCK : TASK_ID . Where LOCK is the hash of task_name+arguments.
So aquire_lock needs both, since it computes the hash and stores task_id under it.
release_lock re-computes the hash from source name and arguments and deletes it from redis.

To delete the lock (fast) by task ID alone, we would need to store a second pair, so we have both LOCK : TASK_ID and TASK_ID : LOCK
Which I'm thinking about doing, as long as it doesn't cause any weird side effects.

Then we could release lock by task ID, which might also shave off a couple of cpu cycles since we don't have to recompute the hash on every release at the cost of doubled storage size, which shouldn't be a problem unless you have millions of simultaneous singleton tasks :)

And I could add in something like this:

release_lock(celery_app, task_id)

Which you could call from anywhere without needing a task instance.

steinitzu avatar Aug 02 '17 16:08 steinitzu

see https://github.com/cameronmaske/celery-once which has unlock_before_run

claytondaley avatar Jan 23 '20 18:01 claytondaley

hi @steinitzu, do we have any update on this? like having release_lock(celery_app, task_id) ? Thanks!

Fan-Gong avatar Sep 14 '20 15:09 Fan-Gong

see cameronmaske/celery-once which has unlock_before_run

celery-once seems abandoned, there is no activity since 2019.

utapyngo avatar Jul 06 '21 07:07 utapyngo