Stream never closes with LangChainStream when using Postman
I have a few issues with LangChainStream:
- The stream never appears to close when the API route is called.
- Tokens are not being sent down the stream, but they do come through when I add callback functions to LangChainStream.
Example code:
import { ChatAnthropic } from "langchain/chat_models/anthropic";
import { BaseChatMessage } from "langchain/schema";
import { PromptTemplate } from "langchain/prompts";
import { StreamingTextResponse, LangChainStream } from "ai";

export const runtime = "edge";

if (!process.env.ANTHROPIC_API_KEY) {
  throw new Error("Missing env var from Anthropic");
}

const createSummaryPrompt = async (
  title: string,
  userData: string
): Promise<BaseChatMessage[]> => {
  const chatPrompt = PromptTemplate.fromTemplate(
    `This is your title: {title}. blah blah blah.\nUser Data: {userData}.`
  );
  const formattedPrompt = await chatPrompt.formatPromptValue({
    title,
    userData: JSON.stringify(userData),
  });
  return formattedPrompt.toChatMessages();
};

export const POST = async (req: Request) => {
  const { title, user_data } =
    (await req.json()) as { title?: string; user_data: string };
  if (!user_data) {
    return new Response("No transcript in the request", { status: 400 });
  }

  const { stream, handlers } = LangChainStream({
    onStart: async () => console.log("start"),
    onToken: async (token) => console.log(token),
    onCompletion: async () => console.log("end"),
  });

  const llm = new ChatAnthropic({
    anthropicApiKey: process.env.ANTHROPIC_API_KEY,
    modelName: "claude-v1.3-100k",
    temperature: 0.1,
    maxTokensToSample: 100000,
    streaming: true,
    verbose: true,
    callbacks: [handlers],
  });

  const prompt = await createSummaryPrompt(title ?? "", user_data);

  llm
    .call(prompt)
    .then(() => console.log(stream))
    .catch(console.error)
    .finally(() => console.log("done"));

  console.log(stream);
  return new StreamingTextResponse(stream);
};
Example cURL:
curl --location 'http://localhost:3000/api/generate-summary' \
--header 'Content-Type: text/plain' \
--header 'Cookie: <secret>' \
--data '{"user_data":"blah blah blah","title":"sales chief"}'
[Postman screenshot] The connection stays open and never ends; no data is returned or streamed.
Console output:
start
<ref *1> ReadableStream {
_state: 'readable',
_reader: undefined,
_storedError: undefined,
_disturbed: false,
_readableStreamController: ReadableStreamDefaultController {
_controlledReadableStream: [Circular *1],
_queue: S {
_cursor: 0,
_size: 0,
_front: { _elements: [], _next: undefined },
_back: { _elements: [], _next: undefined }
},
_queueTotalSize: 0,
_started: true,
_closeRequested: false,
_pullAgain: false,
_pulling: false,
_strategySizeAlgorithm: [Function],
_strategyHWM: 0,
_pullAlgorithm: [Function],
_cancelAlgorithm: [Function]
}
}
[llm/start] [1:llm:ChatAnthropic] Entering LLM run with input: {
"messages": [
[
{
"type": "human",
"data": {
"content": "redacted",
"additional_kwargs": {}
}
}
]
]
}
token!
token!
token!
… ("token!" logged 164 times in total) …
[llm/end] [1:llm:ChatAnthropic] [7.56s] Exiting LLM run with output: {
"generations": [
[
{
"text": "redacted",
"message": {
"type": "ai",
"data": {
"content": " redacted",
"additional_kwargs": {}
}
}
}
]
]
}
<ref *1> ReadableStream {
_state: 'readable',
_reader: ReadableStreamDefaultReader {
_ownerReadableStream: [Circular *1],
_closedPromise_resolve: [Function],
_closedPromise_reject: [Function],
_closedPromise: Promise {
[Symbol(async_id_symbol)]: 1974,
[Symbol(trigger_async_id_symbol)]: 0,
[Symbol(kResourceStore)]: undefined,
[Symbol(kResourceStore)]: undefined,
[Symbol(kResourceStore)]: undefined
},
_readRequests: S {
_cursor: 77,
_size: 1,
_front: {
_elements: [
… (77 × undefined entries elided) …
{
_chunkSteps: [Function: _chunkSteps],
_closeSteps: [Function: _closeSteps],
_errorSteps: [Function: _errorSteps]
}
],
_next: undefined
},
_back: {
_elements: [
… (77 × undefined entries elided) …
{
_chunkSteps: [Function: _chunkSteps],
_closeSteps: [Function: _closeSteps],
_errorSteps: [Function: _errorSteps]
}
],
_next: undefined
}
}
},
_storedError: undefined,
_disturbed: true,
_readableStreamController: ReadableStreamDefaultController {
_controlledReadableStream: [Circular *1],
_queue: S {
_cursor: 0,
_size: 0,
_front: { _elements: [], _next: undefined },
_back: { _elements: [], _next: undefined }
},
_queueTotalSize: 0,
_started: true,
_closeRequested: false,
_pullAgain: false,
_pulling: true,
_strategySizeAlgorithm: [Function],
_strategyHWM: 0,
_pullAlgorithm: [Function],
_cancelAlgorithm: [Function]
}
}
done
token!
token!
token!
token!
token!
token!
done!!!
token!
token!
token!
token!
token!
token!
token!
[llm/end] [1:llm:ChatAnthropic] [8.54s] Exiting LLM run with output: {
"generations": [
[
{
"text": " redacted",
"message": {
"type": "ai",
"data": {
"content": " redactedl",
"additional_kwargs": {}
}
}
}
]
]
}
<ref *1> ReadableStream {
_state: 'readable',
_reader: ReadableStreamDefaultReader {
_ownerReadableStream: [Circular *1],
_closedPromise_resolve: [Function],
_closedPromise_reject: [Function],
_closedPromise: Promise {
[Symbol(async_id_symbol)]: 2452,
[Symbol(trigger_async_id_symbol)]: 2239,
[Symbol(kResourceStore)]: undefined,
[Symbol(kResourceStore)]: undefined,
[Symbol(kResourceStore)]: undefined,
[Symbol(kResourceStore)]: undefined,
[Symbol(kResourceStore)]: undefined,
[Symbol(kResourceStore)]: undefined
},
_readRequests: S {
_cursor: 100,
_size: 1,
_front: {
_elements: [
… (100 × undefined entries elided) …
{
_chunkSteps: [Function: _chunkSteps],
_closeSteps: [Function: _closeSteps],
_errorSteps: [Function: _errorSteps]
}
],
_next: undefined
},
_back: {
_elements: [
… (100 × undefined entries elided) …
{
_chunkSteps: [Function: _chunkSteps],
_closeSteps: [Function: _closeSteps],
_errorSteps: [Function: _errorSteps]
}
],
_next: undefined
}
}
},
_storedError: undefined,
_disturbed: true,
_readableStreamController: ReadableStreamDefaultController {
_controlledReadableStream: [Circular *1],
_queue: S {
_cursor: 0,
_size: 0,
_front: { _elements: [], _next: undefined },
_back: { _elements: [], _next: undefined }
},
_queueTotalSize: 0,
_started: true,
_closeRequested: false,
_pullAgain: false,
_pulling: true,
_strategySizeAlgorithm: [Function],
_strategyHWM: 0,
_pullAlgorithm: [Function],
_cancelAlgorithm: [Function]
}
}
done
Something isn't connecting here, I guess, if we're getting an array of undefined read requests. Any ideas where I'm going wrong? Any help much appreciated!
(Additional side note: it would be so cool to have a useStream() for features that are neither chat nor completions, e.g. one-off API calls formed from app data like mine. I am currently passing things to useChat({ body: { title, user_data } }).)
I had the same issue, and the workaround I used was to just overwrite the callback after llm.call.
@Itsnotaka Could I trouble you to share an example?
I noticed the same issue with the example. One workaround that gets it to work is to add a wrappedCall:
import { StreamingTextResponse, LangChainStream, Message } from 'ai'
import { CallbackManager } from 'langchain/callbacks'
import { ChatOpenAI } from 'langchain/chat_models/openai'
import { AIChatMessage, HumanChatMessage } from 'langchain/schema'

export async function POST(req: Request) {
  const { messages } = await req.json()

  const { stream, handlers } = LangChainStream({
    onStart: async () => console.log('start'),
    onToken: async token => console.log(token),
    onCompletion: async () => console.log('end')
  })

  const llm = new ChatOpenAI({
    streaming: true,
    callbackManager: CallbackManager.fromHandlers(handlers)
  })

  async function wrappedCall(messages: Message[], onCompletion: () => void) {
    try {
      await llm.call(
        messages.map(m =>
          m.role == 'user'
            ? new HumanChatMessage(m.content)
            : new AIChatMessage(m.content)
        )
      )
    } catch (error) {
      console.error(error)
    } finally {
      onCompletion()
    }
  }

  wrappedCall(messages as Message[], () => {
    console.log('end')
  })

  return new StreamingTextResponse(stream)
}
The issue might be that the models in the langchain library need to expose a completion event or callback.
I can confirm LangChain does call handleLLMEnd (see the attached screenshot), so it must be an issue with something in this Vercel library.
Here is another approach that triggers onCompletion with no wrapper:
import { StreamingTextResponse, LangChainStream, Message } from 'ai'
import { CallbackManager } from 'langchain/callbacks'
import { ChatOpenAI } from 'langchain/chat_models/openai'
import { AIChatMessage, HumanChatMessage } from 'langchain/schema'

export const runtime = 'edge'

export async function POST(req: Request) {
  const { messages } = await req.json()

  const { stream, handlers } = LangChainStream({
    onStart: async () => console.log('start'),
    // onToken: async token => console.log(token),
    onCompletion: async () => console.log('end')
  })

  const llm = new ChatOpenAI({
    streaming: true,
    callbackManager: CallbackManager.fromHandlers(handlers)
  })

  llm
    .call(
      (messages as Message[]).map(m =>
        m.role == 'user'
          ? new HumanChatMessage(m.content)
          : new AIChatMessage(m.content)
      )
    )
    .catch(console.error)
    .finally(() => {
      // Call handleChainEnd when the chat or stream ends
      handlers.handleChainEnd()
    })

  return new StreamingTextResponse(stream)
}
I've had the same issue, which breaks not only onCompletion but also the loading state in the UI.
Thanks e-roy.

.finally(() => {
  // Call handleChainEnd when the chat or stream ends
  handlers.handleChainEnd()
})

is a good workaround 🙂
@e-roy thanks for the snippet, also solved my problem 👏
Another solution:
// `handlers` comes from: const { stream, handlers } = LangChainStream()
const model = new ChatOpenAI({
  temperature: 0,
  modelName: 'gpt-3.5-turbo',
  maxTokens: 512, // Choose the max allowed tokens in completion
  streaming: true,
  callbackManager: CallbackManager.fromHandlers({
    async handleLLMNewToken(token: string) {
      // console.log({ token })
      handlers.handleLLMNewToken(token)
    },
    async handleLLMEnd(output) {
      console.log('End of stream...', output)
      handlers.handleChainEnd(output)
    },
    async handleLLMError(e) {
      console.log('Error in stream...', e)
      handlers.handleLLMError(e)
    }
  })
})
This also causes isLoading from useChat() to never go back to false when using LangChainStream.
However, e-roy's fix seems to work.
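For anyone checking the client-side symptom, here is a minimal sketch (the component shape and API path are assumptions, not from this thread): if the server stream never closes, isLoading stays true even after the last token arrives.

'use client'

import { useChat } from 'ai/react'

export default function Chat() {
  // If the server never closes the stream, isLoading never flips
  // back to false, even after the final token has rendered.
  const { messages, input, handleInputChange, handleSubmit, isLoading } =
    useChat({ api: '/api/chat' }) // hypothetical route for this sketch

  return (
    <form onSubmit={handleSubmit}>
      {messages.map(m => (
        <p key={m.id}>
          {m.role}: {m.content}
        </p>
      ))}
      <input value={input} onChange={handleInputChange} />
      {isLoading && <span>Still streaming…</span>}
    </form>
  )
}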
I think the issue is caused by https://github.com/vercel-labs/ai/commit/be90740a03cae91609cae075c232c7460239f64c
When you do what @nfcampos suggested, overwriting the returned handlers:
const { stream, handlers } = LangChainStream();

const model = new ChatOpenAI({
  temperature: 0,
  streaming: true,
  callbacks: CallbackManager.fromHandlers({
    handleLLMNewToken: handlers.handleLLMNewToken,
    handleChainEnd: async () => {
      console.log("handleChainEnd");
      await handlers.handleChainEnd();
    },
    handleLLMEnd: async () => {
      console.log("handleLLMEnd");
      await handlers.handleChainEnd();
    },
    handleLLMError: handlers.handleLLMError,
  }),
});
handleChainEnd never ends up showing in the logs. (For my use case, I'm using ConversationalRetrievalQAChain.)
It seems that, depending on which chain/model you use, handleChainEnd is not guaranteed to be called.
It looks like adding handleLLMEnd (while keeping handleChainEnd) should fix this issue, but I'm not sure what issue @jaredpalmer might have been fixing when switching handleLLMEnd -> handleChainEnd.
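For illustration, here is a simplified sketch of a LangChainStream-style helper that closes its stream on whichever end event fires first. This is not the library's actual source; the shape is an assumption based on the behavior discussed above:

function langChainStreamSketch() {
  const encoder = new TextEncoder()
  let controller!: ReadableStreamDefaultController<Uint8Array>
  let closed = false

  const stream = new ReadableStream<Uint8Array>({
    start(c) {
      controller = c
    }
  })

  // Guarded close, so it stays safe if both end events fire.
  const close = async () => {
    if (!closed) {
      closed = true
      controller.close()
    }
  }

  return {
    stream,
    handlers: {
      handleLLMNewToken: async (token: string) => {
        controller.enqueue(encoder.encode(token))
      },
      // Closing on either event avoids hanging when only one of them
      // fires for a given chain/model combination.
      handleLLMEnd: close,
      handleChainEnd: close,
      handleLLMError: async (e: Error) => {
        controller.error(e)
      }
    }
  }
}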
@aranlucas the issue is actually that you're passing handlers for chain events to the constructor of a chat model, therefore they will never be called. You should pass the handlers either to the constructor of the chain you're using or to the .call() method
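A minimal sketch of that suggestion, passing the handlers at call time rather than to the model constructor (the LLMChain, prompt, and route shape here are illustrative assumptions, not code from this thread):

import { StreamingTextResponse, LangChainStream } from 'ai'
import { CallbackManager } from 'langchain/callbacks'
import { ChatOpenAI } from 'langchain/chat_models/openai'
import { LLMChain } from 'langchain/chains'
import { PromptTemplate } from 'langchain/prompts'

export async function POST(req: Request) {
  const { question } = await req.json()
  const { stream, handlers } = LangChainStream()

  const model = new ChatOpenAI({ streaming: true })
  const chain = new LLMChain({
    llm: model,
    prompt: PromptTemplate.fromTemplate('Answer briefly: {question}')
  })

  // Pass the handlers to .call() (not to the model constructor) so the
  // chain-level events, including the end event, actually reach them.
  chain
    .call({ question }, CallbackManager.fromHandlers(handlers))
    .catch(console.error)

  return new StreamingTextResponse(stream)
}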
Yeah, it took me a while to get to that conclusion in https://github.com/vercel-labs/ai/issues/205#issuecomment-1603269455.