ApplicationInsights-node.js
ApplicationInsights-node.js copied to clipboard
Service Bus Messages Have No Context #2
Hi,
We are using AI to track our system. We have service that receive some info in REST, and post it on Azure Service Bus, and other services consume this message. The send/receive of the same message have different operation ID, and they looks like they have different context.
In addition, the received message have different context than other http/s requests and logs on the same flow. Every trace/dependency on the 'receive' function seems to be without any parent.
Simple app to reproduce:
sender.js
:
const appInsights = require('applicationinsights');
appInsights.setup()
.setAutoCollectConsole(true, true)
.setSendLiveMetrics(true)
.start();
const express = require('express');
const axios = require('axios');
const {ServiceBusClient} = require('@azure/service-bus');
const WebSocket = require('ws');
const connectionString = "*****";
const sender = new ServiceBusClient(connectionString, {webSocketOptions: { webSocket: WebSocket }}).createSender('test');
const app = express();
app.post('/sendMessage', async (_req, res) => {
console.log('log');
await axios.default.get('https://google.com');
console.log('log 2');
await sender.sendMessages({subject: 'subject', body: {key:'value'}});
res.sendStatus(200);
});
app.listen(3080, '0.0.0.0', () => {console.log('started')});
receiver.js
:
const appInsights = require('applicationinsights');
appInsights.setup().setAutoCollectConsole(true, true).setSendLiveMetrics(true).start();
const axios = require('axios');
const { ServiceBusClient } = require('@azure/service-bus');
const WebSocket = require('ws');
const connectionString = '*****';
const receiver = new ServiceBusClient(connectionString, { webSocketOptions: { webSocket: WebSocket } }).createReceiver(
'test',
'test-subscription',
);
const myMessageHandler = async (message) => {
console.log(`message.body: ${JSON.stringify(message.body)}`);
await axios.default.get('https://google.com');
console.log('log 3');
};
const myErrorHandler = async (args) => {
console.log(`Error occurred with ${args.entityPath} within ${args.fullyQualifiedNamespace}: `, args.error);
};
receiver.subscribe({
processMessage: myMessageHandler,
processError: myErrorHandler,
});
This is the current result on Azure portal: Sender request:
Receiver side:
Thanks! cc: @orgads
@hectorhdzg Did you have a chance to look into this?
@bivas6, @orgads, Application Insights SDK will try to use the correlation context using incoming HTTP request headers, in this case receiver doesn't have any trace headers, is making a new outgoing HTTP request to Service Bus so a new context will be created, I guess you could add some correlation properties in the actual Service Bus message to be able to associate the telemetry but not much we can do here, regarding other issue where receiver telemetry is not in same context, automatically we will wrap all the telemetry in same context when there is a incoming HTTP request, in this case you don't have that so you will need to create your context and wrap all the code you want to share it, similar to what did in this PR you can also take a look at the code we share to enable this in Azure Functions
For those who needs it, this is the solution we used:
const messageHandlerWrapper = (handler) => {
return (message) => {
const traceParent = message.applicationProperties;
const diagnosticId = traceParent['Diagnostic-Id'];
const [_, traceId, spanId, _a] = diagnosticId.split('-');
const context = CorrelationContextManager.CorrelationContextManager.generateContextObject(
traceId,
spanId,
'ServiceBus.process',
);
const fn = appInsights.wrapWithCorrelationContext(() => handler(message), context);
await fn();
};
};
const myMessageHandler = async (message) => {
console.log(`message.body: ${JSON.stringify(message.body)}`);
await axios.default.get('https://google.com');
console.log('log 3');
};
const myErrorHandler = async (args) => {
console.log(`Error occurred with ${args.entityPath} within ${args.fullyQualifiedNamespace}: `, args.error);
};
receiver.subscribe({
processMessage: messageHandlerWrapper(myMessageHandler),
processError: messageHandlerWrapper(myErrorHandler),
});
@hectorhdzg Is it possible for you to hook in subscribe
automatically? We didn't add any data to the message, and just used applicationProperties[Diagnostic-Id]
.
@hectorhdzg ping?
@orgads the way we hook to Azure SDKs is through @azure/core-tracing package, we do not do any patching for specific SDKs, looks like "subscribe" functionality is only present in Azure Service Bus, we are planning to introduce full support for OpenTelemetry in this SDK that should handle this scenario because Azure SDKs are also using it, we are expecting to have a beta release soon.
Ok, thanks a lot.
Hello! I've been having the same issue as specified by @bivas6 .
The main workflow I was trying to fulfill using function apps was the following:
- have a sender component which will defer async work to some worker component through service bus
- the worker should receive service bus message, do certain calls to different microservices and fulfill the message
- I would like all the subcalls and logs to be in the same operation Id with minimal fuss
Option 1(didn't work):
I sent the sender traceparent inside the service bus message and generated a correlation context by using application insights exported classes like this
const traceParent = new Traceparent(serviceBusMessage.traceParent);
const correlationContext = CorrelationContextManager.generateContextObject(traceParent.traceId, traceParent.spanId, operationName);
return appInsights.wrapWithCorrelationContext(()=>{}, correlationContext)
I noted that the correlation context was properly created with this method by reusing the traceparent from the sender function, however I think there is some bug by using these classes because the end result was that a totally new context was generated inside appInsights.wrapWithCorrelationContext and the passed argument wasn't used properly. Subcalls from the receiver were also not being correlated at all and they generated new operation ids as well. I assume there might be some internals that I'm not aware off and this option won't work without some other setup which might be done inside appInsights.startOperation method.
Option 2(which works): I also sent the sender traceparent inside the service bus message to be reused by message consumers. Then, I am taking advantage of the mutable context of the function app and do the following:
export function contextPropagatingServiceBusTrigger(serviceBusTrigger: AzureFunction) {
return async (context: AuthenticatedContext, ...args: any[]) => {
const operationName = `ServiceBusReceive ${context.executionContext.functionName}`;
const serviceBusMessage = args[0];
//use message traceparent, or fallback to default app insights behaviour to use a freshly generated one
context.traceContext.traceparent = serviceBusMessage?.traceParent ?? context.traceContext.traceparent;
const correlationContext = appInsights.startOperation(context, context.executionContext.functionName);
// Wrap the Function runtime with correlationContext
return appInsights.wrapWithCorrelationContext(async () => {
const startTime = Date.now(); // Start trackRequest timer
try {
// execute the function
const result = await serviceBusTrigger(context, ...args);
appInsights.defaultClient.trackRequest({
name: operationName,
resultCode: 200,
success: true,
url: context.executionContext.functionName,
duration: Date.now() - startTime,
id: correlationContext.operation.parentId,
});
appInsights.defaultClient.flush();
return result;
} catch (err) {
// we might not need this try/catch, but it's good to have certain syntax errors which will be caught as well
appInsights.defaultClient.trackException({
exception: err,
});
appInsights.defaultClient.flush();
}
}, correlationContext)();
};
}
With this option, I am getting the same operationId correctly across the process initiator(sender), receiver + dependency calls.
My question now is if this approach is acceptable, since I wasn't able to find another way of accomplishing the correlation. I was inspired by insights docs for http triggers to reach this way of correlating services communication. Thanks a lot in advance, and I'm looking forward to hearing from you!
@hectorhdzg If you get a chance, I would be greatful to hear about this issue from you.
Check this out:
https://github.com/microsoft/ApplicationInsights-node.js/issues/973
@tudor33sud both options look fine to me, I can look into the issue you are having with option 1, that should work too, can you provide some test traceParent that you are passing in service bus message?
@hectorhdzg I am currently running our workloads exclusively on FunctionApps so it's pretty simple to pass a traceparent.
Assuming a typical AzureFunction , we do have the Context object which is accessible via context.traceContext.traceparent , so whatever the Function App will generate for that particular handler, we will just make sure to send it in the service bus payload like : {...customProperties, traceparent: context.traceContext.traceparent}.
For the purpose of testing, you could use any valid traceparent and you should see that in option 1 that it's not being used. I suspect startOperation does something more than just generating the context somehow, or there might be some cls context mapping situation.
I've been looking at getting correlation working correctly for service bus triggers in our environment.
It appears using the startOperation string overload is the intended way to support non-http trigger functions.
I found this information from previous issues and pull requests: #665 #672 #681 #682
Hope this helps anyone else digging for the same info.