Does the LangchainProcessor support multi-agent workflows, along the lines of LangGraph?
If we want to connect multiple chatbots with different prompt contexts, multiple agents is what comes to mind. Can we achieve this now through any means, or do we have to wait for a LangGraph processor?
Basically my question is: is there any way to achieve a multi-agent workflow in pipecat as of now?
I am working on a Langgraph processor as we speak. I can push this into a feature branch this week, for a full PR I still need to create a test case and example. Stay tuned!
For the time being, check out https://github.com/TomTom101/pipecat/tree/feat/langgraph Instantiate a processor and place it in the pipe with a compiled graph:
workflow = StateGraph(AgentState)
graph: CompiledGraph = workflow.compile()
lg = LanggraphProcessor(graph)
Let me know how well this works for you, I have only really used and tested it for a specific use case so far.
sure waiting to use multi agentic workflow 😃
Well, your agents are all in the graph, see e.g. https://github.com/langchain-ai/langgraph/blob/main/examples/multi_agent/agent_supervisor.ipynb?ref=blog.langchain.dev
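To make the supervisor idea concrete in plain Python (this is a concept sketch, not the actual langgraph API; the agent functions and hard-coded routing below are made up for illustration): a supervisor decides which agent handles each step and collects their outputs.

```python
from typing import Callable

# Hypothetical stand-ins for LLM-backed agents; in langgraph these would be
# graph nodes, and the supervisor would be an LLM deciding which node runs next.
def research_agent(task: str) -> str:
    return f"research notes on {task}"

def writer_agent(task: str) -> str:
    return f"draft about {task}"

AGENTS: dict[str, Callable[[str], str]] = {
    "researcher": research_agent,
    "writer": writer_agent,
}

def supervisor(task: str) -> list[str]:
    """Route the task through agents in turn, collecting outputs.

    A real supervisor (as in the linked notebook) would ask an LLM which
    agent to call next and when to FINISH; here the routing is hard-coded.
    """
    transcript = []
    for name in ("researcher", "writer"):
        transcript.append(f"{name}: {AGENTS[name](task)}")
    return transcript

print(supervisor("pipecat"))
```

The point being: once all agents live inside one compiled graph, a single processor in the pipeline can drive the whole multi-agent workflow.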
Any updates on the Langgraph processor? I had a chatbot that I wanted to replicate with pipecat.
For a quick start, the chatbot I am following is https://github.com/langchain-ai/langgraph/blob/main/examples/chatbots/information-gather-prompting.ipynb. Could you shed some light on how to automate the above?
Hi @TomTom101, is there any update on the langgraph processor? I really need it for an application, and I am willing to put in some time to develop it as well if you can point me in the right direction.
related to pipecat-flows, #720
I have a multi-agent application where I need to call the agent from browser-use (https://github.com/browser-use/browser-use), which is built on top of langchain...
The question is: how can I guarantee that the pipecat "master" agent talks with the browser-use one and waits for its action to complete? Can I simply instantiate the agent in a pipecat function call, and will it automatically wait for the browser-use agent to complete its task?
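As far as the waiting goes, this mostly comes down to `await` semantics. A hedged sketch (`FakeBrowserAgent` and the handler signature are made up for illustration; the real browser-use and pipecat APIs differ): as long as the function-call handler awaits the sub-agent's coroutine, the master agent cannot continue until the sub-agent finishes.

```python
import asyncio

class FakeBrowserAgent:
    """Hypothetical stand-in for a browser-use Agent; the real API differs."""
    async def run(self, task: str) -> str:
        await asyncio.sleep(0.01)  # simulate browsing work
        return f"done: {task}"

# A pipecat LLM function-call handler is an async callable; as long as it
# awaits the sub-agent, the result is only returned to the calling LLM once
# the sub-agent completes. The signature here is illustrative.
async def browse_handler(task: str) -> str:
    agent = FakeBrowserAgent()
    result = await agent.run(task)  # suspends here until the sub-agent finishes
    return result

print(asyncio.run(browse_handler("find pricing page")))
```

So instantiating and awaiting the browser-use agent inside the function call should give you the "wait for completion" behavior, provided its entry point is a coroutine you can await.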
FWIW, livekit has https://github.com/dqbd/langgraph-livekit-agents, so maybe it's possible to do something similar for pipecat? Is that project similar to what you built, @TomTom101? If so, I'd be happy to help publish it as a package if you don't have the time :)
I can give you a starting point that I did some time ago. It might not work with current pipecat versions but should be easy to get working.
import os
from dataclasses import dataclass
from typing import Any, Union

from langchain.schema import Document
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.messages import HumanMessage

try:
    from langfuse.callback import CallbackHandler
except ModuleNotFoundError:
    # Langfuse tracing is optional; only needed when LANGFUSE_HOST is set.
    pass

from langgraph.graph.graph import CompiledGraph
from loguru import logger

from pipecat.frames.frames import (
    DataFrame,
    Frame,
    LLMFullResponseEndFrame,
    LLMFullResponseStartFrame,
    LLMMessagesFrame,
    TextFrame,
    TranscriptionFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

try:
    from langchain_core.messages import AIMessageChunk
except ModuleNotFoundError as e:
    logger.exception(
        "In order to use Langchain, you need to `pip install pipecat-ai[langchain]`. "
    )
    raise Exception(f"Missing module: {e}")


@dataclass
class ToolResultMessage(DataFrame):
    result: Any
    type: str = "tool_result"

    def __str__(self):
        return f"{self.name}(result: {self.result})"


class LanggraphProcessor(FrameProcessor):
    def __init__(
        self,
        graph: CompiledGraph,
        *,
        init_state: dict[str, str] | None = None,
    ):
        super().__init__()
        self._graph = graph
        self._participant_id: str | None = None
        # Amend the state with arbitrary data, for instance the context to be
        # loaded into the prompt.
        self._init_state = init_state or {}
        self._callbacks: list[BaseCallbackHandler] = []
        if os.getenv("LANGFUSE_HOST"):
            from langfuse.callback import CallbackHandler

            self._callbacks.append(CallbackHandler())

    def set_participant_id(self, participant_id: str):
        self._participant_id = participant_id

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, LLMMessagesFrame):
            # Messages are accumulated by the `LLMUserResponseAggregator` in a list of messages.
            # The last one by the human is the one we want to send to the LLM.
            logger.debug(f"Got LLM messages frame {frame}")
            text: str = frame.messages[-1]["content"]
            await self._ainvoke(text.strip())
        elif isinstance(frame, TranscriptionFrame):
            logger.debug(f"Got transcription frame {frame}")
            await self._ainvoke(frame.text.strip())
        else:
            await self.push_frame(frame, direction)

    @staticmethod
    def __get_token_value(text: Union[str, AIMessageChunk]) -> str:
        match text:
            case str():
                return text
            case AIMessageChunk():
                return str(text.content)
            case _:
                return ""

    async def _ainvoke(self, text: str):
        logger.info(f"Invoking agent with {text}")
        await self.push_frame(LLMFullResponseStartFrame())
        try:
            state = {"messages": [HumanMessage(content=text)], **self._init_state}
            async for event in self._graph.astream_events(
                state,
                config={
                    "configurable": {"thread_id": self._participant_id},
                    "callbacks": self._callbacks,
                },
                version="v1",
            ):
                match event["event"]:
                    case "on_chat_model_stream":
                        await self.push_frame(
                            TextFrame(self.__get_token_value(event["data"]["chunk"]))
                        )
                    case "on_tool_start":
                        logger.info(
                            f"Starting tool: {event['name']} with inputs: {event['data'].get('input')}"
                        )
                    # TODO: Include name and link in the metadata; no need to send page_content at all.
                    case "on_retriever_end":
                        docs: list[Document] = event["data"]["output"]["documents"]
                        logger.info(f"Sending {len(docs)} docs")
                        for doc in docs:
                            await self.push_frame(
                                ToolResultMessage(doc.dict(exclude_none=True))
                            )
                    case _:
                        pass
        except GeneratorExit:
            logger.exception(f"{self} generator was closed prematurely")
        except Exception as e:
            logger.exception(f"{self} an unknown error occurred: {e}")

        await self.push_frame(LLMFullResponseEndFrame())
In the bot, you need to provide your compiled workflow and add this to your pipeline right after transport.input() (init_state is optional):
lg = LanggraphProcessor(graph, init_state={"context": context})
Hope that helps! I never found the time for a proper PR with tests and documentation. I'd be happy if you can continue where I left off!
Thanks a lot @TomTom101. I see you also wrote in here. We're actually deciding between pipecat and livekit, and this was a point in livekit's favor. But if we integrate this, it could stop being a deciding factor.
If we go with pipecat, I'll definitely make a PR to pipecat once we make it work with the current version
If we go with pipecat, I'll definitely make a PR to pipecat once we make it work with the current version
@marctorsoc, if you do, we'd love the submission!
@marctorsoc I made a custom langgraph processor working with a simple langgraph agent and pipecat. Did you go with pipecat? https://gist.github.com/akashicMarga/95cc387e6a9415ad29f6b59d9b22bfd5
Thanks for the heads up @akashicMarga. Yeah, in the end we decided to move to livekit for a variety of reasons, though in reality we don't use langgraph either, just the plain livekit agents framework. One of the main reasons for us to consider langgraph was being able to do evals: the idea was to do evals on langgraph and then plug that same graph into livekit or pipecat. But it turns out the livekit agents framework provides a testing framework that is enough for our current needs and keeps everything simpler. AFAIK pipecat doesn't have one.
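If a framework-provided harness isn't available, a minimal hand-rolled eval loop goes a long way. A sketch under stated assumptions (no real framework here; `toy_agent` and the eval cases are stubs standing in for a real agent and dataset):

```python
# A tiny hand-rolled eval harness: run the agent over labeled cases and
# report a pass rate. `toy_agent` is a stub standing in for a real agent call.
def toy_agent(utterance: str) -> str:
    if "price" in utterance:
        return "transfer_to_sales"
    return "answer_directly"

EVAL_CASES = [
    ("what is the price?", "transfer_to_sales"),
    ("how do I reset my password?", "answer_directly"),
]

def run_evals(agent) -> float:
    """Return the fraction of eval cases the agent gets right."""
    passed = sum(agent(utt) == expected for utt, expected in EVAL_CASES)
    return passed / len(EVAL_CASES)

print(run_evals(toy_agent))
```

For an LLM-backed agent you would swap the exact-match check for a judge or rubric, but the loop structure stays the same.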
@marctorsoc would you mind sharing more about the testing framework?
@markbackman https://docs.livekit.io/agents/build/testing. Feel free to correct me if I missed it in pipecat :)