opentelemetry-python-contrib
Create span only after record is received while polling
Description
Fixes https://github.com/open-telemetry/opentelemetry-python-contrib/issues/1674
The package confluent-kafka has a class `Consumer`, which has a method called `poll`. As per the docs, the recommended usage is to call `consumer.poll` in an infinite loop so it keeps polling the brokers for messages, and each time it is called, the code is supposed to check whether the message is `None` or has an error before trying to process it. (Ref: https://github.com/confluentinc/confluent-kafka-python#basic-consumer-example)
The package opentelemetry-instrumentation-confluent-kafka offers a wrapper around confluent-kafka's consumer called `ProxiedConsumer`. `ProxiedConsumer` also has a `poll` method, which calls `ConfluentKafkaInstrumentor`'s `wrap_poll` method. This `wrap_poll` method calls the underlying consumer's `poll` method with the user-specified timeout.
Since `confluent_kafka.Consumer.poll` is meant to be called from within an infinite loop in the application, `wrap_poll` is also called on each iteration of that loop. This is the observed behavior with the current implementation of `wrap_poll`:
- The `wrap_poll` method creates a span every time it is called. Going by this example, where `consumer.poll` is called with a timeout of 1 second, the current `wrap_poll` implementation creates one span per second.
- It starts this span before extracting context from the Kafka message, so the span is not linked to any previous spans. It should only create a span after checking that the received record is not `None` (here) and is an actual Kafka message.
- Since the span started before the record-exists check (here) is made the current span, the span started after a record is received uses that current span's context, even if the links contain the context from the message headers.
- Lastly, `wrap_poll` returns the record even if the record is `None`; it should only return the record when one exists. (A rough sketch of the intended ordering follows this list.)
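For illustration only, here is a rough sketch of the ordering being argued for (this is not the actual diff in this PR; `tracer`, `process`, and the span name are placeholders): the record is checked first, context is extracted from the message headers, and only then is a span started.

```python
from opentelemetry import trace
from opentelemetry.propagate import extract

def poll_once(consumer, tracer, timeout=1.0):
    record = consumer.poll(timeout)
    if record is None:
        return None  # nothing received: no span is created, nothing is returned

    # Kafka headers are (key, bytes) tuples; turn them into a text carrier.
    carrier = {key: value.decode("utf-8") for key, value in (record.headers() or [])}
    ctx = extract(carrier)

    # The span is started only for a real record, with the extracted context,
    # so it joins the producer's trace instead of starting a new one.
    with tracer.start_as_current_span(
        "mytopic process", context=ctx, kind=trace.SpanKind.CONSUMER
    ):
        process(record)  # hypothetical application-side handler
    return record
```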
This PR fixes the above issues. This assumes my understanding and usage of `ProxiedConsumer` is right; please correct me if not. Here's a sample code snippet based on the docs:
```python
import confluent_kafka
from opentelemetry import trace
from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor

tracer_provider = trace.get_tracer_provider()

c = confluent_kafka.Consumer({'bootstrap.servers': 'localhost:29092'})
consumer = ConfluentKafkaInstrumentor().instrument_consumer(c, tracer_provider=tracer_provider)
consumer.subscribe(['mytopic'])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    process(msg)  # process is just some method to handle messages
```
Type of change
Please delete options that are not relevant.
- [x] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] This change requires a documentation update
How Has This Been Tested?
Tested this change and verified that the span created by `wrap_poll` stays linked to all spans created previously for the current trace. The spans created after `wrap_poll` also stay in the same trace for a given Kafka message.
Does This PR Require a Core Repo Change?
- [ ] Yes. - Link to PR:
- [x] No.
Checklist:
See contributing.md for styleguide, changelog guidelines, and more.
- [ ] Followed the style guidelines of this project
- [ ] Changelogs have been updated
- [ ] Unit tests have been added
- [ ] Documentation has been updated
Please add a changelog entry
There was probably a reason why span links were used. Please take a look at the change again.
@srikanthccv yes I wrote a bit in the PR description about the usage of links here and why I think it's not the right choice here. Here's what I think:
- The span links were used to link the span started here to the span associated with the sender of the Kafka message.
- Links, by definition, can point to spans within the same trace or across traces. In this case we want the new span to be part of the same trace as the span from which the Kafka message was sent.
- Passing in `links` alone won't ensure that the new span belongs to the current trace, because as per the `start_span` code here, the `context` argument is checked first, and if it isn't passed, the new span is treated as a root span. So while it will stay linked to the previous spans, it won't be part of the same trace.
- So, as the proposed change in this PR, I'm starting the new span by passing it the context extracted from the previous spans. This way I've confirmed that all spans associated with a Kafka message belong to the same trace. (The short illustration below shows the difference.)
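For illustration, here's a rough sketch of that `links` vs `context` difference (the span names and the traceparent value are made-up examples, not code from the instrumentation):

```python
from opentelemetry import trace
from opentelemetry.propagate import extract
from opentelemetry.trace import Link

tracer = trace.get_tracer(__name__)

# Pretend this carrier was built from the Kafka message headers.
carrier = {"traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"}
ctx = extract(carrier)
link = Link(trace.get_current_span(ctx).get_span_context())

# Links only: with no `context` and no active span, this becomes a new root
# span, i.e. a brand-new trace, even though the link is recorded.
span_a = tracer.start_span("mytopic process", links=[link])
span_a.end()

# Context passed explicitly: the new span joins the producer's trace and can
# still carry the same link.
span_b = tracer.start_span("mytopic process", context=ctx, links=[link])
span_b.end()
```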
Let me know what you think
@mrajashree I think the link code needs to stay; it was mostly correct, since it was following https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/messaging/#batch-receiving, as Kafka is a batch receiver.
Also, there is some more background here: https://opentelemetry.io/blog/2022/instrument-kafka-clients/
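For reference, a rough sketch of the batch-receiving pattern that convention describes (assumed names, not the instrumentation's actual code): one "receive" span for the whole batch carries links to the remote producer context of each message.

```python
from opentelemetry import trace
from opentelemetry.propagate import extract
from opentelemetry.trace import Link

def consume_batch(consumer, tracer, num_messages=10, timeout=1.0):
    records = consumer.consume(num_messages=num_messages, timeout=timeout)
    links = []
    for record in records:
        carrier = {k: v.decode("utf-8") for k, v in (record.headers() or [])}
        remote_ctx = trace.get_current_span(extract(carrier)).get_span_context()
        if remote_ctx.is_valid:
            links.append(Link(remote_ctx))
    # One "receive" span for the whole batch, linked to each producer span.
    with tracer.start_as_current_span(
        "mytopic receive", links=links, kind=trace.SpanKind.CONSUMER
    ):
        for record in records:
            process(record)  # hypothetical application-side handler
    return records
```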
@mrajashree do you have any thoughts on this after the comment from @owenhaynes?
Hi, can anything be done to push this through?
It would be great to have the consumer use the context from the Kafka message and be in the same trace as the producer; this would be much more useful in a distributed system for tracing async flows.
In this example https://opentelemetry.io/docs/specs/otel/trace/semantic_conventions/messaging/#topic-with-multiple-consumers
the spans created by the consumers have the same parent as the producer.
It would be very useful to resolve this issue. As it stands we don't have viable distributed tracing - just separate Producer and Consumer Spans
hi @ocelotl and @owenhaynes, as per the OpenTelemetry semantic conventions for messaging, context propagation is a must, and it is not being done currently: https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#context-propagation
> A message may traverse many different components and layers in one or more intermediaries when it is propagated from the producer to the consumer(s). To be able to correlate consumer traces with producer traces using the existing context propagation mechanisms, all components must propagate context down the chain.
There's some more background on this here: https://github.com/open-telemetry/oteps/blob/main/text/trace/0205-messaging-semantic-conventions-context-propagation.md
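To make that propagation requirement concrete, here's a hedged producer-side sketch (the topic and handler names are placeholders, not from the instrumentation): the producer injects the current trace context into the message headers so a consumer-side extract, like the one sketched above, can continue the same trace.

```python
from opentelemetry import trace
from opentelemetry.propagate import inject

def send(producer, tracer, topic, value):
    with tracer.start_as_current_span(f"{topic} publish", kind=trace.SpanKind.PRODUCER):
        headers = {}
        inject(headers)  # writes e.g. the W3C traceparent entry into the dict
        producer.produce(topic, value=value, headers=list(headers.items()))
        producer.flush()
```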
This issue was also fixed in the java lib (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/3529)
I think if we need to use links, we also need to figure out a way of propagating context so we are not starting a new trace each time the consumer receives a message.
Hi @lzchen @ocelotl @shalevr - can you please take a look at this PR and merge it? As the contributor @mrajashree mentioned, this fix has been done in other language libraries and is needed downstream by end users. Thanks for your help in getting this fix in.