opentelemetry-python-contrib

Create span only after record is received while polling

Open mrajashree opened this issue 2 years ago • 9 comments

Description

Fixes https://github.com/open-telemetry/opentelemetry-python-contrib/issues/1674

The confluent-kafka package has a Consumer class with a poll method. According to the docs, the recommended usage is to call consumer.poll in an infinite loop so that it keeps polling the brokers for messages. After each call, the code should check whether the message is None or carries an error before trying to process it. (Ref: https://github.com/confluentinc/confluent-kafka-python#basic-consumer-example)

The opentelemetry-instrumentation-confluent-kafka package offers a wrapper around confluent-kafka's consumer, called ProxiedConsumer. ProxiedConsumer also has a method called poll, which calls ConfluentKafkaInstrumentor's wrap_poll method. wrap_poll calls the underlying consumer's poll method with the user-specified timeout. Because an application is expected to call confluent_kafka.Consumer.poll from within an infinite loop, wrap_poll is also called on every iteration of that loop. This is the observed behavior with the current implementation of wrap_poll:

  1. wrap_poll creates a span each time it is called. Going by the documented example, where consumer.poll is called with a timeout of 1 second, the current implementation creates about one span per second even when no messages arrive.
  2. It starts this span before extracting context from the Kafka message, so the span is not linked to any previous spans. It should create a span only after checking that the received record is not None and is an actual Kafka message.
  3. Because the span started before the record check is set as the current span, the span started after a record is received uses that current span's context, even though its links contain the context from the message headers.
  4. Lastly, wrap_poll returns the record even when it is None; it should return a record only when one exists.
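The behavior these four points argue for can be sketched without the real libraries. This is a minimal model, not the code in this PR: FakeMessage, FakeConsumer, extract_parent, poll_with_span, the spans list, and the span name are all hypothetical stand-ins for the Kafka message, confluent_kafka.Consumer, the propagator's extract step, wrap_poll, and the tracer. It records a span only when poll returns a record, and takes the trace id from the message headers.

```python
class FakeMessage:
    """Hypothetical stand-in for a Kafka message that carries headers."""

    def __init__(self, topic, headers):
        self._topic = topic
        self._headers = headers

    def topic(self):
        return self._topic

    def headers(self):
        return self._headers


class FakeConsumer:
    """Hypothetical stand-in: returns queued records, then None (a poll timeout)."""

    def __init__(self, records):
        self._records = list(records)

    def poll(self, timeout=None):
        return self._records.pop(0) if self._records else None


spans = []  # stand-in for a tracer: every "started" span is appended here


def extract_parent(headers):
    """Return the trace id carried in a traceparent header, or None."""
    for key, value in headers or []:
        if key == "traceparent":
            return value.decode("ascii").split("-")[1]
    return None


def poll_with_span(consumer, timeout):
    """Poll once; record a span only if a record actually arrived."""
    record = consumer.poll(timeout)
    if record is None:
        return None  # empty poll: nothing to trace
    spans.append(
        {
            "name": f"{record.topic()} receive",  # assumed span name
            "trace_id": extract_parent(record.headers()),
        }
    )
    return record
```

With a consumer that times out twice, returns one message, and times out again, four calls to poll_with_span leave exactly one entry in spans.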

This PR fixes the above issues. This assumes that my understanding and usage of ProxiedConsumer are right; please correct me if not. Here's a sample code snippet based on the docs:

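import confluent_kafka
from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor

# tracer_provider is assumed to be configured elsewhere.
# 'group.id' is an assumed value, added because a subscribing consumer needs one.
c = confluent_kafka.Consumer({'bootstrap.servers': 'localhost:29092', 'group.id': 'mygroup'})
consumer = ConfluentKafkaInstrumentor().instrument_consumer(c, tracer_provider=tracer_provider)
consumer.subscribe(['mytopic'])
while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    process(msg)  # process is just some method to handle messages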

Type of change

Please delete options that are not relevant.

  • [x] Bug fix (non-breaking change which fixes an issue)
  • [ ] New feature (non-breaking change which adds functionality)
  • [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • [ ] This change requires a documentation update

How Has This Been Tested?

Tested this change and verified that the span created by wrap_poll stays linked to all spans created previously for the current trace. The spans created after wrap_poll also stay in the same trace for a given Kafka message.

Does This PR Require a Core Repo Change?

  • [ ] Yes. - Link to PR:
  • [x] No.

Checklist:

See contributing.md for styleguide, changelog guidelines, and more.

  • [ ] Followed the style guidelines of this project
  • [ ] Changelogs have been updated
  • [ ] Unit tests have been added
  • [ ] Documentation has been updated

mrajashree avatar Feb 16 '23 00:02 mrajashree

Please add a changelog entry

shalevr avatar Feb 27 '23 06:02 shalevr

There was probably a reason why span links were used. Please take a look at the change again.

srikanthccv avatar Feb 28 '23 14:02 srikanthccv

There was probably a reason why span links were used. Please take a look at the change again.

@srikanthccv yes, I wrote a bit in the PR description about how links are used here and why I think they are not the right choice. Here's my reasoning:

  1. The span links were used to link the span started here to the span associated with the sender of the Kafka message.
  2. Links by definition can point to spans within the same trace or across traces. In this case we want the new span to be part of the same trace as the span from which the Kafka message was sent.
  3. Passing links alone does not make the new span part of the current trace. As per the start_span code, the parent is taken from the context argument; if no context is passed and no span is active in the current context, the new span becomes a root span. So while it stays linked to the previous spans, it is not part of the same trace.
  4. So, as the proposed change in this PR, I start the new span by passing it the context extracted from the previous spans. With this change I've confirmed that all spans associated with a Kafka message belong to the same trace.
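To make the difference between points 3 and 4 concrete, here is a toy model of how a trace id gets assigned. It is not the OpenTelemetry SDK: SpanContext, Span, and this start_span are simplified, hypothetical versions, and the model ignores the ambient "current" context that the real start_span falls back to. The only rule it encodes is that the trace id is inherited from the parent, never from links.

```python
import secrets
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class SpanContext:
    trace_id: str
    span_id: str


@dataclass(frozen=True)
class Span:
    name: str
    context: SpanContext
    parent: Optional[SpanContext] = None
    links: Tuple[SpanContext, ...] = ()


def start_span(name, parent=None, links=()):
    """Toy model: inherit the trace id from the parent; links never affect it."""
    if parent is not None:
        trace_id = parent.trace_id
    else:
        trace_id = secrets.token_hex(16)  # no parent: a new root span, new trace
    context = SpanContext(trace_id, secrets.token_hex(8))
    return Span(name, context, parent, tuple(links))


producer = start_span("send")

# Links only: still points at the producer, but lives in a different trace.
linked_only = start_span("recv", links=[producer.context])

# Parent context passed: same trace as the producer.
child = start_span("recv", parent=producer.context)
```

In this model linked_only keeps a reference to the producer span but gets a fresh trace id, while child shares the producer's trace id, which is the outcome point 4 is after.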

Let me know what you think

mrajashree avatar Feb 28 '23 19:02 mrajashree

@mrajashree I think the link code needs to stay. It was mostly correct, since it largely followed https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/messaging/#batch-receiving and Kafka is a batch receiver.

Also there is some more background here https://opentelemetry.io/blog/2022/instrument-kafka-clients/
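For readers following along, the batch-receiving shape can be modelled roughly as below. This is a sketch of my reading of the linked example, not the instrumentation's code: new_span and receive_batch are hypothetical helpers and spans are plain dicts. The receive span has no parent because the producers' contexts are not known when polling starts, and each per-message processing span is a child of the receive span with a link to the context of the span that produced its message.

```python
import secrets


def new_span(name, trace_id=None, parent_id=None, links=()):
    """Hypothetical helper: build a span as a plain dict."""
    return {
        "name": name,
        "trace_id": trace_id or secrets.token_hex(16),
        "span_id": secrets.token_hex(8),
        "parent_id": parent_id,
        "links": list(links),
    }


def receive_batch(messages):
    """Model one batch poll: a root receive span plus one process span per message."""
    # A span can have only one parent, and a batch may carry messages from
    # several traces, so the receive span starts a trace of its own.
    recv = new_span("receive")
    procs = [
        new_span(
            "process",
            trace_id=recv["trace_id"],
            parent_id=recv["span_id"],
            links=[message["producer_context"]],  # correlation via link only
        )
        for message in messages
    ]
    return recv, procs
```

In this shape the processing spans are correlated with their producers through links while living in the consumer's own trace, which is the trade-off under discussion in this thread.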

owenhaynes avatar Apr 21 '23 08:04 owenhaynes

@mrajashree do you have any thoughts on this after the comment from @owenhaynes?

ocelotl avatar Jul 12 '23 18:07 ocelotl

Hi, can anything be done to push this through?

It would be great to have the consumer use the context from the Kafka message and be in the same trace as the producer. That would be much more useful for tracing async flows in a distributed system.

In this example, https://opentelemetry.io/docs/specs/otel/trace/semantic_conventions/messaging/#topic-with-multiple-consumers, the spans created by the consumers have the same parent as the producer.

Samira-El avatar Aug 17 '23 16:08 Samira-El

It would be very useful to resolve this issue. As it stands, we don't have viable distributed tracing, just separate Producer and Consumer spans.

alexchowle avatar Oct 17 '23 07:10 alexchowle

Hi @ocelotl and @owenhaynes, as per the OpenTelemetry semantic conventions for messaging, context propagation is a must, and it is not being done currently: https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#context-propagation

A message may traverse many different components and layers in one or more intermediaries when it is propagated from the producer to the consumer(s). To be able to correlate consumer traces with producer traces using the existing context propagation mechanisms, all components must propagate context down the chain.
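As a rough illustration of what propagating context through a Kafka message involves, here is a stdlib-only sketch that writes and reads a W3C traceparent header in a list of (key, bytes) header tuples. The inject and extract functions here are hypothetical helpers written for this comment, not the OpenTelemetry propagator API, and the sketch skips parts of the W3C rules (for example, rejecting all-zero ids).

```python
import re

# traceparent layout: version "00", 32 hex trace id, 16 hex span id, 2 hex flags
TRACEPARENT_RE = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def inject(headers, trace_id, span_id, sampled=True):
    """Return a new header list carrying the given context (producer side)."""
    flags = "01" if sampled else "00"
    value = f"00-{trace_id}-{span_id}-{flags}".encode("ascii")
    kept = [h for h in (headers or []) if h[0] != "traceparent"]
    return kept + [("traceparent", value)]


def extract(headers):
    """Return the propagated context as a dict, or None (consumer side)."""
    for key, value in headers or []:
        if key != "traceparent":
            continue
        match = TRACEPARENT_RE.match(value.decode("ascii"))
        if match:
            return {
                "trace_id": match.group(1),
                "span_id": match.group(2),
                # the sampled flag is the lowest bit of the flags byte
                "sampled": bool(int(match.group(3), 16) & 0x01),
            }
    return None
```

A consumer that gets a non-None result from extract can start its span with that context as the parent, so the consumer span lands in the producer's trace.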

There's some more background on this here: https://github.com/open-telemetry/oteps/blob/main/text/trace/0205-messaging-semantic-conventions-context-propagation.md

This issue was also fixed in the java lib (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/3529)

I think if we need to use links, we also need to figure out a way of propagating context so that we don't start a new trace each time the consumer receives a message.

mrajashree avatar Mar 29 '24 19:03 mrajashree

Hi @lzchen @ocelotl @shalevr - can you please take a look at this PR and merge it? As the contributor @mrajashree mentioned, this fix has been made in other language libraries and is needed by downstream end-users. Thanks for your help in getting this fix in.

alolita avatar Jun 04 '24 17:06 alolita