django-celery-beat
Removing tasks from celery_beat config doesn't remove them from database.
Summary:
I assumed (incorrectly, as it turns out) that if I used celery_beat (the beat_schedule setting) in the celery config dictionary to define my schedule and later changed it (renamed or removed items), the database schedule would be synced to reflect the removals.
I don't expect this issue to be fixed -- I'm just documenting it for other people looking for an answer.
- Celery Version: 4.2.1
- Celery-Beat Version: 1.1.1
Exact steps to reproduce the issue:
- Configure celery_beat in config
- Run beat
- Remove some tasks/add some tasks to celery_beat
- Rerun beat
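To make the steps above concrete, here is a toy model of what I believe is happening: the sync from config to database only inserts or updates entries and never deletes. Plain dicts stand in for the database table and the config, and the task names are made up, so treat it as a sketch of the behavior rather than the library's actual code.

```python
# Toy model of the reported behavior. Plain dicts stand in for the
# database table and the config; this is NOT django-celery-beat's code.

def sync_upsert_only(db_rows, beat_schedule):
    """Insert or update every configured entry; never delete anything."""
    for name, entry in beat_schedule.items():
        db_rows[name] = dict(entry)
    return db_rows


db_rows = {}

# First beat run: two tasks configured (hypothetical names).
sync_upsert_only(db_rows, {
    "heartbeat": {"task": "task_heartbeat", "schedule": 30.0},
    "report": {"task": "send_report", "schedule": 3600.0},
})

# "heartbeat" is removed from the config, "cleanup" is added, beat is rerun.
sync_upsert_only(db_rows, {
    "report": {"task": "send_report", "schedule": 3600.0},
    "cleanup": {"task": "purge_old_rows", "schedule": 86400.0},
})

# The removed entry is still in the "database".
print(sorted(db_rows))  # ['cleanup', 'heartbeat', 'report']
```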
Detailed information
I've documented a workaround here, creating my own scheduler:
https://stackoverflow.com/q/56047284/2077386
import logging

from django_celery_beat.models import PeriodicTask
from django_celery_beat.models import PeriodicTasks
from django_celery_beat.schedulers import DatabaseScheduler
from django.db import transaction


class DatabaseSchedulerWithCleanup(DatabaseScheduler):

    def setup_schedule(self):
        schedule = self.app.conf.beat_schedule
        with transaction.atomic():
            num, info = PeriodicTask.objects.\
                exclude(task__startswith='celery.').\
                exclude(name__in=schedule.keys()).\
                delete()
            logging.info("Removed %d obsolete periodic tasks.", num)
            if num > 0:
                PeriodicTasks.update_changed()
        super(DatabaseSchedulerWithCleanup, self).setup_schedule()
I'm posting it here for anyone else to use -- and in case you want to add it to the project.
This scheduler assumes you are using celery_beat in your celery config exclusively to define your schedule, because it wipes out any task that is not listed there. That means it could also wipe out tasks registered by other apps (the code above only whitelists celery.*).
So.... I don't think it's actually a great candidate for the project.
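If you want to see what the cleanup would remove before trusting it, the selection rule can be sketched in plain Python. The helper obsolete_task_names and the sample data are hypothetical and only mirror the two exclude() filters in the scheduler above; they do not touch the database.

```python
def obsolete_task_names(db_tasks, beat_schedule, keep_prefix="celery."):
    """Return the names the cleanup query above would delete.

    db_tasks maps PeriodicTask.name -> PeriodicTask.task (hypothetical
    stand-in for the table). A row survives if its task starts with
    keep_prefix or its name is a key of beat_schedule.
    """
    return sorted(
        name
        for name, task in db_tasks.items()
        if not task.startswith(keep_prefix) and name not in beat_schedule
    )


# Made-up table contents: one built-in task, one configured task,
# one stale task, and one created by hand for another app.
db_tasks = {
    "celery.backend_cleanup": "celery.backend_cleanup",
    "report": "send_report",
    "heartbeat": "task_heartbeat",
    "admin-created job": "otherapp.tasks.sync",
}
beat_schedule = {"report": {"task": "send_report", "schedule": 3600.0}}

print(obsolete_task_names(db_tasks, beat_schedule))
# ['admin-created job', 'heartbeat']
```

Note that the hand-created entry is flagged along with the stale one, which is exactly the risk described above.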
Thanks @rrauenza!
I'm new to this project, but looking at your solution, it seems like this is something that should be made clearer in the django-celery-beat documentation.
I'm guessing there may be cases where that data is needed for auditing purposes, but it would also be good to warn people that stale entries can build up.
I will look and see if this is referenced in the docs already and if not, then I will try and find a place to add a mention of this behavior.
Maybe this helps to alleviate this issue? https://github.com/celery/django-celery-beat/pull/389
+1
+1
Please elaborate. A +1 comment isn't helpful.
[2022-12-08 11:23:18,586: ERROR/MainProcess] Received unregistered task of type 'task_heartbeat'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 581, in on_task_received
    strategy = strategies[type_]
KeyError: 'task_heartbeat'
This is still happening. I removed my task function and removed the task from CELERY_BEAT_SCHEDULE. Celery should not be attempting to execute this task, or scheduling new runs of it, since it has been removed from the schedule.
@richardARPANET You haven't described how you've integrated the workaround. Did you add and enable the alternate scheduler DatabaseSchedulerWithCleanup? Did you check the logs to see if the item was removed?
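For reference, enabling an alternate scheduler usually means pointing beat at the class. A minimal sketch, assuming the class lives in a hypothetical module myproject/schedulers.py and that the Celery app loads Django settings with namespace="CELERY" (which the CELERY_BEAT_SCHEDULE name above suggests):

```python
# settings.py (sketch). "myproject.schedulers" is a hypothetical module
# path; replace it with wherever DatabaseSchedulerWithCleanup is defined.
CELERY_BEAT_SCHEDULER = "myproject.schedulers:DatabaseSchedulerWithCleanup"

# Alternatively, pass the same path on the command line:
#   celery -A myproject beat --scheduler myproject.schedulers:DatabaseSchedulerWithCleanup
```

After restarting beat, the "Removed N obsolete periodic tasks." log line from the scheduler should show whether anything was deleted.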
Same problem: I deleted everything related to the task, but Celery still tries to execute it each time I start it.
One way around it was to clear the beat tables:
import logging

from django_celery_beat import models

logger = logging.getLogger(__name__)


def clear_celery_beat_tables():
    try:
        logger.info('Clearing all celery beat tables')
        models.CrontabSchedule.objects.all().delete()
        models.SolarSchedule.objects.all().delete()
        models.ClockedSchedule.objects.all().delete()
        models.IntervalSchedule.objects.all().delete()
        models.PeriodicTask.objects.all().delete()
        models.PeriodicTasks.objects.all().delete()
        logger.info('Celery beat tables cleared')
    except AttributeError:
        logger.info('Error while clearing celery tables')
Then call this when your application starts (ideally before the Celery app is initialized).