
Stream processing stops due to StaleLeaderEpochCodeError exception

Open · aidenprice opened this issue on Jan 11, 2021 · 0 comments

Checklist

  • [x] I have included information about relevant versions
  • [x] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Run faust-streaming in a pod in a Kubernetes cluster watching Azure Eventhubs through their Kafka interface.

After several days, aiokafka raises StaleLeaderEpochCodeError several times. Because this error is not caught anywhere, the Faust application stops processing streams but does not crash or restart.

N.B. I did not see this behaviour when using robinhood/faust; I suspect the change to aiokafka may be part of the cause.

My intuition suggests a related factor: Eventhubs retain messages for at most 7 days, so the ****-__assignor-__leader topic will eventually drain and appear empty from Faust's point of view.

I understand that using Azure Eventhubs rather than a genuine Kafka cluster is unsupported, and I appreciate any advice you can give. Even if there is no way to prevent the exception (given my non-standard setup), a way to catch it so that I can make the pod restart would be fine too.
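One possible workaround, sketched here under assumptions rather than as a Faust feature: the traceback shows the error surfacing inside Faust's consumer thread, so application code may not be able to catch it directly. A stall watchdog sidesteps that by recording when an event was last processed and terminating the process when no progress has been made for too long, so Kubernetes restarts the pod. The `StallWatchdog` class, the `max_idle` threshold and the exit behaviour are all hypothetical. In a Faust app, `beat()` could be called from an agent's loop and `check()` from a periodic `@app.timer`, but that wiring is untested.

```python
import os
import time
from typing import Callable, Optional


class StallWatchdog:
    """Hypothetical helper: ends the process if no events are seen for too long."""

    def __init__(
        self,
        max_idle: float,
        clock: Callable[[], float] = time.monotonic,
        on_stall: Optional[Callable[[float], None]] = None,
    ) -> None:
        self.max_idle = max_idle  # seconds without progress before giving up
        self._clock = clock
        # Default action: hard-exit with a non-zero code so the container restarts.
        self._on_stall = on_stall or (lambda idle: os._exit(1))
        self._last_beat = clock()

    def beat(self) -> None:
        """Record that an event was just processed."""
        self._last_beat = self._clock()

    def idle_seconds(self) -> float:
        """Seconds elapsed since the last recorded event."""
        return self._clock() - self._last_beat

    def check(self) -> bool:
        """Return True if healthy; otherwise run the stall action and return False."""
        idle = self.idle_seconds()
        if idle > self.max_idle:
            self._on_stall(idle)
            return False
        return True
```

`os._exit` skips cleanup deliberately: `sys.exit` only raises `SystemExit`, which ends just the calling thread when used outside the main thread. Offsets not yet committed will be reprocessed after the restart. Because `beat()` only advances when events arrive, `max_idle` must exceed the longest normal quiet period on the topic, or an idle but healthy app will be restarted.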

I have to say that Faust is much better than any of the native Microsoft libraries for dealing with Eventhub message streams.

Expected behavior

Faust pods continue to process Eventhub message streams without interruption.

Actual behavior

A StaleLeaderEpochCodeError is raised at seemingly random times after several days of operation, but the app does not crash, so the Kubernetes pod is not restarted.
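Alternatively, the restart decision can be moved outside the stalled process: the app touches a heartbeat file after processing each event, and a Kubernetes exec liveness probe fails when that file is too old. An exec probe treats any non-zero exit code as a failure, after which the kubelet restarts the container. The sketch below is hypothetical throughout: the `/tmp/faust-heartbeat` path, the `HEARTBEAT_PATH` and `HEARTBEAT_MAX_AGE` environment variables, and the 600-second default are placeholders, and the probe would run the file as a script (for example `python heartbeat.py`, also a hypothetical name).

```python
import os
import sys
import time
from pathlib import Path
from typing import Optional

# Hypothetical defaults; tune the age to the topic's normal event rate.
HEARTBEAT_PATH = Path(os.environ.get("HEARTBEAT_PATH", "/tmp/faust-heartbeat"))
MAX_AGE_SECONDS = float(os.environ.get("HEARTBEAT_MAX_AGE", "600"))


def touch_heartbeat(path: Path = HEARTBEAT_PATH) -> None:
    """Called by the app after processing an event: updates the file's mtime."""
    path.touch()


def heartbeat_is_fresh(
    path: Path = HEARTBEAT_PATH,
    max_age: float = MAX_AGE_SECONDS,
    now: Optional[float] = None,
) -> bool:
    """True if the heartbeat file exists and was modified within max_age seconds."""
    try:
        mtime = path.stat().st_mtime
    except FileNotFoundError:
        return False
    current = time.time() if now is None else now
    return current - mtime <= max_age


if __name__ == "__main__":
    # Exit 0 = healthy, 1 = stale or missing heartbeat (probe failure).
    sys.exit(0 if heartbeat_is_fresh() else 1)
```

The probe also fails before the first event has been processed, so either touch the file at startup or give the probe a generous `initialDelaySeconds`.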

Full traceback

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/.venv/lib/python3.8/site-packages/mode/threads.py", line 66, in run
    self.service._start_thread()
  File "/usr/local/.venv/lib/python3.8/site-packages/mode/threads.py", line 211, in _start_thread
    self.thread_loop.run_until_complete(self._serve())
  File "/usr/local/.venv/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 523, in _commit
    await consumer.commit(aiokafka_offsets)
  File "/usr/local/.venv/lib/python3.8/site-packages/aiokafka/consumer/consumer.py", line 550, in commit
    await self._coordinator.commit_offsets(assignment, offsets)
  File "/usr/local/.venv/lib/python3.8/site-packages/aiokafka/consumer/group_coordinator.py", line 964, in commit_offsets
    raise err
  File "/usr/local/.venv/lib/python3.8/site-packages/aiokafka/consumer/group_coordinator.py", line 953, in commit_offsets
    await asyncio.shield(
  File "/usr/local/.venv/lib/python3.8/site-packages/aiokafka/consumer/group_coordinator.py", line 1066, in _do_commit_offsets
    raise first_error
kafka.errors.StaleLeaderEpochCodeError: [Error 13] StaleLeaderEpochCodeError

Versions

  • Python version: 3.8.6
  • Faust version: faust-streaming 0.4.0
  • Operating system: Debian Slim in Python Docker image
  • Kafka version: Azure Eventhubs
  • RocksDB version (if applicable): librocksdb5.17

aidenprice, Jan 11 '21 22:01