
[AssistantResponse] - to save streaming data to an external database

ayepRahman opened this issue 9 months ago • 8 comments

Feature Description

Save streaming thread message data to external DB

Use Case

Is there a way to save thread messages to an external database after streaming completes via AssistantResponse, similar to how OpenAIStream has a completion callback?
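
For reference, the completion callback on OpenAIStream that this refers to looks roughly like this (a sketch; saveToDb is a hypothetical persistence helper):

import OpenAI from 'openai';
import { OpenAIStream, StreamingTextResponse } from 'ai';

const openai = new OpenAI();

export async function POST(req: Request) {
  const { messages } = await req.json();

  const response = await openai.chat.completions.create({
    model: 'gpt-3.5-turbo',
    stream: true,
    messages,
  });

  const stream = OpenAIStream(response, {
    // fires once after the full completion has finished streaming
    onCompletion: async (completion) => {
      await saveToDb(completion); // hypothetical helper
    },
  });

  return new StreamingTextResponse(stream);
}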

Additional context

No response

ayepRahman avatar Apr 28 '24 06:04 ayepRahman

Hi @ayepRahman ,

you can use "sendMessage" function present on assistantresponsecallback to save the current response to your external database https://sdk.vercel.ai/docs/api-reference/providers/assistant-response#process-assistantresponsecallback image as of now, the documentation on "sendMessage" is not updated yet.

And if you want the whole chat history, you can use the threadId to list all the messages: https://platform.openai.com/docs/api-reference/messages/listMessages
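
For example, a minimal sketch with the openai v4 Node client (assuming threadId is already in scope):

import OpenAI from 'openai';

const openai = new OpenAI();

// list every message in the thread, oldest first
const messages = await openai.beta.threads.messages.list(threadId, {
  order: 'asc',
});

for (const message of messages.data) {
  console.log(message.role, message.content);
}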

ravvi-kumar avatar Apr 28 '24 10:04 ravvi-kumar

Do you by any chance have an example of using sendMessage()?

ayepRahman avatar Apr 28 '24 13:04 ayepRahman

Oops, my bad, sendMessage is something else. You can use the finalMessages() method returned from the run stream to get the response result:

runStream.finalMessages().then((finalMessages) => {
    console.log("finalMessages", finalMessages);
});

Hope this works for your use case, let me know.
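
Put together in an API route, persisting the result might look roughly like this (a sketch assuming the ai package's AssistantResponse, the openai v4 run-stream helper, and a hypothetical saveMessagesToDb function):

import { AssistantResponse } from 'ai';
import OpenAI from 'openai';

const openai = new OpenAI();

export async function POST(req: Request) {
  const { threadId, message } = await req.json();

  // append the user's message to the thread
  const createdMessage = await openai.beta.threads.messages.create(threadId, {
    role: 'user',
    content: message,
  });

  return AssistantResponse(
    { threadId, messageId: createdMessage.id },
    async ({ forwardStream }) => {
      const runStream = openai.beta.threads.runs.stream(threadId, {
        assistant_id: process.env.ASSISTANT_ID!,
      });

      // stream the run to the client as it progresses
      await forwardStream(runStream);

      // once streaming has completed, persist the final messages
      const finalMessages = await runStream.finalMessages();
      await saveMessagesToDb(threadId, finalMessages); // hypothetical helper
    }
  );
}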

ravvi-kumar avatar Apr 28 '24 14:04 ravvi-kumar

@ravvi-kumar do you know how to map the response of openai.beta.threads.messages.list to the useAssistant hook's messages properly?

const { setMessages } = useAssistant({
  // ...
})
useEffect(() => {
   if (props.messages) {
     const mapped = props.messages.map((message) => {
       // ...
     })
     setMessages(mapped)
   }
}, [props.messages, setMessages])

mattp0123 avatar Apr 28 '24 14:04 mattp0123

I just save the string that the model outputs at the very end of my action.tsx (inside the async IIFE that drives the stream):

 const result = await experimental_streamText({
      model: anthropic('claude-3-opus-20240229'),
      maxTokens: 4000,
      temperature: 0,
      frequencyPenalty: 0.5,
      system: systemPromptTemplate,
      messages: [
        ...aiState.get().map((info: any) => ({
          role: info.role,
          content: info.content,
          name: info.name
        }))
      ]
    });

    let fullResponse = '';
    for await (const textDelta of result.textStream) {
      fullResponse += textDelta;
      uiStream.update(<BotMessage>{fullResponse}</BotMessage>);
    }

    uiStream.done();
    aiState.done([
      ...aiState.get(),
      {
        role: 'assistant',
        content: fullResponse,
        id: uuidv4()
      }
    ]);
    saveChatToRedis(
      CurrentChatSessionId,
      session.id,
      currentUserMessage,
      fullResponse,
      Array.from(uniqueReferences)
    );
  })();

  return {
    id: Date.now(),
    display: uiStream.value,
    chatId: CurrentChatSessionId
  };
}

I have not had any issues with saving them.

In an API route I used the LangChainStream helper:

let partialCompletion = '';
  const { stream, handlers } = LangChainStream({
    onToken: (token: string) => {
      partialCompletion += token;
    },
    onFinal: (completion) => {
      try {
        saveChatToRedis(
          chatSessionId,
          userId,
          messages[messages.length - 1].content,
          partialCompletion,
          Array.from(uniqueReferences)
        );
        cacheChatCompletionVectorDB(
          messages[messages.length - 1].content,
          completion,
          'cache-chat',
          Array.from(uniqueReferences)
        );
      } catch (error) {
        console.error('Error saving chat to database:', error);
      }
      revalidatePath('/chatai', 'layout');
    }
  });

The partial built up in onToken is for when the user stops the chat in the middle of the streaming response, so we always have the latest tokens stored in the database :)

Hope this helps

ElectricCodeGuy avatar Apr 28 '24 23:04 ElectricCodeGuy

> @ravvi-kumar do you know how to map the response of openai.beta.threads.messages.list to the useAssistant hook's messages properly?

Something like this:

// userMessages and assistantMessages hold the text contents pulled from the
// thread, split by role; interleave them back into one chronological list
const combinedMessages: { role: 'user' | 'assistant'; id: string; content: string }[] = [];

for (
    let i = 0;
    i < Math.max(userMessages.length, assistantMessages.length);
    i++
  ) {
    if (userMessages[i]) {
      combinedMessages.push({
        role: 'user',
        id: `user-${i}`,
        content: userMessages[i]
      });
    }
    if (assistantMessages[i]) {
      combinedMessages.push({
        role: 'assistant',
        id: `assistant-${i}`,
        content: assistantMessages[i]
      });
    }
  }
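
A fuller sketch of that mapping, assuming the openai v4 client and the Message shape expected by useAssistant (the text-part handling here is illustrative):

import OpenAI from 'openai';
import type { Message } from 'ai';

const openai = new OpenAI();

async function fetchThreadMessages(threadId: string): Promise<Message[]> {
  // list the thread's messages oldest-first so the UI reads top to bottom
  const response = await openai.beta.threads.messages.list(threadId, {
    order: 'asc',
  });

  return response.data.map((message) => ({
    id: message.id,
    role: message.role, // 'user' | 'assistant'
    // keep only the text parts; image parts are ignored for simplicity
    content: message.content
      .map((part) => (part.type === 'text' ? part.text.value : ''))
      .join(''),
  }));
}

The result can then be passed straight to setMessages from useAssistant.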

ElectricCodeGuy avatar Apr 28 '24 23:04 ElectricCodeGuy

> Oops, my bad, sendMessage is something else. You can use the finalMessages() method returned from the run stream to get the response result.

Yes, it works, but at the same time I was wondering: how do I convert the messages coming from OpenAI into the same format that forwardStream() produces?

ayepRahman avatar Apr 29 '24 05:04 ayepRahman

The LangChainStream from the Vercel AI SDK does not work (still today) with LangChain's llm.stream(). For your API route, you can create your own LangChainStream to work with it, like this:

This is what LangChainStreamCustom() looks like. It only has the onCompletion callback, but you get the gist and can implement the other callbacks like onToken if you need them.

export const LangChainStreamCustom = (
  stream: ReadableStream<Uint8Array>,
  { onCompletion }: { onCompletion: (completion: string) => Promise<void> }
) => {
  let completion = ''
  // a single decoder with { stream: true } handles multi-byte characters
  // that are split across chunk boundaries
  const decoder = new TextDecoder('utf-8')
  const transformStream = new TransformStream<Uint8Array, Uint8Array>({
    transform(chunk, controller) {
      // accumulate the full completion while passing chunks through untouched
      completion += decoder.decode(chunk, { stream: true })
      controller.enqueue(chunk)
    },
    flush(controller) {
      // the source stream has finished; hand the full text to the callback
      onCompletion(completion)
        .then(() => {
          controller.terminate()
        })
        .catch((e: any) => {
          console.error('Error', e)
          controller.terminate()
        })
    }
  })

  // pipeThrough wires the source into the transform and returns its readable side
  return stream.pipeThrough(transformStream)
}

Then in your API route you get the stream from the LangChain .stream() call:

//....
const response = await llm.stream(input) // assuming this comes from a typical LangChain model; input is your prompt

Then you can use it like all the other Vercel helpers (OpenAIStream, AnthropicStream, etc.):

const stream = LangChainStreamCustom(response, {
    onCompletion: async (completion: string) => {
      console.log('COMPLETE!', completion)
    }
})

return new StreamingTextResponse(stream)

oliviermills avatar Apr 29 '24 23:04 oliviermills