Django Celery with prefork workers breaks OpenTelemetry metrics

I have a Django application I wanted to instrument with OpenTelemetry for traces and metrics. I created an otel_config.py file next to my manage.py with this content:

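# imports (the HTTP exporters are assumed here, since the endpoint uses port 4318)
import os
import socket
from urllib.parse import urljoin

from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.django import DjangoInstrumentor
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import SERVICE_INSTANCE_ID, SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# resources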
def get_default_service_instance_id():
    try:
        hostname = socket.gethostname() or "unknown-host"
    except Exception:
        hostname = "unknown-host"
    try:
        process_id = os.getpid()
    except Exception:
        process_id = "unknown-pid"

    return f"{hostname}-{process_id}"

service_name = "my-service"
otlp_endpoint = "http://otel-collector:4318"
service_instance_id = get_default_service_instance_id()

resource = Resource.create(
    {
        SERVICE_NAME: service_name,
        SERVICE_INSTANCE_ID: service_instance_id,
    }
)

# traces
otlp_endpoint_traces = urljoin(otlp_endpoint, "/v1/traces")
trace_exporter = OTLPSpanExporter(endpoint=otlp_endpoint_traces)
span_processor = BatchSpanProcessor(trace_exporter)
tracer_provider = TracerProvider(resource=resource)
trace.set_tracer_provider(tracer_provider)
trace.get_tracer_provider().add_span_processor(span_processor)

# metrics
otlp_endpoint_metrics = urljoin(otlp_endpoint, "/v1/metrics")
metric_exporter = OTLPMetricExporter(endpoint=otlp_endpoint_metrics)
metric_reader = PeriodicExportingMetricReader(metric_exporter)
meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)

# instrument
DjangoInstrumentor().instrument()
Psycopg2Instrumentor().instrument()
CeleryInstrumentor().instrument()

Then, I simply imported it at the end of my settings.py file like below:

import otel_config

Traces and metrics work fine in most cases, but my OpenTelemetry metrics are broken for Celery workers running in prefork mode.
With prefork workers, the child processes get the same SERVICE_INSTANCE_ID as the parent process, because the ID is computed once when otel_config is imported in the parent and is then inherited by every child. Each child has its own private copy of the metric state in memory, yet all of them export under the same identity. As a result, the value in my collector changes very often and is wrong: it appears to reflect whichever process exported last rather than the aggregate over all child processes. My thread workers (--pool=threads) do not have this problem, since all threads of a worker share the same metric instances in memory, so the final value in the collector is always the correct aggregate.
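
To make the collision concrete, here is a minimal stdlib-only sketch of the effect described above. It assumes a fork-based start method (POSIX only), which is how I understand the prefork pool to start its children. INSTANCE_ID, report and collect_child_ids are hypothetical names used for illustration, not part of my project or of OpenTelemetry.

```python
import multiprocessing as mp
import os
import socket

# Computed once at import time, mirroring service_instance_id in otel_config.py.
INSTANCE_ID = f"{socket.gethostname()}-{os.getpid()}"


def report(queue):
    # Runs in the child: send back the inherited ID and the child's real PID.
    queue.put((INSTANCE_ID, os.getpid()))


def collect_child_ids(n_children=2):
    # "fork" copies the parent's memory into each child (POSIX only).
    ctx = mp.get_context("fork")
    queue = ctx.Queue()
    procs = [ctx.Process(target=report, args=(queue,)) for _ in range(n_children)]
    for p in procs:
        p.start()
    results = [queue.get(timeout=30) for _ in procs]
    for p in procs:
        p.join()
    return results


if __name__ == "__main__":
    for inherited_id, child_pid in collect_child_ids():
        print(f"child pid={child_pid} reports instance id={inherited_id}")
```

Every child reports the parent's ID even though its own PID differs, which matches what I see in the collector: several processes exporting under one service.instance.id.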


I've tried several solutions, but each has drawbacks, which I explain below.

  1. Using worker_init and worker_process_init signals

Here, I removed import otel_config from settings.py and added it to the worker_init and worker_process_init signal handlers, like below:

from celery.signals import worker_init, worker_process_init

@worker_init.connect
def worker_init_handler(sender, **kwargs):
    # pool_cls may be missing on the sender; fall back to an empty string
    pool_cls = str(getattr(sender, "pool_cls", ""))

    if "threads" in pool_cls:  # only if --pool=threads
        import otel_config

@worker_process_init.connect
def worker_process_init_handler(**kwargs):
    import otel_config

I had to keep worker_init_handler because I also have thread workers, and no worker_process_init signal is sent for thread workers. I need the instrumentation and metrics in my thread workers as well.
I also had to add import otel_config to my wsgi.py file to get traces and metrics for my Django API.

However, in this approach, I lose the metrics sent by the parent process. I have a task_received signal handler that exports some metrics, and this signal is handled only in the parent process.
I'm also not sure if it's a good practice to instrument OpenTelemetry in multiple places instead of only one place (i.e. settings.py).

  2. Reinstrumentation in case of prefork child processes

In this approach, I tried to re-import everything in case of the child process initialization:

@worker_process_init.connect
def worker_process_init_handler(**kwargs):
    import otel_config

However, OpenTelemetry Python treats its global providers as set-once: after the first call to set_tracer_provider or set_meter_provider, later calls are ignored, and an instrumentor that has already been instrumented (e.g. DjangoInstrumentor().instrument()) does not instrument again. So re-importing in worker_process_init_handler does nothing at all. I even tried other hacks, such as the following in worker_process_init_handler (child processes):

# trace.set_tracer_provider(tracer_provider)  # this one is singleton so commented
trace._TRACER_PROVIDER = tracer_provider

...

# metrics.set_meter_provider(meter_provider)  # this one is singleton so commented
metrics._internal._METER_PROVIDER = meter_provider

But this approach is too hacky and requires other hacks on the Instrumentors and get_meter usage in the codebase.
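
A side note on the re-import in approach 2: separately from the set-once providers, a plain import of a module that is already in sys.modules does not re-run the module body. So if otel_config was already imported in the parent (for example via settings.py), the import in the child may be a no-op for that reason alone. The sketch below shows this with a throwaway module; fake_otel_config and demo_counter are hypothetical stand-ins, not real modules.

```python
import importlib
import sys
import tempfile
import types
from pathlib import Path


def count_config_runs():
    """Return how often a module body has run after import, re-import and reload."""
    # Hypothetical counter module that the fake config bumps when its body runs.
    counter = types.ModuleType("demo_counter")
    counter.runs = 0
    sys.modules["demo_counter"] = counter

    with tempfile.TemporaryDirectory() as tmp:
        # Hypothetical stand-in for otel_config.py.
        Path(tmp, "fake_otel_config.py").write_text(
            "import demo_counter\ndemo_counter.runs += 1\n"
        )
        importlib.invalidate_caches()
        sys.path.insert(0, tmp)
        try:
            import fake_otel_config  # first import: the body runs
            after_first = counter.runs
            import fake_otel_config  # cached in sys.modules: the body does not run
            after_second = counter.runs
            importlib.reload(fake_otel_config)  # explicit reload: the body runs again
            after_reload = counter.runs
        finally:
            sys.path.remove(tmp)
            sys.modules.pop("fake_otel_config", None)
            sys.modules.pop("demo_counter", None)
    return after_first, after_second, after_reload


if __name__ == "__main__":
    print(count_config_runs())
```

Even with importlib.reload, the module body would call set_tracer_provider and set_meter_provider a second time, so the set-once behaviour described above would still apply.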


All in all, I'd like to know whether anyone has had a similar experience and what the recommended practice is for such issues.
