sse message not parsing properly [hono]
Environment
latest node latest hono latest crossws
Reproduction
a new hono app
sseAdapter({
bidir: true, // Enable bidirectional messaging support
hooks: {
upgrade: (request) => {
// Extract user context from request headers
let userContext: UserContext | undefined;
try {
const userContextHeader = request.headers.get('userContext');
// console.log('sse request context', request.context);
// console.log('sse request header', request.headers);
if (userContextHeader) {
console.log('userContextHeader', userContextHeader);
userContext = JSON.parse(userContextHeader);
}
if (!userContext) {
// return new Response('Authentication required', { status: 401 });
userContext = {
userId: '1',
role: 'user',
labels: [],
teams: [],
permissions: [],
};
}
} catch (error) {
console.error('Error parsing user context:', error);
return new Response('Invalid user context', { status: 400 });
}
request.context.userContext = userContext;
// In case of bidirectional mode, extra auth is recommended based on request
// return {
// headers: {},
// };
},
open: (peer) => {
// Send welcome message
peer.send(
JSON.stringify({
type: 'welcome',
message: `Welcome ${peer.id}`,
})
);
},
message: async (peer, message) => {
console.log('Received message on sse:', message.text());
try {
// Read the message content as text first, then parse as JSON
const messageText = await message.text();
console.log('Received message text:', messageText); // Log the raw text
const msg = JSON.parse(messageText) as ClientMessage;
} catch (error) {
console.error('Error handling message:', error);
peer.send(
JSON.stringify({
type: 'error',
message: 'Invalid message format',
})
);
}
},
close: (peer) => {
// No need to manually clean up subscriptions
// crossws handles this automatically
console.log(`Client ${peer.id} disconnected`);
},
},
});
public async handleRequest(request: Request): Promise<Response> {
// Check if the SSE adapter is initialized
if (!this.sseAdapter) {
console.warn(
'SSE adapter not initialized yet. Waiting for initialization...'
);
await this.initialize();
}
// Check if this is an SSE request
if (
request.headers.get('accept') === 'text/event-stream' ||
request.headers.has('x-crossws-id')
) {
return this.sseAdapter.fetch(request);
}
// Return 404 for non-SSE requests
return new Response('Not found', { status: 404 });
}
hono side
app.get(`${prefix}/sse`, async (c) => {
const request = c.req.raw;
// The handleRequest from the sseAdapter handles the underlying request/response
// User context is typically handled during the 'upgrade' hook within the SSEManager
// by reading headers, not passed directly here.
const response = await realtimeAdapter.handleRequest(request);
return response;
});
app.post(`${prefix}/sse`, async (c) => {
const request = c.req.raw;
// The handleRequest from the sseAdapter handles the underlying request/response
// User context is typically handled during the 'upgrade' hook within the SSEManager
// by reading headers, not passed directly here.
const response = await realtimeAdapter.handleRequest(request);
return response;
});
Describe the bug
sending a message from the client causes this error
this.sseClient.addEventListener('open', async () => {
console.log('Connected to SSE server');
// Subscribe to a table
// Send the user context with the subscription request
await this.sseClient.send(
JSON.stringify({
type: 'subscribe',
tableName: 'posts',
})
);
console.log('Sent subscription request');
});
Additional context
No response
Logs
Received message on sse: [object ReadableStream]
Received message text: [object ReadableStream]
Error handling message: SyntaxError: Unexpected token 'o', "[object Rea"... is not valid JSON
at JSON.parse (<anonymous>)
at message (/Users/rou/Desktop/bass/forgebase-ts/dist/apps/hono-test/webpack:/libs/database/src/websocket/SSEManager.ts:105:30)
at processTicksAndRejections (node:internal/process/task_queues:105:5)
more try still not working
let messageData: string;
console.log('message', typeof message);
console.log('message event', message.event);
console.log('message peer', message.peer);
console.log('message data', message.data);
console.log('message blob', message.blob);
console.log('message array', message.uint8Array().toString());
console.log('message array buffer', message.arrayBuffer());
if (message instanceof ReadableStream) {
const reader = message.getReader();
const { value } = await reader.read();
messageData = new TextDecoder().decode(value);
} else {
messageData = message.toString();
}
console.log('Received message:', messageData);
const msg = JSON.parse(messageData) as ClientMessage;
logs
message object message event undefined message peer { id: '02dc70ad-bc95-4468-9f67-8fa6bc8c4f02', peers: <ref *1> Set(1) { { id: '02dc70ad-bc95-4468-9f67-8fa6bc8c4f02', peers: [Circular *1], webSocket: [SSEWebSocketStub] } }, webSocket: SSEWebSocketStub { readyState: 1 } } message data [object ReadableStream] message blob [Function: blob] message array 91,111,98,106,101,99,116,32,82,101,97,100,97,98,108,101,83,116,114,101,97,109,93 message array buffer ArrayBuffer { [Uint8Contents]: <5b 6f 62 6a 65 63 74 20 52 65 61 64 61 62 6c 65 53 74 72 65 61 6d 5d>, byteLength: 23 } Received message: [object ReadableStream] Error handling message: SyntaxError: Unexpected token 'o', "[object Rea"... is not valid JSON at JSON.parse (<anonymous>) at message (/Users/rou/Desktop/bass/forgebase-ts/dist/apps/hono-test/webpack:/libs/database/src/websocket/SSEManager.ts:120:30) at AdapterHookable.callHook (file:///Users/rou/Desktop/bass/forgebase-ts/node_modules/.pnpm/[email protected]/node_modules/crossws/dist/shared/crossws.D9ehKjSh.mjs:8:39) at Object.fetch (file:///Users/rou/Desktop/bass/forgebase-ts/node_modules/.pnpm/[email protected]/node_modules/crossws/dist/adapters/sse.mjs:26:19) at processTicksAndRejections (node:internal/process/task_queues:105:5) at /Users/rou/Desktop/bass/forgebase-ts/dist/apps/hono-test/webpack:/libs/api/src/core/hono/forge-api.factory.ts:68:24 at dispatch (/Users/rou/Desktop/bass/forgebase-ts/node_modules/.pnpm/[email protected]/node_modules/hono/dist/cjs/compose.js:52:17) at /Users/rou/Desktop/bass/forgebase-ts/dist/apps/hono-test/webpack:/src/main.ts:56:3 at dispatch (/Users/rou/Desktop/bass/forgebase-ts/node_modules/.pnpm/[email protected]/node_modules/hono/dist/cjs/compose.js:52:17) at /Users/rou/Desktop/bass/forgebase-ts/node_modules/.pnpm/[email protected]/node_modules/hono/dist/cjs/hono-base.js:217:25
i'm not sure if hono is doing something extra. can you reproduce it with base minimum Node.js + crossws?
i'm not sure if hono is doing something extra. can you reproduce it with base minimum Node.js + crossws?
I will do that today and get back
disabling stream makes it works with hono
sseClient = new WebSocketSSE('http://localhost:3000/api/sse', {
bidir: true,
stream: false,
});
am yet to try with node directly
@pi0 after more digging it seems hono is doing something to the body object
I just tried using it with nestjs, the stream works without any issue
Thanks for confirming, i hope to find a fix but AFAIK, hono patches globals and it probably not be easy path to support hono. sorry :(
Thanks for confirming, i hope to find a fix but AFAIK, hono patches globals and it probably not be easy path to support hono. sorry :(
Am currently also finding a simple hack, I will update this issue once I find one
reading the stream directly from hono before passing it to crossws is not also working
should I close this issue or we leave it open till it is fixed
feel free to keep it if still like to help on investigations (or close if you like)