
IllegalStateError causes consumer crash when commit is pending and a rebalance happens

Open bobh66 opened this issue 4 years ago • 0 comments

Checklist

  • [X] I have included information about relevant versions
  • [X] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

When the Consumer has requested a commit on one or more partitions and a rebalance is triggered before the aiokafka consumer performs the commit, one or more of those partitions may be revoked by the rebalance and not re-assigned. The commit then fails with an IllegalStateError (kafka.errors.IllegalStateError, raised from aiokafka's consumer.commit, as the traceback shows) because it attempts to commit a partition that the consumer no longer owns.

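The fix is to filter the commit partitions again in Faust's aiokafka driver commit code (_commit in faust/transport/drivers/aiokafka.py), immediately before calling consumer.commit, so that any partition revoked while the coroutine was suspended at an await is dropped from the commit.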
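
The filtering step could look roughly like the sketch below. It is only an illustration, not the actual patch: `filter_assigned` is a hypothetical helper, `TopicPartition` is a local stand-in for the client's type, and the offset values are made up. The partition numbers come from the traceback. In the driver, the current assignment would presumably come from the aiokafka consumer's `assignment()` method.

```python
from typing import Dict, NamedTuple, Set


class TopicPartition(NamedTuple):
    # Local stand-in for the Kafka client's TopicPartition type (assumption).
    topic: str
    partition: int


def filter_assigned(
    offsets: Dict[TopicPartition, int],
    assignment: Set[TopicPartition],
) -> Dict[TopicPartition, int]:
    """Keep only the offsets whose partition is still assigned.

    Hypothetical helper: call it right before the commit, after any await
    during which a rebalance could have revoked partitions.
    """
    return {tp: offset for tp, offset in offsets.items() if tp in assignment}


# Assignment after the rebalance, as reported in the traceback.
assignment = {TopicPartition("spe_kxe_5", p) for p in (25, 52, 67)}

# Offsets queued before the rebalance (offset values are invented).
pending = {
    TopicPartition("spe_kxe_5", 40): 1001,  # revoked, must not be committed
    TopicPartition("spe_kxe_5", 52): 2002,  # still owned
}

to_commit = filter_assigned(pending, assignment)
```

If the filtered mapping is empty, the commit call can be skipped entirely.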

Expected behavior

Commits issued after a rebalance should not raise an IllegalStateError exception.

Actual behavior

IllegalStateError exceptions occur intermittently after a rebalance.

Full traceback

[2021-01-21 14:59:16,201] [9] [ERROR] [^---AIOKafkaConsumerThread]: Got exception: IllegalStateError("Partition TopicPartition(topic='spe_kxe_5', partition=40) is not assigned")
Current assignment: '\n+Topic Partition Set-------+\n| topic     | partitions   |\n+-----------+--------------+\n| spe_kxe_5 | {25, 52, 67} |\n+-----------+--------------+'
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/faust/transport/drivers/aiokafka.py", line 523, in _commit
    await consumer.commit(aiokafka_offsets)
  File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/consumer.py", line 548, in commit
    "Partition {} is not assigned".format(tp))
kafka.errors.IllegalStateError: IllegalStateError: Partition TopicPartition(topic='spe_kxe_5', partition=40) is not assigned

Versions

  • Python version 3.7
  • Faust version 0.4.6
  • Operating system - CentOS
  • Kafka version N/A
  • RocksDB version (if applicable) N/A

bobh66, Feb 04 '21 14:02