Как вернуть журналы в контейнер Airflow из удаленно вызванных, Dockerized celery workers

Я работаю над Dockerized Python/Django проектом, включающим контейнер для Celery workers, в который я интегрировал готовые docker контейнеры airflow.

У меня Airflow успешно запускает задачи celery в предварительно созданном контейнере, инстанцируя приложение Celery с указанным брокером redis и back end, и делая удаленный вызов через send_task; однако ни один из логов, выполняемых задачей celery, не попадает обратно в журналы Airflow.

Изначально, в качестве доказательства концепции, поскольку я совсем новичок в Airflow, я настроил его на выполнение одного и того же кода, открыв его для контейнеров Airflow и создав задачи airflow для его выполнения на рабочем контейнере airflow celery. Это привело к тому, что все логи были захвачены, но это определенно не тот способ, который мы хотим, так как это делает контейнеры airflow очень толстыми из-за повторения зависимостей из проекта django.

В документации сказано: "Большинство обработчиков задач отправляют журналы по завершении задачи", но я не смог найти более подробную информацию, которая могла бы дать мне подсказку, как включить то же самое в моей ситуации.

Есть ли способ получить эти журналы обратно в airflow при удаленном запуске задач celery?

Вместо того, чтобы "возвращать журналы в Airflow", легко реализуемой альтернативой (поскольку Airflow поддерживает ее изначально) является активация удаленного протоколирования. Таким образом, все логи от всех рабочих будут попадать, например, на S3, а веб-сервер будет автоматически получать их.

Ниже показано, как настроить удаленное протоколирование с использованием бэкенда S3. Другие варианты (например, Google Cloud Storage, Elastic) могут быть реализованы аналогично.

  1. Установите remote_logging на True в airflow.cfg
  2. Создайте URI соединения Airflow. Этот пример из официальной документации особенно полезен IMO. В итоге должно получиться что-то вроде:
aws://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%2FbPxRfiCYEXAMPLEKEY@/? 
endpoint_url=http%3A%2F%2Fs3%3A4566%2F

При необходимости можно также создать коннектино через графический интерфейс веб-сервера.

  1. Сделайте URI соединения доступным для Airflow. Один из способов сделать это - убедиться, что переменная окружения AIRFLOW_CONN_{YOUR_CONNECTION_NAME} доступна. Пример для имени соединения REMOTE_LOGS_S3:
export AIRFLOW_CONN_REMOTE_LOGS_S3=aws://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%2FbPxRfiCYEXAMPLEKEY@/?endpoint_url=http%3A%2F%2Fs3%3A4566%2F
  1. Установите remote_log_conn_id на имя соединения (например, REMOTE_LOGS_S3) в airflow.cfg
  2. .
  3. Установите remote_base_log_folder в airflow.cfg на желаемое ведро/префикс. Пример:
remote_base_log_folder = s3://my_bucket_name/my/prefix

Эта связанная СЦ глубже затрагивает тему удаленного протоколирования.

Если требуется отладка, просмотр любых журналов рабочего процесса локально (т.е. внутри рабочего процесса) должен помочь.

Вернуться на верх