
don't know how to set operation_Id, operation_ParentId and operation_Name in logger (using configure_azure_monitor from azure.monitor.opentelemetry)

Open eeertel opened this issue 1 year ago • 9 comments

  • Package Name: azure-monitor-opentelemetry
  • Package Version: 1.23
  • Operating System: Windows and Linux
  • Python Version: 3.11

Describe the bug
When logging from an Azure Function that uses configure_azure_monitor to create a logger, I don't know how to set the fields operation_Id, operation_ParentId and operation_Name that are displayed in AppInsights.

To Reproduce
Steps to reproduce the behavior:

  1. Create the logger:
conn_string  = os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"]
configure_azure_monitor(
    connection_string=conn_string,
    logger_name=__name__,       
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
  2. Log something:
logger.info(f">>> Worker {worker_index} started >>>")

Expected behavior
I would like to be able to set, when creating the logger, the context information (that is passed to the Azure Function) containing the operation_Id, operation_Name, etc.

Screenshots
[AppInsightsTraces screenshot]

Additional context
A sample Azure Function is shown below. Please note that since the Azure Function is using multiprocessing, the original Azure Function logger cannot be passed to the multiprocessing.Process instances as a parameter (it is not picklable). Each process needs to create its own logger. The goal is to have a uniform logging experience in AppInsights.

import azure.functions as func

import os
import time
import logging

from azure.monitor.opentelemetry import configure_azure_monitor

app = func.FunctionApp()

@app.schedule(schedule="0 15 3 * * *", arg_name="myTimer", run_on_startup=True,
              use_monitor=False) 
def test_multiprocessor(myTimer: func.TimerRequest, context: func.Context) -> None:

    #set warning level for all azure libraries
    logger = logging.getLogger('azure')
    logger.setLevel(logging.WARNING)
    
    if myTimer.past_due:
      logging.warning("test_multiprocessor timer is past due")
    
    start_time = time.time()
    logging.info('test_multiprocessor started')
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    test_processor = TestMultiprocessing(logger, context)
    test_processor.process_in_parallel()
    logging.info(f"test_multiprocessor finished processing in {int(time.time() - start_time)} seconds")
        
class TestMultiprocessing:
  
  _QUEUE_END_MARKER                         = "__END_OF_QUEUE__"
  _RET_CODE_KEY                             = "ret_code"
  
  def __init__(self, logger, context, **kwargs):

    self.logger = logger
    self.number_threads = 2

  def process_job(self, worker_index):

    conn_string  = os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"]
    configure_azure_monitor(
        connection_string=conn_string,
        logger_name=__name__,       
    )
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    
    logger.info(f">>> Worker {worker_index} started >>>")
    time.sleep(5)
    logger.info(f"<<< Worker {worker_index} finished <<<")
    
    for handler in logger.handlers:
      handler.flush()
    
    return

  def process_in_parallel(self):
    from multiprocessing import Process
  
    self.logger.info(f"Starting to process in parallel with: {self.number_threads} thread(s)")
        
    #prepare multiprocessing
    parallel_jobs_arr = []
    #prepare worker threads
    #func = process_site_inventory
    for i in range(self.number_threads):
      worker_process=Process(target=self.process_job,
        args=(
          i, 
        )
      )

      parallel_jobs_arr.append(worker_process)
      worker_process.start()

    #just to make sure
    for inventory_job in parallel_jobs_arr:
      inventory_job.join()
      
    self.logger.info(f"Finished all {self.number_threads} thread(s)")
    
    return 

eeertel avatar Mar 06 '24 15:03 eeertel

Thanks for the feedback @eeertel . We will investigate and get back to you asap.

kashifkhan avatar Mar 06 '24 17:03 kashifkhan

In a Functions environment, you need to extract the current trace context from the context object and set it as the current span context. See this example. Then put your logging.info() calls inside the with tracer.start_as_current_span(...) context manager. Your logs should then be correlated with your function app invocations.
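
A minimal sketch of that pattern (assuming the v2 Python programming model, where the invocation's trace context is exposed on context.trace_context; the trigger and span name are illustrative):

import logging

import azure.functions as func
from opentelemetry import trace
from opentelemetry.propagate import extract

app = func.FunctionApp()

@app.route(route="correlated")
def correlated(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
    # Rebuild the W3C trace context that the Functions host received
    carrier = {
        "traceparent": context.trace_context.Traceparent,
        "tracestate": context.trace_context.Tracestate,
    }
    tracer = trace.get_tracer(__name__)
    # Log records emitted inside the span inherit its trace/span ids, which
    # Application Insights surfaces as operation_Id / operation_ParentId
    with tracer.start_as_current_span("http_trigger_span", context=extract(carrier)):
        logging.info("this record is correlated with the function invocation")
    return func.HttpResponse("ok")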

lzchen avatar Mar 06 '24 21:03 lzchen

Thank you @lzchen - tracer.start_as_current_span has indeed solved the problem with the operation_Id (see screenshot: [trace_span]).

But what I am still missing is the operation_Name. Is there a way to have that sent to AppInsights as well?

The complete code that I used for this is here:

import azure.functions as func

import os
import time
import logging

from azure.monitor.opentelemetry import configure_azure_monitor
from opentelemetry import trace
from opentelemetry.propagate import extract

app = func.FunctionApp()

@app.schedule(schedule="0 15 3 * * *", arg_name="myTimer", run_on_startup=True,
              use_monitor=False) 
def test_multiprocessor(myTimer: func.TimerRequest, context: func.Context) -> None:

    #set warning level for all azure libraries
    logger = logging.getLogger('azure')
    logger.setLevel(logging.WARNING)
    
    if myTimer.past_due:
      logging.warning("test_multiprocessor timer is past due")
    
    start_time = time.time()
    logging.info('test_multiprocessor started')
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    test_processor = TestMultiprocessing(logger, context)
    test_processor.process_in_parallel()
    logging.info(f"test_multiprocessor finished processing in {int(time.time() - start_time)} seconds")
        
class TestMultiprocessing:
  
  _QUEUE_END_MARKER                         = "__END_OF_QUEUE__"
  _RET_CODE_KEY                             = "ret_code"
  
  def __init__(self, logger, context, **kwargs):

    self.traceparent = context.trace_context.Traceparent
    self.tracestate = context.trace_context.Tracestate
    self.logger = logger
    self.number_threads = 2

  def process_job(self, worker_index, traceparent, tracestate):

    conn_string  = os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"]
    configure_azure_monitor(
        connection_string=conn_string,
        logger_name=__name__,       
    )
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    
    carrier = {
        "traceparent": traceparent,
        "tracestate": tracestate,
    }
    tracer = trace.get_tracer(__name__)
    
    # Start a span using the current context
    with tracer.start_as_current_span(
        "http_trigger_span",
        context=extract(carrier),
    ):    
    
      logger.info(f">>> Worker {worker_index} started >>>")
      time.sleep(5)
      logger.info(f"<<< Worker {worker_index} finished <<<")
      
      for handler in logger.handlers:
        handler.flush()
    
    return

  def process_in_parallel(self):
    from multiprocessing import Process
  
    self.logger.info(f"Starting to process in parallel with: {self.number_threads} thread(s)")
        
    #prepare multiprocessing
    parallel_jobs_arr = []
    #prepare worker threads
    #func = process_site_inventory
    for i in range(self.number_threads):
      worker_process=Process(target=self.process_job,
        args=(
          i, 
          self.traceparent,
          self.tracestate,
        )
      )

      parallel_jobs_arr.append(worker_process)
      worker_process.start()

    #just to make sure
    for inventory_job in parallel_jobs_arr:
      inventory_job.join()
      
    self.logger.info(f"Finished all {self.number_threads} thread(s)")
    
    return 

eeertel avatar Mar 06 '24 22:03 eeertel

Hi @eeertel. Thank you for opening this issue and giving us the opportunity to assist. We believe that this has been addressed. If you feel that further discussion is needed, please add a comment with the text "/unresolve" to remove the "issue-addressed" label and continue the conversation.

github-actions[bot] avatar Mar 07 '24 16:03 github-actions[bot]

From my point of view the solution mentioned above does not solve the issue because of the following use case:

  • when using query alert rules to monitor an Azure Function, the operation_Name is an essential part of the query definition.

Without a valid operation_Name, a trace in Application Insights that points to an execution failure cannot be "connected" to a specific function, which makes it impossible to define an alert rule.

An example for such a query alert rule is shown below:

traces
| where operation_Name == "test_multiprocessor"
| where severityLevel > 1

eeertel avatar Mar 07 '24 20:03 eeertel

@eeertel

I am currently investigating. Please ignore the automated message.

lzchen avatar Mar 07 '24 22:03 lzchen

@eeertel Operation Name is mapped from the OpenTelemetry span.name. So, by default it will look something like "GET /some-uri" for a given GET request. If you want to override Operation Name or any other span attribute that is autopopulated, you can do so with a custom span processor. Here are some examples: https://learn.microsoft.com/en-us/azure/azure-monitor/app/opentelemetry-add-modify?tabs=python#add-a-custom-property-to-a-span
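
A minimal sketch of that approach (the class and operation name are illustrative; it mutates a private span field, mirroring the pattern in the linked docs, and span_processors is the same parameter used in the code further down):

from azure.monitor.opentelemetry import configure_azure_monitor
from opentelemetry.sdk.trace import SpanProcessor

class OperationNameSpanProcessor(SpanProcessor):
    """Renames spans before export; the exporter maps span.name to Operation Name."""

    def __init__(self, operation_name):
        self.operation_name = operation_name

    def on_end(self, span):
        # ReadableSpan exposes no public setter for its name, hence the
        # private field, as in the linked docs example
        span._name = self.operation_name

configure_azure_monitor(
    connection_string="<your-connection-string>",  # placeholder
    span_processors=[OperationNameSpanProcessor("test_multiprocessor")],
)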

jeremydvoss avatar Mar 07 '24 22:03 jeremydvoss

@jeremydvoss unfortunately the example using the span processor does not work as expected with Application Insights. I can see in the debugger that the on_end function is invoked, but neither the custom dimension nor the span name has any effect on the data displayed in Application Insights.

I have exported the AppInsights output and attached it here: app_insights_export.csv

The code used to generate this is shown below. My feeling is that the logging data is filtered by some entity along the way. If I look at the traceparent that is extracted from the context, it is actually the combination 00-<operation_Id>-<operation_ParentId>-00 (i.e. the standard W3C traceparent layout version-traceid-parentid-traceflags), and the tracestate is empty.

If I use these values to populate the carrier (the traceparent/tracestate entries in process_job), AppInsights somehow displays the correct operation_Id but a completely random operation_ParentId.

Please let me know if you need any more details to reproduce this behavior.

Thank you in advance,

import azure.functions as func

import os
import time
import logging

from azure.monitor.opentelemetry import configure_azure_monitor
from opentelemetry import trace
from opentelemetry.propagate import extract
from opentelemetry.sdk.trace import SpanProcessor

app = func.FunctionApp()

@app.schedule(schedule="0 15 3 * * *", arg_name="myTimer", run_on_startup=True,
              use_monitor=False) 
def test_multiprocessor(myTimer: func.TimerRequest, context: func.Context) -> None:

    #set warning level for all azure libraries
    logger = logging.getLogger('azure')
    logger.setLevel(logging.WARNING)
    
    if myTimer.past_due:
      logging.warning("test_multiprocessor timer is past due")
    
    start_time = time.time()
    logging.info('test_multiprocessor started')
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    test_processor = TestMultiprocessing(logger, context)
    test_processor.process_in_parallel()
    logging.info(f"test_multiprocessor finished processing in {int(time.time() - start_time)} seconds")
     
     
class SpanEnrichingProcessor(SpanProcessor):

  def __init__(self, function_name, **kwargs):
    self.function_name = function_name

  def on_end(self, span):
      # Set the function name
      span._name = self.function_name
      # test one custom dimension
      span._attributes["foo"] = "bar"
      # test setting the client ip address
      span._attributes["http.client_ip"] = "1.1.1.1"
        
class TestMultiprocessing:
  
  def __init__(self, logger, context, **kwargs):

    self.traceparent = context.trace_context.Traceparent
    self.tracestate = context.trace_context.Tracestate
    self.function_name = context.function_name
    self.logger = logger
    self.number_threads = 2
    self.logger.info(f"Initialized with traceparent: {self.traceparent} and tracestate: {self.tracestate} and function name: {self.function_name}")

  def process_job(self, worker_index):

    conn_string  = os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"]
    span_enrich_processor = SpanEnrichingProcessor(self.function_name)
    configure_azure_monitor(
        connection_string=conn_string,
        logger_name=self.function_name,
        # Configure the custom span processors to include span enrich processor.
        span_processors=[span_enrich_processor],          
    )
    logger = logging.getLogger(self.function_name)
    logger.setLevel(logging.DEBUG)
    
    carrier = {
        "traceparent": self.traceparent,
        "tracestate": self.tracestate,
    }
    tracer = trace.get_tracer(self.function_name)
    
    # Start a span using the current context
    with tracer.start_as_current_span(
        "timer_trigger_span",
        context=extract(carrier),
    ):    
    
      logger.info(f">>> Worker {worker_index} started >>>")
      time.sleep(5)
      logger.info(f"<<< Worker {worker_index} finished <<<")
      
      for handler in logger.handlers:
        handler.flush()
    
    return

  def process_in_parallel(self):
    from multiprocessing import Process
  
    self.logger.info(f"Starting to process in parallel with: {self.number_threads} thread(s)")
        
    #prepare multiprocessing
    parallel_jobs_arr = []
    #prepare worker threads
    #func = process_site_inventory
    for i in range(self.number_threads):
      worker_process=Process(target=self.process_job,
        args=(
          i, 
        )
      )

      parallel_jobs_arr.append(worker_process)
      worker_process.start()

    #just to make sure
    for inventory_job in parallel_jobs_arr:
      inventory_job.join()
      
    self.logger.info(f"Finished all {self.number_threads} thread(s)")
    
    return 

eeertel avatar Mar 08 '24 14:03 eeertel

@eeertel I see. I misunderstood. I was only talking about changing Operation Name in OpenTelemetry spans, which become Application Insights "requests". I will investigate whether it's possible to set Operation Name in log traces with the SDK.

jeremydvoss avatar Mar 08 '24 19:03 jeremydvoss

Hi @eeertel, since you haven’t asked that we /unresolve the issue, we’ll close this out. If you believe further discussion is needed, please add a comment /unresolve to reopen the issue.

github-actions[bot] avatar Mar 15 '24 22:03 github-actions[bot]

/unresolve

@jeremydvoss did you manage to find out whether it is possible to set Operation Name in log traces with the SDK?

eeertel avatar Mar 15 '24 22:03 eeertel

I've confirmed that the log SDK does not currently map Operation Name from anything. So, I'm marking this as a feature request.

jeremydvoss avatar Mar 18 '24 22:03 jeremydvoss

Hello, any progress on this?

gkeuccsr avatar May 17 '24 12:05 gkeuccsr

There is no current plan to enable setting Operation Name for Application Insights "Trace Logs".

jeremydvoss avatar May 28 '24 18:05 jeremydvoss

I ended up monkey-patching the code:

import azure.monitor.opentelemetry.exporter.export.logs._exporter as xxx_logs


def patch_34655(ai_operation_name: str) -> None:
    """
    Patching the AzureMonitorLogExporter, see https://github.com/Azure/azure-sdk-for-python/issues/34655
    """
    # pylint: disable=protected-access
    orig_convert_log_to_envelope = xxx_logs._convert_log_to_envelope

    # pylint: disable=import-outside-toplevel
    from azure.monitor.opentelemetry.exporter._generated.models import ContextTagKeys, TelemetryItem
    from opentelemetry.sdk._logs import LogData

    def custom_convert_log_to_envelope(log_data: LogData) -> TelemetryItem:
        """Add operation_Name to the envelope"""
        envelope = orig_convert_log_to_envelope(log_data)
        envelope.tags[ContextTagKeys.AI_OPERATION_NAME] = ai_operation_name
        return envelope

    xxx_logs._convert_log_to_envelope = custom_convert_log_to_envelope


patch_34655(ai_operation_name="fubar")
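
Note that this relies on private exporter internals (hence the underscore-prefixed names), so it may break between package versions. It takes effect because the exporter resolves _convert_log_to_envelope from its module globals at call time, so the patch only needs to run before any log records are exported.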

gkeuccsr avatar Jun 04 '24 09:06 gkeuccsr

Thanks @gkeuccsr for providing the monkey patch workaround.

@jeremydvoss I would expect this to be a fundamental feature when using Azure Functions, and I would appreciate it if it could be put on the roadmap.

claria avatar Jun 26 '24 05:06 claria

@gkeuccsr Thanks for the awesome workaround. Is there any way where we can plug this code directly into the snippet?

        exporter = AzureMonitorLogExporter(
            connection_string=os.environ["My__Azure__ApplicationInsights__ConnectionString"],
        )
        self.logger_provider.add_log_record_processor(BatchLogRecordProcessor(
            exporter,
            schedule_delay_millis=EXPORT_INTERVAL_IN_MS,
        ))

FYI: Raised a similar issue like this: https://github.com/open-telemetry/opentelemetry-python/issues/3635
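
One possible way to wire the patch into that snippet (an untested sketch, assuming patch_34655 from the comment above is in scope; the environment variable name is taken from the snippet and EXPORT_INTERVAL_IN_MS is a placeholder):

import os

from azure.monitor.opentelemetry.exporter import AzureMonitorLogExporter
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor

EXPORT_INTERVAL_IN_MS = 5000  # placeholder value

# Apply the patch before any log records are exported so the exporter
# resolves the patched _convert_log_to_envelope
patch_34655(ai_operation_name="test_multiprocessor")

logger_provider = LoggerProvider()
exporter = AzureMonitorLogExporter(
    connection_string=os.environ["My__Azure__ApplicationInsights__ConnectionString"],
)
logger_provider.add_log_record_processor(
    BatchLogRecordProcessor(exporter, schedule_delay_millis=EXPORT_INTERVAL_IN_MS)
)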

sudharsan2020 avatar Jun 29 '24 12:06 sudharsan2020