Fix: Notify app of client disconnection when request is in progress.
Previously, the app was not notified when the client disconnected. This caused problems especially with WebSocket connections and Server-Sent Events (SSE), where the app continued to send data to the router, which could no longer deliver it to the client.
Changes made:
- Added functionality to send a port message to notify the app of client disconnection.
- For now, this scenario is handled and tested only for ASGI WebSockets and ASGI HTTP, ensuring that if the app detects a client disconnection, it follows the ASGI specification.
This ensures that the app is properly informed of client disconnections and can handle them according to the ASGI spec.
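For reference, a minimal sketch (not part of this change; names and payloads are illustrative) of the app-side behaviour the ASGI spec describes once the server learns the client is gone: receive() should eventually yield an 'http.disconnect' event, and send() on a closed connection should raise a server-specific subclass of OSError. The WebSocket case is analogous, with 'websocket.disconnect'.

import asyncio

async def application(scope, receive, send):
    if scope['type'] != 'http':
        return

    await send({'type': 'http.response.start', 'status': 200,
                'headers': [(b'content-type', b'text/plain')]})

    while True:
        try:
            # Per the spec, this should raise an OSError subclass once the
            # client has disconnected and the server knows about it.
            await send({'type': 'http.response.body', 'body': b'tick\n',
                        'more_body': True})
        except OSError:
            break
        await asyncio.sleep(1)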
Would you mind splitting the python changes into a separate commit? (within this PR...)
Sure, doing the changes as per the suggestions.
@ac000 I have made the changes. Just one more thing to highlight: the exception, as per the ASGI spec, should be a subclass of OSError ("If send() is called on a closed connection the server should raise a server-specific subclass of OSError."). But we are raising PyExc_RuntimeError, which is fine for abnormal cases; here, though, PyExc_OSError could be returned. Please see this for reference: https://docs.python.org/3/c-api/exceptions.html#standard-exceptions
Thanks @gourav-kandoria and the whitespace checker no longer complains :-)
Yeah, OSError would make more sense here...
But then we'd have to flow errno through...
You mean, to flow the error all the way from the router to the app, passing the exact errno?
Right.
As this error will happen in the router process, that is where errno(3p) will be set. In this case I guess it's likely to be either ECONNRESET or EPIPE.
I'm not saying you need to do that as that's likely not a trivial change. Or we could just hardcode, say, ECONNRESET.
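For reference, a quick sketch (plain Python, not Unit code) of how those errno values map to OSError subclasses on the app side; an application catching OSError per the spec would see either one:

import errno

# ConnectionResetError and BrokenPipeError are the OSError subclasses that
# correspond to ECONNRESET and EPIPE respectively.
e = ConnectionResetError(errno.ECONNRESET, 'Connection reset by peer')
assert isinstance(e, OSError) and e.errno == errno.ECONNRESET

e = BrokenPipeError(errno.EPIPE, 'Broken pipe')
assert isinstance(e, OSError) and e.errno == errno.EPIPE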
However, what you have there now looks reasonable to me, but I'd like to get @hongzhidao 's overall input on this.
@gourav-kandoria
Was there an open issue about this?
Hi @gourav-kandoria @ac000,
However, what you have there now looks reasonable to me, but I'd like to get @hongzhidao 's overall input on this.
Will do it, thanks for the contribution.
@gourav-kandoria
Was there an open issue about this?
@ac000 The exact same issue was not opened, but this behaviour was the cause of https://github.com/nginx/unit/issues/1501
I'm not saying you need to do that as that's likely not a trivial change. Or we could just hardcode, say, ECONNRESET.
Got it. So for now, I will raise ECONNRESET.
Hi @gourav-kandoria
Yeah, even though I said that, I'm not sure it's the right thing to do...
Do you have some reproducer for this issue?
I've been trying to reproduce it myself, but no luck; I am unable to trigger either nxt_http_request_error() or nxt_http_request_error_handler(), which are the only places I see us setting r->error.
Python also seems to be notified about the websocket closure (i.e. if m['type'] == 'websocket.disconnect': triggers), even if you kill -9 the client (the kernel closes the socket).
The only way I can get things gummied up is if I firewall the websocket after the client connects.
With this hack
import time

async def application(scope, receive, send):
    if scope['type'] == 'websocket':
        print("WebSocket!")
        while True:
            m = await receive()
            if m['type'] == 'websocket.connect':
                print("Waiting for websocket.accept")
                await send({'type': 'websocket.accept'})
                print("Got WebSocket connection")

            time.sleep(5);
            send(
                {
                    'type': 'websocket.send',
                    'bytes': m.get('bytes', None),
                    'text': "Test",
                }
            )
If I Ctrl-C the client while the app is sleeping, the router process never attempts to send the message (even though, AFAICT, the Python app has called send(), passing the data to the router process), perhaps because Unit knows the socket is closed.
So at the moment I am having a bit of a doubt about this whole thing...
Heh, just noticed that you pointed out an open issue... after a quick skim, I'm even more confused now... that doesn't seem to be about WebSockets and this PR does...
Oh sorry, my bad for mixing things up. For the issue I mentioned earlier, I am sharing the app-side code and the client-side script I used to test it. Once the changes to this are verified, I will share the scenario explaining why I also made changes to the websocket-related files.
application:
import asyncio

async def application_sse(scope, receive, send):
    if scope['type'] == 'http':
        headers = [
            (b'content-type', b'text/event-stream'),
            (b'cache-control', b'no-cache'),
            (b'connection', b'keep-alive'),
        ]
        await send({
            'type': 'http.response.start',
            'status': 200,
            'headers': headers,
        })
        send_task = asyncio.create_task(send_messages_sse(send))
        receive_task = asyncio.create_task(receive_messages_sse(receive))
        await asyncio.gather(send_task, receive_task)

async def receive_messages_sse(receive):
    message = await receive()
    print(f'message received: {message}')
    if message['type'] == 'http.disconnect':
        return

async def send_messages_sse(send):
    i = 0
    while True:
        try:
            message = f"event: count\ndata: {i}\n\n"
            print(f'message sent: {message}')
            await send({
                'type': 'http.response.body',
                'body': message.encode('utf-8'),
                'more_body': True
            })
            i += 2
            await asyncio.sleep(2)
        except Exception as err:
            print(f'err : {err}')
            break
client side script:
import asyncio
import aiohttp

async def listen_to_events():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://localhost:8001') as response:
            async for line in response.content:
                if line:
                    print(line.decode('utf-8').strip())

if __name__ == '__main__':
    asyncio.run(listen_to_events())
I used Ctrl-C to close the connection from the client side.
Hmm, I'd never heard of Server-Sent Events before, even though they pre-date WebSockets...
But, yes, I can reproduce this with the above application + curl(1). Thanks.
An interesting observation: with WebSockets we see (from router to client)
[pid 31184] sendto(27, "\201\4Test", 6, 0, NULL, 0) = 6
[pid 31184] sendto(27, "\201\4Test", 6, 0, NULL, 0) = 6
[pid 31184] sendto(27, "\201\4Test", 6, 0, NULL, 0) = 6
[pid 31184] sendmsg(16, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\315y\0\0\1\0 \1\0\0\0\0", iov_len=16}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 16
Ctrl-C'ing the client and everything behaves as expected. The connection is closed, the router process sees this and informs the Python application.
With Server-Sent Events we see instead...
[pid 31002] writev(27, [{iov_base="\r\n16\r\n", iov_len=6}, {iov_base="event: count\ndata: 6\n\n", iov_len=22}], 2) = 28
[pid 31002] writev(27, [{iov_base="\r\n16\r\n", iov_len=6}, {iov_base="event: count\ndata: 8\n\n", iov_len=22}], 2) = 28
[pid 31002] writev(27, [{iov_base="\r\n17\r\n", iov_len=6}, {iov_base="event: count\ndata: 10\n\n", iov_len=23}], 2) = -1 EPIPE (Broken pipe)
[pid 31002] --- SIGPIPE {si_signo=SIGPIPE, si_code=SI_USER, si_pid=30999, si_uid=1000} ---
Once we get -EPIPE we call into nxt_http_request_error_handler() & nxt_http_request_close_handler()
We seem to use writev(3p) for HTTP sockets and sendto(2) for WebSockets... not sure why, probably not that important, just an interesting curiosity.
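Another aside: the \r\n16\r\n / \r\n17\r\n prefixes in those writev() calls are just HTTP chunked transfer-encoding framing (a CRLF terminating the previous chunk, then the next chunk's length in hex, then CRLF). A tiny sketch of that framing, for reference:

# Chunked transfer-encoding framing as seen in the writev() traces above:
# CRLF terminating the previous chunk, then the payload length in hex, CRLF.
def chunk_header(payload: bytes) -> bytes:
    return b'\r\n' + format(len(payload), 'x').encode() + b'\r\n'

payload = b'event: count\ndata: 10\n\n'        # 23 bytes
assert chunk_header(payload) == b'\r\n17\r\n'  # 0x17 == 23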
With your first patch we now see
[pid 31734] writev(11, [{iov_base="\r\n16\r\n", iov_len=6}, {iov_base="event: count\ndata: 6\n\n", iov_len=22}], 2) = 28
[pid 31734] writev(11, [{iov_base="\r\n16\r\n", iov_len=6}, {iov_base="event: count\ndata: 8\n\n", iov_len=22}], 2) = 28
[pid 31734] writev(11, [{iov_base="\r\n17\r\n", iov_len=6}, {iov_base="event: count\ndata: 10\n\n", iov_len=23}], 2) = -1 EPIPE (Broken pipe)
[pid 31734] --- SIGPIPE {si_signo=SIGPIPE, si_code=SI_USER, si_pid=31731, si_uid=1000} ---
[pid 31734] sendmsg(31, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\r\0\0\0\363{\0\0\1\0!\1\0\0\0\0", iov_len=16}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 16
So the router process is now informing the Python application, though in this case it's still writing to stdout... AFAICT the application doesn't get the http.disconnect message...
Applying this bit of the second patch
--- a/src/python/nxt_python_asgi_http.c
+++ b/src/python/nxt_python_asgi_http.c
@@ -368,6 +368,11 @@ nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict)
"sent, after response already completed");
}
+ if (nxt_slow_path(http->closed)) {
+ return PyErr_Format(PyExc_ConnectionResetError,
+ "Connection Closed ");
+ }
+
if (nxt_slow_path(http->send_future != NULL)) {
return PyErr_Format(PyExc_RuntimeError, "Concurrent send");
}
We don't seem to hit that if () statement as we call this function before we call nxt_http_request_error_handler()
r->error is set in nxt_router_http_request_done(), but http->closed doesn't seem to be getting set, hmm, maybe it's not supposed to and we just need to flow the error condition through... needs more investigation...
I just noticed some strange behaviour on my system. I was assuming the port message received from the router would call nxt_unit_process_client_error, and that the flow below would then happen:
nxt_unit_process_client_error -> nxt_py_asgi_close_handler -> nxt_py_asgi_http_close_handler.
If this happens, everything seems to work correctly. But what I noticed is that, after starting Unit, if I attach a debugger to the application process, then in the nxt_unit_process_client_error function I am able to get the req object from the statement req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0); and even if I then stop the debugger, for subsequent test requests I always get this request object upon disconnection.
But if I start Unit and don't attach a debugger to the application process, this statement (req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0)) returns NULL.
Is it that in debugging mode this object is kept in memory, but otherwise not?
r->error is set in nxt_router_http_request_done(), but http->closed doesn't seem to be getting set, hmm, maybe it's not supposed to and we just need to flow the error condition through... needs more investigation...
http->closed is supposed to be set in nxt_py_asgi_http_close_handler(); it does get set when a debugger is attached to the application, but not when it isn't, as I mentioned in the comment above.
Certainly don't be surprised that debugging can change the behaviour of the program...
Oh okay. So, in the PR, the purpose of the changes I made around nxt_unit_request_hash_find was to make sure the req object is not removed from the hash while the request is still in progress, and that it only gets removed once nxt_unit_request_info_release is called after request completion. But it is getting removed some other way. So I have two questions now:
- Is it fine if we take this approach, keeping performance and memory implications in mind?
- If yes, how do I make sure I am always able to get this request object from the hash until it is released by nxt_unit_request_info_release?
I think we need to step back for a minute as there may be a more fundamental issue here.
With a WebSocket
(strace(1)ing the Python app)
epoll_wait(12,
<-- Ctrl-C the client
[{events=EPOLLIN, data={u32=8, u64=139869904961544}}], 3, -1) = 1
epoll_wait(12, [{events=EPOLLIN, data={u32=8, u64=139869904961544}}], 3, 0) = 1
recvmsg(8, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\r\0\0\0\243\205\0\0\1\0 \1\0\0\0\0", iov_len=16384}], msg_iovlen=1, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=34211, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 16
recvmsg(8, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
epoll_wait(12, [], 3, 0) = 0
sendmsg(14, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\r\0\0\0\252\205\0\0\0\0 \0\0\0\0\0", iov_len=16}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 16
sendmsg(14, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\r\0\0\0\252\205\0\0\0\0 \1\0\0\0\0", iov_len=16}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 16
epoll_wait(12,
So you can see that the Python app is sitting in epoll_wait(2); when the socket is closed, the router process notifies the Python app.
However, with a HTTP connection
epoll_wait(14,
<-- Ctrl-C the client
No notification is sent to the python app...
When a WebSocket is closed nxt_py_asgi_close_handler() is called but not when a HTTP connection is closed.
I think we need to step back for a minute as there may be a more fundamental issue here.
Sure.
Just to explain what's in the PR
In the HTTP SSE case, the code in the PR behaves this way: if the app sends data to the router and the router fails to write it to the socket due to an error like a closed connection, it sets the r->error flag to true and does all the resource cleanup etc. Finally nxt_router_http_request_done is called, which is where I have plugged in the code to notify the app using the client error message.
So the router only notifies when it fails to write bytes to the client, not when the client connection is actually closed.
... not when client connection is actually closed.
This is the bit we need to fix first, then the other bit may not even be an issue...
Just some notes...
The message (NXT_PORT_MSG_WEBSOCKET_LAST) that notifies about the websocket disconnect is sent from nxt_http_websocket_error_handler()
In the router process when closing a WebSocket
[pid 36046] epoll_wait(17,
<-- Ctrl-C the client
[{events=EPOLLIN|EPOLLRDHUP, data={u32=671094160, u64=139939295532432}}], 32, 20167) = 1
[pid 36046] recvfrom(27, "", 135, 0, NULL, NULL) = 0
[pid 36046] fstat(1, {st_mode=S_IFCHR|0620, st_rdev=makedev(0x88, 0x1), ...}) = 0
[pid 36046] write(1, "nxt_h1p_conn_ws_error: \n", 24) = 24
[pid 36046] sendmsg(16, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\313\214\0\0\2\0 \1\0\0\0\0", iov_len=16}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 16
[pid 36046] write(1, "nxt_h1p_request_close: \n", 24) = 24
[pid 36046] write(1, "nxt_h1p_shutdown: \n", 19) = 19
[pid 36046] epoll_wait(17, [{events=EPOLLIN, data={u32=671091920, u64=139939295530192}}], 32, 0) = 1
[pid 36046] recvmsg(23, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\315\214\0\0\0\0 \0\0\0\0\0", iov_len=16}, {iov_base="", iov_len=16384}], msg_iovlen=2, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=36045, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 16
[pid 36046] recvmsg(23, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
[pid 36046] write(1, "nxt_h1p_closing: \n", 18) = 18
[pid 36046] epoll_ctl(17, EPOLL_CTL_MOD, 23, {events=EPOLLIN|EPOLLRDHUP|EPOLLET, data={u32=671091920, u64=139939295530192}}) = 0
[pid 36046] epoll_ctl(17, EPOLL_CTL_DEL, 27, 0xa35c24) = 0
[pid 36046] epoll_wait(17, [], 32, 0) = 0
[pid 36046] close(27) = 0
[pid 36046] epoll_wait(17,
epoll_wait(2) returns EPOLLRDHUP, good...
With a HTTP connection
[pid 36043] epoll_wait(3,
<-- Ctrl-C the client
<unfinished ...>
[pid 36046] <... epoll_wait resumed>[{events=EPOLLIN|EPOLLRDHUP, data={u32=671099104, u64=139939295537376}}], 32, -1) = 1
[pid 36046] epoll_wait(17,
Again we get EPOLLRDHUP from epoll_wait(2), good, but we immediately go back to epoll_wait(2) again...
With a WebSocket when the connection is closed we call
nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
                   ev->task, ev, ev->data);
in nxt_epoll_poll().
When a HTTP connection is closed, we don't.
This seems to be due to ev->read == NXT_EVENT_BLOCKED in the HTTP case and ev->read == NXT_EVENT_DEFAULT in the WebSocket case...
~~As a POC, this hack fixes the python SSE app~~
Forget that, this causes the router process to crash...
diff --git a/src/nxt_epoll_engine.c b/src/nxt_epoll_engine.c
index d53df1bc..05efc01a 100644
--- a/src/nxt_epoll_engine.c
+++ b/src/nxt_epoll_engine.c
@@ -937,17 +939,23 @@ nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
ev->epoll_eof = ((events & EPOLLRDHUP) != 0);
+ if (events & EPOLLRDHUP)
+ ev->read = NXT_EVENT_DEFAULT;
+
When using a WebSocket we get (after Ctrl-C'ing the client)
(gdb) bt
#0 nxt_epoll_block_read (engine=0x1728050, ev=0x7f9d740028e0) at src/nxt_epoll_engine.c:512
#1 0x000000000043b5dd in nxt_conn_io_read (task=task@entry=0x7f9d740039e0, obj=0x7f9d740028e0,
data=0x7f9d74003d00) at src/nxt_conn_read.c:97
#2 0x0000000000413f97 in nxt_event_engine_start (engine=engine@entry=0x1728050) at src/nxt_event_engine.c:542
#3 0x000000000041c5b4 in nxt_router_thread_start (data=0x170a910) at src/nxt_router.c:3717
#4 0x0000000000412865 in nxt_thread_trampoline (data=0x170a910) at src/nxt_thread.c:126
#5 0x00007f9d81689d22 in start_thread (arg=<optimized out>) at pthread_create.c:443
#6 0x00007f9d8170ed40 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81
Calling nxt_epoll_block_read() is good...
With a HTTP connection we get
(gdb) bt
#0 nxt_epoll_enable_read (engine=0xc9d9a0, ev=0x7fcf80000cd0) at src/nxt_epoll_engine.c:418
#1 0x000000000040bbfd in nxt_port_queue_read_handler (task=task@entry=0xc9d9a0, obj=0x7fcf80000cd0,
data=<optimized out>) at src/nxt_port_socket.c:1013
#2 0x0000000000413f97 in nxt_event_engine_start (engine=engine@entry=0xc9d9a0) at src/nxt_event_engine.c:542
#3 0x000000000041c5b4 in nxt_router_thread_start (data=0xc8a390) at src/nxt_router.c:3717
#4 0x0000000000412865 in nxt_thread_trampoline (data=0xc8a390) at src/nxt_thread.c:126
#5 0x00007fcf86e89d22 in start_thread (arg=<optimized out>) at pthread_create.c:443
#6 0x00007fcf86f0ed40 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81
A completely different code path...
strace(1)ing the router process
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
[pid 37087] writev(27<TCPv6:[[]:8000->[]:34572]>, [{iov_base="\r\n17\r\n", iov_len=6}, {iov_base="event: count\ndata: 20\n\n", iov_len=23}], 2) = 29
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\336\220\0\0\0\0\33\0\0\0\0\0", iov_len=16}, {iov_base="event: count\ndata: 22\n\n", iov_len=16384}], msg_iovlen=2, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=37086, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 39
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
[pid 37087] writev(27<TCPv6:[[]:8000->[]:34572]>, [{iov_base="\r\n17\r\n", iov_len=6}, {iov_base="event: count\ndata: 22\n\n", iov_len=23}], 2) = 29
<-- Ctrl-C the client
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\336\220\0\0\0\0\33\0\0\0\0\0", iov_len=16}, {iov_base="event: count\ndata: 24\n\n", iov_len=16384}], msg_iovlen=2, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=37086, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 39
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
[pid 37087] writev(27<TCPv6:[[]:8000->[]:34572]>, [{iov_base="\r\n17\r\n", iov_len=6}, {iov_base="event: count\ndata: 24\n\n", iov_len=23}], 2) = 29
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\336\220\0\0\0\0\33\0\0\0\0\0", iov_len=16}, {iov_base="event: count\ndata: 26\n\n", iov_len=16384}], msg_iovlen=2, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=37086, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 39
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
[pid 37087] writev(27<TCPv6:[[]:8000->[]:34572]>, [{iov_base="\r\n17\r\n", iov_len=6}, {iov_base="event: count\ndata: 26\n\n", iov_len=23}], 2) = -1 EPIPE (Broken pipe)
[pid 37087] --- SIGPIPE {si_signo=SIGPIPE, si_code=SI_USER, si_pid=37084, si_uid=1000} ---
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\336\220\0\0\0\0\33\0\0\0\0\0", iov_len=16}, {iov_base="event: count\ndata: 28\n\n", iov_len=16384}], msg_iovlen=2, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=37086, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 39
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\336\220\0\0\0\0\33\0\0\0\0\0", iov_len=16}, {iov_base="event: count\ndata: 30\n\n", iov_len=16384}], msg_iovlen=2, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=37086, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 39
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\336\220\0\0\0\0\33\0\0\0\0\0", iov_len=16}, {iov_base="event: count\ndata: 32\n\n", iov_len=16384}], msg_iovlen=2, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=37086, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 39
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_namelen=0}, 0) = -1 EAGAIN (Resource temporarily unavailable)
...
The UNIX domain socket I guess is the Python app talking to the router process...
[pid 37087] recvmsg(21<UNIX:[89761->89762]>, {msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\10\0\0\0\336\220\0\0\0\0\33\0\0\0\0\0", iov_len=16}, {iov_base="event: count\ndata: 22\n\n", iov_len=16384}], msg_iovlen=2, msg_control=[{cmsg_len=28, cmsg_level=SOL_SOCKET, cmsg_type=SCM_CREDENTIALS, cmsg_data={pid=37086, uid=1000, gid=1000}}], msg_controllen=32, msg_flags=0}, 0) = 39
Python app sends event: count\ndata: 22\n\n to the router process, then
[pid 37087] writev(27<TCPv6:[[]:8000->[]:34572]>, [{iov_base="\r\n17\r\n", iov_len=6}, {iov_base="event: count\ndata: 22\n\n", iov_len=23}], 2) = 29
The router process then sends event: count\ndata: 22\n\n to the client.
After Ctrl-C'ing the client we don't attempt to read from the client (like we do in the WebSocket case), I guess because we're not expecting to read any data from the client after the initial request.
That's fine, but we do need to properly handle EPOLLRDHUP. Hmm, even then that'll only fix it for epoll(7); we still need to worry about all the other poll mechanisms...
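For what it's worth, a self-contained sketch (plain Python, Linux only, independent of Unit's code) showing that a peer close does surface as EPOLLRDHUP when you register for it:

import select
import socket

# A connected pair of stream sockets; closing one end simulates the client
# going away.
a, b = socket.socketpair()

ep = select.epoll()
ep.register(a.fileno(), select.EPOLLIN | select.EPOLLRDHUP)

b.close()

for fd, events in ep.poll(timeout=1):
    if events & select.EPOLLRDHUP:
        print('peer closed the connection (EPOLLRDHUP)')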
After Ctrl-C'ing the client we don't attempt to read from the client (like we do in the WebSocket case), I guess because we're not expecting to read any data from the client after the initial request.
Makes sense. I just want to understand why it wouldn't be okay if we notify the app of the disconnection at the time of writing back to the socket. I get that, from the app's perspective, it would look like some bytes had been sent when they actually weren't, because the router would have discarded them and then notified the app of the disconnection.
I dunno, maybe that is the right approach... however, at the moment, if you have an application that starts a chunked transfer (e.g. an SSE application) but then doesn't send any data, you can effectively DoS the server by opening and closing a bunch of connections to the application, as you then end up with a bunch of connections in CLOSE_WAIT. E.g.
$ ss -tnp state CLOSE-WAIT | sed 's/\[[^[]*\]/[::1]/g'
Recv-Q Send-Q Local Address:Port Peer Address:Port Process
1 0 [::1]:8000 [::1]:59800 users:(("unitd",pid=41460,fd=31))
1 0 [::1]:8000 [::1]:54674 users:(("unitd",pid=41460,fd=27))
1 0 [::1]:8000 [::1]:35932 users:(("unitd",pid=41460,fd=30))
1 0 [::1]:8000 [::1]:35918 users:(("unitd",pid=41460,fd=29))
See this Cloudflare article for the gory details.
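To make that scenario concrete, something along these lines is enough to pile up CLOSE_WAIT sockets (a rough sketch; the address and port are assumptions for a local Unit instance running the SSE application above):

import socket
import time

# Open a handful of connections to the SSE application and abort them
# mid-transfer; each one leaves a server-side socket stuck in CLOSE_WAIT.
for _ in range(10):
    s = socket.create_connection(('localhost', 8000))
    s.sendall(b'GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
    time.sleep(0.5)   # let the chunked response start
    s.close()         # client goes away mid-transfer

After running it, ss -tnp state close-wait should show the stuck connections piling up.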
Hmm, do you want to re-work your patches to not do any WebSocket stuff as that seems to be handled correctly as is and use EPIPE for the OSError? (although I think you were only doing that for the websocket case).
Hi @gourav-kandoria
So in trying a variation of your patches to try and handle the client closing the connection, I have (hopefully it's all there as I tried to leave most of my debug code behind...)
diff --git a/src/nxt_port.h b/src/nxt_port.h
index 772fb41a..e801a2ee 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -59,6 +59,8 @@ struct nxt_port_handlers_s {
/* Status report. */
nxt_port_handler_t status;
+ nxt_port_handler_t client_close;
+
nxt_port_handler_t oosm;
nxt_port_handler_t shm_ack;
nxt_port_handler_t read_queue;
@@ -115,6 +117,8 @@ typedef enum {
_NXT_PORT_MSG_APP_RESTART = nxt_port_handler_idx(app_restart),
_NXT_PORT_MSG_STATUS = nxt_port_handler_idx(status),
+ _NXT_PORT_MSG_CLIENT_CLOSE = nxt_port_handler_idx(client_close),
+
_NXT_PORT_MSG_OOSM = nxt_port_handler_idx(oosm),
_NXT_PORT_MSG_SHM_ACK = nxt_port_handler_idx(shm_ack),
_NXT_PORT_MSG_READ_QUEUE = nxt_port_handler_idx(read_queue),
@@ -160,6 +164,8 @@ typedef enum {
NXT_PORT_MSG_APP_RESTART = nxt_msg_last(_NXT_PORT_MSG_APP_RESTART),
NXT_PORT_MSG_STATUS = nxt_msg_last(_NXT_PORT_MSG_STATUS),
+ NXT_PORT_MSG_CLIENT_CLOSE = nxt_msg_last(_NXT_PORT_MSG_CLIENT_CLOSE),
+
NXT_PORT_MSG_OOSM = nxt_msg_last(_NXT_PORT_MSG_OOSM),
NXT_PORT_MSG_SHM_ACK = nxt_msg_last(_NXT_PORT_MSG_SHM_ACK),
NXT_PORT_MSG_READ_QUEUE = _NXT_PORT_MSG_READ_QUEUE,
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 44ea823b..bf8f5ff1 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -5276,6 +5276,8 @@ nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
{
nxt_http_request_t *r;
+ printf("%s: \n", __func__);
+
r = obj;
nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
@@ -5295,11 +5297,22 @@ nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
{
nxt_http_request_t *r;
+ printf("%s: \n", __func__);
+
r = data;
nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
if (r->req_rpc_data != NULL) {
+ nxt_request_rpc_data_t *req_rpc_data = r->req_rpc_data;
+
+ printf("%s: Sending [NXT_PORT_MSG_CLIENT_CLOSE] message / [%d]...\n",
+ __func__, req_rpc_data->stream);
+ nxt_port_socket_write(task, req_rpc_data->app_port,
+ NXT_PORT_MSG_CLIENT_CLOSE, -1,
+ req_rpc_data->stream,
+ task->thread->engine->port->id, NULL);
+
nxt_request_rpc_data_unlink(task, r->req_rpc_data);
}
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 966a6c0f..fe62861c 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -74,6 +74,8 @@ static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
+static int nxt_unit_process_client_close(nxt_unit_ctx_t *ctx,
+ nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
nxt_unit_ctx_t *ctx);
@@ -1121,6 +1123,11 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
rc = nxt_unit_process_websocket(ctx, &recv_msg);
break;
+ case _NXT_PORT_MSG_CLIENT_CLOSE:
+ printf("%s: Got message [NXT_PORT_MSG_CLIENT_CLOSE]\n", __func__);
+ rc = nxt_unit_process_client_close(ctx, &recv_msg);
+ break;
+
case _NXT_PORT_MSG_REMOVE_PID:
if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
@@ -1377,18 +1384,18 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ res = nxt_unit_request_hash_add(ctx, req);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ nxt_unit_req_warn(req, "failed to add request to hash");
+
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ return NXT_UNIT_ERROR;
+ }
+
if (req->content_length
> (uint64_t) (req->content_buf->end - req->content_buf->free))
{
- res = nxt_unit_request_hash_add(ctx, req);
- if (nxt_slow_path(res != NXT_UNIT_OK)) {
- nxt_unit_req_warn(req, "failed to add request to hash");
-
- nxt_unit_request_done(req, NXT_UNIT_ERROR);
-
- return NXT_UNIT_ERROR;
- }
-
/*
* If application have separate data handler, we may start
* request processing and process data when it is arrived.
@@ -1418,7 +1425,7 @@ nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
nxt_unit_mmap_buf_t *b;
nxt_unit_request_info_t *req;
- req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
+ req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0);
if (req == NULL) {
return NXT_UNIT_OK;
}
@@ -1723,6 +1730,34 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
}
+static int
+nxt_unit_process_client_close(nxt_unit_ctx_t *ctx,
+ nxt_unit_recv_msg_t *recv_msg)
+{
+ nxt_unit_impl_t *lib;
+ nxt_unit_callbacks_t *cb;
+ nxt_unit_request_info_t *req;
+
+ req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0);
+ if (req == NULL) {
+ printf("%s: ERROR [req] not found for stream [%d]\n", __func__,
+ recv_msg->stream);
+ return NXT_UNIT_OK;
+ }
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ cb = &lib->callbacks;
+
+ if (cb->close_handler) {
+ cb->close_handler(req);
+ } else {
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+ }
+
+ return NXT_UNIT_OK;
+}
+
+
static int
nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
{
@@ -6530,6 +6565,7 @@ nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
case NXT_OK:
req_impl->in_hash = 1;
+ printf("%s: Added req for stream [%d]\n", __func__, *stream);
return NXT_UNIT_OK;
default:
@@ -6557,6 +6593,7 @@ nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove)
pthread_mutex_lock(&ctx_impl->mutex);
if (remove) {
+ printf("%s: Removing req for stream [%d]\n", __func__, stream);
res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq);
} else {
But I think I see the issue you were having with looking up req in the hash table; it seems to be simply due to the fact that it isn't being added.
I.e. we don't hit this code
+ res = nxt_unit_request_hash_add(ctx, req);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ nxt_unit_req_warn(req, "failed to add request to hash");
+
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ return NXT_UNIT_ERROR;
+ }
+
Trying it earlier, i.e. above the if () statement, does result in it being added, but also in general breakage...
Perhaps @hongzhidao has some idea?
See this Cloudflare article for the gory details.
Nice read.
Hmm, do you want to re-work your patches to not do any WebSocket stuff as that seems to be handled correctly as is and use EPIPE for the OSError? (although I think you were only doing that for the websocket case).
Well, I guess keeping this would still make sense:
ws->state = NXT_WS_DISCONNECTED;

if (ws->receive_future == NULL) {
    return;
}
Because, if there is no receive awaiting, then this state, once set, would cause an exception to be raised when send is called, as the ASGI spec requires. Basically, the point is to keep the state correct regardless of whether some receive future is awaiting or not, as is done in nxt_py_asgi_http_close_handler.
Also, even if the exception occurs on the router side, it is not propagated to the application; as we can see in the nxt_py_asgi_websocket_close_handler function, there isn't anything that tells us which OSError occurred. So, in the absence of any particular error code, we should raise an exception that is at least a subclass of OSError, as per the ASGI spec.
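For illustration, this is roughly all an ASGI WebSocket app can rely on from its side (a sketch, not part of the patch): catch OSError around send, with any specific errno being a bonus.

import asyncio

async def application(scope, receive, send):
    if scope['type'] != 'websocket':
        return

    message = await receive()
    if message['type'] == 'websocket.connect':
        await send({'type': 'websocket.accept'})

    while True:
        try:
            await send({'type': 'websocket.send', 'text': 'ping'})
        except OSError as err:
            # All the spec guarantees is an OSError subclass; err.errno may
            # or may not carry something specific like ECONNRESET.
            print(f'client gone: {err}')
            break
        await asyncio.sleep(1)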
Summary of issues.
With the following python ASGI application
async def application(scope, receive, send):
    while True:
        m = await receive()
        if m['type'] == 'http.disconnect':
            print("Client Disconnect")
            break

        await send(
            {
                "type": "http.response.start",
                "status": 200,
                "headers": [[b"content-type", b"text/plain"]],
            }
        )
        await send(
            {
                'type': 'http.response.body',
                'body': b"Testing...\n",
                'more_body': True
            }
        )
Opening a connection with curl and then Ctrl-C'ing it
$ curl rhel-9:8000/
Testing...
^C
Results in a connection stuck in CLOSE_WAIT and a leaked file descriptor
tcp CLOSE-WAIT 1 0 [::1]:8000 [::1]:47140
This patch at least gets rid of the connection being stuck in CLOSE_WAIT, but it doesn't look like the application is notified.
diff --git a/src/nxt_epoll_engine.c b/src/nxt_epoll_engine.c
index d53df1bc..2d5b6fd0 100644
--- a/src/nxt_epoll_engine.c
+++ b/src/nxt_epoll_engine.c
@@ -936,18 +950,26 @@ nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
#if (NXT_HAVE_EPOLL_EDGE)
ev->epoll_eof = ((events & EPOLLRDHUP) != 0);
+ if (ev->epoll_eof)
+ ev->read = NXT_EVENT_INACTIVE;
#endif
With this python ASGI application
import asyncio

async def application(scope, receive, send):
    if scope['type'] == 'http':
        headers = [
            (b'content-type', b'text/event-stream'),
            (b'cache-control', b'no-cache'),
            (b'connection', b'keep-alive'),
        ]
        await send({
            'type': 'http.response.start',
            'status': 200,
            'headers': headers,
        })
        send_task = asyncio.create_task(send_messages_sse(send))
        receive_task = asyncio.create_task(receive_messages_sse(receive))
        await asyncio.gather(send_task, receive_task)

async def receive_messages_sse(receive):
    message = await receive()
    print(f'message received: {message}')
    if message['type'] == 'http.disconnect':
        print('http.disconnect')
        return

async def send_messages_sse(send):
    i = 0
    while True:
        try:
            message = f"event: count\ndata: {i}\n\n"
            print(f'message sent: {message}')
            await send({
                'type': 'http.response.body',
                'body': message.encode('utf-8'),
                'more_body': True
            })
            i += 2
            await asyncio.sleep(2)
        except Exception as err:
            print(f'err : {err}')
            break
The above patch causes the router process to segfault.
Without the above patch, if you open a connection to the application then Ctrl-C it, you don't get the CLOSE_WAIT issue, but the application also isn't notified that the connection is closed and so keeps sending messages to the router process.
With this slightly modified version of @gourav-kandoria 's patch
diff --git a/src/nxt_port.h b/src/nxt_port.h
index 772fb41a..e801a2ee 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -59,6 +59,8 @@ struct nxt_port_handlers_s {
/* Status report. */
nxt_port_handler_t status;
+ nxt_port_handler_t client_close;
+
nxt_port_handler_t oosm;
nxt_port_handler_t shm_ack;
nxt_port_handler_t read_queue;
@@ -115,6 +117,8 @@ typedef enum {
_NXT_PORT_MSG_APP_RESTART = nxt_port_handler_idx(app_restart),
_NXT_PORT_MSG_STATUS = nxt_port_handler_idx(status),
+ _NXT_PORT_MSG_CLIENT_CLOSE = nxt_port_handler_idx(client_close),
+
_NXT_PORT_MSG_OOSM = nxt_port_handler_idx(oosm),
_NXT_PORT_MSG_SHM_ACK = nxt_port_handler_idx(shm_ack),
_NXT_PORT_MSG_READ_QUEUE = nxt_port_handler_idx(read_queue),
@@ -160,6 +164,8 @@ typedef enum {
NXT_PORT_MSG_APP_RESTART = nxt_msg_last(_NXT_PORT_MSG_APP_RESTART),
NXT_PORT_MSG_STATUS = nxt_msg_last(_NXT_PORT_MSG_STATUS),
+ NXT_PORT_MSG_CLIENT_CLOSE = nxt_msg_last(_NXT_PORT_MSG_CLIENT_CLOSE),
+
NXT_PORT_MSG_OOSM = nxt_msg_last(_NXT_PORT_MSG_OOSM),
NXT_PORT_MSG_SHM_ACK = nxt_msg_last(_NXT_PORT_MSG_SHM_ACK),
NXT_PORT_MSG_READ_QUEUE = _NXT_PORT_MSG_READ_QUEUE,
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 44ea823b..bf8f5ff1 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -5276,6 +5276,8 @@ nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
{
nxt_http_request_t *r;
+ printf("%s: \n", __func__);
+
r = obj;
nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
@@ -5295,11 +5297,22 @@ nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
{
nxt_http_request_t *r;
+ printf("%s: \n", __func__);
+
r = data;
nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
if (r->req_rpc_data != NULL) {
+ nxt_request_rpc_data_t *req_rpc_data = r->req_rpc_data;
+
+ printf("%s: Sending [NXT_PORT_MSG_CLIENT_CLOSE] message / [%d]...\n",
+ __func__, req_rpc_data->stream);
+ nxt_port_socket_write(task, req_rpc_data->app_port,
+ NXT_PORT_MSG_CLIENT_CLOSE, -1,
+ req_rpc_data->stream,
+ task->thread->engine->port->id, NULL);
+
nxt_request_rpc_data_unlink(task, r->req_rpc_data);
}
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 966a6c0f..866d1e1d 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -74,6 +74,8 @@ static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
+static int nxt_unit_process_client_close(nxt_unit_ctx_t *ctx,
+ nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
nxt_unit_ctx_t *ctx);
@@ -1121,6 +1123,11 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
rc = nxt_unit_process_websocket(ctx, &recv_msg);
break;
+ case _NXT_PORT_MSG_CLIENT_CLOSE:
+ printf("%s: Got message [NXT_PORT_MSG_CLIENT_CLOSE]\n", __func__);
+ rc = nxt_unit_process_client_close(ctx, &recv_msg);
+ break;
+
case _NXT_PORT_MSG_REMOVE_PID:
if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
@@ -1418,7 +1425,7 @@ nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
nxt_unit_mmap_buf_t *b;
nxt_unit_request_info_t *req;
- req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
+ req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0);
if (req == NULL) {
return NXT_UNIT_OK;
}
@@ -1723,6 +1730,35 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
}
+static int
+nxt_unit_process_client_close(nxt_unit_ctx_t *ctx,
+ nxt_unit_recv_msg_t *recv_msg)
+{
+ nxt_unit_impl_t *lib;
+ nxt_unit_callbacks_t *cb;
+ nxt_unit_request_info_t *req;
+
+ req = nxt_unit_request_hash_find(ctx, recv_msg->stream, 0);
+ if (req == NULL) {
+ printf("%s: ERROR [req] not found for stream [%d]\n", __func__,
+ recv_msg->stream);
+ return NXT_UNIT_OK;
+ }
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ cb = &lib->callbacks;
+
+ if (cb->close_handler) {
+ printf("%s: Calling [cb->close_handler(req)]\n", __func__);
+ cb->close_handler(req);
+ } else {
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+ }
+
+ return NXT_UNIT_OK;
+}
+
+
static int
nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
{
@@ -4826,10 +4862,22 @@ nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
continue;
}
+#if 1
+ printf("%s: Adding req to hash table\n", __func__);
+ res = nxt_unit_request_hash_add(ctx, req);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ nxt_unit_req_warn(req, "failed to add request to hash");
+
+ nxt_unit_request_done(req, NXT_UNIT_ERROR);
+
+ continue;
+ }
+#endif
+
if (req->content_length
> (uint64_t) (req->content_buf->end - req->content_buf->free))
{
- res = nxt_unit_request_hash_add(ctx, req);
+#if 0
if (nxt_slow_path(res != NXT_UNIT_OK)) {
nxt_unit_req_warn(req, "failed to add request to hash");
@@ -4837,7 +4885,7 @@ nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
continue;
}
-
+#endif
/*
* If application have separate data handler, we may start
* request processing and process data when it is arrived.
diff --git a/src/python/nxt_python_asgi.c b/src/python/nxt_python_asgi.c
index 702f4d8d..2ed6f964 100644
--- a/src/python/nxt_python_asgi.c
+++ b/src/python/nxt_python_asgi.c
@@ -626,10 +626,13 @@ nxt_py_asgi_request_handler(nxt_unit_request_info_t *req)
static void
nxt_py_asgi_close_handler(nxt_unit_request_info_t *req)
{
+ printf("%s: \n", __func__);
+
if (req->request->websocket_handshake) {
nxt_py_asgi_websocket_close_handler(req);
} else {
+ printf("%s: Calling [nxt_py_asgi_http_close_handler(req)]\n", __func__);
nxt_py_asgi_http_close_handler(req);
}
}
diff --git a/src/python/nxt_python_asgi_http.c b/src/python/nxt_python_asgi_http.c
index cdd6357e..81c97b7e 100644
--- a/src/python/nxt_python_asgi_http.c
+++ b/src/python/nxt_python_asgi_http.c
@@ -362,6 +362,12 @@ nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict)
Py_ssize_t body_len, body_off;
nxt_py_asgi_ctx_data_t *ctx_data;
+ printf("%s: \n", __func__);
+
+ if (http->closed) {
+ return PyErr_Format(PyExc_RuntimeError, "peer closed conenction");
+ }
+
if (nxt_slow_path(http->complete)) {
return PyErr_Format(PyExc_RuntimeError,
"Unexpected ASGI message 'http.response.body' "
@@ -646,6 +652,7 @@ nxt_py_asgi_http_close_handler(nxt_unit_request_info_t *req)
if (nxt_fast_path(http != NULL)) {
http->closed = 1;
+ printf("%s: Calling [nxt_py_asgi_http_emit_disconnect(http)]\n", __func__);
nxt_py_asgi_http_emit_disconnect(http);
}
}
On the first connection attempt and Ctrl-C, it seems to do the right thing and the application stops sending messages.
However, on subsequent connections it's back to the original problem, as it doesn't seem to be adding req to the hash table. I notice its stream id has changed from 8 to 9, so perhaps things didn't get cleaned up properly previously.
Hi @gourav-kandoria @ac000, This looks like a topic of whether to support client abort, just my two cents.
- It can be an option like ignore_client_abort, notify_client_abort, etc., and the default behavior is ignoring it so that we don't change any behavior on the client by default.
- The option seems to be in the applications object.
- I feel we need to call nxt_conn_read() with a closed handler in the correct place to track the client connection. It seems not a good idea to rely on r->error or r->closed.
- Welcome to add unit tests on it.