aiokafka: cannot trace message consuming with auto-instrumentation
### What problem do you want to solve?

When running the https://aiokafka.readthedocs.io/en/stable/#aiokafkaconsumer example, traces are not produced when using zero-code instrumentation set up as in https://opentelemetry.io/docs/languages/python/getting-started/#instrumentation.
### Describe the solution you'd like

I'd like the aiokafka consumer to produce traces with zero-code instrumentation.
### Describe alternatives you've considered

Having something like the following:

```python
async def consume():
    consumer = AIOKafkaConsumer(
        'my_topic', 'my_other_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group")
    # Get cluster layout and join group `my-group`
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()
```
I've tried to get traces using

```python
async def main():
    consume()
```

and also

```python
async def main():
    with tracer.start_as_current_span("kafka-consumer") as span:
        consume()
```

but nothing gets created in my OTel exporter.
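(Side note: in both `main` variants above, `consume()` is called without `await`, so the coroutine object is created but its body never runs — that alone would explain the absence of spans. A minimal, self-contained illustration with made-up names:)

```python
import asyncio

ran = []

async def consume():
    # stands in for the real consume loop
    ran.append("consumed")

async def main_without_await():
    consume()  # creates a coroutine object; the body never executes

async def main_with_await():
    await consume()  # the body actually runs

asyncio.run(main_without_await())
print(ran)  # []
asyncio.run(main_with_await())
print(ran)  # ['consumed']
```

(Python also emits a `RuntimeWarning: coroutine 'consume' was never awaited` for the first variant, which is a useful hint that this is happening.)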
### Additional Context

Using `opentelemetry-instrumentation-aiokafka==0.53b1` and Python 3.13.
### Would you like to implement a fix?

None
I just tested this with the examples from https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3466, and it seems to be working fine.

I can confirm it seems to be working when running something like `opentelemetry-instrument python consumer.py`, where `consumer.py` is:
```python
import asyncio
import json
import logging

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


async def send_one_message(topic_name, message, bootstrap_servers='localhost:9092'):
    """
    Send a single message to a Kafka topic

    Args:
        topic_name (str): Name of the Kafka topic
        message (str or bytes): Message to send
        bootstrap_servers (str): Kafka broker(s) address
    """
    # Convert string message to bytes if needed
    if isinstance(message, str):
        message = message.encode('utf-8')

    # Create producer
    producer = AIOKafkaProducer(bootstrap_servers=bootstrap_servers)
    try:
        # Start the producer
        await producer.start()
        # Send message
        logger.info(f"Sending message to topic {topic_name}")
        await producer.send(topic_name, message)
        logger.info("Message sent successfully")
    finally:
        # Stop the producer
        await producer.stop()


async def consume_messages():
    # Create a consumer instance
    consumer = AIOKafkaConsumer(
        'my-topic',                          # topic name
        bootstrap_servers='localhost:9092',  # Kafka broker(s)
        group_id='my-group',                 # Consumer group ID
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))  # Deserialize JSON messages
    )

    # Start the consumer
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print("Received message:")
            print(f"Topic: {msg.topic}")
            print(f"Partition: {msg.partition}")
            print(f"Offset: {msg.offset}")
            print(f"Key: {msg.key}")
            print(f"Value: {msg.value}")
            print("------------------------")
    except Exception as e:
        logger.error(f"Error: {e}")
    finally:
        # Stop the consumer
        await consumer.stop()


# Example usage
if __name__ == "__main__":
    # Topic name
    TOPIC = "my-topic"
    # Message to send
    MESSAGE = "Hello, Kafka!"

    # Send a single message
    asyncio.run(send_one_message(TOPIC, MESSAGE))
    # Consume messages
    asyncio.run(consume_messages())
```
with

```console
> pip freeze | grep opentelemetry
opentelemetry-api==1.32.1
opentelemetry-distro==0.53b1
opentelemetry-exporter-otlp==1.32.1
opentelemetry-exporter-otlp-proto-common==1.32.1
opentelemetry-exporter-otlp-proto-grpc==1.32.1
opentelemetry-exporter-otlp-proto-http==1.32.1
opentelemetry-instrumentation==0.53b1
opentelemetry-instrumentation-aiohttp-client==0.53b1
opentelemetry-instrumentation-aiohttp-server==0.53b1
opentelemetry-instrumentation-aiokafka==0.53b1
opentelemetry-instrumentation-asyncio==0.53b1
opentelemetry-instrumentation-asyncpg==0.53b1
opentelemetry-instrumentation-dbapi==0.53b1
opentelemetry-instrumentation-grpc==0.53b1
opentelemetry-instrumentation-logging==0.53b1
opentelemetry-instrumentation-requests==0.53b1
opentelemetry-instrumentation-sqlite3==0.53b1
opentelemetry-instrumentation-threading==0.53b1
opentelemetry-instrumentation-urllib==0.53b1
opentelemetry-instrumentation-urllib3==0.53b1
opentelemetry-instrumentation-wsgi==0.53b1
opentelemetry-proto==1.32.1
opentelemetry-sdk==1.32.1
opentelemetry-semantic-conventions==0.53b1
opentelemetry-util-http==0.53b1
```
See also the traces in my exporter.

I'm not sure why, when used in combination with other libraries, namely aiohttp-server and asyncpg, it gives me this weird message. Any idea?
Looks like the consumer span only exists inside the global instrumentor's `async_consume_hook`. By the time `AIOKafkaConsumer.__anext__` returns, the consumer span is no longer current, so any downstream operations don't have any context.

```python
from opentelemetry import trace

...

async for msg in consumer:
    # The traceparent header injected by the producer is visible in the
    # consumed message:
    msg.headers  # [..., ('traceparent', b'00-24770f968d52edaf2771745749cec452-9c407f42b9cdf30f-01')]

    # The consumer span is created and sent to the span processor (visible with
    # ConsoleSpanExporter()), but it gets ended too early, so it's missing in
    # the actual consumer logic here:
    span = trace.get_current_span()
    # NonRecordingSpan(SpanContext(trace_id=0x00000000000000000000000000000000, span_id=0x0000000000000000, trace_flags=0x00, trace_state=[], is_remote=False))

    ...  # so db queries and http calls aren't attached to the consumer span
```
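For intuition, the behaviour can be mimicked with plain `contextvars`. This is a simplified stand-in for how the instrumentation attaches and detaches context around the fetch, not the actual instrumentation code:

```python
import asyncio
import contextvars

# Simplified stand-in for the OTel context; not the real API.
current_span = contextvars.ContextVar("current_span", default="no span")

async def instrumented_getone():
    token = current_span.set("consumer span")
    try:
        # ... fetch the record; hooks run here and DO see the span ...
        assert current_span.get() == "consumer span"
        return "record"
    finally:
        # the span context is reset before the record is handed back
        current_span.reset(token)

async def main():
    record = await instrumented_getone()
    # by the time application code handles the record, the span is gone
    return current_span.get()

print(asyncio.run(main()))  # no span
```

This matches what `trace.get_current_span()` shows in the loop body above: the hook sees a live span, the `async for` body sees none.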
FWIW, I think starting a new span with the trace context extracted from the headers could be a workaround for those who are okay with this kind of inefficiency. The following workaround ensures that the trace_id matches between the XXX receive span from `AIOKafkaConsumer.getone()` in `AIOKafkaConsumer.__anext__()` and the "print message" span created in the code.

```python
from aiokafka import AIOKafkaConsumer
from opentelemetry import trace, propagate
from opentelemetry.instrumentation.aiokafka.utils import AIOKafkaContextGetter

tracer = trace.get_tracer(__name__)


async def consume(topic, bootstrap_servers):
    consumer = AIOKafkaConsumer(topic, bootstrap_servers=bootstrap_servers)
    await consumer.start()
    aiokafka_getter = AIOKafkaContextGetter()  # THIS LINE,
    try:
        async for message in consumer:
            extracted_context = propagate.extract(message.headers, getter=aiokafka_getter)  # THIS LINE,
            with tracer.start_as_current_span("print message", context=extracted_context):  # and `context=extracted_context` here are the key
                print(message)
                break
    finally:
        await consumer.stop()
```
EDIT: Linking similar issues:
- https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3026
- https://github.com/open-telemetry/opentelemetry-python-contrib/issues/1732
I spent some time on this and I think the instrumentation is right; if you don't want the artificial span, attaching the extracted context should be enough, as in the example below.
```python
from aiokafka import AIOKafkaConsumer
from opentelemetry import context, propagate
from opentelemetry.instrumentation.aiokafka.utils import AIOKafkaContextGetter


async def consume(topic, bootstrap_servers):
    consumer = AIOKafkaConsumer(topic, bootstrap_servers=bootstrap_servers)
    await consumer.start()
    aiokafka_getter = AIOKafkaContextGetter()
    try:
        async for message in consumer:
            extracted_context = propagate.extract(message.headers, getter=aiokafka_getter)
            token = context.attach(extracted_context)
            try:
                ...
            finally:
                context.detach(token)
    finally:
        await consumer.stop()
```
Unfortunately, I don't think we can easily wrap the aiokafka consumer iterator to do the same under the hood.
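A user-land wrapper around the iterator does seem possible, though. Here is a sketch: the `extract`/`attach`/`detach` callables are injected so the example runs without a broker; in real code they would be a `propagate.extract` over the message headers (with `AIOKafkaContextGetter`), `context.attach`, and `context.detach`. It relies on the fact that an async generator runs in the caller's task, so a context attached before `yield` is active for the whole `async for` body and detached when iteration resumes.

```python
import asyncio

async def with_message_context(consumer, extract, attach, detach):
    """Yield messages from ``consumer`` with a per-message context attached.

    ``extract`` maps message headers to a context, ``attach`` activates it
    and returns a token, ``detach`` restores the previous context.
    """
    async for message in consumer:
        token = attach(extract(message.headers))
        try:
            yield message  # caller's loop body runs with the context attached
        finally:
            detach(token)  # runs when iteration resumes (or the loop exits)


# Tiny self-contained demo with stand-ins instead of a real broker/OTel.
class FakeMessage:
    def __init__(self, headers):
        self.headers = headers

class FakeConsumer:
    def __init__(self, messages):
        self._messages = list(messages)
    def __aiter__(self):
        return self
    async def __anext__(self):
        if not self._messages:
            raise StopAsyncIteration
        return self._messages.pop(0)

events = []

async def demo():
    consumer = FakeConsumer([FakeMessage([("traceparent", b"00-...")])])
    async for msg in with_message_context(
        consumer,
        extract=lambda headers: dict(headers),
        attach=lambda ctx: events.append("attach") or "token",
        detach=lambda token: events.append("detach"),
    ):
        events.append("body")

asyncio.run(demo())
print(events)  # ['attach', 'body', 'detach']
```

It doesn't help with doing this inside the instrumentation itself, but as an opt-in helper it keeps the attach/detach boilerplate out of every consumer loop.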