Получение ошибки при установке динамического планирования задач с помощью Django-celery-beat

Я настраиваю Dynamic cron на основе создания модели для создания другого объекта модели

Ошибка, которую я получаю

Невозможно присвоить "(<CrontabSchedule: 0 0 27 1 * (m/h/dM/MY/d) UTC>, True)": "PeriodicTask.crontab" должен быть экземпляром "CrontabSchedule"

.

вот мой код, где периодическая задача будет создана путем создания этой модели

import json
from django.utils import timezone
import uuid
from django.db import models
from django_celery_beat.models import CrontabSchedule, PeriodicTask

# Create your models here.


class Rule(models.Model):
    rule_id = models.UUIDField(
         primary_key = False,
         default = uuid.uuid4,
         editable = True)
    name = models.CharField('r=Rule Name',max_length=255,blank=True,null=True)
    compliance_frequency = models.CharField(max_length=40,choices=(('QUARTERLY','QUARTERLY'),
    ('MONTHLY','MONTHLY'),('YEARLY','YEARLY')),default='YEARLY')
    applicable_from = models.DateField(default=timezone.now)
    applicable_to = models.DateField(null=True,blank=True)
    task = models.OneToOneField(
        PeriodicTask,
        on_delete=models.CASCADE,
        null=True,
        blank=True
    )
       
    def setup_task(self):
        self.task = PeriodicTask.objects.create(
            name=self.name,
            task='create_compliance',
            crontab=self.crontab_schedule,
            args=json.dumps([self.id,self.name,self.compliance_frequency]),
            start_time= timezone.now(),
        )
        self.save()

    @property
    def crontab_schedule(self):
        if self.compliance_frequency == "QUARTERLY":
            return CrontabSchedule.objects.get_or_create(minute='0',hour='0',day_of_week='*',
            day_of_month='27',month_of_year='3,6,9,12')
        if self.compliance_frequency == "MONTHLY":
            return CrontabSchedule.objects.get_or_create(minute='0',hour='0',day_of_week='*',
            day_of_month='27',month_of_year='*')
        if self.compliance_frequency == 'YEARLY':
            return CrontabSchedule.objects.get_or_create(minute='0',hour='0',day_of_week='*',
            day_of_month='27',month_of_year='1')

        raise NotImplementedError(
            '''Interval Schedule for {interval} is not added.'''.format(
                interval=self.compliance_frequency.

value))

теперь с помощью signal.py я запускаю setup_task для установки задачи

from django.db.models.signals import post_save
from django.dispatch import receiver
from .models import Rule


@receiver(post_save,sender=Rule)
def create_periodic_task(sender, instance, created, **kwargs):
    if created:
        instance.setup_task()
        instance.task.enabled = True
        instance.task.save()
        print('signal working')

затем с помощью триггера запускаем задачу для создания модели

from __future__ import absolute_import, unicode_literals
from celery.utils.log import get_task_logger
from celery import shared_task
from pytz import timezone
from test3.models import Compliance

logger = get_task_logger(__name__)


@shared_task
def create_compliance(id,name,compliance_frequency):

    Compliance.objects.create(rule_id=id,name=name,compliance_frequency=compliance_frequency)

    id = Compliance.object.get(id=id)
    logger.info(id.id,'periodic task working bitch')
Вернуться на верх