confluent-kafka-python
Consumer.offsets_for_times() returns the same offset for different timestamps
Description
How to reproduce
Set up some variables to use
>>> from datetime import datetime
>>> from confluent_kafka import TopicPartition
>>>
>>> topic = 'topic_name' # set up to real topic name
>>>
>>> partition = 0 # set up to existing topic partition
>>>
>>> date_in = datetime(2021, 8, 6, 11, 10, 00)
>>> date_mid = datetime(2021, 8, 6, 11, 15, 00)
>>> date_out = datetime(2021, 8, 6, 11, 20, 00)
>>>
>>> tp_in = TopicPartition(topic=topic, partition=partition, offset=int(date_in.timestamp() * 1_000))
>>> tp_mid = TopicPartition(topic=topic, partition=partition, offset=int(date_mid.timestamp() * 1_000))
>>> tp_out = TopicPartition(topic=topic, partition=partition, offset=int(date_out.timestamp() * 1_000))
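Note that `datetime.timestamp()` on a naive `datetime` interprets it as local time, so the millisecond values above depend on the machine's time zone. A minimal sketch of a conversion that pins the zone explicitly follows; `to_epoch_ms` is a hypothetical helper name, and treating naive values as UTC is an assumption made for illustration only.

```python
from datetime import datetime, timezone


def to_epoch_ms(dt: datetime) -> int:
    """Convert a datetime to epoch milliseconds, as used for Kafka timestamps.

    Assumption for this sketch: naive datetimes are taken to be UTC.
    Plain datetime.timestamp() would treat them as local time instead.
    """
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)


# Same wall-clock values as in the report, interpreted as UTC.
date_in_ms = to_epoch_ms(datetime(2021, 8, 6, 11, 10, 0))
date_out_ms = to_epoch_ms(datetime(2021, 8, 6, 11, 20, 0))
```

The resulting integers can be passed as the `offset` argument of `TopicPartition`, exactly as in the lines above.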
Connect to Kafka
>>> from confluent_kafka import Consumer
>>>
>>> cfg = {'group.id':'qa', 'bootstrap.servers':'serever1.example:port,serever2.example:port,serever3.example:port,serever4.example:port,serever5.example:port'}
>>>
>>> consumer = Consumer(cfg)
>>>
>>> consumer.list_topics(topic, 5) # fetch metadata to see the real partitions
>>> consumer.subscribe([topic])
case 1 (get offsets for three timestamps separately)
- call `.offsets_for_times()` three times, each with a list of one timestamp
- get:
  - 46504966 offset for date_in
  - 46504987 offset for date_mid
  - 46504988 offset for date_out
>>> consumer.offsets_for_times([tp_in])
[TopicPartition{topic=topic_name,partition=0,offset=46504966,error=None}]
>>> consumer.offsets_for_times([tp_mid])
[TopicPartition{topic=topic_name,partition=0,offset=46504987,error=None}]
>>> consumer.offsets_for_times([tp_out])
[TopicPartition{topic=topic_name,partition=0,offset=46504988,error=None}]
case 2 (get offsets for two timestamps in one call)
- call `.offsets_for_times()` with a list of two timestamps
- get:
  - 46504988 offset for date_in
  - 46504988 offset for date_out
- expect:
  - ~~46504966 offset for date_in~~
  - 46504988 offset for date_out
>>> consumer.offsets_for_times([tp_in, tp_out])
[TopicPartition{topic=topic_name,partition=0,offset=46504988,error=None}, TopicPartition{topic=topic_name,partition=0,offset=46504988,error=None}]
case 3 (get offsets for three timestamps in one call)
- call `.offsets_for_times()` with a list of three timestamps
- get:
  - 46504988 offset for date_in
  - 46504988 offset for date_mid
  - 46504988 offset for date_out
- expect:
  - ~~46504966 offset for date_in~~
  - ~~46504987 offset for date_mid~~
  - 46504988 offset for date_out
>>> consumer.offsets_for_times([tp_in, tp_mid, tp_out])
[TopicPartition{topic=topic_name,partition=0,offset=46504988,error=None}, TopicPartition{topic=topic_name,partition=0,offset=46504988,error=None}, TopicPartition{topic=topic_name,partition=0,offset=46504988,error=None}]
Checklist
Please provide the following information:
confluent_kafka.version(): ('1.7.0', 17235968)
confluent_kafka.libversion(): ('1.7.0', 17236223)
python: Python 3.7.11 [GCC 9.3.0] on linux
OS: Ubuntu 20.04.2 LTS
Client configuration: {'group.id':'qa', 'bootstrap.servers':'serever1.example:port,serever2.example:port,serever3.example:port,serever4.example:port,serever5.example:port'}
- [x] confluent-kafka-python and librdkafka version (`confluent_kafka.version()` and `confluent_kafka.libversion()`):
- [ ] Apache Kafka broker version:
- [x] Client configuration: `{...}`
- [x] Operating system:
- [ ] Provide client logs (with `'debug': '..'` as necessary)
- [ ] Provide broker log excerpts
- [ ] Critical issue
I have a vague memory that you're not allowed to pass the same partition multiple times in the same call.
Oh, yes. I see this note in librdkafka, but I don't see it in confluent-kafka-python. Maybe we need to fix this?
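Given that restriction, one way to resolve several timestamps on the same partition is to issue one `offsets_for_times()` call per timestamp, as in case 1. The sketch below is only an illustration: `offsets_for_each_time` and its `make_tp` parameter are hypothetical names, and `make_tp` is assumed to be `confluent_kafka.TopicPartition` in real use (it is injected so the helper does not depend on a running broker).

```python
from typing import Callable, Iterable, List


def offsets_for_each_time(consumer, topic: str, partition: int,
                          timestamps_ms: Iterable[int],
                          make_tp: Callable,
                          timeout: float = 10.0) -> List[int]:
    """Resolve one offset per timestamp, using one request per timestamp.

    The same partition is never passed twice within a single call.
    """
    offsets = []
    for ts in timestamps_ms:
        # On input the offset field carries the timestamp in milliseconds;
        # on output it carries the resolved offset.
        request = [make_tp(topic, partition, ts)]
        result = consumer.offsets_for_times(request, timeout=timeout)
        offsets.append(result[0].offset)
    return offsets
```

With the variables from the report, a call might look like `offsets_for_each_time(consumer, topic, partition, [tp_in.offset, tp_mid.offset, tp_out.offset], make_tp=TopicPartition)`.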
It seems to me that the docs on the site differ from this repo: on the site, Consumer.poll() doesn't have a note section, but in this repo it does.
A new Admin API, list_offsets, is also available now. You can use that API instead.
Keeping this issue open, as there is a documentation discrepancy between the API documentation and the comments in the code.
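For the `list_offsets` suggestion above, a rough sketch of a timestamp lookup follows. It assumes a client release newer than the 1.7.0 reported here, in which `AdminClient.list_offsets()` takes a dict mapping `TopicPartition` to `OffsetSpec` and returns a dict of futures. `offset_at_timestamp` is a hypothetical helper name; the partition and spec objects are passed in ready-made so the function itself stays independent of a broker.

```python
def offset_at_timestamp(admin, tp, spec, request_timeout: float = 10.0) -> int:
    """Return the offset that the Admin API resolves for a single partition.

    `admin` is expected to be a confluent_kafka.admin.AdminClient,
    `tp` a confluent_kafka.TopicPartition, and `spec` an OffsetSpec
    such as OffsetSpec.for_timestamp(timestamp_ms).
    """
    futures = admin.list_offsets({tp: spec}, request_timeout=request_timeout)
    # One partition was requested, so exactly one future is expected back.
    future = next(iter(futures.values()))
    return future.result().offset


# Intended use (requires confluent_kafka and a reachable cluster):
#   from confluent_kafka import TopicPartition
#   from confluent_kafka.admin import AdminClient, OffsetSpec
#   admin = AdminClient({'bootstrap.servers': '...'})
#   offset_at_timestamp(admin, TopicPartition(topic, partition),
#                       OffsetSpec.for_timestamp(timestamp_ms))
```

Because the request is a dict keyed by partition, this form also handles one timestamp per partition per call; several timestamps on one partition still need several calls.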