aiohttp
aiohttp copied to clipboard
Need hooks on the WebSocket client side, such as on_websocket_connect, on_websocket_error, on_websocket_close
Hello. On the WebSocket client side, is there a way to implement hooks such as on_websocket_connect, on_websocket_error, and on_websocket_close, like the ones the Java Jetty WebSocket client provides? Also, how can I customize my own ping() and run some code every time a ping message is sent? Looking forward to your reply, thank you.
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Annotated Jetty WebSocket client endpoint that demonstrates lifecycle hooks
 * (connect, error, close, message) and a custom periodic ping with a
 * per-ping callback.
 *
 * <p>Threading: the annotated hook methods are invoked by the WebSocket
 * container, while {@link #sendPing()} runs on this class's own scheduler
 * thread. The {@code session} field is therefore {@code volatile}, and the
 * scheduler lifecycle is guarded by {@code synchronized} start/stop methods.
 */
@WebSocket
public class CustomWebSocketClient {

    /** Delay before the first ping and between consecutive pings, in seconds. */
    private static final long PING_INTERVAL_SECONDS = 5;

    /** Endpoint used by {@link #main(String[])} when no URI argument is given. */
    private static final String DEFAULT_URI = "ws://localhost:8080";

    /** How long {@link #main(String[])} keeps the process alive, in milliseconds. */
    private static final long RUN_DURATION_MILLIS = 60000;

    /** Current session, or {@code null} when disconnected; shared across threads. */
    private volatile Session session;

    /**
     * Executor driving the periodic ping, or {@code null} when no schedule is
     * active. A fresh executor is created per connection because an executor
     * cannot accept tasks again once it has been shut down. Guarded by {@code this}.
     */
    private ScheduledExecutorService scheduler;

    /** Number of pings attempted so far; used to build a unique ping payload. */
    private final AtomicInteger pingCounter = new AtomicInteger(0);

    /**
     * Hook: called when the WebSocket connection is established. Stores the
     * session and starts the periodic ping.
     *
     * @param session the newly opened session
     */
    @OnWebSocketConnect
    public void onWebSocketConnect(Session session) {
        this.session = session;
        System.out.println("Connected to: " + session.getRemoteAddress());
        // Start periodic ping scheduling
        startPingSchedule();
    }

    /**
     * Hook: called when a WebSocket error occurs. Logs the error and stops the
     * periodic ping.
     *
     * @param cause the error reported by the container
     */
    @OnWebSocketError
    public void onWebSocketError(Throwable cause) {
        System.err.println("WebSocket error: " + cause.getMessage());
        cause.printStackTrace();
        // Optional: Attempt to reconnect or clean up
        stopPingSchedule();
    }

    /**
     * Hook: called when the WebSocket connection is closed. Clears the session
     * and stops the periodic ping.
     *
     * @param statusCode the close status code
     * @param reason the close reason text
     */
    @OnWebSocketClose
    public void onWebSocketClose(int statusCode, String reason) {
        System.out.println("Connection closed: Status=" + statusCode + ", Reason=" + reason);
        this.session = null;
        // Safe even if the error hook already stopped the schedule.
        stopPingSchedule();
    }

    /**
     * Handles an incoming text message by printing it.
     *
     * @param message the received text
     */
    @OnWebSocketMessage
    public void onWebSocketMessage(String message) {
        System.out.println("Received message: " + message);
    }

    /**
     * Prints the payload of an incoming binary buffer, decoded as UTF-8.
     *
     * <p>NOTE(review): {@code @OnWebSocketMessage} normally binds text/binary
     * data messages, not PONG control frames; receiving real pongs likely needs
     * a frame-level or ping/pong listener API, and the accepted binary method
     * signature depends on the Jetty version in use - confirm against that
     * version's documentation.
     *
     * @param payload the received bytes; its position and limit are left untouched
     */
    @OnWebSocketMessage
    public void onWebSocketPong(ByteBuffer payload) {
        // Decode a duplicate so that direct or read-only buffers work (array()
        // would throw on those) and only the readable bytes are used.
        String payloadStr = StandardCharsets.UTF_8.decode(payload.duplicate()).toString();
        System.out.println("Received pong: " + payloadStr);
    }

    /**
     * Sends one ping whose payload is {@code "Ping-<n>"} and then invokes
     * {@link #onPingSent(String)}. Does nothing but log when no open session
     * is available.
     */
    private void sendPing() {
        // Read the volatile field once so a concurrent close cannot null it
        // between the check and the use.
        Session current = session;
        if (current == null || !current.isOpen()) {
            System.out.println("Cannot send ping: Session is not open");
            return;
        }
        try {
            String pingData = "Ping-" + pingCounter.incrementAndGet();
            ByteBuffer payload = ByteBuffer.wrap(pingData.getBytes(StandardCharsets.UTF_8));
            current.getRemote().sendPing(payload);
            // Custom action every time a ping is sent
            onPingSent(pingData);
        } catch (Exception e) {
            // Must not propagate: an exception escaping a fixed-rate task
            // suppresses all of its future executions.
            System.err.println("Error sending ping: " + e.getMessage());
        }
    }

    /**
     * Custom action triggered after each ping has been handed to the session.
     *
     * @param pingData the text that was sent as the ping payload
     */
    private void onPingSent(String pingData) {
        // Example: Log ping event or perform custom logic
        System.out.println("Ping sent: " + pingData + " at " + System.currentTimeMillis());
        // Add your custom logic here, e.g., update metrics, notify a service, etc.
        // For example: metricsService.incrementPingCount();
    }

    /**
     * Starts the periodic ping if it is not already running. Creating the
     * executor here (rather than once per instance) keeps the endpoint usable
     * if it is connected again after a previous close or error.
     */
    private synchronized void startPingSchedule() {
        if (scheduler != null) {
            return; // schedule already active
        }
        scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(
            this::sendPing, PING_INTERVAL_SECONDS, PING_INTERVAL_SECONDS, TimeUnit.SECONDS);
        System.out.println("Started ping schedule (every " + PING_INTERVAL_SECONDS + " seconds)");
    }

    /**
     * Stops the periodic ping. Idempotent, so it is safe when both the error
     * and the close hook fire for the same connection.
     */
    private synchronized void stopPingSchedule() {
        if (scheduler == null) {
            return; // nothing to stop (never started or already stopped)
        }
        // shutdown() cancels future runs of periodic tasks by default and lets
        // an in-flight ping finish; not awaiting termination avoids stalling
        // the container thread that delivered the close/error callback.
        scheduler.shutdown();
        scheduler = null;
        System.out.println("Stopped ping schedule");
    }

    /**
     * Connects to a WebSocket endpoint, keeps the client running for a fixed
     * time, then stops it.
     *
     * @param args optional; {@code args[0]} overrides the default endpoint URI
     */
    public static void main(String[] args) {
        WebSocketClient client = new WebSocketClient();
        CustomWebSocketClient socket = new CustomWebSocketClient();
        try {
            client.start();
            URI uri = new URI(args.length > 0 ? args[0] : DEFAULT_URI);
            // Announce before connecting: get() blocks until the handshake
            // has completed (or the 5 second timeout elapses).
            System.out.println("Connecting to: " + uri);
            client.connect(socket, uri).get(5, TimeUnit.SECONDS);
            // Keep the application running
            Thread.sleep(RUN_DURATION_MILLIS); // Run for 60 seconds
        } catch (InterruptedException e) {
            // Restore the interrupt status for any code further up the stack.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.stop();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}