
Unable to terminate faust worker programmatically

Open hlacikd opened this issue 2 years ago • 3 comments

Checklist

  • [x] I have included information about relevant versions
  • [x] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

When an agent processes a message, we use psycopg3 to look up records in the DB. If the DB goes offline, we need to terminate the Faust worker and let Kubernetes do its work: restart the container until it stops crashing.

        try:
            conn = psycopg.connect(
                f"dbname={dbname} user={user} password={password} host={host}"
            )
            self.cur = conn.cursor(row_factory=dict_row)
        except psycopg.Error as e:
            logging.error(f"Error connecting to database: {e}")
            sys.exit(1)

Expected behavior

We expect sys.exit(1), or something similar, to terminate the whole Faust worker so that the container can be restarted.

Actual behavior

Faust stays in the Stopping state indefinitely.

Full traceback

[records-checker] Traceback (most recent call last):
[records-checker]   File "/usr/local/lib/python3.8/dist-packages/mode/worker.py", line 279, in execute_from_commandline
[records-checker]     self.loop.run_until_complete(self._starting_fut)
[records-checker]   File "/usr/lib/python3.8/asyncio/base_events.py", line 603, in run_until_complete
[records-checker]     self.run_forever()
[records-checker]   File "/usr/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
[records-checker]     self._run_once()
[records-checker]   File "/usr/lib/python3.8/asyncio/base_events.py", line 1859, in _run_once
[records-checker]     handle._run()
[records-checker]   File "/usr/lib/python3.8/asyncio/events.py", line 81, in _run
[records-checker]     self._context.run(self._callback, *self._args)
[records-checker]   File "/usr/local/lib/python3.8/dist-packages/faust/agents/agent.py", line 674, in _execute_actor
[records-checker]     await coro
[records-checker]   File "/app/records.py", line 153, in process_records
[records-checker]     record_checker_obj.get_records(message["detection"]["licensePlateID"])
[records-checker]   File "/app/records.py", line 75, in get_records
[records-checker]     sys.exit(1)
[records-checker] SystemExit: 1
[records-checker] [2022-07-08 01:58:02,607] [1] [INFO] [^--Consumer]: Consumer shutting down for user cancel. 
[records-checker] [2022-07-08 01:58:02,612] [1] [INFO] [^-App]: Wait for streams... 
[records-checker] [2022-07-08 01:58:02,626] [1] [INFO] [^--TableManager]: Stopping... 
[records-checker] [2022-07-08 01:58:02,627] [1] [INFO] [^---Fetcher]: Stopping... 
[records-checker] [2022-07-08 01:58:02,630] [1] [INFO] [^---Recovery]: Stopping... 
[records-checker] [2022-07-08 01:58:02,634] [1] [INFO] [^-App]: Flush producer buffer... 
[records-checker] [2022-07-08 01:58:02,636] [1] [INFO] [^---Conductor]: Stopping... 
[records-checker] [2022-07-08 01:58:02,639] [1] [INFO] [^--AgentManager]: Stopping... 
[records-checker] [2022-07-08 01:58:02,643] [1] [INFO] [^---Agent: __main__.process_detections]: Stopping... 
[records-checker] [2022-07-08 01:58:02,645] [1] [INFO] [^----OneForOneSupervisor: (1@0x7f99d27700)]: Stopping... 
[records-checker] [2022-07-08 01:58:02,651] [1] [INFO] [^---Agent: __main__.process_records]: Stopping... 
[records-checker] [2022-07-08 01:58:02,653] [1] [INFO] [^----OneForOneSupervisor: (1@0x7f98cd09a0)]: Stopping... 

Versions

  • Python version 3.8
  • Faust version (latest)
  • Operating system ubuntu 20.04.4 on arm64
  • Kafka version 3.2.0
  • RocksDB version (if applicable)

hlacikd avatar Jul 07 '22 23:07 hlacikd

Do you use Kubernetes liveness and readiness probes? I would avoid sys.exit and instead check the DB in the health endpoint used by the liveness probe. When the probe fails, k8s sends a termination signal to the container (SIGTERM first), which Faust then handles.

dada-engineer avatar Jul 11 '22 15:07 dada-engineer

@hlacikd We stop our agents with agent_stopper. See https://github.com/faust-streaming/faust/pull/329

zerafachris avatar Jul 26 '22 08:07 zerafachris

You should be using the k8s liveness check instead of going this route. If there is ever a need to crash the app, this is how to do it: https://github.com/faust-streaming/faust/blob/master/faust/stores/aerospike.py#L261

patkivikram avatar Jul 26 '22 12:07 patkivikram

For future readers: the link in the previous comment no longer points to the correct line of code, because it points to master rather than a specific commit. Follow this link instead: https://github.com/faust-streaming/faust/blob/0bb2685e545d28c4e8a604cc748c12e0911c1260/faust/stores/aerospike.py#L261

hassanselim0 avatar Apr 02 '24 22:04 hassanselim0