Stream processing stops due to StaleLeaderEpochCodeError exception
Checklist
- [x] I have included information about relevant versions
- [x] I have verified that the issue persists when using the master branch of Faust.
Steps to reproduce
Run faust-streaming in a pod in a Kubernetes cluster watching Azure Eventhubs through their Kafka interface.
After several days of running, aiokafka raises StaleLeaderEpochCodeError several times. Because this error is not caught anywhere, the Faust application stops processing streams but does not crash or restart.
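The traceback for this issue begins in a thread started by mode's ServiceThread, so the failing commit appears to run off the main thread. In plain Python, an exception that escapes a non-main thread ends only that thread, and the process keeps running. The following is a minimal stdlib sketch of that general behaviour, not of Faust itself. `commit_loop` is a hypothetical stand-in for the committer, and it raises a RuntimeError rather than the real aiokafka error. The sketch installs `threading.excepthook` (Python 3.8+) to record the failure. Whether Faust or mode lets such an exception reach this hook, rather than catching and logging it first, is an assumption that would need checking.

```python
import threading

# Set when any worker thread dies with an unhandled exception.
crashed = threading.Event()
errors = []


def record_thread_exception(args):
    """Replacement for threading.excepthook (Python 3.8+).

    Records which thread failed and with what exception type, instead of
    only printing the traceback as the default hook does.
    """
    name = args.thread.name if args.thread is not None else None
    errors.append((name, args.exc_type.__name__))
    crashed.set()


threading.excepthook = record_thread_exception


def commit_loop():
    # Hypothetical stand-in for a background committer hitting a broker error.
    raise RuntimeError("simulated commit failure")


worker = threading.Thread(target=commit_loop, name="committer")
worker.start()
worker.join()

# The worker thread is gone, but the process and the main thread carry on.
print("main thread alive:", threading.main_thread().is_alive())
print("worker crashed:", crashed.is_set(), errors)
```

In a real worker, something on the main thread could wait on `crashed` and then terminate the process with a non-zero status, so the container exits and Kubernetes restarts it.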
N.B. I didn't see this behaviour when using robinhood/faust; I suspect the change to aiokafka might be part of the cause.
My intuition is that this is related to the fact that Eventhubs retain messages for at most 7 days. As a result, the ****-__assignor-__leader topic eventually drains and appears empty from Faust's point of view.
I understand that using Azure Eventhubs rather than real Kafka is unsupported, and I appreciate any advice you can give. Even if there is no way to prevent the exception (given my non-standard usage), a way to catch it so that I could force the pod to restart would be fine too.
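One way to get a restart without catching the exception directly is a progress watchdog. It records a timestamp each time an event is processed and hard-exits the process if no progress is seen for too long, so Kubernetes restarts the container. The sketch below is stdlib-only and hypothetical: `ProgressWatchdog`, `beat`, `is_stale`, `run` and `max_silence` are illustrative names, not part of Faust. How it would be wired in is also an assumption. For example, `beat()` might be called inside an agent's `async for event in stream` loop, and `run()` might be started alongside the app.

```python
import asyncio
import os
import time
from typing import Callable, List, Optional


class ProgressWatchdog:
    """Hypothetical helper: fires `on_stall` if no progress is reported
    within `max_silence` seconds (by default, hard-exits the process)."""

    def __init__(self, max_silence: float,
                 on_stall: Optional[Callable[[], None]] = None,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_silence = max_silence
        self.clock = clock
        # Default action: exit immediately with a non-zero status so the
        # container terminates and Kubernetes restarts it.
        self.on_stall = on_stall or (lambda: os._exit(1))
        self.last_beat = clock()

    def beat(self) -> None:
        """Call after each successfully processed event."""
        self.last_beat = self.clock()

    def is_stale(self) -> bool:
        return self.clock() - self.last_beat > self.max_silence

    async def run(self, interval: float) -> None:
        """Poll every `interval` seconds; fire `on_stall` once, then stop."""
        while True:
            await asyncio.sleep(interval)
            if self.is_stale():
                self.on_stall()
                return


async def demo() -> List[bool]:
    # Replace the exit action with a recorder so the demo can observe it.
    stalled: List[bool] = []
    watchdog = ProgressWatchdog(max_silence=0.05,
                                on_stall=lambda: stalled.append(True))
    watchdog.beat()
    # No further beats arrive, so the watchdog fires after ~0.05 s.
    await asyncio.wait_for(watchdog.run(interval=0.01), timeout=1.0)
    return stalled


print(asyncio.run(demo()))
```

The main tradeoff is quiet topics: if a stream legitimately receives no events for longer than `max_silence`, the watchdog restarts a healthy pod, so the threshold would need to exceed the normal idle period. `os._exit` is used instead of `sys.exit` because `sys.exit` only raises `SystemExit`, which ends just the current thread if the watchdog happens to run in a worker thread. `os._exit` terminates the whole process at once, skipping cleanup.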
I have to say that Faust is much better than any of the native Microsoft libraries for dealing with Eventhub message streams.
Expected behavior
Faust pods continue to process Eventhub message streams without interruption.
Actual behavior
The StaleLeaderEpochCodeError exception occurs somewhat randomly after several days of operation, but the app does not crash, so the Kubernetes pod does not restart.
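An alternative that does not rely on the stuck process noticing its own failure is a Kubernetes exec liveness probe backed by a heartbeat file. The app touches a file whenever it processes an event, and the probe fails when the file's modification time is too old, prompting the kubelet to restart the container. Below is a minimal stdlib sketch. The path `/tmp/faust-heartbeat`, the 300-second threshold, and the function names are hypothetical choices, not anything Faust provides.

```python
import os
import time
from typing import Optional

HEARTBEAT_PATH = "/tmp/faust-heartbeat"  # hypothetical location
MAX_AGE_SECONDS = 300.0                  # hypothetical threshold


def touch_heartbeat(path: str = HEARTBEAT_PATH) -> None:
    """Call from the app each time an event is processed."""
    with open(path, "a"):
        pass                 # create the file if it does not exist yet
    os.utime(path, None)     # set its mtime to the current time


def heartbeat_is_fresh(path: str = HEARTBEAT_PATH,
                       max_age: float = MAX_AGE_SECONDS,
                       now: Optional[float] = None) -> bool:
    """True if the heartbeat file exists and was touched recently."""
    try:
        mtime = os.path.getmtime(path)
    except FileNotFoundError:
        return False
    current = time.time() if now is None else now
    return current - mtime <= max_age


def probe_exit_code(path: str = HEARTBEAT_PATH,
                    max_age: float = MAX_AGE_SECONDS) -> int:
    """Exit status for a liveness probe: 0 = healthy, 1 = restart me."""
    return 0 if heartbeat_is_fresh(path, max_age) else 1
```

A probe script would end with `sys.exit(probe_exit_code())`, and the pod spec's `livenessProbe.exec.command` would run it with the container's Python interpreter. As with any progress-based check, a legitimately quiet topic would also trip it, so the threshold needs to cover normal idle periods.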
Full traceback
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/.venv/lib/python3.8/site-packages/mode/threads.py", line 66, in run
    self.service._start_thread()
  File "/usr/local/.venv/lib/python3.8/site-packages/mode/threads.py", line 211, in _start_thread
    self.thread_loop.run_until_complete(self._serve())
> File "/usr/local/.venv/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 523, in _commit
    await consumer.commit(aiokafka_offsets)
  File "/usr/local/.venv/lib/python3.8/site-packages/aiokafka/consumer/consumer.py", line 550, in commit
    await self._coordinator.commit_offsets(assignment, offsets)
  File "/usr/local/.venv/lib/python3.8/site-packages/aiokafka/consumer/group_coordinator.py", line 964, in commit_offsets
    raise err
  File "/usr/local/.venv/lib/python3.8/site-packages/aiokafka/consumer/group_coordinator.py", line 953, in commit_offsets
    await asyncio.shield(
  File "/usr/local/.venv/lib/python3.8/site-packages/aiokafka/consumer/group_coordinator.py", line 1066, in _do_commit_offsets
    raise first_error
kafka.errors.StaleLeaderEpochCodeError: [Error 13] StaleLeaderEpochCodeError
Versions
- Python version: 3.8.6
- Faust version: faust-streaming 0.4.0
- Operating system: Debian Slim in Python Docker image
- Kafka version: Azure Eventhubs
- RocksDB version (if applicable): librocksdb5.17