django-q icon indicating copy to clipboard operation
django-q copied to clipboard

Add 'retry' to opt_keys in 'async_task'

Open kennyhei opened this issue 4 years ago • 2 comments

Hi again @Koed00

I'd like to add retry to opt_keys in async_task. We have implemented our own custom ORM broker that makes use of custom retry value like this:

def _timeout(retry=None):
    retry = retry or Conf.RETRY
    return timezone.now() - timedelta(seconds=retry)


class ORM(BaseORM):

    def enqueue(self, task):
        data = SignedPackage.loads(task)
        retry = data.get('retry')
        package = self.get_connection().create(
            key=self.list_key, payload=task, lock=_timeout(retry)
        )
        return package.pk

    def dequeue(self):
        # Query queued tasks using default _timeout
        tasks = self.get_connection().filter(key=self.list_key, lock__lt=_timeout())[
            0:Conf.BULK
        ]
        if tasks:
            task_list = []
            for task in tasks:
                # Check here if task has custom timeout. If lock
                # is greater than custom timeout time, skip
                retry = task.task().get('retry')
                if task.lock > _timeout(retry):
                    continue
                if (
                    self.get_connection()
                    .filter(id=task.id, lock=task.lock)
                    .update(lock=timezone.now())
                ):
                    task_list.append((task.pk, task.payload))
                # else don't process, as another cluster has been faster than us on that task
            return task_list
        # empty queue, spare the cpu
        sleep(Conf.POLL)

This isn't at the moment possible because retry is missing from opt_keys and thus not added in the task payload data.

kennyhei avatar Sep 24 '21 09:09 kennyhei

I see this as a valuable feature. We have a few tasks that can take several minutes. I don't think it appropriate to raise the retry for all tasks in our app (including some that take a few seconds) to several minutes. It is curious that the timeout can be configured per task, but not the retry.

jasonbodily avatar Oct 06 '21 22:10 jasonbodily

@jasonbodily For now I have solved this by checking whether task has custom timeout and then creating retry variable on the fly which equals to timeout + 5 minutes:

def _timeout(retry=None):
    retry = retry or Conf.RETRY
    return timezone.now() - timedelta(seconds=retry)

class ORM(BaseORM):

    def _get_retry(self, data):
        # Retry 5 minutes after timeout
        if not data.get('timeout'):
            return None
        return timeout + 300

    def enqueue(self, task):
        data = SignedPackage.loads(task)
        retry = self._get_retry(data)
        package = self.get_connection().create(
            key=self.list_key, payload=task, lock=_timeout(retry)
        )
        return package.pk

kennyhei avatar Oct 14 '21 15:10 kennyhei