
Can't stop client without raising an error

Open · zedalaye opened this issue 10 months ago · 1 comment

I can't find a way to close a client program using async-websocket without errors.

Here's the error I get when I quit using Ctrl+C:

/home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/async/scheduler.rb:386:in 'IO::Event::Selector::EPoll#select': Interrupt
        from /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/async/scheduler.rb:386:in 'Async::Scheduler#run_once!'
        from /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/async/scheduler.rb:425:in 'Async::Scheduler#run_once'
        from /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/async/scheduler.rb:498:in 'block in Async::Scheduler#run'
        from /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/async/scheduler.rb:461:in 'block in Async::Scheduler#run_loop'
        from /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/async/scheduler.rb:458:in 'Thread.handle_interrupt'
        from /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/async/scheduler.rb:458:in 'Async::Scheduler#run_loop'
        from /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/async/scheduler.rb:497:in 'Async::Scheduler#run'
        from /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/gems/3.4.0/gems/async-2.21.1/lib/kernel/async.rb:34:in 'Kernel#Async'
        from lib/listener.rb:224:in 'MexCWebSocketClient#run'
        from lib/listener.rb:282:in '<main>'
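
A note on the trace above: it ends in `Interrupt`, the exception Ruby raises on Ctrl+C. `Interrupt` descends from `SignalException`, not `StandardError`, so a bare `rescue` (like the one at the end of the `run` method posted further down) never sees it. A minimal sketch of the difference, using only core Ruby; the helper names are made up for illustration and none of this is async-websocket's API:

```ruby
# Hypothetical helper: a bare `rescue` only catches StandardError and its subclasses.
def with_bare_rescue
  yield
  :finished
rescue
  :rescued
end

# Hypothetical helper: naming Interrupt explicitly catches the Ctrl+C exception.
def with_interrupt_rescue
  yield
  :finished
rescue Interrupt
  :interrupted
end

# Simulate Ctrl+C by raising Interrupt directly.
bare_result =
  begin
    with_bare_rescue { raise Interrupt }
  rescue Interrupt
    :escaped # the bare rescue let the Interrupt through
  end

explicit_result = with_interrupt_rescue { raise Interrupt }

puts "bare rescue:      #{bare_result}"
puts "rescue Interrupt: #{explicit_result}"
```

Whether rescuing `Interrupt` around the `Async` block is enough for a clean exit with async-websocket is a separate question that this sketch does not answer.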

I implemented a simple REPL with a quit command that sends { method: "unsubscribe", ... } messages and waits for the responses before shutting down the WebSocket connection with connection.shutdown. That produces this error:

 4.04s     warn: Async::Task [oid=0x1ce0] [ec=0x1ce8] [pid=423165] [2025-01-28 00:03:03 +0100]
               | Task may have ended with unhandled exception.
               |   FrozenError: can't modify frozen IO::Stream::StringBuffer: "\x88\x02\x03\xE8"
               |   → /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/3.4.0/openssl/buffering.rb:217 in 'OpenSSL::SSL::SSLSocket#sysread_nonblock'
               |     /home/pierre/.local/share/mise/installs/ruby/3.4.1/lib/ruby/3.4.0/openssl/buffering.rb:217 in 'OpenSSL::Buffering#read_nonblock'

Then I have to press Ctrl+C to end the program, which displays the same error as above.

Do you have any tips on how to end a WebSocket client properly and gracefully?

Many thanks for this amazing piece of code

-- Pierre

zedalaye, Jan 27 '25 23:01

This is what it looks like:

# require_relative '../config/environment'

# Time.current and other time manipulation methods
require 'active_support/all' 

require 'async'
require 'async/http/endpoint'
require 'async/websocket/client'

class MexCWebSocketClient
  def initialize
    @ticker_updates = {}
    @log_messages = []
    @last_pong = nil
    @subscriptions = {} # symbol: status (:subscribing -> :subscribed -> :unsubscribing -> :unsubscribed)
  end

  def log(message)
    puts message unless $stdout.tty?
    @log_messages << message
  end

  def ping(connection)
    log("Ping...")
    connection.write(Protocol::WebSocket::TextMessage.generate({
      method: "PING",
      params: []
    }))
    connection.flush # send immediately, as subscribe/unsubscribe do
  end

  def subscribe(symbol, connection)
    state = @subscriptions.fetch(symbol, :unsubscribed)
    if state == :unsubscribed
      @subscriptions[symbol] = :subscribing

      log("Subscribe to #{symbol} updates")
      connection.write(Protocol::WebSocket::TextMessage.generate({
        method: "SUBSCRIPTION", 
        params: [ "[email protected]@#{symbol}@UTC+1" ]
      }))
      connection.flush
    else
      log("Can't subscribe to #{symbol} updates (state=#{state})")
    end
  end

  def unsubscribe(symbol, connection)
    state = @subscriptions.fetch(symbol, :unsubscribed)
    if state == :subscribed
      @subscriptions[symbol] = :unsubscribing

      log("Unsubscribe from #{symbol} updates")
      connection.write(Protocol::WebSocket::TextMessage.generate({
        method: "UNSUBSCRIPTION", 
        params: [ "[email protected]@#{symbol}@UTC+1" ]
      }))  
      connection.flush
    else
      log("Can't unsubscribe from #{symbol} updates (state=#{state})")
    end      
  end

  def display_status(connection)
    messages = [@log_messages.count, 10].min
    (@ticker_updates.keys.length + 4 + messages).times { print "\e[A\e[K" } if $stdout.tty?
    puts "MEXC Public Listener Realtime Status"
    puts "Last PingPong\t#{@last_pong}"
    puts "Last Ticker Updates"
    @ticker_updates.each do |symbol, data|
      puts "  #{'%-12s' % symbol}#{'%16s' % data[:p]} (#{data[:tr]})"
    end
    puts "Last Log Messages"
    @log_messages.last(messages).each do |msg|
      puts "  #{msg}"
    end    
  end

  def process_command(command, connection)
    log "Process Command #{command}"

    case command.chomp
    when /\Aquit\z/
      puts("Remove current subscriptions...")
      @subscriptions.each do |symbol, state| 
        unsubscribe(symbol, connection) if state == :subscribed
      end
      sleep(0.1) while @ticker_updates.count > 0
      puts("Shutting down connection...")
      connection.shutdown

    when /\Astatus\z/
      display_status(connection)
    
    when /\Asubscribe (\w+)\z/
      symbol = $1
      # if Pair.find_by(symbol: symbol).present?        
        subscribe(symbol, connection)
      # else
      #   log("subscribe error: #{symbol} does not exist") 
      # end

    when /\Aunsubscribe (\w+)\z/
      symbol = $1
      # if Pair.find_by(symbol: symbol).present?
        unsubscribe(symbol, connection)
      # else
      #   log("unsubscribe error: #{symbol} does not exist") 
      # end
    end
  end

  def process_message(message)
    m = message.parse # decode message as JSON

    if (s = m[:s]) && (c = m[:c]) && c.start_with?("[email protected]") && (d = m[:d])
      @ticker_updates[s] = d
    elsif m[:msg]
      if m[:msg] == "PONG"
        @last_pong = Time.current
      else
        symbols = m[:msg].scan(/@([A-Z]+)@UTC/).flatten.uniq
        symbols.each do |symbol|
          if @subscriptions.has_key?(symbol)
            case @subscriptions[symbol]
            when :subscribing
              @subscriptions[symbol] = :subscribed
            when :unsubscribing
              @ticker_updates.delete(symbol)
              @subscriptions[symbol] = :unsubscribed
            end
          end
        end
      end
    else
      log(m.to_h)
    end
  end

  def run
    Async do |task|
      endpoint = Async::HTTP::Endpoint.parse("wss://wbs.mexc.com/ws",
        alpn_protocols: Async::HTTP::Protocol::HTTP11.names
      )

      Async::WebSocket::Client.connect(endpoint) do |connection|
        input_task = task.async do
          while command = $stdin.gets
            process_command(command, connection)
            $stdout.write "> "
          end
        end

        ping_task = task.async do
          loop do
            ping(connection)
            sleep(20)
          end
        end

        $stdout.write("Connected to MexC WebSocket server.\n")
        $stdout.write "> "

        # Always subscribe to BTCUSDT ticker
        subscribe('BTCUSDT', connection)

        while message = connection.read
          process_message(message)
        end

        puts "Exiting..."
      ensure
        puts "Stopping Tasks..."
        begin  
          # Stop both background tasks, then wait for them to finish.
          input_task&.stop
          ping_task&.stop
          input_task&.wait
          ping_task&.wait
        rescue
          puts "Ignore Task End exceptions"
        end  
        puts "Done."
      end
    end
  rescue
    puts "Exit ignoring exceptions..."
  end
end

client = MexCWebSocketClient.new
client.run

Run it using Ruby 3.4.1 with async, async-websocket and rails (or at least activesupport) installed. At the > prompt, try the commands status, subscribe ETHUSDT, unsubscribe BTCUSDT, and quit, or press Ctrl+C.
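
The `quit` branch in the listing waits with `sleep(0.1) while @ticker_updates.count > 0`, which never returns if an unsubscribe acknowledgement does not arrive. One possible variation, sketched here with a hypothetical `wait_until` helper (not part of async or async-websocket), bounds that wait with a deadline. The `pending` array is only a stand-in for `@ticker_updates`:

```ruby
# Hypothetical helper: poll the block until it returns a truthy value
# or the timeout elapses. Returns true if the condition was met, false on timeout.
def wait_until(timeout:, interval: 0.1)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  until yield
    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    sleep(interval)
  end
  true
end

# Stand-in for @ticker_updates: one symbol still waiting for its acknowledgement.
pending = ["BTCUSDT"]

# Nothing clears the list here, so the wait gives up after the timeout.
timed_out = !wait_until(timeout: 0.2, interval: 0.01) { pending.empty? }

# Once the list is empty (as it would be after the acknowledgements), the wait succeeds.
pending.clear
drained = wait_until(timeout: 0.2, interval: 0.01) { pending.empty? }

puts "timed out: #{timed_out}, drained: #{drained}"
```

In the `quit` branch this could take the place of the unbounded loop, for example `wait_until(timeout: 5) { @ticker_updates.empty? }`, with the 5-second value being an arbitrary choice.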

zedalaye, Jan 27 '25 23:01