consumer.reset_offsets is not working as expected when using datetime.datetime

Open wkelongws opened this issue 6 years ago • 0 comments

Based on the documentation (https://pykafka.readthedocs.io/en/latest/api/simpleconsumer.html), we should be able to pass a datetime.datetime to consumer.reset_offsets to seek to an offset by timestamp. However, it doesn't behave as expected. See the following example:

import pykafka
print(pykafka.__version__)

output:

'2.8.0'
from datetime import datetime
import time

from pykafka import KafkaClient
from pykafka.common import OffsetType

client = KafkaClient(hosts="<ip:port>")
topic = client.topics['test1']

# create 10 messages with timestamp as the value
p = topic.get_producer()
for m_id in range(0,10):
    msg = '{}'.format(datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'))
    p.produce(msg.encode("utf-8"))
    time.sleep(5)
p.stop()

# create a consumer and print out the 10 messages
consumer = topic.get_simple_consumer(auto_offset_reset=OffsetType.EARLIEST,
    reset_offset_on_start=True)

for message in consumer:
    if message is not None:
        raw_string = message.value.decode("utf-8")
        print(message.offset, ':',raw_string)

output:

0 : 2019-11-05 00:56:11
1 : 2019-11-05 00:56:17
2 : 2019-11-05 00:56:22
3 : 2019-11-05 00:56:27
4 : 2019-11-05 00:56:32
5 : 2019-11-05 00:56:37
6 : 2019-11-05 00:56:42
7 : 2019-11-05 00:56:47
8 : 2019-11-05 00:56:52
9 : 2019-11-05 00:56:57

I want to query historical data by resetting the offset based on a timestamp. For example, if I seek by 2019-11-05 00:56:32, I would expect the offset to be reset to 4 (or something close, allowing for network delays).
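The expected lookup can be sketched without a broker. The snippet below is a minimal illustration, not pykafka code: it assumes the timestamps printed above stand in for the message timestamps, and the helper name offset_for_time is hypothetical. It returns the earliest offset whose timestamp is greater than or equal to the target, which is the rule kafka-python documents for offsets_for_times.

```python
from bisect import bisect_left
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"

# (offset, timestamp) pairs copied from the consumer output above
LOG = [
    (0, "2019-11-05 00:56:11"),
    (1, "2019-11-05 00:56:17"),
    (2, "2019-11-05 00:56:22"),
    (3, "2019-11-05 00:56:27"),
    (4, "2019-11-05 00:56:32"),
    (5, "2019-11-05 00:56:37"),
    (6, "2019-11-05 00:56:42"),
    (7, "2019-11-05 00:56:47"),
    (8, "2019-11-05 00:56:52"),
    (9, "2019-11-05 00:56:57"),
]


def offset_for_time(log, target):
    """Hypothetical helper: earliest offset with timestamp >= target, else None."""
    # The log is ordered by offset, so its timestamps are already sorted.
    timestamps = [datetime.strptime(ts, FMT) for _, ts in log]
    idx = bisect_left(timestamps, target)
    return log[idx][0] if idx < len(log) else None


print(offset_for_time(LOG, datetime(2019, 11, 5, 0, 56, 32)))  # 4
```

A target that falls between two messages resolves to the later one, and a target after the last message yields None.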

# seek by timestamp
partition_offset_pairs = [(p, datetime.fromisoformat('2019-11-05 00:56:32')) for p in consumer.partitions.values()]
consumer.reset_offsets(partition_offsets=partition_offset_pairs)

print(consumer.held_offsets)

output:

{0: -2}

The output is not what I expected. I have run a couple of experiments and have not seen reset_offsets((pykafka.partition.Partition, datetime.datetime)) behave in any reasonable way.

Note: I can run the same historical query (seeking to an offset by timestamp) in kafka-python, using consumer.offsets_for_times, with no issues.
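For comparison, the kafka-python route might look roughly like the sketch below. It is an illustration under stated assumptions, not the exact code used for this report: seek_to_time and to_epoch_ms are hypothetical helper names, naive datetimes are assumed to be UTC (matching the datetime.utcnow() calls above), and the broker address and topic in the usage comment are placeholders. Only KafkaConsumer.offsets_for_times, which takes epoch milliseconds per TopicPartition, and KafkaConsumer.seek are relied on.

```python
from datetime import datetime, timezone


def to_epoch_ms(when):
    """Convert a datetime to epoch milliseconds, treating naive values as UTC."""
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    return int(when.timestamp() * 1000)


def seek_to_time(consumer, partitions, when):
    """Hypothetical helper: seek each partition to the first offset at or after `when`.

    `consumer` is expected to behave like kafka.KafkaConsumer with the
    partitions already assigned. Returns the offsets that were seeked to.
    """
    ts_ms = to_epoch_ms(when)
    found = consumer.offsets_for_times({tp: ts_ms for tp in partitions})
    positions = {}
    for tp, hit in found.items():
        if hit is not None:  # None: no message at or after the timestamp
            consumer.seek(tp, hit.offset)
            positions[tp] = hit.offset
    return positions


# Usage against a real broker (not run here; host and topic are placeholders):
#   from kafka import KafkaConsumer, TopicPartition
#   consumer = KafkaConsumer(bootstrap_servers="<ip:port>")
#   tp = TopicPartition("test1", 0)
#   consumer.assign([tp])
#   seek_to_time(consumer, [tp], datetime(2019, 11, 5, 0, 56, 32))
```

The helper takes the consumer as an argument, so it can be exercised with a stub object before pointing it at a live cluster.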

wkelongws, Nov 05 '19 01:11