django-tenant-schemas
How would I use django-celery-beat with db backed scheduler?
I am aware of how to use django-celery-beat in a non-multi-tenant setup, but I am still unclear on how to set it up with this package. I will note down my points of concern.
- Where does the django-celery-beat package belong in settings, SHARED_APPS or TENANT_APPS? Migrating it will create the necessary tables in the database, but shouldn't they be tenant-specific?
- How would I run the beat command in this case? As far as I remember, I have to specify my scheduler. If I use the PeriodicTask model from the django_celery_beat package, I have to put this package in SHARED_APPS, which further confuses me as to how all the tasks will be stored.
I may be horribly wrong in my assumptions, but any clarification would be really helpful.
@bernardopires any thoughts on this one?
First of all, I would like django_celery_beat to be part of TENANT_APPS, because the models belonging to the django_celery_beat package will contain tenant-specific data. But I think a single celery beat process cannot monitor multiple tables (django_celery_beat_periodictask) for task lookup; at least I have not come across such a setting for the beat process.
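For reference, the split under discussion looks roughly like the sketch below. The app names other than django_celery_beat and tenant_schemas are illustrative placeholders, not part of either package; the placement of django_celery_beat in TENANT_APPS is exactly the point being debated here.

    # settings.py -- sketch only, assuming a standard django-tenant-schemas layout
    SHARED_APPS = (
        'tenant_schemas',              # mandatory, must come before Django apps
        'customers',                   # hypothetical app holding the tenant model
        'django.contrib.contenttypes',
        'django.contrib.auth',
    )

    TENANT_APPS = (
        'django.contrib.contenttypes',
        'django_celery_beat',          # PeriodicTask/IntervalSchedule tables per tenant
        'myapp',                       # hypothetical tenant-specific app
    )

    INSTALLED_APPS = list(SHARED_APPS) + [
        app for app in TENANT_APPS if app not in SHARED_APPS
    ]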
Here goes my solution/workaround for this problem. I put django_celery_beat in TENANT_APPS. I then created (copied from one of the tenant schemas) all the tables required by django_celery_beat manually in my public schema. Now every periodic task needs to be created in the public schema for all tenants, which can be done by switching to the public schema while creating the periodic task. Below is the code to create a periodic task. I have not tested this code exactly as written, but I have used similar code in my application.
    import json

    from django.db import connection
    from django_celery_beat.models import IntervalSchedule, PeriodicTask
    from tenant_schemas.utils import schema_context


    def create_or_update_periodic_task():
        current_schema = connection.schema_name
        # Make an entry in the client schema as well. Although it will not be
        # picked up by the celery beat scheduler, this ensures backward
        # compatibility if we later introduce one celery beat per schema, and
        # it keeps the tenant-specific data in the respective schema (this is
        # why we used a tenant-based app).
        # Optional code starts here
        schedule, created = IntervalSchedule.objects.get_or_create(
            every=10,
            period=IntervalSchedule.MINUTES)
        pt_obj, created = PeriodicTask.objects.get_or_create(
            name='periodic_task_name',
            task='task.celery_task_name')
        pt_obj.interval = schedule
        pt_obj.enabled = True
        pt_obj.args = json.dumps([])
        pt_obj.kwargs = json.dumps({
            'param1': 'param1',
            'param2': 'param2',
            '_schema_name': current_schema,
        })
        pt_obj.save()
        # Optional code ends here

        # This makes the periodic task entry in the public schema.
        with schema_context('public'):
            schedule, created = IntervalSchedule.objects.get_or_create(
                every=10,
                period=IntervalSchedule.MINUTES)
            pt_obj, created = PeriodicTask.objects.get_or_create(
                name='periodic_task_name',
                task='task.celery_task_name')
            pt_obj.interval = schedule
            pt_obj.enabled = False
            pt_obj.args = json.dumps([])
            pt_obj.kwargs = json.dumps({
                'param1': 'param1',
                'param2': 'param2',
                '_schema_name': current_schema,
            })
            pt_obj.save()
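The key trick above is smuggling the originating schema through the task's serialized kwargs under the '_schema_name' key, which the scheduler later strips back out. Stripped of Django, the round trip looks like this (the helper function names are mine, not part of either package):

    import json

    def tag_kwargs_with_schema(kwargs, schema_name):
        """Serialize task kwargs with the originating schema embedded,
        mirroring the '_schema_name' convention used above."""
        tagged = dict(kwargs)
        tagged['_schema_name'] = schema_name
        return json.dumps(tagged)

    def pop_schema(serialized_kwargs):
        """Recover (schema_name, clean_kwargs) at dispatch time, as the
        scheduler's apply_async does before switching schema."""
        kwargs = json.loads(serialized_kwargs)
        schema = kwargs.pop('_schema_name', None)
        return schema, kwargs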
Now we need to override the scheduler's apply_async method to use the correct schema while executing the task. Use the AutoUpdateScheduler class below in your beat_scheduler setting. Supposing you save the AutoUpdateScheduler class in a file called beat_schedulers.py in your project: beat_scheduler = 'beat_schedulers:AutoUpdateScheduler'
    from django_celery_beat.schedulers import DatabaseScheduler
    from tenant_schemas.utils import schema_context


    class AutoUpdateScheduler(DatabaseScheduler):

        def tick(self, *args, **kwargs):
            # Automatically reload the schedule on dynamic addition/deletion
            # of periodic tasks.
            if self.schedule_changed():
                self.sync()
                self._heap = None
                new_schedule = self.all_as_schedule()
                if new_schedule:
                    to_add = [key for key in new_schedule if key not in self.schedule]
                    to_remove = [key for key in self.schedule if key not in new_schedule]
                    for key in to_add:
                        self.schedule[key] = new_schedule[key]
                    for key in to_remove:
                        del self.schedule[key]
            return super(AutoUpdateScheduler, self).tick(*args, **kwargs)

        @property
        def schedule(self):
            if not self._initial_read and not self._schedule:
                self._initial_read = True
                self._schedule = self.all_as_schedule()
            return self._schedule

        def apply_async(self, entry, producer=None, advance=True, **kwargs):
            # Execute the celery task in the correct schema context.
            schema_name = entry.kwargs.get('_schema_name', None)
            if schema_name is not None:
                del entry.kwargs['_schema_name']
                with schema_context(schema_name):
                    return super(AutoUpdateScheduler, self).apply_async(
                        entry, producer, advance, **kwargs)
            else:
                return super(AutoUpdateScheduler, self).apply_async(
                    entry, producer, advance, **kwargs)
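The schedule reconciliation inside tick() can be sketched with plain dicts. Note that it only adds and removes keys; an entry whose name survives is left untouched rather than refreshed. The helper name is mine, for illustration only:

    def reconcile(schedule, new_schedule):
        """Mirror of the tick() diffing above: insert entries that appeared,
        drop entries that vanished, leave surviving keys untouched."""
        to_add = [key for key in new_schedule if key not in schedule]
        to_remove = [key for key in schedule if key not in new_schedule]
        for key in to_add:
            schedule[key] = new_schedule[key]
        for key in to_remove:
            del schedule[key]
        return schedule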
Now run celery beat and all tasks should be executed properly. I hope this helps.
I also like the thoughts of @maciej-gol and his code example at the bottom of the page: https://pypi.org/project/tenant-schemas-celery/ As long as the tasks can be administered in the public schema but executed within the tenants, he provides a good solution.
Where do I add this line? Please help: beat_scheduler = 'beat_schedulers:AutoUpdateScheduler'
After you declare the celery instance:

    app = Celery("proj")
    app.conf.update(beat_scheduler="beat_schedulers:AutoUpdateScheduler")