Trace data is not passed between CF Workers communicating via CF Queue
Hello there,
Problem: The TraceId isn't persisted between workers communicating via CF Queue, when both are instrumented
I have been banging my head for quite some time with this issue, including checking out the source of this repo to see if I missed something or if there is some obvious implementation of this case, but I couldn't find anything.
My setup is the following:
Worker 1 - hono server which has some HTTP handlers (can PR this later, since I saw some other issue about hono instrumentation)
// src/index.ts
import { Hono } from 'hono';
import variationsRoute from './routes/variations';
import { corsMiddleware } from './middlewares/cors';
import { type ResolveConfigFn, instrument } from '@microlabs/otel-cf-workers';
export interface Env {
BASELIME_API_KEY: string
SERVICE_NAME: string
}
const app = new Hono<{ Bindings: Env }>();
// Apply CORS middleware globally
app.use('*', corsMiddleware);
// Mount routes
app.route('/variations', variationsRoute);
// Configure Baselime
const config: ResolveConfigFn = (env: Env, _trigger) => {
return {
exporter: {
url: 'https://otel.baselime.io/v1',
headers: { 'x-api-key': env.BASELIME_API_KEY },
},
service: { name: env.SERVICE_NAME },
}
}
// Export the instrumented handler
export default instrument(app, config);
In variationsRoute I have one HTTP call which obtains an array from external API and then for each element of the array I use env.QUEUE.send(element) ...
Worker 2 -- Triggerted by queue events emmited from Worker 1
const handler = {
async queue(batch: MessageBatch<MessageData>, env: Env): Promise<void> {
// some processing
},
};
// Configure Baselime
const config: ResolveConfigFn = (env: Env, _trigger) => {
return {
exporter: {
url: 'https://otel.baselime.io/v1',
headers: { 'x-api-key': env.BASELIME_API_KEY },
},
service: { name: env.SERVICE_NAME },
}
}
export default instrument(handler, config)
Worker 1 is instrumented almost correctly (HTTP call timing doesn't seem to match reality, but that's a separate issue...)
The QUEUE.send events are visible in the Observation backend.
The Queue processing events are visible in the Observation backend. However, they have a separate Trace ID and Span ID from the ones in Worker 1, so they cannot be correlated.
I started looking in manually passing the traceId as part of MessageData for the queue; however, then I had a hard time setting those values as root values for the handling tracing in Worker 2.
Is there some obvious solution to this that I am missing, or does it require writing some custom logic for TraceId extraction?
I faced a similar problem, but I forced myself to solve it with the following code...
In the Worker:
import { propagation, context, trace } from "@opentelemetry/api";
....
await trace.getTracer("worker").startActiveSpan("queue.enqueue", async (span) => {
const carrier = {};
propagation.inject(context.active(), carrier);
await c.env.WORKFLOW_BATCH_QUEUE.send({ ...payloads, carrier });
span.end();
});
In the Queue consumer:
import { propagation, context, trace, SpanKind } from "@opentelemetry/api";
...
export default {
async queue(batch, env, ctx): Promise<void> {
for (const message of batch.messages) {
const remoteCtx = propagation.extract(context.active(), message.body.carrier);
await trace
.getTracer("batch-queue")
.startActiveSpan("batch-queue.handler", { kind: SpanKind.CONSUMER }, remoteCtx, async (span) => {
...Queue logics
span.end();
});
}
}
}