
Insights: auto-Instrument BullMQ for Queue Insights

Open bcoe opened this issue 1 year ago • 10 comments

Problem Statement

Similar to Celery for Python, BullMQ seems like it would be a good choice to auto-instrument for queue insights, see: https://github.com/getsentry/sentry-javascript/discussions/10148, along with conversations on Twitter:

[Screenshots of Twitter conversations requesting BullMQ instrumentation, dated 2024-07-17]

Solution Brainstorm

We should model an implementation after the one for Celery.

bcoe avatar Jul 17 '24 15:07 bcoe

There is OTEL instrumentation we can leverage here: https://github.com/appsignal/opentelemetry-instrumentation-bullmq

AbhiPrasad avatar Jul 17 '24 15:07 AbhiPrasad

In the meantime, we can probably write some docs that show how to use the OTEL instrumentation manually with BullMQ. Users would have to install opentelemetry-instrumentation-bullmq themselves and then call span.setAttribute themselves.

@bcoe for an intermediate solution what do you think?

AbhiPrasad avatar Jul 17 '24 16:07 AbhiPrasad

Hi! Our team is also interested in instrumenting our BullMQ workloads running inside NestJS. I know close to nothing about OTEL or about how Sentry works, but I've been playing with the OTEL instrumentation package that you referenced and got something working, though not much.

What I've done is basically try to integrate the OTEL instrumentation with @sentry/node by looking at how other integrations work. The following snippet defines a BullMQ integration based on the PG tracing code:

import { BullMQInstrumentation } from '@appsignal/opentelemetry-instrumentation-bullmq'
import { defineIntegration } from '@sentry/core'
import { generateInstrumentOnce } from '@sentry/node'
import type { IntegrationFn } from '@sentry/types'

const INTEGRATION_NAME = 'BullMQ'

export const instrumentBullMQ = generateInstrumentOnce(
  INTEGRATION_NAME,
  () =>
    new BullMQInstrumentation({}),
)

const _bullmqIntegration = (() => {
  return {
    name: INTEGRATION_NAME,
    setupOnce() {
      instrumentBullMQ()
    },
  }
}) satisfies IntegrationFn

/**
 * BullMQ integration
 *
 * Capture tracing data for BullMQ.
 */
export const bullmqIntegration = defineIntegration(_bullmqIntegration)

With this snippet I can add the bullmqIntegration to my integrations array in Sentry.init (which also contains nodeProfilingIntegration()), so that when I trigger a Bull job, I see a transaction of type message under the Performance section of Sentry, with these relevant attributes:

messaging.operation: process,
messaging.system: bullmq,
otel.kind: CONSUMER,
sentry.op: message,
sentry.origin: manual,

However, this transaction is empty. Its duration is just a few milliseconds (shorter than the job duration), and it contains no spans at all, even though there are PG queries inside (PG queries are correctly reported in our regular application tracing). We also see no reference to the producing side of the job, nor do we see anything under the Queues section of Sentry.

Not sure if you were able to get started on those docs you mentioned, but if you have some tips that could unblock us, we'd appreciate it 🙏

iacobus avatar Aug 22 '24 16:08 iacobus

Hello, thanks for writing in. We are on company-wide hackweek and thus on limited support this week. We'll take a look at this next week.

andreiborza avatar Aug 23 '24 08:08 andreiborza

Oh cool, have fun and thanks!

iacobus avatar Aug 23 '24 09:08 iacobus

Actually, upon further debugging, I came across a very subtle bug in our instrumented code where we were not using async functions correctly. Once I fixed that, I do see database spans inside of the message span created by the OTEL instrumentation, and I also see spans for the enqueuing phase of the job lifecycle, so I think the integration code I shared above is actually sufficient to get tracing working.

We still don't see anything under the Queues feature, so we would still appreciate a note in that regard. My guess is that the OTEL instrumentation is not exactly compatible with the schema Sentry expects in order to show data under Queues?

iacobus avatar Aug 23 '24 13:08 iacobus

@iacobus Thanks for the patience! Very cool that you got the integration working yourself. The likely reason you're not getting any data in the queues module yet is that the performance data needs to have a certain schema to show up under that module. Usually, it is the span op and/or span attributes that need to follow a certain convention.

For queues, this convention can be found here: https://develop.sentry.dev/sdk/telemetry/traces/modules/queues/ (these are our developer docs)

Until we provide support for bullmq out of the box you may be able to hack this in. Keep us posted!
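For anyone following along: per the queues conventions linked above, the module keys off a `sentry.op` of `queue.publish` / `queue.process` plus a handful of `messaging.*` attributes (the same names that show up in the snippets later in this thread). A minimal helper sketching the expected attribute shape (the values here are illustrative, not tied to any particular SDK version):

```javascript
// Builds the attribute set the Sentry queues module looks for on a span,
// following the queues span conventions from the developer docs.
function queueSpanAttributes({ op, messageId, destination, latencyMs }) {
  const attrs = {
    'sentry.op': op, // 'queue.publish' on the producer, 'queue.process' on the consumer
    'messaging.message.id': messageId,
    'messaging.destination.name': destination,
    'messaging.system': 'bullmq',
  };
  if (op === 'queue.process' && latencyMs !== undefined) {
    // Receive latency is only meaningful on the consumer side.
    attrs['messaging.message.receive.latency'] = latencyMs;
  }
  return attrs;
}
```

These attributes would be set on the spans your instrumentation creates (e.g. via the `attributes` option of `Sentry.startSpan`).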

lforst avatar Aug 26 '24 08:08 lforst

Thanks @lforst. As relevant context for your team, it looks like the instrumentation library under discussion has opted to follow the OpenTelemetry Semantic Conventions for Messaging Spans, which seem to be slightly different from what Sentry Queues defines in its spec. The OTel standard makes intuitive sense to me; curious to hear your thoughts.

Getting Queues to work is not as high priority for us right now, and this library doesn't seem easily customizable, so gonna punt on this for some time. I'll stay tuned to this issue. Thanks for the support.

iacobus avatar Aug 26 '24 11:08 iacobus

As far as I can tell, we are also adhering to the OTel semantic conventions, except for the span op, which is not an OTel concept.

lforst avatar Aug 26 '24 12:08 lforst

Hey @iacobus & @lforst,

I was just looking into this as well this week. I tried forking and working on the appsignal version, but it resulted in events being mixed up in the Sentry UI (some queue events would show up under another queue).

I came up with this snippet of code. It's very hacky, but it works, more or less. It handles add(), addBulk() and processJob(). I'm not using flows, so I probably won't implement those. I thought it might be helpful to put it here.

I'm sure there are things to fix, especially around the duplication of span attributes. Also, I need to look into it more to be sure, but I'm pretty sure I end up with a lot more recorded processed events than published ones. If anyone has an idea why that could be, I'd love to know.

const crypto = require('crypto');
const Sentry = require('@sentry/node');
const { Queue, Worker } = require('bullmq');
// `redis` is assumed to be an already-connected client (e.g. ioredis);
// it is used to hand trace headers from the producer to the consumer.

const bullMQIntegration = {
    name: 'bullmq',
    setupOnce() {
        const originalAddBulkQueue = Queue.prototype.addBulk;
        Queue.prototype.addBulk = async function (...args) {
            const jobs = args[0];
            const messageId = crypto.randomBytes(8).toString('hex');

            // Return the startSpan result so addBulk still resolves to the created jobs.
            return Sentry.startSpan({
                name: 'queue_producer_transaction',
                op: 'queue.publish',
                attributes: {
                    'sentry.op': 'queue.publish',
                    'messaging.message.id': messageId,
                    'messaging.destination.name': this.name,
                    'messaging.system': 'bullmq'
                }
            },
            async parent => {
                const promises = [];
                for (const job of jobs) {
                    promises.push(
                        Sentry.startSpan(
                            {
                                name: 'queue_producer',
                                op: 'queue.create',
                                attributes: {
                                    'sentry.op': 'queue.create',
                                    'messaging.message.id': messageId,
                                    'messaging.destination.name': this.name,
                                    'messaging.system': 'bullmq'
                                }
                            },
                            async span => {
                                const traceHeader = Sentry.spanToTraceHeader(span);
                                const baggageHeader = Sentry.spanToBaggageHeader(span);
                                const instrumentationData = { traceHeader, baggageHeader, timestamp: Date.now(), messageId };
                                await redis.lpush(`span:${job.name}`, JSON.stringify(instrumentationData));
                            }
                        )
                    );
                }

                await Promise.all(promises);
                return originalAddBulkQueue.apply(this, args);
            });
        };

        const originalAddQueue = Queue.prototype.add;
        Queue.prototype.add = async function (...args) {
            const messageId = crypto.randomBytes(8).toString('hex');
            return Sentry.startSpan({
                name: 'queue_producer_transaction',
                op: 'queue.publish',
                attributes: {
                    'sentry.op': 'queue.publish',
                    'messaging.message.id': messageId,
                    'messaging.destination.name': this.name,
                    'messaging.system': 'bullmq'
                }
            },
            parent => {
                return Sentry.startSpan(
                    {
                        name: 'queue_producer',
                        op: 'queue.publish',
                        attributes: {
                            'sentry.op': 'queue.publish',
                            'messaging.message.id': messageId,
                            'messaging.destination.name': this.name,
                            'messaging.system': 'bullmq'
                        }
                    },
                    async span => {
                        const traceHeader = Sentry.spanToTraceHeader(span);
                        const baggageHeader = Sentry.spanToBaggageHeader(span);
                        const instrumentationData = { traceHeader, baggageHeader, timestamp: Date.now(), messageId };
                        // Note: lpush + lpop on the consumer is LIFO; use rpop there for FIFO ordering.
                        await redis.lpush(`span:${args[0]}`, JSON.stringify(instrumentationData));
                        return originalAddQueue.apply(this, args);
                    }
                );
            });
        };

        const originalProcessJob = Worker.prototype.processJob;
        Worker.prototype.processJob = async function (...args) {
            const message = JSON.parse(await redis.lpop(`span:${args[0].name}`));
            if (!message)
                return originalProcessJob.apply(this, args);

            const latency = Date.now() - message.timestamp;
            return Sentry.continueTrace(
                { sentryTrace: message.traceHeader, baggage: message.baggageHeader },
                () => {
                    return Sentry.startSpan({
                        name: 'queue_consumer_transaction',
                        op: 'queue.process',
                        attributes: {
                            'sentry.op': 'queue.process',
                            'messaging.message.id': message.messageId,
                            'messaging.destination.name': args[0].queue.name,
                            'messaging.message.receive.latency': latency,
                            'messaging.system': 'bullmq'
                        }
                    },
                    parent => {
                        return Sentry.startSpan({
                            name: 'queue_consumer',
                            op: 'queue.process',
                            attributes: {
                                'sentry.op': 'queue.process',
                                'messaging.message.id': message.messageId,
                                'messaging.destination.name': args[0].queue.name,
                                'messaging.message.receive.latency': latency,
                                'messaging.system': 'bullmq'
                            }
                        }, async span => {
                            // Await the original so the span covers the job and failures propagate.
                            const result = await originalProcessJob.apply(this, args);
                            parent.setStatus({ code: 1, message: 'ok' });
                            return result;
                        });
                    });
                }
            );
        };
    },
};

Sentry.init({
    dsn: getSentryDsn(),
    environment: getNodeEnv() || 'development',
    release: `ethernal@${getVersion()}`,
    integrations: [
        nodeProfilingIntegration(),
        Sentry.postgresIntegration(),
        bullMQIntegration,
    ],
    tracesSampleRate: 1.0,
    profilesSampleRate: 1.0,
    debug: true
});

And here is what it looks like in Sentry:

[Screenshot of the Queues module in Sentry showing the instrumented destinations]

And each destination contains two lines: queue_producer_transaction (type "Producer", with the number of Published events/error rate/time spent set), and queue_consumer_transaction (type "Consumer", with the avg time in queue/processing time/processed/error rate/time spent set)

antoinedc avatar Aug 27 '24 21:08 antoinedc

Announcing BullMQ Telemetry Support: https://bullmq.io/news/241104/telemetry-support/
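
Per that announcement, BullMQ exposes a `telemetry` option on `Queue` and `Worker` that takes an OTel-backed implementation from the `bullmq-otel` package. A configuration sketch (package name and option shape per the BullMQ telemetry docs; connection details are placeholders):

```javascript
const { Queue, Worker } = require('bullmq');
const { BullMQOtel } = require('bullmq-otel');

const connection = { host: 'localhost', port: 6379 };

// Passing a Telemetry implementation makes BullMQ emit OTel spans for
// enqueueing and processing; an OTel-enabled Sentry setup can pick them up.
const queue = new Queue('myQueue', {
  connection,
  telemetry: new BullMQOtel('my-service'),
});

const worker = new Worker('myQueue', async job => job.data, {
  connection,
  telemetry: new BullMQOtel('my-service'),
});
```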

WilliamBlais avatar Nov 22 '24 16:11 WilliamBlais

@WilliamBlais nice! Your otel spans should be picked up automatically by Sentry 👍

chargome avatar Nov 25 '24 09:11 chargome

> Hello, thanks for writing in. We are on company-wide hackweek and thus on limited support this week. We'll take a look at this next week.

Hey @andreiborza, any updates on this? It seems like Sentry is really close to supporting it: Sentry even picks up the OTEL spans from BullMQ when the telemetry configuration is provided, but unfortunately they're not shown on the Queues tab.

ThallesP avatar Mar 21 '25 23:03 ThallesP

Hey @ThallesP no update at this time!

You'll have to add some manual instrumentation for now.

Using the beforeSendSpan hook, you can find BullMQ spans (based on their span name) and then annotate these spans with the attributes the queues module expects.
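
A rough sketch of that approach as a pure function you could pass as `beforeSendSpan` (the attribute names match the queues conventions discussed earlier in this thread; the check for `messaging.operation === 'process'` is an assumption about what the BullMQ OTel spans carry, so verify against the spans your setup actually emits):

```javascript
// Intended for Sentry.init({ beforeSendSpan: annotateBullmqSpan }).
// Receives a serialized span ({ description, data, ... }) and adds the
// sentry.op attribute the queues module keys off of.
function annotateBullmqSpan(span) {
  const data = span.data || {};
  if (data['messaging.system'] !== 'bullmq') return span; // not a BullMQ span

  // Assumption: consumer spans carry messaging.operation === 'process'.
  const isConsumer = data['messaging.operation'] === 'process';
  span.data = {
    ...data,
    'sentry.op': isConsumer ? 'queue.process' : 'queue.publish',
  };
  return span;
}
```

Depending on what the instrumentation emits, you may also need to fill in `messaging.message.id` and `messaging.destination.name` the same way.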

AbhiPrasad avatar Mar 21 '25 23:03 AbhiPrasad