
Any example for advanced RAG and History Flow?

Open hdduytran opened this issue 8 months ago • 8 comments

Hi everyone,

I’ve been experimenting with LangChain, but it seems a bit outdated for my current needs.

I noticed there’s a foundational example of using RAG with Gemini, which is great! However, I’m looking for a more advanced implementation, specifically one that integrates with a vector database and conversation history.

Has anyone already implemented something like this, or can share a more advanced example?

Would really appreciate any shared resources or guidance!

Thanks in advance 🙏

hdduytran avatar Apr 04 '25 09:04 hdduytran

Hey @hdduytran

Thanks for sharing your use case!

Just to better understand and help out, could you clarify what exactly you're trying to achieve? Are you aiming to build a voice assistant or a chatbot that answers questions from an external knowledge source using RAG?

If that's the case, you might consider setting up an agent that routes queries to a custom tool when the user’s input is related to that external source. Inside the tool's handler function, you can implement your RAG pipeline, connect it with a vector database like FAISS, Weaviate, or Pinecone, and generate responses accordingly.
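For illustration (not from the original comment), here's a rough sketch of that pattern with Pipecat's function calling. `search_knowledge_base` is a hypothetical stand-in for your own FAISS/Weaviate/Pinecone query, `llm` and `messages` are assumed to already exist as in the usual Pipecat examples, and the exact handler signature may differ between Pipecat versions:

from openai.types.chat import ChatCompletionToolParam

from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext

# Hypothetical helper wrapping your vector DB (FAISS, Weaviate, Pinecone, ...).
async def search_knowledge_base(query: str, top_k: int = 3) -> list[str]:
    ...

# Tool definition the LLM can route knowledge questions to.
tools = [
    ChatCompletionToolParam(
        type="function",
        function={
            "name": "search_knowledge_base",
            "description": "Look up facts in the external knowledge base.",
            "parameters": {
                "type": "object",
                "properties": {"query": {"type": "string"}},
                "required": ["query"],
            },
        },
    )
]

# Handler: run the RAG pipeline and hand the retrieved text back to the LLM.
async def handle_search(function_name, tool_call_id, args, llm, context, result_callback):
    chunks = await search_knowledge_base(args["query"])
    await result_callback({"relevant_chunks": chunks})

llm.register_function("search_knowledge_base", handle_search)
context = OpenAILLMContext(messages, tools)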

Would love to hear more about your setup and what you're envisioning, happy to help further! 🙌

Ahmer967 avatar Apr 23 '25 15:04 Ahmer967

I'm trying to build something like that too! If there are any examples available, please share

marctorsoc avatar May 01 '25 15:05 marctorsoc

To @Ahmer967's point -- have you found that implementing a vector DB search in a tool call isn't enough for your use case?

In terms of history, long-term memory of conversation history can be done with vector DB search as well.
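For instance, a minimal sketch of that idea (all names here are hypothetical, and `embed` stands in for whatever embedding client you use): store each finished turn as a vector, then pull the most similar past turns back in when a new user message arrives. The in-memory list is just a placeholder for a real vector DB.

import math

# Hypothetical embedding call; replace with your embedding provider's client.
async def embed(text: str) -> list[float]:
    ...

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

# (embedding, turn text) pairs; a real setup would use a vector DB instead.
memory: list[tuple[list[float], str]] = []

async def remember(turn: str) -> None:
    memory.append((await embed(turn), turn))

async def recall(query: str, top_k: int = 3) -> list[str]:
    q = await embed(query)
    ranked = sorted(memory, key=lambda item: cosine(q, item[0]), reverse=True)
    return [text for _, text in ranked[:top_k]]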

d-towns avatar May 08 '25 04:05 d-towns

@Ahmer967 @d-towns IMO, using a tool call to inject external knowledge via RAG would be a band-aid: you'd always go to the LLM service first to trigger the tool call, and then go to the LLM service again, which burns extra tokens unnecessarily. It would be cool to have a separate service that sits between speech-to-text and the LLM and does the RAG injection, or an extension to the LLM service.

Is something like this on the roadmap @markbackman ? Would be happy to contribute here with a PR :)

shahrukhx01 avatar May 17 '25 15:05 shahrukhx01

@shahrukhx01 you can avoid the double response generation by passing False to the run_llm parameter.

properties = FunctionCallResultProperties(run_llm=False)
await result_callback(result, properties=properties)

Moreover, the best way to do that is to have your tool retrieve only the relevant data, which can then be passed to the LLM to generate a response based on that data. In this way, your bot will not use unnecessary tokens.
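Putting those two pieces together, a handler might (hypothetically, with `search_knowledge_base` being your own retrieval helper) only let the LLM run again when something relevant was actually retrieved:

from pipecat.frames.frames import FunctionCallResultProperties

async def rag_handler(function_name, tool_call_id, args, llm, context, result_callback):
    # Only hand back the relevant chunks, not whole documents.
    chunks = await search_knowledge_base(args["query"], top_k=3)
    if chunks:
        # Default behavior: the LLM generates a response grounded in the chunks.
        await result_callback({"relevant_chunks": chunks})
    else:
        # Nothing useful found: return a result but skip the second LLM pass.
        await result_callback(
            {"relevant_chunks": []},
            properties=FunctionCallResultProperties(run_llm=False),
        )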

Ahmer967 avatar May 17 '25 17:05 Ahmer967

@Ahmer967 thanks for directing me to this API option. Is there also a way to programmatically force-trigger a function every time? 🤔

shahrukhx01 avatar May 17 '25 18:05 shahrukhx01

@shahrukhx01 In OpenAI's LLM, we can achieve that by using the tool_choice parameter. By setting tool_choice to { "type": "function", "function": { "name": "my_function" } }, you can explicitly instruct the model to call a specific function. This ensures the specified function is always invoked, regardless of the user's input.
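Outside of Pipecat, that is plain OpenAI API usage, roughly like this (my_function and its schema here are just placeholders):

from openai import OpenAI

client = OpenAI()

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What's the weather like today?"}],
    tools=[
        {
            "type": "function",
            "function": {
                "name": "my_function",
                "description": "Placeholder function that should always be invoked.",
                "parameters": {
                    "type": "object",
                    "properties": {"query": {"type": "string"}},
                    "required": ["query"],
                },
            },
        }
    ],
    # Force the model to call my_function regardless of the user's input.
    tool_choice={"type": "function", "function": {"name": "my_function"}},
)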

However, I haven't used this with Pipecat. I believe @markbackman might be able to help you better with that.

Ahmer967 avatar May 19 '25 13:05 Ahmer967

I haven't used what @Ahmer967 has suggested.

Is something like this on the roadmap @markbackman ? Would be happy to contribute here with a PR :)

We don't have a RAG service planned at the moment, but I'd be interested to hear what you have in mind. Thus far, having the LLM decide when it needs supplementary input has been an effective way of performing RAG. This would be via a tool call.

If the LLM didn't do it, you'd need some other logic or functionality that understands when to contextually query for information. This might not be so easy.

markbackman avatar May 19 '25 18:05 markbackman

@markbackman Thanks for your response. In general, what I have seen is NLP-based agents doing the lookup for each user input instead of the LLM doing it (via a tool call); even ElevenLabs' voice agents do the vector DB lookup on every user interaction without a tool call. The results can then be filtered by cosine similarity score using a threshold. Here's an example below:

[image attachment]

I was wondering whether it would be feasible to add a FrameProcessor-based AIService that processes each incoming transcription frame post-aggregation (or runs as a parallel background task, I'm not sure about the specifics here) and enriches it with retrieved context. I'm currently familiarizing myself with the codebase, so of course this is a naive assumption about whether this is technically feasible :)

shahrukhx01 avatar Jun 06 '25 10:06 shahrukhx01

@markbackman FYI: I did a small PoC to try out the idea, using a custom FrameProcessor. If we make XML tags the standard for injecting context, then it would be possible to post-process and discard the context from the final transcripts (using context aggregators; a rough tag-stripping sketch follows the pipeline snippet below). For RAG itself, LlamaIndex's vector store implementations could be used to support multiple vector DBs out of the box, with async support. I'd appreciate your feedback on this :)

PS: We'd also need embedding support, which could be self-contained in the RAGService.

# Import paths may vary slightly between Pipecat versions.
from loguru import logger

from pipecat.frames.frames import Frame, TranscriptionFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class RAGService(FrameProcessor):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, TranscriptionFrame):
            logger.info(f"Intercepted transcription {frame.text=}")
            # TODO: Insert async RAG logic here
            if "weather" in frame.text.lower():
                # Prepend the retrieved context in <context> tags so it can be
                # identified (and stripped) downstream.
                frame.text = (
                    "<context>Weather in Berlin is 30 degrees Celsius.</context>"
                    + frame.text
                )

        # Pass every frame (modified or not) downstream.
        await self.push_frame(frame, direction)

# LLM service with `<context>` tags

first_message_chat = [
    {
        "role": "system",
        "content": (
            "You are a helpful assistant. Please note the content in <context>, </context> tags is "
            "retrieved from an external source and is not part of the conversation."
        ),
    },
    {"role": "assistant", "content": first_message},
]

pipeline = Pipeline(
    [
        transport.input(),
        stt_service,
        RAGService(),
        context_aggregator.user(),
        llm_service,
        tts_service,
        transport.output(),
        context_aggregator.assistant(),
    ]
)
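The post-processing step mentioned above isn't shown in the PoC; a naive sketch would be to strip the injected tags again before the text goes into the stored history or final transcript, e.g.:

import re

# Remove injected <context>...</context> blocks so they don't leak into the
# stored conversation history or final transcripts.
CONTEXT_TAG = re.compile(r"<context>.*?</context>\s*", re.DOTALL)

def strip_injected_context(text: str) -> str:
    return CONTEXT_TAG.sub("", text)

assert (
    strip_injected_context("<context>Weather in Berlin is 30 degrees Celsius.</context>What's the weather?")
    == "What's the weather?"
)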

shahrukhx01 avatar Jun 12 '25 06:06 shahrukhx01

@markbackman After giving it a bit more thought, I have the following RAGService implementation that works with any vector database. Right now I've used llama-index to orchestrate the retrieval and embedding queries. I have the following questions:

  • Would you be open to having something like this in pipecat? If yes, would the vector DB and embedding model integrations need to be written from the ground up, or could a wrapper like llama-index be used?
  • I can already see that passing OpenAILLMContext to an AIService class is kind of an anti-pattern in pipecat; would this have to be alleviated by a new context_aggregator.system()?

Please let me know if you find this a promising direction; I'll be happy to iterate on this draft and improve it. Thanks!

Below is the RAGService implementation; the full runnable code snippet can be found here

import asyncio

# Import paths may vary slightly between Pipecat / llama-index versions.
from llama_index.core import Document, VectorStoreIndex
from llama_index.embeddings.openai import OpenAIEmbedding
from loguru import logger
from openai.types.chat import ChatCompletionSystemMessageParam

from pipecat.frames.frames import Frame, StartInterruptionFrame, TranscriptionFrame
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_services import AIService


class RAGService(AIService):
    def __init__(self, context: OpenAILLMContext, **kwargs):
        super().__init__(**kwargs)
        self.context = context
        self.current_rag_task: asyncio.Task[None] | None = None

        # Done in a hacky way via llama_index, can be implemented with native embedding model provider client
        # Additional dependencies:
        #   "llama-index>=0.12.38",
        #   "llama-index-embeddings-openai>=0.3.1"
        dummy_docs = [
            Document(text="Weather in San Francisco is sunny"),
            Document(text="Weather in New York is rainy"),
            Document(text="Weather in London is cloudy"),
            Document(text="Weather in Paris is sunny"),
            Document(text="Weather in Tokyo is rainy"),
            Document(text="Weather in Sydney is sunny"),
            Document(text="Weather in Berlin is cloudy"),
            Document(text="Weather in Rome is sunny"),
        ]

        vector_index = VectorStoreIndex.from_documents(documents=dummy_docs)
        embedding_model = OpenAIEmbedding(model="text-embedding-3-small")
        self._similarity_threshold = 0.7
        self.retriever = vector_index.as_retriever(similarity_top_k=1, embedding_model=embedding_model)

    def _cancel_current_rag_task(self) -> None:
        """Cancel the RAG task if it is running."""
        if self.current_rag_task:
            try:
                self.current_rag_task.cancel()
                self.current_rag_task = None
                logger.info("RAG task canceled successfully")
            except Exception as e:
                logger.error(f"Error canceling RAG task: {e}")

    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
        """Process the frame and add the relevant documents to the context as a system message.

        Args:
            frame (Frame): The frame to process.
            direction (FrameDirection): The direction of the frame.
        """
        await super().process_frame(frame, direction)

        if isinstance(frame, TranscriptionFrame):
            # if a new transcription is detected, cancel the RAG task and start a new one
            logger.info(f"User transcription: {frame.text=}")
            self._cancel_current_rag_task()
            self.current_rag_task = asyncio.create_task(self._rag_task(frame))
            await self.current_rag_task
            await self.push_frame(frame, direction)
        elif isinstance(frame, StartInterruptionFrame):
            # if the user starts speaking again, cancel the RAG task
            self._cancel_current_rag_task()
            await self.push_frame(frame, direction)
        else:
            await self.push_frame(frame, direction)

    async def _rag_task(self, frame: TranscriptionFrame) -> None:
        """Query the Vector DB for the transcription and add the relevant documents to the context as a system message.

        Args:
            frame (TranscriptionFrame): The transcription frame.
        """
        logger.info(f"Querying Vector DB for transcription: {frame.text}")

        retrieved_nodes = await self.retriever.aretrieve(frame.text)
        retrieved_docs = [node.get_text() for node in retrieved_nodes if node.score > self._similarity_threshold]
        logger.info(
            f"Retrieved {len(retrieved_docs)} documents for transcription: {frame.text}"
        )
        if retrieved_docs:
            context_text = "\n".join(doc for doc in retrieved_docs)
            self.context.add_message(
                ChatCompletionSystemMessageParam(
                    content=context_text, name="RAG", role="system"
                )
            )
        logger.info("Rag task completed, cleaning up the current task.")
        self.current_rag_task = None


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket) -> None:
    """WebSocket endpoint for streaming audio data."""
 
    context = OpenAILLMContext(messages)
    context_aggregator = llm.create_context_aggregator(context)

    pipeline = Pipeline(
        [
            transport.input(),  # Transport user input
            stt,  # STT,
            RAGService(context),  # RAG
            context_aggregator.user(),  # User responses
            llm,  # LLM
            tts,  # TTS
            transport.output(),  # Transport bot output
            context_aggregator.assistant(),  # Assistant spoken responses
        ]
    )
    ...

FYI: @hdduytran @Ahmer967 @marctorsoc @d-towns

shahrukhx01 avatar Jul 09 '25 16:07 shahrukhx01

@markbackman any solutions for this?

Sajal0208 avatar Oct 08 '25 21:10 Sajal0208

Somehow I totally missed this thread. Apologies! One question is why a standalone RAG service vs having the LLM make a tool call with a RAG query?

The benefit of the LLM tool call is that the LLM can take the input and context and structure an appropriate input for RAG. LLMs, after all, are great natural language processors. This is theoretically better than just raw, unprocessed input from the user (if I understand the RAGService approach correctly).

markbackman avatar Oct 22 '25 03:10 markbackman