dd-trace-py
Child spans lost when created in a forked subprocess
I can't seem to get Datadog APM to pick up trace spans that are instantiated from within a child worker process of a traced parent process. The spans are created and linked to the parent, but something goes wrong when storing and publishing them.
It could be that the worker thread created by ddtrace._worker.PeriodicWorkerThread, which invokes the AgentWriter, is halted in the child process when the main process is forked. But I can't prove that.
Maybe creating and sending spans from a forked child process was never supported? There is code, however, that seems to indicate support for this: https://github.com/DataDog/dd-trace-py/blob/046cad6f9ab9e6101be858ce37749d194fcaa507/ddtrace/tracer.py#L416-L438
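To make that suspicion concrete, here is a minimal sketch (independent of ddtrace, and POSIX-only since it uses os.fork) showing that a background worker thread started in the parent is not running in a forked child, which is exactly the kind of behavior that would leave a child's trace queue unflushed:
import os
import threading
import time

counter = {"ticks": 0}

def periodic_work():
    # Stand-in for a periodic flush loop like the AgentWriter's worker thread
    while True:
        counter["ticks"] += 1
        time.sleep(0.05)

threading.Thread(target=periodic_work, daemon=True).start()
time.sleep(0.2)

pid = os.fork()
if pid == 0:
    # Child: memory was copied, but the worker thread was NOT cloned by fork
    before = counter["ticks"]
    time.sleep(0.2)
    print(f"child:  ticks {before} -> {counter['ticks']} (thread not running)")
    os._exit(0)
else:
    before = counter["ticks"]
    os.waitpid(pid, 0)
    print(f"parent: ticks {before} -> {counter['ticks']} (thread still running)")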
Which version of dd-trace-py are you using?
ddtrace: 0.32.2 (also reproducible using the test script below and ddtrace 0.33.0)
Python: 3.7.3
Which version of the libraries are you using?
Reproducible with no other libraries than ddtrace
How can we reproduce your problem?
This test script below:
- Run pip install ddtrace==0.32.2
- Run python3 app.py for the successful case
- Run python3 app.py --subprocess for the failing case
- (Optionally) See more verbose tracer state by running at INFO level: python3 app.py --subprocess --loglevel=INFO
import argparse
import logging
import time
import multiprocessing as mp
from ddtrace import tracer
logger = logging.getLogger(__name__)
JOB_TYPE = "test"
span_count = 0
class PreWriteTraceLoggingFilter:
"""Log spans being seen by the AgentWriter"""
def process_trace(self, trace):
for span in trace:
global span_count
span_count += 1
logger.warning(f"\nSEEING SPAN {span_count} IN PRE-WRITE TRACE LOGGING FILTER:\n{span.pprint()}")
return trace
class App:
@staticmethod
def run(in_subprocess=False):
if tracer.writer._filters:
tracer.writer._filters.append(PreWriteTraceLoggingFilter())
else:
tracer.writer._filters = [PreWriteTraceLoggingFilter()]
for i in range(1, 6):
# Start a Datadog Trace for this iter to capture activity in APM
with tracer.trace(
name=f"job.{JOB_TYPE}", service="test-app", resource="this.thing", span_type="worker"
):
# Schedule a job to be done
do_work(job=job_to_be_done, job_args=(i,), in_subprocess=in_subprocess)
# Pause before next job
time.sleep(1)
if logger.isEnabledFor(logging.INFO):
root_span = tracer.current_root_span()
if root_span:
logger.info(
f"Datadog ROOT SPAN exists.\nSPAN DETAILS:\n{root_span}"
f"\nspan.meta:\n{root_span.meta}"
f"\nspan.metrics:\n{root_span.metrics}"
f"\nTRACER DETAILS:\n{tracer}"
f"\nTRACER DICT:\n{tracer.__dict__}"
f"\nWRITER DETAILS:\n{tracer.writer}"
f"\nWRITER DICT:\n{tracer.writer.__dict__}"
)
else:
logger.info(
"Datadog ROOT span does NOT exist. Perhaps tracing is not enabled or a span needs to be "
"started"
)
def do_work(job, job_args, in_subprocess=False):
if not in_subprocess:
job(*job_args)
else:
logger.warning("----> Working job in a subprocess")
ctx = mp.get_context("fork")
worker = ctx.Process(
name="job_worker",
target=job,
args=job_args,
daemon=False,
)
worker.start()
worker.join()
def job_to_be_done(job_id):
if logger.isEnabledFor(logging.INFO):
current_span = tracer.current_span()
if current_span:
logger.info(
f"Datadog CURRENT span exists in subprocess.\nSPAN DETAILS:\n{current_span}"
f"\nspan.meta:\n{current_span.meta}"
f"\nspan.metrics:\n{current_span.metrics}"
f"\nTRACER DETAILS:\n{tracer}"
f"\nTRACER DICT:\n{tracer.__dict__}"
f"\nWRITER DETAILS:\n{tracer.writer}"
f"\nWRITER DICT:\n{tracer.writer.__dict__}"
)
else:
logger.info(
"Datadog CURRENT span does NOT exist in subprocess."
)
with tracer.trace(
name=f"job.{JOB_TYPE}.work",
service="test-app",
resource=str(job_id),
span_type="worker"
) as span:
if logger.isEnabledFor(logging.INFO):
if span:
logger.info(
f"Datadog CHILD span was created in subprocess.\nSPAN DETAILS:\n{span}"
f"\nspan.meta:\n{span.meta}"
f"\nspan.metrics:\n{span.metrics}"
f"\nTRACER DETAILS:\n{tracer}"
f"\nTRACER DICT:\n{tracer.__dict__}"
f"\nWRITER DETAILS:\n{tracer.writer}"
f"\nWRITER DICT:\n{tracer.writer.__dict__}"
)
else:
logger.info(
"Datadog CHILD span was NOT created in subprocess."
)
# The "job_to_be_done" ... log some math
logger.warning(
"\n\t" + " " * 20 +
"\n\t" + "=" * 20 +
f"\n\tJob {job_id}: 2**{job_id} = {2**job_id}" +
"\n\t" + "=" * 20 +
"\n\t" + " " * 20
)
if logger.isEnabledFor(logging.INFO):
logger.info(
f"Writer Queue Details:\n{tracer.writer._trace_queue}"
f"\nWriter Queue Dict:\n{tracer.writer._trace_queue.__dict__}"
f"\nWriter queue has {tracer.writer._trace_queue.accepted} traces "
f"\nand {tracer.writer._trace_queue.accepted_lengths} total spans"
)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--subprocess", action="store_true", help="Do work in a subprocess?")
parser.add_argument("--loglevel", default="WARNING", help="Name the desired logging log level")
args = parser.parse_args()
loglvl = logging._nameToLevel[args.loglevel.upper()]
logging.basicConfig(level=loglvl)
App.run(args.subprocess)
What is the result that you get?
- When child spans are started with tracer.trace(...) in a child worker process, they are not picked up by the AgentWriter and don't appear to be added to the writer's _trace_queue.
- The child spans are created, however, and are linked to the parent span created in the parent process by way of the context that was active when the child span was created.
- As a result, those more detailed child spans, and any other spans created by patched modules in the child, never make it to Datadog APM.
- In my scenario, the meat of the interesting work with various modules happens in the worker process, so this makes for some pretty uninteresting traces to look at in APM.
OUTPUT OF TEST SCRIPT WITH SUBPROCESS (5 spans seen, no child spans):
python3 app.py --subprocess
WARNING:__main__:----> Working job in a subprocess
WARNING:__main__:
====================
Job 1: 2**1 = 2
====================
WARNING:__main__:----> Working job in a subprocess
WARNING:__main__:
====================
Job 2: 2**2 = 4
====================
WARNING:__main__:
SEEING SPAN 1 IN PRE-WRITE TRACE LOGGING FILTER:
name job.test
id 16530961218632425167
trace_id 6825816557751025017
parent_id None
service test-app
resource this.thing
type worker
start 1579904772.6704671
end 1579904773.6800072
duration 1.009540s
error 0
tags
system.pid:29954
WARNING:__main__:----> Working job in a subprocess
WARNING:__main__:
====================
Job 3: 2**3 = 8
====================
WARNING:__main__:
SEEING SPAN 2 IN PRE-WRITE TRACE LOGGING FILTER:
name job.test
id 17380868482985867897
trace_id 17662960499728435942
parent_id None
service test-app
resource this.thing
type worker
start 1579904773.6803951
end 1579904774.6915362
duration 1.011141s
error 0
tags
system.pid:29954
WARNING:__main__:----> Working job in a subprocess
WARNING:__main__:
====================
Job 4: 2**4 = 16
====================
WARNING:__main__:
SEEING SPAN 3 IN PRE-WRITE TRACE LOGGING FILTER:
name job.test
id 1070944698873432775
trace_id 14861366285187849972
parent_id None
service test-app
resource this.thing
type worker
start 1579904774.691659
end 1579904775.706146
duration 1.014487s
error 0
tags
system.pid:29954
WARNING:__main__:----> Working job in a subprocess
WARNING:__main__:
====================
Job 5: 2**5 = 32
====================
WARNING:__main__:
SEEING SPAN 4 IN PRE-WRITE TRACE LOGGING FILTER:
name job.test
id 7299252520529262583
trace_id 12962911925894524263
parent_id None
service test-app
resource this.thing
type worker
start 1579904775.706304
end 1579904776.715918
duration 1.009614s
error 0
tags
system.pid:29954
WARNING:__main__:
SEEING SPAN 5 IN PRE-WRITE TRACE LOGGING FILTER:
name job.test
id 18157549604617954965
trace_id 11988146514970803406
parent_id None
service test-app
resource this.thing
type worker
start 1579904776.716063
end 1579904777.729151
duration 1.013088s
error 0
tags
system.pid:29954
What is the result that you expected?
- When running all code in-process in the main application process, all parent and child spans are seen by the AgentWriter and sent via the API code to Datadog.
- I'd expect all spans, including those created from a forked child process, to be created, stored in the queue, flushed from the queue, sent to the API, and visible in the Datadog APM dashboard.
- With nesting of child spans within parent spans on the flame graphs.
OUTPUT OF TEST SCRIPT WITH NO SUBPROCESS (10 spans seen):
python3 app.py
WARNING:__main__:
====================
Job 1: 2**1 = 2
====================
WARNING:__main__:
====================
Job 2: 2**2 = 4
====================
WARNING:__main__:
SEEING SPAN 1 IN PRE-WRITE TRACE LOGGING FILTER:
name job.test
id 14675814759724508398
trace_id 770041225728864883
parent_id None
service test-app
resource this.thing
type worker
start 1579904717.645301
end 1579904718.649347
duration 1.004046s
error 0
tags
system.pid:29819
WARNING:__main__:
SEEING SPAN 2 IN PRE-WRITE TRACE LOGGING FILTER:
name job.test.work
id 18431673632679444143
trace_id 770041225728864883
parent_id 14675814759724508398
service test-app
resource 1
type worker
start 1579904717.645469
end 1579904717.645587
duration 0.000118s
error 0
tags
WARNING:__main__:
====================
Job 3: 2**3 = 8
====================
WARNING:__main__:
====================
Job 4: 2**4 = 16
====================
WARNING:__main__:
SEEING SPAN 3 IN PRE-WRITE TRACE LOGGING FILTER:
name job.test
id 17201101031906148987
trace_id 3672484094332845497
parent_id None
service test-app
resource this.thing
type worker
start 1579904718.649627
end 1579904719.650445
duration 1.000818s
error 0
tags
system.pid:29819
WARNING:__main__:
SEEING SPAN 4 IN PRE-WRITE TRACE LOGGING FILTER:
name job.test.work
id 3543179655255681397
trace_id 3672484094332845497
parent_id 17201101031906148987
service test-app
resource 2
type worker
start 1579904718.649804
end 1579904718.6500342
duration 0.000230s
error 0
tags
WARNING:__main__:
SEEING SPAN 5 IN PRE-WRITE TRACE LOGGING FILTER:
name job.test
id 17478005526224945084
trace_id 14657657786022608386
parent_id None
service test-app
resource this.thing
type worker
start 1579904719.6505458
end 1579904720.6558309
duration 1.005285s
error 0
tags
system.pid:29819
WARNING:__main__:
SEEING SPAN 6 IN PRE-WRITE TRACE LOGGING FILTER:
name job.test.work
id 13770363007737709916
trace_id 14657657786022608386
parent_id 17478005526224945084
service test-app
resource 3
type worker
start 1579904719.650626
end 1579904719.650753
duration 0.000127s
error 0
tags
WARNING:__main__:
====================
Job 5: 2**5 = 32
====================
WARNING:__main__:
SEEING SPAN 7 IN PRE-WRITE TRACE LOGGING FILTER:
name job.test
id 10600570517259197084
trace_id 10546914854753799373
parent_id None
service test-app
resource this.thing
type worker
start 1579904720.655954
end 1579904721.660364
duration 1.004410s
error 0
tags
system.pid:29819
WARNING:__main__:
SEEING SPAN 8 IN PRE-WRITE TRACE LOGGING FILTER:
name job.test.work
id 5692015905287033963
trace_id 10546914854753799373
parent_id 10600570517259197084
service test-app
resource 4
type worker
start 1579904720.65604
end 1579904720.656203
duration 0.000163s
error 0
tags
WARNING:__main__:
SEEING SPAN 9 IN PRE-WRITE TRACE LOGGING FILTER:
name job.test
id 6247140980905075234
trace_id 16399843480143240570
parent_id None
service test-app
resource this.thing
type worker
start 1579904721.660526
end 1579904722.664381
duration 1.003855s
error 0
tags
system.pid:29819
WARNING:__main__:
SEEING SPAN 10 IN PRE-WRITE TRACE LOGGING FILTER:
name job.test.work
id 1084021125221115092
trace_id 16399843480143240570
parent_id 6247140980905075234
service test-app
resource 5
type worker
start 1579904721.660671
end 1579904721.660921
duration 0.000250s
error 0
tags
Bump.
Any traction on this?
Even a quick acknowledgement of whether this should or should not be doable would help (that is, continuing a trace from a parent process into a forked child process, so that spans created in each process are added to the same final trace and span graph).
Thank you
Bump. Again.
I'm not necessarily looking for a fix right away, but is this (a) a bug in a feature that should work, or (b) an unsupported use case?
Hey @kwhickey so sorry for the long silence on this.
Tracing subprocesses is not something that is supported out of the box right now, so this is a feature request rather than a bug.
@majorgreys was working on adding support for the multiprocessing library (#1171) but unfortunately that was bumped for some other priorities we had to work on :(
In general the approach we need to take is: at the time we start a subprocess, pass through the currently active span's trace id and span id; then, in the child process, before starting any spans, initialize the context with the trace id and parent id from the parent process's info.
So in your example:
- In do_work, when you start the worker:
  - grab the currently active span
  - pass span.span_id and span.trace_id into the subprocess
    - not sure of the best approach; it could be as "simple" as appending them to the job args, but that obviously has its downsides... like needing to update all jobs to accept those arguments
- In job_to_be_done:
  - grab the passed-in span id and trace id
    - again, this could be as simple as having them as function arguments
  - before doing anything else (first line of the function), create and activate a trace context
def job_to_be_done(job_id, trace_id, span_id):
context = ddtrace.Context(trace_id=trace_id, span_id=span_id)
tracer.context_provider.activate(context)
# rest of the function
I know this example/approach is a little out there. This is mostly just a rough example of how this type of feature would need to work whether it was done manually by you or handled by us in an integration.
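For the parent side in the example app above, the do_work change might look roughly like this (a sketch only, using the "simple" extra-args approach mentioned above; it assumes the job function accepts the two additional arguments):
def do_work(job, job_args, in_subprocess=False):
    if not in_subprocess:
        job(*job_args)
        return

    # Grab the currently active span in the parent and forward its ids so the
    # child can re-create and activate the same trace context.
    current_span = tracer.current_span()
    trace_id = current_span.trace_id if current_span else None
    span_id = current_span.span_id if current_span else None

    ctx = mp.get_context("fork")
    worker = ctx.Process(
        name="job_worker",
        target=job,
        args=job_args + (trace_id, span_id),
        daemon=False,
    )
    worker.start()
    worker.join()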
Please take a look at #1171 and follow along there, we definitely want to get this back on the schedule to finish up.
I can't seem to read today, I realize now I didn't really address your question.
Hi @kwhickey! (@brettlangdon beat me to the reply, but I'll still add my thoughts)
Thanks for the very detailed and insightful write-up (made grokking the issue a ton easier) and your patience. At first pass all your reasoning seems correct to me. Continuing a trace to a forked process should be doable but it looks like there's a bug somewhere along the way in the forked process which is resulting in the dropped spans. I'll try taking a deeper look with your example when I get the chance.
We don't have very much coverage for the forking case (and nothing past the trace being added to the queue, so certainly something could be going awry after that point). This could certainly be improved on our end!
Like @brettlangdon mentioned, we do have some work on subprocessing/multiprocessing in #1171 that we can hopefully get back to that will have similar testing requirements where we can explore fixing this bug as well. :slightly_smiling_face:
@brettlangdon I tried your suggested workaround of activating the context in the subprocess, but it didn't take. It does not seem to jump-start the tracing again in the subprocess.
These are the lines I changed in job_to_be_done(...) in the code above. Please try out the sample app and see if you can get it to initiate tracing with any other workarounds.
NOTE: I did not need to pass in the span_id and trace_id from the parent process as arguments, because the forked subprocess has a copy of the parent process's memory, and therefore the tracer object from the parent process as well as a reference to the current_span() from that tracer object.
def job_to_be_done(job_id):
current_span = tracer.current_span()
parent_ctx = current_span.context
logger.warning(
f"\n\n [ACTIVATING NEW Context OBJECT IN SUBPROCESS]\n"
f"\ttrace_id={current_span.trace_id}\n"
f"\tspan_id={current_span.span_id}\n"
f"\tsampling_priority={parent_ctx.sampling_priority}\n"
f"\t_dd_origin={parent_ctx._dd_origin}\n\n"
)
from ddtrace.tracer import Context
context = Context(
trace_id=current_span.trace_id,
span_id=current_span.span_id,
sampling_priority=parent_ctx.sampling_priority,
_dd_origin=parent_ctx._dd_origin,
)
tracer.context_provider.activate(context)
if logger.isEnabledFor(logging.INFO):
if current_span:
logger.info(
f"Datadog CURRENT span exists in subprocess.\nSPAN DETAILS:\n{current_span}"
f"\nspan.meta:\n{current_span.meta}"
f"\nspan.metrics:\n{current_span.metrics}"
f"\nTRACER DETAILS:\n{tracer}"
f"\nTRACER DICT:\n{tracer.__dict__}"
f"\nWRITER DETAILS:\n{tracer.writer}"
f"\nWRITER DICT:\n{tracer.writer.__dict__}"
)
else:
logger.warning(
"Datadog CURRENT span does NOT exist in subprocess."
)
with tracer.trace(
# ** rest unchanged **
Here is the output.
As you can see, the data from the last "seen span" from the parent process matches the data going into the Context object constructor. For example:
Last Parent Proc Log
SEEING SPAN 5 IN PRE-WRITE TRACE LOGGING FILTER:
name job.test
id 10632735743717901325
trace_id 7598786591009685004
parent_id None
Last Child Process Log
[ACTIVATING NEW Context OBJECT IN SUBPROCESS]
trace_id=7598786591009685004
span_id=10632735743717901325
sampling_priority=1
_dd_origin=None
But still, the activate function does not seem to initiate a new child span when calling tracer.trace(...).
python3 app.py --subprocess
WARNING:__main__:----> Working job in a subprocess
WARNING:__main__:
[ACTIVATING NEW Context OBJECT IN SUBPROCESS]
trace_id=4600472673853409428
span_id=10639428754452025545
sampling_priority=1
_dd_origin=None
WARNING:__main__:
====================
Job 1: 2**1 = 2
====================
WARNING:__main__:----> Working job in a subprocess
WARNING:__main__:
[ACTIVATING NEW Context OBJECT IN SUBPROCESS]
trace_id=2286970869355803110
span_id=3809425606980525876
sampling_priority=1
_dd_origin=None
WARNING:__main__:
====================
Job 2: 2**2 = 4
====================
WARNING:__main__:
SEEING SPAN 1 IN PRE-WRITE TRACE LOGGING FILTER:
name job.test
id 10639428754452025545
trace_id 4600472673853409428
parent_id None
service test-app
resource this.thing
type worker
start 1584703743.6846411
end 1584703744.696319
duration 1.011678s
error 0
tags
system.pid:85909
WARNING:__main__:----> Working job in a subprocess
WARNING:__main__:
[ACTIVATING NEW Context OBJECT IN SUBPROCESS]
trace_id=7332416341348088823
span_id=15235335455626267011
sampling_priority=1
_dd_origin=None
WARNING:__main__:
====================
Job 3: 2**3 = 8
====================
WARNING:__main__:
SEEING SPAN 2 IN PRE-WRITE TRACE LOGGING FILTER:
name job.test
id 3809425606980525876
trace_id 2286970869355803110
parent_id None
service test-app
resource this.thing
type worker
start 1584703744.69682
end 1584703745.706458
duration 1.009638s
error 0
tags
system.pid:85909
WARNING:__main__:----> Working job in a subprocess
WARNING:__main__:
[ACTIVATING NEW Context OBJECT IN SUBPROCESS]
trace_id=1331123246548887572
span_id=12276903358193692189
sampling_priority=1
_dd_origin=None
WARNING:__main__:
====================
Job 4: 2**4 = 16
====================
WARNING:__main__:
SEEING SPAN 3 IN PRE-WRITE TRACE LOGGING FILTER:
name job.test
id 15235335455626267011
trace_id 7332416341348088823
parent_id None
service test-app
resource this.thing
type worker
start 1584703745.70666
end 1584703746.720486
duration 1.013826s
error 0
tags
system.pid:85909
WARNING:__main__:----> Working job in a subprocess
WARNING:__main__:
[ACTIVATING NEW Context OBJECT IN SUBPROCESS]
trace_id=7598786591009685004
span_id=10632735743717901325
sampling_priority=1
_dd_origin=None
WARNING:__main__:
====================
Job 5: 2**5 = 32
====================
WARNING:__main__:
SEEING SPAN 4 IN PRE-WRITE TRACE LOGGING FILTER:
name job.test
id 12276903358193692189
trace_id 1331123246548887572
parent_id None
service test-app
resource this.thing
type worker
start 1584703746.72074
end 1584703747.7346241
duration 1.013884s
error 0
tags
system.pid:85909
WARNING:__main__:
SEEING SPAN 5 IN PRE-WRITE TRACE LOGGING FILTER:
name job.test
id 10632735743717901325
trace_id 7598786591009685004
parent_id None
service test-app
resource this.thing
type worker
start 1584703747.734878
end 1584703748.747338
duration 1.012460s
error 0
tags
system.pid:85909
Hi @kwhickey,
I got a chance to dig into this issue... I think we have a solution in #1331.
Basically there were two problems:
- The one outlined in #1331.
- The child processes are terminating before the writer is able to write (it writes every 1 second, but doesn't start until the first trace is passed to it). This can be mitigated by adding tracer.writer.flush_queue() in your child process code.
You should be able to try this out with
$ pip install git+https://github.com/DataDog/dd-trace-py.git@1184
and adding tracer.writer.flush_queue() to line 133 of the example you provided (thanks a ton for the example app btw, it was super useful in triaging this thing 🙂).
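In terms of the example app, that mitigation might look roughly like this (a sketch only, reusing JOB_TYPE, logger, and tracer from the script above; flush_queue is an internal writer API in these versions, and Context is imported the same way as in the earlier workaround):
def job_to_be_done(job_id):
    from ddtrace.tracer import Context

    # Re-activate the parent's trace context in the forked child (as discussed above)
    parent_span = tracer.current_span()
    if parent_span:
        tracer.context_provider.activate(
            Context(trace_id=parent_span.trace_id, span_id=parent_span.span_id)
        )
    try:
        with tracer.trace(
            name=f"job.{JOB_TYPE}.work", service="test-app",
            resource=str(job_id), span_type="worker",
        ):
            logger.warning(f"Job {job_id}: 2**{job_id} = {2**job_id}")
    finally:
        # Force the pending trace out before this short-lived child exits,
        # since the background writer may not get another chance to run.
        tracer.writer.flush_queue()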
Let me know if this works for you!
We've got to get some tests in for #1331 but I think it'll probably be included in our next release.
@Kyle-Verhoog Thanks for the progress on this!
I looked at and tested your solution. It combines activating the context in the child process, as @brettlangdon mentioned, with flushing the queue before exiting the child process to be sure all spans are sent.
It does work, even on the version I have, when adding both of these to the test app above. However, I'm wondering if this isn't really the root cause. I see that the AgentWriter uses a PeriodicWorker class that runs on a background thread, and that background thread already has a fail-safe to flush the queue when it exits, by way of registering an atexit handler. That handler eventually calls the "work" that the periodic worker does (run_periodic), which in turn flushes the queue.
So under normal circumstances, even if a program exited in less than 1 second, I think the spans created in that short time would be flushed and sent.
What I think may be the root cause here is that atexit does not fire when that background thread is running in a subprocess.
I was trying to set up some test code using PeriodicWorker directly to prove this (inside a subprocess vs. no subprocess), but didn't get quite there. There are a handful of other scenarios where atexit will not fire its callback handler; this could be another one of them.
With all that, activating the context would still be necessary, but hopefully, if this worked as designed, I wouldn't have to add a bunch of try-finally logic to subprocess workers to ensure spans are sent for quick-executing jobs.
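For reference, a small sketch along those lines (independent of ddtrace, using the fork start method, so POSIX-only) appears to show an atexit handler running at parent exit but not at the end of a forked multiprocessing child, which exits via os._exit() after running its target:
import atexit
import multiprocessing as mp
import os

def on_exit():
    print(f"atexit handler ran in pid {os.getpid()}")

atexit.register(on_exit)

def child_target():
    print(f"child pid {os.getpid()} finishing; no atexit output follows")

if __name__ == "__main__":
    ctx = mp.get_context("fork")
    worker = ctx.Process(target=child_target)
    worker.start()
    worker.join()
    print(f"parent pid {os.getpid()} exiting; atexit output follows")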
Ah you're right! I forgot about our atexit handler. I've seen it in the past where short-lived forked processes do not send spans. Not sure why it's not firing in subprocesses 🤔.
I'll also have to look deeper into the semantics of atexit. Agreed, it's not ideal to have the try-finally logic to get your spans sent.
FYI to keep tabs on the full solution, I received this from Datadog support on Aug 6, 2020:
Taking a look at your request, we can confirm that currently the lost spans are being caused by atexit not firing up in subprocesses. While the update to 0.37 assures that forked processes are now supported, we agree that the workaround per the last comment, of activating context manually with try-finally logic, is not ideal. We're going to include this change as a part of supporting multiprocessing in Python, and start work on it this October. You can use this existing ticket, or follow the attached PR, to track progress.
My hypothesis here is that it's the use of daemonic threads for the writer threads. We end up trying to replicate the behaviour of normal threads, which looks like it could break down when the interpreter shuts down.
Unfortunately it's not completely trivial to change over to normal threads but it shouldn't be too bad. It's in our backlog but I'll see if I can make some time to investigate soon.
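A tiny illustration of that concern (not ddtrace's actual writer, just general daemon-thread behavior): a daemon thread is abandoned at interpreter shutdown, so any flush scheduled for its next tick is lost.
import threading
import time

def flush_later():
    # Pretend this is a writer that flushes on a 1-second interval
    time.sleep(1)
    print("flushed!")  # never printed: daemon threads are killed at shutdown

threading.Thread(target=flush_later, daemon=True).start()
print("main thread exiting immediately; the daemon flush thread is abandoned")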
The request for proper multiprocessing support is in our team's feature backlog and is not planned for the short term. As part of the backlog it will be considered during feature prioritization.