Websocket reconnect doesn't work

Open justefg opened this issue 1 year ago • 6 comments

Viem Version

2.18.5

Current Behavior

The websocket doesn't reconnect after a connection is dropped, which prevents users from receiving events.

Expected Behavior

[2024-07-30 22:26:24.411] [{"address":"0x63db7e86391f5d31bab58808bcf75edb272f4f5c","topics":["0x0559884fd3a460db3073b7fc896cc77986f16e378210ded43186175bf646fc5f","0x0000000000000000000000000000000000000000000000000000004d50dad340","0x000000000000000000000000000000000000000000000000000000000006444e"],"data":"0x0000000000000000000000000000000000000000000000000000000066a8f810","blockNumber":"59991410","transactionHash":"0xd01e7b361907762ed05bf7a460cfebee5b81022e160eae979f5d8cc1941e9cbd","transactionIndex":65,"blockHash":"0x6827b8a4059d7e188fcd3a1338a6209bb563d259252fdb067f91949cc4bef26a","logIndex":304,"removed":false,"args":{"current":"332069000000","roundId":"410702","updatedAt":"1722349584"},"eventName":"AnswerUpdated"}]
[2024-07-30 22:26:34.342] [{"address":"0x63db7e86391f5d31bab58808bcf75edb272f4f5c","topics":["0x0559884fd3a460db3073b7fc896cc77986f16e378210ded43186175bf646fc5f","0x0000000000000000000000000000000000000000000000000000004d5a1d27f0","0x000000000000000000000000000000000000000000000000000000000006444f"],"data":"0x0000000000000000000000000000000000000000000000000000000066a8f81a","blockNumber":"59991415","transactionHash":"0x06c8fafeeadc8e269a91af5a8ecd7475f388b01176a3a78ec8a23548e6885565","transactionIndex":9,"blockHash":"0x430ec10a97a9bc4280d1320966d8136599f4edb8b73a67e02afc3efa572f5426","logIndex":51,"removed":false,"args":{"current":"332224342000","roundId":"410703","updatedAt":"1722349594"},"eventName":"AnswerUpdated"}]
[2024-07-30 22:26:44.392] [{"address":"0x63db7e86391f5d31bab58808bcf75edb272f4f5c","topics":["0x0559884fd3a460db3073b7fc896cc77986f16e378210ded43186175bf646fc5f","0x0000000000000000000000000000000000000000000000000000004d5ea35808","0x0000000000000000000000000000000000000000000000000000000000064450"],"data":"0x0000000000000000000000000000000000000000000000000000000066a8f824","blockNumber":"59991420","transactionHash":"0xd4a47f2d47ed475879ad1cd263c2360b4789af27ef6f459d709075cf4ad080ae","transactionIndex":28,"blockHash":"0x6710a37394365575d07c9253f7133045f2d2db982f603a8ca55366289c9f505b","logIndex":166,"removed":false,"args":{"current":"332300245000","roundId":"410704","updatedAt":"1722349604"},"eventName":"AnswerUpdated"}]

Steps To Reproduce

Run the minimal reproducible example using ts-node: ts-node example.ts

It works fine for about an hour but then after a connection drop events are no longer received. Tested with multiple websocket providers.

Link to Minimal Reproducible Example

https://gist.github.com/justefg/95acdcb5d8cbee6930b6177120c24cbc

Anything else?

No response

justefg avatar Jul 30 '24 14:07 justefg

Hooked up onError to WebSocket subscriptions here: https://github.com/wevm/viem/commit/889371e3d62b8c6044c463f933bc49c4f00e742b, so you should be able to receive socket closure errors on your onError callback in the next release. However, more work is needed to get reconnection working properly; I will work on that next. For now, you can manually reinvoke watchContractEvent when you receive a socket closure error.
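
The manual workaround described above could look roughly like the sketch below. It is only an illustration: `watchWithResubscribe`, `Subscribe`, and the `schedule` parameter are hypothetical names, not part of viem. The only assumption about viem is that a watch function such as watchContractEvent accepts `onLogs` and `onError` callbacks and returns an unwatch function.

```typescript
// Minimal shape of a watch function such as viem's `watchContractEvent`:
// it takes `onLogs`/`onError` callbacks and returns an `unwatch` function.
type Unwatch = () => void;
type Subscribe<TLog> = (handlers: {
  onLogs: (logs: TLog[]) => void;
  onError: (error: Error) => void;
}) => Unwatch;

// Hypothetical helper (not part of viem): re-invokes `subscribe` after
// `delayMs` whenever the current subscription reports an error.
function watchWithResubscribe<TLog>(
  subscribe: Subscribe<TLog>,
  onLogs: (logs: TLog[]) => void,
  delayMs = 1_000,
  // Injectable so the restart can be triggered by hand in tests.
  schedule: (fn: () => void, ms: number) => void = (fn, ms) => {
    setTimeout(fn, ms);
  },
): Unwatch {
  let stopped = false;
  let unwatch: Unwatch = () => {};

  const start = (): void => {
    if (stopped) return;
    let failed = false;
    unwatch = subscribe({
      onLogs,
      onError: () => {
        if (failed || stopped) return; // handle each failure only once
        failed = true;
        try {
          unwatch(); // drop the dead subscription before creating a new one
        } catch {
          // the underlying socket may already be gone
        }
        schedule(start, delayMs);
      },
    });
  };

  start();

  // Stops the current subscription and prevents further restarts.
  return () => {
    stopped = true;
    unwatch();
  };
}
```

With viem, `subscribe` could be something like `(handlers) => client.watchContractEvent({ address, abi, eventName, ...handlers })`. Whether calling the returned unwatch function on an already closed socket is harmless has not been verified here.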

jxom avatar Jul 30 '24 20:07 jxom

thank you @jxom

justefg avatar Jul 31 '24 11:07 justefg

@justefg I suppose you are using chainstack as your node provider? They do close websocket connections after a while and viem does not catch that case. This is my hack. Perhaps it helps:

  /**
   * NOTE: We are using a custom re-connect mechanism as node providers
   * might close the websocket even if keep-alives are used. Viem does not
   * support this reconnect mechanism out of the box, so we have to implement
   * this ourselves.
   */
  private async connect() {
    this._logger.info("Connecting...");

    const socketRpcClient = await this._client.transport.getRpcClient();

    const heartbeat = () => {
      this._logger.info("ping ->");
      this._client
        .getBlockNumber()
        .then((_) => {
          this._logger.info("<- pong");
        })
        .catch((err) => this._logger.error(err));
    };

    const intervalId = setInterval(heartbeat, 5 * 60 * 1000);

    const onOpen = (_: Event) => {
      // drop
    };
    const onMessage = (_: MessageEvent<any>) => {
      // drop
    };
    const onError = (ev: Event) => {
      this._logger.error(ev);
    };
    const onClose = async () => {
      this._logger.warn("Websocket connection closed!");
      socketRpcClient.socket.removeEventListener("open", onOpen);
      socketRpcClient.socket.removeEventListener("message", onMessage);
      socketRpcClient.socket.removeEventListener("error", onError);
      socketRpcClient.socket.removeEventListener("close", onClose);
      // NOTE: IMPORTANT: invalidate viem's socketClientCache! When close
      // happens on socket level, the same socketClient with the closed websocket will be
      // re-used from cache leading to 'Socket is closed.' error.
      socketRpcClient.close();
      clearInterval(intervalId);
      this._client = this._clientManager.getClient(this._chain);
      this._logger.info("Re-establishing connection!");
      this.connect();
    };

    const setupEventListeners = () => {
      socketRpcClient.socket.addEventListener("open", onOpen);
      socketRpcClient.socket.addEventListener("message", onMessage);
      socketRpcClient.socket.addEventListener("error", onError);
      socketRpcClient.socket.addEventListener("close", onClose);
    };

    setupEventListeners();

    heartbeat();
  }

nstylo avatar Aug 07 '24 18:08 nstylo

Hey @jxom, how's progress on that? Just ran into the issue today. Is it already fixed in the latest release?

40818419 avatar Aug 20 '24 15:08 40818419

@nstylo interesting. have you tested it?

justefg avatar Aug 21 '24 16:08 justefg

@justefg yes, works very reliably. I have to check which version of viem I am using this solution with.

nstylo avatar Aug 21 '24 16:08 nstylo

Hey @nstylo, I am running a Node.js application that listens to events over a websocket via the viem client and then inserts these events into a MongoDB collection. I am reusing the code you pasted above to fix the connection close issue, but I am facing a very weird issue on my side. After a few days of activity, my Node.js application runs out of sockets/file descriptors, and all network requests (viem WS RPC and outbound MongoDB requests) fail with a socket timeout error. It seems that after a reconnection attempt, the Node.js GC is not able to clean up the closed sockets.

Has anyone else faced a similar issue? Or am I doing something wrong here?

export class WsListener {
  private logger = new Logger(WsListener.name);
  private client: PublicClient;
  private contractAddress;
  private eventAbi;
  private handlerFn;
  private chainId;
  private wsUrl;
  private intervalId: NodeJS.Timeout;
  private connectionEventTracker;
  private eventName;

  constructor({
    chainId,
    wsUrl,
    contractAddress,
    eventName,
    eventAbi,
    handlerFn,
    connectionEventTracker,
  }: {
    chainId: number;
    wsUrl: string;
    contractAddress: `0x${string}`;
    eventName: string;
    eventAbi: string;
    handlerFn: (chainId: number, log: EVENT_LOG[]) => any;
    connectionEventTracker: (params: {
      chainId: number;
      trackedEvent: string;
      type: WebsocketConnectionEventType;
    }) => Promise<void>;
  }) {
    this.wsUrl = wsUrl;
    this.contractAddress = contractAddress;
    this.eventName = eventName;
    this.eventAbi = eventAbi;
    this.handlerFn = handlerFn;
    this.chainId = chainId;
    this.connectionEventTracker = connectionEventTracker;
    this.connectClient();
  }

  private async connectClient() {
    try {
      // add a log in db, when a new connection is created 
      this.connectionEventTracker({
        chainId: this.chainId,
        trackedEvent: this.eventName,
        type: WebsocketConnectionEventType.Connection,
      });
      this.client = createPublicClient({
        transport: webSocket(this.wsUrl, {
          keepAlive: { interval: 1_000 }, // 1000 ms (will send keep alive ping messages every 1 sec)
          reconnect: true,
          retryCount: 5,
          timeout: 60_000, // 60 secs
        }),
      });
      this.attachHandler();
      await this.setupEventListeners();
      // ping every min
      this.intervalId = setInterval(() => this.heartbeat(), 1 * 60 * 1000);
    } catch (err) {
      this.logger.error(`Error while connecting client: `, err);
    }
  }

  private attachHandler() {
    // listen to event
    this.client.watchContractEvent({
      address: this.contractAddress,
      abi: parseAbi([this.eventAbi]),
      eventName: this.eventName,
      onLogs: async (logs: EVENT_LOG[]) => {
        this.logger.log(
          `...received event ${this.eventName} on ${this.chainId}...`,
        );
        await this.handlerFn(this.chainId, logs);
      },
    });
  }

  private async setupEventListeners() {
    const socketRpcClient = await this.client.transport.getRpcClient();

    // using arrow function wrappers over the event listeners, to preserve `this` context
    socketRpcClient.socket.addEventListener('open', (args: any) =>
      this.onOpen(args),
    );
    socketRpcClient.socket.addEventListener('message', (args: any) =>
      this.onMessage(args),
    );
    socketRpcClient.socket.addEventListener('error', (args: any) =>
      this.onError(args),
    );
    socketRpcClient.socket.addEventListener('close', () => this.onClose());
  }

  getClient() {
    return this.client;
  }

  async heartbeat() {
    try {
      this.logger.log(`...ping.........`);
      await this.client.getBlockNumber();
      this.logger.log(`.........pong...`);
    } catch (err) {
      this.logger.error(`---heartbeat-error---`, err);
      throw err;
    }
  }

  // eslint-disable-next-line @typescript-eslint/no-unused-vars
  onOpen(_: Event) {}

  // eslint-disable-next-line @typescript-eslint/no-unused-vars
  onMessage(_: MessageEvent<any>) {}

  onError(ev: Event) {
    this.logger.error(`websocket error: `, ev);
  }

  async onClose() {
    try {
      this.logger.warn('Websocket connection closed!');
      const socketRpcClient = await this.client.transport.getRpcClient();
      socketRpcClient.socket.removeEventListener('open', (args: any) =>
        this.onOpen(args),
      );
      socketRpcClient.socket.removeEventListener('message', (args: any) =>
        this.onMessage(args),
      );
      socketRpcClient.socket.removeEventListener('error', (args: any) =>
        this.onError(args),
      );
      socketRpcClient.socket.removeEventListener('close', () => this.onClose());
      // NOTE: IMPORTANT: invalidate viem's socketClientCache! When close
      // happens on socket level, the same socketClient with the closed websocket will be
      // re-used from cache leading to 'Socket is closed.' error.
      socketRpcClient.close();
      
      // logs in db, when a disconnection happens
      this.connectionEventTracker({
        chainId: this.chainId,
        trackedEvent: this.eventName,
        type: WebsocketConnectionEventType.Disconnection,
      });

      clearInterval(this.intervalId);
      this.logger.log('....Re-establishing connection!..');
      this.connectClient();
      this.logger.log('....Re-established connection!..');
    } catch (err) {
      this.logger.error(`Error while closing connection: `, err);
    }
  }
}
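
A side note on the class above, offered as an observation rather than a confirmed diagnosis of the descriptor leak: removeEventListener only removes a listener when it is given the same function reference that was passed to addEventListener. The calls in onClose pass freshly created arrow functions, so they most likely remove nothing. A minimal sketch of the difference, using the standard EventTarget as a stand-in for the socket (`CloseCounter` and `fakeSocket` are made-up names):

```typescript
class CloseCounter {
  closes = 0;

  // Arrow-function class field: created once per instance, so the same
  // reference can be passed to both add- and removeEventListener, and
  // `this` is preserved without an extra wrapper.
  private readonly handleClose = (_ev: Event): void => {
    this.closes += 1;
  };

  attach(socket: EventTarget): void {
    socket.addEventListener("close", this.handleClose);
  }

  detach(socket: EventTarget): void {
    socket.removeEventListener("close", this.handleClose);
  }

  // Mirrors the pattern in the class above: a fresh wrapper on each call
  // never matches the registered listener, so nothing is removed.
  detachWithFreshWrapper(socket: EventTarget): void {
    socket.removeEventListener("close", (ev: Event) => this.handleClose(ev));
  }
}

const fakeSocket = new EventTarget(); // stand-in for the real WebSocket
const counter = new CloseCounter();

counter.attach(fakeSocket);
counter.detachWithFreshWrapper(fakeSocket);
fakeSocket.dispatchEvent(new Event("close")); // still counted

counter.detach(fakeSocket);
fakeSocket.dispatchEvent(new Event("close")); // no longer counted
```

Storing the handlers as fields also means the listener class does not need to call getRpcClient() again inside onClose just to detach them, provided it keeps a reference to the socket it attached to.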

arpan-jain avatar Oct 21 '24 05:10 arpan-jain

@arpan-jain have you checked https://github.com/websockets/ws/issues/1869? what does netstat tell you?

justefg avatar Oct 21 '24 18:10 justefg

@arpan-jain have you checked websockets/ws#1869? what does netstat tell you?

Yes. netstat shows me new connections in the ESTABLISHED state. I also checked that setting reconnect: true while creating the publicClient does the same thing: for every reconnection it creates a new connection, while the earlier ones remain in the ESTABLISHED state. So, after a while, my Node.js process will run out of connections it can create.

arpan-jain avatar Oct 22 '24 13:10 arpan-jain

@jxom can you assist please?

justefg avatar Oct 22 '24 13:10 justefg

@arpan-jain @jxom it looks like we should add socketClient.close() here before attempting to reconnect, both for onClose and onError: https://github.com/wevm/viem/blob/main/src/utils/rpc/socket.ts#L168
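
The ordering suggested above (close the old socket first, then reconnect) can be sketched in isolation as follows. This is not viem's socket.ts: `SocketLike`, `ReconnectingSocket`, `connect`, and `schedule` are hypothetical names used only to illustrate the idea, and retry limits and backoff are left out for brevity.

```typescript
// Hypothetical minimal socket shape; not viem's internal type.
interface SocketLike {
  close(): void;
  onclose: (() => void) | null;
  onerror: (() => void) | null;
}

class ReconnectingSocket {
  private socket: SocketLike | null = null;

  constructor(
    // Opens a new connection each time it is called.
    private readonly connect: () => SocketLike,
    // Injectable so the delay can be skipped in tests.
    private readonly schedule: (fn: () => void) => void = (fn) => {
      setTimeout(fn, 1_000);
    },
  ) {
    this.open();
  }

  private open(): void {
    const socket = this.connect();
    const reconnect = (): void => this.reconnect(socket);
    socket.onclose = reconnect;
    socket.onerror = reconnect;
    this.socket = socket;
  }

  private reconnect(dead: SocketLike): void {
    // Ignore events from sockets that were already replaced.
    if (dead !== this.socket) return;
    // Detach handlers and close the old socket *before* opening a new
    // one, so the previous connection is not left behind.
    dead.onclose = null;
    dead.onerror = null;
    dead.close();
    this.socket = null;
    this.schedule(() => this.open());
  }
}
```

Because the handlers are detached before close() is called, the close event that typically follows an error does not trigger a second reconnect for the same socket.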

justefg avatar Oct 22 '24 14:10 justefg

Hey @jxom will this issue be fixed soon? Looks like @justefg provided the solution.

adrian-crpt avatar Oct 25 '24 21:10 adrian-crpt

How would I go about making sure reconnections work properly? You can use a firewall for that. On Mac you can just modify /etc/pf.conf and add this line:

block drop out to polygon-mainnet.g.alchemy.com

Then reload the rules:

sudo pfctl -f /etc/pf.conf
sudo pfctl -e

Then, once it attempts to reconnect, you can comment out the line and reload again:

sudo pfctl -f /etc/pf.conf
sudo pfctl -e

On linux you can use iptables or ufw.

justefg avatar Oct 31 '24 05:10 justefg

Chiming in here to share that including socketClient.close() helps with my problem, but I still eventually run into websocket errors. The difference is that it used to error within a day without that addition, and now it might take a week at best. Would love some more consistency around websockets in viem.

slmatthiesen avatar Dec 06 '24 04:12 slmatthiesen

This issue has been locked since it has been closed for more than 14 days.

If you found a concrete bug or regression related to it, please open a new bug report with a reproduction against the latest Viem version. If you have any questions or comments you can create a new discussion thread.

github-actions[bot] avatar Feb 21 '25 00:02 github-actions[bot]