pykafka
consumer.reset_offsets is not working as expected when using datetime.datetime
According to the documentation (https://pykafka.readthedocs.io/en/latest/api/simpleconsumer.html), we should be able to pass a datetime.datetime to consumer.reset_offsets to seek to an offset by time. However, it doesn't behave as expected. See the following example:
```python
import pykafka
print(pykafka.__version__)
```
output:
```
'2.8.0'
```
```python
from datetime import datetime
import time

from pykafka import KafkaClient
from pykafka.common import OffsetType

client = KafkaClient(hosts="<ip:port>")
topic = client.topics['test1']

# create 10 messages, 5 seconds apart, each with the current UTC time as its value
p = topic.get_producer()
for m_id in range(0, 10):
    msg = '{}'.format(datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'))
    p.produce(msg.encode("utf-8"))
    time.sleep(5)
p.stop()

# create a consumer that starts from the earliest offset and print out the 10 messages
# (iteration keeps waiting for new messages, since consumer_timeout_ms is not set)
consumer = topic.get_simple_consumer(auto_offset_reset=OffsetType.EARLIEST,
                                     reset_offset_on_start=True)
for message in consumer:
    if message is not None:
        raw_string = message.value.decode("utf-8")
        print(message.offset, ':', raw_string)
```
output:
```
0 : 2019-11-05 00:56:11
1 : 2019-11-05 00:56:17
2 : 2019-11-05 00:56:22
3 : 2019-11-05 00:56:27
4 : 2019-11-05 00:56:32
5 : 2019-11-05 00:56:37
6 : 2019-11-05 00:56:42
7 : 2019-11-05 00:56:47
8 : 2019-11-05 00:56:52
9 : 2019-11-05 00:56:57
```
I want to query historical data by resetting the offset based on a timestamp. For example, if I seek to 2019-11-05 00:56:32, I would expect the offset to be reset to 4 (or something close, allowing for network delays).
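For reference, the lookup I'm expecting is the one Kafka's timestamp-based offset lookup performs: return the earliest offset whose timestamp is greater than or equal to the target. Below is a minimal pure-Python sketch of that rule over the data printed above. It assumes the time written into each message body approximates the record's Kafka timestamp (the real record timestamp is set at produce time and carries milliseconds), and that timestamps increase with offset, as they do in this single-partition test. The helper name offset_for_time is hypothetical.

```python
from bisect import bisect_left
from datetime import datetime
from typing import List, Optional, Tuple


def offset_for_time(records: List[Tuple[int, datetime]], target: datetime) -> Optional[int]:
    """Return the earliest offset whose timestamp is >= target, or None if there is none.

    `records` is a list of (offset, timestamp) pairs sorted by timestamp.
    """
    timestamps = [ts for _, ts in records]
    # bisect_left finds the first position whose timestamp is not less than target
    i = bisect_left(timestamps, target)
    return records[i][0] if i < len(records) else None


# (offset, timestamp) pairs taken from the consumer output above
fmt = '%Y-%m-%d %H:%M:%S'
records = [(i, datetime.strptime(s, fmt)) for i, s in enumerate([
    '2019-11-05 00:56:11', '2019-11-05 00:56:17', '2019-11-05 00:56:22',
    '2019-11-05 00:56:27', '2019-11-05 00:56:32', '2019-11-05 00:56:37',
    '2019-11-05 00:56:42', '2019-11-05 00:56:47', '2019-11-05 00:56:52',
    '2019-11-05 00:56:57',
])]

print(offset_for_time(records, datetime.strptime('2019-11-05 00:56:32', fmt)))  # 4
```

A target that falls between two messages (for example 00:56:30) also resolves to offset 4, and a target after the last message returns None.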
```python
# seek by timestamp (datetime.fromisoformat requires Python 3.7+)
partition_offset_pairs = [(partition, datetime.fromisoformat('2019-11-05 00:56:32'))
                          for partition in consumer.partitions.values()]
consumer.reset_offsets(partition_offsets=partition_offset_pairs)
print(consumer.held_offsets)
```
output:
```
{0: -2}
```
This is not what I expected: the held offset for partition 0 is -2, nowhere near offset 4.
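One thing worth ruling out: the message bodies were written with datetime.utcnow(), so the datetime passed to reset_offsets above is a naive datetime that represents UTC. Kafka timestamps are milliseconds since the Unix epoch, and Python's own naive.timestamp() interprets a naive datetime as local time, which would shift the target by the local UTC offset. I haven't checked how pykafka 2.8.0 converts a datetime internally, so this is only a hypothesis to eliminate. A minimal sketch of an explicit, timezone-safe conversion (to_epoch_ms is a hypothetical helper, not part of pykafka) that treats naive datetimes as UTC and uses integer arithmetic to avoid float rounding:

```python
from datetime import datetime, timedelta, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(dt: datetime) -> int:
    """Convert a datetime to milliseconds since the Unix epoch.

    Naive datetimes are assumed to be UTC (like the output of datetime.utcnow());
    aware datetimes are converted from whatever zone they carry.
    """
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    # timedelta // timedelta gives an exact integer count of milliseconds
    return (dt - _EPOCH) // timedelta(milliseconds=1)


target = datetime.fromisoformat('2019-11-05 00:56:32')
print(to_epoch_ms(target))
```

The resulting value can be compared against the timestamps stored on the records to see which offset the target should map to.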
I have run a couple of other experiments with this, and in none of them does reset_offsets with (pykafka.partition.Partition, datetime.datetime) pairs behave in any reasonable way.
Note: I can run the same historical query (seeking to an offset by timestamp) in kafka-python, using KafkaConsumer.offsets_for_times, with no issues.
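For comparison, here is a minimal sketch of the kafka-python approach. It relies only on KafkaConsumer.offsets_for_times, which takes a {TopicPartition: epoch_ms} dict and returns a dict mapping each partition to an OffsetAndTimestamp (or None when no matching offset exists), and on KafkaConsumer.seek. The helper name seek_to_time is hypothetical, and the consumer and TopicPartition are passed in so the function itself does not need a running broker. Timestamp lookups of this kind need a broker that supports them (0.10.1 or later).

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Optional

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def seek_to_time(consumer: Any, tp: Any, when: datetime) -> Optional[int]:
    """Seek `tp` to the earliest offset whose timestamp is >= `when`.

    `consumer` is assumed to be a kafka.KafkaConsumer with `tp` assigned, and
    `tp` a kafka.TopicPartition. Naive datetimes are treated as UTC. Returns the
    offset sought to, or None if no offset matched (position left unchanged).
    """
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    ts_ms = (when - _EPOCH) // timedelta(milliseconds=1)
    found = consumer.offsets_for_times({tp: ts_ms}).get(tp)
    if found is None:
        return None
    consumer.seek(tp, found.offset)
    return found.offset


# Example wiring (not executed here; needs kafka-python and a reachable broker):
#   from kafka import KafkaConsumer, TopicPartition
#   consumer = KafkaConsumer(bootstrap_servers="<ip:port>", enable_auto_commit=False)
#   tp = TopicPartition("test1", 0)
#   consumer.assign([tp])
#   seek_to_time(consumer, tp, datetime(2019, 11, 5, 0, 56, 32))
```

Passing the consumer in, rather than constructing it inside the helper, keeps the timestamp handling separate from connection setup.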