Как бороться с повторяющимися датами в Django?
В настоящее время я разрабатываю веб-сайт тренажерного зала и использую django-recurrence для обработки повторяющихся тренировок.
Однако я не уверен, как работать с датами повторения в наборе запросов Django. В частности, я хочу отсортировать тренинги по следующей дате и отфильтровать тренинги по определенной дате, например, чтобы показать сеансы, которые происходят в выбранный день. Каков наилучший способ добиться этого с помощью django-recurrence?
Должен ли я извлекать следующее событие вручную и сохранять его в модели, или есть более эффективный подход для фильтрации и сортировки повторяющихся событий? Я был бы признателен за любые советы или примеры!
class Training(TimestampModel):
template = models.ForeignKey(TrainingTemplate, on_delete=models.CASCADE, related_name='trainings')
start_time = models.TimeField()
end_time = models.TimeField()
location = models.ForeignKey(TrainingLocation, on_delete=models.PROTECT, related_name='trainings')
recurrences = RecurrenceField()
members = models.ManyToManyField(Member, through=MemberTraining, related_name='trainings', blank=True)
objects = TrainingManager()
У меня есть несколько идей, как с этим справиться, но все они кажутся мне чересчур сложными.
- Добавьте поле next_training_date в модель обучения и используйте задачу Celery ETA для обновления этого поля каждый раз по окончании обучения.
Например, когда администратор создает новое обучение, при сохранении вычисляется значение next_training_date. Затем создается запланированное задание Celery (с указанием времени прибытия) для повторного обновления этой даты после окончания текущего сеанса.
Хотя это может сработать, кажется, что просто поддержка фильтрации и сортировки по датам повторения требует слишком больших затрат.
Что бы вы порекомендовали в качестве простого и эффективного способа обработки повторяющихся событий и фильтрации наборов запросов с помощью django-recurrence? Есть ли лучший шаблон или подход для обработки повторяющихся событий в Django?
Спустя некоторое время я пришел к рабочему решению. Я решил использовать Celery task для вычисления даты следующего появления, когда обучение пройдет.
Будущие улучшения могут касаться расписания celery-beat, чтобы убедиться, что все тренировки были обновлены в конце дня.
Я не могу утверждать, что это лучшее решение, но если у кого-то есть больше опыта работы с подобным типом файлов, я был бы признателен за любые изменения или улучшения.
models.py
class Training(TimestampModel):
template = models.ForeignKey(TrainingTemplate, on_delete=models.CASCADE, related_name='trainings')
start_time = models.TimeField()
end_time = models.TimeField()
location = models.ForeignKey(TrainingLocation, on_delete=models.PROTECT, related_name='trainings')
recurrences = RecurrenceField()
next_occurrence = models.DateTimeField(null=True, editable=False)
next_occurrence_celery_task_id = models.UUIDField(null=True, editable=False)
members = models.ManyToManyField(Member, through=MemberTraining, related_name='trainings', blank=True)
objects = TrainingManager()
class Meta:
ordering = ['next_occurrence', 'start_time']
indexes = [
models.Index(fields=['next_occurrence', 'start_time']),
]
def save(self, *args, **kwargs):
self.next_occurrence = self.calculate_next_occurrence()
super().save(*args, **kwargs)
def calculate_next_occurrence(self) -> datetime.datetime | None:
try:
now = timezone.now()
today = timezone.make_naive(now).replace(hour=0, minute=0, second=0, microsecond=0)
next_date = self.recurrences.after(today, inc=True)
combined = datetime.datetime.combine(next_date.date(), self.start_time)
aware_dt = timezone.make_aware(combined)
if aware_dt > now:
return aware_dt
next_date = self.recurrences.after(today + timedelta(days=1), inc=True)
return timezone.make_aware(datetime.datetime.combine(next_date.date(), self.start_time))
except AttributeError:
return None
signals.py
@receiver(post_save, sender=Training)
def calculate_next_occurrence(sender, instance, *args, **kwargs):
if instance.next_occurrence:
if instance.next_occurrence_celery_task_id:
AsyncResult(str(instance.next_occurrence_celery_task_id)).revoke()
result = calculate_next_training_occurrence.apply_async(
args=(instance.id, ),
eta=instance.next_occurrence,
retry=False,
)
Training.objects.filter(id=instance.id).update(next_occurrence_celery_task_id=result.id)
tasks.py
@shared_task
def calculate_next_training_occurrence(training_id: int):
try:
training = Training.objects.get(id=training_id)
training.next_occurrence = training.calculate_next_occurrence()
training.save(update_fields=['next_occurrence'])
except ObjectDoesNotExist:
pass