pykafka
pykafka copied to clipboard
SimpleConsumer endlessly skips messages (hits `continue` on every message) when the offset in a message > next_offset
pykafka version: 2.8.0
from pykafka import KafkaClient
import logging
# Configure root logging at DEBUG level so the consumer's debug records are
# shown, each prefixed with timestamp, source file name, line number and level.
logging.basicConfig(
    datefmt='%a, %d %b %Y %H:%M:%S',
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
    level=logging.DEBUG,
)
def connect_kafka():
    """Return a pykafka simple consumer for the ``test_topic`` topic.

    Builds a ``KafkaClient`` for hosts ``127.0.0.1:9092`` (broker version
    ``2.1.0``), looks up the topic, and returns the result of
    ``get_simple_consumer`` with auto-commit disabled, consumer group
    ``pytest`` and consumer id ``test1``.
    """
    kafka_client = KafkaClient(broker_version="2.1.0", hosts="127.0.0.1:9092")
    test_topic = kafka_client.topics['test_topic']
    return test_topic.get_simple_consumer(
        consumer_id=b'test1',
        auto_commit_enable=False,
        consumer_group=b'pytest',
    )
consumer = connect_kafka()
offset = 228704876
# Commit `offset` for the partition stored under key 0 of consumer.partitions,
# then print the offset reported back by fetch_offsets().
# print(...) with a single argument behaves the same on Python 2 and Python 3;
# the original `print x` statement form is a SyntaxError on Python 3.
consumer.commit_offsets(partition_offsets=[(consumer.partitions[0], offset)])
print(consumer.fetch_offsets()[0][1].offset)
# Reset the consumer to the same offset and print the fetched offset again.
# NOTE: per the report, the "Skipping enqueue" DEBUG messages only appear when
# all four of these calls run in this order, so the order is kept as-is.
consumer.reset_offsets(partition_offsets=[(consumer.partitions[0], offset)])
print(consumer.fetch_offsets()[0][1].offset)
When the code above is run, it produces many log lines like this: Fri, 03 Jan 2020 16:22:43 simpleconsumer.py[line:996] DEBUG Skipping enqueue for offset (228704877) not equal to next_offset (228704876)
This does not happen if any one of the last four lines of the script is removed.
The debug message is logged by the code here: https://github.com/Parsely/pykafka/blob/ebbc5c70901237e60bd6654336675886793fb8d9/pykafka/simpleconsumer.py#L994