Как создавать задачи Celery во время выполнения в Django?

У меня есть приложение, в котором пользователь может ввести временной график и URL веб-сайта. Затем я получаю весь HTML-код с этого сайта и ввожу его в БД.

После добавления Celery будет периодически проверять сайт, загружая код и проверяя изменения каждые N минут/час/день.

Мой celery.py.

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'derp.settings')
app = Celery('derp')
app.config_from_object(settings, namespace='CELERY')
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

Затем у меня есть мой файл tasks.py.

channel_layer = get_channel_layer()
app = Celery('derp')

def website_links(html_path: str = ""):
    ##Get website text and A tags
    return (text, links)

    @app.task
def watch_links_and_text(url):
    text, links = website_links(url) # TODO Check status code if 404 but in DB mark url_active = False
    object, created = URLWatcher.objects.get_or_create(url=url)
    if created: ## True if website not found = object is created & not found
        for index, line in enumerate(text):
            line = LineWatcher.website.add_line(line=line, line_number=index, fk_url=object)
            line.save()
        return
    else:
        website_obj = object.fk_url
        website_obj.get()
        for index, line in enumerate(text):
            obj, cre = LineWatcher.objects.get(line=line, hash=hash, line_number=index)

Итак, я получаю этот объект dat из запроса:

{"cron":{"name":"task name","url":"https://randomsite.com/news","min":"10","hour":"0","day":"0"}}

Затем я разбираю данные и пытаюсь создать задачу следующим образом:

schedule, _ = CrontabSchedule.objects.get_or_create(minute=minu, hour=hour, day_of_week=day_of_week, day_of_month="*", month_of_year="*")
            task = PeriodicTask.objects.create(crontab=schedule, name=name, task="watcher.tasks.watch_links_and_text", args=json.dumps([url]))
            task.save()

Я вижу задачи в http://127.0.0.1:8000/admin/django_celery_beat/periodictask/ они просто не выполняются, когда я смотрю на Celery или Celery worker. Когда я смотрю на TaskResults, там пусто, хотя задача имеет правильные аргументы и данные в БД.

Как я могу добавить эти задачи, чтобы они могли быть добавлены и запущены во время выполнения?

Где я ошибся?

Спасибо.

Спасибо.

Вернуться на верх