
aiokafka: cannot trace message consuming with auto-instrumentation

Open andreaturli opened this issue 9 months ago • 4 comments

What problem do you want to solve?

When running the AIOKafkaConsumer example from https://aiokafka.readthedocs.io/en/stable/#aiokafkaconsumer, no traces are produced when using zero-code instrumentation as described in https://opentelemetry.io/docs/languages/python/getting-started/#instrumentation.

Describe the solution you'd like

I'd like the aiokafka consumer to produce traces with zero-code instrumentation.
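For context, the zero-code setup follows the linked getting-started guide; the service name and endpoint below are illustrative, not taken from my environment:

```shell
pip install opentelemetry-distro opentelemetry-exporter-otlp
opentelemetry-bootstrap -a install   # detects installed libraries (incl. aiokafka) and installs their instrumentations
OTEL_SERVICE_NAME=aiokafka-consumer \
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 \
opentelemetry-instrument python consumer.py
```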

Describe alternatives you've considered

Having something like the following

async def consume():
    consumer = AIOKafkaConsumer(
        'my_topic', 'my_other_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group")
    # Get cluster layout and join group `my-group`
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()

I've tried to get traces using

async def main():
    consume()

and also

async def main():
    with tracer.start_as_current_span("kafka-consumer") as span:
        consume()

but no spans show up in my OTLP exporter.
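One observation on the snippets above (not a confirmed root cause): in both attempts `consume()` is called without `await`, so the coroutine object is created but its body never runs. A stdlib-only sketch of the difference:

```python
import asyncio

ran = []

async def consume():
    ran.append("consumed")

async def main_without_await():
    consume()  # coroutine object created but never awaited; the body never runs

async def main_with_await():
    await consume()

asyncio.run(main_without_await())
print(ran)  # []
asyncio.run(main_with_await())
print(ran)  # ['consumed']
```

Python even emits a `RuntimeWarning: coroutine 'consume' was never awaited` in the first case, which is worth checking for in the process logs.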

Additional Context

Using

opentelemetry-instrumentation-aiokafka==0.53b1 and python 3.13

Would you like to implement a fix?

None

andreaturli avatar May 05 '25 15:05 andreaturli

I just tested this with the examples from https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3466, and it seems to be working fine.

emdneto avatar May 05 '25 19:05 emdneto

I can confirm it seems to work when running something like opentelemetry-instrument python consumer.py, where consumer.py is:

import asyncio
import json
import logging

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

async def send_one_message(topic_name, message, bootstrap_servers='localhost:9092'):
    """
    Send a single message to a Kafka topic

    Args:
        topic_name (str): Name of the Kafka topic
        message (str or bytes): Message to send
        bootstrap_servers (str): Kafka broker(s) address
    """
    # Convert string message to bytes if needed
    if isinstance(message, str):
        message = message.encode('utf-8')

    # Create producer
    producer = AIOKafkaProducer(bootstrap_servers=bootstrap_servers)

    try:
        # Start the producer
        await producer.start()

        # Send message
        logger.info(f"Sending message to topic {topic_name}")
        await producer.send(topic_name, message)
        logger.info("Message sent successfully")

    finally:
        # Stop the producer
        await producer.stop()

async def consume_messages():
    # Create a consumer instance
    consumer = AIOKafkaConsumer(
        'my-topic',  # topic name
        bootstrap_servers='localhost:9092',  # Kafka broker(s)
        group_id='my-group',  # Consumer group ID
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))  # Deserialize JSON messages
    )

    # Start the consumer
    await consumer.start()

    try:
        # Consume messages
        async for msg in consumer:
            print("Received message:")
            print(f"Topic: {msg.topic}")
            print(f"Partition: {msg.partition}")
            print(f"Offset: {msg.offset}")
            print(f"Key: {msg.key}")
            print(f"Value: {msg.value}")
            print("------------------------")

    except Exception as e:
        logger.error(f"Error: {e}")
    finally:
        # Stop the consumer
        await consumer.stop()

# Example usage
if __name__ == "__main__":
    # Topic name
    TOPIC = "my-topic"
    # Message to send
    MESSAGE = "Hello, Kafka!"
    # Send a single message
    asyncio.run(send_one_message(TOPIC, MESSAGE))
    # Consume messages
    asyncio.run(consume_messages())

with

> pip freeze | grep opentelemetry
opentelemetry-api==1.32.1
opentelemetry-distro==0.53b1
opentelemetry-exporter-otlp==1.32.1
opentelemetry-exporter-otlp-proto-common==1.32.1
opentelemetry-exporter-otlp-proto-grpc==1.32.1
opentelemetry-exporter-otlp-proto-http==1.32.1
opentelemetry-instrumentation==0.53b1
opentelemetry-instrumentation-aiohttp-client==0.53b1
opentelemetry-instrumentation-aiohttp-server==0.53b1
opentelemetry-instrumentation-aiokafka==0.53b1
opentelemetry-instrumentation-asyncio==0.53b1
opentelemetry-instrumentation-asyncpg==0.53b1
opentelemetry-instrumentation-dbapi==0.53b1
opentelemetry-instrumentation-grpc==0.53b1
opentelemetry-instrumentation-logging==0.53b1
opentelemetry-instrumentation-requests==0.53b1
opentelemetry-instrumentation-sqlite3==0.53b1
opentelemetry-instrumentation-threading==0.53b1
opentelemetry-instrumentation-urllib==0.53b1
opentelemetry-instrumentation-urllib3==0.53b1
opentelemetry-instrumentation-wsgi==0.53b1
opentelemetry-proto==1.32.1
opentelemetry-sdk==1.32.1
opentelemetry-semantic-conventions==0.53b1
opentelemetry-util-http==0.53b1

See also the traces in my exporter:

[screenshot: traces shown in the exporter]

Not sure why, when used in combination with other libraries (namely aiohttp-server and asyncpg), it gives me this weird message:

[screenshot: warning message]

Any idea?

andreaturli avatar May 06 '25 07:05 andreaturli

Looks like the consumer span only exists inside the global instrumentor's async_consume_hook.

By the time AIOKafkaConsumer.__anext__ returns, the consumer span has already ended, so any downstream operations run without its context.

from opentelemetry import trace

...

async for msg in consumer:
    # seeing the traceparent header in the consumed message from the producer
    msg.headers  # `[..., ('traceparent', b'00-24770f968d52edaf2771745749cec452-9c407f42b9cdf30f-01')]`

    # the consumer span is created and sent to the span processor, visible in `ConsoleSpanExporter()`

    # but the consumer span gets ended too early, so it's missing in the actual consumer logic here
    span = trace.get_current_span()  # NonRecordingSpan(SpanContext(trace_id=0x00000000000000000000000000000000, span_id=0x0000000000000000, trace_flags=0x00, trace_state=[], is_remote=False))

    ... # so db queries, http calls aren't attached to the consumer span
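The effect is reproducible without Kafka at all. A stdlib-only sketch (hypothetical class name, with `contextvars` standing in for the OpenTelemetry context) of why a span that is opened and closed entirely inside `__anext__` is invisible in the `async for` body:

```python
import asyncio
import contextvars

current_span = contextvars.ContextVar("current_span", default=None)

class InstrumentedIterator:
    """Mimics an async iterator instrumented inside __anext__."""

    def __init__(self, items):
        self._items = iter(items)

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            item = next(self._items)
        except StopIteration:
            raise StopAsyncIteration
        # The instrumentation starts and ends the "span" inside __anext__ ...
        token = current_span.set(f"{item} receive")
        current_span.reset(token)  # ... so it is gone before the item is returned.
        return item

async def main():
    seen = []
    async for msg in InstrumentedIterator(["a", "b"]):
        # By the time the loop body runs, no span context is active.
        seen.append((msg, current_span.get()))
    return seen

print(asyncio.run(main()))  # [('a', None), ('b', None)]
```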

artkravchenko avatar May 12 '25 05:05 artkravchenko

FWIW, I think starting a new span with the trace context extracted from the headers could be a workaround for those who are okay with this kind of inefficiency. The following ensures that the trace_id matches between the XXX receive span created by AIOKafkaConsumer.getone() inside AIOKafkaConsumer.__anext__() and the "print message" span created in the code.

from aiokafka import AIOKafkaConsumer
from opentelemetry import trace, propagate
from opentelemetry.instrumentation.aiokafka.utils import AIOKafkaContextGetter

tracer = trace.get_tracer(__name__)

async def consume(topic, bootstrap_servers):
    consumer = AIOKafkaConsumer(topic, bootstrap_servers=bootstrap_servers)
    await consumer.start()
    aiokafka_getter = AIOKafkaContextGetter()  # THIS LINE
    try:
        async for message in consumer:
            extracted_context = propagate.extract(message.headers, getter=aiokafka_getter)  # AND THIS LINE
            with tracer.start_as_current_span("print message", context=extracted_context):  # `context=extracted_context` HERE IS THE KEY.
                print(message)
            break
    finally:
        await consumer.stop()

EDIT: Linking similar issues:

  • https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3026
  • https://github.com/open-telemetry/opentelemetry-python-contrib/issues/1732

sakurai-youhei avatar Dec 17 '25 13:12 sakurai-youhei

Spent some time on this and I think the instrumentation is right; if you don't want the artificial span, attaching the extracted context should be enough, as in the example below.

from aiokafka import AIOKafkaConsumer
from opentelemetry import context, propagate
from opentelemetry.instrumentation.aiokafka.utils import AIOKafkaContextGetter


async def consume(topic, bootstrap_servers):
    consumer = AIOKafkaConsumer(topic, bootstrap_servers=bootstrap_servers)
    await consumer.start()
    aiokafka_getter = AIOKafkaContextGetter()
    try:
        async for message in consumer:
            extracted_context = propagate.extract(message.headers, getter=aiokafka_getter)
            token = context.attach(extracted_context)
            try:
                ...  # per-message processing runs with the consumer context attached
            finally:
                context.detach(token)
    finally:
        await consumer.stop()

Unfortunately I don't think we can wrap the aiokafka consumer iterator easily to do the same under the hood.
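On the user side, though, the attach/detach boilerplate can be factored into a small async-generator wrapper around any async iterator. A stdlib sketch (hypothetical helper name, with `contextvars` standing in for `opentelemetry.context` and a fake consumer standing in for aiokafka):

```python
import asyncio
import contextvars

otel_context = contextvars.ContextVar("otel_context", default=None)

async def with_message_context(consumer, extract):
    """Attach the context extracted from each message around the caller's loop body."""
    async for message in consumer:
        token = otel_context.set(extract(message))
        try:
            yield message  # the caller's loop body runs while the context is attached
        finally:
            otel_context.reset(token)  # detached when the next message is requested

class Msg:
    def __init__(self, headers):
        self.headers = headers

async def fake_consumer():
    yield Msg([("traceparent", b"00-aa-bb-01")])
    yield Msg([])

async def main():
    seen = []
    async for msg in with_message_context(fake_consumer(), lambda m: dict(m.headers)):
        seen.append(otel_context.get())  # the extracted context is visible here
    return seen

print(asyncio.run(main()))  # [{'traceparent': b'00-aa-bb-01'}, {}]
```

This relies on generators sharing the caller's contextvars context, so the set/reset inside the wrapper is visible in the consuming loop; with the real instrumentation, `extract` would be `propagate.extract(m.headers, getter=AIOKafkaContextGetter())` as in the snippet above.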

xrmx avatar Jan 28 '26 16:01 xrmx