Celery не видит объекты в базе данных

Я знаю, что подобные вопросы уже задавались здесь, но ни один из найденных мною ответов не оказался подходящим. Надеюсь, теперь мне повезет.

Итак, у меня есть приложение Django, использующее Postgres, RabbitMQ и Celery. Когда я запускаю компоненты по отдельности, все работает нормально. Проблема возникает при запуске через docker-compose.

По какой-то причине задачи celery не могут найти объект в базе данных. Объект существует, более того, в нем хранится идентификатор задачи celery (например, для возможной последующей отмены задачи).

Вот части кода, относящиеся к проблеме:

# docker-compose.yml

version: "3.7"


services:
  db:
    image: postgres:12.4
    container_name: "fbrq_db"
    volumes:
      - postgres_data:/var/lib/postgresql/data/
    env_file:
      - .env
    environment:
      - POSTGRES_HOST_AUTH_METHOD=trust
    networks:
      - default

  rabbitmq:
    restart: always
    container_name: "fbrq_rabbit"
    image: rabbitmq:3-management-alpine
    ports:
      - 5672:5672
      - 15672:15672
    networks:
      - default

  app:
    restart: always
    container_name: "fbrq_app"
    build: .
    volumes:
      - .:/code
      - ./static:/code/static
    command: gunicorn --bind 0.0.0.0:8000 fbrq_api.wsgi
    ports:
      - "8000:8000"
    networks:
      - default

  celery:
    restart: always
    container_name: "fbrq_celery"
    build: .
    command: celery -A fbrq_api worker -l info
    env_file:
      - ./.env
    depends_on:
      - app
      - rabbitmq
    environment:
      - CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672/
      - DJANGO_SETTINGS_MODULE=fbrq_api.settings
    networks:
      - default

  celery-beat:
    restart: always
    container_name: "fbrq_celery-beat"
    build: .
    command: celery -A fbrq_api beat -l info
    env_file:
      - ./.env
    depends_on:
      - app
      - rabbitmq
    environment:
      - CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672/
    networks:
      - default

volumes:
  postgres_data:
# tasks.py


class MessageTaskManager:
    @staticmethod
    def create_message_celery_task(message):
        message_send_time = get_message_send_time(
            message.mailing, message.client
        )
        task = send_mailing.apply_async(
            args=[message.id], eta=message_send_time
        )
        celery_task_id = task.id
        message.celery_task_id = celery_task_id
        message.save()


@shared_task(bind=True, acks_late=True, name="Mailing")
def send_mailing(self, message_id):
    try:
        message = Message.objects.get(id=message_id)
        message.status = Message.Status.NOT_DELIVERED
        try:
            api_data = {
                "id": message_id,
                "phone": message.client.phone,
                "text": message.mailing.text,
            }
            headers = {
                "Authorization": api_token,
            }
            response = requests.post(
                api_url + str(message_id), json=api_data, headers=headers
            )
            response.raise_for_status()
            message.status = Message.Status.DELIVERED
        except (Timeout, HTTPError, RequestException) as e:
            MessageTaskManager.create_new_message_and_send(message)
        except Exception as e:
            MessageTaskManager.create_new_message_and_send(message)
        finally:
            message.save()

    except Message.DoesNotExist as e:
        logger.error(f"Message_{message_id} doesn't exist yet")
        self.retry(exc=e, countdown=10)
# views.py


def get_mailing_messages(mailing, created=False):
    if created:
        clients = [
            client
            for client in mailing.get_filtered_clients()
            if get_message_send_time(mailing, client)
        ]
        mailing.set_clients_count(clients)
        return list(
            Message.objects.create(mailing=mailing, client=client)
            for client in clients
        )
    return [message for message in Message.objects.filter(mailing=mailing)]


class MailingViewSet(viewsets.ModelViewSet):
    queryset = Mailing.objects.all()
    serializer_class = MailingSerializer


    def create(self, request, *args, **kwargs):
        logger.info(f"Request from API to create mailing: {request.data}")
        serializer = self.get_serializer(data=request.data)
        try:
            serializer.is_valid(raise_exception=True)
            self.perform_create(serializer)
            headers = self.get_success_headers(serializer.data)
            mailing_id = serializer.data["id"]
            mailing = Mailing.objects.get(id=mailing_id)
            messages = get_mailing_messages(mailing, created=True)
            for message in messages:
                transaction.on_commit(partial(message_task_manager.create_message_celery_task, message=message))
                # message_task_manager.create_message_celery_task(message)
            return Response(
                serializer.data,
                status=status.HTTP_201_CREATED,
                headers=headers,
            )
        except serializers.ValidationError as e:
            logger.error(f"Failed to validate data. Errors: {e.detail}")
            return Response(e.detail, status=status.HTTP_400_BAD_REQUEST)

Вы видите, что изначально я просто вызвал метод создания задачи celery:

for message in messages:
    message_task_manager.create_message_celery_task(message)

Но после неудачи и небольшого изучения проблемы я изменил эту часть кода:

for message in messages:
    transaction.on_commit(partial(message_task_manager.create_message_celery_task, message=message))

Сообщения, которые я получаю, выглядят следующим образом:

[2024-02-10 08:34:02,930: ERROR/ForkPoolWorker-8] Message_1207 doesn't exist yet
[2024-02-10 08:34:02,932: INFO/MainProcess] Task Mailing[08226fa1-aa4a-4ba3-adb3-da5125295cc7] received
[2024-02-10 08:34:02,935: INFO/ForkPoolWorker-8] Task Mailing[08226fa1-aa4a-4ba3-adb3-da5125295cc7] retry: Retry in 10s: DoesNotExist('Message matching query does not exist.')

Жульничество, как time.sleep(), тоже не помогает.

Вернуться на верх