
How can partitions be reassigned instantly when the documentation suggests committing offsets in on_partitions_revoked?

Open mdanish0320 opened this issue 10 months ago • 0 comments

Hi team,

I’m trying to understand how partition reassignment works in aiokafka, and I'm confused by the behavior described in the documentation for `ConsumerRebalanceListener`. Specifically, the documentation suggests committing offsets in `on_partitions_revoked` before the partitions are reassigned. Here is the relevant part of my listener, following the [documentation](https://aiokafka.readthedocs.io/en/stable/api.html#aiokafka.abc.ConsumerRebalanceListener):

```python
async def on_partitions_revoked(self, revoked_partitions):
    self.logger.info(f"Partitions revoked: {revoked_partitions}")
    if revoked_partitions:
        # AIOKafkaConsumer.commit() is a coroutine, so it must be awaited
        await self.consumer.commit(offsets=self.kafka_consumer_service.kafka_client.offsets_to_commit)
```
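A fuller, self-contained sketch of such a listener may help when reasoning about the rebalance. It assumes the application keeps a hypothetical `offsets_to_commit` dict mapping each `TopicPartition` to the next offset to consume (last processed offset + 1), and it commits only the partitions being revoked. The class name `CommitOnRevoke` is illustrative; the `ImportError` fallback exists only so the sketch can be exercised without aiokafka installed.

```python
try:
    from aiokafka.abc import ConsumerRebalanceListener
except ImportError:  # fallback so the sketch runs without aiokafka installed
    ConsumerRebalanceListener = object


class CommitOnRevoke(ConsumerRebalanceListener):
    """Commits processed offsets for partitions that are about to be revoked.

    `offsets_to_commit` is a hypothetical, application-owned dict of
    TopicPartition -> next offset to consume (last processed offset + 1).
    """

    def __init__(self, consumer, offsets_to_commit: dict) -> None:
        self.consumer = consumer
        self.offsets_to_commit = offsets_to_commit

    async def on_partitions_revoked(self, revoked):
        # Only commit partitions that actually have processed offsets.
        offsets = {tp: self.offsets_to_commit[tp]
                   for tp in revoked if tp in self.offsets_to_commit}
        if offsets:
            # commit() is a coroutine: calling it without `await` sends nothing.
            # It raises if this member's group generation is no longer valid.
            await self.consumer.commit(offsets)
        # Forget state for partitions this consumer no longer owns.
        for tp in revoked:
            self.offsets_to_commit.pop(tp, None)

    async def on_partitions_assigned(self, assigned):
        pass  # nothing to do; the consumer resumes from committed offsets
```

It would be registered with something like `consumer.subscribe(["test_topic"], listener=CommitOnRevoke(consumer, offsets_to_commit))`. Even with the `await` in place, a commit made here can still fail with `CommitFailedError` if the consumer had already been removed from the group before the callback ran.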

However, in practice, I’m observing the following issue.

Application logs:

```
2025-01-10 21:17:30,802 - main - INFO - Partitions revoked: frozenset({TopicPartition(topic='test_topic', partition=0), TopicPartition(topic='test_topic', partition=1)})
```

Kafka Consumer Configuration

Here is the configuration I’m using for the Kafka consumer:

```python
import json
import os

kafka_config = {
    "bootstrap_servers": os.environ["KAFKA_BOOTSTRAP_SERVER_URLS"],
    "sasl_mechanism": os.environ["KAFKA_SASL_MECHANISM"],
    "sasl_plain_username": os.environ.get("KAFKA_SERVER_USERNAME"),
    "sasl_plain_password": os.environ.get("KAFKA_SERVER_PASSWORD"),
    "security_protocol": os.environ["KAFKA_SECURITY_PROTOCOL"],
    "group_id": os.environ["KAFKA_CONSUMER_GROUP_ID"],
    "auto_offset_reset": "latest",
    "value_deserializer": lambda v: json.loads(v.decode("utf-8")),
    "key_deserializer": lambda v: v.decode("utf-8") if v else None,
    "enable_auto_commit": False,  # offsets are committed manually
    "heartbeat_interval_ms": 60000,
    "session_timeout_ms": 60000,
    "max_poll_interval_ms": 60000,
    "request_timeout_ms": 60000,
}
```
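When reading this configuration, it may help to check how the group timeouts relate to each other. The Apache Kafka consumer documentation states that `heartbeat.interval.ms` must be set lower than `session.timeout.ms`, and typically no higher than one third of it; in the config above both are 60000 ms. A minimal sketch of such a check, using a hypothetical helper `check_group_timeouts` on a plain dict like the one above:

```python
def check_group_timeouts(cfg: dict) -> list:
    """Return warnings about heartbeat/session timing (hypothetical helper).

    Rule of thumb from the Kafka consumer docs: the heartbeat interval must be
    below the session timeout, and is typically no more than a third of it.
    """
    warnings = []
    hb = cfg.get("heartbeat_interval_ms")
    session = cfg.get("session_timeout_ms")
    if hb is None or session is None:
        return warnings  # library defaults apply; nothing to compare
    if hb >= session:
        warnings.append(
            f"heartbeat_interval_ms ({hb}) must be lower than "
            f"session_timeout_ms ({session})")
    elif hb * 3 > session:
        warnings.append(
            f"heartbeat_interval_ms ({hb}) is more than a third of "
            f"session_timeout_ms ({session})")
    return warnings
```

Calling it with `{"heartbeat_interval_ms": 60000, "session_timeout_ms": 60000}` returns one warning that the heartbeat interval must be lower than the session timeout.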

The "Partitions revoked" log line above is immediately followed by this error:

```
aiokafka.errors.CommitFailedError: CommitFailedError: ('Commit cannot be completed since the group has already\n rebalanced and assigned the partitions to another member.\n This means that the time between subsequent calls to poll()\n was longer than the configured max_poll_interval_ms, which\n typically implies that the poll loop is spending too much\n time message processing. You can address this either by\n increasing the rebalance timeout with max_poll_interval_ms,\n or by reducing the maximum size of batches returned in poll()\n with max_poll_records.\n ', 'Commit cannot be completed since the group has already rebalanced and may have assigned the partitions to another member')
```
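The message says the group "has already rebalanced". As a rough mental model only (a toy simulation, not aiokafka or broker code), the group coordinator tags each membership with a generation number. When a member is evicted, for example because no heartbeat arrived within the session timeout, the group moves to a new generation, and an offset commit that still carries the old member id or generation is rejected; clients roughly surface that rejection as `CommitFailedError`. The names `ToyCoordinator` and `StaleGenerationError` are hypothetical, and the model collapses the real join/sync steps into a single generation bump.

```python
class StaleGenerationError(Exception):
    """Toy stand-in for the coordinator rejecting a commit from a stale member."""


class ToyCoordinator:
    """Simplified group coordinator that fences commits by generation."""

    def __init__(self) -> None:
        self.generation = 0
        self.members = set()
        self.committed = {}

    def join(self, member_id: str) -> int:
        # Simplification: every membership change completes a rebalance at once,
        # and every member must rejoin to learn the new generation.
        self.members.add(member_id)
        self.generation += 1
        return self.generation

    def evict(self, member_id: str) -> None:
        # E.g. the session timeout expired without a heartbeat from this member.
        self.members.discard(member_id)
        self.generation += 1

    def commit(self, member_id: str, generation: int, offsets: dict) -> None:
        if member_id not in self.members or generation != self.generation:
            raise StaleGenerationError(
                f"{member_id} committed with generation {generation}, "
                f"current generation is {self.generation}")
        self.committed.update(offsets)


coord = ToyCoordinator()
gen = coord.join("consumer-a")
coord.commit("consumer-a", gen, {("test_topic", 0): 42})  # accepted
coord.evict("consumer-a")  # e.g. its heartbeats stopped arriving in time
try:
    # A revoke callback that runs afterwards still carries the old generation.
    coord.commit("consumer-a", gen, {("test_topic", 0): 43})
except StaleGenerationError as exc:
    print("commit rejected:", exc)
```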

My questions are:

  1. Why does partition reassignment seem to happen instantly, even though the documentation suggests that `on_partitions_revoked` is the place to commit offsets before reassignment?
  2. Does this behavior imply that the partitions are reassigned before on_partitions_revoked finishes its execution? If so, how can I ensure my offsets are safely committed in such cases?

Thank you in advance for your help and clarification!

mdanish0320 · Jan 10 '25 21:01