kafka-python
Seeking on a consumer from the rebalance listener causes consumer to silently discard records
Discovered what appears to be a multi-threading bug that can be provoked by doing a seek from a rebalance listener. After the seek, the consumer returns the record at the target offset and then silently resets the position to some later offset, potentially skipping a batch of records.
Turning on Python logging shows that the fetcher incorrectly concludes that there are compacted/deleted records to skip and resets the position, i.e. the code block at https://github.com/dpkp/kafka-python/blob/f19e4238fb47ae2619f18731f0e0e9a3762cfa11/kafka/consumer/fetcher.py#L674-L682 is triggered.
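For anyone who wants to see the same behaviour, the fetcher's debug output can be surfaced with a standard logging setup along these lines. This is a minimal sketch using only the stdlib logging module; it assumes kafka-python's module-level loggers all live under the "kafka" logger name.
import logging

# Minimal sketch: enable DEBUG logging for kafka-python so the fetcher's
# output (including the position reset described above) becomes visible,
# while keeping other libraries at INFO.
logging.basicConfig(level=logging.INFO)
logging.getLogger("kafka").setLevel(logging.DEBUG)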
I understand that the consumer is not fully thread-safe, so this probably should not be expected to work, but I wanted to document it in case anyone else runs into this.
Code to Reproduce
from kafka import ConsumerRebalanceListener, KafkaConsumer, KafkaProducer, TopicPartition
from kafka.structs import OffsetAndMetadata


def str_to_bytes(value: str) -> bytes:
    return value.encode("UTF-8")


def bytes_to_str(value: bytes) -> str:
    return value.decode("UTF-8")


def consumer_fetch_misses_records():
    # Produce ten records to the "test" topic.
    producer = KafkaProducer(bootstrap_servers=["localhost:9092"],
                             key_serializer=str_to_bytes,
                             value_serializer=str_to_bytes)
    for i in range(1, 11):
        producer.send(topic="test", key=str(i), value=str(i))
    producer.close()

    # Consume with a rebalance listener that seeks back to offset 0
    # as soon as the partition is assigned.
    consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"], group_id="test",
                             key_deserializer=bytes_to_str,
                             value_deserializer=bytes_to_str)
    partition = TopicPartition("test", 0)
    listener = TestRebalanceListener(consumer)
    consumer.subscribe("test", listener=listener)
    for record in consumer:
        print(f"[{record.topic}-{record.partition}#{record.offset}] {record.key}: {record.value}")


class TestRebalanceListener(ConsumerRebalanceListener):
    def __init__(self, consumer):
        self.consumer = consumer

    def on_partitions_revoked(self, revoked):
        pass

    def on_partitions_assigned(self, assigned):
        # Seeking (and committing) from inside the listener callback is
        # what triggers the skipped records.
        for partition in assigned:
            self.consumer.seek(partition, 0)
            self.consumer.commit(offsets={partition: OffsetAndMetadata(0, "Reset to beginning")})


if __name__ == "__main__":
    consumer_fetch_misses_records()
Produces the following output:
[test-0#0] 1: 1
When it should instead produce all 10 records, i.e.:
[test-0#0] 1: 1
[test-0#1] 2: 2
[test-0#2] 3: 3
[test-0#3] 4: 4
[test-0#4] 5: 5
[test-0#5] 6: 6
[test-0#6] 7: 7
[test-0#7] 8: 8
[test-0#8] 9: 9
[test-0#9] 10: 10
Workaround
The workaround I found is to have the rebalance listener set a boolean flag when the assignment changes, and then have the main loop do the seek, e.g.:
# Imports and the str_to_bytes/bytes_to_str helpers are the same as above.
def consumer_fetch_misses_records():
    producer = KafkaProducer(bootstrap_servers=["localhost:9092"],
                             key_serializer=str_to_bytes,
                             value_serializer=str_to_bytes)
    for i in range(1, 11):
        producer.send(topic="test", key=str(i), value=str(i))
    producer.close()

    consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"], group_id="test",
                             key_deserializer=bytes_to_str,
                             value_deserializer=bytes_to_str)
    partition = TopicPartition("test", 0)
    listener = TestRebalanceListener()
    consumer.subscribe("test", listener=listener)
    for record in consumer:
        # Perform the seek from the main polling loop instead of from
        # inside the rebalance listener callback.
        if listener.should_seek:
            consumer.seek(partition, 0)
            consumer.commit(offsets={partition: OffsetAndMetadata(0, "Reset to beginning")})
            listener.should_seek = False
        print(f"[{record.topic}-{record.partition}#{record.offset}] {record.key}: {record.value}")


class TestRebalanceListener(ConsumerRebalanceListener):
    def __init__(self):
        self.should_seek = False

    def on_partitions_revoked(self, revoked):
        pass

    def on_partitions_assigned(self, assigned):
        # Only record that a (re)assignment happened; the consumer loop
        # performs the actual seek.
        self.should_seek = assigned is not None
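For completeness, the workaround runs the same way as the reproduction above (the helpers and imports from the first listing are assumed to be in scope):
if __name__ == "__main__":
    # With the seek issued from the main polling loop, the consumer
    # should print all ten records instead of stopping after the first.
    consumer_fetch_misses_records()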