
Add instrumentation and distributed tracing for RabbitMQ (pika)

Open · champ opened this issue on Dec 18 '19 · 12 comments

Context

One common way to integrate services is to use RabbitMQ as a message broker. Currently, the Elastic APM agent only supports distributed tracing for HTTP calls. It would be of high value to add instrumentation and distributed tracing for message brokers, and in particular RabbitMQ, one of the most widely deployed open source message brokers. Not having this integration could be a show-stopper for users of competitor solutions (New Relic already offers this integration) who are considering migrating to Elastic APM.

Solution

The ideal solution would be to instrument message publishing to include tracing data in message headers, and to interpret those headers when consuming messages so the trace continues across the broker.

To support the maximum number of projects using RabbitMQ, I would recommend starting by adding support for Pika, a widely used AMQP client library that supports both Python 2.7 and 3.4+.

Alternatives

A flexible alternative to a library-specific integration would be to provide APIs for creating and interpreting a distributed tracing payload, for use in custom instrumentation of any kind of communication between services. New Relic, for example, offers such APIs in create_distributed_trace_payload() and accept_distributed_trace_payload().

I'm willing to contribute code if this is of general interest.

/ Hampus

champ · Dec 18 '19

Thanks for the request, @champ! Message queues are definitely on our radar and we do want to support them. In fact, we're working on designing the UI that would better represent these types of transactions in Kibana as well.

I don't know for sure when RabbitMQ will hit the top of our roadmap, as we're currently focused on some asyncio framework support. If you'd rather not wait and would like to take a shot at adding something yourself, we'd be happy to support you in that effort! Just keep in mind that we won't be around much between now and the new year -- most of our team is taking some time for the holidays.

basepi · Dec 18 '19

@basepi great to hear you're working on messaging integration!

I've been trying out some instrumentation of pika, taking into account the discussions/goals in these issues:

Here are the first results:

  • Capturing publish and get actions as spans - works fine.
  • Propagating trace header and state in published messages - works fine (see the sketch after this list).
  • Capturing consumed messages as spans (within an active transaction) - works fine.
  • Capturing consumed messages as transactions (when no active transaction) - not straightforward, since transactions have to be created from a client, and there is no access to a client singleton.
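The publish-side propagation above is, roughly, the following (a sketch, assuming pika's blocking channel API and the agent's elasticapm.get_trace_parent_header() helper; the header name follows the W3C traceparent convention):

import elasticapm
import pika

def publish(channel, exchange, routing_key, body):
    # Inject the current trace context into the AMQP message headers so the
    # consumer side can continue the distributed trace.
    headers = {}
    traceparent = elasticapm.get_trace_parent_header()  # empty if no active transaction
    if traceparent:
        headers["traceparent"] = traceparent
    channel.basic_publish(
        exchange=exchange,
        routing_key=routing_key,
        body=body,
        properties=pika.BasicProperties(headers=headers),
    )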

Any ideas on how to deal with creating transactions from a global context? Any API changes on the roadmap to support that?

champ · Dec 20 '19

This is something I ran into while instrumenting tornado. Because Tornado doesn't have any API for middleware, I had to treat it as an instrumentation, rather than an integration (like Django or Flask). This means I need the ability to create transactions, as the call() function is only called when there's an active, sampling transaction.

Previously, this would require you to override call_if_sampling(), like Beni did in his Django management commands PR. It's messy.

Instead, I added a new flag to the AbstractInstrumentedModule class, creates_transactions. If set to True, the call() function will always be called and you can create transactions at that stage.
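As a rough sketch (the class name and the instrumented target below are hypothetical, not the agent's actual pika support), an instrumentation using that flag could look like this:

from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule

class PikaConsumeInstrumentation(AbstractInstrumentedModule):
    name = "pika"
    creates_transactions = True  # call() now fires even without an active transaction
    instrument_list = [("pika.channel", "Channel.basic_get")]

    def call(self, module, method, wrapped, instance, args, kwargs):
        # With creates_transactions=True we reach this point even when no
        # transaction is active, so a transaction could be started here
        # (given access to a Client, which is the problem discussed next).
        return wrapped(*args, **kwargs)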

However, this doesn't solve your Client object issues. Even once you get to the point where your code is being called without an active transaction, you need to get access to the singleton Client object to create the transaction.


We generally store the Client in a way that jives with whatever framework we're integrating with. In Flask, everything is done via signals so we can just store it in our ElasticAPM object that is created as part of Flask configuration. In Tornado I'm storing it in the Application object so I can access it later.

RabbitMQ isn't a web framework, so I guess my first instinct is that you should be able to get the Client object from whatever framework you're using that is calling RabbitMQ. Once you have the Client, you can create a transaction (paying attention to whether there's a trace ID embedded in the message from Rabbit, for distributed tracing across the send/receive) and you're off to the races.
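A minimal sketch of that receive-side flow, assuming you already hold a Client instance and the publisher put a traceparent header on the message (TraceParent is the agent's helper in elasticapm.utils.disttracing; process() is a stand-in for the consumer's work):

from elasticapm.utils.disttracing import TraceParent

def handle_delivery(client, queue, headers, body):
    # Continue the trace if the message carries a traceparent header,
    # otherwise start a brand-new transaction.
    trace_parent = None
    if headers and "traceparent" in headers:
        trace_parent = TraceParent.from_string(headers["traceparent"])
    client.begin_transaction("messaging", trace_parent=trace_parent)
    try:
        process(body)  # hypothetical business logic
        client.end_transaction("RabbitMQ RECEIVE from %s" % queue, "success")
    except Exception:
        client.capture_exception()
        client.end_transaction("RabbitMQ RECEIVE from %s" % queue, "failure")
        raise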

That said, generalizing that interface between RabbitMQ and arbitrary frameworks seems difficult. There's no established API for instrumented libraries to access their callers in this way. The only "communication" is the transaction itself, which we make available in contextvars (or threadlocals) so it can be accessed for creating spans. I wonder if we could find a way to distribute the Client object in a similar way? (I'm betting dragons lurk that way, but it might work...)

@beniwohli Any thoughts on how to make the Client more readily available for ad-hoc consumers like in rabbitmq?

basepi · Jan 06 '20

We're interested in tracing distributed transactions that pass through RabbitMQ within the application. Our RabbitMQ clients are written in Python 2.x and Python 3.x and both consume and produce messages.

In Elastic APM, how do we configure spans that show a request's calls passing through a queue and being processed by the consumer?

The full lifecycle of a request is: request -> application -> some processing on the server -> a message is produced to RabbitMQ -> consumers consume the message.

Each message should be treated as a transaction, and its latency should be calculated. Ideally we could also see where each request went and to which queue.

dikshith · May 17 '23

@dikshith I expect that we'll instrument RabbitMQ the same way we do Kafka, which should satisfy most of your requirements nicely -- in Kafka we either create a span if a transaction is already active, or a transaction otherwise. And we definitely want to save all of the context information, like queue names.
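In rough pseudo-code, that span-or-transaction decision looks something like this (a sketch only; getting hold of the Client follows the earlier discussion, and process() stands in for the consumer's work):

import elasticapm

def on_message(client, queue, body, trace_parent=None):
    if elasticapm.get_transaction_id():
        # A transaction is already active: record the receive as a span.
        with elasticapm.capture_span("RabbitMQ RECEIVE from %s" % queue, span_type="messaging"):
            process(body)
    else:
        # No active transaction: start one, continuing the trace if possible.
        client.begin_transaction("messaging", trace_parent=trace_parent)
        process(body)
        client.end_transaction("RabbitMQ RECEIVE from %s" % queue, "success")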

basepi · May 17 '23

How do I create a span for an active transaction? Is there an example you could share?

My scenario is: HTTP calls -> RabbitMQ -> processing -> output back to the client.

So it's a heterogeneous system. Tracing already works well for me up to RabbitMQ, but after that I'm in no man's land.

dikshith · May 17 '23

@dikshith yes, you can use capture_span to capture custom spans. You can pass in extra context (like the queue name) into the undocumented extra argument (pass in a dictionary), or you can put extra context into labels as well.
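For instance (the queue name and the shape of the extra dictionary here are purely illustrative, not a fixed schema):

import elasticapm

def publish_order(channel, payload):
    # Wrap the publish in a custom span, attaching the queue name both as a
    # label and via the undocumented "extra" context dictionary.
    with elasticapm.capture_span(
        name="RabbitMQ SEND to orders",
        span_type="messaging.rabbitmq.send",
        labels={"queue": "orders"},
        extra={"message": {"queue": {"name": "orders"}}},
    ):
        channel.basic_publish(exchange="", routing_key="orders", body=payload)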

basepi · May 17 '23

Could you share some example code that I can look at? These explanations are a bit difficult to get through.

Hope you understand

dikshith · May 17 '23

Something like this:

import elasticapm

def consume(message, queue, traceparent):
    # traceparent: trace context extracted from the incoming message headers
    with elasticapm.capture_span(
        name="consumer",
        span_type="messaging.rabbitmq.receive",
        links=[traceparent],
        labels={"queue": queue},
    ):
        result = None  # do your work here
    return result

basepi · May 17 '23

Let me try it out.

Thank you

dikshith · May 17 '23