celery_longterm_scheduler

Handle long countdowns as well as ETAs

Open wryun opened this issue 4 years ago • 6 comments

I would find it useful if Task was overridden to handle long countdowns (configurable) as well as ETAs.

Would you be open to a PR?

I believe this is particularly important when you're using redis as a celery backend, since long countdowns can push you past the visibility timeout and cause duplicates: https://docs.celeryproject.org/en/stable/getting-started/brokers/redis.html#id1
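To make the gotcha concrete, here's a toy check (my own illustration, not anything from Celery): Celery's redis transport defaults visibility_timeout to 3600 seconds, so any countdown longer than that risks the message being redelivered to a second worker before the first one runs it.

```python
VISIBILITY_TIMEOUT = 3600  # celery's default visibility_timeout for the redis transport, in seconds

def risks_duplicate_delivery(countdown_seconds, visibility_timeout=VISIBILITY_TIMEOUT):
    """True if a countdown outlives the broker's visibility timeout,
    i.e. the message may be redelivered before it ever executes."""
    return countdown_seconds > visibility_timeout
```

A one-week countdown (604800s) is flagged; a one-minute countdown is fine.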

wryun avatar Jan 07 '21 02:01 wryun

Amusingly, this is the same thing that Celery does deep in its internals (i.e. countdowns are equivalent to ETAs):

https://github.com/celery/celery/blob/8c5e9888ae10288ae1b2113bdce6a4a41c47354b/celery/app/amqp.py#L294
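The equivalence can be sketched in a couple of lines (a toy sketch of what that internal code effectively does, not Celery's actual implementation):

```python
from datetime import datetime, timedelta, timezone

def countdown_to_eta(countdown_seconds, now=None):
    """A relative countdown is rewritten into an absolute eta
    before the message is published to the broker."""
    now = now or datetime.now(timezone.utc)
    return now + timedelta(seconds=countdown_seconds)
```

So from the broker's point of view there is no such thing as a countdown, only an eta.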

wryun avatar Jan 07 '21 04:01 wryun

Since countdown is syntactic sugar for eta as @wryun has pointed out, this is definitely a gotcha.

jeffreybrowning avatar Apr 21 '21 22:04 jeffreybrowning

We ended up creating our own somewhat scary implementation that backed onto a django db table:

"""
Back longterm celery tasks onto db.
See PendingTaskCelery for how these get created.
"""

from importlib import import_module
import logging

from django.db import transaction
from django.utils.timezone import now as get_now

from kombu.utils.uuid import uuid

from grok import celery_app
from grok.models import PendingTask

__all__ = ['execute_pending_tasks', 'schedule_pending_task']


logger = logging.getLogger('grok.main')


def schedule_pending_task(eta, args, kwargs):
  # If it's a pending task, we assume etas/countdowns have already been dealt with.
  assert kwargs.get('eta') is None, 'Attempting to schedule pending task with embedded eta'
  assert kwargs.get('countdown') is None, 'Attempting to schedule pending task with embedded countdown'

  # Use the celery task_id for scheduler storage.
  # This may make it slightly nicer to follow a task through the system.
  if not kwargs.get('task_id'):
    kwargs['task_id'] = uuid()

  # We can't JSON encode task type (it's the task function itself).
  task_type = kwargs.get('task_type')
  if task_type:
    kwargs['task_type'] = f'{task_type.__module__}:{task_type.__name__}'

  pt = PendingTask.objects.create(id=kwargs['task_id'], eta=eta, args=args, kwargs=kwargs)
  logger.info('Scheduled task %s for %s', pt.id, pt.eta)

  return kwargs['task_id']


def execute_pending_tasks(current_time):
  while True:
    with transaction.atomic():
      # skip_locked means it's safe to run multiple of these at the same time without blocking,
      # but we don't intend to do this...
      pt = PendingTask.objects.select_for_update(skip_locked=True).filter(eta__lte=current_time).order_by('eta').first()
      if not pt:
        break

      if pt.kwargs.get('task_type'):
        mod, name = pt.kwargs['task_type'].split(':')
        pt.kwargs['task_type'] = getattr(import_module(mod), name)

      celery_app.send_task(*pt.args, **pt.kwargs)

      logger.info('Enqueued scheduled task %s (expected at %s, enqueued at %s)', pt.id, pt.eta, get_now())
      pt.delete()

from datetime import timedelta

from celery import Celery
from django.utils.timezone import now as get_now
from kombu.utils.uuid import uuid

# Countdowns longer than this (in seconds) are diverted to the db.
# The value here is illustrative; keep it well under the broker's
# visibility timeout (1 hour by default for redis).
LONG_COUNTDOWN_THRESHOLD = 15 * 60


class PendingTaskCelery(Celery):
  """
  Intercept send_task on long countdowns/ETAs and send them to our PendingTasks.
  Inspired by https://github.com/ZeitOnline/celery_longterm_scheduler
  This avoids the issue with countdowns exceeding the visibility timeout:
  https://docs.celeryproject.org/en/stable/getting-started/brokers/redis.html#redis-caveats
  """
  def send_task(self, *args, **kwargs):
    # Convert any long countdowns to ETAs.
    countdown = kwargs.get('countdown', 0)
    if countdown > LONG_COUNTDOWN_THRESHOLD:
      # Amusingly, the underlying code in Celery does the same thing to countdowns:
      # https://github.com/celery/celery/blob/8c5e9888ae10288ae1b2113bdce6a4a41c47354b/celery/app/amqp.py#L294
      del kwargs['countdown']
      kwargs['eta'] = get_now() + timedelta(seconds=countdown)

    if kwargs.get('eta') is None:  # Can't just check for eta, because sometimes it's set to None (e.g. on retries)
      return super().send_task(*args, **kwargs)
    else:
      from grok.core.pending_tasks import schedule_pending_task
      result_cls = kwargs.pop('result_cls', self.AsyncResult)

      if 'task_id' not in kwargs:
        kwargs['task_id'] = uuid()

      schedule_pending_task(kwargs.pop('eta'), args, kwargs)
      return result_cls(kwargs['task_id'])
      

Same BSD licence as this repository applies if you want to use it, since it was inspired by this project.

wryun avatar Apr 22 '21 01:04 wryun

(there's a celery beat scheduled task which calls execute_pending_tasks)
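For reference, the beat entry would look something like this (a hedged sketch; the task path and wrapper name are hypothetical, not the exact ones from our codebase):

```python
CELERY_BEAT_SCHEDULE = {
    'execute-pending-tasks': {
        # A wrapper task (name is illustrative) that calls execute_pending_tasks(get_now())
        'task': 'grok.core.pending_tasks.execute_pending_tasks_task',
        'schedule': 60.0,  # seconds: run once a minute
    },
}
```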

wryun avatar Apr 22 '21 01:04 wryun

@wryun I'm struggling to do the same thing. I constantly get hundreds of tasks each hour, and some of them have ETAs several days out.

Are you launching this celery beat periodic task every minute?

SuperMasterBlasterLaser avatar Sep 12 '22 10:09 SuperMasterBlasterLaser

@SuperMasterBlasterLaser yes, we run execute_pending_tasks every minute. The code above appears to have been working well for the past year and a half, but please do your own checking ;)

If you want better granularity on the ETAs than we needed, you could change execute_pending_tasks to put things on the queue with an ETA if they're due within the beat interval (i.e. with a 1-minute beat interval, anything due in the next minute would be queued with a short ETA, as opposed to the current approach of waiting until they're past due).
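The look-ahead logic could be sketched as a pure function like this (my own illustration of the variant described above, with hypothetical names, not our production code):

```python
from datetime import datetime, timedelta, timezone

BEAT_INTERVAL = timedelta(minutes=1)  # assumed beat frequency

def enqueue_decision(task_eta, current_time, beat_interval=BEAT_INTERVAL):
    """Return (should_enqueue, eta_for_broker).

    Look ahead one beat interval: tasks due within it are enqueued with
    their (now short) eta attached, which is safely under the visibility
    timeout; past-due tasks are enqueued for immediate execution.
    """
    if task_eta > current_time + beat_interval:
        return False, None        # still too far out: leave it in the db
    if task_eta > current_time:
        return True, task_eta     # due soon: enqueue with a short eta
    return True, None             # past due: enqueue immediately
```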

wryun avatar Sep 19 '22 23:09 wryun