
Handle Retry

lsaavedr opened this issue 5 years ago · 11 comments

This definitively closes pull requests GH-8 and GH-9, and issues GH-10 and GH-20. I think that with something like this, chains can also work.

lsaavedr avatar Aug 10 '20 04:08 lsaavedr

Would be awesome to get this merged in. Do you need any help reviewing it?

jayfk avatar Sep 02 '20 12:09 jayfk

Hey guys, sorry about the delay. And thanks for submitting this @lsaavedr
This looks good at a glance, but I would love some tests to go with it. If either of you want to take care of that I'm happy to merge.

  • Verify that the retry works
  • Verify that the lock is held throughout all retries
  • Verify that the lock is released whether the retries eventually succeed or fail (though I'm assuming celery calls on_success/failure here as usual, in which case it doesn't need a test)

I'm also curious about what celery does with the task ID on retry. Does it re-queue with a new ID or re-use the same ID?
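(For anyone picking this up: the first two test cases above can be sketched without a broker. This is a minimal, celery-free illustration using a hypothetical in-memory lock in place of the Redis backend; `InMemoryLock` and `run_with_retries` are illustrative names, not part of this library.)

```python
# Celery-free sketch of the requested tests: an in-memory lock stands in
# for the Redis backend, and a simple loop simulates retries.
class InMemoryLock:
    def __init__(self):
        self.holder = None  # task id currently holding the lock, or None

    def acquire(self, task_id):
        if self.holder is None:
            self.holder = task_id
            return True
        return False

    def release(self):
        self.holder = None


def run_with_retries(lock, task_id, attempts_until_success):
    """Simulate a task that retries; the lock must stay held throughout."""
    assert lock.acquire(task_id)
    for attempt in range(attempts_until_success):
        # A retry must NOT release the lock...
        assert lock.holder == task_id
        # ...and no other task may sneak in while we wait to retry.
        assert not lock.acquire("other-task")
    lock.release()  # released only once the task finally succeeds or fails


lock = InMemoryLock()
run_with_retries(lock, "task-1", attempts_until_success=3)
assert lock.holder is None  # lock released after the final attempt
```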

steinitzu avatar Sep 02 '20 16:09 steinitzu

Any progress here? I need this functionality but I do not want to maintain a separate package just for this fix.

JonnieDoe avatar Sep 06 '21 16:09 JonnieDoe

Progress?

process0 avatar Sep 27 '21 16:09 process0

Sorry all, been neglecting this library as I don't use celery much anymore.

I'm not sure about this PR. I would like to see some reasoning for the self._unlock_to_super_retry flag, seems odd to me and looks like it would introduce a race condition.

Tests would be appreciated as well.

steinitzu avatar Sep 27 '21 17:09 steinitzu

Thanks for the response. Can you explain a bit more about how this would introduce a race condition? If you were to implement retry, how would you do it?

process0 avatar Sep 27 '21 17:09 process0

Maybe racing is not such an issue; I haven't looked deeply into how retrying works in celery. Possibly, if the lock times out before the retry call, another task could run and the retry would also go through.
I think a safer way of doing this would be to check whether the existing lock has the same task ID as the current task.

This might be an issue as well: https://github.com/steinitzu/celery-singleton/blob/527d227a6d593567eb3f5af1795ac1f67b5726da/celery_singleton/singleton.py#L120-L124

With this approach the lock would not be removed when apply_async fails during retry.
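(A rough sketch of the same-task-ID check suggested above, with a plain dict standing in for the Redis backend; the `acquire` function here is illustrative, not the library's actual API.)

```python
# Sketch: treat the lock as already ours when it stores our own task id,
# so a retry of the same task re-enters instead of being deduplicated.
locks = {}  # stands in for the Redis backend: lock_key -> task_id

def acquire(lock_key, task_id):
    existing = locks.get(lock_key)
    if existing is None:
        locks[lock_key] = task_id
        return True
    # Same task retrying: let it through without touching the lock.
    return existing == task_id

assert acquire("k", "a")      # first run takes the lock
assert acquire("k", "a")      # a retry of the same task is allowed
assert not acquire("k", "b")  # a different task is still blocked
```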

steinitzu avatar Sep 27 '21 20:09 steinitzu

Thanks. After trying out this MR and hacking around it, I could not get it to work. This project does the same as celery-once (also abandoned) by attempting to lock before apply_async (which then calls send_task).

I was able to modify this library to provide locking post send_task with retry capabilities. I haven't dug too deep into the retry functionality (eta/countdowns etc) but afaict a message is sent via apply_async, consumed from the broker, and, if eta/countdown, is put into a timer pool in the worker (possibly to be ACK'd when its countdown/etc is reached).

This sample code should be enough to build off of if anyone is interested:

class Singleton(Task):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.singleton_backend = RedisBackend(REDIS_URI)
        self._lock_key = None
        self.max_retries = None  # retry forever; change if you must
        self.__run = None

    @property
    def lock_key(self):
        # A property getter takes no arguments, so derive the key from
        # self.request (or a cache) rather than passing args in.
        if self._lock_key:
            return self._lock_key

        # Generate your lock key however

        return self._lock_key

    def lock(self):
        lock = self.singleton_backend.lock(self.lock_key, self.request.id, expiry=60 * 5)
        logger.info(f'Attempted lock for {self.lock_key} = {lock}')

        if not lock:
            # Override the task function so it is not called but retried.
            def terminated(*args, **kwargs):
                self.retry(countdown=60)

            # may need to do the same for __call__
            self.__run = self.run
            self.run = terminated
        elif self.__run:
            # Lock acquired: restore the real task body if it was swapped out.
            self.run = self.__run

        return lock

    def unlock(self):
        unlocked = self.singleton_backend.unlock(self.lock_key)
        logger.info(f'Attempted unlock for {self.lock_key} = {unlocked}')

...

@signals.task_prerun.connect
def connect_task_prerun(sender=None, task_id=None, task=None, args=None, kwargs=None, **e):
    if isinstance(task, Singleton):
        task.lock()

...

@signals.task_postrun.connect
def connect_task_postrun(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **e):
    if isinstance(task, Singleton) and state != states.RETRY:
        task.unlock()
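(To make the signal flow above concrete without a broker, here is a toy simulation of the prerun/postrun handlers: the postrun side keeps the lock when the state is RETRY and releases it on any terminal state. All names here are illustrative stand-ins.)

```python
# Toy stand-ins for the task_prerun/task_postrun handlers.
locks = {}
RETRY, SUCCESS = "RETRY", "SUCCESS"

def prerun(task_id, lock_key):
    # Mirror of task.lock(): take the lock, or report it is held elsewhere
    # (in which case the real task would self.retry()).
    return locks.setdefault(lock_key, task_id) == task_id

def postrun(task_id, lock_key, state):
    # Mirror of the postrun handler: keep the lock across retries,
    # release it on any terminal state.
    if state != RETRY and locks.get(lock_key) == task_id:
        del locks[lock_key]

prerun("t1", "k")
postrun("t1", "k", RETRY)    # retry: lock stays held
assert locks["k"] == "t1"
postrun("t1", "k", SUCCESS)  # terminal state: lock released
assert "k" not in locks
```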

process0 avatar Sep 27 '21 22:09 process0

Any progress? Maybe I could help test this fix.

Jonamaita avatar Oct 18 '22 13:10 Jonamaita

We actually found a simpler way:

    def retry(self, *args, **kwargs):
        self.release_lock(task_args=self.request.args, task_kwargs=self.request.kwargs)
        return super().retry(*args, **kwargs)
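(A celery-free illustration of why this works: releasing the lock before delegating to `super().retry()` means the re-queued run can acquire it again instead of being deduplicated away. `BaseTask` and `LockingTask` below are illustrative stand-ins, not this library's classes.)

```python
# Minimal stand-ins showing the release-then-retry pattern.
locks = set()

class BaseTask:
    def retry(self):
        return "requeued"  # stands in for super().retry() re-queueing

class LockingTask(BaseTask):
    lock_key = "my-task"

    def release_lock(self):
        locks.discard(self.lock_key)

    def retry(self):
        # Release first so the re-queued run is not blocked by its own lock.
        self.release_lock()
        return super().retry()

locks.add("my-task")           # task is running and holds the lock
assert LockingTask().retry() == "requeued"
assert "my-task" not in locks  # lock freed before the retry is queued
```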

Happy to submit a PR if needed

elavaud avatar Mar 03 '23 00:03 elavaud

Is this project abandoned? Is there any other tool similar to this one?

jsevillaamd avatar Nov 20 '23 17:11 jsevillaamd