django-celery-beat icon indicating copy to clipboard operation
django-celery-beat copied to clipboard

Migrating from djcelery

Open alon-unifi opened this issue 6 years ago • 19 comments

I'm in the process of switching from djcelery to django_celery-beat when upgrading to celery 4.x, as djcelery is no longer supported in celery 4. Is there a recommended way to migrate all the djcelery model data to django-celery-beat models? It's pretty clear that the projects are related to each other by looking at the model structure and committers. Seems like the models are almost exactly the same (with some additional ones in django-celery-beat). Yet I can't find any mention of djcelery in the django-celery-beat docs.

Should I simply create a data migration in django to directly copy over data from djcelery app models to the "matching" django-celery-beat app models? I'm happy to contribute information to the readme once I understand the guidelines of achieving this, if that will help other as well.

Thanks!

alon-unifi avatar Jun 28 '18 08:06 alon-unifi

Experiencing similar issue trying to upgrade several of our projects from 3.x to 4.x and finding our beat tasks missing (but found in the djcelery table).

ntravis avatar Jul 11 '18 15:07 ntravis

Hi @ask. Will appreciate a quick comment from you on this if possible. I'm assuming there are a class of djcelery users that would will also face this question and wondering if you could advise on the right approach to take (even in high level). Many thanks.

alon-unifi avatar Jul 29 '18 12:07 alon-unifi

My solution to this problem -- there is probably a cleaner step # 2 or even a way to do this in fewer commands but this worked for me :)

  1. Run the following to get a dump of your set up in djcelery and save it to djcelery.json django-admin dumpdata djcelery --format json --exclude djcelery.taskmeta -o djcelery.json
  2. Then inside of the file replace djcelery with django_celery_beat
  3. Run the following to load the edited dump into your database: django-admin loaddata djcelery.json

tgehrs avatar Oct 30 '18 18:10 tgehrs

@tgehrs sounds like you performed a migration outside of migrations :) if the original djcelery is completely dead (no future updates), it would be good to have one update pushed to add a migration to move you onto this package, but maybe crossing package boundaries like that isn't standard.

ntravis avatar Oct 30 '18 19:10 ntravis

@ntravis I agree that would be nice, hopefully my "solution" will save a few people 20 minutes of frustration :) maybe a transition guide in the docs would be a nice in between?

tgehrs avatar Oct 31 '18 12:10 tgehrs

Please send a PR with your workarounds

auvipy avatar Nov 02 '18 06:11 auvipy

@auvipy will do. I attempted to clean it up a little bit and write a function to automate it. But in attempting to test it seems that you cannot have both djcelery and django_celery_beat both installed at the same time due to conflicts in celery version, can you confirm that is the case?

If they cannot both be installed at the same time I will plan to add my previous solution to the installation portion of README.rst, is that a good plan?

If they can both be installed at the same time where in the project should I put the below?

from io import StringIO
import tempfile
import json

from django.core.management import call_command

def import_djcelery_data():
    #dump the old data
    out = StringIO()
    call_command('dumpdata', 'djcelery', format='json', exclude=['djcelery.taskmeta'], stdout=out)
    #transform the old data to new data
    old_data = json.loads(out.getvalue())
    new_data = []
    for old_datum in old_data:
      for key,value in old_datum.items():
          if 'djcelery' in str(value):
              old_datum.pop(key)
              old_datum[key] = value.replace('djcelery', 'django_celery_beat')
          new_data.append(old_datum)
    new_data = json.dumps(new_data)
    # Reload now dumped data
    with tempfile.NamedTemporaryFile(mode='w', suffix='.json') as tmp:
        tmp.write(new_data)
        tmp.seek(0)
        call_command('loaddata', tmp.name, verbosity=0)

tgehrs avatar Nov 02 '18 13:11 tgehrs

Doesnt django_celery_beat already populate the periodictask and other tasks already if the tasks are registered properly with celery 4.2?

milind-shakya-sp avatar Nov 29 '18 21:11 milind-shakya-sp

@milind-shakya-sp you are correct in your statement but this issue concerns how to move from the deprecated djcelery library to the django_celery_beat library

tgehrs avatar Nov 29 '18 21:11 tgehrs

I understand, am just thinking what models, data are you thinking of migrating. I can see that some of the tables in the new django-celery-beat are already populated, during the registration of tasks, what other data are you looking to migrate?

milind-shakya-sp avatar Nov 29 '18 21:11 milind-shakya-sp

Hi, I'm responsible for migrating a medium-sized django project off of django-celery in the very near future, and would love to know what the status of all this is. I can't imagine I'm the only one facing this in the next 6 months or so, given the python2 EOL.

nthall avatar May 30 '19 17:05 nthall

Well, you obviously can't just use the same tables, since django names them according to the app name which has changed, but you could probably easily add an manage.py command to import data form the old djcelery tables. If anyone wants to take a stab at it, please send us a PR!

liquidpele avatar May 30 '19 20:05 liquidpele

@nthall i think either of my two solutions should do the trick, let me know if you need clarification on either. There probably are better ways to do it too :)

A) Manual -- this is how I did it because I needed something quick to get it completed

  1. Run the following to get a dump of your set up in djcelery and save it to djcelery.json django-admin dumpdata djcelery --format json --exclude djcelery.taskmeta -o djcelery.json
  2. Then inside of the file go through and do a find and replace for djcelery with django_celery_beat
  3. Run the following to load the edited dump into your database: django-admin loaddata djcelery.json

B) Script -- This is a script I wrote after the fact, but never tried/tested it. I was under the assumption that removing the django-celery app from settings would delete the database. After doing a little more reading today, that does not seem to be the case, so this script should work

from io import StringIO
import tempfile
import json

from django.core.management import call_command

def import_djcelery_data():
    #dump the old data
    out = StringIO()
    call_command('dumpdata', 'djcelery', format='json', exclude=['djcelery.taskmeta'], stdout=out)
    #transform the old data to new data
    old_data = json.loads(out.getvalue())
    new_data = []
    for old_datum in old_data:
      for key,value in old_datum.items():
          if 'djcelery' in str(value):
              old_datum.pop(key)
              old_datum[key] = value.replace('djcelery', 'django_celery_beat')
          new_data.append(old_datum)
    new_data = json.dumps(new_data)
    # Reload now dumped data
    with tempfile.NamedTemporaryFile(mode='w', suffix='.json') as tmp:
        tmp.write(new_data)
        tmp.seek(0)
        call_command('loaddata', tmp.name, verbosity=0)

tgehrs avatar May 30 '19 21:05 tgehrs

I got an error on Python 3.8 for script above, didn't happen in 3.6, not sure what cause is:

  File "/opt/master/src/app/migrations/0004_celery4.py", line 22, in import_djcelery_data
    for key,value in old_datum.items():
RuntimeError: dictionary keys changed during iteration

Changed line to

    for key, value in list(old_datum.items()):

ciarancourtney avatar May 18 '20 10:05 ciarancourtney

I wrote the django command to migrate djcelery to django-celery-beat. Since the table fields of the djcelery table and django-celery-beat are not much different, the mapping method is used directly, I hope it will be helpful to everyone

Note: The premise of using this command is that django-celery-beat has been installed and django-celery-beat models has been migrated

Instructions:

python manage.py migrate_from_djcelery
# Optional parameter: -tz specifies the time zone in which celery was running before, and does not specify the UTC time zone by default. For example: -tz Asia/Shanghai

Below is the command file

commands/migrate_from_djcelery.py

from django.core.management import BaseCommand
from django.db import transaction
from django_celery_beat.models import (
    IntervalSchedule,
    CrontabSchedule,
    PeriodicTasks,
    PeriodicTask,
)

from blueapps.contrib.bk_commands.management.handlers.migrate_from_djcelery_handler import (
    execute,
    DjIntervalSchedule,
    DjCrontabSchedule,
    DjPeriodicTask,
    DjPeriodicTasks,
)


class Command(BaseCommand):
    def add_arguments(self, parser):
        parser.add_argument("-tz", help="指定旧版djcelery运行的时区")

    @transaction.atomic
    def handle(self, *args, **options):
        tz = "UTC"  # pylint: disable=invalid-name
        if options["tz"]:
            tz = options["tz"]  # pylint: disable=invalid-name
        new_db_table = (IntervalSchedule, PeriodicTasks, PeriodicTask)
        old_db_table = (DjIntervalSchedule, DjPeriodicTasks, DjPeriodicTask)
        # 迁移带时区的表
        execute(CrontabSchedule, DjCrontabSchedule, tz)
        # 迁移不带时区的表
        for new_table, old_table in zip(new_db_table, old_db_table):
            execute(new_table, old_table)

commands/handlers/migrate_from_djcelery_handler.py

The handler file is used to restore the structure of the djcelery model

from django.core.management.base import CommandError
from django.db import models
from django.utils.translation import ugettext_lazy as _


class DjCrontabSchedule(models.Model):
    minute = models.CharField(max_length=64, default="*")
    hour = models.CharField(max_length=64, default="*")
    day_of_week = models.CharField(max_length=64, default="*",)
    day_of_month = models.CharField(max_length=64, default="*")
    month_of_year = models.CharField(max_length=64, default="*")

    class Meta:
        db_table = "djcelery_crontabschedule"


class DjPeriodicTasks(models.Model):
    ident = models.SmallIntegerField(default=1, primary_key=True, unique=True)
    last_update = models.DateTimeField(null=False)

    class Meta:
        db_table = "djcelery_periodictasks"


class DjIntervalSchedule(models.Model):
    every = models.IntegerField(_("every"), null=False)
    period = models.CharField(_("period"), max_length=24)

    class Meta:
        db_table = "djcelery_intervalschedule"
        verbose_name = _("interval")
        verbose_name_plural = _("intervals")
        ordering = ["period", "every"]


class DjPeriodicTask(models.Model):
    name = models.CharField(
        _("name"), max_length=200, unique=True, help_text=_("Useful description"),
    )
    task = models.CharField(_("task name"), max_length=200)
    interval = models.ForeignKey(
        DjIntervalSchedule,
        null=True,
        blank=True,
        verbose_name=_("interval"),
        on_delete=models.CASCADE,
    )
    crontab = models.ForeignKey(
        DjCrontabSchedule,
        null=True,
        blank=True,
        verbose_name=_("crontab"),
        on_delete=models.CASCADE,
        help_text=_("Use one of interval/crontab"),
    )
    args = models.TextField(
        _("Arguments"),
        blank=True,
        default="[]",
        help_text=_("JSON encoded positional arguments"),
    )
    kwargs = models.TextField(
        _("Keyword arguments"),
        blank=True,
        default="{}",
        help_text=_("JSON encoded keyword arguments"),
    )
    queue = models.CharField(
        _("queue"),
        max_length=200,
        blank=True,
        null=True,
        default=None,
        help_text=_("Queue defined in CELERY_QUEUES"),
    )
    exchange = models.CharField(
        _("exchange"), max_length=200, blank=True, null=True, default=None,
    )
    routing_key = models.CharField(
        _("routing key"), max_length=200, blank=True, null=True, default=None,
    )
    expires = models.DateTimeField(_("expires"), blank=True, null=True,)
    enabled = models.BooleanField(_("enabled"), default=True,)
    last_run_at = models.DateTimeField(
        auto_now=False, auto_now_add=False, editable=False, blank=True, null=True,
    )
    total_run_count = models.PositiveIntegerField(default=0, editable=False,)
    date_changed = models.DateTimeField(auto_now=True)
    description = models.TextField(_("description"), blank=True)

    no_changes = False

    class Meta:
        db_table = "djcelery_periodictask"
        verbose_name = _("periodic task")
        verbose_name_plural = _("periodic tasks")


class DjWorkerState(models.Model):
    hostname = models.CharField(_("hostname"), max_length=255, unique=True)
    last_heartbeat = models.DateTimeField(_("last heartbeat"), null=True, db_index=True)

    class Meta:
        """Model meta-data."""

        verbose_name = _("worker")
        verbose_name_plural = _("workers")
        get_latest_by = "last_heartbeat"
        ordering = ["-last_heartbeat"]


class DjTaskState(models.Model):
    state = models.CharField(_("state"), max_length=64)
    task_id = models.CharField(_("UUID"), max_length=36, unique=True)
    name = models.CharField(_("name"), max_length=200, null=True, db_index=True,)
    tstamp = models.DateTimeField(_("event received at"), db_index=True)
    args = models.TextField(_("Arguments"), null=True)
    kwargs = models.TextField(_("Keyword arguments"), null=True)
    eta = models.DateTimeField(_("ETA"), null=True)
    expires = models.DateTimeField(_("expires"), null=True)
    result = models.TextField(_("result"), null=True)
    traceback = models.TextField(_("traceback"), null=True)
    runtime = models.FloatField(
        _("execution time"), null=True, help_text=_("in seconds if task succeeded"),
    )
    retries = models.IntegerField(_("number of retries"), default=0)
    worker = models.ForeignKey(
        DjWorkerState, null=True, verbose_name=_("worker"), on_delete=models.CASCADE,
    )
    hidden = models.BooleanField(editable=False, default=False, db_index=True)

    class Meta:
        """Model meta-data."""

        verbose_name = _("task")
        verbose_name_plural = _("tasks")
        get_latest_by = "tstamp"
        ordering = ["-tstamp"]


def execute(new_table, old_table, tz=None):  # pylint: disable=invalid-name
    if new_table.objects.exists():
        print(f"目标数据库:{new_table._meta.model_name}不为空,跳过迁移该数据库")
        raise CommandError(
            "The target database {} already has data and cannot be migrated".format(
                new_table._meta.model_name
            )
        )
    for old_data in old_table.objects.all():
        new_data = new_table()
        if tz:
            new_data.__setattr__("timezone", tz)
        for field in old_data._meta.fields:
            field_name = field.name
            # 判断是否为外键
            if field_name in ("crontab", "interval", "worker"):
                try:
                    # 写入外键id
                    new_data.__setattr__(
                        field_name + "_id", old_data.__getattribute__(field_name).id
                    )
                except AttributeError:
                    new_data.__setattr__(field_name + "_id", None)
            else:
                new_data.__setattr__(field_name, old_data.__getattribute__(field_name))
        new_data.save()

JavHou avatar Nov 17 '20 03:11 JavHou

I wrote the django command to migrate djcelery to django-celery-beat. Since the table fields of the djcelery table and django-celery-beat are not much different, the mapping method is used directly, I hope it will be helpful to everyone

Note: The premise of using this command is that django-celery-beat has been installed and django-celery-beat models has been migrated

Instructions:

python manage.py migrate_from_djcelery
# Optional parameter: -tz specifies the time zone in which celery was running before, and does not specify the UTC time zone by default. For example: -tz Asia/Shanghai

Below is the command file

commands/migrate_from_djcelery.py

from django.core.management import BaseCommand
from django.db import transaction
from django_celery_beat.models import (
    IntervalSchedule,
    CrontabSchedule,
    PeriodicTasks,
    PeriodicTask,
)

from blueapps.contrib.bk_commands.management.handlers.migrate_from_djcelery_handler import (
    execute,
    DjIntervalSchedule,
    DjCrontabSchedule,
    DjPeriodicTask,
    DjPeriodicTasks,
)


class Command(BaseCommand):
    def add_arguments(self, parser):
        parser.add_argument("-tz", help="指定旧版djcelery运行的时区")

    @transaction.atomic
    def handle(self, *args, **options):
        tz = "UTC"  # pylint: disable=invalid-name
        if options["tz"]:
            tz = options["tz"]  # pylint: disable=invalid-name
        new_db_table = (IntervalSchedule, PeriodicTasks, PeriodicTask)
        old_db_table = (DjIntervalSchedule, DjPeriodicTasks, DjPeriodicTask)
        # 迁移带时区的表
        execute(CrontabSchedule, DjCrontabSchedule, tz)
        # 迁移不带时区的表
        for new_table, old_table in zip(new_db_table, old_db_table):
            execute(new_table, old_table)

commands/handlers/migrate_from_djcelery_handler.py

The handler file is used to restore the structure of the djcelery model

from django.core.management.base import CommandError
from django.db import models
from django.utils.translation import ugettext_lazy as _


class DjCrontabSchedule(models.Model):
    minute = models.CharField(max_length=64, default="*")
    hour = models.CharField(max_length=64, default="*")
    day_of_week = models.CharField(max_length=64, default="*",)
    day_of_month = models.CharField(max_length=64, default="*")
    month_of_year = models.CharField(max_length=64, default="*")

    class Meta:
        db_table = "djcelery_crontabschedule"


class DjPeriodicTasks(models.Model):
    ident = models.SmallIntegerField(default=1, primary_key=True, unique=True)
    last_update = models.DateTimeField(null=False)

    class Meta:
        db_table = "djcelery_periodictasks"


class DjIntervalSchedule(models.Model):
    every = models.IntegerField(_("every"), null=False)
    period = models.CharField(_("period"), max_length=24)

    class Meta:
        db_table = "djcelery_intervalschedule"
        verbose_name = _("interval")
        verbose_name_plural = _("intervals")
        ordering = ["period", "every"]


class DjPeriodicTask(models.Model):
    name = models.CharField(
        _("name"), max_length=200, unique=True, help_text=_("Useful description"),
    )
    task = models.CharField(_("task name"), max_length=200)
    interval = models.ForeignKey(
        DjIntervalSchedule,
        null=True,
        blank=True,
        verbose_name=_("interval"),
        on_delete=models.CASCADE,
    )
    crontab = models.ForeignKey(
        DjCrontabSchedule,
        null=True,
        blank=True,
        verbose_name=_("crontab"),
        on_delete=models.CASCADE,
        help_text=_("Use one of interval/crontab"),
    )
    args = models.TextField(
        _("Arguments"),
        blank=True,
        default="[]",
        help_text=_("JSON encoded positional arguments"),
    )
    kwargs = models.TextField(
        _("Keyword arguments"),
        blank=True,
        default="{}",
        help_text=_("JSON encoded keyword arguments"),
    )
    queue = models.CharField(
        _("queue"),
        max_length=200,
        blank=True,
        null=True,
        default=None,
        help_text=_("Queue defined in CELERY_QUEUES"),
    )
    exchange = models.CharField(
        _("exchange"), max_length=200, blank=True, null=True, default=None,
    )
    routing_key = models.CharField(
        _("routing key"), max_length=200, blank=True, null=True, default=None,
    )
    expires = models.DateTimeField(_("expires"), blank=True, null=True,)
    enabled = models.BooleanField(_("enabled"), default=True,)
    last_run_at = models.DateTimeField(
        auto_now=False, auto_now_add=False, editable=False, blank=True, null=True,
    )
    total_run_count = models.PositiveIntegerField(default=0, editable=False,)
    date_changed = models.DateTimeField(auto_now=True)
    description = models.TextField(_("description"), blank=True)

    no_changes = False

    class Meta:
        db_table = "djcelery_periodictask"
        verbose_name = _("periodic task")
        verbose_name_plural = _("periodic tasks")


class DjWorkerState(models.Model):
    hostname = models.CharField(_("hostname"), max_length=255, unique=True)
    last_heartbeat = models.DateTimeField(_("last heartbeat"), null=True, db_index=True)

    class Meta:
        """Model meta-data."""

        verbose_name = _("worker")
        verbose_name_plural = _("workers")
        get_latest_by = "last_heartbeat"
        ordering = ["-last_heartbeat"]


class DjTaskState(models.Model):
    state = models.CharField(_("state"), max_length=64)
    task_id = models.CharField(_("UUID"), max_length=36, unique=True)
    name = models.CharField(_("name"), max_length=200, null=True, db_index=True,)
    tstamp = models.DateTimeField(_("event received at"), db_index=True)
    args = models.TextField(_("Arguments"), null=True)
    kwargs = models.TextField(_("Keyword arguments"), null=True)
    eta = models.DateTimeField(_("ETA"), null=True)
    expires = models.DateTimeField(_("expires"), null=True)
    result = models.TextField(_("result"), null=True)
    traceback = models.TextField(_("traceback"), null=True)
    runtime = models.FloatField(
        _("execution time"), null=True, help_text=_("in seconds if task succeeded"),
    )
    retries = models.IntegerField(_("number of retries"), default=0)
    worker = models.ForeignKey(
        DjWorkerState, null=True, verbose_name=_("worker"), on_delete=models.CASCADE,
    )
    hidden = models.BooleanField(editable=False, default=False, db_index=True)

    class Meta:
        """Model meta-data."""

        verbose_name = _("task")
        verbose_name_plural = _("tasks")
        get_latest_by = "tstamp"
        ordering = ["-tstamp"]


def execute(new_table, old_table, tz=None):  # pylint: disable=invalid-name
    if new_table.objects.exists():
        print(f"目标数据库:{new_table._meta.model_name}不为空,跳过迁移该数据库")
        raise CommandError(
            "The target database {} already has data and cannot be migrated".format(
                new_table._meta.model_name
            )
        )
    for old_data in old_table.objects.all():
        new_data = new_table()
        if tz:
            new_data.__setattr__("timezone", tz)
        for field in old_data._meta.fields:
            field_name = field.name
            # 判断是否为外键
            if field_name in ("crontab", "interval", "worker"):
                try:
                    # 写入外键id
                    new_data.__setattr__(
                        field_name + "_id", old_data.__getattribute__(field_name).id
                    )
                except AttributeError:
                    new_data.__setattr__(field_name + "_id", None)
            else:
                new_data.__setattr__(field_name, old_data.__getattribute__(field_name))
        new_data.save()

hey, would you mind including this script in the docs migration guide?

auvipy avatar Dec 14 '20 14:12 auvipy

I wrote the django command to migrate djcelery to django-celery-beat. Since the table fields of the djcelery table and django-celery-beat are not much different, the mapping method is used directly, I hope it will be helpful to everyone

Note: The premise of using this command is that django-celery-beat has been installed and django-celery-beat models has been migrated

Instructions:

python manage.py migrate_from_djcelery
# Optional parameter: -tz specifies the time zone in which celery was running before, and does not specify the UTC time zone by default. For example: -tz Asia/Shanghai

Below is the command file commands/migrate_from_djcelery.py

from django.core.management import BaseCommand
from django.db import transaction
from django_celery_beat.models import (
    IntervalSchedule,
    CrontabSchedule,
    PeriodicTasks,
    PeriodicTask,
)

from blueapps.contrib.bk_commands.management.handlers.migrate_from_djcelery_handler import (
    execute,
    DjIntervalSchedule,
    DjCrontabSchedule,
    DjPeriodicTask,
    DjPeriodicTasks,
)


class Command(BaseCommand):
    def add_arguments(self, parser):
        parser.add_argument("-tz", help="指定旧版djcelery运行的时区")

    @transaction.atomic
    def handle(self, *args, **options):
        tz = "UTC"  # pylint: disable=invalid-name
        if options["tz"]:
            tz = options["tz"]  # pylint: disable=invalid-name
        new_db_table = (IntervalSchedule, PeriodicTasks, PeriodicTask)
        old_db_table = (DjIntervalSchedule, DjPeriodicTasks, DjPeriodicTask)
        # 迁移带时区的表
        execute(CrontabSchedule, DjCrontabSchedule, tz)
        # 迁移不带时区的表
        for new_table, old_table in zip(new_db_table, old_db_table):
            execute(new_table, old_table)

commands/handlers/migrate_from_djcelery_handler.py

The handler file is used to restore the structure of the djcelery model

from django.core.management.base import CommandError
from django.db import models
from django.utils.translation import ugettext_lazy as _


class DjCrontabSchedule(models.Model):
    minute = models.CharField(max_length=64, default="*")
    hour = models.CharField(max_length=64, default="*")
    day_of_week = models.CharField(max_length=64, default="*",)
    day_of_month = models.CharField(max_length=64, default="*")
    month_of_year = models.CharField(max_length=64, default="*")

    class Meta:
        db_table = "djcelery_crontabschedule"


class DjPeriodicTasks(models.Model):
    ident = models.SmallIntegerField(default=1, primary_key=True, unique=True)
    last_update = models.DateTimeField(null=False)

    class Meta:
        db_table = "djcelery_periodictasks"


class DjIntervalSchedule(models.Model):
    every = models.IntegerField(_("every"), null=False)
    period = models.CharField(_("period"), max_length=24)

    class Meta:
        db_table = "djcelery_intervalschedule"
        verbose_name = _("interval")
        verbose_name_plural = _("intervals")
        ordering = ["period", "every"]


class DjPeriodicTask(models.Model):
    name = models.CharField(
        _("name"), max_length=200, unique=True, help_text=_("Useful description"),
    )
    task = models.CharField(_("task name"), max_length=200)
    interval = models.ForeignKey(
        DjIntervalSchedule,
        null=True,
        blank=True,
        verbose_name=_("interval"),
        on_delete=models.CASCADE,
    )
    crontab = models.ForeignKey(
        DjCrontabSchedule,
        null=True,
        blank=True,
        verbose_name=_("crontab"),
        on_delete=models.CASCADE,
        help_text=_("Use one of interval/crontab"),
    )
    args = models.TextField(
        _("Arguments"),
        blank=True,
        default="[]",
        help_text=_("JSON encoded positional arguments"),
    )
    kwargs = models.TextField(
        _("Keyword arguments"),
        blank=True,
        default="{}",
        help_text=_("JSON encoded keyword arguments"),
    )
    queue = models.CharField(
        _("queue"),
        max_length=200,
        blank=True,
        null=True,
        default=None,
        help_text=_("Queue defined in CELERY_QUEUES"),
    )
    exchange = models.CharField(
        _("exchange"), max_length=200, blank=True, null=True, default=None,
    )
    routing_key = models.CharField(
        _("routing key"), max_length=200, blank=True, null=True, default=None,
    )
    expires = models.DateTimeField(_("expires"), blank=True, null=True,)
    enabled = models.BooleanField(_("enabled"), default=True,)
    last_run_at = models.DateTimeField(
        auto_now=False, auto_now_add=False, editable=False, blank=True, null=True,
    )
    total_run_count = models.PositiveIntegerField(default=0, editable=False,)
    date_changed = models.DateTimeField(auto_now=True)
    description = models.TextField(_("description"), blank=True)

    no_changes = False

    class Meta:
        db_table = "djcelery_periodictask"
        verbose_name = _("periodic task")
        verbose_name_plural = _("periodic tasks")


class DjWorkerState(models.Model):
    hostname = models.CharField(_("hostname"), max_length=255, unique=True)
    last_heartbeat = models.DateTimeField(_("last heartbeat"), null=True, db_index=True)

    class Meta:
        """Model meta-data."""

        verbose_name = _("worker")
        verbose_name_plural = _("workers")
        get_latest_by = "last_heartbeat"
        ordering = ["-last_heartbeat"]


class DjTaskState(models.Model):
    state = models.CharField(_("state"), max_length=64)
    task_id = models.CharField(_("UUID"), max_length=36, unique=True)
    name = models.CharField(_("name"), max_length=200, null=True, db_index=True,)
    tstamp = models.DateTimeField(_("event received at"), db_index=True)
    args = models.TextField(_("Arguments"), null=True)
    kwargs = models.TextField(_("Keyword arguments"), null=True)
    eta = models.DateTimeField(_("ETA"), null=True)
    expires = models.DateTimeField(_("expires"), null=True)
    result = models.TextField(_("result"), null=True)
    traceback = models.TextField(_("traceback"), null=True)
    runtime = models.FloatField(
        _("execution time"), null=True, help_text=_("in seconds if task succeeded"),
    )
    retries = models.IntegerField(_("number of retries"), default=0)
    worker = models.ForeignKey(
        DjWorkerState, null=True, verbose_name=_("worker"), on_delete=models.CASCADE,
    )
    hidden = models.BooleanField(editable=False, default=False, db_index=True)

    class Meta:
        """Model meta-data."""

        verbose_name = _("task")
        verbose_name_plural = _("tasks")
        get_latest_by = "tstamp"
        ordering = ["-tstamp"]


def execute(new_table, old_table, tz=None):  # pylint: disable=invalid-name
    if new_table.objects.exists():
        print(f"目标数据库:{new_table._meta.model_name}不为空,跳过迁移该数据库")
        raise CommandError(
            "The target database {} already has data and cannot be migrated".format(
                new_table._meta.model_name
            )
        )
    for old_data in old_table.objects.all():
        new_data = new_table()
        if tz:
            new_data.__setattr__("timezone", tz)
        for field in old_data._meta.fields:
            field_name = field.name
            # 判断是否为外键
            if field_name in ("crontab", "interval", "worker"):
                try:
                    # 写入外键id
                    new_data.__setattr__(
                        field_name + "_id", old_data.__getattribute__(field_name).id
                    )
                except AttributeError:
                    new_data.__setattr__(field_name + "_id", None)
            else:
                new_data.__setattr__(field_name, old_data.__getattribute__(field_name))
        new_data.save()

hey, would you mind including this script in the docs migration guide?

Of course don’t mind, I hope this script can help more people

JavHou avatar Dec 15 '20 04:12 JavHou

for sure

auvipy avatar Dec 15 '20 08:12 auvipy

Here is a modified version which adds an app_label because that is now required:

from django.db import models
from django.utils.translation import ugettext_lazy as _


class DjCrontabSchedule(models.Model):
    minute = models.CharField(max_length=64, default="*")
    hour = models.CharField(max_length=64, default="*")
    day_of_week = models.CharField(max_length=64, default="*",)
    day_of_month = models.CharField(max_length=64, default="*")
    month_of_year = models.CharField(max_length=64, default="*")

    class Meta:
        app_label = "temp"
        db_table = "djcelery_crontabschedule"


class DjPeriodicTasks(models.Model):
    ident = models.SmallIntegerField(default=1, primary_key=True, unique=True)
    last_update = models.DateTimeField(null=False)

    class Meta:
        app_label = "temp"
        db_table = "djcelery_periodictasks"


class DjIntervalSchedule(models.Model):
    every = models.IntegerField(_("every"), null=False)
    period = models.CharField(_("period"), max_length=24)

    class Meta:
        app_label = "temp"
        db_table = "djcelery_intervalschedule"
        verbose_name = _("interval")
        verbose_name_plural = _("intervals")
        ordering = ["period", "every"]


class DjPeriodicTask(models.Model):
    name = models.CharField(
        _("name"), max_length=200, unique=True, help_text=_("Useful description"),
    )
    task = models.CharField(_("task name"), max_length=200)
    interval = models.ForeignKey(
        DjIntervalSchedule,
        null=True,
        blank=True,
        verbose_name=_("interval"),
        on_delete=models.CASCADE,
    )
    crontab = models.ForeignKey(
        DjCrontabSchedule,
        null=True,
        blank=True,
        verbose_name=_("crontab"),
        on_delete=models.CASCADE,
        help_text=_("Use one of interval/crontab"),
    )
    args = models.TextField(
        _("Arguments"),
        blank=True,
        default="[]",
        help_text=_("JSON encoded positional arguments"),
    )
    kwargs = models.TextField(
        _("Keyword arguments"),
        blank=True,
        default="{}",
        help_text=_("JSON encoded keyword arguments"),
    )
    queue = models.CharField(
        _("queue"),
        max_length=200,
        blank=True,
        null=True,
        default=None,
        help_text=_("Queue defined in CELERY_QUEUES"),
    )
    exchange = models.CharField(
        _("exchange"), max_length=200, blank=True, null=True, default=None,
    )
    routing_key = models.CharField(
        _("routing key"), max_length=200, blank=True, null=True, default=None,
    )
    expires = models.DateTimeField(_("expires"), blank=True, null=True,)
    enabled = models.BooleanField(_("enabled"), default=True,)
    last_run_at = models.DateTimeField(
        auto_now=False, auto_now_add=False, editable=False, blank=True, null=True,
    )
    total_run_count = models.PositiveIntegerField(default=0, editable=False,)
    date_changed = models.DateTimeField(auto_now=True)
    description = models.TextField(_("description"), blank=True)

    no_changes = False

    class Meta:
        app_label = "temp"
        db_table = "djcelery_periodictask"
        verbose_name = _("periodic task")
        verbose_name_plural = _("periodic tasks")


class DjWorkerState(models.Model):
    hostname = models.CharField(_("hostname"), max_length=255, unique=True)
    last_heartbeat = models.DateTimeField(_("last heartbeat"), null=True, db_index=True)

    class Meta:
        """Model meta-data."""
        app_label = "temp"
        verbose_name = _("worker")
        verbose_name_plural = _("workers")
        get_latest_by = "last_heartbeat"
        ordering = ["-last_heartbeat"]


class DjTaskState(models.Model):
    state = models.CharField(_("state"), max_length=64)
    task_id = models.CharField(_("UUID"), max_length=36, unique=True)
    name = models.CharField(_("name"), max_length=200, null=True, db_index=True,)
    tstamp = models.DateTimeField(_("event received at"), db_index=True)
    args = models.TextField(_("Arguments"), null=True)
    kwargs = models.TextField(_("Keyword arguments"), null=True)
    eta = models.DateTimeField(_("ETA"), null=True)
    expires = models.DateTimeField(_("expires"), null=True)
    result = models.TextField(_("result"), null=True)
    traceback = models.TextField(_("traceback"), null=True)
    runtime = models.FloatField(
        _("execution time"), null=True, help_text=_("in seconds if task succeeded"),
    )
    retries = models.IntegerField(_("number of retries"), default=0)
    worker = models.ForeignKey(
        DjWorkerState, null=True, verbose_name=_("worker"), on_delete=models.CASCADE,
    )
    hidden = models.BooleanField(editable=False, default=False, db_index=True)

    class Meta:
        """Model meta-data."""
        app_label = "temp"
        verbose_name = _("task")
        verbose_name_plural = _("tasks")
        get_latest_by = "tstamp"
        ordering = ["-tstamp"]


def execute(new_table, old_table, tz=None):  # pylint: disable=invalid-name
    if new_table.objects.exists():
        print(f"目标数据库:{new_table._meta.model_name}不为空,跳过迁移该数据库")
        raise CommandError(
            "The target database {} already has data and cannot be migrated".format(
                new_table._meta.model_name
            )
        )
    for old_data in old_table.objects.all():
        new_data = new_table()
        if tz:
            new_data.__setattr__("timezone", tz)
        for field in old_data._meta.fields:
            field_name = field.name
            # 判断是否为外键
            if field_name in ("crontab", "interval", "worker"):
                try:
                    # 写入外键id
                    new_data.__setattr__(
                        field_name + "_id", old_data.__getattribute__(field_name).id
                    )
                except AttributeError:
                    new_data.__setattr__(field_name + "_id", None)
            else:
                new_data.__setattr__(field_name, old_data.__getattribute__(field_name))
        new_data.save()

sbywater avatar Jan 27 '22 19:01 sbywater