opencensus-python
AzureExporter not working with multiprocessing
Describe your environment. macOS 10.14.6, Python 3.7.5, opencensus-ext-azure==1.0.4, opencensus-ext-requests==0.7.3
Steps to reproduce.
I have code in which I want to monitor the dependency calls to Azure DevOps APIs. Our code runs multiprocessing using the Process class. When the exporter runs outside of multiprocessing, it sends telemetry to App Insights. When run inside a multiprocessing Process, it doesn't. I added a callback to print the span data, and it doesn't get called when using Process.
```python
from azure.devops.connection import Connection
from msrest.authentication import BasicAuthentication
from multiprocessing import Process, Pool, Queue
from base_insights import BaseInsights
from opencensus.common.runtime_context import RuntimeContext


class TestInsights:
    def __init__(self):
        self.tracer = BaseInsights.tracer

    def process(self):
        procs = []
        organization_url = 'https://dev.azure.com/org'
        credentials = BasicAuthentication('', '')
        p1 = Process(target=self.my_loop, args=[organization_url, credentials])
        p1.start()
        p1.join()

    def my_loop(self, organization_url, credentials, parent_span=None):
        with self.tracer.span(name='TestLoopProcessThreadingInside'):
            connection = Connection(base_url=organization_url, creds=credentials)
            core_client = connection.clients.get_core_client()
            org = core_client.get_project_collection("test")


TestInsights().process()
```
BaseInsights (base_insights.py):
```python
import os

from opencensus.ext.azure.trace_exporter import AzureExporter
from opencensus.trace import file_exporter
from opencensus.trace import config_integration
from opencensus.trace.samplers import ProbabilitySampler, AlwaysOnSampler
from opencensus.trace.tracer import Tracer

config_integration.trace_integrations(['requests'])


def singleton(cls):
    return cls()


@singleton
class BaseInsights:
    def __init__(self):
        exporter = AzureExporter()
        exporter.add_telemetry_processor(self.callback_function)
        self.tracer = Tracer(exporter=exporter, sampler=AlwaysOnSampler())

    def callback_function(self, envelope):
        print(envelope)
```
What is the expected behavior? Span data gets sent to Application Insights.
What is the actual behavior? Span data is not sent to Application Insights.
@dpgrodriguez I created a simplified version of your first code snippet by taking out the calls to the DevOps API. The span data is printed fine and I can see the dependencies in App Insights. I think the process might be exiting too quickly for the exporter to actually export the data. The default export interval is 15 seconds. You can try modifying that, or setting a delay within your new process:
```python
exporter = AzureExporter(export_interval=5.0)
```
```python
import time
from multiprocessing import Process

from base_insights import BaseInsights


class TestInsights:
    def __init__(self):
        self.tracer = BaseInsights.tracer


def my_loop():
    with BaseInsights.tracer.span(name='TestLoopProcessThreadingInside'):
        print("inside")
        time.sleep(10)


if __name__ == '__main__':
    with BaseInsights.tracer.span(name='TestOutside'):
        print("outside")
        time.sleep(10)
        p1 = Process(target=my_loop)
        p1.start()
        p1.join()
```
Hi Leighton,
I tried your suggestion but still cannot send span data from inside the multiprocessing Process. I used your exact code and this is what was displayed:

```
outside
{...redacted envelope data from outside}
inside
```

The only span data generated was from "outside".
I also tried removing the parent span from `__main__`:
```python
if __name__ == '__main__':
    # with BaseInsights.tracer.span(name='TestOutside'):
    print("outside")
    time.sleep(10)
    p1 = Process(target=my_loop)
    p1.start()
    p1.join()
```
with these results:

```
outside
inside
```

The data from inside multiprocessing also wasn't sent to Application Insights.
These are the versions that I'm using: opencensus==0.7.10, opencensus-context==0.1.1, opencensus-ext-azure==1.0.4, opencensus-ext-requests==0.7.3, opencensus-ext-threading==0.1.2
Did you set the `export_interval` of the exporter? The whole execution from start to finish should take around 20 seconds.
Yes, tried setting it to 5 and 1.
Any other recommendations?
Perhaps it's the way you are setting up `BaseInsights`? I simplified it even further. Try it and see if it works for you.
```python
import time
from multiprocessing import Process

from opencensus.ext.azure.trace_exporter import AzureExporter
from opencensus.trace.samplers import AlwaysOnSampler
from opencensus.trace.tracer import Tracer

exporter = AzureExporter(export_interval=5)
tracer = Tracer(exporter=exporter, sampler=AlwaysOnSampler())


def callback_function(envelope):
    print(envelope)


exporter.add_telemetry_processor(callback_function)


def my_loop():
    with tracer.span(name='TestLoopProcessThreadingInside'):
        print("inside")
        time.sleep(10)


def main():
    with tracer.span(name='TestOutside'):
        print("outside")
        time.sleep(10)
        p1 = Process(target=my_loop)
        p1.start()
        p1.join()


if __name__ == '__main__':
    main()
```
I tried running this code in 4 different environments:

- CentOS 7.8 - Python 2.7.5
- CentOS 7.8 - Python 3.6.8
- macOS 10.14.6 - Python 2.7.16
- macOS 10.14.6 - Python 3.7.5

I got the same results. Only `INPROC: TestOutside` is being logged in Application Insights.
Is there a way to show lower-level debug messages?
@dpgrodriguez
I did some debugging and it looks like the `multiprocessing` module behaves a little differently on Windows than on macOS/Linux. On Windows, when a new process is created, all threads are copied over into the new address space (including the worker that sends telemetry to Azure Monitor); however, when I ran the same code on Linux, only the calling thread was copied. The `multiprocessing` module uses `os.fork()` underneath on POSIX systems (which include Linux), and from the documentation of fork: "A process shall be created with a single thread. If a multi-threaded process calls fork(), the new process shall contain a replica of the calling thread and its entire address space, possibly including the states of mutexes and other resources." Windows, on the other hand, will spin up an entirely new process and tell it to load all the modules again (which in turn creates another AzureExporter and worker).
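You can see this with a small standalone script (a rough sketch, POSIX-only since it calls `os.fork()` directly): a background thread started in the parent simply does not exist in the forked child.

```python
import os
import threading
import time


def background_worker():
    # Stand-in for the exporter's worker thread.
    while True:
        time.sleep(1)


threading.Thread(target=background_worker, daemon=True).start()
print("parent threads:", threading.active_count())  # 2: main + worker

pid = os.fork()
if pid == 0:
    # The child contains a replica of the calling (main) thread only;
    # the background worker was not copied across the fork.
    print("child threads:", threading.active_count())  # 1: main only
    os._exit(0)
os.waitpid(pid, 0)
```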
So the only way to get this working on your OS is to duplicate the initialization logic of your `AzureExporter` and `Tracer` into the function that is run in the spawned Process.
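For example, something along these lines (a rough sketch based on the snippets above; the connection string is assumed to be picked up from the environment):

```python
import time
from multiprocessing import Process

from opencensus.ext.azure.trace_exporter import AzureExporter
from opencensus.trace.samplers import AlwaysOnSampler
from opencensus.trace.tracer import Tracer


def my_loop():
    # Build the exporter and tracer inside the child process, so the
    # exporter's worker thread is created in this address space.
    exporter = AzureExporter(export_interval=5)
    tracer = Tracer(exporter=exporter, sampler=AlwaysOnSampler())
    with tracer.span(name='TestLoopProcessThreadingInside'):
        print("inside")
    time.sleep(10)  # give the worker time to flush before the process exits


if __name__ == '__main__':
    p1 = Process(target=my_loop)
    p1.start()
    p1.join()
```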
Those are some interesting findings. I did some digging as well.
In `BaseExporter` (`opencensus/ext/azure/common/exporter.py`), it looks like the items are actually being passed into the `export` method and onto the queue, even inside the multiprocessing `Process`:
```python
def export(self, items):
    self._queue.puts(items, block=False)  # pragma: NO COVER
```
However, in the `Worker`'s `run` method, when it tries to get the batch from the queue, it cannot find the items anymore:
```python
class Worker(threading.Thread):
    daemon = True
    ...
    ...
    while True:
        batch = src.gets(dst.max_batch_size, dst.export_interval)
```
Any thoughts on this? Could it be that the threading Queue is not playing well with multiprocessing?
@dpgrodriguez Yes, there is nothing in the queue because the Worker that fills them up doesn't exist.
The worker is actually present, but it doesn't see the contents of the threading queue because it can't access it. I did a quick PoC using a multiprocessing Queue and was able to send logs to analytics.
```python
from multiprocessing import Queue as MP_Queue
import jsonpickle

...redacted...

# queue, or shared workers among queues (e.g. queue for traces, queue
# for logs).
def export(self, items):
    json_items = jsonpickle.encode(items)
    # Put items on both multiprocessing and threading queues
    self._mp_queue.put(json_items)
    self._queue.puts(items, block=False)  # pragma: NO COVER

...redacted...

def run(self):  # pragma: NO COVER
    # Indicate that this thread is an exporter thread.
    # Used to suppress tracking of requests in this thread.
    execution_context.set_is_exporter(True)
    src = self.src
    dst = self.dst
    while True:
        batch = src.gets(dst.max_batch_size, dst.export_interval)
        # Check if batch results in an empty tuple and, if so, check the
        # multiprocessing queue for contents
        if batch == ():
            try:
                json_items = dst._mp_queue.get()
                batch = tuple(jsonpickle.decode(json_items))
            except Exception as e:
                pass
        if batch and isinstance(batch[-1], QueueEvent):
            ...redacted...
```
This works and sends the multiprocessed SpanData to App Insights.
I am facing the same issue while utilizing multiprocessing through joblib. Mine is a pandas-based ML pipeline where work is distributed across multiple processes using joblib (with the "multiprocessing" backend):
```python
with Parallel(n_jobs=4, backend="multiprocessing") as parallel:
    parallel(delayed(_handle_blob)(name) for name in blob_paths)
```
joblib with the "multiprocessing" backend successfully logs to a file on disk across multiple processes. But when I add `AzureLogHandler`, I don't see any logs sent to Azure Application Insights.
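Roughly, the setup looks like this (a simplified sketch, not the actual pipeline; the handler is assumed to pick up `APPLICATIONINSIGHTS_CONNECTION_STRING` from the environment):

```python
import logging

from joblib import Parallel, delayed
from opencensus.ext.azure.log_exporter import AzureLogHandler

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(AzureLogHandler())


def _handle_blob(name):
    # Runs inside a joblib worker process; file logging works from here,
    # but nothing shows up in Application Insights.
    logger.info("processing %s", name)


blob_paths = ["container/a.parquet", "container/b.parquet"]
with Parallel(n_jobs=4, backend="multiprocessing") as parallel:
    parallel(delayed(_handle_blob)(name) for name in blob_paths)
```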
I have used the workaround proposed in this thread. It works, but ultimately it's a workaround.
It would be great if this issue could be prioritized and addressed in OpenCensus.
@arnabbiswas1
I am not too familiar with joblib, but it seems like the worker fork behaviour is similar to the other multiprocessing libraries. As of today there is no clean solution to address this issue, at least from the SDK side. I believe some mp/web-server libraries provide hooks in which you can run logic right after a child process is created (like gunicorn; a sketch of such a hook is below). Looking at the joblib docs, maybe you can look into whether the `Parallel` class allows for some pre-processing of the workers that are spawned (the `backend` parameter).
If you are interested in using other technologies (such as Gunicorn), take a look at the examples in OpenTelemetry.
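For gunicorn, the kind of hook I mean looks roughly like this (a sketch, assuming a `gunicorn.conf.py`; untested):

```python
# gunicorn.conf.py
from opencensus.ext.azure.trace_exporter import AzureExporter
from opencensus.trace.samplers import AlwaysOnSampler
from opencensus.trace.tracer import Tracer

tracer = None


def post_fork(server, worker):
    # Runs in each worker right after the fork, so the exporter's worker
    # thread is created in the child's address space.
    global tracer
    tracer = Tracer(exporter=AzureExporter(export_interval=5),
                    sampler=AlwaysOnSampler())
```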
@lzchen Thanks for the pointers.
Right now we are using "multiprocessing" as the backend. The other backend we could use is "loky". It has the same logging issue, even with log files on disk.
```python
with Parallel(n_jobs=4, backend="multiprocessing") as parallel:
    parallel(delayed(_handle_blob)(name) for name in blob_paths)
```
In general, due to other priorities, I can't spend much time on this issue at this point. Considering that some flavor of multiprocessing is very common in Python-based applications and that Microsoft is using this library for Azure Monitor/Application Insights, I thought there might be some workarounds or solutions.
Thanks for your support. Please keep me posted in case you come across other solutions in the future.
Is there any resolution for integrating `AzureLogHandler` and `AzureEventHandler` objects with a multiprocessing workload? I have been trying to send intermediate logs to App Insights from a spawned process every 5 seconds. I have utilized a workaround that creates the `AzureEventHandler` at the process-ID level; however, when the main thread closes, the opencensus `BaseLogHandler` class raises an error.
The workaround has resulted in successfully uploading the intermediate logs to App Insights; the only issue occurs when the main thread closes. Please let me know if any resolution to this is possible.
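Roughly, my workaround looks like this (a simplified sketch; names and structure are placeholders, not the exact code):

```python
import logging
import os

from opencensus.ext.azure.log_exporter import AzureEventHandler


def get_event_logger():
    # One logger (and one AzureEventHandler) per process ID, so each child
    # process owns the exporter thread that ships its telemetry.
    logger = logging.getLogger("events.pid.%d" % os.getpid())
    if not logger.handlers:
        logger.addHandler(AzureEventHandler())
    return logger
```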
https://github.com/census-instrumentation/opencensus-python/issues/928#issuecomment-679497439
Can you please share a working sample of how you managed to get the AzureExporter / AzureEventHandler / AzureLogHandler to work in a multiprocessing setup? That would help me a lot in resolving the issue I am facing when using the AzureEventHandler under multiple processes. FYI @dpgrodriguez
I was experiencing the same issue (with Celery) and I figured out that it's not specifically related to the AzureExporter, but to `opencensus.common.schedule.Queue`. @dpgrodriguez helped me on the way.
Simply changing `self._queue = queue.Queue(maxsize=capacity)` to `self._queue = multiprocessing.Queue(maxsize=capacity)` in https://github.com/census-instrumentation/opencensus-python/blob/master/opencensus/common/schedule/__init__.py#L85 does the trick for me.
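Concretely, the patched spot would look something like this (a sketch; the surrounding class is simplified):

```python
import multiprocessing


class Queue(object):
    def __init__(self, capacity):
        # was: self._queue = queue.Queue(maxsize=capacity)
        self._queue = multiprocessing.Queue(maxsize=capacity)
```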
Can someone confirm this/help me out with a pull request?
Any news on this? 👀 We're having the same problem with Celery and would really appreciate an official solution.
@giulianabaratto
There is a draft PR open; the issue should be addressed once it is merged and released.
> There is a draft PR open; the issue should be addressed once it is merged and released.
Would you be able to share a link to this PR? It would be nice to be able to check whether it has been merged or not...
https://github.com/census-instrumentation/opencensus-python/pull/1158