Websocket reconnect doesn't work
Check existing issues
- [X] I checked there isn't already an issue for the bug I encountered.
Viem Version
2.18.5
Current Behavior
Websocket doesn't reconnect after a connection is dropped preventing users from receiving events.
Expected Behavior
[2024-07-30 22:26:24.411] [{"address":"0x63db7e86391f5d31bab58808bcf75edb272f4f5c","topics":["0x0559884fd3a460db3073b7fc896cc77986f16e378210ded43186175bf646fc5f","0x0000000000000000000000000000000000000000000000000000004d50dad340","0x000000000000000000000000000000000000000000000000000000000006444e"],"data":"0x0000000000000000000000000000000000000000000000000000000066a8f810","blockNumber":"59991410","transactionHash":"0xd01e7b361907762ed05bf7a460cfebee5b81022e160eae979f5d8cc1941e9cbd","transactionIndex":65,"blockHash":"0x6827b8a4059d7e188fcd3a1338a6209bb563d259252fdb067f91949cc4bef26a","logIndex":304,"removed":false,"args":{"current":"332069000000","roundId":"410702","updatedAt":"1722349584"},"eventName":"AnswerUpdated"}]
[2024-07-30 22:26:34.342] [{"address":"0x63db7e86391f5d31bab58808bcf75edb272f4f5c","topics":["0x0559884fd3a460db3073b7fc896cc77986f16e378210ded43186175bf646fc5f","0x0000000000000000000000000000000000000000000000000000004d5a1d27f0","0x000000000000000000000000000000000000000000000000000000000006444f"],"data":"0x0000000000000000000000000000000000000000000000000000000066a8f81a","blockNumber":"59991415","transactionHash":"0x06c8fafeeadc8e269a91af5a8ecd7475f388b01176a3a78ec8a23548e6885565","transactionIndex":9,"blockHash":"0x430ec10a97a9bc4280d1320966d8136599f4edb8b73a67e02afc3efa572f5426","logIndex":51,"removed":false,"args":{"current":"332224342000","roundId":"410703","updatedAt":"1722349594"},"eventName":"AnswerUpdated"}]
[2024-07-30 22:26:44.392] [{"address":"0x63db7e86391f5d31bab58808bcf75edb272f4f5c","topics":["0x0559884fd3a460db3073b7fc896cc77986f16e378210ded43186175bf646fc5f","0x0000000000000000000000000000000000000000000000000000004d5ea35808","0x0000000000000000000000000000000000000000000000000000000000064450"],"data":"0x0000000000000000000000000000000000000000000000000000000066a8f824","blockNumber":"59991420","transactionHash":"0xd4a47f2d47ed475879ad1cd263c2360b4789af27ef6f459d709075cf4ad080ae","transactionIndex":28,"blockHash":"0x6710a37394365575d07c9253f7133045f2d2db982f603a8ca55366289c9f505b","logIndex":166,"removed":false,"args":{"current":"332300245000","roundId":"410704","updatedAt":"1722349604"},"eventName":"AnswerUpdated"}]
Steps To Reproduce
Run minimal reproducible example usingts-node:
ts-node example.ts
It works fine for about an hour but then after a connection drop events are no longer received. Tested with multiple websocket providers.
Link to Minimal Reproducible Example
https://gist.github.com/justefg/95acdcb5d8cbee6930b6177120c24cbc
Anything else?
No response
Hooked up onError to WebSocket subscriptions here: https://github.com/wevm/viem/commit/889371e3d62b8c6044c463f933bc49c4f00e742b, so you should be able to receive socket closure errors on your onError callback in the next release. However, more work is needed to get reconnection to work properly, will work on that next. For now, you can manually reinvoke watchContractEvent when you receive a socket closure error.
thank you @jxom
@justefg I suppose you are using chainstack as your node provider? They do close websocket connections after a while and viem does not catch that case. This is my hack. Perhaps it helps:
/**
* NOTE: We are using a custom re-connect mechanism as node providers
* might close the websocket even if keep-alives are used. Viem does not
* support this reconnect mechanism out of the box, so we have to implement
* this ourselves.
*/
private async connect() {
this._logger.info("Connecting...");
const socketRpcClient = await this._client.transport.getRpcClient();
const heartbeat = () => {
this._logger.info("ping ->");
this._client
.getBlockNumber()
.then((_) => {
this._logger.info("<- pong");
})
.catch((err) => this._logger.error(err));
};
const intervalId = setInterval(heartbeat, 5 * 60 * 1000);
const onOpen = (_: Event) => {
// drop
};
const onMessage = (_: MessageEvent<any>) => {
// drop
};
const onError = (ev: Event) => {
this._logger.error(ev);
};
const onClose = async () => {
this._logger.warn("Websocket connection closed!");
socketRpcClient.socket.removeEventListener("open", onOpen);
socketRpcClient.socket.removeEventListener("message", onMessage);
socketRpcClient.socket.removeEventListener("error", onError);
socketRpcClient.socket.removeEventListener("close", onClose);
// NOTE: IMPORTANT: invalidate viem's socketClientCache! When close
// happens on socket level, the same socketClient with the closed websocket will be
// re-used from cache leading to 'Socket is closed.' error.
socketRpcClient.close();
clearInterval(intervalId);
this._client = this._clientManager.getClient(this._chain);
this._logger.info("Re-establishing connection!");
this.connect();
};
const setupEventListeners = () => {
socketRpcClient.socket.addEventListener("open", onOpen);
socketRpcClient.socket.addEventListener("message", onMessage);
socketRpcClient.socket.addEventListener("error", onError);
socketRpcClient.socket.addEventListener("close", onClose);
};
setupEventListeners();
heartbeat();
}
Hey @jxom, hows progress on that? Just run into the issue today. Is is already fixed in latest release?
@nstylo interesting. have you tested it?
@justefg yes, works very reliably. I have to see on which version of view I am using this solution.
Hey @nstylo I am running a node.js application, which listens to events on websocket, via the viem client,. and then inserts these events in a mongoDb collection. I am re-using the code you pasted above to fix the connection close issue, but i am facing a very weird issue on my side. After few days of activity, my node.js application runs out of sockets/file-descriptors, and all the network requests (viem ws rpc and outbound mongodb requests) fail with socket timeout error. It seems that after a reconnection attempt, node.js GC is not able to clean up the closed sockets.
Have anyone else also faced a similar issue? Or am i doing something wrong here?
export class WsListener {
private logger = new Logger(WsListener.name);
private client: PublicClient;
private contractAddress;
private eventAbi;
private handlerFn;
private chainId;
private wsUrl;
private intervalId: NodeJS.Timeout;
private connectionEventTracker;
private eventName;
constructor({
chainId,
wsUrl,
contractAddress,
eventName,
eventAbi,
handlerFn,
connectionEventTracker,
}: {
chainId: number;
wsUrl: string;
contractAddress: `0x${string}`;
eventName: string;
eventAbi: string;
handlerFn: (chainId: number, log: EVENT_LOG[]) => any;
connectionEventTracker: (params: {
chainId: number;
trackedEvent: string;
type: WebsocketConnectionEventType;
}) => Promise<void>;
}) {
this.wsUrl = wsUrl;
this.contractAddress = contractAddress;
this.eventName = eventName;
this.eventAbi = eventAbi;
this.handlerFn = handlerFn;
this.chainId = chainId;
this.connectionEventTracker = connectionEventTracker;
this.connectClient();
}
private async connectClient() {
try {
// add a log in db, when a new connection is created
this.connectionEventTracker({
chainId: this.chainId,
trackedEvent: this.eventName,
type: WebsocketConnectionEventType.Connection,
});
this.client = createPublicClient({
transport: webSocket(this.wsUrl, {
keepAlive: { interval: 1_000 }, // 1000 ms (will send keep alive ping messages every 1 sec)
reconnect: true,
retryCount: 5,
timeout: 60_000, // 60 secs
}),
});
this.attachHandler();
await this.setupEventListeners();
// ping every min
this.intervalId = setInterval(() => this.heartbeat(), 1 * 60 * 1000);
} catch (err) {
this.logger.error(`Error while connecting client: `, err);
}
}
private attachHandler() {
// listen to event
this.client.watchContractEvent({
address: this.contractAddress,
abi: parseAbi([this.eventAbi]),
eventName: this.eventName,
onLogs: async (logs: EVENT_LOG[]) => {
this.logger.log(
`...received event ${this.eventName} on ${this.chainId}...`,
);
await this.handlerFn(this.chainId, logs);
},
});
}
private async setupEventListeners() {
const socketRpcClient = await this.client.transport.getRpcClient();
// using arrow function wrappers over the event listeners, to preserve `this` context
socketRpcClient.socket.addEventListener('open', (args: any) =>
this.onOpen(args),
);
socketRpcClient.socket.addEventListener('message', (args: any) =>
this.onMessage(args),
);
socketRpcClient.socket.addEventListener('error', (args: any) =>
this.onError(args),
);
socketRpcClient.socket.addEventListener('close', () => this.onClose());
}
getClient() {
return this.client;
}
async heartbeat() {
try {
this.logger.log(`...ping.........`);
await this.client.getBlockNumber();
this.logger.log(`.........pong...`);
} catch (err) {
this.logger.error(`---heartbeat-error---`, err);
throw err;
}
}
// eslint-disable-next-line @typescript-eslint/no-unused-vars
onOpen(_: Event) {}
// eslint-disable-next-line @typescript-eslint/no-unused-vars
onMessage(_: MessageEvent<any>) {}
onError(ev: Event) {
this.logger.error(`websocket error: `, ev);
}
async onClose() {
try {
this.logger.warn('Websocket connection closed!');
const socketRpcClient = await this.client.transport.getRpcClient();
socketRpcClient.socket.removeEventListener('open', (args: any) =>
this.onOpen(args),
);
socketRpcClient.socket.removeEventListener('message', (args: any) =>
this.onMessage(args),
);
socketRpcClient.socket.removeEventListener('error', (args: any) =>
this.onError(args),
);
socketRpcClient.socket.removeEventListener('close', () => this.onClose());
// NOTE: IMPORTANT: invalidate viem's socketClientCache! When close
// happens on socket level, the same socketClient with the closed websocket will be
// re-used from cache leading to 'Socket is closed.' error.
socketRpcClient.close();
// logs in db, when a disconnection happens
this.connectionEventTracker({
chainId: this.chainId,
trackedEvent: this.eventName,
type: WebsocketConnectionEventType.Disconnection,
});
clearInterval(this.intervalId);
this.logger.log('....Re-establishing connection!..');
this.connectClient();
this.logger.log('....Re-established connection!..');
} catch (err) {
this.logger.error(`Error while closing connection: `, err);
}
}
}
@arpan-jain have you checked https://github.com/websockets/ws/issues/1869? what does netstat tell you?
@arpan-jain have you checked websockets/ws#1869? what does
netstattell you?
Yes. netstat shows me new connections with ESTABLISHED state.
And i checked, setting reconnect: true while creating publicClient, also does the same thing. --> for every reconnection, it creates a new connection, while the earlier ones remain there in the ESTABLISHED state.
So, after while, my node.js process will go out of new connections to create.
@jxom can you assist please?
@arpan-jain @jxom it looks like we should add socketClient.close() here before attempting to reconnect
https://github.com/wevm/viem/blob/main/src/utils/rpc/socket.ts#L168
both for onClose and onError
Hey @jxom will this issue be fixed soon? Looks like @justefg provided the solution.
How I would go about making sure reconnections work properly? You can use a firewall for that.
On mac you can just modify /etc/pf.conf and add this line
block drop out to polygon-mainnet.g.alchemy.com
Then reload the rules
sudo pfctl -f /etc/pf.conf
sudo pfctl -e
then once it attempts reconnecting you can comment the line and reload again
sudo pfctl -f /etc/pf.conf
sudo pfctl -e
On linux you can use iptables or ufw.
Chiming in here to share that including socketClient.close() helps my problem, but still eventually run into websocket errors. The difference is it used to error within a day without that addition, and now it might take a week at best. Would love some more consistency around websockets in viem
This issue has been locked since it has been closed for more than 14 days.
If you found a concrete bug or regression related to it, please open a new bug report with a reproduction against the latest Viem version. If you have any questions or comments you can create a new discussion thread.