Django Celery with prefork workers breaks OpenTelemetry metrics
I have a Django application that I wanted to instrument with OpenTelemetry for traces and metrics. I created an `otel_config.py` file next to my `manage.py` with this content:
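```python
# otel_config.py
import os
import socket
from urllib.parse import urljoin

from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.django import DjangoInstrumentor
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import SERVICE_INSTANCE_ID, SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# resources
def get_default_service_instance_id():
    try:
        hostname = socket.gethostname() or "unknown-host"
    except Exception:
        hostname = "unknown-host"
    try:
        process_id = os.getpid()
    except Exception:
        process_id = "unknown-pid"
    return f"{hostname}-{process_id}"


service_name = "my-service"
otlp_endpoint = "http://otel-collector:4318"
# Evaluated once, in whichever process first imports this module
service_instance_id = get_default_service_instance_id()
resource = Resource.create(
    {
        SERVICE_NAME: service_name,
        SERVICE_INSTANCE_ID: service_instance_id,
    }
)

# traces
otlp_endpoint_traces = urljoin(otlp_endpoint, "/v1/traces")
trace_exporter = OTLPSpanExporter(endpoint=otlp_endpoint_traces)
span_processor = BatchSpanProcessor(trace_exporter)
tracer_provider = TracerProvider(resource=resource)
trace.set_tracer_provider(tracer_provider)
trace.get_tracer_provider().add_span_processor(span_processor)

# metrics
otlp_endpoint_metrics = urljoin(otlp_endpoint, "/v1/metrics")
metric_exporter = OTLPMetricExporter(endpoint=otlp_endpoint_metrics)
metric_reader = PeriodicExportingMetricReader(metric_exporter)
meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)

# instrument
DjangoInstrumentor().instrument()
Psycopg2Instrumentor().instrument()
CeleryInstrumentor().instrument()
```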
Then I simply imported it at the end of my `settings.py` file:

```python
import otel_config
```
Although my traces and metrics work fine in most cases, my OpenTelemetry metrics are broken for Celery workers running in prefork mode.
With prefork workers, the child processes get the same `SERVICE_INSTANCE_ID` as the parent process: `otel_config` is imported once in the parent (through `settings.py`), and the children are forked from it afterwards, so they inherit the already computed ID. Each child keeps its own copy of every metric in its own memory, yet they all export under the same identity. As a result, the value in my collector changes very often and is wrong, since it is not the aggregate of all the child processes.
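The inheritance described above is plain `fork()` behaviour and can be reproduced without Celery or OpenTelemetry. The sketch below is a minimal, Unix-only illustration that assumes children are created with the `fork` start method, which is how Celery's prefork pool starts its children on Unix. `INSTANCE_ID_AT_IMPORT` stands in for `service_instance_id` in `otel_config.py`; the function names are hypothetical.

```python
import multiprocessing as mp
import os
import socket

# Evaluated once, when the module is first imported in the parent process,
# like service_instance_id in otel_config.py.
INSTANCE_ID_AT_IMPORT = f"{socket.gethostname()}-{os.getpid()}"


def _report(queue):
    # Runs in a forked child: INSTANCE_ID_AT_IMPORT is the parent's copied value,
    # while os.getpid() is evaluated again in the child.
    recomputed = f"{socket.gethostname()}-{os.getpid()}"
    queue.put((INSTANCE_ID_AT_IMPORT, recomputed))


def ids_seen_by_children(n_children=3):
    ctx = mp.get_context("fork")  # Unix only
    queue = ctx.Queue()
    procs = [ctx.Process(target=_report, args=(queue,)) for _ in range(n_children)]
    for p in procs:
        p.start()
    results = [queue.get(timeout=10) for _ in procs]
    for p in procs:
        p.join()
    return results


if __name__ == "__main__":
    for inherited, recomputed in ids_seen_by_children():
        print(f"inherited={inherited}  recomputed={recomputed}")
```

Every child reports the parent's ID as `inherited`, and a different, per-child value as `recomputed`.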
This does not happen with my thread workers (`--pool=threads`), since all threads of the same worker use the same metric instances in memory, so the final value in the collector is always the correctly aggregated value.
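The difference between the two pools can be modelled with the standard library alone. In this sketch (Unix-only, `fork` start method assumed), the hypothetical `state` dict stands in for a metric instrument's in-memory total: threads update one shared object, whereas each forked child updates its own private copy, and the parent never sees those updates.

```python
import multiprocessing as mp
import threading

# Hypothetical stand-in for a metric instrument's in-memory state.
state = {"count": 0}
lock = threading.Lock()


def do_work(n):
    for _ in range(n):
        with lock:
            state["count"] += 1


def with_threads(workers=4, n=100):
    state["count"] = 0
    threads = [threading.Thread(target=do_work, args=(n,)) for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["count"]  # one shared object, so this is the true total


def _child(n, queue):
    do_work(n)  # updates this child's private copy of `state`
    queue.put(state["count"])


def with_forked_processes(workers=4, n=100):
    state["count"] = 0
    ctx = mp.get_context("fork")  # Unix only
    queue = ctx.Queue()
    procs = [ctx.Process(target=_child, args=(n, queue)) for _ in range(workers)]
    for p in procs:
        p.start()
    per_child = [queue.get(timeout=10) for _ in procs]
    for p in procs:
        p.join()
    # The parent's copy is untouched; each child only saw its own share.
    return state["count"], per_child
```

`with_threads()` returns `400`, while `with_forked_processes()` returns `(0, [100, 100, 100, 100])`: four partial totals which, exported under one shared instance ID, are consistent with the constantly changing collector value described above.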
I've tried several solutions, but each one comes with its own drawbacks, which I explain below.
- Using `worker_init` and `worker_process_init` signals

Here, I removed `import otel_config` from `settings.py` and added it to `worker_init` and `worker_process_init` signal handlers like below:
```python
from celery.signals import worker_init, worker_process_init


@worker_init.connect
def worker_init_handler(sender, **kwargs):
    # sender is the WorkController; fall back to "" if pool_cls is missing
    pool_cls = str(getattr(sender, "pool_cls", ""))
    if "threads" in pool_cls:  # only if --pool=threads
        import otel_config  # noqa: F401


@worker_process_init.connect
def worker_process_init_handler(**kwargs):
    # dispatched in each prefork child process when it starts
    import otel_config  # noqa: F401
```
I had to keep `worker_init_handler` because I also have thread workers, for which no `worker_process_init` signal is sent, and I need instrumentation and metrics in my thread workers as well.
I also had to add `import otel_config` to my `wsgi.py` file to get traces and metrics for my Django API.
However, with this approach I lose the metrics sent by the parent process. I have a `task_received` signal handler that exports some metrics, and this signal is handled only in the parent process, where OpenTelemetry is no longer configured in prefork mode.
I'm also not sure whether it's good practice to set up OpenTelemetry in multiple places instead of a single one (i.e. `settings.py`).
- Re-instrumenting in prefork child processes
In this approach, I tried to re-import everything when each child process is initialized:
```python
from celery.signals import worker_process_init

@worker_process_init.connect
def worker_process_init_handler(**kwargs):
    import otel_config
```
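Independently of OpenTelemetry, a plain `import` in the child does not re-run the module at all: Python executes a module body only on its first import in a process, and a forked child inherits the parent's `sys.modules`, where `otel_config` is already present. The sketch below demonstrates this with a throwaway module named `otel_config_demo` (hypothetical) whose body only counts how many times it has run. `importlib.reload()` does re-run the body, but for the real `otel_config` that would still run into the set-once behaviour of the SDK.

```python
import importlib
import pathlib
import sys
import tempfile

# Module globals survive importlib.reload(), so this counts executions of the body.
MODULE_BODY = "RUNS = globals().get('RUNS', 0) + 1\n"


def count_module_executions():
    """Return RUNS after: first import, second import, importlib.reload()."""
    with tempfile.TemporaryDirectory() as tmp:
        # Hypothetical stand-in for otel_config.py that only counts executions.
        pathlib.Path(tmp, "otel_config_demo.py").write_text(MODULE_BODY)
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            mod = importlib.import_module("otel_config_demo")
            first = mod.RUNS
            mod = importlib.import_module("otel_config_demo")  # cached in sys.modules
            second = mod.RUNS
            mod = importlib.reload(mod)  # forces the body to run again
            third = mod.RUNS
        finally:
            sys.path.remove(tmp)
            sys.modules.pop("otel_config_demo", None)
    return first, second, third
```

`count_module_executions()` returns `(1, 1, 2)`: the second import is served from `sys.modules`.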
However, the Python OpenTelemetry SDK works on a singleton basis: once the `MeterProvider` and `TracerProvider` have been set and the instrumentors have run (e.g. `DjangoInstrumentor().instrument()`), later attempts to set them or instrument again are ignored with a warning. So this re-import in `worker_process_init_handler` wouldn't do anything at all. I even tried other hacks, such as the following in `worker_process_init_handler` (i.e. in the child processes):
```python
from opentelemetry import metrics, trace

# trace.set_tracer_provider(tracer_provider)  # set-once (singleton), so commented out
trace._TRACER_PROVIDER = tracer_provider
...
# metrics.set_meter_provider(meter_provider)  # set-once (singleton), so commented out
metrics._internal._METER_PROVIDER = meter_provider
```
But this approach is too hacky, and it requires further hacks around the instrumentors and every `get_meter` usage in the codebase.
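Why overwriting the private provider attribute snowballs into more hacks can be modelled in a few lines. This is a simplified, stdlib-only model, not OpenTelemetry's code: `SetOnceGlobal`, `Provider` and `Meter` are hypothetical stand-ins. A set-once slot ignores the second `set()`, and overwriting its private attribute only affects lookups made afterwards; a meter obtained earlier (for example at module import time in the parent) still points at the old provider.

```python
import threading


class SetOnceGlobal:
    """Simplified model of a set-once global provider slot."""

    def __init__(self):
        self._lock = threading.Lock()
        self._value = None
        self._is_set = False

    def set(self, value):
        with self._lock:
            if self._is_set:
                return False  # later calls are ignored
            self._value = value
            self._is_set = True
            return True

    def get(self):
        return self._value


class Provider:
    def __init__(self, name):
        self.name = name

    def get_meter(self, scope):
        return Meter(self, scope)


class Meter:
    def __init__(self, provider, scope):
        self.provider = provider  # bound to the provider it came from
        self.scope = scope


def simulate():
    slot = SetOnceGlobal()
    first = slot.set(Provider("parent"))
    # e.g. a module-level `meter = metrics.get_meter(__name__)` run in the parent
    cached_meter = slot.get().get_meter("tasks")
    second = slot.set(Provider("child"))  # ignored, like a second set_*_provider()
    slot._value = Provider("child")  # the private-attribute hack
    fresh_meter = slot.get().get_meter("tasks")
    return first, second, cached_meter.provider.name, fresh_meter.provider.name
```

`simulate()` returns `(True, False, 'parent', 'child')`: only meters fetched after the swap use the new provider.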
All in all, I'd like to know whether anyone has had a similar experience, and what the recommended practice is for this kind of issue.