Gunicorn + Gevent + OpenTelemetry

Is anyone using Django + Gunicorn + Gevent + OpenTelemetry in production? I'd love to know how you got it to work.

It seems I can't use BatchSpanProcessor or BatchLogRecordProcessor. The errors I'm getting appear in many open issues, but I haven't found a solution.
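Both processors buffer records in memory and hand them to the exporter from a background daemon thread (the KeyError traceback names one such thread, OtelBatchSpanRecordProcessor). The sketch below is a simplified, stdlib-only illustration of that pattern, not the SDK's actual code; MiniBatchProcessor, max_batch and interval are hypothetical names. It shows why these processors depend on a working threading module, which is exactly what gevent replaces when it monkey-patches.

```python
import threading
from collections import deque


class MiniBatchProcessor:
    """Hypothetical, simplified batch processor: queue items, export them from a daemon thread."""

    def __init__(self, export, max_batch=512, interval=5.0):
        self._export = export          # callable that receives a list of items
        self._max_batch = max_batch    # flush early once this many items are queued
        self._interval = interval      # otherwise flush every `interval` seconds
        self._queue = deque()
        self._cond = threading.Condition()
        self._shutdown = False
        # A plain daemon thread; under gevent's monkey patching it runs as a greenlet.
        self._worker = threading.Thread(name="MiniBatchProcessor", target=self._run, daemon=True)
        self._worker.start()

    def on_end(self, item):
        with self._cond:
            self._queue.append(item)
            if len(self._queue) >= self._max_batch:
                self._cond.notify()  # wake the worker before the timer expires

    def _run(self):
        while True:
            with self._cond:
                # Sleep until the timer fires, the batch fills, or shutdown is requested.
                if not self._shutdown and len(self._queue) < self._max_batch:
                    self._cond.wait(self._interval)
                batch = list(self._queue)
                self._queue.clear()
                done = self._shutdown
            if batch:
                self._export(batch)  # export outside the lock so producers are not blocked
            if done:
                return

    def shutdown(self):
        with self._cond:
            self._shutdown = True
            self._cond.notify()
        self._worker.join()  # the worker drains and exports what is left before exiting
```

For example, `MiniBatchProcessor(exported.extend, max_batch=3)` exports items in batches of about three, and `shutdown()` flushes whatever remains.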

I get the following right after Gunicorn starts, although the server still accepts and serves requests:

Traceback (most recent call last):
  File "src/gevent/_abstract_linkable.py", line 287, in gevent._gevent_c_abstract_linkable.AbstractLinkable._notify_links
  File "src/gevent/_abstract_linkable.py", line 333, in gevent._gevent_c_abstract_linkable.AbstractLinkable._notify_links
AssertionError: (None, <callback at 0xffffb32b0640 args=([],)>)
2025-06-09T14:58:34Z <callback at 0xffffb32b0640 args=([],)> failed with AssertionError

2025-06-09T14:58:34Z <callback at 0xffffb32903c0 args=([],)> failed with AssertionError

When I start hitting it with many requests, the following, more problematic error appears:

2025-06-09T14:40:18Z <Greenlet at 0xffff65bb5b20: <bound method Thread._bootstrap of <Thread(OtelBatchSpanRecordProcessor, stopped daemon 281472388520736)>>> failed with KeyError

Traceback (most recent call last):
  File "src/gevent/greenlet.py", line 900, in gevent._gevent_cgreenlet.Greenlet.run
  File "/usr/local/lib/python3.11/threading.py", line 1002, in _bootstrap
    self._bootstrap_inner()
  File "/usr/local/lib/python3.11/threading.py", line 1037, in _bootstrap_inner
    del _limbo[self]
        ~~~~~~^^^^^^
KeyError: <Thread(OtelBatchSpanRecordProcessor, stopped daemon 281472388522016)>

I start the OTel code from wsgi.py with preload_app set to False, so it runs after the fork and after gevent's monkey patching. I have verified this.

Below is the function I use to start the OpenTelemetry instrumentation:

def init_opentelemetry():
    """
    Why isn't this called from the post_fork hook in gunicorn.conf.py?
    With the gevent worker class, Gunicorn applies gevent's monkey patching only after post_fork returns.
    Importing many modules before the patching happens prevents it from working properly and causes errors,
    so the initialization was moved out of post_fork.
    """
    import logging
    import os
    from uuid import uuid4

    from opentelemetry import metrics, trace
    from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
        OTLPLogExporter,
    )
    from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
        OTLPMetricExporter,
    )
    from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
        OTLPSpanExporter,
    )

    # support for logs is currently experimental
    from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
    from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
    from opentelemetry.sdk.environment_variables import OTEL_EXPORTER_OTLP_ENDPOINT
    from opentelemetry.sdk.metrics import MeterProvider
    from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
    from opentelemetry.sdk.resources import Resource
    from opentelemetry.sdk.resources import SERVICE_INSTANCE_ID
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor
    collector_endpoint = os.environ.get(OTEL_EXPORTER_OTLP_ENDPOINT, "")

    resource = Resource.create(
        attributes={
            # each worker needs a unique service.instance.id to distinguish the created metrics in prometheus
            SERVICE_INSTANCE_ID: str(uuid4()),
            # "worker": worker.pid,
            # API_V2_APP_NAME is a project-level constant defined outside this function
            "service.name": API_V2_APP_NAME,
        }
    )

    tracer_provider = TracerProvider(resource=resource)
    tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint=collector_endpoint)))
    trace.set_tracer_provider(tracer_provider)

    metrics.set_meter_provider(
        MeterProvider(
            resource=resource,
            metric_readers=[PeriodicExportingMetricReader(OTLPMetricExporter(endpoint=collector_endpoint))],
        )
    )

    logger_provider = LoggerProvider(resource=resource)
    logger_provider.add_log_record_processor(BatchLogRecordProcessor(OTLPLogExporter(endpoint=collector_endpoint)))
    from opentelemetry import _logs
    _logs.set_logger_provider(logger_provider)
    logging.getLogger().addHandler(LoggingHandler(level=logging.NOTSET, logger_provider=logger_provider))
    from opentelemetry.instrumentation.django import DjangoInstrumentor
    from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor
    from opentelemetry.instrumentation.requests import RequestsInstrumentor
    from opentelemetry.instrumentation.logging import LoggingInstrumentor

    DjangoInstrumentor().instrument()
    Psycopg2Instrumentor().instrument()
    RequestsInstrumentor().instrument()
    LoggingInstrumentor().instrument(set_logging_format=True)
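After init_opentelemetry() runs in a worker, the batch processors (and typically the periodic metric reader) each run a background thread in that process. To compare which of these threads are alive in each worker, a small diagnostic like the following could be logged from wsgi.py right after the call. otel_background_threads is a hypothetical helper, and the "Otel" name prefix is an assumption based on the thread name in the traceback.

```python
import threading


def otel_background_threads(prefix="Otel"):
    """Return (name, daemon, alive) for live threads whose name starts with `prefix`.

    The prefix is an assumption taken from the traceback (OtelBatchSpanRecordProcessor);
    adjust it if your SDK version names its threads differently.
    """
    return [
        (t.name, t.daemon, t.is_alive())
        for t in threading.enumerate()  # only threads that are currently alive (or starting)
        if t.name.startswith(prefix)
    ]
```

For example, `logging.getLogger(__name__).info("pid=%s otel threads=%s", os.getpid(), otel_background_threads())` in wsgi.py would record one line per worker.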