socket.io icon indicating copy to clipboard operation
socket.io copied to clipboard

Issue with new socketsJoin

Open dalda95 opened this issue 2 years ago • 6 comments

Migrating from remoteJoin to use the socketsJoin i have notice a strange behavior, seems that the socketsJoin method is not waiting for a response from others server so that if we perform an emit after a socketJoin the added socket is not receiving the message

Notice: the socketId1 maybe is connected to another server


io.in(socketId1).socketsJoin(room1)
io.to(room1).emit('my-event', payload)

in that case the socketId1 is not receiving the message, i saw the code and in the past version the remoteJoin was a Promise so i imagine that the function is waiting for a response, but now i think that is not waiting causing a problem, maybe i miss something, but if i add in the middle a simple fetchSockets it start to work

io.in(socketId1).socketsJoin(room1)
await io.in(room1).fetchSockets() // also sometimes it not return the socketId1
io.to(room1).emit('my-event', payload)

if i add a:

io.in(socketId1).socketsJoin(room1)
io.to(room1).emit('my-event', payload)


setTimeout(async() => {
     await io.in(room1).fetchSockets() // in that case it return always the socketId
}, 1000)

can you please help me to understand the problem here ?

Thank you

dalda95 avatar Jun 07 '23 13:06 dalda95

Hi! Which adapter are you using? The one based on Redis?

darrachequesne avatar Jun 20 '23 22:06 darrachequesne

i'm using the latest of socket.io-redis-adapter

dalda95 avatar Jun 27 '23 09:06 dalda95

I have the same issue with this one Is there any workaround to fix it?

my nestjs adapter

import crypto from 'node:crypto';

import { INestApplicationContext, Logger } from '@nestjs/common';
import { isNil } from '@nestjs/common/utils/shared.utils';
import { ConfigService } from '@nestjs/config';
import { IoAdapter } from '@nestjs/platform-socket.io';
import { MessageMappingProperties } from '@nestjs/websockets';
import { DISCONNECT_EVENT } from '@nestjs/websockets/constants';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
import {
  Observable,
  filter,
  first,
  fromEvent,
  map,
  mergeMap,
  share,
  takeUntil,
} from 'rxjs';
import { ServerOptions, Socket } from 'socket.io';

export class SocketIOAdapter extends IoAdapter {
  private readonly logger = new Logger(SocketIOAdapter.name);

  private adapterConstructor: ReturnType<typeof createAdapter>;

  constructor(
    private readonly app: INestApplicationContext,
    private readonly configService: ConfigService,
  ) {
    super(app);
  }

  async connectToRedis(): Promise<void> {
    const pubClient = createClient({
      url: this.configService.get('REDIS_URL'),
    });

    const subClient = pubClient.duplicate();

    await Promise.all([pubClient.connect(), subClient.connect()]);

    this.adapterConstructor = createAdapter(pubClient, subClient);

    this.logger.log('Connected to Redis');
  }

  createIOServer(port: number, options?: ServerOptions) {
    const server = super.createIOServer(port, options);

    server.adapter(this.adapterConstructor);

    this.logger.log('Socket.IO server created');

    return server;
  }

  private createRequestId() {
    return crypto.randomUUID();
  }

  public bindMessageHandlers(
    socket: Socket,
    handlers: MessageMappingProperties[],
    transform: (data: unknown) => Observable<unknown>,
  ) {
    const disconnect$ = fromEvent(socket, DISCONNECT_EVENT).pipe(
      share(),
      first(),
    );

    for (const { message, callback } of handlers) {
      const source$ = fromEvent(socket, message).pipe(
        mergeMap((payload: unknown) => {
          const { data, ack } = this.mapPayload(payload);
          // <-- Modified Section Start
          const requestId = this.createRequestId();

          return (
            transform(callback(data, ack, requestId))
              // Modified Section End -->
              .pipe(
                filter((response: unknown) => !isNil(response)),
                map((response: unknown) => [response, ack]),
              )
          );
        }),
        takeUntil(disconnect$),
      );

      source$.subscribe(([response, ack]) => {
        // eslint-disable-next-line @typescript-eslint/ban-ts-comment
        // @ts-ignore
        if (response.event) {
          // eslint-disable-next-line @typescript-eslint/ban-ts-comment
          // @ts-ignore
          return socket.emit(response.event, response.data);
        }
        if (typeof ack === 'function') ack(response);
      });
    }
  }
}

krutilins avatar Aug 17 '23 14:08 krutilins

We have the same issue and will probably build a custom synchronization mechanism and not use socketsJoin at all to tell other nodes to make sockets join a room.

This is very unfortunate as this is typically the promise (no pun intended) of socket.io and its adapters to allow broadcasting events which is not working...

dhardtke avatar Mar 20 '24 21:03 dhardtke

This is indeed an issue with the Redis adapter, as it uses different channels for broadcasting and other commands like socketsJoin().

darrachequesne avatar Mar 25 '24 07:03 darrachequesne