Consumer works for some time with Kafka on Azure Event Hub, then dies after a couple of days or a week.
Checklist
- I have included information about relevant versions
- I have verified that the issue persists when using the master branch of Faust.
Steps to reproduce
I'm using Azure Event Hubs with the Kafka interface enabled. I run a Faust worker in my Python application inside a Docker container. This works well at first: data flows from an external source into a topic on the Event Hub. After some time, however, the data is still arriving but the Faust consumer stops pulling it, and I have to restart my Python application manually to recover. I captured the error log below, where the Faust worker crashes with a heartbeat request timeout error.
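For context, my app is configured roughly like the sketch below. The namespace, connection string, topic, and app id are placeholders, and the longer timeout values are experiments I'm trying, not a known fix:

```python
import ssl

import faust

# Placeholders: substitute your Event Hubs namespace and connection string.
app = faust.App(
    "my-consumer",
    broker="kafka://my-namespace.servicebus.windows.net:9093",
    # Event Hubs' Kafka endpoint uses SASL_SSL with the PLAIN mechanism;
    # the username is literally "$ConnectionString".
    broker_credentials=faust.SASLCredentials(
        username="$ConnectionString",
        password="Endpoint=sb://...",  # full Event Hubs connection string
        ssl_context=ssl.create_default_context(),
        mechanism="PLAIN",
    ),
    # Looser timeouts to tolerate transient broker slowness -- experimental,
    # not a confirmed workaround for the heartbeat RequestTimedOutError.
    broker_request_timeout=120.0,
    broker_session_timeout=120.0,
    broker_heartbeat_interval=10.0,
)


@app.agent(app.topic("my-topic"))
async def receive(stream):
    async for event in stream:
        ...  # process each event
```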
Expected behavior
The worker should run indefinitely without needing a manual restart.
Actual behavior
Sometimes it dies and we have to restart the application manually to get it running normally.
Full traceback
2023-01-22 01:10:17,946 — aiokafka.consumer.group_coordinator Thread-3 — ERROR — _do_heartbeat:817 — Heartbeat failed: KafkaError('Unexpected exception in heartbeat task: RequestTimedOutError()')
2023-01-22 01:10:17,947 — faust.transport.drivers.aiokafka MainThread — ERROR — _drain_messages:1226 — [^--Consumer]: Drain messages raised: KafkaError('Unexpected exception in heartbeat task: RequestTimedOutError()')
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py", line 1181, in _drain_messages
async for tp, message in ait:
File "/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py", line 728, in getmany
records, active_partitions = await self._wait_next_records(timeout)
File "/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py", line 788, in _wait_next_records
records = await self._getmany(
File "/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py", line 1419, in _getmany
return await self._thread.getmany(active_partitions, timeout)
File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 990, in getmany
return await self.call_thread(
File "/usr/local/lib/python3.8/site-packages/mode/threads.py", line 430, in call_thread
result = await promise
File "/usr/local/lib/python3.8/site-packages/mode/threads.py", line 379, in _process_enqueued
result = await maybe_async(method(*args, **kwargs))
File "/usr/local/lib/python3.8/site-packages/mode/utils/futures.py", line 138, in maybe_async
return await res
File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1012, in _fetch_records
return await fetcher.fetched_records(
File "/usr/local/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 1100, in fetched_records
waiter.result() # Check for authorization errors
kafka.errors.KafkaError: KafkaError: Unexpected exception in heartbeat task: RequestTimedOutError()
2023-01-22 01:10:17,949 — faust.transport.consumer MainThread — ERROR — crash:868 — [^---Fetcher]: Crashed reason=KafkaError('Unexpected exception in heartbeat task: RequestTimedOutError()')
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 843, in _execute_task
await task
File "/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py", line 177, in _fetcher
await consumer._drain_messages(self)
File "/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py", line 1181, in _drain_messages
async for tp, message in ait:
File "/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py", line 728, in getmany
records, active_partitions = await self._wait_next_records(timeout)
File "/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py", line 788, in _wait_next_records
records = await self._getmany(
File "/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py", line 1419, in _getmany
return await self._thread.getmany(active_partitions, timeout)
File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 990, in getmany
return await self.call_thread(
File "/usr/local/lib/python3.8/site-packages/mode/threads.py", line 430, in call_thread
result = await promise
File "/usr/local/lib/python3.8/site-packages/mode/threads.py", line 379, in _process_enqueued
result = await maybe_async(method(*args, **kwargs))
File "/usr/local/lib/python3.8/site-packages/mode/utils/futures.py", line 138, in maybe_async
return await res
File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1012, in _fetch_records
return await fetcher.fetched_records(
File "/usr/local/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 1100, in fetched_records
waiter.result() # Check for authorization errors
kafka.errors.KafkaError: KafkaError: Unexpected exception in heartbeat task: RequestTimedOutError()
2023-01-22 01:10:17,951 — faust.worker MainThread — INFO — _log_mundane:864 — [^Worker]: Stopping...
2023-01-22 01:10:17,951 — faust.app.base MainThread — INFO — _log_mundane:864 — [^-App]: Stopping...
2023-01-22 01:10:17,951 — faust.transport.consumer MainThread — INFO — _log_mundane:864 — [^---Fetcher]: Stopping...
2023-01-22 01:10:17,952 — faust.app.base MainThread — INFO — _consumer_wait_empty:1617 — [^-App]: Wait for streams...
2023-01-22 01:10:17,952 — faust.tables.manager MainThread — INFO — _log_mundane:864 — [^--TableManager]: Stopping...
2023-01-22 01:10:17,953 — faust.transport.consumer MainThread — INFO — _log_mundane:864 — [^---Fetcher]: Stopping...
2023-01-22 01:10:17,953 — faust.tables.recovery MainThread — INFO — _log_mundane:864 — [^---Recovery]: Stopping...
2023-01-22 01:10:17,954 — faust.app.base MainThread — INFO — _producer_flush:1594 — [^-App]: Flush producer buffer...
2023-01-22 01:10:17,954 — faust.transport.conductor MainThread — INFO — _log_mundane:864 — [^---Conductor]: Stopping...
2023-01-22 01:10:17,955 — faust.agents.manager MainThread — INFO — _log_mundane:864 — [^--AgentManager]: Stopping...
2023-01-22 01:10:17,955 — faust.agents.agent MainThread — INFO — _log_mundane:864 — [^---Agent: src.app.main_stream.receive]: Stopping...
2023-01-22 01:10:17,955 — mode.supervisors MainThread — INFO — _log_mundane:864 — [^----OneForOneSupervisor: (1@0x7fa9f61dbc10)]: Stopping...
2023-01-22 01:10:17,956 — faust.agents.replies MainThread — INFO — _log_mundane:864 — [^--ReplyConsumer]: Stopping...
2023-01-22 01:10:17,957 — faust.assignor.leader_assignor MainThread — INFO — _log_mundane:864 — [^--LeaderAssignor]: Stopping...
2023-01-22 01:10:17,957 — faust.transport.drivers.aiokafka MainThread — INFO — _log_mundane:864 — [^--Consumer]: Stopping...
2023-01-22 01:10:17,957 — faust.transport.drivers.aiokafka MainThread — INFO — _log_mundane:864 — [^---AIOKafkaConsumerThread]: Stopping...
2023-01-22 01:10:17,968 — aiokafka.consumer.group_coordinator Thread-3 — INFO — _maybe_leave_group:359 — LeaveGroup request succeeded
2023-01-22 01:10:17,971 — faust.web.drivers.aiohttp MainThread — INFO — _log_mundane:864 — [^--Web]: Stopping...
2023-01-22 01:10:17,972 — faust.web.drivers.aiohttp MainThread — INFO — _log_mundane:864 — [^---Server]: Stopping...
2023-01-22 01:10:17,972 — faust.web.drivers.aiohttp MainThread — INFO — _cleanup_app:318 — [^--Web]: Cleanup
2023-01-22 01:10:17,973 — faust.web.cache.backends.base MainThread — INFO — _log_mundane:864 — [^--CacheBackend]: Stopping...
2023-01-22 01:10:17,973 — faust.transport.drivers.aiokafka MainThread — INFO — _log_mundane:864 — [^--Producer]: Stopping...
2023-01-22 01:10:17,975 — faust.transport.producer MainThread — INFO — _log_mundane:864 — [^---ProducerBuffer]: Stopping...
2023-01-22 01:10:17,976 — faust.sensors.monitor MainThread — INFO — _log_mundane:864 — [^--Monitor]: Stopping...
2023-01-22 01:10:17,976 — faust.worker MainThread — INFO — _shutdown_loop:306 — [^Worker]: Gathering service tasks...
2023-01-22 01:10:17,977 — faust.worker MainThread — INFO — _shutdown_loop:310 — [^Worker]: Gathering all futures...
2023-01-22 01:10:18,977 — faust.worker MainThread — INFO — _shutdown_loop:325 — [^Worker]: Closing event loop
2023-01-22 01:10:18,979 — faust.worker MainThread — CRITICAL — _shutdown_loop:328 — [^Worker]: We experienced a crash! Reraising original exception...
Versions
- Python version - 3.8
- Faust version - 0.10.1
I've seen errors similar to this before, still unsure what's causing them. What version of mode-streaming are you using presently? I have a theory that v0.3.4 could be causing issues.
I'm using mode-streaming v0.3.5.
Does the library provide a way to catch this exception in my application and reconnect to the Event Hub without crashing?
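In the meantime I've worked around it with an external supervisor loop that restarts the worker process whenever it exits with an error. A sketch (the faust command line at the bottom is specific to my app and shown only as an example):

```python
import subprocess
import sys
import time


def run_with_restarts(cmd, max_restarts=5, backoff_seconds=1.0):
    """Run cmd in a subprocess; restart it whenever it exits non-zero.

    Returns the number of restarts performed once the command finally
    exits cleanly; raises RuntimeError after max_restarts failures.
    """
    restarts = 0
    while True:
        returncode = subprocess.call(cmd)
        if returncode == 0:
            return restarts
        restarts += 1
        if restarts > max_restarts:
            raise RuntimeError(f"command kept failing after {restarts} runs: {cmd}")
        time.sleep(backoff_seconds)  # simple fixed backoff before restarting


# In my setup the command is the faust worker invocation, e.g.:
# run_with_restarts(["faust", "-A", "src.app", "worker", "-l", "info"])
```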
This seems to be more of an aiokafka bug. I know https://github.com/aio-libs/aiokafka/issues/624 isn't directly related, since it concerns a producer rather than a heartbeat, but the `# Check for authorization errors` comment in the traceback makes me wonder.
Sadly I need to mark this as wontfix because this bug seems to be related to aiokafka. Sorry I can't help more.
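Until a fix lands upstream, one operational workaround is to let Docker restart the worker container automatically when it crashes. A sketch, where the container name and image tag are placeholders:

```shell
# Restart the container whenever the faust worker process dies,
# unless the container was stopped deliberately.
docker run -d \
  --name faust-worker \
  --restart unless-stopped \
  my-faust-app:latest
```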