Faust agent stopped responding after a long period of inactivity in a Kafka topic
Checklist
- [x] I have included information about relevant versions
- [ ] I have verified that the issue persists when using the master branch of Faust.
Steps to reproduce
I run Faust streaming like this:
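# Faust app provider (declared in the dependency_injector container)
from dependency_injector import providers
from dependency_injector.wiring import Provide
from faust import App, Topic

faust_app = providers.Singleton(
    App,
    id=config.crm_kafka_group(),
    broker=brokers,
    store=config.rocksdb(),
)


# The agent function itself
async def faust_agent(
    records,
    topic_error: Topic = Provide[Container.topic_error],
):
    async for record in records:
        ...  # process the record


# Registering the agent on the app from the container
# (faust_agent must be defined or imported before this line)
container.faust_app().agent(channel=container.topic_in(), name="faust_agent")(faust_agent)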
That's all. After that, no messages arrived in Kafka for 27 days. For the first 18 days everything was fine and Faust worked normally, but then it started producing error logs more and more frequently (see the logs below). Then we pushed a new message into Kafka.
Expected behavior
Faust should pick up the new message and process it. When I later tried to reproduce the problem, Faust normally caught new messages and processed them, even while logging errors like these.
Actual behavior
Faust did not read the message. After the application was restarted, it started reading messages again.
Thoughts
I tried to reproduce the error over a short time span but couldn't. I suspect something goes wrong only over a long period in which the Kafka topic has no activity; CPU and RAM usage were normal that day. Do you have any idea what could cause this?
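Since a restart made Faust read messages again, one possible stopgap (not a fix) is to let the orchestrator restart the container once the "has not sent fetch request" errors from the logs start repeating. The sketch below is a hypothetical `logging.Handler`; `FetchStallDetector`, `threshold`, `window` and `STALL_MARKER` are illustrative names, not part of Faust. It assumes Faust's loggers propagate to the root logger, and it matches on message text copied from this issue, which may differ in other Faust versions.

```python
import logging
import time

# Message text copied from the Faust logs in this issue; matching on it is
# brittle and may break if the wording changes in another Faust version.
STALL_MARKER = "has not sent fetch request"


class FetchStallDetector(logging.Handler):
    """Hypothetical helper: reports unhealthy once `threshold` stall errors
    have been logged within the last `window` seconds."""

    def __init__(self, threshold=5, window=300.0, clock=time.monotonic):
        super().__init__(level=logging.ERROR)
        self.threshold = threshold
        self.window = window
        self.clock = clock
        self._hits = []

    def _recent(self, now):
        # Timestamps of stall errors that are still inside the sliding window.
        return [t for t in self._hits if now - t <= self.window]

    def emit(self, record):
        if STALL_MARKER not in record.getMessage():
            return
        now = self.clock()
        # Drop hits that fell out of the window, then record this one.
        self._hits = self._recent(now) + [now]

    @property
    def healthy(self):
        return len(self._recent(self.clock())) < self.threshold


# Usage (threshold and window are guesses; tune them to the real log rate):
detector = FetchStallDetector(threshold=5, window=300.0)
logging.getLogger().addHandler(detector)
# A container liveness check could then fail whenever detector.healthy is False.
```

The handler only appends timestamps, so it adds very little work per log record; how `healthy` is exposed to the orchestrator (HTTP endpoint, file check, etc.) is left open here.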
Full traceback
[2023-09-18 15:42:38,960] [1] [ERROR] Unable connect to node with id 1:
[2023-09-18 15:42:38,961] [1] [ERROR] Failed fetch messages from 1: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
[2023-09-18 15:46:27,729] [1] [ERROR] Unable connect to node with id 1:
[2023-09-18 15:46:27,729] [1] [ERROR] Failed fetch messages from 1: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).
[2023-09-18 15:47:36,079] [1] [INFO] Timer Monitor.sampler woke up too late, with a drift of +0.48203237913548946 runtime=2.4840235710144043e-05 sleeptime=1.4820323791354895
[2023-09-18 15:47:37,788] [1] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2023-09-18 15:47:37,788] [1] [INFO] Timer _thread_keepalive-AIOKafkaConsumerThread woke up too late, with a drift of +2.1931118201464415 runtime=5.8999285101890564e-05 sleeptime=3.1931118201464415
[2023-09-19 16:54:25,332] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.communication.emailout', partition=8) since start (started 20.33 days ago)
[2023-09-19 16:54:25,333] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.c', partition=5) since start (started 20.33 days ago)
[2023-09-19 16:54:25,335] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.communication.emailout', partition=7) since start (started 20.33 days ago)
[2023-09-19 16:54:25,335] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.c', partition=4) since start (started 20.33 days ago)
[2023-09-19 16:54:25,336] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='communication.private.services.crm.connect.prod-__assignor-__leader', partition=0) since start (started 20.33 days ago)
[2023-09-19 16:54:25,336] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.c', partition=7) since start (started 20.33 days ago)
[2023-09-19 16:54:32,384] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.communication.emailout', partition=6) since start (started 20.33 days ago)
[2023-09-19 16:54:32,386] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.c', partition=9) since start (started 20.33 days ago)
[2023-09-19 16:54:32,387] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.communication.emailout', partition=8) since start (started 20.33 days ago)
[2023-09-19 16:54:32,388] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.c', partition=5) since start (started 20.33 days ago)
[2023-09-19 16:54:32,389] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.communication.emailout', partition=7) since start (started 20.33 days ago)
[2023-09-19 16:54:32,390] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.c', partition=4) since start (started 20.33 days ago)
... and so on until the 27-day mark.
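To see how often each partition is reported as stalled, and to turn "started 20.33 days ago" into an approximate consumer start time, the log lines can be parsed with the standard library. This is a sketch: the regex is written against the exact line format pasted above, and `summarize` and `FETCH_STALL` are hypothetical names.

```python
import re
from collections import Counter
from datetime import datetime, timedelta

# Written against the "has not sent fetch request" lines pasted in this issue.
FETCH_STALL = re.compile(
    r"^\[(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),\d{3}\] \[\d+\] \[ERROR\] "
    r".*has not sent fetch request for "
    r"TP\(topic='(?P<topic>[^']+)', partition=(?P<partition>\d+)\) "
    r"since start \(started (?P<days>[\d.]+) days ago\)"
)


def summarize(lines):
    """Hypothetical helper: count stall errors per (topic, partition) and
    estimate when the consumer started from the first matching line."""
    counts = Counter()
    estimated_start = None
    for line in lines:
        m = FETCH_STALL.match(line)
        if m is None:
            continue  # skip other log lines (NodeNotReadyError, timer drift)
        counts[(m["topic"], int(m["partition"]))] += 1
        if estimated_start is None:
            logged_at = datetime.strptime(m["ts"], "%Y-%m-%d %H:%M:%S")
            # "started N days ago" has two decimals, so this is only
            # accurate to about +/- 7 minutes; round to whole seconds.
            age = timedelta(seconds=round(float(m["days"]) * 86400))
            estimated_start = logged_at - age
    return counts, estimated_start


sample = [
    "[2023-09-18 15:42:38,961] [1] [ERROR] Failed fetch messages from 1: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1).",
    "[2023-09-19 16:54:25,332] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.communication.emailout', partition=8) since start (started 20.33 days ago)",
    "[2023-09-19 16:54:25,333] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.c', partition=5) since start (started 20.33 days ago)",
    "[2023-09-19 16:54:32,387] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.communication.emailout', partition=8) since start (started 20.33 days ago)",
]

counts, start = summarize(sample)
print(counts.most_common(1))  # [(('prod.communication.emailout', 8), 2)]
print(start)                  # 2023-08-30 08:59:13
```

On that estimate the consumer started around 2023-08-30 09:00, so the earliest errors in this excerpt (2023-09-18) appear a little over 19 days after start. Running `summarize` over the full log would show whether the same partitions keep repeating and how fast the error rate grows.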
Versions
- Python version 3.10.8
- Faust version 0.10.9
- Operating system: Docker container python:3.10.8-slim-bullseye
- Kafka version 3.4.0
- RocksDB version (if applicable)