Service Bus Messages Have No Context #2

Open bivas6 opened this issue 3 years ago • 13 comments

Hi,

We are using Application Insights (AI) to track our system. We have a service that receives some info over REST and posts it to Azure Service Bus, and other services consume this message. The send and receive of the same message have different operation IDs, and they appear to have different contexts.

In addition, the received message has a different context than the other HTTP/S requests and logs in the same flow. Every trace/dependency in the 'receive' function appears to have no parent.

Simple app to reproduce:

sender.js:

const appInsights = require('applicationinsights');

appInsights.setup()
    .setAutoCollectConsole(true, true)
    .setSendLiveMetrics(true)
    .start();

const express = require('express');
const axios = require('axios');
const {ServiceBusClient} = require('@azure/service-bus');
const WebSocket = require('ws');
const connectionString = "*****";
const sender = new ServiceBusClient(connectionString, {webSocketOptions: { webSocket: WebSocket }}).createSender('test');
const app = express();
app.post('/sendMessage', async (_req, res) => {
    console.log('log');
    await axios.default.get('https://google.com');
    console.log('log 2');
    await sender.sendMessages({subject: 'subject', body: {key:'value'}});
    res.sendStatus(200);
});

app.listen(3080, '0.0.0.0', () => {console.log('started')});

receiver.js:

const appInsights = require('applicationinsights');

appInsights.setup().setAutoCollectConsole(true, true).setSendLiveMetrics(true).start();

const axios = require('axios');
const { ServiceBusClient } = require('@azure/service-bus');
const WebSocket = require('ws');

const connectionString = '*****';
const receiver = new ServiceBusClient(connectionString, { webSocketOptions: { webSocket: WebSocket } }).createReceiver(
  'test',
  'test-subscription',
);

const myMessageHandler = async (message) => {
  console.log(`message.body: ${JSON.stringify(message.body)}`);
  await axios.default.get('https://google.com');
  console.log('log 3');
};
const myErrorHandler = async (args) => {
  console.log(`Error occurred with ${args.entityPath} within ${args.fullyQualifiedNamespace}: `, args.error);
};
receiver.subscribe({
  processMessage: myMessageHandler,
  processError: myErrorHandler,
});

This is the current result in the Azure portal.

Sender request:

image

Receiver side:

image

Thanks! cc: @orgads

bivas6 avatar Feb 06 '22 15:02 bivas6

@hectorhdzg Did you have a chance to look into this?

orgads avatar Feb 09 '22 11:02 orgads

@bivas6, @orgads, the Application Insights SDK tries to build the correlation context from incoming HTTP request headers. In this case the receiver doesn't have any trace headers; it is making a new outgoing HTTP request to Service Bus, so a new context is created. I guess you could add some correlation properties to the actual Service Bus message to be able to associate the telemetry, but there is not much we can do here.

Regarding the other issue, where the receiver telemetry is not in the same context: we automatically wrap all the telemetry in the same context when there is an incoming HTTP request. In this case you don't have that, so you will need to create your own context and wrap all the code that should share it, similar to what was done in this PR. You can also take a look at the code we share to enable this in Azure Functions.
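
As a rough illustration of "adding correlation properties to the message", the sender could attach a W3C-style traceparent to the message's applicationProperties. This is only a sketch: formatTraceparent and withTraceparent are hypothetical helpers (not part of applicationinsights or @azure/service-bus), the property name 'traceparent' is an assumption, and the IDs are sample values.

```javascript
// Hypothetical helpers, not part of applicationinsights or @azure/service-bus.

// Build a W3C traceparent string: version-traceId-spanId-flags.
function formatTraceparent(traceId, spanId, sampled = true) {
  if (!/^[0-9a-f]{32}$/.test(traceId)) {
    throw new Error('traceId must be 32 lowercase hex characters');
  }
  if (!/^[0-9a-f]{16}$/.test(spanId)) {
    throw new Error('spanId must be 16 lowercase hex characters');
  }
  return `00-${traceId}-${spanId}-${sampled ? '01' : '00'}`;
}

// Return a copy of the message with the traceparent stored in applicationProperties.
// The property name 'traceparent' is an assumption; any name both sides agree on works.
function withTraceparent(message, traceparent) {
  return {
    ...message,
    applicationProperties: { ...message.applicationProperties, traceparent },
  };
}

const message = withTraceparent(
  { subject: 'subject', body: { key: 'value' } },
  formatTraceparent('0af7651916cd43dd8448eb211c80319c', 'b7ad6b7169203331'),
);
console.log(message.applicationProperties.traceparent);
// prints "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"
```

The resulting object could then be passed to sender.sendMessages(...) as in sender.js. Where the trace and span IDs come from depends on the setup and is left open here.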

hectorhdzg avatar Feb 09 '22 17:02 hectorhdzg

For those who need it, this is the solution we used:

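// Assumption: internal module path in applicationinsights 2.x; adjust to your SDK version.
const CorrelationContextManager = require('applicationinsights/out/AutoCollection/CorrelationContextManager');

const messageHandlerWrapper = (handler) => {
  // Must be async: the wrapped handler is awaited below.
  return async (message) => {
    // Diagnostic-Id is already present in applicationProperties (nothing was added to the message).
    const diagnosticId = message.applicationProperties?.['Diagnostic-Id'];
    if (!diagnosticId) {
      // No trace context (e.g. processError args): run the handler unwrapped.
      return handler(message);
    }
    // Diagnostic-Id format: version-traceId-spanId-flags
    const [, traceId, spanId] = diagnosticId.split('-');
    const context = CorrelationContextManager.CorrelationContextManager.generateContextObject(
      traceId,
      spanId,
      'ServiceBus.process',
    );
    const fn = appInsights.wrapWithCorrelationContext(() => handler(message), context);
    await fn();
  };
};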

const myMessageHandler = async (message) => {
  console.log(`message.body: ${JSON.stringify(message.body)}`);
  await axios.default.get('https://google.com');
  console.log('log 3');
};

const myErrorHandler = async (args) => {
  console.log(`Error occurred with ${args.entityPath} within ${args.fullyQualifiedNamespace}: `, args.error);
};

receiver.subscribe({
  processMessage: messageHandlerWrapper(myMessageHandler),
  processError: messageHandlerWrapper(myErrorHandler),
});
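
A slightly more defensive way to read Diagnostic-Id is sketched below. It assumes the value follows the W3C traceparent layout (version-traceId-spanId-flags), which is what the split('-') above implies; parseDiagnosticId is a hypothetical helper, not an SDK function, and it returns null instead of throwing when the value is missing or malformed.

```javascript
// Hypothetical helper: parse a W3C traceparent value such as the one
// read from applicationProperties['Diagnostic-Id'] in the wrapper above.
const TRACEPARENT_RE = /^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/;

function parseDiagnosticId(value) {
  if (typeof value !== 'string') return null;
  const match = TRACEPARENT_RE.exec(value.trim().toLowerCase());
  if (!match) return null;
  const [, version, traceId, spanId, flags] = match;
  // All-zero IDs are invalid in the W3C Trace Context format.
  if (/^0+$/.test(traceId) || /^0+$/.test(spanId)) return null;
  // Bit 0 of the flags byte is the "sampled" flag.
  return { version, traceId, spanId, sampled: (parseInt(flags, 16) & 1) === 1 };
}

const parsed = parseDiagnosticId('00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01');
console.log(parsed.traceId); // prints "0af7651916cd43dd8448eb211c80319c"
console.log(parseDiagnosticId(undefined)); // prints null
```

With a helper like this, the wrapper could fall back to calling the handler directly whenever the result is null.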

bivas6 avatar Feb 21 '22 14:02 bivas6

@hectorhdzg Is it possible for you to hook into subscribe automatically? We didn't add any data to the message; we just used applicationProperties['Diagnostic-Id'].

orgads avatar Feb 21 '22 14:02 orgads

@hectorhdzg ping?

orgads avatar Mar 01 '22 20:03 orgads

@orgads, the way we hook into Azure SDKs is through the @azure/core-tracing package; we do not patch specific SDKs. It looks like the "subscribe" functionality is only present in Azure Service Bus. We are planning to introduce full support for OpenTelemetry in this SDK, which should handle this scenario because the Azure SDKs also use it. We expect to have a beta release soon.

hectorhdzg avatar Mar 01 '22 21:03 hectorhdzg

Ok, thanks a lot.

orgads avatar Mar 01 '22 21:03 orgads

Hello! I've been having the same issue as described by @bivas6.

The main workflow I was trying to fulfill using function apps was the following:

  1. have a sender component which will defer async work to some worker component through service bus
  2. the worker should receive service bus message, do certain calls to different microservices and fulfill the message
  3. I would like all the subcalls and logs to be in the same operation Id with minimal fuss

Option 1 (didn't work):

I sent the sender traceparent inside the Service Bus message and generated a correlation context using the classes exported by Application Insights, like this:

const traceParent = new Traceparent(serviceBusMessage.traceParent);
const correlationContext = CorrelationContextManager.generateContextObject(traceParent.traceId, traceParent.spanId, operationName);
return appInsights.wrapWithCorrelationContext(()=>{}, correlationContext)

I noted that the correlation context was created properly with this method, reusing the traceparent from the sender function. However, I think there is a bug in using these classes, because the end result was that a totally new context was generated inside appInsights.wrapWithCorrelationContext and the passed argument wasn't used. Subcalls from the receiver were not correlated at all either, and they generated new operation IDs as well. I assume there are some internals that I'm not aware of, and that this option won't work without some other setup, which might be done inside the appInsights.startOperation method.
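
One thing that may be worth ruling out, stated as an assumption since the calling code isn't shown: a context-wrapping function only applies the context to code that runs inside the wrapped callback. The sketch below illustrates that general behaviour with Node's built-in AsyncLocalStorage as a stand-in. It is not the SDK's implementation, and wrapWithContext and currentContext are hypothetical names.

```javascript
const { AsyncLocalStorage } = require('async_hooks');

// Stand-in for the SDK's correlation storage; the real SDK internals may differ.
const storage = new AsyncLocalStorage();
const currentContext = () => storage.getStore();

// Stand-in for wrapWithCorrelationContext: returns a function that runs fn inside ctx.
function wrapWithContext(fn, ctx) {
  return (...args) => storage.run(ctx, fn, ...args);
}

const ctx = { operationId: '0af7651916cd43dd8448eb211c80319c' };

// Wrapping an empty function: once it returns, later code no longer sees the context.
wrapWithContext(() => {}, ctx)();
const seenOutside = currentContext();

// Wrapping the actual work: the context is visible inside the callback.
const seenInside = wrapWithContext(() => currentContext(), ctx)();

console.log(seenOutside); // prints undefined
console.log(seenInside === ctx); // prints true
```

If the same holds for the SDK, the handler body would need to run inside the function passed to wrapWithCorrelationContext rather than after it.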

Option 2 (which works): I also sent the sender traceparent inside the Service Bus message so that message consumers can reuse it. Then I take advantage of the mutable context of the function app and do the following:

export function contextPropagatingServiceBusTrigger(serviceBusTrigger: AzureFunction) {
    return async (context: AuthenticatedContext, ...args: any[]) => {
        const operationName = `ServiceBusReceive ${context.executionContext.functionName}`;
        const serviceBusMessage = args[0];
        //use message traceparent, or fallback to default app insights behaviour to use a freshly generated one
        context.traceContext.traceparent = serviceBusMessage?.traceParent ?? context.traceContext.traceparent;
        const correlationContext = appInsights.startOperation(context, context.executionContext.functionName);

        // Wrap the Function runtime with correlationContext
        return appInsights.wrapWithCorrelationContext(async () => {
            const startTime = Date.now(); // Start trackRequest timer
            try {
                // execute the function
                const result = await serviceBusTrigger(context, ...args);
                appInsights.defaultClient.trackRequest({
                    name: operationName,
                    resultCode: 200,
                    success: true,
                    url: context.executionContext.functionName,
                    duration: Date.now() - startTime,
                    id: correlationContext.operation.parentId,
                });
                appInsights.defaultClient.flush();
                return result;
            } catch (err) {
                // we might not need this try/catch, but it's good to have certain syntax errors which will be caught as well
                appInsights.defaultClient.trackException({
                    exception: err,
                });
                appInsights.defaultClient.flush();
            }

        }, correlationContext)();
    };
}

With this option, I get the same operation ID across the process initiator (sender), the receiver, and the dependency calls.
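
A possible variation on the try/catch above, offered as a sketch rather than as part of the original approach: record a failed request when the handler throws, then rethrow so the caller still sees the error. trackedCall is a hypothetical helper; client is anything with a trackRequest(telemetry) method (for example appInsights.defaultClient), and a fake client is used here so the snippet runs on its own.

```javascript
// Hypothetical helper: time an async call, record success or failure, rethrow on error.
async function trackedCall(client, name, fn) {
  const startTime = Date.now();
  try {
    const result = await fn();
    client.trackRequest({
      name,
      url: name,
      resultCode: 200,
      success: true,
      duration: Date.now() - startTime,
    });
    return result;
  } catch (err) {
    // Record the failure instead of silently swallowing it.
    client.trackRequest({
      name,
      url: name,
      resultCode: 500,
      success: false,
      duration: Date.now() - startTime,
    });
    throw err; // rethrow so the caller still sees the error
  }
}

// Fake client for illustration only; it just collects the telemetry objects.
const recorded = [];
const fakeClient = { trackRequest: (telemetry) => recorded.push(telemetry) };

trackedCall(fakeClient, 'ServiceBusReceive demo', async () => {
  throw new Error('boom');
}).catch((err) => console.log(err.message, recorded[0].success)); // prints "boom false"
```

Whether rethrowing is desirable depends on how the trigger should treat failed messages, so this is a design choice rather than a fix.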

My question now is whether this approach is acceptable, since I wasn't able to find another way of accomplishing the correlation. I was inspired by the Application Insights docs for HTTP triggers to arrive at this way of correlating communication between services. Thanks a lot in advance, and I'm looking forward to hearing from you!

tudor33sud avatar Jun 27 '22 06:06 tudor33sud

@hectorhdzg If you get a chance, I would be grateful to hear from you about this issue.

tudor33sud avatar Jun 30 '22 10:06 tudor33sud

Check this out:

https://github.com/microsoft/ApplicationInsights-node.js/issues/973

johnib avatar Jun 30 '22 21:06 johnib

@tudor33sud both options look fine to me. I can look into the issue you are having with option 1; that should work too. Can you provide a test traceParent that you are passing in the Service Bus message?

hectorhdzg avatar Jul 06 '22 21:07 hectorhdzg

@hectorhdzg I am currently running our workloads exclusively on FunctionApps so it's pretty simple to pass a traceparent.

Assuming a typical AzureFunction, we have the Context object, and the traceparent is accessible via context.traceContext.traceparent. Whatever the Function App generates for that particular handler, we just make sure to send it in the Service Bus payload, like: {...customProperties, traceparent: context.traceContext.traceparent}.

For the purpose of testing, you could use any valid traceparent, and you should see that in option 1 it is not being used. I suspect startOperation does something more than just generating the context, or there might be some cls context mapping situation.

tudor33sud avatar Jul 07 '22 13:07 tudor33sud

I've been looking at getting correlation working correctly for service bus triggers in our environment.

It appears using the startOperation string overload is the intended way to support non-http trigger functions.

I found this information from previous issues and pull requests: #665 #672 #681 #682

Hope this helps anyone else digging for the same info.

haodeon avatar Mar 03 '23 19:03 haodeon