kafka-python

Seeking on a consumer from the rebalance listener causes consumer to silently discard records

Open rvesse opened this issue 2 years ago • 0 comments

Discovered what appears to be a multi-threading bug that can be provoked by trying to do a seek from a rebalance listener. After the seek, the consumer returns the record at the sought offset and then silently resets the position to some later offset, potentially skipping a number of records.

Turning on Python logging shows that the fetcher incorrectly thinks there are compacted/deleted records to skip and resets the position, i.e. the code block at https://github.com/dpkp/kafka-python/blob/f19e4238fb47ae2619f18731f0e0e9a3762cfa11/kafka/consumer/fetcher.py#L674-L682 is triggered.

I understand that the package is not fully thread-safe, so this probably should not be expected to work, but I wanted to document it in case anyone else runs into this.

Code to Reproduce

def str_to_bytes(value: str) -> bytes:
    """Encode ``value`` as UTF-8 and return the resulting bytes."""
    encoded = value.encode(encoding="UTF-8")
    return encoded


def bytes_to_str(value: bytes) -> str:
    """Decode UTF-8 encoded ``value`` and return the resulting text."""
    decoded = value.decode(encoding="UTF-8")
    return decoded


def consumer_fetch_misses_records():
    """Reproduce the record loss seen when seeking inside a rebalance listener.

    Sends ten records (keys and values ``"1"`` through ``"10"``) to the
    ``test`` topic on ``localhost:9092``, then subscribes a consumer in group
    ``test`` with a ``TestRebalanceListener`` that is given the consumer
    itself, and prints every record the consumer yields.

    The loop iterates the consumer directly and does not break out, so the
    function is meant to be interrupted manually.
    """
    producer = KafkaProducer(
        bootstrap_servers=["localhost:9092"],
        key_serializer=str_to_bytes,
        value_serializer=str_to_bytes,
    )
    for i in range(1, 11):
        producer.send(topic="test", key=str(i), value=str(i))

    producer.close()

    consumer = KafkaConsumer(
        bootstrap_servers=["localhost:9092"],
        group_id="test",
        key_deserializer=bytes_to_str,
        value_deserializer=bytes_to_str,
    )
    # The listener receives the consumer so that it can seek from within the
    # rebalance callback -- this is the behaviour under test.
    listener = TestRebalanceListener(consumer)
    consumer.subscribe("test", listener=listener)

    for record in consumer:
        print(f"[{record.topic}-{record.partition}#{record.offset}] {record.key}: {record.value}")

class TestRebalanceListener(ConsumerRebalanceListener):
    """Rebalance listener that rewinds every newly assigned partition to offset 0.

    The seek and the commit are issued on the consumer from inside the
    assignment callback itself.
    """

    def __init__(self, consumer):
        """Store the consumer whose position is reset on assignment."""
        self.consumer = consumer

    def on_partitions_revoked(self, revoked):
        """Ignore revocations; nothing needs to be undone."""
        pass

    def on_partitions_assigned(self, assigned):
        """Seek each partition in ``assigned`` to offset 0 and commit that offset."""
        for partition in assigned:
            self.consumer.seek(partition, 0)
            # One commit per partition, issued right after its seek.
            self.consumer.commit(
                offsets={partition: OffsetAndMetadata(0, "Reset to beginning")}
            )


# Run the reproduction only when this file is executed as a script.
if __name__ == "__main__":
    consumer_fetch_misses_records()

Produces the following output:

[test-0#0] 1: 1

When it should produce all 10 records, i.e.:

[test-0#0] 1: 1
[test-0#0] 1: 1
[test-0#1] 2: 2
[test-0#2] 3: 3
[test-0#3] 4: 4
[test-0#4] 5: 5
[test-0#5] 6: 6
[test-0#6] 7: 7
[test-0#7] 8: 8
[test-0#8] 9: 9
[test-0#9] 10: 10

Workaround

The workaround I found is to have the rebalance listener set a boolean flag when the assignment changes, and then have my main loop seek appropriately, e.g.:

def consumer_fetch_misses_records():
    """Consume the ``test`` topic, seeking from the poll loop instead of the listener.

    Sends ten records (keys and values ``"1"`` through ``"10"``) to the
    ``test`` topic on ``localhost:9092`` and subscribes a consumer in group
    ``test``. The rebalance listener only raises ``should_seek``; the loop
    below checks that flag for each yielded record and, when it is set,
    seeks partition 0 of ``test`` back to offset 0, commits that offset and
    clears the flag. The record already in hand is printed either way.

    The loop does not break out, so the function is meant to be interrupted
    manually.
    """
    producer = KafkaProducer(
        bootstrap_servers=["localhost:9092"],
        key_serializer=str_to_bytes,
        value_serializer=str_to_bytes,
    )
    for i in range(1, 11):
        producer.send(topic="test", key=str(i), value=str(i))

    producer.close()

    consumer = KafkaConsumer(
        bootstrap_servers=["localhost:9092"],
        group_id="test",
        key_deserializer=bytes_to_str,
        value_deserializer=bytes_to_str,
    )
    # Only partition 0 of the topic is ever rewound by this example.
    partition = TopicPartition("test", 0)
    listener = TestRebalanceListener()
    consumer.subscribe("test", listener=listener)

    for record in consumer:
        if listener.should_seek:
            # The seek happens here, in the loop that iterates the consumer,
            # rather than inside the rebalance callback.
            consumer.seek(partition, 0)
            consumer.commit(
                offsets={partition: OffsetAndMetadata(0, "Reset to beginning")}
            )
            listener.should_seek = False
        print(f"[{record.topic}-{record.partition}#{record.offset}] {record.key}: {record.value}")


class TestRebalanceListener(ConsumerRebalanceListener):
    """Rebalance listener that only records that an assignment happened.

    It never touches the consumer; it exposes a ``should_seek`` flag for the
    code that iterates the consumer to act on.
    """

    def __init__(self):
        # False until on_partitions_assigned is called with a non-None value.
        self.should_seek = False

    def on_partitions_revoked(self, revoked):
        """Ignore revocations."""
        return None

    def on_partitions_assigned(self, assigned):
        """Set ``should_seek`` to whether ``assigned`` is not ``None``."""
        got_assignment = assigned is not None
        self.should_seek = got_assignment

rvesse avatar Mar 23 '22 16:03 rvesse