celery-singleton
Handle Retry
This definitively closes pull requests GH-8 and GH-9 and issues GH-10 and GH-20. I think that with something like this, chains can work as well.
Would be awesome to get this merged in. Do you need any help reviewing it?
Hey guys, sorry about the delay. And thanks for submitting this @lsaavedr
This looks good at a glance, but I would love some tests to go with it. If either of you want to take care of that I'm happy to merge.
- To verify that the retry works
- To verify that the lock is held throughout all retries
- To verify that the lock is released whether the retries eventually succeed or fail (though I'm assuming celery calls on_success/on_failure here as usual, in which case it doesn't need a test)
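A rough outline of what those tests could assert, using a hypothetical in-memory backend in place of celery-singleton's Redis backend (FakeBackend and its lock/unlock signatures are illustrative assumptions, not the library's API):

```python
# Hypothetical in-memory stand-in for the Redis lock backend.
class FakeBackend:
    def __init__(self):
        self.locks = {}

    def lock(self, key, task_id):
        # First writer wins; the same task_id may re-acquire (the retry case).
        return self.locks.setdefault(key, task_id) == task_id

    def unlock(self, key):
        self.locks.pop(key, None)


backend = FakeBackend()
assert backend.lock("task-key", "id-1")      # first run takes the lock
assert not backend.lock("task-key", "id-2")  # duplicate is blocked mid-retry
assert backend.lock("task-key", "id-1")      # a retry with the same ID still holds it
backend.unlock("task-key")                   # released after final success/failure
assert backend.lock("task-key", "id-2")      # the lock is free again
```

The real tests would drive an actual task through `self.retry()` rather than poking the backend directly, but the invariants to check are the same.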
I'm also curious about what celery does with the task ID on retry. Does it re-queue with a new ID or re-use the same ID?
Any progress here? I need this functionality but I do not want to maintain a separate package just for this fix.
Progress?
Sorry all, been neglecting this library as I don't use celery much anymore.
I'm not sure about this PR. I would like to see some reasoning for the `self._unlock_to_super_retry` flag; it seems odd to me and looks like it could introduce a race condition.
Tests would be appreciated as well.
Thanks for the response. Can you explain a bit more about how this would introduce a race condition? If you were to implement retry, how would you do it?
Maybe racing is not such an issue, I haven't looked deeply into how retrying works in celery.
Possibly if the lock times out before the retry call, another task could run and the retry will also go through.
I think a safer way of doing this would be to check whether the existing lock has the same task ID as the current task.
This might be an issue as well: https://github.com/steinitzu/celery-singleton/blob/527d227a6d593567eb3f5af1795ac1f67b5726da/celery_singleton/singleton.py#L120-L124
With this approach the lock would not be removed when apply_async fails during retry.
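A minimal sketch of that owner-check idea, using a plain dict in place of Redis for illustration (the function name and storage are assumptions, not celery-singleton's API):

```python
# In-memory stand-in for the Redis backend, for illustration only.
_locks = {}


def acquire_or_reenter(lock_key, task_id):
    """Acquire the lock, or re-enter it if this very task already holds it.

    A retry of the same task (same task ID) is let through, while a
    different task is rejected as a duplicate.
    """
    holder = _locks.setdefault(lock_key, task_id)
    return holder == task_id
```

Against real Redis this would be a `SET key task_id NX EX <expiry>` followed, on failure, by a `GET` to compare the stored task ID with the current one.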
Thanks. After trying out this PR and hacking around it, I could not get it to work. This project does the same as celery-once (also abandoned) by attempting to lock before apply_async (which then calls send_task).
I was able to modify this library to provide locking post send_task with retry capabilities. I haven't dug too deep into the retry functionality (eta/countdowns etc.), but AFAICT a message is sent via apply_async, consumed from the broker, and, if it has an eta/countdown, put into a timer pool in the worker (possibly to be ACK'd when its countdown/eta is reached).
This sample code should be enough to build off of if anyone is interested:
```python
class Singleton(Task):
    def __init__(self, *args, **kwargs):
        self.singleton_backend = RedisBackend(REDIS_URI)
        self._lock_key = None
        self.max_retries = None  # Try forever, change if you must
        self.__run = None

    @property
    def lock_key(self):
        if self._lock_key:
            return self._lock_key
        # Generate your lock key however
        return self._lock_key

    def lock(self):
        lock = self.singleton_backend.lock(
            self.lock_key, self.request.id, expiry=60 * 5
        )
        logger.info(f'Attempted lock for {self.lock_key} = {lock}')
        if not lock:
            # Override the task function so it is not called but retried.
            def terminated(*args, **kwargs):
                self.retry(countdown=60)

            # may need to do the same for __call__
            self.__run = self.run
            self.run = terminated
        elif self.__run:
            self.run = self.__run
        return lock

    def unlock(self):
        unlock = self.singleton_backend.unlock(self.lock_key)
        logger.info(f'Attempted unlock for {self.lock_key} = {unlock}')

...

@signals.task_prerun.connect
def connect_task_prerun(sender=None, task_id=None, task=None,
                        args=None, kwargs=None, **e):
    if isinstance(task, Singleton):
        task.lock()

...

@signals.task_postrun.connect
def connect_task_postrun(sender=None, task_id=None, task=None, args=None,
                         kwargs=None, retval=None, state=None, **e):
    if isinstance(task, Singleton) and state != states.RETRY:
        task.unlock()
```
Any progress? Maybe this could help with testing the fix.
We actually found a simpler way:
```python
def retry(self, *args, **kwargs):
    self.release_lock(task_args=self.request.args, task_kwargs=self.request.kwargs)
    return super().retry(*args, **kwargs)
```
Happy to submit a PR if needed
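In context, that override would sit on the Singleton task class. A minimal sketch of the call ordering it produces, where BaseSingleton and the request stub are mock stand-ins for celery-singleton's Singleton base and celery's request context, not the real classes:

```python
calls = []


# Mock stand-ins, only to show the ordering the override produces.
class BaseSingleton:
    def release_lock(self, task_args=None, task_kwargs=None):
        calls.append("release_lock")

    def retry(self, *args, **kwargs):
        calls.append("super_retry")


class MyTask(BaseSingleton):
    class request:  # stand-in for celery's per-task request context
        args, kwargs = (), {}

    def retry(self, *args, **kwargs):
        # Drop the singleton lock first, then delegate to celery's retry.
        self.release_lock(task_args=self.request.args,
                          task_kwargs=self.request.kwargs)
        return super().retry(*args, **kwargs)


MyTask().retry(countdown=60)
assert calls == ["release_lock", "super_retry"]
```

One caveat, per the earlier discussion: since the lock is released before the retry is queued, another instance of the task can run during the retry's countdown window, so the task is not strictly a singleton across retries with this approach.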
Is this project abandoned? Is there any other tool similar to this one?