Faust agent didn't respond after a long period of inactivity in a Kafka topic

Open Xastur opened this issue 2 years ago • 0 comments

Checklist

  • [x] I have included information about relevant versions
  • [ ] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

I ran Faust streaming like this:

# Faust app

faust_app = providers.Singleton(
    App,
    id=config.crm_kafka_group(),
    broker=brokers,
    store=config.rocksdb(),
)

# Faust app in the container (from dependency_injector import containers)

container.faust_app().agent(channel=container.topic_in(), name="faust_agent")(faust_agent)

# The agent function itself

async def faust_agent(
    records,
    topic_error: Topic = Provide[Container.topic_error],
):
    async for record in records:
        do...

That's all. After that, no messages arrived in Kafka for 27 days. For the first 18 days everything was fine and Faust worked normally, but then it started to log errors more and more frequently (see the traceback below). After that idle period we pushed a message into Kafka.

Expected behavior

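Faust should pick up the new message and process it. When I later tried to reproduce the problem, that is what normally happened: even with this kind of error in the logs, Faust caught new messages and processed them.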

Actual behavior

Faust didn't read the message. After restarting the application, it started reading messages again.

Thoughts

I tried to reproduce the error over a short period, but couldn't. I think something went wrong over the long period during which Kafka had no activity. CPU and RAM usage were normal that day. Do you have any ideas about what could have happened?

Full traceback

[2023-09-18 15:42:38,960] [1] [ERROR] Unable connect to node with id 1:  

[2023-09-18 15:42:38,961] [1] [ERROR] Failed fetch messages from 1: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1). 

[2023-09-18 15:46:27,729] [1] [ERROR] Unable connect to node with id 1:  

[2023-09-18 15:46:27,729] [1] [ERROR] Failed fetch messages from 1: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 1). 

[2023-09-18 15:47:36,079] [1] [INFO] Timer Monitor.sampler woke up too late, with a drift of +0.48203237913548946 runtime=2.4840235710144043e-05 sleeptime=1.4820323791354895 

[2023-09-18 15:47:37,788] [1] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding... 

[2023-09-18 15:47:37,788] [1] [INFO] Timer _thread_keepalive-AIOKafkaConsumerThread woke up too late, with a drift of +2.1931118201464415 runtime=5.8999285101890564e-05 sleeptime=3.1931118201464415 

[2023-09-19 16:54:25,332] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.communication.emailout', partition=8) since start (started 20.33 days ago) 

[2023-09-19 16:54:25,333] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.c', partition=5) since start (started 20.33 days ago) 

[2023-09-19 16:54:25,335] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.communication.emailout', partition=7) since start (started 20.33 days ago) 

[2023-09-19 16:54:25,335] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.c', partition=4) since start (started 20.33 days ago) 

[2023-09-19 16:54:25,336] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='communication.private.services.crm.connect.prod-__assignor-__leader', partition=0) since start (started 20.33 days ago) 

[2023-09-19 16:54:25,336] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.c', partition=7) since start (started 20.33 days ago) 

[2023-09-19 16:54:32,384] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.communication.emailout', partition=6) since start (started 20.33 days ago) 

[2023-09-19 16:54:32,386] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.c', partition=9) since start (started 20.33 days ago) 

[2023-09-19 16:54:32,387] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.communication.emailout', partition=8) since start (started 20.33 days ago) 

[2023-09-19 16:54:32,388] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.c', partition=5) since start (started 20.33 days ago) 

[2023-09-19 16:54:32,389] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.communication.emailout', partition=7) since start (started 20.33 days ago) 

[2023-09-19 16:54:32,390] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.c', partition=4) since start (started 20.33 days ago) 

and so on, until the reported time reached 27 days.
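To see at a glance which topic partitions these errors mention, a small script along the following lines could be run over the container logs. This is only a sketch: the regular expression is written against the log lines quoted above, and the names `STALL_RE`, `stalled_partitions`, and `SAMPLE` are made up for illustration; they are not part of Faust or aiokafka.

```python
import re
from collections import defaultdict

# Matches the "has not sent fetch request" errors quoted in the traceback.
# The pattern is an assumption based only on those quoted lines.
STALL_RE = re.compile(
    r"^\[(?P<ts>[^\]]+)\] \[\d+\] \[ERROR\] .*?"
    r"Aiokafka has not sent fetch request for "
    r"TP\(topic='(?P<topic>[^']+)', partition=(?P<partition>\d+)\) "
    r"since start \(started (?P<days>[\d.]+) days ago\)"
)


def stalled_partitions(lines):
    """Map each topic to the sorted partitions reported as never fetched."""
    stalled = defaultdict(set)
    for line in lines:
        match = STALL_RE.search(line)
        if match:
            stalled[match["topic"]].add(int(match["partition"]))
    return {topic: sorted(parts) for topic, parts in stalled.items()}


# Three lines copied from the traceback; the last one must not match.
SAMPLE = """\
[2023-09-19 16:54:25,332] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.communication.emailout', partition=8) since start (started 20.33 days ago)
[2023-09-19 16:54:25,333] [1] [ERROR] [^---AIOKafkaConsumerThread]: Aiokafka has not sent fetch request for TP(topic='prod.c', partition=5) since start (started 20.33 days ago)
[2023-09-18 15:47:37,788] [1] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
""".splitlines()

if __name__ == "__main__":
    for topic, partitions in stalled_partitions(SAMPLE).items():
        print(topic, partitions)
```

Feeding it the full log instead of `SAMPLE` (for example, the lines of a saved log file) would show whether every assigned partition is affected or only some of them.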

Versions

  • Python version 3.10.8
  • Faust version 0.10.9
  • Operating system docker container python:3.10.8-slim-bullseye
  • Kafka version 3.4.0
  • RocksDB version (if applicable)

Xastur, Sep 29 '23 09:09