faust
Stream.take does not shut down on SIGINT
If an agent uses stream.take(..., within=...), a value is sent to the topic, and the worker then receives SIGINT, Faust does not shut down gracefully.
Checklist
- [x] I have included information about relevant versions
- [x] I have verified that the issue persists when using the master branch of Faust.
Steps to reproduce
    import faust

    app = faust.App(
        "agent-check-take",
        broker="kafka://localhost:9092"
    )
    check_topic = app.topic("test-topic", key_type=str, value_type=str)

    @app.agent(check_topic)
    async def test_agent(stream):
        async for v in stream.take(10, within=10 * 60 * 60):
            print("Received", v)

    @app.timer(10)
    async def test_timer():
        print("Sending value to topic")
        await check_topic.send(key="key", value="value")

    if __name__ == "__main__":
        app.main()
Expected behavior
Shutting down agent tasks
Actual behavior
The agent using stream.take did not exit.
Full traceback
[2021-06-16 00:11:11,071] [63784] [INFO] [^Worker]: Ready
[2021-06-16 00:11:21,077] [63784] [WARNING] Sending value to topic
^C-INT- -INT- -INT- -INT- -INT- -INT-
[2021-06-16 00:11:27,680] [63784] [INFO] [^Worker]: Signal received: Signals.SIGINT (2)
[2021-06-16 00:11:27,680] [63784] [INFO] [^Worker]: Stopping...
[2021-06-16 00:11:27,681] [63784] [INFO] [^-App]: Stopping...
[2021-06-16 00:11:27,681] [63784] [INFO] [^---Fetcher]: Stopping...
[2021-06-16 00:11:27,682] [63784] [INFO] [^--Consumer]: Consumer shutting down for user cancel.
[2021-06-16 00:11:27,682] [63784] [INFO] [^-App]: Wait for streams...
[2021-06-16 00:11:36,891] [63784] [WARNING] [^--Consumer]: wait_empty: Waiting for tasks [(1, <ConsumerMessage: TopicPartition(topic='test-topic', partition=0) offset=2>)]
[2021-06-16 00:11:36,892] [63784] [INFO] [^--Consumer]: Agent tracebacks:
=======================================
TRACEBACK OF ALL RUNNING AGENT ACTORS
=======================================
* agent_check_take.test_agent ----->
============================================================
['Stack for <coroutine object test_agent at 0x1039a40c0> (most recent call last):\n File "/Users/romanius/Projects/Work/Aermetric/laborer/agent_check_take.py", line 13, in test_agent\n async for v in stream.take(10, within=10 * 60 * 60):\n File "async_generator_asend", line -1, in [rest of traceback truncated]\n']
-eof tracebacks- :-)
........
[2021-06-16 00:16:10,984] [63784] [ERROR] [^---AIOKafkaConsumerThread]: Has not committed TP(topic='test-topic', partition=0) at all since worker start (started 5.02 minutes ago).
There are multiple possible explanations for this:
1) The processing of a single event in the stream
is taking too long.
The timeout for this is defined by the broker_commit_livelock_soft_timeout setting,
currently set to 300.0. If you expect the time
required to process an event, to be greater than this then please
increase the timeout.
2) The commit handler background thread has stopped working (report as bug).
Versions
- Python version: 3.9.4
- Faust version: master (fbaae90)
- Operating system: macOS 10.15
- Kafka version: Confluent Kafka 6.1.1
- RocksDB version (if applicable)
I am seeing the same errors with plain streams. I always have to kill my cluster with kill -9.
Try passing stream_wait_empty=False to faust.App. That requires idempotent message handling, and there is probably still a bug in the shutdown sequence, but if you don't mind at-least-once message delivery semantics, it's a workaround.
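To illustrate what "idempotent message handling" could look like under this workaround, here is a minimal sketch in plain Python. It assumes each event carries a stable identifier, for example a (topic, partition, offset) tuple, so that an event redelivered after an unclean shutdown can be recognized and skipped. The IdempotentHandler class and its handle method are hypothetical names used for illustration and are not part of Faust. The in-memory set is also only an assumption: it is lost on restart, so a real deployment would need durable storage for the seen identifiers.

```python
from typing import Callable, Hashable, Set


class IdempotentHandler:
    """Hypothetical helper: process each event identifier at most once."""

    def __init__(self, process: Callable[[str], None]) -> None:
        self._process = process
        # In-memory only (an assumption for this sketch); it does not
        # survive a worker restart.
        self._seen: Set[Hashable] = set()

    def handle(self, event_id: Hashable, value: str) -> bool:
        """Process value unless event_id was already handled.

        Returns True if the value was processed, False if it was skipped
        as a duplicate.
        """
        if event_id in self._seen:
            return False
        self._process(value)
        # Record the identifier only after processing succeeded.
        self._seen.add(event_id)
        return True


if __name__ == "__main__":
    received = []
    handler = IdempotentHandler(received.append)
    handler.handle(("test-topic", 0, 2), "value")
    handler.handle(("test-topic", 0, 2), "value")  # simulated redelivery
    print(received)  # ['value']
```

Inside an agent, the loop body would call something like handler.handle(...) with whatever identifier the application can derive for each event, so that a message replayed after the worker restarts has no additional effect.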