Celery не видит объекты в базе данных
Я знаю, что подобные вопросы уже задавались здесь, но ни один из найденных мною ответов не оказался подходящим. Надеюсь, теперь мне повезет.
Итак, у меня есть приложение Django, использующее Postgres, RabbitMQ и Celery. Когда я запускаю компоненты по отдельности, все работает нормально. Проблема возникает при запуске через docker-compose.
По какой-то причине задачи celery не могут найти объект в базе данных. Объект существует, более того, в нем хранится идентификатор задачи celery (например, для возможной последующей отмены задачи).
Вот части кода, относящиеся к проблеме:
# docker-compose.yml
version: "3.7"
services:
db:
image: postgres:12.4
container_name: "fbrq_db"
volumes:
- postgres_data:/var/lib/postgresql/data/
env_file:
- .env
environment:
- POSTGRES_HOST_AUTH_METHOD=trust
networks:
- default
rabbitmq:
restart: always
container_name: "fbrq_rabbit"
image: rabbitmq:3-management-alpine
ports:
- 5672:5672
- 15672:15672
networks:
- default
app:
restart: always
container_name: "fbrq_app"
build: .
volumes:
- .:/code
- ./static:/code/static
command: gunicorn --bind 0.0.0.0:8000 fbrq_api.wsgi
ports:
- "8000:8000"
networks:
- default
celery:
restart: always
container_name: "fbrq_celery"
build: .
command: celery -A fbrq_api worker -l info
env_file:
- ./.env
depends_on:
- app
- rabbitmq
environment:
- CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672/
- DJANGO_SETTINGS_MODULE=fbrq_api.settings
networks:
- default
celery-beat:
restart: always
container_name: "fbrq_celery-beat"
build: .
command: celery -A fbrq_api beat -l info
env_file:
- ./.env
depends_on:
- app
- rabbitmq
environment:
- CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672/
networks:
- default
volumes:
postgres_data:
# tasks.py
class MessageTaskManager:
@staticmethod
def create_message_celery_task(message):
message_send_time = get_message_send_time(
message.mailing, message.client
)
task = send_mailing.apply_async(
args=[message.id], eta=message_send_time
)
celery_task_id = task.id
message.celery_task_id = celery_task_id
message.save()
@shared_task(bind=True, acks_late=True, name="Mailing")
def send_mailing(self, message_id):
try:
message = Message.objects.get(id=message_id)
message.status = Message.Status.NOT_DELIVERED
try:
api_data = {
"id": message_id,
"phone": message.client.phone,
"text": message.mailing.text,
}
headers = {
"Authorization": api_token,
}
response = requests.post(
api_url + str(message_id), json=api_data, headers=headers
)
response.raise_for_status()
message.status = Message.Status.DELIVERED
except (Timeout, HTTPError, RequestException) as e:
MessageTaskManager.create_new_message_and_send(message)
except Exception as e:
MessageTaskManager.create_new_message_and_send(message)
finally:
message.save()
except Message.DoesNotExist as e:
logger.error(f"Message_{message_id} doesn't exist yet")
self.retry(exc=e, countdown=10)
# views.py
def get_mailing_messages(mailing, created=False):
if created:
clients = [
client
for client in mailing.get_filtered_clients()
if get_message_send_time(mailing, client)
]
mailing.set_clients_count(clients)
return list(
Message.objects.create(mailing=mailing, client=client)
for client in clients
)
return [message for message in Message.objects.filter(mailing=mailing)]
class MailingViewSet(viewsets.ModelViewSet):
queryset = Mailing.objects.all()
serializer_class = MailingSerializer
def create(self, request, *args, **kwargs):
logger.info(f"Request from API to create mailing: {request.data}")
serializer = self.get_serializer(data=request.data)
try:
serializer.is_valid(raise_exception=True)
self.perform_create(serializer)
headers = self.get_success_headers(serializer.data)
mailing_id = serializer.data["id"]
mailing = Mailing.objects.get(id=mailing_id)
messages = get_mailing_messages(mailing, created=True)
for message in messages:
transaction.on_commit(partial(message_task_manager.create_message_celery_task, message=message))
# message_task_manager.create_message_celery_task(message)
return Response(
serializer.data,
status=status.HTTP_201_CREATED,
headers=headers,
)
except serializers.ValidationError as e:
logger.error(f"Failed to validate data. Errors: {e.detail}")
return Response(e.detail, status=status.HTTP_400_BAD_REQUEST)
Вы видите, что изначально я просто вызвал метод создания задачи celery:
for message in messages:
message_task_manager.create_message_celery_task(message)
Но после неудачи и небольшого изучения проблемы я изменил эту часть кода:
for message in messages:
transaction.on_commit(partial(message_task_manager.create_message_celery_task, message=message))
Сообщения, которые я получаю, выглядят следующим образом:
[2024-02-10 08:34:02,930: ERROR/ForkPoolWorker-8] Message_1207 doesn't exist yet
[2024-02-10 08:34:02,932: INFO/MainProcess] Task Mailing[08226fa1-aa4a-4ba3-adb3-da5125295cc7] received
[2024-02-10 08:34:02,935: INFO/ForkPoolWorker-8] Task Mailing[08226fa1-aa4a-4ba3-adb3-da5125295cc7] retry: Retry in 10s: DoesNotExist('Message matching query does not exist.')
Жульничество, как time.sleep()
, тоже не помогает.