Django Celery с помощью prefork workers нарушает показатели OpenTelemetry

У меня есть приложение на Django, которое я хотел бы использовать с OpenTelemetry для отслеживания и метрик. Я создал файл otel_config.py рядом с моим manage.py с таким содержимым:

# resources
def get_default_service_instance_id():
    try:
        hostname = socket.gethostname() or "unknown-host"
    except Exception as e:
        hostname = "unknown-host"
    try:
        process_id = os.getpid()
    except Exception as e:
        process_id = "unknown-pid"

    return f"{hostname}-{process_id}"

service_name = "my-service"
otlp_endpoint = "http://otel-collector:4318"
service_instance_id = get_default_service_instance_id()

resource = Resource.create(
    {
        SERVICE_NAME: service_name,
        SERVICE_INSTANCE_ID: service_instance_id,
    }
)

# traces
otlp_endpoint_traces = urljoin(otlp_endpoint, "/v1/traces")
trace_exporter = OTLPSpanExporter(endpoint=otlp_endpoint_traces)
span_processor = BatchSpanProcessor(trace_exporter)
tracer_provider = TracerProvider(resource=resource)
trace.set_tracer_provider(tracer_provider)
trace.get_tracer_provider().add_span_processor(span_processor)

# metrics
otlp_endpoint_metrics = urljoin(otlp_endpoint, "/v1/metrics")
metric_exporter = OTLPMetricExporter(endpoint=otlp_endpoint_metrics)
metric_reader = PeriodicExportingMetricReader(metric_exporter)
meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)

# instrument
DjangoInstrumentor().instrument()
Psycopg2Instrumentor().instrument()
CeleryInstrumentor().instrument()

Затем я просто импортировал его в конец моего settings.py файла, как показано ниже:

import otel_config

Хотя мои трассировки и метрики в большинстве случаев работают нормально, мои метрики OpenTelemetry не работают в случае celery workers в режиме предварительной обработки.
В случае с обработчиками prefork дочерние процессы получают то же значение SERVICE_INSTANCE_ID, что и родительский процесс. Таким образом, разные дочерние процессы манипулируют одним и тем же значением показателя, поскольку каждый из них имеет свою собственную эксклюзивную память. Таким образом, значение в моем коллекторе меняется очень часто и является неправильным, поскольку оно не является значением агрегации всех дочерних процессов. В случае с моими рабочими потоками это не одно и то же (--pool=threads), поскольку все разные потоки одного и того же рабочего процесса используют один и тот же экземпляр метрики в памяти, и конечное значение в сборщике всегда является правильным значением агрегации.

<время работы/>

Я перепробовал несколько решений, но у меня есть и другие недостатки, о которых я расскажу ниже.

  1. С использованием worker_init и worker_process_init сигналов

здесь я удалил import otel_config из settings.py и добавил его в worker_init и worker_process_init обработчики сигналов, как показано ниже:

from celery.signals import worker_init, worker_process_init
@worker_init
def worker_init_handler(sender, **kwargs):
    pool_cls = str(sender.pool_cls) if hasattr(sender, 'pool_cls') else None

    if "threads" in pool_cls: # only if --pool=threads
        import otel_config

@worker_process_init.connect
def worker_process_init_handler(**kwargs):
    import otel_config

Мне пришлось оставить worker_init_handler, так как у меня также есть рабочие потоки, а в рабочих потоках сигнал worker_process_init не отправляется. Однако мне также нужны были инструменты и показатели в моих рабочих потоках.
Мне также пришлось добавить import otel_config в мой файл wsgi.py для моих трассировок и показателей Django API.

Однако при таком подходе я теряю метрики, отправленные родительским процессом. У меня есть обработчик сигналов task_received, который экспортирует некоторые метрики, и этот сигнал обрабатывается только в родительском процессе.
Я также не уверен, что это хорошая практика - использовать OpenTelemetry в нескольких местах, а не только в одном (т.е. settings.py).

  1. Переинструментация в случае дочерних процессов prefork

При таком подходе я попытался повторно импортировать все в случае инициализации дочернего процесса:

@worker_process_init.connect
def worker_process_init_handler(**kwargs):
    import import otel_config

Однако Python Otel работает на одноэлементной основе. это означает, что однократная настройка параметров MeterProvider и TracerProvider и настройка инструментов (например, DjangoInstrumentor().instrument()) приводит к блокировке следующих настроек и инструментов. Таким образом, повторный импорт в worker_process_init_handler здесь вообще ничего не даст. Я даже пробовал другие способы взлома, например, используя следующее в случае worker_process_init_handler (дочерних процессов):

# trace.set_tracer_provider(tracer_provider)  # this one is singleton so commented
trace._TRACER_PROVIDER = tracer_provider

...

# metrics.set_meter_provider(meter_provider)  # this one is singleton so commented
metrics._internal._METER_PROVIDER = meter_provider

Но этот подход слишком сложен и требует других взломов инструментов и использования get_meter в кодовой базе.

<время работы/>

В общем, я хотел бы знать, был ли у кого-нибудь подобный опыт и какова практика в подобных вопросах.

Да, это распространенная проблема в режиме предварительной обработки, поскольку дочерние процессы копируют родительскую память, включая SERVICE_INSTANCE_ID. Лучший способ - инициализировать OpenTelemetry отдельно внутри каждого дочернего процесса, используя worker_process_init, и использовать worker_init для рабочих потоков. Вот простой способ сделать это:

<время работы/>

otel_config.py

import os, socket
from urllib.parse import urljoin
from opentelemetry import trace, metrics
from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_INSTANCE_ID
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import OTLPSpanExporter, BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import OTLPMetricExporter, PeriodicExportingMetricReader
from opentelemetry.instrumentation.django import DjangoInstrumentor
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor
from opentelemetry.instrumentation.celery import CeleryInstrumentor

def generate_instance_id():
    hostname = socket.gethostname()
    pid = os.getpid()
    return f"{hostname}-{pid}"

def init_otel(service_name="my-service"):
    resource = Resource.create({
        SERVICE_NAME: service_name,
        SERVICE_INSTANCE_ID: generate_instance_id(),
    })

    trace_exporter = OTLPSpanExporter(endpoint="http://otel-collector:4318/v1/traces")
    tracer_provider = TracerProvider(resource=resource)
    tracer_provider.add_span_processor(BatchSpanProcessor(trace_exporter))
    trace.set_tracer_provider(tracer_provider)

    metric_exporter = OTLPMetricExporter(endpoint="http://otel-collector:4318/v1/metrics")
    metric_reader = PeriodicExportingMetricReader(metric_exporter)
    meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
    metrics.set_meter_provider(meter_provider)

    DjangoInstrumentor().instrument()
    Psycopg2Instrumentor().instrument()
    CeleryInstrumentor().instrument()
<время работы/>

settings.py

if os.getenv("RUN_MAIN") == "true":
    import otel_config
    otel_config.init_otel(service_name="my-django-app")
<время работы/>

wsgi.py

import otel_config
otel_config.init_otel(service_name="my-django-app")
<время работы/>

celery.py

from celery.signals import worker_init, worker_process_init

@worker_process_init.connect
def setup_prefork_worker(**kwargs):
    import otel_config
    otel_config.init_otel(service_name="my-celery-worker")

@worker_init.connect
def setup_thread_worker(sender=None, **kwargs):
    pool = str(sender.pool_cls) if hasattr(sender, "pool_cls") else ""
    if "threads" in pool:
        import otel_config
        otel_config.init_otel(service_name="my-celery-worker")
Вернуться на верх