Django Celery with prefork workers breaks OpenTelemetry metrics
I have a Django application that I wanted to instrument with OpenTelemetry for traces and metrics. I created an otel_config.py file next to my manage.py with this content:
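import os
import socket
from urllib.parse import urljoin

from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.django import DjangoInstrumentor
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import SERVICE_INSTANCE_ID, SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# resources
def get_default_service_instance_id():
    try:
        hostname = socket.gethostname() or "unknown-host"
    except Exception:
        hostname = "unknown-host"
    try:
        process_id = os.getpid()
    except Exception:
        process_id = "unknown-pid"
    return f"{hostname}-{process_id}"

service_name = "my-service"
otlp_endpoint = "http://otel-collector:4318"
service_instance_id = get_default_service_instance_id()
resource = Resource.create(
    {
        SERVICE_NAME: service_name,
        SERVICE_INSTANCE_ID: service_instance_id,
    }
)

# traces
otlp_endpoint_traces = urljoin(otlp_endpoint, "/v1/traces")
trace_exporter = OTLPSpanExporter(endpoint=otlp_endpoint_traces)
span_processor = BatchSpanProcessor(trace_exporter)
tracer_provider = TracerProvider(resource=resource)
trace.set_tracer_provider(tracer_provider)
trace.get_tracer_provider().add_span_processor(span_processor)

# metrics
otlp_endpoint_metrics = urljoin(otlp_endpoint, "/v1/metrics")
metric_exporter = OTLPMetricExporter(endpoint=otlp_endpoint_metrics)
metric_reader = PeriodicExportingMetricReader(metric_exporter)
meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)

# instrument
DjangoInstrumentor().instrument()
Psycopg2Instrumentor().instrument()
CeleryInstrumentor().instrument()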
Then, I simply imported it at the end of my settings.py file like below:
import otel_config
Although my traces and metrics work fine in most cases, my OpenTelemetry metrics are broken for Celery workers running in prefork mode.
With prefork workers, the child processes get the same SERVICE_INSTANCE_ID as the parent process, because otel_config is imported through settings.py (and os.getpid() is evaluated) in the parent before the children are forked. Each child, however, keeps its own copy of every metric in its own memory, so several processes export different values under the same identity. As a result, the value in my collector changes very often and is wrong, since it is not the aggregate of all the child processes.
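To see why every child ends up with the parent's ID, here is a minimal, POSIX-only sketch using plain os.fork (no Celery or OpenTelemetry involved; INSTANCE_ID_AT_IMPORT and fork_and_compare are hypothetical names). A value computed at import time, like the module-level service_instance_id above, is inherited unchanged by every forked child, while the same expression evaluated after the fork yields a distinct ID per child.

```python
import os
import socket

# Computed once at import time in the parent, like the module-level
# service_instance_id in otel_config.py.
INSTANCE_ID_AT_IMPORT = f"{socket.gethostname()}-{os.getpid()}"


def fork_and_compare(n_children):
    """Fork n_children processes and collect (inherited_id, fresh_id) from each."""
    children = []
    for _ in range(n_children):
        read_fd, write_fd = os.pipe()
        pid = os.fork()
        if pid == 0:  # child process
            try:
                os.close(read_fd)
                # Same expression as above, but evaluated after the fork
                fresh_id = f"{socket.gethostname()}-{os.getpid()}"
                os.write(write_fd, f"{INSTANCE_ID_AT_IMPORT}|{fresh_id}".encode())
            finally:
                os._exit(0)  # never return into the parent's code path
        os.close(write_fd)
        children.append((pid, read_fd))

    # All children are forked before any is reaped, so their PIDs are distinct.
    results = []
    for pid, read_fd in children:
        with os.fdopen(read_fd, "rb") as f:
            inherited_id, fresh_id = f.read().decode().split("|")
        os.waitpid(pid, 0)
        results.append((inherited_id, fresh_id))
    return results


if __name__ == "__main__":
    for inherited_id, fresh_id in fork_and_compare(3):
        print(f"inherited={inherited_id}  fresh={fresh_id}")
```

Every child reports the parent's ID as the inherited value, which is exactly what ends up in SERVICE_INSTANCE_ID when the resource is built before the fork.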
It's different with my thread workers (--pool=threads): all threads of the same worker use the same metric instances in memory, so the final value in the collector is always the correct aggregate.
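The difference between the two pool types can be reproduced without Celery or OpenTelemetry. In this minimal POSIX-only sketch (all names are hypothetical), a dict stands in for an in-memory metric: threads share one copy, so the total is aggregated, while each forked child only ever sees and reports its own partial count, which is what each prefork child would export under the shared instance ID.

```python
import os
import threading


def count_in_threads(n_workers, increments):
    """All threads increment one shared counter, so the result is the aggregate."""
    counter = {"value": 0}
    lock = threading.Lock()

    def work():
        for _ in range(increments):
            with lock:
                counter["value"] += 1

    threads = [threading.Thread(target=work) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter["value"]


def count_in_forked_children(n_workers, increments):
    """Each forked child increments its own copy; returns (parent_value, child_values)."""
    counter = {"value": 0}
    children = []
    for _ in range(n_workers):
        read_fd, write_fd = os.pipe()
        pid = os.fork()
        if pid == 0:  # child: mutates its private copy of `counter`
            try:
                os.close(read_fd)
                for _ in range(increments):
                    counter["value"] += 1
                os.write(write_fd, str(counter["value"]).encode())
            finally:
                os._exit(0)
        os.close(write_fd)
        children.append((pid, read_fd))

    child_values = []
    for pid, read_fd in children:
        with os.fdopen(read_fd, "rb") as f:
            child_values.append(int(f.read()))
        os.waitpid(pid, 0)
    return counter["value"], child_values


if __name__ == "__main__":
    print("threads:", count_in_threads(4, 100))
    print("forked children:", count_in_forked_children(4, 100))
```

The thread version returns the total of all workers, while the fork version leaves the parent's counter untouched and each child holds only its own share.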
I've tried multiple solutions, but each one has drawbacks, which I explain below.
- Using worker_init and worker_process_init signals
Here, I removed import otel_config from settings.py and added it to the worker_init and worker_process_init signal handlers like below:
from celery.signals import worker_init, worker_process_init

@worker_init.connect
def worker_init_handler(sender=None, **kwargs):
    # sender is the worker; pool_cls may be the "threads" alias or the
    # resolved celery.concurrency.thread.TaskPool class
    pool_cls = str(getattr(sender, "pool_cls", ""))
    if "thread" in pool_cls:  # only if --pool=threads
        import otel_config

@worker_process_init.connect
def worker_process_init_handler(**kwargs):
    import otel_config
I had to keep worker_init_handler because I also have thread workers, and thread workers never receive the worker_process_init signal. I need the instrumentation and metrics in my thread workers as well.
I also had to add import otel_config to my wsgi.py file to get traces and metrics for my Django API.
However, with this approach I lose the metrics sent by the parent process. I have a task_received signal handler that exports some metrics, and this signal is handled only in the parent process.
I'm also not sure whether it's good practice to set up OpenTelemetry in multiple places instead of in a single one (i.e. settings.py).
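If the setup does get called from several entry points (settings.py, wsgi.py, the Celery signals), one way to make that safe is to guard it so it runs once per PID. This is a minimal standard-library sketch with a hypothetical once_per_process decorator and init_telemetry function: repeated calls in the same process are no-ops, while a forked child, which has a new PID, runs the setup again. It only prevents duplicate setup within a process; it does not get around OpenTelemetry's set-once global providers in a child whose parent has already set them.

```python
import functools
import os


def once_per_process(fn):
    """Run fn at most once per OS process; a forked child (new PID) runs it again.

    Not thread-safe: intended to be called during single-threaded startup.
    """
    state = {"pid": None, "result": None}

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        pid = os.getpid()
        if state["pid"] != pid:
            state["result"] = fn(*args, **kwargs)
            state["pid"] = pid
        return state["result"]

    return wrapper


calls = []


@once_per_process
def init_telemetry():
    # Hypothetical stand-in for the setup code in otel_config.py
    calls.append(os.getpid())
    return f"initialized in {os.getpid()}"


if __name__ == "__main__":
    init_telemetry()  # e.g. called from settings.py
    init_telemetry()  # e.g. called again from wsgi.py: no-op in the same process
    print(len(calls))
```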
- Re-instrumenting in prefork child processes
In this approach, I tried to re-import everything when each child process is initialized:
from celery.signals import worker_process_init

@worker_process_init.connect
def worker_process_init_handler(**kwargs):
    import otel_config
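However, OpenTelemetry for Python works on a singleton basis: once the MeterProvider and TracerProvider have been set and the libraries have been instrumented (e.g. DjangoInstrumentor().instrument()), every later attempt to set a provider or to instrument again is ignored with a warning. In addition, a forked child inherits the parent's sys.modules, so import otel_config does not even re-execute the module. So this re-import in worker_process_init_handler does nothing at all.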
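A simplified model of why re-running the setup in a child does nothing: the "already set" flag is ordinary process memory, so a forked child inherits it along with the parent's provider, and the child's own set() call is rejected. SetOnce and child_tries_to_override are hypothetical names, and this is not OpenTelemetry's actual implementation; it is a POSIX-only sketch of the behaviour described above.

```python
import os


class SetOnce:
    """Simplified model of a set-once global; NOT OpenTelemetry's actual code."""

    def __init__(self):
        self._value = None
        self._is_set = False

    def set(self, value):
        if self._is_set:
            # OpenTelemetry logs a warning here instead of replacing the value
            return False
        self._value = value
        self._is_set = True
        return True

    def get(self):
        return self._value


def child_tries_to_override(holder, new_value):
    """Fork a child that calls holder.set(new_value); return (accepted, value seen in child)."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:  # child: inherits holder, including its _is_set flag
        try:
            os.close(read_fd)
            accepted = holder.set(new_value)
            os.write(write_fd, f"{accepted}|{holder.get()}".encode())
        finally:
            os._exit(0)
    os.close(write_fd)
    with os.fdopen(read_fd, "rb") as f:
        accepted, value = f.read().decode().split("|")
    os.waitpid(pid, 0)
    return accepted == "True", value


if __name__ == "__main__":
    provider = SetOnce()
    provider.set("parent-provider")  # analogous to set_tracer_provider() in the parent
    print(child_tries_to_override(provider, "child-provider"))
```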
here wouldn't do anything at all. I even tried other hacks like using the following in case of worker_process_init_handler
(child processes):
# trace.set_tracer_provider(tracer_provider) # this one is singleton so commented
trace._TRACER_PROVIDER = tracer_provider
...
# metrics.set_meter_provider(meter_provider) # this one is singleton so commented
metrics._internal._METER_PROVIDER = meter_provider
But this approach is too hacky, and it requires further hacks on the instrumentors and on get_meter usage throughout the codebase.
All in all, I'd like to know whether anyone has had a similar experience and what the recommended practice is for this kind of issue.
Yeah, this is a common issue with prefork mode: the child processes are forked from the parent and inherit a copy of its memory, including the already computed SERVICE_INSTANCE_ID. The best way is to initialize OpenTelemetry separately inside each child process using worker_process_init, and to use worker_init for thread workers. Here's a clean way to do it:
otel_config.py
import os
import socket

from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.django import DjangoInstrumentor
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import SERVICE_INSTANCE_ID, SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor


def generate_instance_id():
    # Evaluated inside init_otel(), i.e. in the process that actually exports
    hostname = socket.gethostname()
    pid = os.getpid()
    return f"{hostname}-{pid}"


def init_otel(service_name="my-service"):
    resource = Resource.create({
        SERVICE_NAME: service_name,
        SERVICE_INSTANCE_ID: generate_instance_id(),
    })

    # traces (OTLP over HTTP)
    trace_exporter = OTLPSpanExporter(endpoint="http://otel-collector:4318/v1/traces")
    tracer_provider = TracerProvider(resource=resource)
    tracer_provider.add_span_processor(BatchSpanProcessor(trace_exporter))
    trace.set_tracer_provider(tracer_provider)

    # metrics (OTLP over HTTP)
    metric_exporter = OTLPMetricExporter(endpoint="http://otel-collector:4318/v1/metrics")
    metric_reader = PeriodicExportingMetricReader(metric_exporter)
    meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
    metrics.set_meter_provider(meter_provider)

    # instrumentation
    DjangoInstrumentor().instrument()
    Psycopg2Instrumentor().instrument()
    CeleryInstrumentor().instrument()
settings.py
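import os

# Django's runserver autoreloader sets RUN_MAIN="true" in the process that serves requests
if os.getenv("RUN_MAIN") == "true":
    import otel_config
    otel_config.init_otel(service_name="my-django-app")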
wsgi.py
import otel_config
otel_config.init_otel(service_name="my-django-app")
celery.py
from celery.signals import worker_init, worker_process_init

@worker_process_init.connect
def setup_prefork_worker(**kwargs):
    # Runs in every prefork child, after the fork
    import otel_config
    otel_config.init_otel(service_name="my-celery-worker")

@worker_init.connect
def setup_thread_worker(sender=None, **kwargs):
    # pool_cls may be the "threads" alias or celery.concurrency.thread.TaskPool
    pool = str(getattr(sender, "pool_cls", ""))
    if "thread" in pool:
        import otel_config
        otel_config.init_otel(service_name="my-celery-worker")
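Matching on str(sender.pool_cls) depends on whether pool_cls is still the alias string or has already been resolved to a class. A slightly more explicit helper could look like the sketch below; is_thread_pool is a hypothetical name, and the assumption that Celery's threads pool lives in celery.concurrency.thread holds for recent releases as far as I know, so check it against your version.

```python
def is_thread_pool(pool_cls):
    """Return True if a Celery pool setting refers to the threads pool.

    Accepts the alias string (e.g. "threads") or a pool class such as
    celery.concurrency.thread.TaskPool; None returns False.
    """
    if pool_cls is None:
        return False
    if isinstance(pool_cls, str):
        name = pool_cls
    else:
        # Build "module.QualName" for a class object
        module = getattr(pool_cls, "__module__", "") or ""
        qualname = getattr(pool_cls, "__qualname__", "") or ""
        name = f"{module}.{qualname}"
    return "thread" in name.lower()
```

In setup_thread_worker, the condition would then become is_thread_pool(getattr(sender, "pool_cls", None)).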