Issue with new socketsJoin
While migrating from remoteJoin to socketsJoin, I noticed some strange behavior: the socketsJoin method does not seem to wait for a response from the other servers, so if we emit right after a socketsJoin, the newly added socket does not receive the message.
Note: socketId1 may be connected to another server.
io.in(socketId1).socketsJoin(room1)
io.to(room1).emit('my-event', payload)
In that case socketId1 does not receive the message. I looked at the code: in the previous version remoteJoin returned a Promise, so I assume the call waited for a response, whereas socketsJoin apparently does not wait, which causes the problem. Maybe I am missing something, but if I add a simple fetchSockets call in between, it starts to work:
io.in(socketId1).socketsJoin(room1)
await io.in(room1).fetchSockets() // sometimes this also does not return socketId1
io.to(room1).emit('my-event', payload)
If I instead call fetchSockets after a delay:
io.in(socketId1).socketsJoin(room1)
io.to(room1).emit('my-event', payload)
setTimeout(async () => {
  await io.in(room1).fetchSockets() // in this case it always returns socketId1
}, 1000)
Can you please help me understand the problem here?
Thank you.
Hi! Which adapter are you using? The one based on Redis?
I'm using the latest version of socket.io-redis-adapter.
I have the same issue with this one. Is there any workaround to fix it?
My NestJS adapter:
import crypto from 'node:crypto';
import { INestApplicationContext, Logger } from '@nestjs/common';
import { isNil } from '@nestjs/common/utils/shared.utils';
import { ConfigService } from '@nestjs/config';
import { IoAdapter } from '@nestjs/platform-socket.io';
import { MessageMappingProperties } from '@nestjs/websockets';
import { DISCONNECT_EVENT } from '@nestjs/websockets/constants';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
import {
  Observable,
  filter,
  first,
  fromEvent,
  map,
  mergeMap,
  share,
  takeUntil,
} from 'rxjs';
import { ServerOptions, Socket } from 'socket.io';

export class SocketIOAdapter extends IoAdapter {
  private readonly logger = new Logger(SocketIOAdapter.name);
  private adapterConstructor: ReturnType<typeof createAdapter>;

  constructor(
    private readonly app: INestApplicationContext,
    private readonly configService: ConfigService,
  ) {
    super(app);
  }

  async connectToRedis(): Promise<void> {
    const pubClient = createClient({
      url: this.configService.get('REDIS_URL'),
    });
    const subClient = pubClient.duplicate();
    await Promise.all([pubClient.connect(), subClient.connect()]);
    this.adapterConstructor = createAdapter(pubClient, subClient);
    this.logger.log('Connected to Redis');
  }

  createIOServer(port: number, options?: ServerOptions) {
    const server = super.createIOServer(port, options);
    server.adapter(this.adapterConstructor);
    this.logger.log('Socket.IO server created');
    return server;
  }

  private createRequestId() {
    return crypto.randomUUID();
  }

  public bindMessageHandlers(
    socket: Socket,
    handlers: MessageMappingProperties[],
    transform: (data: unknown) => Observable<unknown>,
  ) {
    const disconnect$ = fromEvent(socket, DISCONNECT_EVENT).pipe(
      share(),
      first(),
    );
    for (const { message, callback } of handlers) {
      const source$ = fromEvent(socket, message).pipe(
        mergeMap((payload: unknown) => {
          const { data, ack } = this.mapPayload(payload);
          // <-- Modified Section Start
          const requestId = this.createRequestId();
          return (
            transform(callback(data, ack, requestId))
              // Modified Section End -->
              .pipe(
                filter((response: unknown) => !isNil(response)),
                map((response: unknown) => [response, ack]),
              )
          );
        }),
        takeUntil(disconnect$),
      );
      source$.subscribe(([response, ack]) => {
        // eslint-disable-next-line @typescript-eslint/ban-ts-comment
        // @ts-ignore
        if (response.event) {
          // eslint-disable-next-line @typescript-eslint/ban-ts-comment
          // @ts-ignore
          return socket.emit(response.event, response.data);
        }
        if (typeof ack === 'function') ack(response);
      });
    }
  }
}
We have the same issue and will probably build a custom synchronization mechanism and not use socketsJoin at all to tell other nodes to make sockets join a room.
This is very unfortunate, as allowing events to be broadcast across nodes is typically the promise (no pun intended) of socket.io and its adapters, and that is what is not working here...
This is indeed an issue with the Redis adapter, as it uses different channels for broadcasting and other commands like socketsJoin().
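Until this is addressed in the adapter, one possible workaround is to build on the fetchSockets observation from the original post: call socketsJoin, then poll fetchSockets until the socket shows up in the room, and only then emit. The sketch below is not part of the socket.io API. The helper name joinAndWait, the ServerLike interface, and the timeoutMs/intervalMs options are all made up for illustration, and it assumes that a socket reported by fetchSockets for the room will also receive the next broadcast to that room.

```typescript
// Minimal structural types, so the sketch does not depend on socket.io itself.
// A real socket.io Server exposes in(), socketsJoin() and fetchSockets() with
// compatible shapes; these interface names are hypothetical.
interface RemoteSocketLike {
  id: string;
}

interface BroadcastLike {
  socketsJoin(room: string): void;
  fetchSockets(): Promise<RemoteSocketLike[]>;
}

interface ServerLike {
  in(room: string): BroadcastLike;
}

interface WaitOptions {
  timeoutMs?: number; // give up after this long (assumed default)
  intervalMs?: number; // delay between two fetchSockets() polls (assumed default)
}

// Hypothetical helper: requests the join, then polls until the socket is
// reported as a member of the room or the timeout expires.
// Resolves to true if the socket was seen in the room, false otherwise.
async function joinAndWait(
  io: ServerLike,
  socketId: string,
  room: string,
  { timeoutMs = 2000, intervalMs = 50 }: WaitOptions = {},
): Promise<boolean> {
  io.in(socketId).socketsJoin(room);

  const deadline = Date.now() + timeoutMs;
  while (Date.now() <= deadline) {
    const sockets = await io.in(room).fetchSockets();
    if (sockets.some((s) => s.id === socketId)) {
      return true;
    }
    // Wait a little before asking the cluster again.
    await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
  }
  return false;
}
```

With a real server this could be used as: if (await joinAndWait(io, socketId1, room1)) io.to(room1).emit('my-event', payload). The price is at least one extra round trip to the other nodes per join, so it is probably only reasonable when joins are infrequent.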