Can't stop client without raising an error
I can't find a way to close a client program using async-websocket without errors.
Here's the error I get when I quit using Ctrl+C :
/home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/async/scheduler.rb:386:in 'IO::Event::Selector::EPoll#select': Interrupt
from /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/async/scheduler.rb:386:in 'Async::Scheduler#run_once!'
from /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/async/scheduler.rb:425:in 'Async::Scheduler#run_once'
from /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/async/scheduler.rb:498:in 'block in Async::Scheduler#run'
from /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/async/scheduler.rb:461:in 'block in Async::Scheduler#run_loop'
from /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/async/scheduler.rb:458:in 'Thread.handle_interrupt'
from /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/async/scheduler.rb:458:in 'Async::Scheduler#run_loop'
from /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/async/scheduler.rb:497:in 'Async::Scheduler#run'
from /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/kernel/async.rb:34:in 'Kernel#Async'
from lib/listener.rb:224:in 'MexCWebSocketClient#run'
from lib/listener.rb:282:in '<main>'
I implemented a simple REPL with a quit command that send { method: "unsubscribe", ... } messages and I wait for the response before shutting down the WebSocket connection using : connection.shutdown and I get this error :
4.04s warn: Async::Task [oid=0x1ce0] [ec=0x1ce8] [pid=423165] [2025-01-28 00:03:03 +0100]
| Task may have ended with unhandled exception.
| FrozenError: can't modify frozen IO::Stream::StringBuffer: "\x88\x02\x03\xE8"
| → /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/3.4.0/openssl/buffering.rb:217 in 'OpenSSL::SSL::SSLSocket#sysread_nonblock'
| /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/3.4.0/openssl/buffering.rb:217 in 'OpenSSL::Buffering#read_nonblock'
Then I have to Ctrl-C to end the program which then display the same error as above.
Do you have any tips on how to end a WebSocket client properly / gracefullty ?
Many thanks for this amazing piece of code
-- Pierre
This how it looks like :
# require_relative '../config/environment'
# Time.current and other time manipulation methods
require 'active_support/all'
require 'async'
require 'async/http/endpoint'
require 'async/websocket/client'
class MexCWebSocketClient
def initialize
@ticker_updates = {}
@log_messages = []
@last_pong = nil
@subscriptions = {} # symbol: status (:subscribing -> :subscribed -> :unsubscribing -> :unsubscribed)
end
def log(message)
puts message unless $stdout.tty?
@log_messages << message
end
def ping(connection)
log("Ping...")
connection.write(Protocol::WebSocket::TextMessage.generate({
method: "PING",
params: []
}))
end
def subscribe(symbol, connection)
state = @subscriptions.fetch(symbol, :unsubscribed)
if state == :unsubscribed
@subscriptions[symbol] = :subscribing
log("Subscribe to #{symbol} updates")
connection.write(Protocol::WebSocket::TextMessage.generate({
method: "SUBSCRIPTION",
params: [ "[email protected]@#{symbol}@UTC+1" ]
}))
connection.flush
else
log("Can't subscribe to #{symbol} updates (state=#{state})")
end
end
def unsubscribe(symbol, connection)
state = @subscriptions.fetch(symbol, :unsubscribed)
if state == :subscribed
@subscriptions[symbol] = :unsubscribing
log("Unsubscribe from #{symbol} updates")
connection.write(Protocol::WebSocket::TextMessage.generate({
method: "UNSUBSCRIPTION",
params: [ "[email protected]@#{symbol}@UTC+1" ]
}))
connection.flush
else
log("Can't unsubscribe from #{symbol} updates (state=#{state})")
end
end
def display_status(connection)
messages = [@log_messages.count, 10].min
(@ticker_updates.keys.length + 4 + messages).times { print "\e[A\e[K" } if $stdout.tty?
puts "MEXC Public Listener Realtime Status"
puts "Last PingPong\t#{@last_pong}"
puts "Last Ticker Updates"
@ticker_updates.each do |symbol, data|
puts " #{'%-12s' % symbol}#{'%16s' % data[:p]} (#{data[:tr]})"
end
puts "Last Log Messages"
@log_messages.last(messages).each do |msg|
puts " #{msg}"
end
end
def process_command(command, connection)
log "Process Command #{command}"
case command.chomp
when /\Aquit\z/
puts("Remove current subscriptions...")
@subscriptions.each do |symbol, state|
unsubscribe(symbol, connection) if state == :subscribed
end
sleep(0.1) while @ticker_updates.count > 0
puts("Shutting down connection...")
connection.shutdown
when /\Astatus\z/
display_status(connection)
when /\Asubscribe (\w+)\z/
symbol = $1
# if Pair.find_by(symbol: symbol).present?
subscribe(symbol, connection)
# else
# log("subscribe error: #{symbol} does not exist")
# end
when /\Aunsubscribe (\w+)\z/
symbol = $1
# if Pair.find_by(symbol: symbol).present?
unsubscribe(symbol, connection)
# else
# log("unsubscribe error: #{symbol} does not exist")
# end
end
end
def process_message(message)
m = message.parse # decode message as JSON
if (s = m[:s]) && (c = m[:c]) && c.start_with?("[email protected]") && (d = m[:d])
@ticker_updates[s] = d
elsif m[:msg]
if m[:msg] == "PONG"
@last_pong = Time.current
else
symbols = m[:msg].scan(/@([A-Z]+)@UTC/).flatten.uniq
symbols.each do |symbol|
if @subscriptions.has_key?(symbol)
case @subscriptions[symbol]
when :subscribing
@subscriptions[symbol] = :subscribed
when :unsubscribing
@ticker_updates.delete(symbol)
@subscriptions[symbol] = :unsubscribed
end
end
end
end
else
log(m.to_h)
end
end
def run
Async do |task|
endpoint = Async::HTTP::Endpoint.parse("wss://wbs.mexc.com/ws",
alpn_protocols: Async::HTTP::Protocol::HTTP11.names
)
Async::WebSocket::Client.connect(endpoint) do |connection|
input_task = task.async do
while command = $stdin.gets
process_command(command, connection)
$stdout.write "> "
end
end
ping_task = task.async do
loop do
ping(connection)
sleep(20)
end
end
$stdout.write("Connected to MexC WebSocket server.\n")
$stdout.write "> "
# Always subscribe to BTCUSDT ticker
subscribe('BTCUSDT', connection)
while message = connection.read
process_message(message)
end
puts "Exiting..."
ensure
puts "Stopping Tasks..."
begin
input_task&.stop
renew_task&.stop
input_task&.wait
renew_task&.wait
rescue
puts "Ignore Task End exceptions"
end
puts "Done."
end
end
rescue
puts "Exit ignoring exceptions..."
end
end
client = MexCWebSocketClient.new
client.run
Run using Ruby 3.4.1 with async, async-websocket and rails installed (or at least active-support)
On prompt > try commands : status, subscribe ETHUSDT, unsubscribe BTCUSDT and quit or Ctrl+C