[Python] Graceful shutdown immediately closes client connection
Hi,
I'm still trying to replace some Gunicorn with Unit, but I have issues when I try to shut down Unit gracefully. I expect it to properly handle the current requests before shutting down.
So, I have Unit running in a container. I make a request with curl, and while it is being processed I stop the container with: sudo docker stop -t 60 -s 9 django_unit.
The process continues, but the socket is closed, so curl gets an empty-reply error:
romain@grace-hopper:~$ curl http://localhost:8000/test/timeout
curl: (52) Empty reply from server
In the container output, I see that the process continues, but the server can't respond to the client at the end:
0
1
2
2023/05/11 14:29:09 [notice] 1#1 process 44 exited with code 0
2023/05/11 14:29:09 [notice] 1#1 process 45 exited with code 0
3
4
2023/05/11 14:29:23 [warn] 47#47 [unit] sendmsg(11, 311) failed: Broken pipe (32)
2023/05/11 14:29:23 [notice] 46#46 app process 47 exited with code 0
2023/05/11 14:29:23 [alert] 46#46 sendmsg(13, -1, -1, 2) failed (32: Broken pipe)
2023/05/11 14:29:23 [notice] 1#1 process 46 exited with code 0
Did I miss some configuration? I put all the necessary stuff to reproduce this here https://github.com/RomainMou/unit-django-issue-headers#timeout.
I also encountered the same issue. Does anyone have any ideas?
I wrote a script that checks the current number of active connections every second and exits when the count reaches zero. I set it as a preStop hook in k8s, which solves the dropped-connection problem when the server shuts down.
#!/bin/bash
while true; do
    connection_count=$(curl --unix-socket /var/run/control.unit.sock http://localhost/status/connections/active 2>/dev/null | grep -oP '\d+')
    echo "ACTIVE: $connection_count"
    if [ "$connection_count" -eq 0 ]; then
        echo "CLOSE"
        break
    fi
    sleep 1
done
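Outside of k8s, the same idea could be applied manually before stopping the container from the original report. A rough sketch only; the script name is hypothetical and it assumes curl is available inside the image:
# Run the drain loop inside the container first, then stop it.
docker exec django_unit /usr/local/bin/wait-for-drain.sh && docker stop -t 60 django_unit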
Thanks @TokuiNico for sharing this! The root cause reported by @RomainMou still needs some investigation on our side. I would like to test this both inside and outside the container. I will give this issue another push to see what the general shutdown behaviour of Unit is.
I've tried to understand what happens. I'm not entirely sure how all the processes communicate, but I think it goes something like this:
- The main process receives the termination signal.
- It calls nxt_main_process_sigterm_handler, which calls nxt_process_quit, which calls nxt_runtime_quit, which calls nxt_runtime_stop_all_processes, which sends NXT_PORT_MSG_QUIT to all processes.
- One of the processes receiving NXT_PORT_MSG_QUIT is the nxt_router. It does some tasks, like calling nxt_runtime_quit, and then exits. This breaks all connections with all clients.
I tried to confirm that by making the nxt_router wait for the apps to finish before exiting, with this dirty hack:
diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c
index 9bfabc75..9c2fde9b 100644
--- a/src/nxt_runtime.c
+++ b/src/nxt_runtime.c
@@ -11,7 +11,8 @@
 #include <nxt_main_process.h>
 #include <nxt_router.h>
 #include <nxt_regex.h>
-
+#include <sys/types.h>
+#include <signal.h>
 
 static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task,
     nxt_runtime_t *rt);
@@ -454,6 +455,28 @@ nxt_runtime_quit(nxt_task_t *task, nxt_uint_t status)
             nxt_runtime_stop_all_processes(task, rt);
             done = 0;
         }
+        else if (rt->type == NXT_PROCESS_ROUTER) {
+            nxt_process_t  *process;
+            int            app_count;
+
+            // Wait for the apps to stop before stopping the router.
+            for (;;) {
+                app_count = 0;
+                nxt_runtime_process_each(rt, process) {
+                    if (nxt_process_type(process) == NXT_PROCESS_APP && process->use_count > 0) {
+                        // Check whether process->pid is still running.
+                        if (kill(process->pid, 0) == 0) {
+                            app_count++;
+                        }
+                    }
+                } nxt_runtime_process_loop;
+                if (app_count == 0) {
+                    break;
+                }
+                nxt_trace(task, "waiting for %d app processes", app_count);
+                sleep(1);
+            }
+        }
     }
 
     nxt_runtime_close_idle_connections(engine);
With this change, the app is able to finish processing and send the answer to the client:
0
2023/11/27 14:12:00 [notice] 1#1 process 51 exited with code 0
2023/11/27 14:12:00 [notice] 53#53 app process 54 exited with code 0
2023/11/27 14:12:00 [notice] 53#53 app process 56 exited with code 0
2023/11/27 14:12:00 [notice] 53#53 app process 55 exited with code 0
1
2023/11/27 14:12:00 [notice] 52#52 waiting for 1 app processes
[...]
2023/11/27 14:12:00 [notice] 52#52 waiting for 1 app processes
172.17.0.1 - - [27/Nov/2023:14:12:21 +0000] "GET /test/timeout HTTP/1.1" 200 19 "-" "curl/8.4.0"
2023/11/27 14:12:21 [notice] 53#53 app process 57 exited with code 0
2023/11/27 14:12:21 [notice] 1#1 process 53 exited with code 0
2023/11/27 14:12:21 [notice] 1#1 process 52 exited with code 0
One downside I noticed is that new connections are still accepted while the router waits for the in-flight request to finish. All of those new connections receive an empty reply once the current request finishes processing and everything quits.
Hello, just to let you know that the issue is still present in the latest version (1.33.0) of Unit. Is there anything I can do to help further on this issue?
The issue is still present on 1.34.2 and affects PHP too – probably all app modules?
It seems the only workaround currently is @TokuiNico's graceful shutdown script, plus you need to stop listening for new connections with DELETE /config/listeners before you start polling for active connections, as sketched below.
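For reference, a combined version of that workaround could look roughly like this, building on the script shared earlier. A sketch only; the control-socket path and the draining policy are assumptions about your setup:
#!/bin/bash
# Sketch: stop accepting new connections, then wait until Unit reports
# zero active connections. Adjust the control-socket path as needed.
SOCK=/var/run/control.unit.sock

# Remove all listeners so no new connections are accepted.
curl -s -X DELETE --unix-socket "$SOCK" http://localhost/config/listeners

# Poll the status API until in-flight connections have drained.
# An empty reply (control socket already gone) is treated as drained.
while true; do
    active=$(curl -s --unix-socket "$SOCK" http://localhost/status/connections/active | grep -oE '[0-9]+')
    echo "ACTIVE: ${active:-0}"
    if [ "${active:-0}" -eq 0 ]; then
        break
    fi
    sleep 1
done
Only once this loop exits would the container (or pod) actually be stopped.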
EDIT: another solution that seems to work is to define a custom entrypoint as PID 1 which calls the built-in entrypoint.
Signals work as expected & connections are drained before exit 👇
#!/bin/bash
/usr/local/bin/docker-entrypoint.sh "$@"
The same issue as without a custom entrypoint 👇
#!/bin/bash
exec /usr/local/bin/docker-entrypoint.sh "$@"
I tried this patch instead:
diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c
index de76f19e..62d65277 100644
--- a/src/nxt_runtime.c
+++ b/src/nxt_runtime.c
@@ -458,6 +458,16 @@ nxt_runtime_quit(nxt_task_t *task, nxt_uint_t status)
 
     nxt_runtime_close_idle_connections(engine);
+    if (rt->type == NXT_PROCESS_ROUTER) {
+        nxt_app_t  *app;
+        nxt_queue_each(app, &nxt_router->apps, nxt_app_t, link) {
+            while (app->active_requests > 0) {
+                nxt_debug(task, "waiting for app %V (active request %d)",
+                          &app->name, app->active_requests);
+                sleep(1);
+            }
+        } nxt_queue_loop;
+    }
 
     if (done) {
         nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit,
                            task, rt, engine);
This works for a simple curl test, but it doesn't work when I send many requests with wrk.
Although nxt_process_quit closes all listening sockets, wrk uses keep-alive.
nxt_router should return a Connection: close header and close the client connection.
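As a quick cross-check that keep-alive is the trigger, the benchmark could be rerun with keep-alive effectively disabled by sending a Connection: close request header. Just a sketch; I haven't verified it against the patch above:
# Same load as below, but each request asks the server to close the connection.
wrk -c 8 -d 50 -H 'Connection: close' http://localhost:8080/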
I removed the listeners and apps while requests were in flight.
$ curl -X PUT --data-binary @noapps.json --unix-socket ./build/var/run/unit/control.unit.sock 'http://localhost/config'
{
	"success": "Reconfiguration done."
}
$ curl --unix-socket build/var/run/unit/control.unit.sock http://localhost:8080/config
{
	"listeners": {},
	"applications": {}
}
With curl, it looks like a graceful shutdown.
$ curl http://localhost:8080/
(app and listeners are removed from another session)
(waiting a few seconds...)
Hello, WSGI
$ curl http://localhost:8080/
curl: (7) Failed to connect to localhost port 8080 after 0 ms: Couldn't connect to server
But with wrk (keep-alive), it caused timeouts.
$ wrk -c 8 -d 50 http://localhost:8080/
Running 50s test @ http://localhost:8080/
  2 threads and 8 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     1.27ms  559.22us   2.47ms   87.50%
    Req/Sec     0.00      0.00     0.00    100.00%
  16 requests in 50.03s, 2.20KB read
  Socket errors: connect 0, read 0, write 11020887, timeout 8
  Non-2xx or 3xx responses: 8
Requests/sec:      0.32
Transfer/sec:      44.93B
To reproduce, here are my wsgi.py and config:
$ cat wsgi.py
import time

def application(environ, start_response):
    time.sleep(10)
    start_response('200 OK', [('Content-Type', 'text/plain')])
    yield b'Hello, WSGI\n'
$ cat config.json
{
    "listeners": {
        "127.0.0.1:8080": {
            "pass": "applications/python"
        }
    },
    "applications": {
        "python": {
            "type": "python",
            "module": "wsgi",
            "path": "/home/methane/work/unit",
            "threads": 20
        }
    }
}
To fix this issue, we need to significantly redesign the Graceful Shutdown process.
1. When a listener is removed, the controller process should not only stop accepting new connections, but also stop keep-alive for existing connections accepted by that listener.
2. The main process should remove all listeners from the controller when it receives a graceful shutdown signal, and wait until the number of connections reaches 0. The termination of the app processes should be left to the controller process.
I attempted to implement (1), but it was not easy as I am still not fully familiar with the Unit source code. I am not sure how to check if the connection that returns the HTTP response was created by a listener that has been removed.
@ac000 would you work on this issue by the end of this month? Or may I continue to attempt the implementation of (1)?
1. When a listener is removed, the controller process should not only stop accepting new connections, but also stop keep-alive for existing connections accepted by that listener.
It's actually the router process that has the listen(2) socket(s).
The main process does the bind(2)'s and passes the fd's to the router process via SCM_RIGHTS.
2. The main process should remove all listeners from the controller when it receives a graceful shutdown signal, and wait until the number of connections reaches 0. The termination of the app processes should be left to the controller process.
The application processes are children of the prototype processes, which are in turn children of the main process.
I attempted to implement (1), but it was not easy as I am still not fully familiar with the Unit source code. I am not sure how to check if the connection that returns the HTTP response was created by a listener that has been removed.
I think really all that needs to be done is to call close(2) on the listen sockets. However, this is slightly hindered by the fact that the router process doesn't have a list of all the listen sockets.
We call into nxt_runtime_quit() twice. The first time is when we'd want to close the listen sockets; the second time is when we'd wait for active connections to drop to 0.
(This is without having to completely re-architect the whole thing.)
I have been looking into what it would take to fix all this and it's looking non-trivial currently.
Also, this is something that was obviously thought about but not realised:
static void
nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_debug(task, "sigterm handler signo:%d (%s)",
              (int) (uintptr_t) obj, data);

    /* TODO: fast exit. */

    nxt_exiting = 1;

    nxt_runtime_quit(task, 0);
}


static void
nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_debug(task, "sigquit handler signo:%d (%s)",
              (int) (uintptr_t) obj, data);

    /* TODO: graceful exit. */

    nxt_exiting = 1;

    nxt_runtime_quit(task, 0);
}
So do we only want to do a graceful shutdown in some circumstances?
Just one of likely several questions that will need answering.
@ac000 would you work on this issue by the end of this month?
Or may I continue to attempt the implementation of (1)?
I'll poke at it a bit more, but given Unit's current status, I'm unlikely to be able to dedicate a lot of time to it.
OK, so here is something that seems to work.
It will close the listen sockets and wait for the applications to finish.
Some background...
The router process is multi-threaded and by default we create one thread per on-line cpu/core to handle client connections.
For example, a system with two cpus/cores will have three router threads by default (this can be changed via settings.listen_threads): the main router thread and two router threads handling client connections. The main router thread is responsible for initialising various bits of router configuration and for starting the required number of listen threads. It is also responsible for handling the shutdown of the router process.
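As an aside, that thread count can be tweaked through the control API; a minimal sketch, with the control-socket path being an assumption:
# Set the number of router listen threads; the socket path may differ on your system.
curl -X PUT -d '4' --unix-socket /var/run/control.unit.sock http://localhost/config/settings/listen_threads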
The listen threads do have a list of the listen sockets, the main thread does not.
Buried deep down inside the task structure is an array for holding the listen sockets. This is somewhat populated in the main process, mainly for the clean removal of UNIX domain sockets (see commit ccaad38bc).
We can make use of this array in the router process to just store the listen socket fd's.
The other part of this is waiting for the application processes. I have used a modified version of the previous patches in this issue, making use of a condition variable instead of sleeping, etc.
The resulting patch looks like this:
diff --git ./src/nxt_router.c ./src/nxt_router.c
index af9dad29..02f769fe 100644
--- ./src/nxt_router.c
+++ ./src/nxt_router.c
@@ -1292,6 +1292,27 @@ nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
 
     nxt_router_conf_ready(task, tmcf);
 
+    rt = task->thread->runtime;
+
+    nxt_array_destroy(rt->listen_sockets);
+
+    rt->listen_sockets = nxt_array_create(rt->mem_pool, 1,
+                                          sizeof(nxt_listen_socket_t));
+    if (nxt_slow_path(rt->listen_sockets == NULL)) {
+        goto fail;
+    }
+
+    nxt_queue_each(skcf, &router->sockets, nxt_socket_conf_t, link) {
+        nxt_listen_socket_t  *ls;
+
+        ls = nxt_array_zero_add(rt->listen_sockets);
+        if (nxt_slow_path(ls == NULL)) {
+            goto fail;
+        }
+
+        ls->socket = skcf->listen->socket;
+    } nxt_queue_loop;
+
     return;
 
 fail:
@@ -1885,6 +1906,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
             goto app_fail;
         }
 
+        nxt_thread_cond_create(&app->active_req_cond);
+
         nxt_queue_init(&app->ports);
         nxt_queue_init(&app->spare_ports);
         nxt_queue_init(&app->idle_ports);
@@ -2247,6 +2270,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
         nxt_queue_remove(&app->link);
 
         nxt_thread_mutex_destroy(&app->mutex);
+        nxt_thread_cond_destroy(&app->active_req_cond);
         nxt_mp_destroy(app->mem_pool);
 
     } nxt_queue_loop;
@@ -4803,6 +4827,9 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port,
 
     nxt_thread_mutex_lock(&app->mutex);
 
     app->active_requests -= got_response + dec_requests;
+    if (app->active_requests == 0) {
+        nxt_thread_cond_signal(&app->active_req_cond);
+    }
 
     nxt_thread_mutex_unlock(&app->mutex);
@@ -4815,6 +4842,9 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port,
 
         main_app_port->active_requests -= got_response + dec_requests;
         app->active_requests -= got_response + dec_requests;
+        if (app->active_requests == 0) {
+            nxt_thread_cond_signal(&app->active_req_cond);
+        }
 
         if (main_app_port->pair[1] != -1 && main_app_port->app_link.next == NULL) {
             nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
@@ -5168,6 +5198,7 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
         app->shared_port = NULL;
     }
 
+    nxt_thread_cond_destroy(&app->active_req_cond);
     nxt_thread_mutex_destroy(&app->mutex);
 
     nxt_mp_destroy(app->mem_pool);
diff --git ./src/nxt_router.h ./src/nxt_router.h
index e2c5b83b..52ae0458 100644
--- ./src/nxt_router.h
+++ ./src/nxt_router.h
@@ -156,6 +156,8 @@ struct nxt_app_s {
     nxt_port_t          *proto_port;
 
     nxt_port_mmaps_t    outgoing;
+
+    nxt_thread_cond_t   active_req_cond;
 };
diff --git ./src/nxt_runtime.c ./src/nxt_runtime.c
index de76f19e..321694fa 100644
--- ./src/nxt_runtime.c
+++ ./src/nxt_runtime.c
@@ -22,6 +22,7 @@ static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt);
 static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data);
 static void nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status);
 static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine);
+static void nxt_runtime_close_listen_sockets(const nxt_task_t *task);
 static void nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt);
 static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data);
 static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task,
@@ -427,6 +428,22 @@ nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status)
 }
 
 
+static void
+nxt_runtime_close_listen_sockets(const nxt_task_t *task)
+{
+    nxt_runtime_t  *rt;
+
+    rt = task->thread->runtime;
+
+    for (size_t i = 0; i < rt->listen_sockets->nelts; i++) {
+        nxt_listen_socket_t  *ls;
+
+        ls = (nxt_listen_socket_t *) rt->listen_sockets->elts + i;
+        close(ls->socket);
+    }
+}
+
+
 void
 nxt_runtime_quit(nxt_task_t *task, nxt_uint_t status)
 {
@@ -453,12 +470,27 @@ nxt_runtime_quit(nxt_task_t *task, nxt_uint_t status)
         if (rt->type == NXT_PROCESS_MAIN) {
             nxt_runtime_stop_all_processes(task, rt);
             done = 0;
+        } else if (rt->type == NXT_PROCESS_ROUTER) {
+            nxt_runtime_close_listen_sockets(task);
+            done = 0;
         }
     }
 
     nxt_runtime_close_idle_connections(engine);
 
     if (done) {
+        if (rt->type == NXT_PROCESS_ROUTER) {
+            nxt_app_t  *app;
+
+            nxt_queue_each(app, &nxt_router->apps, nxt_app_t, link) {
+                pthread_mutex_lock(&app->mutex);
+                while (app->active_requests > 0) {
+                    nxt_thread_cond_wait(&app->active_req_cond, &app->mutex,
+                                         NXT_INFINITE_NSEC);
+                }
+            } nxt_queue_loop;
+        }
+
         nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit,
                            task, rt, engine);
     }