Celery не выполняет задание вообще / периодически
эту функцию я использовал в разделе управление/команды и она работала, но я хотел бы использовать ее для периодического обновления фидов. например, каждые 15 минут или около того.
celery.py
from celery import Celery
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
app = Celery('mysite',
broker='redis://',
backend='rpc://',
include=['mysite.tasks'],
timezone='UTC')
if __name__ == '__main__':
app.start()
tasks.py
from .celery import app
from news.models import Feed, Article
import feedparser
from datetime import datetime, timezone
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(60.0, handle.s(), name='update feeds every 60 seconds')
@app.task
def handle(self, *args, **kwargs):
feeds = Feed.objects.all()
for feed in feeds:
new_list = feedparser.parse(feed.url)
for entry in new_list.entries:
start = datetime.now(timezone.utc)
end = datetime(*(entry.published_parsed[0:6])).replace(tzinfo=timezone.utc)
if (start - end).days < 2 and not Article.objects.filter(url=entry.link).exists():
article = Article()
article.title = entry.title
article.url = entry.link
article.description = entry.description
dateString = end.strftime('%Y-%m-%d %H:%M:%S %z')
article.publication_date = dateString
article.save()
else:
pass
я запускаю celery -A mysite worker -l INFO
и есть
[tasks]
. mysite.tasks.handle
я пробовал также celery -A mysite beat
ошибок нет, но я не вижу никакого эффекта на моем сайте
я также попытался выполнить обе команды в двух терминалах.
в каталоге настроек создайте файл для сельдерея
celery_conf.py
import os
from celery import Celery
from datetime import timedelta
os.environ.setdefault('DJANGO_SETTINGS_MODULE', '<path to settings like=> config.settings>')
celery_app = Celery('<settings direcory name like=>config>')
celery_app.autodiscover_tasks()
celery_app.conf.broker_url = 'amqp://'
celery_app.conf.result_backend = 'rpc://'
в ваших приложениях, везде, где вам нужен сельдерей, создайте файл tasks.py Создайте свою собственную функцию (в файле tasks.py) и установите декоратор shared_task на function
tasks.py
from celery import shared_task
@shared_task
def send_email():
.....
Celery использует shared_task для идентификации задач
*** Не забудьте использовать задержку при вызове задачи***