openai-node
How to use stream: true?
I'm a bit lost as to how to actually use stream: true in this library.
Example incorrect syntax:
const res = await openai.createCompletion({
model: "text-davinci-002",
prompt: "Say this is a test",
max_tokens: 6,
temperature: 0,
stream: true,
});
res.onmessage = (event) => {
console.log(event.data);
}
Unfortunately, streaming is not currently supported by this library.
I'm not sure if the SDK auto-generation tool we use (openai-generator) is able to support event streams. Will have to do more research.
The python openai package does support it: https://pypi.org/project/openai/
If anyone knows of a good way to consume server-sent events in Node (that also supports POST requests), please share!
This can be done with the request method of Node's https API. You can create a request with the options you want (such as POST as a method) and then read the streamed data using the data event on the response. You can also use the close event to know when the request has finished.
Thanks @keraf, we'll try to look into getting this working soon.
You can use axios stream response type. But you still need to parse the returned data.
const res = await openai.createCompletion({
model: "text-davinci-002",
prompt: "Say this is a test",
max_tokens: 6,
temperature: 0,
stream: true,
}, { responseType: 'stream' });
res.on('data', console.log)
Thanks! @smervs currently getting: Property 'on' does not exist on type 'AxiosResponse<CreateCompletionResponse, any>' when trying though - have you had any luck?
can you try this?
res.data.on('data', console.log)
@smervs your code is working for me, but it logs as
<Buffer 64 61 74 61 3a 20 7b 22 69 64 22 3a 20 22 63 6d 70 6c 2d 36 4a 6e 56 35 4d 70 4d 41 44 4f 41 61 56 74 50 64 30 56 50 72 45 42 4f 62 34 48 54 6c 22 2c ... 155 more bytes>
Do you know how to parse this response?
here
res.data.on('data', data => console.log(data.toString()))
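For context, the logged `<Buffer ...>` is just the raw bytes of a server-sent-event line, which is why `toString()` is enough to make it readable. A small sketch, using only the first bytes visible in the log above (the remainder is truncated there):

```javascript
// The first 19 bytes from the <Buffer ...> log above.
const bytes = [
  0x64, 0x61, 0x74, 0x61, 0x3a, 0x20, 0x7b, 0x22, 0x69, 0x64,
  0x22, 0x3a, 0x20, 0x22, 0x63, 0x6d, 0x70, 0x6c, 0x2d,
];

// Buffer#toString() decodes as UTF-8 by default.
const decoded = Buffer.from(bytes).toString();

console.log(decoded); // data: {"id": "cmpl-
```

So each chunk is text of the form `data: {...}`, and the JSON after the `data: ` prefix still has to be parsed separately.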
This format still waits and gives you the entire response at the end though no? Is there not a way to get the results as they stream back as per the OpenAI frontend?
I second this, streaming experience is currently not good and only seems to return all chunks in bulk instead of as they come in.
This is especially problematic with large responses, where it takes a long time to finish - a much better user experience would be to show early tokens as they come in - really just being able to match Playground UX.
A pure HTTP example using request / curl would also be fine for now, would be happy to create a higher level utility function once I see a working example
I solved it using the inbuilt node http / https module:
const prompt = "Sample prompt. What's 2+2?"
const req = https.request({
hostname:"api.openai.com",
port:443,
path:"/v1/completions",
method:"POST",
headers:{
"Content-Type":"application/json",
"Authorization":"Bearer "+ KEY_API
}
}, function(res){
res.on('data', (chunk) => {
console.log("BODY: "+chunk);
});
res.on('end', () => {
console.log('No more data in response.');
});
})
const body = JSON.stringify({
model:"text-davinci-003",
prompt:prompt,
temperature:0.6,
max_tokens:512,
top_p:1.0,
frequency_penalty:0.5,
presence_penalty:0.7,
stream:true
})
req.on('error', (e) => {
console.error("problem with request:"+e.message);
});
req.write(body)
req.end()
Java, using OkHttpClient:
BufferedSource source = response.body().source();
Buffer buffer = new Buffer();
StringBuilder result = new StringBuilder();
while (!source.exhausted()) {
long count = response.body().source().read(buffer, 8192);
// handle data in buffer.
String r = buffer.readUtf8();
log.info("result:" + r);
result.append(r);
buffer.clear();
}
Example result (there is a great deal of data like this):
data: {"id": "cmpl-xxxx", "object": "text_completion", "created": 1672230176, "choices": [{"text": "\u672f", "index": 0, "logprobs": null, "finish_reason": null}], "model": "text-davinci-003"}
data: {"id": "cmpl-xxxx", "object": "text_completion", "created": 1672230176, "choices": [{"text": "\uff1a", "index": 0, "logprobs": null, "finish_reason": null}], "model": "text-davinci-003"}
Yes, I also found this strange: sometimes the OpenAI API returns multiple segments of
data: {}
that are not comma-separated and hence hard to parse as JSON.
What I did:
String-replace every "data: {" with ", {", except for the first occurrence (there, just use "{").
Then it can be parsed via JSON.parse, and one can extract all the text parts via .choices[0].text
In my use case streams is more useful for the request data though, so that you can concatenate the results from different requests.
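An alternative to the string-replace trick is to split the chunk on newlines and parse each `data:` line on its own. This is a minimal sketch, not part of openai-node: `extractText` is a made-up helper name, and it assumes the chunk contains only complete lines (a line cut in half between two chunks would need buffering first).

```javascript
// Hypothetical helper: extracts the completion text from one decoded chunk
// that may hold several "data: {...}" lines.
function extractText(chunkText) {
  let text = "";
  for (const line of chunkText.split("\n")) {
    const trimmed = line.trim();
    // Skip blank separator lines and anything that is not a data field.
    if (!trimmed.startsWith("data:")) continue;
    const payload = trimmed.slice("data:".length).trim();
    // The stream ends with a literal "data: [DONE]" sentinel, which is not JSON.
    if (payload === "[DONE]") break;
    text += JSON.parse(payload).choices[0].text;
  }
  return text;
}

// Two events in one chunk, shaped like the sample output above.
const sample =
  'data: {"id": "cmpl-xxxx", "choices": [{"text": "\\u672f", "index": 0}]}\n\n' +
  'data: {"id": "cmpl-xxxx", "choices": [{"text": "\\uff1a", "index": 0}]}\n\n' +
  "data: [DONE]\n\n";

console.log(extractText(sample)); // 术：
```

Parsing line by line avoids building an intermediate JSON array and also handles the `[DONE]` sentinel explicitly.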
There is no dependency here apart from dotenv. This is for the response anyway. It uses fetch, which is now built into Node (enabled by default since v18; earlier versions need the --experimental-fetch flag).
import * as dotenv from 'dotenv';
// I just used a story as a string with backticks
import { text } from './string.mjs';
dotenv.config();
const apiUrl = 'https://api.openai.com/v1/completions';
const apiKey = process.env.OPENAI_API_KEY;
const fetchOptions = {
method: 'POST',
headers: {
Accept: 'application/json',
'Content-Type': 'application/json',
Authorization: `Bearer ${apiKey}`,
},
body: JSON.stringify({
model: 'text-davinci-003',
// cues the model to return a summary; works fine.
prompt: `Full Text: ${text}
Summary:`,
temperature: 0,
max_tokens: 1000,
presence_penalty: 0.0,
stream: true,
// stop: ['\n'],
}),
};
fetch(apiUrl, fetchOptions).then(async (response) => {
const r = response.body;
if (!r) throw new Error('No response body');
const d = new TextDecoder('utf8');
const reader = await r.getReader();
let fullText = ''
while (true) {
const { value, done } = await reader.read();
if (done) {
console.log('done');
break;
} else {
const decodedString = d.decode(value);
console.log(decodedString);
try {
//fixes string not json-parseable otherwise
fullText += JSON.parse(decodedString.slice(6)).choices[0].text;
} catch (e) {
// the last line is data: [DONE] which is not parseable either, so we catch that.
console.log(
e, '\n\n\n\n',
'But parsed string is below\n\n\n\n',
);
console.log(fullText);
}
}
}
});
Also, the simplest code without any library (non-streaming; stream is commented out):
/* eslint-disable camelcase */
import * as dotenv from 'dotenv';
import { text } from './string.mjs';
//populates `process.env` with .env variables
dotenv.config();
const apiUrl = 'https://api.openai.com/v1/completions';
const apiKey = process.env.OPENAI_API_KEY;
const fetchOptions = {
method: 'POST',
headers: {
Accept: 'application/json',
'Content-Type': 'application/json',
Authorization: `Bearer ${apiKey}`,
},
body: JSON.stringify({
model: 'text-davinci-003',
prompt: `Full Text: ${text}
Summary:`,
temperature: 0,
max_tokens: 1000,
presence_penalty: 0.0,
// stream: true,
// stop: ['\n'],
}),
};
fetch(apiUrl, fetchOptions).then(async (response) => {
const r = await response.json();
console.log(r);
});
Many thanks for this very insightful discussion.
As a side note, it looks like one can consume Server-Sent Events in Node while also supporting POST requests (even though that is not spec-compliant, given that only GET requests should be allowed) cc @schnerd :
• @microsoft/fetch-event-source
However, it appears that we would lose all the benefits of the SDK auto-generation tool. Moreover, it seems that the only TS generator supporting streams at the time of writing is the axios one (typescript-fetch doesn't expose a method to consume the body as a stream).
Hence, @smervs' answer is perfectly valid and should be the accepted one. However, we could enhance it, especially the parser, because a few options exist. For example, if we take the one from a customized @microsoft/fetch-event-source (note: the package has been specially retrofitted for the purpose by exporting ./parse), here is the result:
http://www.github.com/gfortaine/fortbot
import { Configuration, OpenAIApi } from "openai";
import * as parse from "@fortaine/fetch-event-source/parse";
const configuration = new Configuration({
apiKey: process.env.OPENAI_API_KEY,
});
const openai = new OpenAIApi(configuration);
const prompt = "Hello world";
// https://help.openai.com/en/articles/4936856-what-are-tokens-and-how-to-count-them
const max_tokens = 4097 - prompt.length;
const completion = await openai.createCompletion(
{
model: "text-davinci-003",
max_tokens,
prompt,
stream: true,
},
{ responseType: "stream" }
);
completion.data.on(
"data",
parse.getLines(
parse.getMessages((event) => {
const { data } = event;
// https://beta.openai.com/docs/api-reference/completions/create#completions/create-stream
if (data === "[DONE]") {
process.stdout.write("\n");
return;
}
const { text } = JSON.parse(data).choices[0];
process.stdout.write(text);
})
)
);
@gfortaine we actually use @microsoft/fetch-event-source for the playground to do streaming with POST
Thank you all for sharing your solutions here! I agree that @smervs' solution currently looks like the best option available for the openai-node package. Here's a more complete example with proper error handling and no extra dependencies:
try {
const res = await openai.createCompletion({
model: "text-davinci-002",
prompt: "It was the best of times",
max_tokens: 100,
temperature: 0,
stream: true,
}, { responseType: 'stream' });
res.data.on('data', data => {
const lines = data.toString().split('\n').filter(line => line.trim() !== '');
for (const line of lines) {
const message = line.replace(/^data: /, '');
if (message === '[DONE]') {
return; // Stream finished
}
try {
const parsed = JSON.parse(message);
console.log(parsed.choices[0].text);
} catch(error) {
console.error('Could not JSON parse stream message', message, error);
}
}
});
} catch (error) {
if (error.response?.status) {
console.error(error.response.status, error.message);
error.response.data.on('data', data => {
const message = data.toString();
try {
const parsed = JSON.parse(message);
console.error('An error occurred during OpenAI request: ', parsed);
} catch(error) {
console.error('An error occurred during OpenAI request: ', message);
}
});
} else {
console.error('An error occurred during OpenAI request', error);
}
}
This could probably be refactored into a streamCompletion helper function (that uses either callbacks or ES6 generators to emit new messages).
Apologies there's not an easier way to do this within the SDK itself; the team will continue evaluating how to get this added natively, despite the lack of support in the current SDK generator tool we're using.
@schnerd Please find a PR: https://github.com/openai/openai-node/pull/45, as well as an updated example. Comments are welcome:
http://www.github.com/gfortaine/fortbot
import { Configuration, OpenAIApi } from "@fortaine/openai";
import { streamCompletion } from "@fortaine/openai/stream";
const configuration = new Configuration({
apiKey: process.env.OPENAI_API_KEY,
});
const openai = new OpenAIApi(configuration);
try {
const completion = await openai.createCompletion(
{
model: "text-davinci-003",
max_tokens: 100,
prompt: "It was the best of times",
stream: true,
},
{ responseType: "stream" }
);
for await (const message of streamCompletion(completion.data)) {
try {
const parsed = JSON.parse(message);
const { text } = parsed.choices[0];
process.stdout.write(text);
} catch (error) {
console.error("Could not JSON parse stream message", message, error);
}
}
process.stdout.write("\n");
} catch (error) {
if (error.response?.status) {
console.error(error.response.status, error.message);
for await (const data of error.response.data) {
const message = data.toString();
try {
const parsed = JSON.parse(message);
console.error("An error occurred during OpenAI request: ", parsed);
} catch (error) {
console.error("An error occurred during OpenAI request: ", message);
}
}
} else {
console.error("An error occurred during OpenAI request", error);
}
}
@schnerd Here it is (streamCompletion helper function code inspired by this snippet, courtesy of @rauschma):
// https://2ality.com/2018/04/async-iter-nodejs.html#generator-%231%3A-from-chunks-to-lines
async function* chunksToLines(chunksAsync) {
let previous = "";
for await (const chunk of chunksAsync) {
const bufferChunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
previous += bufferChunk;
let eolIndex;
while ((eolIndex = previous.indexOf("\n")) >= 0) {
// line includes the EOL
const line = previous.slice(0, eolIndex + 1).trimEnd();
if (line === "data: [DONE]") return; // stop the generator at the end-of-stream sentinel
if (line.startsWith("data: ")) yield line;
previous = previous.slice(eolIndex + 1);
}
}
}
async function* linesToMessages(linesAsync) {
for await (const line of linesAsync) {
const message = line.substring("data: ".length);
yield message;
}
}
async function* streamCompletion(data) {
yield* linesToMessages(chunksToLines(data));
}
try {
const completion = await openai.createCompletion(
{
model: "text-davinci-003",
max_tokens: 100,
prompt: "It was the best of times",
stream: true,
},
{ responseType: "stream" }
);
for await (const message of streamCompletion(completion.data)) {
try {
const parsed = JSON.parse(message);
const { text } = parsed.choices[0];
process.stdout.write(text);
} catch (error) {
console.error("Could not JSON parse stream message", message, error);
}
}
process.stdout.write("\n");
} catch (error) {
if (error.response?.status) {
console.error(error.response.status, error.message);
for await (const data of error.response.data) {
const message = data.toString();
try {
const parsed = JSON.parse(message);
console.error("An error occurred during OpenAI request: ", parsed);
} catch (error) {
console.error("An error occurred during OpenAI request: ", message);
}
}
} else {
console.error("An error occurred during OpenAI request", error);
}
}
@gfortaine This solution works great with Next.js API endpoints running on localhost. But once you deploy to Vercel, streaming responses via serverless functions are prohibited by AWS Lambda. You can get around this limitation by switching to Next.js' experimental new Edge runtime, but as far as I can tell that doesn't work with axios, which your solution relies on. So I still haven't found a way to actually stream OpenAI responses via Next.js in production. Any ideas?
@gfortaine I've got it working using fetch directly instead of the OpenAI lib, but I believe there's a bug in chunksToLines. It appears to assume that each chunk will be >= 1 line, but a chunk can actually be part of a line. @rauschma's original implementation addresses this.
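To illustrate the partial-line concern: a parser can keep whatever follows the last newline in a buffer and prepend it to the next chunk. The following is only a sketch under that idea, not @rauschma's implementation and not part of any library; `sseData`, `fakeChunks`, and `collect` are names invented here.

```javascript
// Hypothetical helper: turns an async iterable of byte (or string) chunks into
// SSE "data:" payloads, carrying incomplete lines over to the next chunk.
async function* sseData(chunks) {
  const decoder = new TextDecoder("utf-8");
  let pending = ""; // text received so far that does not yet end in "\n"
  for await (const chunk of chunks) {
    // { stream: true } keeps multi-byte characters split across chunks intact.
    pending += typeof chunk === "string" ? chunk : decoder.decode(chunk, { stream: true });
    let eol;
    while ((eol = pending.indexOf("\n")) >= 0) {
      const line = pending.slice(0, eol).trimEnd();
      pending = pending.slice(eol + 1);
      if (!line.startsWith("data: ")) continue; // blank separators, other fields
      const payload = line.slice("data: ".length);
      if (payload === "[DONE]") return; // end-of-stream sentinel
      yield payload;
    }
  }
}

// Demo input: the first event is deliberately split in the middle of a line.
async function* fakeChunks() {
  const enc = new TextEncoder();
  yield enc.encode('data: {"choices": [{"te');
  yield enc.encode('xt": "Hello"}]}\n\ndata: {"choices": [{"text": " world"}]}\n\n');
  yield enc.encode("data: [DONE]\n\n");
}

async function collect() {
  let text = "";
  for await (const payload of sseData(fakeChunks())) {
    text += JSON.parse(payload).choices[0].text;
  }
  return text;
}

collect().then((text) => console.log(text)); // Hello world
```

Because the generator only needs an async iterable, the same function should work with a Node Readable (such as an axios stream response) or with chunks read from a fetch body.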
@blakeross do you have any sample code on how you got it to work with next.js and vercel? Wouldn't the lambda finish if you sent a response back to the client?
@gtokman it works if you use Vercel's new Edge runtime functions
@gtokman @blakeross may be useful: https://github.com/dan-kwiat/openai-edge
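For readers wondering what the Edge-runtime route looks like in outline: Edge functions expose the Web-standard `fetch`, `Request`, and `Response`, so the upstream body can be handed through as a stream without axios. This is a rough sketch under that assumption only; `handler` and the injectable `fetchImpl` parameter are illustrative names, and the framework-specific export and runtime configuration are left out.

```javascript
// Hypothetical Edge-style handler: forwards the prompt to the completions
// endpoint and returns the upstream SSE body without buffering it.
// `fetchImpl` is injectable only so the function can be exercised offline.
async function handler(request, fetchImpl = fetch) {
  const { prompt } = await request.json();
  const upstream = await fetchImpl("https://api.openai.com/v1/completions", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
    },
    body: JSON.stringify({
      model: "text-davinci-003",
      prompt,
      max_tokens: 100,
      stream: true,
    }),
  });
  // upstream.body is a ReadableStream; passing it to Response streams it through.
  return new Response(upstream.body, {
    status: upstream.status,
    headers: { "Content-Type": "text/event-stream", "Cache-Control": "no-cache" },
  });
}
```

The client then still receives raw `data: {...}` lines and has to parse them itself; the handler only relays the stream and keeps the API key on the server.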
Here is a fetch-based client fully generated from the SDK auto-generation tool cc @schnerd @santimirandarp @blakeross @gtokman @dan-kwiat : https://github.com/openai/openai-node/pull/45#issuecomment-1371569799
(Bonus: it is wrapped by @vercel/fetch to provide retry (429 Network Error, ...) & DNS caching)
import { createConfiguration, OpenAIApi } from "@fortaine/openai";
import { streamCompletion } from "@fortaine/openai/stream";
import dotenv from "dotenv-flow";
dotenv.config({
node_env: process.env.APP_ENV || process.env.NODE_ENV || "development",
silent: true,
});
const configurationOpts = {
authMethods: {
apiKeyAuth: {
accessToken: process.env.OPENAI_API_KEY,
},
},
};
const configuration = createConfiguration(configurationOpts);
const openai = new OpenAIApi(configuration);
try {
const completion = await openai.createCompletion({
model: "text-davinci-003",
prompt: "1,2,3,",
max_tokens: 193,
temperature: 0,
stream: true,
});
for await (const message of streamCompletion(completion)) {
try {
const parsed = JSON.parse(message);
const { text } = parsed.choices[0];
process.stdout.write(text);
} catch (error) {
console.error("Could not JSON parse stream message", message, error);
}
}
process.stdout.write("\n");
} catch (error) {
if (error.code) {
try {
const parsed = JSON.parse(error.body);
console.error("An error occurred during OpenAI request: ", parsed);
} catch (error) {
console.error("An error occurred during OpenAI request: ", error);
}
} else {
console.error("An error occurred during OpenAI request", error);
}
}
Hi. Thanks for the great code. It works great in straight Node.js, but in React it throws a 'res.data.on is not a function' error. Maybe something to do with Webpack. Any insight would be appreciated. Thanks again.
Hi everyone. @smervs' solution works great with straight Node.js, but in React it throws a 'res.data.on() is not a function' error. Maybe something to do with Webpack. Any insight would be appreciated. Thanks again.
@shawnswed I am facing the same issue: Property 'on' does not exist on type 'CreateCompletionResponse'. I assume that we are all using "openai": "^3.1.0". I saw the PR from @gfortaine (https://github.com/openai/openai-node/pull/45), so hopefully it will be merged soon. In the meantime I will try to trick TS into ignoring the type and see if it works anyway. I hope I remember to update you ^^
Thanks, DerBasler. Please keep me in the loop.
Here's a quick and dirty workaround.
Edit: If you are using NextJS, a better solution can be found here https://vercel.com/blog/gpt-3-app-next-js-vercel-edge-functions.
Server-Side:
// Import the Readable stream module
import { Readable } from "stream";
// Set the response headers
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
// Generate the response using the OpenAI API
const response = await openai.createCompletion({
prompt: "It was the best of times",
stream: true,
...
}, { responseType: 'stream' });
// Convert the response to a Readable stream (this is a temporary workaround)
const stream = response.data as any as Readable;
// Process the data stream
let streamHead = true; // Flag to indicate whether a message begins the stream or is a continuation
stream.on("data", (chunk) => {
try {
// Parse the chunk as a JSON object
const data = JSON.parse(chunk.toString().trim().replace("data: ", ""));
console.log(data);
// Write the text from the response to the output stream
res.write(JSON.stringify({text: data.choices[0].text, streamHead: streamHead}));
streamHead = false;
// Send immediately to allow chunks to be sent as they arrive
res.flush();
} catch (error) {
// End the stream but do not send the error, as this is likely the DONE message from createCompletion
console.error(error);
res.end();
}
});
// Send the end of the stream on stream end
stream.on("end", () => {
res.end();
});
// If an error is received from the completion stream, send an error message and end the response stream
stream.on("error", (error) => {
console.error(error);
res.end(JSON.stringify({ error: true, message: "Error generating response." }));
});
Client-Side:
// Query your endpoint
const res = await fetch('/yourapi/', {...})
// Create a reader for the response body
const reader = res.body.getReader();
// Create a decoder for UTF-8 encoded text
const decoder = new TextDecoder("utf-8");
let result = "";
// Function to read chunks of the response body
const readChunk = async () => {
return reader.read().then(({ value, done }) => {
if (!done) {
const dataString = decoder.decode(value);
const data = JSON.parse(dataString);
console.log(data);
if (data.error) {
console.error("Error while generating content: " + data.message);
} else {
result = data.streamHead ? data.text : result + data.text;
return readChunk();
}
} else {
console.log("done");
}
});
};
await readChunk();
The result variable is updated as the content arrives.
Thanks for the neat implementation @schnerd
I am using this with listFineTuneEvents() and getting an error similar to the one reported by @DerBasler:
Property 'on' does not exist on type 'ListFineTuneEventsResponse'.
Currently on "openai": "^3.1.0"