[AssistantResponse] - Save streaming data to an external database
Feature Description
Save streaming thread message data to external DB
Use Case
Is there a way to save thread messages to an external database once streaming via AssistantResponse has completed, similar to how OpenAIStream has a completion callback?
Additional context
No response
Hi @ayepRahman,
You can use the "sendMessage" function available in the AssistantResponse callback to save the current response to your external database:
https://sdk.vercel.ai/docs/api-reference/providers/assistant-response#process-assistantresponsecallback
The documentation for "sendMessage" has not been updated yet.
If you want the whole chat history, you can use the threadId to list all the messages: https://platform.openai.com/docs/api-reference/messages/listMessages
Do you by any chance have an example of using sendMessage()?
Oops, my bad, sendMessage is something else.
You can use the finalMessages() method on the stream to get the response result once the run has finished.
runStream.finalMessages().then((finalMessages) => {
  console.log("finalMessages", finalMessages);
});
Hope this works for your use case, let me know.
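To turn that into an actual save step, here is a minimal sketch. It assumes only that the run stream exposes finalMessages() as shown above; the names saveFinalMessages, HasFinalMessages and SaveFn are made up for illustration, and the save callback stands in for whatever database write you use.

```typescript
// Structural stand-in for the run stream: only finalMessages() is needed here.
interface HasFinalMessages<T> {
  finalMessages(): Promise<T[]>;
}

// Hypothetical persistence callback; replace it with your own database write.
type SaveFn<T> = (threadId: string, messages: T[]) => Promise<void>;

// Waits for the stream to finish, then hands the completed messages to `save`.
// Errors are logged instead of rethrown so a failed write does not break the response.
// Returns the number of messages passed to `save` (0 on failure).
async function saveFinalMessages<T>(
  threadId: string,
  runStream: HasFinalMessages<T>,
  save: SaveFn<T>
): Promise<number> {
  try {
    const messages = await runStream.finalMessages();
    await save(threadId, messages);
    return messages.length;
  } catch (error) {
    console.error('Error saving thread messages:', error);
    return 0;
  }
}
```

Inside the AssistantResponse callback you would call it with the same runStream you forward to the client. Whether you await it before returning depends on whether your runtime keeps the function alive after the response ends.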
@ravvi-kumar do you know how to properly map the response of openai.beta.threads.messages.list to the useAssistant hook's messages?
const { setMessages } = useAssistant({
  // ...
})
useEffect(() => {
  if (props.messages) {
    const mapped = props.messages.map((message) => {
      // ...
    })
    setMessages(mapped)
  }
  // Depend on props.messages (not props.message) so the effect reruns when the list changes.
}, [props.messages, setMessages])
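One possible shape for that mapping step, as a sketch rather than the SDK's own conversion. The ThreadMessage and UiMessage types are simplified assumptions (text blocks only, with id, role and content on the UI side), and mapThreadMessages is a hypothetical helper name. It also assumes the list arrives newest first, which is the default order of the list endpoint as far as I know; pass false if you request ascending order.

```typescript
// Minimal shapes, assumed from the OpenAI thread message format (text blocks only).
interface ThreadContentBlock {
  type: string;
  text?: { value: string };
}
interface ThreadMessage {
  id: string;
  role: 'user' | 'assistant';
  content: ThreadContentBlock[];
}
// Shape assumed for the messages passed to useAssistant's setMessages.
interface UiMessage {
  id: string;
  role: 'user' | 'assistant';
  content: string;
}

// Converts listed thread messages into flat UI messages, oldest first.
function mapThreadMessages(data: ThreadMessage[], newestFirst = true): UiMessage[] {
  // Copy before reversing so the caller's array is left untouched.
  const ordered = newestFirst ? data.slice().reverse() : data.slice();
  return ordered.map((message) => {
    const parts: string[] = [];
    for (const block of message.content) {
      // Non-text blocks (images, files) are skipped in this sketch.
      if (block.type === 'text' && block.text) parts.push(block.text.value);
    }
    return { id: message.id, role: message.role, content: parts.join('\n') };
  });
}
```

The result can then be handed to setMessages inside the effect.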
I just save the string that the model outputs at the very end of my action.tsx:
    const result = await experimental_streamText({
      model: anthropic('claude-3-opus-20240229'),
      maxTokens: 4000,
      temperature: 0,
      frequencyPenalty: 0.5,
      system: systemPromptTemplate,
      messages: [
        ...aiState.get().map((info: any) => ({
          role: info.role,
          content: info.content,
          name: info.name
        }))
      ]
    });
    let fullResponse = '';
    for await (const textDelta of result.textStream) {
      fullResponse += textDelta;
      uiStream.update(<BotMessage>{fullResponse}</BotMessage>);
    }
    uiStream.done();
    aiState.done([
      ...aiState.get(),
      {
        role: 'assistant',
        content: fullResponse,
        id: uuidv4()
      }
    ]);
    saveChatToRedis(
      CurrentChatSessionId,
      session.id,
      currnetUserMessage,
      fullResponse,
      Array.from(uniqueReferences)
    );
  })();
  return {
    id: Date.now(),
    display: uiStream.value,
    chatId: CurrentChatSessionId
  };
}
I have not had any issues with saving them.
In an API route I used LangChainStream:
let partialCompletion = '';
const { stream, handlers } = LangChainStream({
  onToken: (token: string) => {
    partialCompletion += token;
  },
  onFinal: (completion) => {
    try {
      saveChatToRedis(
        chatSessionId,
        userId,
        messages[messages.length - 1].content,
        partialCompletion,
        Array.from(uniqueReferences)
      );
      cacheChatCompletionVectorDB(
        messages[messages.length - 1].content,
        completion,
        'cache-chat',
        Array.from(uniqueReferences)
      );
    } catch (error) {
      console.error('Error saving chat to database:', error);
    }
    revalidatePath('/chatai', 'layout');
  }
});
The partial completion built up in onToken covers the case where the user stops the chat in the middle of the streaming response, so we always store everything up to the latest token in the database :)
Hope this helps.
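The same idea can be pulled out into a small helper. This is only a sketch: CompletionRecorder and its method names are made up for illustration and are not part of the Vercel SDK or LangChain. The point is that the text received so far is always available, whether the stream ends normally or is stopped early.

```typescript
// Collects streamed tokens so the text received so far can be saved at any point.
class CompletionRecorder {
  private text = '';
  private finished = false;

  // Call from the stream's per-token callback.
  onToken(token: string): void {
    // Tokens that arrive after the stream was closed or aborted are ignored.
    if (!this.finished) this.text += token;
  }

  // Call when the stream ends normally.
  // If the provider passes a non-empty final completion, it replaces the accumulated text.
  onFinal(completion?: string): string {
    this.finished = true;
    if (completion !== undefined && completion.length > 0) this.text = completion;
    return this.text;
  }

  // Call when the user stops the response early; returns whatever arrived so far.
  onAbort(): string {
    this.finished = true;
    return this.text;
  }
}
```

You would wire onToken and onFinal to the stream callbacks, and call onAbort from whatever abort or cancel hook your route has, saving the returned string in each case.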
@ravvi-kumar do you know how to properly map the response of openai.beta.threads.messages.list to the useAssistant hook's messages?
Something like this here:
for (
  let i = 0;
  i < Math.max(userMessages.length, assistantMessages.length);
  i++
) {
  if (userMessages[i]) {
    combinedMessages.push({
      role: 'user',
      id: `user-${i}`,
      content: userMessages[i]
    });
  }
  if (assistantMessages[i]) {
    combinedMessages.push({
      role: 'assistant',
      id: `assistant-${i}`,
      content: assistantMessages[i]
    });
  }
}
Oops, my bad, sendMessage is something else. You can use the finalMessages() method on the stream to get the response result.
runStream.finalMessages().then((finalMessages) => { console.log("finalMessages", finalMessages); });
Hope this works for your use case, let me know.
Yes, it works, but I was also wondering: how do I properly convert the messages coming from OpenAI into the same format that forwardStream() produces?
The LangChainStream from the Vercel SDK does not work (still, as of today) with LangChain's llm.stream(). For your API route, you can create your own LangChainStream to work with it.
This is what LangChainStreamCustom() looks like. It only has the onCompletion callback, but you get the gist and can implement the other callbacks, such as onToken, if you need them.
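export const LangChainStreamCustom = (
  stream: any,
  { onCompletion }: { onCompletion: (completion: string) => Promise<void> }
) => {
  let completion = ''
  // One decoder in streaming mode, so multi-byte characters split across chunks decode correctly.
  const decoder = new TextDecoder('utf-8')
  const transformStream = new TransformStream({
    transform(chunk, controller) {
      // Record the text and pass the chunk through unchanged.
      completion += decoder.decode(chunk, { stream: true })
      controller.enqueue(chunk)
    },
    flush(controller) {
      // The source has ended: flush any buffered bytes, then report the full text.
      completion += decoder.decode()
      onCompletion(completion)
        .then(() => {
          controller.terminate()
        })
        .catch((e: any) => {
          console.error('Error', e)
          controller.terminate()
        })
    }
  })
  // pipeThrough returns the readable side of the transform.
  return stream.pipeThrough(transformStream)
}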
Then, in your API route, get the stream (the response) from the LangChain .stream() call:
//....
response = llm.stream({}) // assuming this comes from a typical LangChain
Then you can use it like the other Vercel stream helpers (OpenAIStream, AnthropicStream, etc.):
const stream = LangChainStreamCustom(response, {
  onCompletion: async (completion: string) => {
    console.log('COMPLETE!', completion)
  }
})
return new StreamingTextResponse(stream)