IllegalStateError causes consumer crash when commit is pending and a rebalance happens
Checklist
- [X] I have included information about relevant versions
- [X] I have verified that the issue persists when using the `master` branch of Faust.
Steps to reproduce
When the Consumer has requested a commit on one or more partitions and a rebalance is triggered before the aiokafka consumer performs the commit, one or more of the partitions being committed may be revoked by the rebalance and not re-assigned. The commit then raises an IllegalStateError (the kafka.errors exception class, raised from the aiokafka consumer code as shown in the traceback below) because it attempts to commit a partition that the consumer no longer owns.
The fix is to filter the commit partitions again in the aiokafka consumer commit code, immediately before the commit call, so that any partitions revoked while the coroutine was suspended at an await are dropped.
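A minimal sketch of that filtering step, not the actual Faust patch. The `TopicPartition` class here is a local stand-in for the client library's type, `filter_assigned_offsets` is a hypothetical helper name, and the offset values are made up for illustration. The partition numbers are taken from the traceback below.

```python
from typing import Dict, Iterable, NamedTuple


class TopicPartition(NamedTuple):
    # Local stand-in for the Kafka client's TopicPartition type (assumption).
    topic: str
    partition: int


def filter_assigned_offsets(
    offsets: Dict[TopicPartition, int],
    assignment: Iterable[TopicPartition],
) -> Dict[TopicPartition, int]:
    """Keep only the offsets whose partition is still assigned to this consumer."""
    assigned = set(assignment)
    return {tp: offset for tp, offset in offsets.items() if tp in assigned}


# Offsets queued for commit before the rebalance (offset values are illustrative).
pending = {
    TopicPartition("spe_kxe_5", 40): 1001,  # revoked by the rebalance
    TopicPartition("spe_kxe_5", 25): 2002,  # still assigned
}

# Assignment after the rebalance, as reported in the traceback.
current = {TopicPartition("spe_kxe_5", p) for p in (25, 52, 67)}

# Partition 40 is dropped; only partition 25 would be committed.
committable = filter_assigned_offsets(pending, current)
```

In the driver, the filter would presumably run against the consumer's current assignment right before `await consumer.commit(...)`, with no other await in between, and the commit would be skipped when nothing remains to commit.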
Expected behavior
Commits after a rebalance should not cause an IllegalStateError exception
Actual behavior
Intermittent IllegalStateError exceptions after a rebalance, which crash the consumer.
Full traceback
[2021-01-21 14:59:16,201] [9] [ERROR] [^---AIOKafkaConsumerThread]: Got exception: IllegalStateError("Partition TopicPartition(topic='spe_kxe_5', partition=40) is not assigned")
Current assignment: '\n+Topic Partition Set-------+\n| topic | partitions |\n+-----------+--------------+\n| spe_kxe_5 | {25, 52, 67} |\n+-----------+--------------+'
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/faust/transport/drivers/aiokafka.py", line 523, in _commit
    await consumer.commit(aiokafka_offsets)
  File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/consumer.py", line 548, in commit
    "Partition {} is not assigned".format(tp))
kafka.errors.IllegalStateError: IllegalStateError: Partition TopicPartition(topic='spe_kxe_5', partition=40) is not assigned
Versions
- Python version 3.7
- Faust version 0.4.6
- Operating system - CentOS
- Kafka version N/A
- RocksDB version (if applicable) N/A