otel-cf-workers

Trace data is not passed between CF Workers communicating via CF Queue

Open R2D34 opened this issue 10 months ago • 1 comments

Hello there,

Problem: The TraceId isn't persisted between workers communicating via CF Queue, when both are instrumented

I have been banging my head for quite some time with this issue, including checking out the source of this repo to see if I missed something or if there is some obvious implementation of this case, but I couldn't find anything.

My setup is the following:

Worker 1 - hono server which has some HTTP handlers (can PR this later, since I saw some other issue about hono instrumentation)

// src/index.ts

import { Hono } from 'hono';
import variationsRoute from './routes/variations';
import { corsMiddleware } from './middlewares/cors';
import { type ResolveConfigFn, instrument } from '@microlabs/otel-cf-workers';

export interface Env {
    BASELIME_API_KEY: string
    SERVICE_NAME: string
}

const app = new Hono<{ Bindings: Env }>();

// Apply CORS middleware globally
app.use('*', corsMiddleware);
// Mount routes
app.route('/variations', variationsRoute);


// Configure Baselime
const config: ResolveConfigFn = (env: Env, _trigger) => {
    return {
        exporter: {
            url: 'https://otel.baselime.io/v1',
            headers: { 'x-api-key': env.BASELIME_API_KEY },
        },
        service: { name: env.SERVICE_NAME },
    }
}

// Export the instrumented handler
export default instrument(app, config);

In variationsRoute I have one HTTP call which obtains an array from an external API, and then for each element of the array I use env.QUEUE.send(element) ...

Worker 2 -- Triggered by queue events emitted from Worker 1

const handler = {
	async queue(batch: MessageBatch<MessageData>, env: Env): Promise<void> {
		// some processing
	},
};

// Configure Baselime
const config: ResolveConfigFn = (env: Env, _trigger) => {
  return {
      exporter: {
          url: 'https://otel.baselime.io/v1',
          headers: { 'x-api-key': env.BASELIME_API_KEY },
      },
      service: { name: env.SERVICE_NAME },
  }
}


export default instrument(handler, config)

Worker 1 is instrumented almost correctly (HTTP call timing doesn't seem to match reality, but that's a separate issue...)

The QUEUE.send events are visible in the observability backend.

The queue processing events are visible in the observability backend as well. However, they have a separate Trace ID and Span ID from the ones in Worker 1, so they cannot be correlated.

I started looking into manually passing the traceId as part of MessageData for the queue; however, I then had a hard time setting those values as the root values for the handler's trace in Worker 2.

Is there some obvious solution to this that I am missing, or does it require writing some custom logic for TraceId extraction?

R2D34 avatar Feb 13 '25 15:02 R2D34

I faced a similar problem and worked around it with the following code...

In the Worker:

import { propagation, context, trace } from "@opentelemetry/api";

....

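await trace.getTracer("worker").startActiveSpan("queue.enqueue", async (span) => {
  try {
    // Serialize the active trace context (which now includes this span) into a plain object.
    const carrier = {};
    propagation.inject(context.active(), carrier);

    // Send the carrier alongside the payload so the consumer can restore the context.
    await c.env.WORKFLOW_BATCH_QUEUE.send({ ...payloads, carrier });
  } finally {
    span.end(); // end the span even if send() throws
  }
});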

In the Queue consumer:

import { propagation, context, trace, SpanKind } from "@opentelemetry/api";

...

export default {
  async queue(batch, env, ctx): Promise<void> {
    for (const message of batch.messages) {
      // Rebuild the producer's trace context from the carrier sent with the message.
      const remoteCtx = propagation.extract(context.active(), message.body.carrier);

      await trace
        .getTracer("batch-queue")
        .startActiveSpan("batch-queue.handler", { kind: SpanKind.CONSUMER }, remoteCtx, async (span) => {
          try {
            // ...Queue logics
          } finally {
            span.end(); // end the span even if processing throws
          }
        });
    }
  }
}

inaridiy avatar Apr 26 '25 07:04 inaridiy
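
For anyone debugging this, it may help to see what the `carrier` object holds on the wire. Assuming the W3C Trace Context propagator is the one registered, `propagation.inject` writes a `traceparent` entry of the form `00-<32 hex trace id>-<16 hex span id>-<2 hex flags>`. The sketch below is dependency-free and only illustrates that format; `formatTraceParent` and `parseTraceParent` are hypothetical helpers, not part of `@opentelemetry/api` or otel-cf-workers, and they handle version `00` only.

```typescript
// Hypothetical helpers for illustration only; not an OpenTelemetry API.
interface TraceParent {
  traceId: string; // 32 lowercase hex characters
  spanId: string; // 16 lowercase hex characters
  sampled: boolean; // lowest bit of the trace-flags byte
}

// Version 00 of the W3C traceparent header: version-traceid-parentid-flags.
const TRACEPARENT_RE = /^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/;

function formatTraceParent(tp: TraceParent): string {
  const flags = tp.sampled ? "01" : "00";
  return `00-${tp.traceId}-${tp.spanId}-${flags}`;
}

function parseTraceParent(header: string | undefined): TraceParent | null {
  if (!header) return null;
  const m = TRACEPARENT_RE.exec(header.trim());
  if (!m) return null;
  const traceId = m[1] ?? "";
  const spanId = m[2] ?? "";
  const flags = m[3] ?? "00";
  // All-zero trace or span ids are invalid according to the spec.
  if (/^0+$/.test(traceId) || /^0+$/.test(spanId)) return null;
  return { traceId, spanId, sampled: (parseInt(flags, 16) & 0x01) === 1 };
}

// What a queue message body might look like after the producer injects context.
// The ids are the example values from the W3C Trace Context specification.
const exampleBody = {
  payload: { id: 1 },
  carrier: {
    traceparent: formatTraceParent({
      traceId: "4bf92f3577b34da6a3ce929d0e0e4736",
      spanId: "00f067aa0ba902b7",
      sampled: true,
    }),
  },
};

// On the consumer side, the same ids can be read back for logging or debugging.
const parsedExample = parseTraceParent(exampleBody.carrier.traceparent);
```

If the consumer logs `message.body.carrier` and the `traceparent` entry is missing or fails a check like this one, the problem is on the producer side (nothing was injected) rather than in the consumer's `propagation.extract` call.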