Celery не выполняет задание вообще / периодически

эту функцию я использовал в разделе управление/команды и она работала, но я хотел бы использовать ее для периодического обновления фидов. например, каждые 15 минут или около того.

celery.py

from celery import Celery
import os

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")

app = Celery('mysite',
             broker='redis://',
             backend='rpc://',
             include=['mysite.tasks'],
             timezone='UTC')

if __name__ == '__main__':
    app.start()

tasks.py

from .celery import app
from news.models import Feed, Article
import feedparser
from datetime import datetime, timezone


@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(60.0, handle.s(), name='update feeds every 60 seconds')


@app.task
def handle(self, *args, **kwargs):
    feeds = Feed.objects.all()
    for feed in feeds:
        new_list = feedparser.parse(feed.url)
        for entry in new_list.entries:
            start = datetime.now(timezone.utc)
            end = datetime(*(entry.published_parsed[0:6])).replace(tzinfo=timezone.utc)
            if (start - end).days < 2 and not Article.objects.filter(url=entry.link).exists():
                article = Article()
                article.title = entry.title
                article.url = entry.link
                article.description = entry.description
                dateString = end.strftime('%Y-%m-%d %H:%M:%S %z')

                article.publication_date = dateString
                article.save()
            else:
                pass

я запускаю celery -A mysite worker -l INFO и есть [tasks]
. mysite.tasks.handle

я пробовал также celery -A mysite beat
ошибок нет, но я не вижу никакого эффекта на моем сайте

я также попытался выполнить обе команды в двух терминалах.

в каталоге настроек создайте файл для сельдерея

celery_conf.py


import os
from celery import Celery
from datetime import timedelta

os.environ.setdefault('DJANGO_SETTINGS_MODULE', '<path to settings like=> config.settings>')

celery_app = Celery('<settings direcory name like=>config>')
celery_app.autodiscover_tasks()

celery_app.conf.broker_url = 'amqp://'
celery_app.conf.result_backend = 'rpc://'

в ваших приложениях, везде, где вам нужен сельдерей, создайте файл tasks.py Создайте свою собственную функцию (в файле tasks.py) и установите декоратор shared_task на function

tasks.py

from celery import shared_task

@shared_task
def send_email():
 .....

Celery использует shared_task для идентификации задач

*** Не забудьте использовать задержку при вызове задачи***

Вернуться на верх