Stream.take does not shut down on SIGINT

Open Roman1us opened this issue 3 years ago • 2 comments

If an agent uses stream.take(..., within=...), a value is sent to the topic, and SIGINT is then sent to the worker, Faust does not shut down gracefully.

Checklist

  • [x] I have included information about relevant versions
  • [x] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

import faust

app = faust.App(
    "agent-check-take",
    broker="kafka://localhost:9092"
)

check_topic = app.topic("test-topic", key_type=str, value_type=str)


@app.agent(check_topic)
async def test_agent(stream):
    async for v in stream.take(10, within=10 * 60 * 60):
        print("Received", v)


@app.timer(10)
async def test_timer():
    print("Sending value to topic")
    await check_topic.send(key="key", value="value")


if __name__ == "__main__":
    app.main()
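To make the SIGINT step repeatable, a helper along the following lines could be used. This is only a sketch and not part of the original report: `exits_on_sigint`, `startup_delay`, and `timeout` are hypothetical names, and it assumes a POSIX system where `Popen.send_signal(signal.SIGINT)` is supported.

```python
import signal
import subprocess
import time


def exits_on_sigint(cmd, startup_delay=15.0, timeout=30.0):
    """Start cmd, send SIGINT after startup_delay seconds, and report
    whether the process exits within timeout seconds (POSIX only)."""
    proc = subprocess.Popen(cmd)
    try:
        # Give the worker time to start and consume at least one message.
        time.sleep(startup_delay)
        proc.send_signal(signal.SIGINT)
        proc.wait(timeout=timeout)
        return True
    except subprocess.TimeoutExpired:
        # The process ignored or did not finish handling SIGINT in time.
        return False
    finally:
        # Never leave a hung worker behind.
        if proc.poll() is None:
            proc.kill()
            proc.wait()
```

Assuming the script above is saved as agent_check_take.py, a call such as `exits_on_sigint([sys.executable, "agent_check_take.py", "worker", "-l", "info"])` would be expected to return False while this issue is present.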

Expected behavior

Agent tasks shut down and the worker exits.

Actual behavior

The agent using stream.take does not exit, and the worker hangs.

Full traceback

[2021-06-16 00:11:11,071] [63784] [INFO] [^Worker]: Ready 
[2021-06-16 00:11:21,077] [63784] [WARNING] Sending value to topic 
^C-INT- -INT- -INT- -INT- -INT- -INT-
[2021-06-16 00:11:27,680] [63784] [INFO] [^Worker]: Signal received: Signals.SIGINT (2) 
[2021-06-16 00:11:27,680] [63784] [INFO] [^Worker]: Stopping... 
[2021-06-16 00:11:27,681] [63784] [INFO] [^-App]: Stopping... 
[2021-06-16 00:11:27,681] [63784] [INFO] [^---Fetcher]: Stopping... 
[2021-06-16 00:11:27,682] [63784] [INFO] [^--Consumer]: Consumer shutting down for user cancel. 
[2021-06-16 00:11:27,682] [63784] [INFO] [^-App]: Wait for streams... 
[2021-06-16 00:11:36,891] [63784] [WARNING] [^--Consumer]: wait_empty: Waiting for tasks [(1, <ConsumerMessage: TopicPartition(topic='test-topic', partition=0) offset=2>)] 
[2021-06-16 00:11:36,892] [63784] [INFO] [^--Consumer]: Agent tracebacks:

=======================================
 TRACEBACK OF ALL RUNNING AGENT ACTORS
=======================================


* agent_check_take.test_agent ----->
============================================================
['Stack for <coroutine object test_agent at 0x1039a40c0> (most recent call last):\n  File "/Users/romanius/Projects/Work/Aermetric/laborer/agent_check_take.py", line 13, in test_agent\n    async for v in stream.take(10, within=10 * 60 * 60):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n']



-eof tracebacks- :-)
 
........

[2021-06-16 00:16:10,984] [63784] [ERROR] [^---AIOKafkaConsumerThread]: Has not committed TP(topic='test-topic', partition=0) at all since worker start (started 5.02 minutes ago). 

There are multiple possible explanations for this:

1) The processing of a single event in the stream
   is taking too long.

    The timeout for this is defined by the broker_commit_livelock_soft_timeout setting,
    currently set to 300.0.  If you expect the time
    required to process an event, to be greater than this then please
    increase the timeout.

 2) The commit handler background thread has stopped working (report as bug). 

Versions

  • Python version: 3.9.4
  • Faust version: master (fbaae90)
  • Operating system: macOS 10.15
  • Kafka version: Confluent Kafka 6.1.1
  • RocksDB version (if applicable)

Roman1us, Jun 15 '21 18:06

I am having the same errors with just regular streams. I always have to kill my cluster with kill -9.

zikphil, Mar 07 '22 13:03

Try passing stream_wait_empty=False to faust.App. That requires idempotent message handling, and there is probably still a bug in the shutdown sequence, but if you don't mind at-least-once message delivery semantics, it's a workaround.

richardhundt, May 31 '23 22:05
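The workaround suggested above might look roughly like the sketch below. It assumes the setting meant is the one spelled `stream_wait_empty` in Faust's settings reference; `WORKAROUND_SETTINGS` and `make_app` are hypothetical names, and `faust` is imported inside the function only so that the settings can be inspected without Faust installed.

```python
# Settings from the reproduction script, plus the suggested workaround.
WORKAROUND_SETTINGS = {
    "broker": "kafka://localhost:9092",
    # Do not wait for in-flight events on shutdown or rebalance.
    # Events may then be processed more than once, so agents must be idempotent.
    "stream_wait_empty": False,
}


def make_app():
    """Build the app from the report with the workaround applied."""
    import faust  # assumption: Faust is installed in the worker environment

    return faust.App("agent-check-take", **WORKAROUND_SETTINGS)
```

This only sidesteps the wait during shutdown; it does not address whatever keeps stream.take from returning.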