celery-singleton
Option to release lock on task acceptance
My app watches for data changes and then schedules a singleton task for cache regeneration. The singleton is very helpful because I'm not queueing the same task multiple times when data changes in quick succession; any further changes will be reflected whenever the task finally runs.
However, I'm dealing with a race condition. Suppose a task has been accepted and the data changes after the task has read the database but before the task completes (which is when the lock is released). The latest changes will not be reflected in the result, and any attempt to re-schedule the task during that window will fail because the lock still exists. I think that in my case I need the lock to be released when the task is accepted, not when it finishes. Sure, the same task may then run twice simultaneously, but the first run's results are going to be stale before it completes anyway.
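To make the race concrete, here is a minimal, self-contained simulation. It does not use Celery or celery-singleton. `LockStore` is a hypothetical dict standing in for the Redis lock, and `run_regeneration` stands in for the cache task. It only shows the difference between releasing the lock when the task finishes and releasing it when the task starts.

```python
class LockStore:
    """Hypothetical in-memory stand-in for the Redis lock (not celery-singleton code)."""

    def __init__(self):
        self._locks = {}

    def acquire(self, key, task_id):
        # Like Redis SET NX: only succeeds if nobody holds the key.
        if key in self._locks:
            return False
        self._locks[key] = task_id
        return True

    def release(self, key):
        self._locks.pop(key, None)


def run_regeneration(store, key, release_early, on_data_change):
    """Simulate one cache-regeneration run.

    `on_data_change` stands in for a data change arriving after the task has
    read the database but before it finishes; its return value is passed back.
    """
    if release_early:
        store.release(key)  # release as soon as the task starts
    outcome = on_data_change()
    if not release_early:
        store.release(key)  # default: release after success/failure
    return outcome


def second_change_gets_queued(release_early):
    store = LockStore()
    key = "regenerate_cache()"
    store.acquire(key, "task-1")  # first data change queues task-1
    # While task-1 runs, another change tries to queue task-2.
    return run_regeneration(
        store, key, release_early,
        on_data_change=lambda: store.acquire(key, "task-2"),
    )


print(second_change_gets_queued(release_early=False))  # False: the later change is lost
print(second_change_gets_queued(release_early=True))   # True: task-2 will pick it up
```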
Celery handles these different scenarios with `acks_late`, so one approach might be to release the lock together with task acknowledgement. Otherwise, an additional setting would be useful so I can have the lock released at early acknowledgement rather than after the task completes.
Hmmm, I'll have to think about it. There are bound to be a bunch of different scenarios where you want the lock released early.
I'm thinking a convenient way to release a specific lock would be the way to go. Then you could do it on any signal or wherever is appropriate. That would be the most flexible option, rather than maintaining `release_on_x` options for every possible case.
Right now I think the only easy way to release the lock on demand is from within the task itself, by calling `self.release_lock(*task_args, **task_kwargs)`.
Ideally you should be able to release the lock with just the task ID, because that's usually readily available in any Celery signal.
For the time being, I'm doing this in my project: [EDIT: this doesn't work]
```python
from celery_singleton import Singleton


class EarlySingleton(Singleton):
    """ Implementation of the celery Singleton which releases the lock when the
    task has begun executing, rather than after success/failure.
    """
    abstract = True

    def apply(self, args=None, kwargs=None,
              link=None, link_error=None,
              task_id=None, retries=None, throw=None,
              logfile=None, loglevel=None, headers=None, **options):
        # args/kwargs default to None, so guard before unpacking.
        self.release_lock(*(args or ()), **(kwargs or {}))
        return super(EarlySingleton, self).apply(
            args, kwargs, link, link_error, task_id, retries, throw, logfile,
            loglevel, headers, **options
        )

    def on_failure(self, *args, **kwargs):
        pass  # lock was already released up front; don't release it again

    def on_success(self, *args, **kwargs):
        pass  # same as on_failure
```
~Seems like releasing the lock is a bit unreliable, though; not yet sure why.~
Your idea about signals is interesting. I suppose you're thinking that some tasks might need the lock to be released early, and others late.
Is `apply` ever called? I think it's mainly for testing, but I could be wrong.
Looks like it's called by `apply_async` when `always_eager` is set: http://docs.celeryproject.org/en/latest/_modules/celery/app/task.html#Task.apply_async
Not sure if it's used by the worker. If it is, and the lock isn't always released, then you may have stumbled upon a bug :/
I'm pretty sure you could use the prerun hook and do something like this:
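```python
from celery.signals import task_prerun


@task_prerun.connect
def delete_lock(task=None, args=None, kwargs=None, **more_signal_kwargs):
    args = args or []
    kwargs = kwargs or {}
    # Only release early for the tasks that opt into it.
    if task.name == 'task_that_should_unlock':
        task.release_lock(*args, **kwargs)
```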
Or delete the lock in your run method:
```python
@celery_app.task(bind=True, base=EarlySingleton)
def sometask(self, *args, **kwargs):
    # Release first, so new calls can be queued while this one is still running.
    self.release_lock(*args, **kwargs)
    ...
```
Task ID verification in `Singleton.release_lock` might be a good idea so it doesn't delete another task's lock. Then you could safely delete locks whenever you like without having to subclass.
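Here is a minimal sketch of that check, using a hypothetical dict-backed store in place of Redis. `LockStore` and `release_if_owner` are illustrative names, not celery-singleton's API. The release only succeeds if the lock still holds the caller's task ID, so a late release from an earlier run cannot delete a newer task's lock. With a real Redis backend, the get, compare and delete steps would have to be atomic, for example via a small Lua script.

```python
class LockStore:
    """Hypothetical dict-backed lock store (stand-in for Redis), LOCK -> TASK_ID."""

    def __init__(self):
        self._locks = {}

    def acquire(self, lock, task_id):
        # Like Redis SET NX: fail if anyone already holds this lock.
        if lock in self._locks:
            return False
        self._locks[lock] = task_id
        return True

    def release_if_owner(self, lock, task_id):
        """Delete `lock` only if it still belongs to `task_id`; True if deleted."""
        if lock not in self._locks or self._locks[lock] != task_id:
            return False  # someone else's lock (or already gone): leave it alone
        del self._locks[lock]
        return True

    def owner(self, lock):
        return self._locks.get(lock)


store = LockStore()
store.acquire("LOCK_abc", "task-1")
store.release_if_owner("LOCK_abc", "task-1")  # task-1 releases early, on start
store.acquire("LOCK_abc", "task-2")           # a new run takes the lock
print(store.release_if_owner("LOCK_abc", "task-1"))  # False: task-1's late release is ignored
print(store.owner("LOCK_abc"))                        # task-2
```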
@steinitzu you're right; I made a bad assumption about that `apply` usage. Your latter example works just fine in my case.
Since `aquire_lock` takes a task ID, `release_lock` might as well use the same. Is it necessary to use the args/kwargs in `release_lock`?
Yeah, there's no way for `release_lock` to know which key to delete from Redis without computing the hash.
The locks are stored as key-value pairs of `LOCK : TASK_ID`, where `LOCK` is the hash of the task name plus its arguments.
So `aquire_lock` needs both, since it computes the hash and stores `task_id` under it.
`release_lock` re-computes the hash from the task name and arguments and deletes that key from Redis.
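Here is a rough sketch of that scheme in plain Python. The key format (prefix, JSON serialization, MD5) is an assumption for illustration and may not match celery-singleton's exact implementation. A dict stands in for Redis, and `acquire`/`release` are hypothetical helpers. The sketch shows why release needs the name and arguments: the key is recomputed rather than looked up.

```python
import hashlib
import json

LOCKS = {}  # stand-in for Redis: LOCK -> TASK_ID


def generate_lock(task_name, task_args=None, task_kwargs=None, prefix="SINGLETONLOCK_"):
    """Hash the task name plus its arguments into a lock key (assumed format)."""
    # sort_keys makes kwargs order-insensitive, so f(a=1, b=2) == f(b=2, a=1).
    str_args = json.dumps(list(task_args or []), sort_keys=True)
    str_kwargs = json.dumps(task_kwargs or {}, sort_keys=True)
    digest = hashlib.md5((task_name + str_args + str_kwargs).encode("utf-8")).hexdigest()
    return prefix + digest


def acquire(task_name, task_args, task_kwargs, task_id):
    """Hypothetical acquire: store TASK_ID under the computed LOCK key."""
    lock = generate_lock(task_name, task_args, task_kwargs)
    if lock in LOCKS:
        return False
    LOCKS[lock] = task_id
    return True


def release(task_name, task_args, task_kwargs):
    """Hypothetical release: recompute the key from name + args and delete it."""
    LOCKS.pop(generate_lock(task_name, task_args, task_kwargs), None)


print(acquire("regenerate_cache", [42], {"full": True}, "task-1"))  # True
print(acquire("regenerate_cache", [42], {"full": True}, "task-2"))  # False, same key
release("regenerate_cache", [42], {"full": True})
print(acquire("regenerate_cache", [42], {"full": True}, "task-3"))  # True
```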
To delete the lock (fast) by task ID alone, we would need to store a second pair, so that we have both `LOCK : TASK_ID` and `TASK_ID : LOCK`.
Which I'm thinking about doing, as long as it doesn't cause any weird side effects.
Then we could release a lock by task ID. That might also shave off a few CPU cycles, since we wouldn't have to recompute the hash on every release. The cost is doubled storage, which shouldn't be a problem unless you have millions of simultaneous singleton tasks :)
And I could add something like `release_lock(celery_app, task_id)`, which you could call from anywhere without needing a task instance.
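Here is a minimal sketch of the proposed double mapping, using a hypothetical dict-backed store in place of Redis. `release_lock(store, task_id)` takes a store object rather than `celery_app`, because that API does not exist yet, and all names are illustrative. With Redis, the two keys would presumably need to be written together and given the same expiry so they cannot drift apart.

```python
class LockStore:
    """Hypothetical store keeping both LOCK -> TASK_ID and TASK_ID -> LOCK."""

    def __init__(self):
        self._lock_to_task = {}
        self._task_to_lock = {}

    def acquire(self, lock, task_id):
        if lock in self._lock_to_task:
            return False
        # Write both directions so the lock can later be found from the task ID.
        self._lock_to_task[lock] = task_id
        self._task_to_lock[task_id] = lock
        return True

    def release_by_task_id(self, task_id):
        lock = self._task_to_lock.pop(task_id, None)
        if lock is None:
            return False  # unknown or already released task
        # Only drop the forward entry if it still belongs to this task.
        if self._lock_to_task.get(lock) == task_id:
            del self._lock_to_task[lock]
        return True


def release_lock(store, task_id):
    """Hypothetical analogue of the proposed release_lock(celery_app, task_id):
    no task instance, arguments or hash recomputation needed."""
    return store.release_by_task_id(task_id)


def on_task_prerun(task_id=None, **signal_kwargs):
    """Shaped like a Celery task_prerun receiver, which gets task_id as a kwarg."""
    release_lock(STORE, task_id)


STORE = LockStore()
STORE.acquire("LOCK_abc", "task-1")
on_task_prerun(task_id="task-1")            # task-1 starts, lock released early
print(STORE.acquire("LOCK_abc", "task-2"))  # True
print(release_lock(STORE, "task-1"))        # False: task-1 no longer owns anything
```

In a real app, `on_task_prerun` would be connected with `@task_prerun.connect`.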
See https://github.com/cameronmaske/celery-once, which has `unlock_before_run`.
Hi @steinitzu, is there any update on this, e.g. something like `release_lock(celery_app, task_id)`? Thanks!
> see cameronmaske/celery-once which has `unlock_before_run`

celery-once seems abandoned; there has been no activity since 2019.