opentelemetry python sdk does not work well under fork
Describe your environment
OS: (Ubuntu 22.04) Python version: (Python 3.12) SDK version: (1.26.0) API version: (1.26.0)
What happened?
i wrote the code below:
import os
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor
from functools import wraps
from multiprocessing import Process
import multiprocessing as mp
def init_tracer():
resource = Resource.create(
attributes={
"service.name": "api-service",
# If workers are not distinguished within attributes, traces and
# metrics exported from each worker will be indistinguishable. While
# not necessarily an issue for traces, it is confusing for almost
# all metric types. A built-in way to identify a worker is by PID
# but this may lead to high label cardinality. An alternative
# workaround and additional discussion are available here:
# https://github.com/benoitc/gunicorn/issues/1352
"worker": os.getpid(),
}
)
trace.set_tracer_provider(TracerProvider(resource=resource))
# This uses insecure connection for the purpose of example. Please see the
# OTLP Exporter documentation for other options.
span_processor = BatchSpanProcessor(
OTLPSpanExporter(endpoint="http://tempo.mycompany.cn:4318/v1/traces")
)
trace.get_tracer_provider().add_span_processor(span_processor)
# def post_fork(func):
# @wraps(func)
# def wrapper(*args, **kwargs):
# init_tracer()
# res = func(*args, **kwargs)
# return res
# return wrapper
#@post_fork
import time
def worker_func(name):
with trace.get_tracer(__name__).start_as_current_span('multi-span') as span:
span.set_attribute("pid", os.getpid())
print(f"worker_func {name} running, span context : {trace.get_current_span().get_span_context()}")
if __name__ == '__main__':
init_tracer()
print('----------------------')
mp.set_start_method('fork')
p = Process(target=worker_func, args=('bob',))
p.start()
print("main process will wait child process")
i want to see the trace which is generated in the child process to be exported to the tempo. but it does not.
Steps to Reproduce
import os
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor
from functools import wraps
from multiprocessing import Process
import multiprocessing as mp
def init_tracer():
resource = Resource.create(
attributes={
"service.name": "api-service",
# If workers are not distinguished within attributes, traces and
# metrics exported from each worker will be indistinguishable. While
# not necessarily an issue for traces, it is confusing for almost
# all metric types. A built-in way to identify a worker is by PID
# but this may lead to high label cardinality. An alternative
# workaround and additional discussion are available here:
# https://github.com/benoitc/gunicorn/issues/1352
"worker": os.getpid(),
}
)
trace.set_tracer_provider(TracerProvider(resource=resource))
# This uses insecure connection for the purpose of example. Please see the
# OTLP Exporter documentation for other options.
span_processor = BatchSpanProcessor(
OTLPSpanExporter(endpoint="http://tempo.mycopany.ac.cn:4318/v1/traces")
)
trace.get_tracer_provider().add_span_processor(span_processor)
# def post_fork(func):
# @wraps(func)
# def wrapper(*args, **kwargs):
# init_tracer()
# res = func(*args, **kwargs)
# return res
# return wrapper
#@post_fork
import time
def worker_func(name):
with trace.get_tracer(__name__).start_as_current_span('multi-span') as span:
span.set_attribute("pid", os.getpid())
print(f"worker_func {name} running, span context : {trace.get_current_span().get_span_context()}")
if __name__ == '__main__':
init_tracer()
print('----------------------')
mp.set_start_method('fork')
p = Process(target=worker_func, args=('bob',))
p.start()
print("main process will wait child process")
Expected Result
i want to see the trace which is generated in the child process to be exported to the tempo.
and could you offer some function to reinitialize the TracerProvider object after i fork a process, and i can use it as a total new object. and the all of state is right.
Actual Result
the readable span info do not be exported in the BatchSpanProcessor.worker() function.
Additional context
- i init the TracerProvider in the main process.
- and i fork a process, and execute some logic.
- in my worker_func, i generate a span.
- i add some log in the opentelemetry-sdk lib.
- it seems that when my child process exit, the TracerProvider object in child process does not call the shutdown function, so the SynchronousMultiSpanProcessor.shutdown() can not be called, so the BatchSpanProcessor.worker() can not be waited. when i span info be send to queue, and just then, when the worker wait the condition(timeout), the worker thread killed when the children process exit.
could you offer some function to reinitialize the TracerProvider object after i fork a process, and i can use it as a total new object. and the all of state is right.
Would you like to implement a fix?
Yes
The problem is that forked processes don't call atexit which is where final cleanup happens to ensure that spans are exported. We're fixing this in our SDK by patching os._exit: https://github.com/pydantic/logfire/pull/785
I've been investigating this and believe there's a bit more involved. Here's what I've found:
@alexmojaki is correct that Python didn't use to call atexit functions in forked processes but as of Python 3.13, this is fixed: https://github.com/python/cpython/pull/114279. One very important detail here is that the forked process must register the function itself; it doesn't inherit those functions from the original process.
In my testing, the atexit registration that TraceProvider does here does not re-run on the forked process because the TraceProvider instance is already created and so these lines in it's __init__ method aren't re-run after forking.
I figured that this could be rectified by registering an os.register_at_fork(after_in_child=...) function like is done here. This is how the threads that we're wanting to cleanly shutdown are setup, so surely it would be the right way to also setup the clean shutdown of them. In my testing however, the registered atexit functions are cleared in the forked process after the os.register_at_fork callback is called. So I don't believe that Python provides a hook to do the setup we'd like at fork time to control the atexit functions for that forked process. When I monkey-patch the atexit module's internal function it uses to clear those functions to be a no-op, they remain and things work. But that's patching which we already know can be used to solve the problem, ex. on os._exit as mentioned.
So to leverage Python 3.13's atexit fix for forked processes for this issue, I can see roughly two options:
-
CPython fix to clear
atexitfunctions in forked processes afterregister_at_forkcallbacks, not before. Or honestly, might be simpler if CPython allowed the option of registeringatexitfunctions that are not cleared on fork, maybeatexit.register(func, keep_on_fork=True). In addition to a CPython change, the Python opentelemetry library would need to make a change to use this of course. -
Since opentelemetry is spinning up threads in the child process, there's surely a way for it to call
atexit.register()a little down the road and avoid those getting cleared. Like, not immediately after forking, but as the first thing in a child thread. I'm not sure where best to do this or how reliable it would be; race conditions?
Maybe there's a better way, or something I'm not thinking of.
3.13 is very new, while 3.12's end-of-life is more than 3 years away. I don't think features in 3.13 should be the solution right now.