
Consumer.offsets_for_times() returns the same offset for different timestamps

Open miroslavbel opened this issue 4 years ago • 4 comments

Description

How to reproduce

Set up some variables to use

>>> from datetime import datetime
>>> from confluent_kafka import TopicPartition
>>>
>>> topic = 'topic_name'  # set to a real topic name
>>>
>>> partition = 0  # set to an existing partition of that topic
>>>
>>> date_in  = datetime(2021, 8, 6, 11, 10, 00)
>>> date_mid = datetime(2021, 8, 6, 11, 15, 00)
>>> date_out = datetime(2021, 8, 6, 11, 20, 00)
>>>
>>> tp_in = TopicPartition(topic=topic, partition=partition, offset=int(date_in.timestamp() * 1_000))
>>> tp_mid = TopicPartition(topic=topic, partition=partition, offset=int(date_mid.timestamp() * 1_000))
>>> tp_out = TopicPartition(topic=topic, partition=partition, offset=int(date_out.timestamp() * 1_000))
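Note that the datetime objects above are naive, so datetime.timestamp() interprets them in the local timezone of the machine running the snippet. A minimal sketch of the same conversion with an explicit timezone, assuming the timestamps are meant as UTC (the helper name to_epoch_ms is hypothetical and not part of confluent-kafka-python):

```python
from datetime import datetime, timezone


def to_epoch_ms(dt: datetime) -> int:
    """Convert a timezone-aware datetime to milliseconds since the Unix epoch."""
    if dt.tzinfo is None:
        # Refuse naive datetimes so the result never depends on the local timezone.
        raise ValueError("expected a timezone-aware datetime")
    return int(dt.timestamp() * 1000)


# Same wall-clock time as date_in above, but pinned to UTC (an assumption).
date_in_utc = datetime(2021, 8, 6, 11, 10, 0, tzinfo=timezone.utc)
ts_in_ms = to_epoch_ms(date_in_utc)
```

The resulting integer is what goes into the offset field of the TopicPartition passed to offsets_for_times().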

Connect to Kafka

>>> from confluent_kafka import Consumer
>>> 
>>> cfg = {'group.id':'qa', 'bootstrap.servers':'serever1.example:port,serever2.example:port,serever3.example:port,serever4.example:port,serever5.example:port'}
>>> 
>>> consumer = Consumer(cfg)
>>> 
>>> consumer.list_topics(topic, 5)  # fetch metadata to get the real partitions
>>> consumer.subscribe([topic])

case 1 (get offsets for three timestamps separately)

  1. call .offsets_for_times() three times with a list with one timestamp
  2. get:
  • 46504966 offset for date_in
  • 46504987 offset for date_mid
  • 46504988 offset for date_out
>>> consumer.offsets_for_times([tp_in])
[TopicPartition{topic=topic_name,partition=0,offset=46504966,error=None}]
>>> consumer.offsets_for_times([tp_mid])
[TopicPartition{topic=topic_name,partition=0,offset=46504987,error=None}]
>>> consumer.offsets_for_times([tp_out])
[TopicPartition{topic=topic_name,partition=0,offset=46504988,error=None}]

case 2 (get offsets for two timestamps at one call)

  1. call .offsets_for_times() with a list with two timestamps
  2. get:
  • 46504988 offset for date_in
  • 46504988 offset for date_out
  3. expect:
  • ~~46504966 offset for date_in~~
  • 46504988 offset for date_out
>>> consumer.offsets_for_times([tp_in, tp_out])
[TopicPartition{topic=topic_name,partition=0,offset=46504988,error=None}, TopicPartition{topic=topic_name,partition=0,offset=46504988,error=None}]

case 3 (get offsets for three timestamps at one call)

  1. call .offsets_for_times() with a list with three timestamps
  2. get:
  • 46504988 offset for date_in
  • 46504988 offset for date_mid
  • 46504988 offset for date_out
  3. expect:
  • ~~46504966 offset for date_in~~
  • ~~46504987 offset for date_mid~~
  • 46504988 offset for date_out
>>> consumer.offsets_for_times([tp_in, tp_mid, tp_out])
[TopicPartition{topic=topic_name,partition=0,offset=46504988,error=None}, TopicPartition{topic=topic_name,partition=0,offset=46504988,error=None}, TopicPartition{topic=topic_name,partition=0,offset=46504988,error=None}]

Checklist

Please provide the following information:

confluent_kafka.version(): ('1.7.0', 17235968)
confluent_kafka.libversion(): ('1.7.0', 17236223)
python: Python 3.7.11 [GCC 9.3.0] on linux
OS: Ubuntu 20.04.2 LTS
Client configuration: {'group.id':'qa', 'bootstrap.servers':'serever1.example:port,serever2.example:port,serever3.example:port,serever4.example:port,serever5.example:port'}

  • [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  • [ ] Apache Kafka broker version:
  • [x] Client configuration: {...}
  • [x] Operating system:
  • [ ] Provide client logs (with 'debug': '..' as necessary)
  • [ ] Provide broker log excerpts
  • [ ] Critical issue

miroslavbel avatar Aug 06 '21 12:08 miroslavbel

I have a vague memory that you're not allowed to pass the same partition multiple times in the same call.

edenhill avatar Aug 09 '21 06:08 edenhill
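For illustration, a minimal sketch of a workaround under that restriction: look up each timestamp in its own call, as in case 1 of the report. The helper name offsets_for_times_one_by_one is hypothetical; consumer is assumed to be a confluent_kafka.Consumer, and partitions a list of TopicPartition objects whose offset field holds the timestamp in milliseconds.

```python
def offsets_for_times_one_by_one(consumer, partitions, timeout=10.0):
    """Look up each timestamp in a separate offsets_for_times() call.

    `consumer` only needs an offsets_for_times(partitions, timeout=...) method,
    which confluent_kafka.Consumer provides. Results are returned in the same
    order as `partitions`.
    """
    results = []
    for tp in partitions:
        # A one-element list guarantees that no partition appears twice
        # within a single request.
        results.extend(consumer.offsets_for_times([tp], timeout=timeout))
    return results
```

With the variables from the report, offsets_for_times_one_by_one(consumer, [tp_in, tp_mid, tp_out]) issues three separate requests, one per timestamp, at the cost of one round trip each.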

Oh, yes. I see this note in librdkafka, but I don't see it in confluent-kafka-python. Maybe we need to fix this?

miroslavbel avatar Aug 11 '21 00:08 miroslavbel

It seems to me that the docs on the site differ from this repo: on the site, Consumer.poll() doesn't have a note section, but in this repo it does.

miroslavbel avatar Aug 11 '21 00:08 miroslavbel

A new Admin API, list_offsets, is now available, and you can use it instead.

Keeping this issue open because there is still a documentation mismatch between the API documentation and the comments in the code.

pranavrth avatar Feb 27 '24 13:02 pranavrth