Any example for advanced RAG and History Flow?
Hi everyone,
I’ve been experimenting with LangChain, but it seems a bit outdated for my current needs.
I noticed there’s a foundational example of using RAG with Gemini, which is great! However, I’m looking for a more advanced implementation, specifically one that integrates with a vector database and conversation history flow.
Has anyone already implemented something like this, or can share a more advanced example?
Would really appreciate any shared resources or guidance!
Thanks in advance 🙏
Hey @hdduytran
Thanks for sharing your use case!
Just to better understand and help out, could you clarify what exactly you're trying to achieve? Are you aiming to build a voice assistant or a chatbot that answers questions from an external knowledge source using RAG?
If that's the case, you might consider setting up an agent that routes queries to a custom tool when the user’s input is related to that external source. Inside the tool's handler function, you can implement your RAG pipeline, connect it with a vector database like FAISS, Weaviate, or Pinecone, and generate responses accordingly.
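Roughly, that could look like the sketch below (illustrative only: fetch_knowledge, the tool schema, and run_rag_pipeline are placeholder names, and the handler uses the classic Pipecat function-call signature):

    tools = [
        {
            "type": "function",
            "function": {
                "name": "fetch_knowledge",
                "description": "Search the external knowledge base for facts relevant to the user's question.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {"type": "string", "description": "A standalone search query"}
                    },
                    "required": ["query"],
                },
            },
        }
    ]

    async def fetch_knowledge(function_name, tool_call_id, args, llm, context, result_callback):
        # Your RAG pipeline goes here: embed args["query"], query FAISS/Weaviate/Pinecone,
        # then hand the retrieved text back to the LLM as the tool result.
        retrieved_text = await run_rag_pipeline(args["query"])  # placeholder helper
        await result_callback({"retrieved_context": retrieved_text})

    context = OpenAILLMContext(messages=messages, tools=tools)
    llm.register_function("fetch_knowledge", fetch_knowledge)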
Would love to hear more about your setup and what you're envisioning, happy to help further! 🙌
I'm trying to build something like that too! If there are any examples available, please share
to @Ahmer967 's point -- have you found that implementing a vector DB search in a tool call isn't enough for your use case?
In terms of history, long-term memory of conversation history can be done with vector DB search as well.
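Schematically, that could look something like this (memory_store is a stand-in for whatever vector DB client you use; the method names are illustrative):

    # `memory_store` is a placeholder for any vector DB client (FAISS, Pinecone, ...).

    async def remember_turn(role: str, text: str) -> None:
        # Embed and store each finished turn so it can be recalled in later sessions.
        await memory_store.add(text=f"{role}: {text}", metadata={"role": role})

    async def recall_history(user_query: str, top_k: int = 3) -> list[str]:
        # Pull back the past turns most similar to the new query and inject them
        # into the LLM context as long-term memory.
        hits = await memory_store.search(user_query, top_k=top_k)
        return [hit.text for hit in hits]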
@Ahmer967 @d-towns IMO using a tool call to inject external knowledge via RAG would be a band-aid: you'd always go to the LLM service first to trigger the tool call and then go to the LLM service again, which burns more tokens than necessary. It would be cool to have a separate service that could sit between speech-to-text and the LLM and do the RAG injection, or that could be an extension to the LLM service.
Is something like this on the roadmap @markbackman ? Would be happy to contribute here with a PR :)
@shahrukhx01 you can avoid the double response generation by passing False to the run_llm parameter.
properties = FunctionCallResultProperties(run_llm=False)
await result_callback(result, properties=properties)
Moreover, the best way to do that is to have your tool retrieve only the relevant data, which can then be passed to the LLM to generate a response based on that data. In this way, your bot will not use unnecessary tokens.
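Inside the handler, that could look roughly like this (vector_db, its search method, and the 0.7 cutoff are placeholders):

    async def fetch_knowledge(function_name, tool_call_id, args, llm, context, result_callback):
        hits = await vector_db.search(args["query"], top_k=3)  # placeholder retrieval call
        relevant = [h.text for h in hits if h.score >= 0.7]  # keep only close matches
        # Return just the trimmed context; by default the LLM runs once more and
        # answers from this result. Pass FunctionCallResultProperties(run_llm=False)
        # as shown above if you don't want that automatic follow-up completion.
        await result_callback({"context": "\n".join(relevant)})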
@Ahmer967 thanks for directing me to this API option. Is there also a way to programmatically force trigger a function always? 🤔
@shahrukhx01 In OpenAI's LLM, we can achieve that by using the tool_choice parameter. By setting tool_choice to { "type": "function", "function": { "name": "my_function" } }, you can explicitly instruct the model to call a specific function. This ensures the specified function is always invoked, regardless of the user's input.
However, I haven't used this with Pipecat. I believe @markbackman might be able to help you better with that.
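For reference, with the plain OpenAI client (outside Pipecat) it looks like this; the model name, my_function, and the schema are just placeholders:

    from openai import OpenAI

    client = OpenAI()

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "What's the weather in Berlin?"}],
        tools=[
            {
                "type": "function",
                "function": {
                    "name": "my_function",
                    "description": "Look up external knowledge",
                    "parameters": {
                        "type": "object",
                        "properties": {"query": {"type": "string"}},
                        "required": ["query"],
                    },
                },
            }
        ],
        # Force the model to call my_function on every request:
        tool_choice={"type": "function", "function": {"name": "my_function"}},
    )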
I haven't used what @Ahmer967 has suggested.
Is something like this on the roadmap @markbackman ? Would be happy to contribute here with a PR :)
We don't have a RAG service planned at the moment, but I'd be interested to hear what you have in mind. Thus far, having the LLM decide when it needs supplementary input has been an effective way of performing RAG. This would be via a tool call.
If the LLM weren't to do it, you'd need some other logic or functionality that understands when to contextually query for information. This might not be so easy.
@markbackman Thanks for your response. In general, what I have seen NLP-based agents do is run retrieval for each user input instead of having the LLM do it via a tool call; even ElevenLabs' voice agents do the vector DB lookup on every user interaction without a tool call. The results can then be filtered on cosine similarity scores using a threshold. Here's an example below:
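(A rough standalone sketch of the idea; the 0.7 threshold and the helper names are placeholders, and OpenAI embeddings are used only as an example.)

    import math

    from openai import AsyncOpenAI

    client = AsyncOpenAI()
    SIMILARITY_THRESHOLD = 0.7  # assumed value; tune per use case

    def cosine(a: list[float], b: list[float]) -> float:
        dot = sum(x * y for x, y in zip(a, b))
        norm_a = math.sqrt(sum(x * x for x in a))
        norm_b = math.sqrt(sum(y * y for y in b))
        return dot / (norm_a * norm_b)

    async def embed(texts: list[str]) -> list[list[float]]:
        resp = await client.embeddings.create(model="text-embedding-3-small", input=texts)
        return [item.embedding for item in resp.data]

    async def build_index(docs: list[str]) -> list[tuple[str, list[float]]]:
        # Embed the knowledge base once at startup.
        return list(zip(docs, await embed(docs)))

    async def retrieve_for_turn(user_text: str, index: list[tuple[str, list[float]]]) -> list[str]:
        # Run retrieval on every user turn (no tool call) and keep only the
        # documents whose cosine similarity clears the threshold.
        [query_vec] = await embed([user_text])
        return [text for text, vec in index if cosine(query_vec, vec) >= SIMILARITY_THRESHOLD]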
I was wondering whether it would be feasible to add a FrameProcessor-based AIService that processes each incoming transcription frame post-aggregation, or a parallel background task (not sure about the specifics here), and enriches it with retrieved context. I'm currently familiarizing myself with the codebase, so of course this is a naive assumption about whether this is feasible from a technical standpoint :)
@markbackman FYI: I did a small PoC to try out the idea, using a custom FrameProcessor. If we make XML tags the standard for injecting context, it would be possible to post-process and discard the context from final transcripts (using context aggregators; see the small helper sketch after the snippet). For RAG itself, LlamaIndex's vector store implementations could be used to support multiple vector DBs out of the box with async support. I'd appreciate your feedback on this :)
PS: We'd also need embedding support, which could be self-contained in the RAGService
# Imports assumed for a recent Pipecat version; adjust paths to your setup.
from loguru import logger

from pipecat.frames.frames import Frame, TranscriptionFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class RAGService(FrameProcessor):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if isinstance(frame, TranscriptionFrame):
            logger.info(f"Intercepted transcription {frame.text=}")
            # TODO: Insert Async RAG logic here
            if "weather" in frame.text.lower():
                frame.text = (
                    "<context>Weather in Berlin is 30 degrees Celsius.</context>"
                    + frame.text
                )
            await self.push_frame(frame, direction)
        else:
            await self.push_frame(frame, direction)
# LLM service with `<context>` tags
first_message_chat = [
    {
        "role": "system",
        "content": (
            "You are a helpful assistant. Please note the content in <context>, </context> tags is "
            "retrieved from an external source and is not part of the conversation."
        ),
    },
    {"role": "assistant", "content": first_message},
]
pipeline = Pipeline(
    [
        transport.input(),
        stt_service,
        RAGService(),
        context_aggregator.user(),
        llm_service,
        tts_service,
        transport.output(),
        context_aggregator.assistant(),
    ]
)
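And for discarding the injected context from final transcripts, a small post-processing helper could be enough (illustrative only; the helper name is made up):

    import re

    _CONTEXT_TAG = re.compile(r"<context>.*?</context>\s*", re.DOTALL)


    def strip_injected_context(text: str) -> str:
        # Drop anything the RAGService injected before storing/displaying the transcript.
        return _CONTEXT_TAG.sub("", text)


    # strip_injected_context("<context>Weather in Berlin is 30 degrees Celsius.</context>What's the weather?")
    # -> "What's the weather?"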
@markbackman After giving it a bit more thought, I have the following RAGService implementation that works with any vector database. Right now, to orchestrate retrieval and embedding queries, I have used llama-index. I have the following questions:
- Would you be open to having something like this in Pipecat? If yes, would the vector DB and embedding model integrations need to be written from the ground up, or could a wrapper like llama-index be used?
- I can already see that passing OpenAILLMContext to an AIService class is kind of an anti-pattern in Pipecat; would this have to be alleviated by a new context_aggregator.system()?
Please let me know if you find this a promising direction; I will be happy to iterate on this draft and improve it. Thanks!
Below is the RAGService implementation; the full runnable code snippet can be found here.
class RAGService(AIService):
    def __init__(self, context: OpenAILLMContext, **kwargs):
        super().__init__(**kwargs)
        self.context = context
        self.current_rag_task: asyncio.Task[None] | None = None
        # Done in a hacky way via llama_index, can be implemented with native embedding model provider client
        # Additional dependencies:
        #   "llama-index>=0.12.38",
        #   "llama-index-embeddings-openai>=0.3.1"
        dummy_docs = [
            Document(text="Weather in San Francisco is sunny"),
            Document(text="Weather in New York is rainy"),
            Document(text="Weather in London is cloudy"),
            Document(text="Weather in Paris is sunny"),
            Document(text="Weather in Tokyo is rainy"),
            Document(text="Weather in Sydney is sunny"),
            Document(text="Weather in Berlin is cloudy"),
            Document(text="Weather in Rome is sunny"),
        ]
        vector_index = VectorStoreIndex.from_documents(documents=dummy_docs)
        embedding_model = OpenAIEmbedding(model="text-embedding-3-small")
        self._similarity_threshold = 0.7
        self.retriever = vector_index.as_retriever(
            similarity_top_k=1, embedding_model=embedding_model
        )

    def _cancel_current_rag_task(self) -> None:
        """Cancel the RAG task if it is running."""
        if self.current_rag_task:
            try:
                self.current_rag_task.cancel()
                self.current_rag_task = None
                logger.info("RAG task canceled successfully")
            except Exception as e:
                logger.error(f"Error canceling RAG task: {e}")

    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
        """Process the frame and add the relevant documents to the context as a system message.

        Args:
            frame (Frame): The frame to process.
            direction (FrameDirection): The direction of the frame.
        """
        await super().process_frame(frame, direction)
        if isinstance(frame, TranscriptionFrame):
            # if a new transcription is detected, cancel the RAG task and start a new one
            logger.info(f"User transcription: {frame.text=}")
            self._cancel_current_rag_task()
            self.current_rag_task = asyncio.create_task(self._rag_task(frame))
            await self.current_rag_task
            await self.push_frame(frame, direction)
        elif isinstance(frame, StartInterruptionFrame):
            # if the user starts speaking again, cancel the RAG task
            self._cancel_current_rag_task()
            await self.push_frame(frame, direction)
        else:
            await self.push_frame(frame, direction)

    async def _rag_task(self, frame: TranscriptionFrame) -> None:
        """Query the Vector DB for the transcription and add the relevant documents to the context as a system message.

        Args:
            frame (TranscriptionFrame): The transcription frame.
        """
        logger.info(f"Querying Vector DB for transcription: {frame.text}")
        retrieved_nodes = await self.retriever.aretrieve(frame.text)
        retrieved_docs = [
            node.get_text()
            for node in retrieved_nodes
            if node.score > self._similarity_threshold
        ]
        logger.info(
            f"Retrieved {len(retrieved_docs)} documents for transcription: {frame.text}"
        )
        if retrieved_docs:
            context_text = "\n".join(doc for doc in retrieved_docs)
            self.context.add_message(
                ChatCompletionSystemMessageParam(
                    content=context_text, name="RAG", role="system"
                )
            )
        logger.info("RAG task completed, cleaning up the current task.")
        self.current_rag_task = None
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket) -> None:
    """WebSocket endpoint for streaming audio data."""
    context = OpenAILLMContext(messages)
    context_aggregator = llm.create_context_aggregator(context)
    pipeline = Pipeline(
        [
            transport.input(),  # Transport user input
            stt,  # STT
            RAGService(context),  # RAG
            context_aggregator.user(),  # User responses
            llm,  # LLM
            tts,  # TTS
            transport.output(),  # Transport bot output
            context_aggregator.assistant(),  # Assistant spoken responses
        ]
    )
    ...
FYI: @hdduytran @Ahmer967 @marctorsoc @d-towns
@markbackman any solutions for this?
Somehow I totally missed this thread. Apologies! One question is why a standalone RAG service vs having the LLM make a tool call with a RAG query?
The benefit of the LLM tool call is that the LLM can take the input and context and structure an input for RAG that's appropriate. LLMs, after all, are great natural language processors. This is theoretically better than just raw, unprocessed input from the user (if I understand the RAGService approach correctly).
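To illustrate the difference (made-up utterance; fetch_knowledge is just a placeholder tool name):

    # Raw user transcript (what a per-turn RAGService would embed as-is):
    #   "uh so what was it we said about the refund policy again?"
    #
    # What the LLM typically passes when it makes the tool call:
    #   fetch_knowledge(query="refund policy")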