
opentelemetry-python SDK does not work well under fork

[Open] yurneroma opened this issue 1 year ago • 3 comments

Describe your environment

OS: Ubuntu 22.04
Python version: 3.12
SDK version: 1.26.0
API version: 1.26.0

What happened?

I wrote the code below:

import os
from opentelemetry import trace

from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
    OTLPSpanExporter,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor
from functools import wraps
from multiprocessing import Process
import multiprocessing as mp

def init_tracer():
    resource = Resource.create(
        attributes={
            "service.name": "api-service",
            # If workers are not distinguished within attributes, traces and
            # metrics exported from each worker will be indistinguishable. While
            # not necessarily an issue for traces, it is confusing for almost
            # all metric types. A built-in way to identify a worker is by PID
            # but this may lead to high label cardinality. An alternative
            # workaround and additional discussion are available here:
            # https://github.com/benoitc/gunicorn/issues/1352
            "worker": os.getpid(),
        }
    )

    trace.set_tracer_provider(TracerProvider(resource=resource))
    # This uses insecure connection for the purpose of example. Please see the
    # OTLP Exporter documentation for other options.
    span_processor = BatchSpanProcessor(
        OTLPSpanExporter(endpoint="http://tempo.mycompany.cn:4318/v1/traces")
    )

    trace.get_tracer_provider().add_span_processor(span_processor)


# def post_fork(func):
#     @wraps(func)
#     def wrapper(*args, **kwargs):
#         init_tracer()
#         res = func(*args, **kwargs)
#         return res
#     return wrapper

#@post_fork
def worker_func(name):
    with trace.get_tracer(__name__).start_as_current_span('multi-span') as span:
        span.set_attribute("pid", os.getpid())
        print(f"worker_func {name} running, span context : {trace.get_current_span().get_span_context()}")




if __name__ == '__main__':
    init_tracer()
    print('----------------------')

    mp.set_start_method('fork')
    p = Process(target=worker_func, args=('bob',))
    p.start()
    print("main process will wait for the child process")
    p.join()  # wait for the child process to finish


 

I expected the trace generated in the child process to be exported to Tempo, but it was not.

Steps to Reproduce

Run the code from the "What happened?" section above; the span created in worker_func inside the forked child process is never exported.

Expected Result

The span generated in the child process should be exported to Tempo.

It would also help if the SDK offered a function to reinitialize the TracerProvider after forking, so that the child process gets a completely fresh object whose state is all correct.

Actual Result

The ReadableSpan data is never exported by BatchSpanProcessor.worker().

Additional context

  • I initialize the TracerProvider in the main process.
  • I fork a child process and execute some logic in it.
  • In worker_func, I generate a span.
  • I added some logging inside the opentelemetry-sdk library.
  • It appears that when the child process exits, the TracerProvider in the child never calls its shutdown function, so SynchronousMultiSpanProcessor.shutdown() is never invoked and BatchSpanProcessor.worker() is never joined. The span is placed on the queue, but while the worker thread is waiting on its condition variable (with a timeout), the thread is killed as the child process exits, so the span is never exported.
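A minimal workaround sketch (not part of the original report; it assumes the SDK TracerProvider configured by init_tracer() above, whose force_flush() blocks until queued spans have been exported):

import os

from opentelemetry import trace

def worker_func(name):
    with trace.get_tracer(__name__).start_as_current_span("multi-span") as span:
        span.set_attribute("pid", os.getpid())
        print(f"worker_func {name} running, span context: {span.get_span_context()}")
    # Flush explicitly before the child returns: its atexit hooks may
    # never run, but force_flush() drains the BatchSpanProcessor queue.
    trace.get_tracer_provider().force_flush()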


Would you like to implement a fix?

Yes

yurneroma commented on Oct 09 '24

The problem is that forked processes don't run atexit callbacks, which is where the final cleanup happens to ensure that spans are exported. We're fixing this in our SDK by patching os._exit: https://github.com/pydantic/logfire/pull/785
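Roughly, that patching approach looks like the sketch below (my illustration, not the code from the linked PR; atexit._run_exitfuncs is a private CPython helper, so its use here is an assumption):

import atexit
import os

# Forked multiprocessing children terminate via os._exit(), which skips
# atexit callbacks. Wrapping it lets cleanup hooks (such as the span
# processor shutdown registered by the SDK) run first.
_original_exit = os._exit

def _patched_exit(code):
    try:
        atexit._run_exitfuncs()  # private CPython API; an assumption here
    finally:
        _original_exit(code)

os._exit = _patched_exit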

alexmojaki commented on Jan 08 '25

I've been investigating this and believe there's a bit more involved. Here's what I've found:

@alexmojaki is correct that Python didn't use to call atexit functions in forked processes, but as of Python 3.13 this is fixed: https://github.com/python/cpython/pull/114279. One very important detail here is that the forked process must register the functions itself; it doesn't inherit them from the original process.
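A small demonstration of that detail (my illustration of the behavior described, assuming the Python 3.13 fix):

import atexit
import multiprocessing as mp
import os

def report():
    print(f"atexit callback ran in pid {os.getpid()}")

def child():
    # The parent's registration was cleared at fork and is not
    # inherited; the child must register its own callback.
    atexit.register(report)

if __name__ == "__main__":
    atexit.register(report)  # parent's registration
    mp.set_start_method("fork")
    p = mp.Process(target=child)
    p.start()
    p.join()
    # On Python 3.13+ the child's own registration runs when it exits;
    # on earlier versions it does not.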

In my testing, the atexit registration that TracerProvider does here does not re-run in the forked process, because the TracerProvider instance has already been created, so those lines in its __init__ method aren't executed again after forking.
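For context, the registration pattern in question is along these lines (a paraphrase for illustration, not a copy of the SDK source):

import atexit

class SketchTracerProvider:
    def __init__(self, shutdown_on_exit: bool = True):
        # The provider registers its own shutdown with atexit at
        # construction time. A forked child never constructs a new
        # instance, so this registration never happens there.
        self._atexit_handler = None
        if shutdown_on_exit:
            self._atexit_handler = atexit.register(self.shutdown)

    def shutdown(self):
        print("flush and shut down span processors here")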

I figured this could be rectified by registering an os.register_at_fork(after_in_child=...) callback, like is done here. That is how the threads we want to shut down cleanly are set up, so surely it would also be the right way to set up their clean shutdown. In my testing, however, the registered atexit functions are cleared in the forked process after the os.register_at_fork callback is called. So I don't believe Python provides a hook for doing the setup we'd like at fork time to control the atexit functions of the forked process. When I monkey-patch the atexit module's internal clearing function to be a no-op, the registrations remain and things work. But that's patching, which we already know can solve the problem, e.g. on os._exit as mentioned.
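For illustration, the attempt described above looks roughly like this (my sketch, not SDK code; it assumes an SDK TracerProvider is already set):

import atexit
import os

from opentelemetry import trace

def _reregister_atexit_in_child():
    # Re-register the provider's shutdown in the child so its spans get
    # flushed on exit. In the testing described above, CPython clears
    # the child's atexit callbacks *after* this callback runs, so the
    # registration is lost again.
    atexit.register(trace.get_tracer_provider().shutdown)

os.register_at_fork(after_in_child=_reregister_atexit_in_child)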

So to leverage Python 3.13's atexit fix for forked processes for this issue, I can see roughly two options:

  1. A CPython fix to clear atexit functions in forked processes after register_at_fork callbacks run, not before. Or, honestly, it might be simpler if CPython allowed registering atexit functions that are not cleared on fork, maybe atexit.register(func, keep_on_fork=True). In addition to the CPython change, the Python opentelemetry library would of course need to be changed to use it.

  2. Since opentelemetry is spinning up threads in the child process, there's surely a way for it to call atexit.register() a little later: not immediately after forking, but as the first thing the re-created worker thread does (see the sketch after this list). I'm not sure where this would best live or how reliable it would be; there may be race conditions.
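A rough sketch of option 2 (hypothetical and heavily simplified; SketchBatchProcessor is not the real BatchSpanProcessor):

import atexit
import threading

class SketchBatchProcessor:
    def __init__(self):
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._worker, daemon=True)
        self._thread.start()

    def _worker(self):
        # Register cleanup from inside the worker thread: by the time
        # the thread runs, fork-time clearing of atexit callbacks has
        # already happened. Whether this races with interpreter
        # shutdown is exactly the open question above.
        atexit.register(self.shutdown)
        while not self._stop.wait(timeout=5.0):
            pass  # a real processor would drain and export spans here

    def shutdown(self):
        self._stop.set()
        self._thread.join()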

Maybe there's a better way, or something I'm not thinking of.

camlee commented on Mar 15 '25

3.13 is very new, while 3.12's end of life is more than three years away. I don't think features only available in 3.13 should be the solution right now.

alexmojaki commented on Mar 15 '25