aiokafka
How can partitions be reassigned instantly when the docs suggest committing offsets in `on_partitions_revoked`?
Hi team,
I’m trying to understand how partition reassignment works in aiokafka, and I’m confused by the behavior described in the documentation for `ConsumerRebalanceListener`. Specifically, the [documentation](https://aiokafka.readthedocs.io/en/stable/api.html#aiokafka.abc.ConsumerRebalanceListener) suggests committing offsets in `on_partitions_revoked` before the partitions are reassigned. Following that advice, my listener does this:
```python
async def on_partitions_revoked(self, revoked_partitions):
    self.logger.info(f"Partitions revoked: {revoked_partitions}")
    if revoked_partitions:
        # AIOKafkaConsumer.commit() is a coroutine, so it must be awaited
        await self.consumer.commit(offsets=self.kafka_consumer_service.kafka_client.offsets_to_commit)
```
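For reference, the commit only preserves progress if `offsets_to_commit` holds, for each revoked partition, the offset of the *next* message to consume (last processed offset + 1), which is how Kafka interprets committed offsets. Below is a minimal sketch of how such a mapping might be maintained. `OffsetTracker` is a hypothetical helper and `TopicPartition` is a local stand-in with the same `topic`/`partition` fields as aiokafka's; neither is part of the original code.

```python
from typing import Dict, Iterable, NamedTuple


class TopicPartition(NamedTuple):
    # Local stand-in with the same fields as aiokafka's TopicPartition
    topic: str
    partition: int


class OffsetTracker:
    """Hypothetical helper: remembers the last processed offset per partition."""

    def __init__(self) -> None:
        self._processed: Dict[TopicPartition, int] = {}

    def mark_processed(self, tp: TopicPartition, offset: int) -> None:
        # Keep the highest offset processed so far for this partition
        self._processed[tp] = max(self._processed.get(tp, -1), offset)

    def offsets_to_commit(self, partitions: Iterable[TopicPartition]) -> Dict[TopicPartition, int]:
        # Kafka stores the offset of the NEXT message to consume, hence +1;
        # only the requested (e.g. revoked) partitions are included
        wanted = set(partitions)
        return {tp: off + 1 for tp, off in self._processed.items() if tp in wanted}

    def forget(self, partitions: Iterable[TopicPartition]) -> None:
        # Drop state for partitions this consumer no longer owns
        for tp in partitions:
            self._processed.pop(tp, None)


tracker = OffsetTracker()
tp0 = TopicPartition("test_topic", 0)
tracker.mark_processed(tp0, 41)
print(tracker.offsets_to_commit([tp0]))
# {TopicPartition(topic='test_topic', partition=0): 42}
```

Inside the listener, the call would then look something like `await self.consumer.commit(offsets=tracker.offsets_to_commit(revoked_partitions))`, assuming a `tracker` instance that the processing loop feeds via `mark_processed`.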
However, in practice, I’m observing the following issue:
Application logs:
```
2025-01-10 21:17:30,802 - main - INFO - Partitions revoked: frozenset({TopicPartition(topic='test_topic', partition=0), TopicPartition(topic='test_topic', partition=1)})
```
Kafka Consumer Configuration
Here is the configuration I’m using for the Kafka consumer:
```python
import json
import os

kafka_config = {
    "bootstrap_servers": os.environ["KAFKA_BOOTSTRAP_SERVER_URLS"],
    "sasl_mechanism": os.environ["KAFKA_SASL_MECHANISM"],
    "sasl_plain_username": os.environ.get("KAFKA_SERVER_USERNAME"),
    "sasl_plain_password": os.environ.get("KAFKA_SERVER_PASSWORD"),
    "security_protocol": os.environ["KAFKA_SECURITY_PROTOCOL"],
    "group_id": os.environ["KAFKA_CONSUMER_GROUP_ID"],
    "auto_offset_reset": "latest",
    "value_deserializer": lambda v: json.loads(v.decode("utf-8")),
    "key_deserializer": lambda v: v.decode("utf-8") if v else None,
    # Offsets are committed manually (e.g. from the rebalance listener)
    "enable_auto_commit": False,
    "heartbeat_interval_ms": 60000,
    "session_timeout_ms": 60000,
    "max_poll_interval_ms": 60000,
    "request_timeout_ms": 60000,
}
```
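The timing values above interact: aiokafka's documentation for `heartbeat_interval_ms` says it must be lower than `session_timeout_ms` and typically no higher than one third of it, whereas here both are 60000. A small sketch of a sanity check for those two settings follows; `check_group_timeouts` is a hypothetical helper, and its fallbacks are aiokafka's documented defaults (3000 ms heartbeat, 10000 ms session timeout).

```python
from typing import Dict, List


def check_group_timeouts(config: Dict[str, object]) -> List[str]:
    """Hypothetical sanity check for consumer-group heartbeat/session settings.

    Rule of thumb from the aiokafka docs for heartbeat_interval_ms: it must be
    lower than session_timeout_ms and typically no higher than 1/3 of it.
    """
    warnings: List[str] = []
    # Fall back to aiokafka's default values when a key is absent
    heartbeat = int(config.get("heartbeat_interval_ms", 3000))
    session = int(config.get("session_timeout_ms", 10000))
    if heartbeat >= session:
        warnings.append(
            f"heartbeat_interval_ms ({heartbeat}) must be lower than "
            f"session_timeout_ms ({session})"
        )
    elif heartbeat * 3 > session:
        warnings.append(
            f"heartbeat_interval_ms ({heartbeat}) is above 1/3 of "
            f"session_timeout_ms ({session})"
        )
    return warnings


print(check_group_timeouts({"heartbeat_interval_ms": 60000, "session_timeout_ms": 60000}))
# ['heartbeat_interval_ms (60000) must be lower than session_timeout_ms (60000)']
```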
The "Partitions revoked" log line above is immediately followed by this error:
```
aiokafka.errors.CommitFailedError: CommitFailedError: ('Commit cannot be completed since the group has already\n rebalanced and assigned the partitions to another member.\n This means that the time between subsequent calls to poll()\n was longer than the configured max_poll_interval_ms, which\n typically implies that the poll loop is spending too much\n time message processing. You can address this either by\n increasing the rebalance timeout with max_poll_interval_ms,\n or by reducing the maximum size of batches returned in poll()\n with max_poll_records.\n ', 'Commit cannot be completed since the group has already rebalanced and may have assigned the partitions to another member')
```
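The error message points at two knobs: `max_poll_interval_ms` and the batch size (`max_poll_records`). With `max_poll_interval_ms` at 60000, a rough way to size batches is to keep the estimated processing time per batch well inside that window. The helper below is a hypothetical back-of-the-envelope calculation; the per-record cost and the 50% safety factor are assumptions to be replaced with measured values.

```python
import math


def max_safe_batch_size(max_poll_interval_ms: int, per_record_ms: float,
                        safety: float = 0.5) -> int:
    """Hypothetical helper: largest batch whose estimated processing time stays
    within `safety` * max_poll_interval_ms, given a per-record cost estimate."""
    if per_record_ms <= 0:
        raise ValueError("per_record_ms must be positive")
    if not 0 < safety <= 1:
        raise ValueError("safety must be in (0, 1]")
    budget_ms = max_poll_interval_ms * safety
    # At least 1: if a single record exceeds the budget, batch size cannot help
    # and max_poll_interval_ms itself has to be raised instead
    return max(1, math.floor(budget_ms / per_record_ms))


print(max_safe_batch_size(60000, 250))  # 120
```

The result could be passed as `max_poll_records` to `AIOKafkaConsumer`, or as `max_records` to `getmany()` on a per-call basis.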
My questions are:
- How does the partition reassignment seem to occur instantly, even though the documentation suggests that the `on_partitions_revoked` method is the place to commit offsets before reassignment?
- Does this behavior imply that the partitions are reassigned before `on_partitions_revoked` finishes executing? If so, how can I ensure my offsets are safely committed in such cases?
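Regarding the second question, one way to reason about it is through group *generations*: in Kafka's group protocol, offset commits carry the member's generation id, and the coordinator rejects commits from a generation that is no longer current. If the member was already considered dead (for example after exceeding `session_timeout_ms` or `max_poll_interval_ms`), the group may have rebalanced and reassigned its partitions before `on_partitions_revoked` even runs, so the commit inside the callback fails. The asyncio sketch below is a toy model of that sequence only; `ToyCoordinator` and the local `CommitFailedError` are hypothetical stand-ins, not aiokafka's implementation.

```python
import asyncio
from typing import Dict


class CommitFailedError(Exception):
    """Stand-in for aiokafka.errors.CommitFailedError (toy model only)."""


class ToyCoordinator:
    """Hypothetical, heavily simplified group coordinator.

    Every rebalance bumps the generation; commits are accepted only when they
    carry the current generation.
    """

    def __init__(self) -> None:
        self.generation = 1
        self.committed: Dict[int, int] = {}

    def rebalance(self) -> None:
        # E.g. a member missed its session timeout or max_poll_interval_ms
        self.generation += 1

    async def commit(self, generation: int, offsets: Dict[int, int]) -> None:
        await asyncio.sleep(0)  # pretend network round-trip
        if generation != self.generation:
            raise CommitFailedError(
                f"stale generation {generation}, group is at {self.generation}"
            )
        self.committed.update(offsets)


async def on_partitions_revoked(coord: ToyCoordinator, my_generation: int,
                                offsets: Dict[int, int]) -> str:
    # The callback commits using the generation this member last joined with
    try:
        await coord.commit(my_generation, offsets)
        return "committed"
    except CommitFailedError as exc:
        return f"commit rejected: {exc}"


async def main() -> None:
    coord = ToyCoordinator()
    joined_generation = coord.generation  # member joined generation 1

    # Case 1: member is still current when the callback runs
    print(await on_partitions_revoked(coord, joined_generation, {0: 42}))
    # committed

    # Case 2: the group already moved on before the callback ran
    coord.rebalance()
    print(await on_partitions_revoked(coord, joined_generation, {1: 8}))
    # commit rejected: stale generation 1, group is at 2


if __name__ == "__main__":
    asyncio.run(main())
```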
Thank you in advance for your help and clarification!