Example: how to stop accepting connections
How do I stop accepting connections?
In my server I want to have shutdown behavior that gracefully handles all existing connections, but does not accept new ones.
Good question
Graceful shutdown isn't too complicated: manage the accept loop a bit more explicitly, then on shutdown stop accepting, wait for all child tasks to complete within a timeout, invoke stop on any that remain after the timeout, then exit yourself.
Here is a rough idea of how to do it:
```ruby
require 'async'
require 'async/io'

Async.logger.info!

GRACE_TIME = 5

def graceful_server(endpoint, condition = nil, &block)
  Async do |parent_task|
    server_tasks = []

    Async.logger.info "Binding to #{endpoint}..."

    endpoint.each do |endpoint|
      server = endpoint.bind
      server.listen(Socket::SOMAXCONN)

      Async.logger.info "Accepting connections from #{server}..."
      server_tasks << parent_task.async do
        server.accept_each(task: parent_task, &block)
      ensure
        server.close
      end
    end

    if condition
      Async.logger.info "Waiting on #{condition}..."
      condition.wait

      Async.logger.info("Task tree") {|buffer| parent_task.print_hierarchy(buffer)}

      Async.logger.info "Stopping #{server_tasks.count} accept loops..."
      server_tasks.each(&:stop)

      children = parent_task.children
      Async.logger.info "Stopping #{children.count} connections..."

      if children.any?
        Async.logger.warn("Waiting for #{children.count} connections...")
        parent_task.sleep GRACE_TIME
      end

      children = parent_task.children
      if children.any?
        Async.logger.warn("Stopping #{children.count} connections...")
        children.each(&:stop)
      end
    end
  end
end

def echo_server(endpoint)
  condition = Async do |task|
    task.sleep 60 while true
  end

  graceful_server(endpoint, condition) do |client, task:|
    # This is an asynchronous block within the current reactor:
    while data = client.read(512)
      # This produces out-of-order responses.
      task.sleep(rand * 0.01)
      client.write(data.reverse)
    end
  ensure
    client.close
  end

  return condition
end

def echo_client(endpoint, data)
  Async do |task|
    endpoint.connect do |peer|
      10.times do
        Async.logger.info "Client #{data}: sleeping"
        task.sleep 2

        result = peer.write(data)
        message = peer.read(512)

        Async.logger.info "Sent #{data}, got response: #{message}"
      end
    end
  end
end

Async do |task|
  endpoint = Async::IO::Endpoint.tcp('0.0.0.0', 9000)

  Async.logger.info "Starting server..."
  server = echo_server(endpoint)

  Async.logger.info "Clients connecting..."
  1.times.collect do |i|
    echo_client(endpoint, "Hello World #{i}")
  end

  task.sleep 5

  Async.logger.info "Stopping server..."
  server.stop
end

Async.logger.info "Finished..."
```
Honestly, it seems pretty complicated, but I guess graceful shutdown isn't that trivial.
It should work to put a timeout around the tasks, but I think there is a bug that needs to be fixed:
```ruby
parent_task.with_timeout(GRACE_TIME) do
  children.each(&:wait)
rescue Async::TimeoutError
  Async.logger.warn("Could not terminate child connections...")
end
```
Task#wait is not timeout safe. :( doh.
Okay, this is fixed in https://github.com/socketry/async/commit/0b631958a8af9f07432de81fdb9b48baa41782ae
I'll need to test a bit more.
Okay, this implementation works with async 1.15.1:
```ruby
require 'async'
require 'async/io'

Async.logger.info!

GRACE_TIME = 5

def graceful_server(endpoint, condition = nil, &block)
  Async do |parent_task|
    server_tasks = []

    Async.logger.info "Binding to #{endpoint}..."

    endpoint.each do |endpoint|
      server = endpoint.bind
      server.listen(Socket::SOMAXCONN)

      Async.logger.info "Accepting connections from #{server}..."
      server_tasks << parent_task.async do
        server.accept_each(task: parent_task, &block)
      ensure
        server.close
      end
    end

    if condition
      Async.logger.info "Waiting on #{condition}..."
      condition.wait

      Async.logger.info("Task tree") {|buffer| parent_task.print_hierarchy(buffer)}

      Async.logger.info "Stopping #{server_tasks.count} accept loops..."
      server_tasks.each(&:stop)

      children = parent_task.children
      Async.logger.info "Stopping #{children.count} connections..."

      if children.any?
        Async.logger.warn("Waiting for #{children.count} connections...")
        parent_task.with_timeout(GRACE_TIME) do
          children.each(&:wait)
        rescue Async::TimeoutError
          Async.logger.warn("Could not terminate child connections...")
        end
      end

      children = parent_task.children
      if children.any?
        Async.logger.warn("Stopping #{children.count} connections...")
        children.each(&:stop)
      end
    end
  end
end

def echo_server(endpoint)
  condition = Async do |task|
    task.sleep 60 while true
  end

  graceful_server(endpoint, condition) do |client, task:|
    # This is an asynchronous block within the current reactor:
    while data = client.read(512)
      # This produces out-of-order responses.
      task.sleep(rand * 0.01)
      client.write(data.reverse)
    end
  ensure
    client.close
  end

  return condition
end

def echo_client(endpoint, data)
  Async do |task|
    endpoint.connect do |peer|
      10.times do
        Async.logger.info "Client #{data}: sleeping"
        task.sleep 2

        result = peer.write(data)
        message = peer.read(512)

        Async.logger.info "Sent #{data}, got response: #{message}"
      end
    end
  end
end

Async do |task|
  endpoint = Async::IO::Endpoint.tcp('0.0.0.0', 9000)

  Async.logger.info "Starting server..."
  server = echo_server(endpoint)

  Async.logger.info "Clients connecting..."
  1.times.collect do |i|
    echo_client(endpoint, "Hello World #{i}")
  end

  task.sleep 5

  Async.logger.info "Stopping server..."
  server.stop
end

Async.logger.info "Finished..."
```
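The grace-then-force shape of that shutdown (wait up to GRACE_TIME for work to finish, then stop whatever is left) has a plain-threads analogue that may make the logic easier to follow. This is just an illustrative sketch using the Ruby standard library, not async:

```ruby
grace_time = 0.5

# Stand-ins for connection tasks: one finishes quickly, one never would.
fast = Thread.new { sleep 0.1 }
slow = Thread.new { sleep 60 }

forced = []

[fast, slow].each do |thread|
  # Thread#join with a timeout returns the thread if it finished, nil otherwise.
  if thread.join(grace_time).nil?
    # Grace period expired: force termination, analogous to Task#stop.
    thread.kill
    thread.join # reap the killed thread
    forced << thread
  end
end

forced == [slow] # => true: only the slow "connection" needed to be stopped
```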
I think you probably also want to tell the server tasks to signal the clients that shutdown is in progress...
To do this, you'll need some kind of broadcast message. It will ultimately depend on how you implement your connection state... there is nothing wrong with keeping an array of connections and then just doing `connections.each{|connection| connection.write("Goodbye!")}` before waiting on the child tasks.
One more thing to keep in mind: if you have a graceful shutdown, you need to handle the situations where the graceful shutdown fails in some way, and invoke #stop on everything.
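As a sketch of that broadcast idea (the `ConnectionRegistry` name and shape are made up for illustration, not part of async): track connections as they are accepted, and write a farewell to each before waiting on the child tasks.

```ruby
# Hypothetical registry: track open connections so shutdown can notify them.
class ConnectionRegistry
  def initialize
    @connections = []
  end

  def add(connection)
    @connections << connection
  end

  def remove(connection)
    @connections.delete(connection)
  end

  # Broadcast a shutdown notice, e.g. before waiting on the child tasks:
  def broadcast(message)
    @connections.each do |connection|
      connection.write(message)
    rescue IOError, Errno::EPIPE
      # A peer that already disconnected shouldn't abort the shutdown.
    end
  end
end

registry = ConnectionRegistry.new
reader, writer = IO.pipe
registry.add(writer)

registry.broadcast("Goodbye!")
writer.close
reader.read # => "Goodbye!"
```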
wow, thanks!
when are there multiple server_tasks ? to me it looks like there is just one server_task ?
```ruby
server_tasks << parent_task.async do
  server.accept_each(task: parent_task, &block)
ensure
  server.close
end
```
Bind to localhost instead of 0.0.0.0 and you'll get both IPv4 and IPv6, i.e. multiple server tasks.
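You can see where the extra tasks come from with the stdlib resolver: `endpoint.each` yields one endpoint per resolved address, and localhost typically resolves to both a loopback IPv4 and a loopback IPv6 address:

```ruby
require 'socket'

# Resolve "localhost" to all stream addresses. On most systems this
# yields two entries: 127.0.0.1 (IPv4) and ::1 (IPv6).
addresses = Addrinfo.getaddrinfo("localhost", 9000, nil, :STREAM)

addresses.each do |address|
  puts "#{address.ip_address} (#{address.ipv6? ? 'IPv6' : 'IPv4'})"
end
```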
what about threads (Async::Container) - I tried to follow falcon source with SharedEndpoint, but I can't come up with how to do it without server.accept_each that returns the task..
How would threads solve the problem? I think it only makes more problems.
Sorry for the terse reply.
So, now that I have few more minutes to spare.
Generally speaking, threads and processes are for scaling your application across multiple processors and computers.
They shouldn't be for the purposes of implementing functionality, at least not in this situation.
Graceful shutdown is going to be directly part of the protocol, e.g. how WebSocket and HTTP/2 can send close and GOAWAY respectively. The connection is still open at the OS level, but essentially closed from the application's point of view. How you negotiate this will depend on what you are doing. For example, a web server might be servicing a long-running request. Should you wait until the request is finished before terminating? Yes, if you are feeling friendly. What happens if you wait 30 minutes and the user is still connected? Maybe you get a bit more frustrated and terminate the connection more forcefully. All these steps, short of calling close(fd), are client/server protocol concerns.
Introducing threads won't solve this problem; threading is only tangentially related to resource management, and it doesn't address the client/server protocol concerns, e.g. the grace period, knowing when the server can shut down immediately, etc. Some of those concerns are addressed in the example code above, but generally speaking, it's not an easy problem to solve.
In the first instance, perhaps you can describe what you should do to tell the remote end that the server is shutting down. Do you send a message?
Uhm, I should have explained a bit more about what I want to do :)
So I used your approach to write a TCP reverse proxy which just proxies requests through to a webserver (nginx). Connection draining works great and I even added reuse_port to the mix to see if I can a) drain connections and exit and b) replace the proxy with another instance without clients noticing. Great success!
Then I did some fast benchmarking and if I ran wrk directly against nginx I got "6000r/s" and when I ran it against my proxy, I got "3000r/s". I think that the performance is acceptable for my usecase, but I was just curious what would happen if I added threads. So: I don't want to use threads to implement some weird connection draining, but simply to see how it would affect the performance.
So I took a look at falcon which uses Async::Container and SharedEndpoint - but I can't come up with a way to use accept_each for the SharedEndpoint - only accept - and with accept I don't get server_tasks that I could stop for connection draining.
My threaded implementation seemed to perform better than "3000r/s" though, but I really need connection draining (i.e. to stop accepting new connections). I tried calling .close and .stop on everything, but new connections were still being accepted while the server was draining (the draining itself worked).
So my question is that how to use .accept_each with SharedEndpoint which seems to be required for threaded approach? It was also getting late while I tried to refactor my code, so I might have missed something obvious...
And as always, thank you so much for your answers and time. Socketry rules.
I see, that makes sense. It's late so I'll take a look tomorrow, but do you think you can share your code?
You should be able to get more than 3000 req/s - what OS are you testing on?
I made a mistake by coding directly in a private codebase... But my code follows your example very closely.
I see if I can extract a version of it, but basically it's your example + attempt to use falcon style threaded setup...
I have no need to use "falcon style" - it was just the only example where I could see this in action.
I guess my question could also be reworded as "how to do connection draining with falcon", since to me it looks like you can not because falcon uses async-http-server which uses accept and not accept_each.
Is this just a straight socket proxy or are you proxying HTTP requests?
I did the benchmarking on OSX, but I think it's safe to ignore all numbers - it was a quick try.
On OSX, nio4r uses IO.select, which is not that efficient. Try Linux.
> I guess my question could also be reworded as "how to do connection draining with falcon", since to me it looks like you can not because falcon uses async-http-server which uses accept and not accept_each.
The process model of falcon is really limited at the moment. It's the next thing to sort out.
For example, if a process/thread crashes, it won't get restarted.
Connection draining/graceful shutdown is also on the cards. It's required for rolling restarts which I hope to support soon in a generic way.
Proxying http requests, but as TCP (so basically your example with tiny modifications to not echo, but to make another connection)
I might be optimistic, but to me it looks like draining would be a "simple" thing to do in Falcon if only it would stop accepting new connections? I just don't know how :)
Okay - I'll try Linux - and as I said this need for threads is just to see what happens. And to understand async-* APIs. I'm happy with just one thread/process for now.
Figured out why the performance was bad: I had peer.read(512) when the message (the "Welcome to nginx" HTML page) is 850 bytes, so I changed it to peer.read(1024). With 512 the requests/s is around 500, and with 1024 it's 3900 (!)
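The truncation is easy to reproduce with a pipe: read(n) returns at most n bytes, so a response larger than the buffer is cut short unless you keep reading (on a real TCP socket a single read can return even less, depending on timing):

```ruby
# Simulate an 850-byte response (like the "Welcome to nginx" page):
reader, writer = IO.pipe
writer.write("x" * 850)
writer.close

# A single fixed-size read only returns the first 512 bytes:
chunk = reader.read(512)

# Looping until EOF recovers the remaining 338 bytes:
rest = +""
while data = reader.read(512)
  rest << data
end

chunk.bytesize # => 512
rest.bytesize  # => 338
```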
So my quick comparison:

```
      Proxy   Nginx
wrk   3900    5500
hey   3800    4400
```

where Proxy is the async-ruby component and Nginx means a direct connection without the proxy.
Pushed some code to: https://github.com/matti/socketry-proxy
Nice, thanks! I will take a look.