
class LangchainProcessor(): does it support multi-agent workflows... sort of like LangGraph?

Open · geekofycoder opened this issue 1 year ago · 6 comments

If we want to connect multiple chatbots with different prompt contexts, multiple agents are what comes to mind. Can we achieve this presently through any means, or do we have to wait for a LangGraph processor?

Basically, my question is: is there any way to achieve a multi-agent workflow in Pipecat as of now?

geekofycoder · Jul 17 '24 03:07

I am working on a LangGraph processor as we speak. I can push it to a feature branch this week; for a full PR I still need to create a test case and an example. Stay tuned!

TomTom101 · Jul 17 '24 09:07

For the time being, check out https://github.com/TomTom101/pipecat/tree/feat/langgraph. Instantiate a processor and place it in the pipeline with a compiled graph:

from langgraph.graph import StateGraph
from langgraph.graph.graph import CompiledGraph

# AgentState is your graph's state schema
workflow = StateGraph(AgentState)
graph: CompiledGraph = workflow.compile()
lg = LanggraphProcessor(graph)

Let me know how well this works for you; I have only really used and tested it for one specific use case so far.

TomTom101 · Jul 17 '24 09:07

Sure, looking forward to using the multi-agent workflow 😃

geekofycoder · Jul 17 '24 10:07

Well, your agents are all in the graph, see e.g. https://github.com/langchain-ai/langgraph/blob/main/examples/multi_agent/agent_supervisor.ipynb?ref=blog.langchain.dev
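To make the routing idea from that notebook concrete, here is a plain-Python sketch of the supervisor pattern: a supervisor picks which agent handles each step, then decides to finish. There is no LangGraph here; the agent names and the dict-based state are hypothetical stand-ins for illustration only.

```python
def researcher(state: dict) -> dict:
    # Stand-in worker agent: a real one would call an LLM or tools.
    state["messages"].append("researcher: found two sources")
    return state

def writer(state: dict) -> dict:
    state["messages"].append("writer: drafted a summary")
    return state

def supervisor(state: dict) -> str:
    # Route to each worker once, then stop. A real supervisor
    # would ask an LLM which worker (or FINISH) comes next.
    done = {m.split(":")[0] for m in state["messages"]}
    if "researcher" not in done:
        return "researcher"
    if "writer" not in done:
        return "writer"
    return "FINISH"

AGENTS = {"researcher": researcher, "writer": writer}

def run(state: dict) -> dict:
    # The loop a compiled graph performs for you: supervisor -> worker -> supervisor...
    while (nxt := supervisor(state)) != "FINISH":
        state = AGENTS[nxt](state)
    return state

result = run({"messages": []})
print(result["messages"])
```

In LangGraph the same shape becomes nodes plus a conditional edge from the supervisor; the loop above is what `graph.astream`/`ainvoke` drives internally.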

TomTom101 · Jul 17 '24 11:07

> Well, your agents are all in the graph, see e.g. https://github.com/langchain-ai/langgraph/blob/main/examples/multi_agent/agent_supervisor.ipynb?ref=blog.langchain.dev

Any updates on the LangGraph processor? I have a chatbot that I want to replicate with Pipecat.

geekofycoder · Jul 24 '24 12:07

Just for a quick start, the chatbot I am following is https://github.com/langchain-ai/langgraph/blob/main/examples/chatbots/information-gather-prompting.ipynb. Could you shed some light on how to automate the above?

geekofycoder · Jul 25 '24 08:07

Hi @TomTom101, is there any update on the LangGraph processor? I really need it for an application, and I am willing to put in some time to develop it as well if you can point me in the right direction.

Ridhwanluthra · Nov 12 '24 18:11

Related to pipecat-flows; see #720.

vr000m · Dec 02 '24 23:12

I have a multi-agent application where I need to call the agent from browser-use (https://github.com/browser-use/browser-use), which is built on top of LangChain...

The question is: how can I guarantee that the Pipecat master agent "talks" to the browser-use one and waits for its action to complete? Can I simply instantiate the agent in a function call in Pipecat, and will it automatically wait for the browser-use agent to complete its task?
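For reference, the waiting behavior being asked about can be sketched with plain asyncio: if the function-call handler is a coroutine and it awaits the sub-agent, the caller does not get a result until the sub-agent finishes. The handler and agent names below are hypothetical stand-ins, not Pipecat or browser-use API.

```python
import asyncio

async def browser_agent(task: str) -> str:
    # Stand-in for the browser-use agent's run; pretend it takes a while.
    await asyncio.sleep(0.1)
    return f"completed: {task}"

async def on_function_call(task: str) -> str:
    # A function-call handler that awaits the sub-agent: control does not
    # return to the "master" agent until this coroutine resolves.
    return await browser_agent(task)

async def main() -> str:
    return await on_function_call("book a flight")

print(asyncio.run(main()))  # completed: book a flight
```

Whether this holds in practice depends on the framework actually awaiting your handler rather than firing it off as a background task, so it is worth checking how the function-call hook is invoked.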

tropxy · Jan 04 '25 00:01

FWIW, LiveKit has https://github.com/dqbd/langgraph-livekit-agents, so maybe it's possible to do something similar for Pipecat? Is that project similar to what you built, @TomTom101? If so, I'd be happy to help publish it as a package if you don't have the time :)

marctorsoc · Jul 01 '25 19:07

I can give you a starting point that I wrote some time ago. It might not work with current Pipecat versions, but it should be easy to get working.

import os
from dataclasses import dataclass
from typing import Any, Union

from langchain.schema import Document
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.messages import HumanMessage

try:
    # Langfuse tracing is optional; it is only used when LANGFUSE_HOST is set.
    from langfuse.callback import CallbackHandler
except ModuleNotFoundError:
    pass
from langgraph.graph.graph import CompiledGraph
from loguru import logger
from pipecat.frames.frames import (
    DataFrame,
    Frame,
    LLMFullResponseEndFrame,
    LLMFullResponseStartFrame,
    LLMMessagesFrame,
    TextFrame,
    TranscriptionFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

try:
    from langchain_core.messages import AIMessageChunk
except ModuleNotFoundError as e:
    logger.exception(
        "In order to use Langchain, you need to `pip install pipecat-ai[langchain]`. "
    )
    raise Exception(f"Missing module: {e}")


@dataclass
class ToolResultMessage(DataFrame):
    result: Any
    type: str = "tool_result"

    def __str__(self):
        return f"{self.name}(result: {self.result})"


class LanggraphProcessor(FrameProcessor):
    def __init__(
        self,
        graph: CompiledGraph,
        *,
        init_state: dict[str, str] | None = None,
    ):
        super().__init__()
        self._graph = graph
        self._participant_id: str | None = None
        # Amend the state with arbitrary data, e.g. context to be loaded into the prompt.
        self._init_state = init_state or {}
        self._callbacks: list[BaseCallbackHandler] = []
        if os.getenv("LANGFUSE_HOST"):
            from langfuse.callback import CallbackHandler

            self._callbacks.append(CallbackHandler())

    def set_participant_id(self, participant_id: str):
        self._participant_id = participant_id

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if isinstance(frame, LLMMessagesFrame):
            # Messages are accumulated by the `LLMUserResponseAggregator` in a list of messages.
            # The last one by the human is the one we want to send to the LLM.

            logger.debug(f"Got LLM messages frame {frame}")
            text: str = frame.messages[-1]["content"]

            await self._ainvoke(text.strip())
        elif isinstance(frame, TranscriptionFrame):
            logger.debug(f"Got transcription frame {frame}")

            await self._ainvoke(frame.text.strip())
        else:
            await self.push_frame(frame, direction)

    @staticmethod
    def __get_token_value(text: Union[str, AIMessageChunk]) -> str:
        match text:
            case str():
                return text
            case AIMessageChunk():
                return str(text.content)
            case _:
                return ""

    async def _ainvoke(self, text: str):
        logger.info(f"Invoking agent with {text}")
        await self.push_frame(LLMFullResponseStartFrame())
        try:
            state = {"messages": [HumanMessage(content=text)], **self._init_state}
            async for event in self._graph.astream_events(
                state,
                config={
                    "configurable": {"thread_id": self._participant_id},
                    "callbacks": self._callbacks,
                },
                version="v1",
            ):
                match event["event"]:
                    case "on_chat_model_stream":
                        await self.push_frame(
                            TextFrame(self.__get_token_value(event["data"]["chunk"]))
                        )
                    case "on_tool_start":
                        logger.info(
                            f"Starting tool: {event['name']} with inputs: {event['data'].get('input')}"
                        )
                    # TODO Include name and link in the meta data, no need to send page_content at all
                    case "on_retriever_end":
                        docs: list[Document] = event["data"]["output"]["documents"]
                        logger.info(f"Sending {len(docs)} docs")
                        for doc in docs:
                            await self.push_frame(
                                ToolResultMessage(doc.dict(exclude_none=True))
                            )
                    case _:
                        pass
        except GeneratorExit:
            logger.exception(f"{self} generator was closed prematurely")
        except Exception as e:
            logger.exception(f"{self} an unknown error occurred: {e}")

        await self.push_frame(LLMFullResponseEndFrame())

In your bot, provide your compiled workflow and add this processor to your pipeline right after transport.input() (init_state is optional):

lg = LanggraphProcessor(graph, init_state={"context": context})

Hope that helps! I never found the time for a proper PR with tests and documentation. I'd be happy for you to continue where I left off!

TomTom101 · Jul 02 '25 06:07

Thanks a lot, @TomTom101. I see you also wrote in here. We're actually deciding between Pipecat and LiveKit, and this was a point in LiveKit's favor. But if we integrate this, it could stop being a deciding factor.

If we go with Pipecat, I'll definitely make a PR once we make it work with the current version.

marctorsoc · Jul 03 '25 10:07

> If we go with Pipecat, I'll definitely make a PR once we make it work with the current version.

@marctorsoc, if you do, we'd love the submission!

markbackman · Jul 03 '25 17:07

@marctorsoc I made a custom LangGraph processor working with a simple LangGraph agent and Pipecat. Did you go with Pipecat? https://gist.github.com/akashicMarga/95cc387e6a9415ad29f6b59d9b22bfd5

akashicMarga · Sep 05 '25 06:09

Thanks for the heads-up, @akashicMarga. In the end we decided to move to LiveKit for a variety of reasons, but the reality is we don't use LangGraph either, just the plain LiveKit Agents framework. One of the main reasons for us to consider LangGraph was being able to do evals: the idea was to run evals on LangGraph and then plug that same graph into LiveKit / Pipecat. But LiveKit Agents provides a testing framework that is enough for our current needs and makes everything simpler. AFAIK Pipecat doesn't have one.
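For context, the kind of agent test meant here can be sketched framework-free: drive the agent with a scripted user turn and assert on the reply. The agent below is a trivial hypothetical stand-in, not the LiveKit or Pipecat testing API.

```python
import asyncio

async def agent_reply(user_text: str) -> str:
    # Stand-in agent: a real test would run a session through the framework.
    if "refund" in user_text.lower():
        return "I can help with your refund."
    return "How can I help?"

async def eval_case(user_text: str, must_contain: str) -> bool:
    # One eval case: send a turn, check the reply mentions the expected topic.
    reply = await agent_reply(user_text)
    return must_contain.lower() in reply.lower()

async def main() -> list[bool]:
    return [
        await eval_case("I want a refund", "refund"),
        await eval_case("hello", "help"),
    ]

print(asyncio.run(main()))  # [True, True]
```

Real frameworks add LLM-as-judge scoring and tool-call assertions on top, but the turn-in/assertion-out shape is the same.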

marctorsoc · Sep 06 '25 07:09

@marctorsoc would you mind sharing more about the testing framework?

markbackman · Sep 10 '25 03:09

@markbackman https://docs.livekit.io/agents/build/testing. Feel free to correct me if I missed it in pipecat :)

marctorsoc · Sep 10 '25 10:09