aiohttp icon indicating copy to clipboard operation
aiohttp copied to clipboard

Need hooks on the WebSocket client side, such as on_websocket_connect, on_websocket_error, and on_websocket_close

Open torchcc opened this issue 4 years ago • 1 comments

Hello. On the WebSocket client side, is there a way to implement hooks such as on_websocket_connect, on_websocket_error, and on_websocket_close, just as the Java Jetty WebSocket client provides? Also, how can I customize my own ping() and run some logic every time a ping message is sent? Looking forward to your reply, thank you.

torchcc avatar Jan 16 '21 04:01 torchcc


import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.WebSocketClient;

import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Annotated Jetty WebSocket client endpoint that exposes connect / error / close / message
 * hooks and sends a custom ping at a fixed interval, running {@link #onPingSent(String)}
 * after every ping.
 *
 * <p>Thread-safety: the hook methods are invoked by Jetty, while pings are sent from a
 * dedicated scheduler thread. The current session is published through a {@code volatile}
 * field and the ping task handle is guarded by this object's monitor.
 */
@WebSocket
public class CustomWebSocketClient {
    /** Delay before the first ping and between consecutive pings, in seconds. */
    private static final long PING_INTERVAL_SECONDS = 5;

    /** How long {@link #main(String[])} waits for the connection to be established. */
    private static final long CONNECT_TIMEOUT_SECONDS = 5;

    /** How long {@link #main(String[])} keeps the demo client alive, in milliseconds. */
    private static final long RUN_DURATION_MILLIS = 60_000;

    // volatile: written from the Jetty hook methods, read from the ping scheduler thread.
    private volatile Session session;

    // Daemon thread so a forgotten ping task can never keep the JVM alive on its own.
    private final ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(1, runnable -> {
                Thread thread = new Thread(runnable, "websocket-ping");
                thread.setDaemon(true);
                return thread;
            });
    private final AtomicInteger pingCounter = new AtomicInteger(0);

    // Handle of the currently scheduled ping task, or null when none; guarded by "this".
    private ScheduledFuture<?> pingTask;

    /**
     * Hook: called when the WebSocket connection is established. Stores the session and
     * starts the periodic ping.
     *
     * @param session the newly opened session
     */
    @OnWebSocketConnect
    public void onWebSocketConnect(Session session) {
        this.session = session;
        System.out.println("Connected to: " + session.getRemoteAddress());

        startPingSchedule();
    }

    /**
     * Hook: called when a WebSocket error occurs. Reports the error and stops pinging.
     *
     * @param cause the error that was raised
     */
    @OnWebSocketError
    public void onWebSocketError(Throwable cause) {
        System.err.println("WebSocket error: " + cause.getMessage());
        cause.printStackTrace();

        // Optional: attempt to reconnect here. The ping schedule can be restarted later
        // because stopping it only cancels the task and leaves the executor usable.
        stopPingSchedule();
    }

    /**
     * Hook: called when the WebSocket connection is closed. Clears the session and stops
     * pinging.
     *
     * @param statusCode the close status code
     * @param reason the close reason
     */
    @OnWebSocketClose
    public void onWebSocketClose(int statusCode, String reason) {
        System.out.println("Connection closed: Status=" + statusCode + ", Reason=" + reason);
        this.session = null;

        stopPingSchedule();
    }

    /**
     * Handles an incoming text message.
     *
     * @param message the received text
     */
    @OnWebSocketMessage
    public void onWebSocketMessage(String message) {
        System.out.println("Received message: " + message);
    }

    /**
     * Handles an incoming binary payload and prints it decoded as UTF-8.
     *
     * <p>NOTE(review): because this method is annotated with {@code @OnWebSocketMessage}, it
     * presumably receives binary data messages rather than PONG control frames; observing
     * pongs would need a frame-level hook ({@code @OnWebSocketFrame}) - confirm against the
     * Jetty version in use. Also confirm that this Jetty version accepts a
     * {@code ByteBuffer} parameter here (older 9.x releases appear to expect
     * {@code (byte[], int, int)}).
     *
     * @param payload the received bytes; only the range between position and limit is read
     */
    @OnWebSocketMessage
    public void onWebSocketPong(ByteBuffer payload) {
        // Decode a duplicate: respects position/limit, works for direct and read-only
        // buffers (array() does not), and leaves the caller's buffer position untouched.
        String payloadStr = StandardCharsets.UTF_8.decode(payload.duplicate()).toString();
        System.out.println("Received pong: " + payloadStr);
    }

    /**
     * Sends one ping carrying a numbered payload, then runs {@link #onPingSent(String)}.
     * Does nothing except log when there is no open session.
     */
    private void sendPing() {
        // Read the field once: a concurrent close may null it between check and use.
        Session current = session;
        if (current == null || !current.isOpen()) {
            System.out.println("Cannot send ping: Session is not open");
            return;
        }

        try {
            String pingData = "Ping-" + pingCounter.incrementAndGet();
            ByteBuffer payload = ByteBuffer.wrap(pingData.getBytes(StandardCharsets.UTF_8));
            current.getRemote().sendPing(payload);

            // Custom action every time a ping is sent
            onPingSent(pingData);
        } catch (Exception e) {
            // Deliberately broad: an exception escaping a scheduleAtFixedRate task
            // suppresses all of its future executions.
            System.err.println("Error sending ping: " + e.getMessage());
        }
    }

    /**
     * Custom action triggered after each ping is sent.
     *
     * @param pingData the payload text of the ping that was just sent
     */
    private void onPingSent(String pingData) {
        System.out.println("Ping sent: " + pingData + " at " + System.currentTimeMillis());

        // Add your custom logic here, e.g., update metrics, notify a service, etc.
        // For example: metricsService.incrementPingCount();
    }

    /**
     * Schedules the periodic ping, replacing any ping task that is still scheduled so that
     * at most one is active at a time.
     */
    private synchronized void startPingSchedule() {
        if (pingTask != null) {
            pingTask.cancel(false);
        }
        pingTask = scheduler.scheduleAtFixedRate(
                this::sendPing, PING_INTERVAL_SECONDS, PING_INTERVAL_SECONDS, TimeUnit.SECONDS);
        System.out.println("Started ping schedule (every " + PING_INTERVAL_SECONDS + " seconds)");
    }

    /**
     * Cancels the periodic ping if one is scheduled. Safe to call repeatedly (for example
     * from both the error and the close hook) and never blocks the calling thread; the
     * executor itself stays usable so pinging can be restarted.
     */
    private synchronized void stopPingSchedule() {
        if (pingTask == null) {
            return;
        }
        // cancel(false): let a ping that is already being sent finish normally.
        pingTask.cancel(false);
        pingTask = null;
        System.out.println("Stopped ping schedule");
    }

    /**
     * Runs the demo client: connects to {@code ws://localhost:8080}, stays connected for
     * 60 seconds, then stops the client and releases the ping scheduler.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        WebSocketClient client = new WebSocketClient();
        CustomWebSocketClient socket = new CustomWebSocketClient();

        try {
            client.start();
            URI uri = new URI("ws://localhost:8080");
            // Announce the attempt before blocking on the connection result.
            System.out.println("Connecting to: " + uri);
            client.connect(socket, uri).get(CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS);

            // Keep the application running
            Thread.sleep(RUN_DURATION_MILLIS);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            System.err.println("Interrupted: " + e.getMessage());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.stop();
            } catch (Exception e) {
                e.printStackTrace();
            }
            socket.scheduler.shutdownNow();
        }
    }
}

ljluestc avatar May 10 '25 13:05 ljluestc