How to create a Celery periodic task in Django REST framework views?

I use Django REST framework.

I want a Celery task to run an operation once, on the date that the user enters.

This is my Celery configuration:

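import os

from celery import Celery
from kombu import Queue

# BROKER_URL and RESULT_BACKEND (used below) are defined elsewhere; not shown here.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'Bimsanj.settings')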

celery_app = Celery('Bimsanj')

celery_app.conf.task_queues = (
    Queue('default', routing_key='default'),
    Queue('reminder', routing_key='reminder'),
)

celery_app.conf.task_default_queue = 'default'

celery_app.conf.task_routes = {
    'send_reminder_message_task' : {'queue' : 'reminder'},
}

# Load task modules from all registered Django apps.
celery_app.autodiscover_tasks()

celery_app.conf.broker_url = BROKER_URL
celery_app.conf.result_backend = RESULT_BACKEND

This is my view:

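import json
from uuid import uuid4

from django_celery_beat.models import ClockedSchedule, PeriodicTask
from rest_framework import status
from rest_framework.mixins import CreateModelMixin
from rest_framework.response import Response
from rest_framework.viewsets import GenericViewSet

# InsuranceReminder and InsuranceReminderSerializer come from the app (imports not shown).

class InsuranceReminderView(GenericViewSet, CreateModelMixin):
    serializer_class = InsuranceReminderSerializer
    model = InsuranceReminder

    def create(self, request, *args, **kwargs):
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        self.perform_create(serializer)
        headers = self.get_success_headers(serializer.data)
        data = serializer.validated_data
        # The run time must be passed as the clocked_time keyword argument.
        clocked = ClockedSchedule.objects.create(clocked_time=data['due_date'])
        PeriodicTask.objects.create(
            clocked=clocked,
            name=str(uuid4()),
            one_off=True,  # a clocked schedule must be a one-off task
            task="apps.insurance.tasks.send_message_insurance_reminder",
            # PeriodicTask.args is a text field holding a JSON-encoded list,
            # so the date is converted to a string before dumping.
            args=json.dumps([data['title'], data['mobile'], str(data['due_date'])]),
        )
        return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)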
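
To make the two conversions in the view explicit, here is a minimal sketch using only the standard library. It assumes the due date arrives as a `date` or `datetime`. The helper names `to_clocked_time` and `build_task_args`, the 09:00 run time, and the UTC default are illustrative assumptions, not part of django-celery-beat.

```python
import json
from datetime import date, datetime, time, timezone


def to_clocked_time(due, at=time(9, 0), tz=timezone.utc):
    """Return a timezone-aware datetime for a user-entered due date.

    Hypothetical helper: the 09:00 default and UTC are assumptions.
    """
    # datetime is a subclass of date, so it has to be checked first.
    if isinstance(due, datetime):
        # Keep an aware datetime as-is; attach tz to a naive one.
        return due if due.tzinfo is not None else due.replace(tzinfo=tz)
    # A plain date has no time of day, so combine it with the chosen one.
    return datetime.combine(due, at, tzinfo=tz)


def build_task_args(title, mobile, due):
    """Serialize the positional task arguments to a JSON string.

    Dates are not JSON-serializable, so the due date is sent in ISO format.
    """
    return json.dumps([title, mobile, due.isoformat()])
```

The result of `to_clocked_time(...)` is the kind of value that could be stored as the schedule's run time, and `build_task_args(...)` produces the kind of string that could be stored as the task arguments. The task then receives the due date as an ISO-format string rather than a date object.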

This is my task:

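from celery.utils.log import get_task_logger

# celery_app is imported from the project's Celery module (import not shown).
logger = get_task_logger(__name__)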

@celery_app.task(name="apps.insurance.tasks.send_message_insurance_reminder")
def send_message_insurance_reminder(title, mobile, due_date):
    logger.info('send message insurance reminder')
    # return send_reminder_message(title, mobile, due_date)
    return f'{title} _ {mobile} _ {due_date}'

What is the correct way to create a periodic task that runs once, on the date the user enters?

Thanks.
