Unable to terminate Faust worker programmatically
Checklist
- [x] I have included information about relevant versions
- [x] I have verified that the issue persists when using the master branch of Faust.
Steps to reproduce
When the agent processes a message, we use psycopg3 to search for records in the DB. If the DB goes offline, we need to terminate the Faust worker and let Kubernetes do its work: restart the container until it stops crashing.
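import logging
import sys

import psycopg
from psycopg.rows import dict_row

# Excerpt from the record checker; dbname, user, password and host are defined elsewhere.
try:
    conn = psycopg.connect(
        f"dbname={dbname} user={user} password={password} host={host}"
    )
    self.cur = conn.cursor(row_factory=dict_row)
except psycopg.Error as e:
    logging.error(f"Error connecting to database: {e}")
    sys.exit(1)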
Expected behavior
We expect sys.exit(1) (or something similar) to terminate the whole Faust worker so the container can be restarted.
Actual behavior
Faust stays in the Stopping state indefinitely.
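Because the worker can hang in the Stopping state after `sys.exit(1)`, one generic workaround (not a Faust API; the replies in this thread recommend a liveness probe instead) is a watchdog that hard-exits the process if shutdown has not finished after a grace period. `start_exit_watchdog` is a hypothetical helper built on `threading.Timer` and `os._exit`. `os._exit` is used because it ends the whole process from any thread, whereas `sys.exit` called in a helper thread only ends that thread.

```python
import os
import threading
from typing import Callable


def start_exit_watchdog(
    timeout: float,
    code: int = 1,
    exit_fn: Callable[[int], None] = os._exit,
) -> threading.Timer:
    """Hypothetical helper: hard-exit if graceful shutdown takes longer than `timeout`.

    os._exit() terminates the whole process from any thread, but it skips
    atexit handlers and does not flush stdio buffers, so it is a last resort.
    `exit_fn` is injectable so the behaviour can be tested without exiting.
    """
    timer = threading.Timer(timeout, exit_fn, args=(code,))
    timer.daemon = True  # the timer thread alone never keeps the process alive
    timer.start()
    return timer
```

In the reporter's `except psycopg.Error` block, calling `start_exit_watchdog(30)` just before `sys.exit(1)` would give Faust roughly 30 seconds to stop cleanly before the process is killed. The trade-off is that anything still buffered at that point (log lines, unsent producer messages) may be lost.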
Full traceback
[records-checker] Traceback (most recent call last):
[records-checker] File "/usr/local/lib/python3.8/dist-packages/mode/worker.py", line 279, in execute_from_commandline
[records-checker] self.loop.run_until_complete(self._starting_fut)
[records-checker] File "/usr/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete
[records-checker] self.run_forever()
[records-checker] File "/usr/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
[records-checker] self._run_once()
[records-checker] File "/usr/lib/python3.8/asyncio/base_events.py", line 1859, in _run_once
[records-checker] handle._run()
[records-checker] File "/usr/lib/python3.8/asyncio/events.py", line 81, in _run
[records-checker] self._context.run(self._callback, *self._args)
[records-checker] File "/usr/local/lib/python3.8/dist-packages/faust/agents/agent.py", line 674, in _execute_actor
[records-checker] await coro
[records-checker] File "/app/records.py", line 153, in process_records
[records-checker] record_checker_obj.get_records(message["detection"]["licensePlateID"])
[records-checker] File "/app/records.py", line 75, in get_records
[records-checker] sys.exit(1)
[records-checker] SystemExit: 1
[records-checker] [2022-07-08 01:58:02,607] [1] [INFO] [^--Consumer]: Consumer shutting down for user cancel.
[records-checker] [2022-07-08 01:58:02,612] [1] [INFO] [^-App]: Wait for streams...
[records-checker] [2022-07-08 01:58:02,626] [1] [INFO] [^--TableManager]: Stopping...
[records-checker] [2022-07-08 01:58:02,627] [1] [INFO] [^---Fetcher]: Stopping...
[records-checker] [2022-07-08 01:58:02,630] [1] [INFO] [^---Recovery]: Stopping...
[records-checker] [2022-07-08 01:58:02,634] [1] [INFO] [^-App]: Flush producer buffer...
[records-checker] [2022-07-08 01:58:02,636] [1] [INFO] [^---Conductor]: Stopping...
[records-checker] [2022-07-08 01:58:02,639] [1] [INFO] [^--AgentManager]: Stopping...
[records-checker] [2022-07-08 01:58:02,643] [1] [INFO] [^---Agent: __main__.process_detections]: Stopping...
[records-checker] [2022-07-08 01:58:02,645] [1] [INFO] [^----OneForOneSupervisor: (1@0x7f99d27700)]: Stopping...
[records-checker] [2022-07-08 01:58:02,651] [1] [INFO] [^---Agent: __main__.process_records]: Stopping...
[records-checker] [2022-07-08 01:58:02,653] [1] [INFO] [^----OneForOneSupervisor: (1@0x7f98cd09a0)]: Stopping...
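The traceback shows the `SystemExit` raised inside the agent's task escaping from `run_until_complete` in mode's worker. A minimal stdlib sketch, independent of Faust and mode, reproduces that first step: asyncio re-raises `SystemExit` from a task out of the running loop, which leaves the loop stopped while other tasks are still pending, so whatever catches the exception has to finish the shutdown itself. It does not reproduce the hang; that depends on how mode's worker handles the exception. `failing_agent` and `run` are hypothetical names.

```python
import asyncio
from typing import Optional, Tuple


async def failing_agent() -> None:
    # Stand-in for an agent that calls sys.exit(1) when the DB is unreachable.
    await asyncio.sleep(0)
    raise SystemExit(1)


def run() -> Tuple[Optional[int], bool]:
    loop = asyncio.new_event_loop()
    agent_task = loop.create_task(failing_agent())
    # Stand-in for the rest of the worker, which would normally run forever.
    app_task = loop.create_task(asyncio.sleep(3600))
    exit_code = None
    try:
        loop.run_until_complete(app_task)
    except SystemExit as exc:
        # The SystemExit raised in agent_task escapes the event loop here...
        exit_code = exc.code
    # ...while the rest of the "worker" is still pending and must be stopped explicitly.
    still_pending = not app_task.done()
    app_task.cancel()
    try:
        loop.run_until_complete(app_task)
    except asyncio.CancelledError:
        pass
    agent_task.exception()  # mark the agent's exception as retrieved
    loop.close()
    return exit_code, still_pending


print(run())  # (1, True)
```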
Versions
- Python version 3.8
- Faust version (latest)
- Operating system Ubuntu 20.04.4 on arm64
- Kafka version 3.2.0
- RocksDB version (if applicable)
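Do you use Kubernetes liveness and readiness probes? I would rather avoid sys.exit and instead check the DB in a health endpoint used by the liveness probe. When the probe fails, Kubernetes stops the container: it sends SIGTERM, which Faust handles as a normal shutdown, and follows up with SIGKILL if the container has not exited after the grace period.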
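The liveness-probe approach can be sketched with the standard library alone: a small HTTP endpoint that answers 200 while the database is reachable and 503 when it is not, so Kubernetes restarts the container after repeated probe failures instead of the worker calling `sys.exit`. `make_health_server`, `probe` and the `check_db` callable are hypothetical names; in a real worker `check_db` might run `SELECT 1` through psycopg, and the same check could instead be exposed through Faust's own web server (`app.page`).

```python
import threading
import urllib.error
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Callable


def make_health_server(
    check_db: Callable[[], bool], host: str = "127.0.0.1", port: int = 0
) -> ThreadingHTTPServer:
    """Serve GET /health: 200 if check_db() is truthy, 503 if it is falsy or raises."""

    class HealthHandler(BaseHTTPRequestHandler):
        def do_GET(self) -> None:
            if self.path != "/health":
                self.send_error(404)
                return
            try:
                healthy = bool(check_db())
            except Exception:
                healthy = False  # treat a failing check as unhealthy
            body = b"ok" if healthy else b"db unavailable"
            self.send_response(200 if healthy else 503)
            self.send_header("Content-Type", "text/plain")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def log_message(self, fmt, *args) -> None:
            pass  # silence per-request logging from probes

    return ThreadingHTTPServer((host, port), HealthHandler)


def probe(url: str, timeout: float = 2.0) -> int:
    """Mimic an HTTP liveness probe: return the status code, or 0 if unreachable."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status
    except urllib.error.HTTPError as exc:
        return exc.code
    except OSError:
        return 0


if __name__ == "__main__":
    server = make_health_server(lambda: True)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    print(probe(f"http://127.0.0.1:{server.server_address[1]}/health"))
    server.shutdown()
```

Kubernetes HTTP probes treat status codes 200 to 399 as success, so the 503 is what triggers the restart. Inside a pod the server would need to bind `0.0.0.0` on a fixed port that the probe's `httpGet` targets, since the kubelet probes the pod IP rather than loopback.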
@hlacikd We stop our agents with agent_stopper. See https://github.com/faust-streaming/faust/pull/329
You should be using a Kubernetes liveness check instead of going this route. If you ever do need to crash the app, this is how it is done: https://github.com/faust-streaming/faust/blob/master/faust/stores/aerospike.py#L261
For future readers: the link in the previous comment no longer points to the correct line of code because it targets master rather than a specific commit. Use this link instead: https://github.com/faust-streaming/faust/blob/0bb2685e545d28c4e8a604cc748c12e0911c1260/faust/stores/aerospike.py#L261