Получение ошибки при установке динамического планирования задач с помощью Django-celery-beat
Я настраиваю Dynamic cron на основе создания модели для создания другого объекта модели
Ошибка, которую я получаю
Невозможно присвоить "(<CrontabSchedule: 0 0 27 1 * (m/h/dM/MY/d) UTC>, True)": "PeriodicTask.crontab" должен быть экземпляром "CrontabSchedule"
.
вот мой код, где периодическая задача будет создана путем создания этой модели
import json
from django.utils import timezone
import uuid
from django.db import models
from django_celery_beat.models import CrontabSchedule, PeriodicTask
# Create your models here.
class Rule(models.Model):
rule_id = models.UUIDField(
primary_key = False,
default = uuid.uuid4,
editable = True)
name = models.CharField('r=Rule Name',max_length=255,blank=True,null=True)
compliance_frequency = models.CharField(max_length=40,choices=(('QUARTERLY','QUARTERLY'),
('MONTHLY','MONTHLY'),('YEARLY','YEARLY')),default='YEARLY')
applicable_from = models.DateField(default=timezone.now)
applicable_to = models.DateField(null=True,blank=True)
task = models.OneToOneField(
PeriodicTask,
on_delete=models.CASCADE,
null=True,
blank=True
)
def setup_task(self):
self.task = PeriodicTask.objects.create(
name=self.name,
task='create_compliance',
crontab=self.crontab_schedule,
args=json.dumps([self.id,self.name,self.compliance_frequency]),
start_time= timezone.now(),
)
self.save()
@property
def crontab_schedule(self):
if self.compliance_frequency == "QUARTERLY":
return CrontabSchedule.objects.get_or_create(minute='0',hour='0',day_of_week='*',
day_of_month='27',month_of_year='3,6,9,12')
if self.compliance_frequency == "MONTHLY":
return CrontabSchedule.objects.get_or_create(minute='0',hour='0',day_of_week='*',
day_of_month='27',month_of_year='*')
if self.compliance_frequency == 'YEARLY':
return CrontabSchedule.objects.get_or_create(minute='0',hour='0',day_of_week='*',
day_of_month='27',month_of_year='1')
raise NotImplementedError(
'''Interval Schedule for {interval} is not added.'''.format(
interval=self.compliance_frequency.
value))
теперь с помощью signal.py я запускаю setup_task для установки задачи
from django.db.models.signals import post_save
from django.dispatch import receiver
from .models import Rule
@receiver(post_save,sender=Rule)
def create_periodic_task(sender, instance, created, **kwargs):
if created:
instance.setup_task()
instance.task.enabled = True
instance.task.save()
print('signal working')
затем с помощью триггера запускаем задачу для создания модели
from __future__ import absolute_import, unicode_literals
from celery.utils.log import get_task_logger
from celery import shared_task
from pytz import timezone
from test3.models import Compliance
logger = get_task_logger(__name__)
@shared_task
def create_compliance(id,name,compliance_frequency):
Compliance.objects.create(rule_id=id,name=name,compliance_frequency=compliance_frequency)
id = Compliance.object.get(id=id)
logger.info(id.id,'periodic task working bitch')