
Application freezes up after X amount of transactions when running two TracerProviders

Open twiclo opened this issue 1 year ago • 0 comments

Describe your environment: Python 3.11.6, opentelemetry-api==1.22.0, opentelemetry-sdk==1.22.0, opentelemetry-instrumentation-aio-pika==0.43b0, opentelemetry-exporter-jaeger==1.21.0

I originally opened this discussion, but after troubleshooting for a few days I think I may have found a bug in the library. To summarize: I have an application that takes HTTP calls and converts them into RabbitMQ messages, with OTel added to send traces to Jaeger. Each HTTP call is specific to a "service". Each "service" implements BasePublisher and needs its own tracer_provider so that Jaeger reports them as individual services.

When I run the application configured this way it lasts about 10 hours under normal load before freezing up. Memory usage doesn't go up, and CPU usage jumps from normal (a few percent) to max. The process has to be hard killed. The application lasts 6 minutes when run under a load test. With the OTel code disabled, the application runs smoothly under load indefinitely.
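
For context on why each publisher needs its own provider: the service name lives on the TracerProvider's Resource, so getting separate services in Jaeger means creating separate providers. A minimal sketch of the idea (the service names here are made up):

# Two providers with two different service.name resources show up as two
# separate services in Jaeger.
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider

orders_provider = TracerProvider(resource=Resource.create({"service.name": "orders"}))
payments_provider = TracerProvider(resource=Resource.create({"service.name": "payments"}))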

Here's the publisher code:

from abc import ABC
from typing import Optional, Union
import uuid

from aio_pika import Exchange, Message

from opentelemetry import trace, context
from opentelemetry.propagate import inject, extract
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.context import attach, detach
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# `env`, `log`, and `handler` are defined elsewhere in the application (not shown here)

class BasePublisher(ABC):
    name: str
    tracer_provider: Optional[TracerProvider] = None

    # Build a dedicated provider for this publisher so its spans carry this
    # service's name in the resource and Jaeger reports it as a separate service.
    def set_tracer_provider(self):
        tracer_provider = TracerProvider(resource=Resource.create({"service.name": self.name}))
        jaeger_exporter = JaegerExporter(agent_host_name=env['JAEGER_ADDRESS'], agent_port=int(env['JAEGER_PORT']))
        tracer_provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))
        self.tracer_provider = tracer_provider

    async def publish(self, exchange: Exchange, msg: Union[str, bytes], routing_key: str, correlation_id: str=str(uuid.uuid4())):
        if self.tracer_provider is None:
            self.set_tracer_provider()

        tracer = trace.get_tracer(self.name, tracer_provider=self.tracer_provider)

        message = Message(...)

        log.info(f"Publishing message with rouing key `{routing_key}`")
        with tracer.start_as_current_span("publish") as span:
            span.set_attribute("rmq.message.routing_key", routing_key)
            span.set_attribute("rmq.message.correlation_id", correlation_id if correlation_id else False)
            span.set_attribute("rmq.message.timestamp", str(message.timestamp))

            # My application also handles consuming. The context is injected into my RMQ message so it can be used
            # if a consume is a part of the trace
            inject(message.headers, context=context.get_current())

            await exchange.publish(message, routing_key, mandatory=False)
        log.info("Message published")

        handler.flush()
        return "Message published successfully"

In my testing I couldn't find any object I'm creating that isn't being garbage collected, at least as far as I can tell. There's just one instance of the ABC for each service, created at startup, and my tracer_provider variable is set once and then never again.
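
To make that concrete, the startup wiring is roughly like this (a simplified sketch; the subclasses and service names are made up, not my real code):

# Illustrative only: one BasePublisher subclass per "service", each instantiated
# exactly once at startup. Each instance lazily builds its own TracerProvider
# (BatchSpanProcessor + JaegerExporter) on its first publish() call.
class OrdersPublisher(BasePublisher):
    name = "orders"

class PaymentsPublisher(BasePublisher):
    name = "payments"

publishers = {
    "orders": OrdersPublisher(),
    "payments": PaymentsPublisher(),
}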

One last thing: if I run it under load, I can see the rate of messages getting posted to RMQ start to fluctuate a little; that's how I know the application is about to freeze. If I stop the load test, let it sit for a few minutes, and then start the test again, it fails just as fast. So something is being held onto here.

twiclo · Feb 23 '24 19:02