Strange buffering when using toReadableStream()
Confirm this is a Node library issue and not an underlying OpenAI API issue
- [x] This is an issue with the Node library
Describe the bug
Hi people, I want to show you an interesting issue I'm having with openai-node in a Remix project.
At a high level, the server makes the request to the OpenAI API and the returned stream is embedded in the response. The client, which made the request with fetch, parses the streamed response.
In code, the server part is this:
const stream = openai.beta.threads.runs
  .stream(threadId, {
    assistant_id: assistantId,
    include: ['step_details.tool_calls[*].file_search.results[*].content'],
  })
  .on('messageDelta', (message) => {
    console.log('messageDelta', JSON.stringify(message));
  })
  .toReadableStream();

return new Response(stream, {
  headers: {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    Connection: 'keep-alive',
    Thread: threadId,
  },
});
The client part is:
const response = await fetch(localizedChatAPIURL, {
  method: 'POST',
  body: thePayload,
});

const reader = response.body
  .pipeThrough(new TextDecoderStream())
  .pipeThrough(new TextLineDecoderStream())
  .pipeThrough<AssistantStreamEvent>(new JSONDecoderStream())
  .getReader();
The strange thing is that the messages appear to differ between server and client.
The output of that console.log on messageDelta on the server side is:
messageDelta {"content":[{"index":0,"type":"text","text":{"value":"E","annotations":[]}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":"ddy"}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":" Mer"}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":"ck"}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":"x"}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":","}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":" noto"}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":" come"}}]}
Meanwhile, the network inspector on the browser side shows these messages:
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":"Eddy Merckx, noto","annotations":[]}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":"ddy"}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":" Mer"}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":"ck"}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":"x"}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":","}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":" noto"}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":" come"}}]}}}
The issue is that this inconsistency malforms the first sentence. The example is in Italian, sorry, but the beginning of that sentence should be "Eddy Merckx, noto come". Instead, the client prints "Eddy Merckx, notoddy Merckx, noto come".
I noticed that the first message on the client side is longer than the first message on the server side, so I suspect some buffering is happening somewhere.
Have you already encountered this issue? Do you have any feedback on how to handle this and avoid the malformed output?
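For reference, this is roughly the kind of line buffering I'd expect the client pipeline to need, since (as far as I can tell) toReadableStream() emits one JSON-stringified event per newline-terminated line, and a single network chunk may carry several lines or end mid-line. TextLineDecoderStream and JSONDecoderStream above are custom transforms, not part of the SDK, so treat this as a simplified sketch rather than their actual implementation:

// Simplified sketch: split decoded text into complete lines, buffering any
// partial line until the next chunk arrives. Assumes one JSON event per line.
class LineSplitterStream extends TransformStream<string, string> {
  constructor() {
    let buffer = '';
    super({
      transform(chunk, controller) {
        buffer += chunk;
        const lines = buffer.split('\n');
        buffer = lines.pop() ?? ''; // keep the trailing partial line for the next chunk
        for (const line of lines) {
          if (line.trim()) controller.enqueue(line);
        }
      },
      flush(controller) {
        if (buffer.trim()) controller.enqueue(buffer);
      },
    });
  }
}

// Usage (sketch), in place of the custom transforms above:
// response.body
//   .pipeThrough(new TextDecoderStream())
//   .pipeThrough(new LineSplitterStream())
//   .getReader(); // then JSON.parse each line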
Thank you
To Reproduce
Unclear steps to reproduce at the moment. Try creating a thread run as a stream, piping it as an HTTP response, and consuming it from a browser client.
Code snippets
OS
macOS
Node version
20.13.1
Library version
4.83.0
Could this be related to the default highWaterMark for streams?
- https://dev.to/andersonjoseph/understanding-highwatermark-in-nodejs-streams-4fmb
- https://nodejs.org/en/learn/modules/backpressuring-in-streams#how-does-backpressure-resolve-these-issues
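For reference, on a web ReadableStream the highWaterMark is configured through a queuing strategy at construction time, and toReadableStream() doesn't appear to take any options, so testing this theory would mean wrapping the stream yourself. A rough sketch, where runStream stands in for the AssistantStream from the snippet above (this is not the library's API, just an illustration):

// Rough sketch: re-emit events through a ReadableStream constructed manually,
// so an explicit queuing strategy (highWaterMark) can be passed in.
const encoder = new TextEncoder();
const body = new ReadableStream<Uint8Array>(
  {
    async start(controller) {
      for await (const event of runStream) {
        // one JSON-stringified event per line, mirroring toReadableStream()'s framing
        controller.enqueue(encoder.encode(JSON.stringify(event) + '\n'));
      }
      controller.close();
    },
  },
  new CountQueuingStrategy({ highWaterMark: 1 }),
);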
Hi @spaceemotion, not sure, but it could be. There's definitely a transformation happening somewhere, not just plain piping, or some buffer in the middle that isn't being cleared properly.
These messages:
messageDelta {"content":[{"index":0,"type":"text","text":{"value":"E","annotations":[]}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":"ddy"}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":" Mer"}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":"ck"}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":"x"}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":","}}]}
messageDelta {"content":[{"index":0,"type":"text","text":{"value":" noto"}}]}
arrive at the browser like this:
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":"Eddy Merckx, noto","annotations":[]}}]}}}
but from the second message onwards they are also sent as they appeared in the original:
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":"ddy"}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":" Mer"}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":"ck"}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":"x"}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":","}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":" noto"}}]}}}
{"event":"thread.message.delta","data":{"id":"msg_lb6hMW0y8YccR8wnkPtfkScL","object":"thread.message.delta","delta":{"content":[{"index":0,"type":"text","text":{"value":" come"}}]}}}
I'd say it's something in between, from the stream creation with openai.beta.threads.runs.stream() to the toReadableStream() call.
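One way to check that (a sketch, reusing the variable names from my original snippet) would be to tee the ReadableStream and log exactly what toReadableStream() emits before it reaches the Response, then compare it with what the browser receives:

const readable = openai.beta.threads.runs
  .stream(threadId, { assistant_id: assistantId })
  .toReadableStream();

// tee() yields two copies: one to return to the client, one to inspect server-side
const [toClient, toInspect] = readable.tee();

(async () => {
  const decoder = new TextDecoder();
  const reader = toInspect.getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    console.log('server chunk:', JSON.stringify(decoder.decode(value, { stream: true })));
  }
})();

return new Response(toClient, {
  headers: { 'Content-Type': 'text/event-stream' },
});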
Hi, I'm having a similar problem but in a different context. Basically, this code has the exact same problem:
const run = this.openai.beta.threads.runs.stream(this.openAiThread.id, {
  assistant_id: this.assistant.id,
});

for await (const event of this.assistantStream) {
  console.log(event);
}
The problem does not happen if the events are handled with ".on":
const run = this.openai.beta.threads.runs.stream(this.openAiThread.id, {
  assistant_id: this.assistant.id,
}).on('event', (event) => console.log(event));
Or when awaiting the ".create" method instead:
const run = await this.openai.beta.threads.runs.create(this.openAiThread.id, {
  assistant_id: this.assistant.id,
});

for await (const event of this.assistantStream) {
  console.log(event);
}
I was trying to replicate this in the simplest way possible but didn't manage to; here is my backend:
import OpenAI from 'openai';
import { createServer } from 'http';

const openai = new OpenAI();

async function runRepro() {
  const assistant = await openai.beta.assistants.create({
    model: 'gpt-4o',
    instructions: 'You are a math answer generator, you receive an equation and simply return the result with no explanation',
  });

  const thread = await openai.beta.threads.create({
    messages: [
      { role: 'user', content: '"I need to solve the equation `3x + 11 = 14`."' },
    ],
  });

  const stream = openai.beta.threads.runs
    .stream(thread.id, {
      assistant_id: assistant.id,
    });

  return stream.toReadableStream();
}

const server = createServer(async (req, res) => {
  res.writeHead(200, {
    'Access-Control-Allow-Origin': '*',
    'Content-Type': 'text/event-stream',
    'Connection': 'keep-alive',
  });

  const stream = await runRepro();
  for await (const value of stream.values()) {
    console.log(value);
    res.write(value);
  }

  res.end();
});

server.listen(3000, () => console.log('server listening on port: 3000'));
And this is the HTML:
<!DOCTYPE html>
<html>
  <head>
    <meta charset='utf-8'>
    <title>Stream test</title>
    <meta name='viewport' content='width=device-width, initial-scale=1'>
  </head>
  <body>
    <div style="display: flex; gap: 10px; justify-content: center; align-items: center;">
      <h3>Status</h3><p id="status" style="background-color: aqua; padding: 10px 20px;">not started</p>
    </div>
    <div style="display: flex; flex-direction: column; gap: 2px; justify-content: center; align-items: center;">
      <h3 style="margin-bottom: 0;">Result</h3><p id="result" style="padding: 10px 20px; border: 1px solid green; border-radius: 5px; margin-top: 0;"></p>
    </div>
    <div style="display: flex; flex-direction: column; gap: 10px; justify-content: center; align-items: center;">
      <h3>Processing Content</h3><div id="content" style="max-height: 60vh; max-width: 80vw; flex-wrap: nowrap; overflow: auto; padding: 10px 20px; border: solid 1px; border-radius: 5px; display: flex; flex-direction: column; text-align: center;"></div>
    </div>
  </body>
  <script>
    const url = 'http://localhost:3000';

    async function main() {
      const content = document.querySelector('#content');
      const status = document.querySelector('#status');
      const resultP = document.querySelector('#result');

      const response = await fetch(url);
      status.textContent = 'Started';

      const reader = response.body
        .pipeThrough(new TextDecoderStream());

      for await (const value of reader.values()) {
        const json = JSON.parse(value);
        const ruler = document.createElement("hr");
        const event = document.createElement("h3");
        event.textContent = json.event;
        const contentP = document.createElement("p");
        contentP.textContent = value;
        if (json.event === 'thread.message.delta') {
          resultP.textContent += json.data.delta.content[0].text.value;
        }
        content.append(ruler, event, contentP, ruler);
        await new Promise((resolve) => setTimeout(resolve, 500));
      }

      status.textContent = 'Finished';
    }

    main();
  </script>
</html>
If you copy and paste those, you will see it working just fine. Did I miss something?
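In case it helps anyone compare the two setups, a minimal client-side check I'd use is to log each raw chunk exactly as it arrives, before any parsing, to see whether the coalescing happens on the wire or in the decoding pipeline. A sketch (the URL is the one from the repro above; run it inside an async function like main()):

const response = await fetch('http://localhost:3000');
const reader = response.body.getReader();
const decoder = new TextDecoder();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  // each entry is one raw chunk: several events can share a chunk,
  // or a single event can be split across chunks
  console.log('--- raw chunk ---');
  console.log(decoder.decode(value, { stream: true }));
}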