crossws icon indicating copy to clipboard operation
crossws copied to clipboard

sse message not parsing properly [hono]

Open SOG-web opened this issue 8 months ago • 9 comments

Environment

latest node latest hono latest crossws

Reproduction

a new hono app

sseAdapter({
      bidir: true, // Enable bidirectional messaging support
      hooks: {
        upgrade: (request) => {
          // Extract user context from request headers
          let userContext: UserContext | undefined;
          try {
            const userContextHeader = request.headers.get('userContext');
            // console.log('sse request context', request.context);
            // console.log('sse request header', request.headers);
            if (userContextHeader) {
              console.log('userContextHeader', userContextHeader);
              userContext = JSON.parse(userContextHeader);
            }

            if (!userContext) {
              // return new Response('Authentication required', { status: 401 });
              userContext = {
                userId: '1',
                role: 'user',
                labels: [],
                teams: [],
                permissions: [],
              };
            }
          } catch (error) {
            console.error('Error parsing user context:', error);
            return new Response('Invalid user context', { status: 400 });
          }
          request.context.userContext = userContext;
          // In case of bidirectional mode, extra auth is recommended based on request
          // return {
          //   headers: {},
          // };
        },
        open: (peer) => {
          // Send welcome message
          peer.send(
            JSON.stringify({
              type: 'welcome',
              message: `Welcome ${peer.id}`,
            })
          );
        },
        message: async (peer, message) => {
          console.log('Received message on sse:', message.text());
          try {
            // Read the message content as text first, then parse as JSON
            const messageText = await message.text();
            console.log('Received message text:', messageText); // Log the raw text
            const msg = JSON.parse(messageText) as ClientMessage;
} catch (error) {
            console.error('Error handling message:', error);
            peer.send(
              JSON.stringify({
                type: 'error',
                message: 'Invalid message format',
              })
            );
          }
        },
        close: (peer) => {
          // No need to manually clean up subscriptions
          // crossws handles this automatically
          console.log(`Client ${peer.id} disconnected`);
        },
      },
    });
public async handleRequest(request: Request): Promise<Response> {
    // Check if the SSE adapter is initialized
    if (!this.sseAdapter) {
      console.warn(
        'SSE adapter not initialized yet. Waiting for initialization...'
      );
      await this.initialize();
    }

    // Check if this is an SSE request
    if (
      request.headers.get('accept') === 'text/event-stream' ||
      request.headers.has('x-crossws-id')
    ) {
      return this.sseAdapter.fetch(request);
    }

    // Return 404 for non-SSE requests
    return new Response('Not found', { status: 404 });
  }

hono side

app.get(`${prefix}/sse`, async (c) => {
      const request = c.req.raw;
      // The handleRequest from the sseAdapter handles the underlying request/response
      // User context is typically handled during the 'upgrade' hook within the SSEManager
      // by reading headers, not passed directly here.
      const response = await realtimeAdapter.handleRequest(request);
      return response;
    });
    app.post(`${prefix}/sse`, async (c) => {
      const request = c.req.raw;
      // The handleRequest from the sseAdapter handles the underlying request/response
      // User context is typically handled during the 'upgrade' hook within the SSEManager
      // by reading headers, not passed directly here.
      const response = await realtimeAdapter.handleRequest(request);
      return response;
    });

Describe the bug

sending a message from the client causes this error

this.sseClient.addEventListener('open', async () => {
      console.log('Connected to SSE server');

      // Subscribe to a table
      // Send the user context with the subscription request
      await this.sseClient.send(
        JSON.stringify({
          type: 'subscribe',
          tableName: 'posts',
        })
      );

      console.log('Sent subscription request');
    });

Additional context

No response

Logs

Received message on sse: [object ReadableStream]
Received message text: [object ReadableStream]
Error handling message: SyntaxError: Unexpected token 'o', "[object Rea"... is not valid JSON
    at JSON.parse (<anonymous>)
    at message (/Users/rou/Desktop/bass/forgebase-ts/dist/apps/hono-test/webpack:/libs/database/src/websocket/SSEManager.ts:105:30)
    at processTicksAndRejections (node:internal/process/task_queues:105:5)

SOG-web avatar Apr 22 '25 18:04 SOG-web

more try still not working

let messageData: string;
            console.log('message', typeof message);
            console.log('message event', message.event);
            console.log('message peer', message.peer);
            console.log('message data', message.data);
            console.log('message blob', message.blob);
            console.log('message array', message.uint8Array().toString());
            console.log('message array buffer', message.arrayBuffer());
            if (message instanceof ReadableStream) {
              const reader = message.getReader();
              const { value } = await reader.read();
              messageData = new TextDecoder().decode(value);
            } else {
              messageData = message.toString();
            }
            console.log('Received message:', messageData);
            const msg = JSON.parse(messageData) as ClientMessage;

logs message object message event undefined message peer { id: '02dc70ad-bc95-4468-9f67-8fa6bc8c4f02', peers: <ref *1> Set(1) { { id: '02dc70ad-bc95-4468-9f67-8fa6bc8c4f02', peers: [Circular *1], webSocket: [SSEWebSocketStub] } }, webSocket: SSEWebSocketStub { readyState: 1 } } message data [object ReadableStream] message blob [Function: blob] message array 91,111,98,106,101,99,116,32,82,101,97,100,97,98,108,101,83,116,114,101,97,109,93 message array buffer ArrayBuffer { [Uint8Contents]: <5b 6f 62 6a 65 63 74 20 52 65 61 64 61 62 6c 65 53 74 72 65 61 6d 5d>, byteLength: 23 } Received message: [object ReadableStream] Error handling message: SyntaxError: Unexpected token 'o', "[object Rea"... is not valid JSON at JSON.parse (<anonymous>) at message (/Users/rou/Desktop/bass/forgebase-ts/dist/apps/hono-test/webpack:/libs/database/src/websocket/SSEManager.ts:120:30) at AdapterHookable.callHook (file:///Users/rou/Desktop/bass/forgebase-ts/node_modules/.pnpm/[email protected]/node_modules/crossws/dist/shared/crossws.D9ehKjSh.mjs:8:39) at Object.fetch (file:///Users/rou/Desktop/bass/forgebase-ts/node_modules/.pnpm/[email protected]/node_modules/crossws/dist/adapters/sse.mjs:26:19) at processTicksAndRejections (node:internal/process/task_queues:105:5) at /Users/rou/Desktop/bass/forgebase-ts/dist/apps/hono-test/webpack:/libs/api/src/core/hono/forge-api.factory.ts:68:24 at dispatch (/Users/rou/Desktop/bass/forgebase-ts/node_modules/.pnpm/[email protected]/node_modules/hono/dist/cjs/compose.js:52:17) at /Users/rou/Desktop/bass/forgebase-ts/dist/apps/hono-test/webpack:/src/main.ts:56:3 at dispatch (/Users/rou/Desktop/bass/forgebase-ts/node_modules/.pnpm/[email protected]/node_modules/hono/dist/cjs/compose.js:52:17) at /Users/rou/Desktop/bass/forgebase-ts/node_modules/.pnpm/[email protected]/node_modules/hono/dist/cjs/hono-base.js:217:25

SOG-web avatar Apr 22 '25 18:04 SOG-web

i'm not sure if hono is doing something extra. can you reproduce it with base minimum Node.js + crossws?

pi0 avatar Apr 22 '25 23:04 pi0

i'm not sure if hono is doing something extra. can you reproduce it with base minimum Node.js + crossws?

I will do that today and get back

SOG-web avatar Apr 23 '25 07:04 SOG-web

disabling stream makes it works with hono

sseClient = new WebSocketSSE('http://localhost:3000/api/sse', {
    bidir: true,
    stream: false,
  });

am yet to try with node directly

SOG-web avatar Apr 23 '25 10:04 SOG-web

@pi0 after more digging it seems hono is doing something to the body object

I just tried using it with nestjs, the stream works without any issue

SOG-web avatar Apr 23 '25 13:04 SOG-web

Thanks for confirming, i hope to find a fix but AFAIK, hono patches globals and it probably not be easy path to support hono. sorry :(

pi0 avatar Apr 23 '25 13:04 pi0

Thanks for confirming, i hope to find a fix but AFAIK, hono patches globals and it probably not be easy path to support hono. sorry :(

Am currently also finding a simple hack, I will update this issue once I find one

SOG-web avatar Apr 23 '25 13:04 SOG-web

reading the stream directly from hono before passing it to crossws is not also working

should I close this issue or we leave it open till it is fixed

SOG-web avatar Apr 23 '25 13:04 SOG-web

feel free to keep it if still like to help on investigations (or close if you like)

pi0 avatar Apr 23 '25 13:04 pi0