langserve icon indicating copy to clipboard operation
langserve copied to clipboard

Need help to migrate my custom agent to LCEL to use with langserve

Open pokidyshev opened this issue 1 year ago • 6 comments

Hi, team-langchain,

I have an agent that uses memory, user-authentication as well as function calling. I'd like to migrate it to langserve in production but couldn't find anything as complex as my case in the examples in docs. So I got stuck and need help. Could you please give me an advise how to convert this code to LCEL?

agent.py:

from typing import Type

from langchain.agents import AgentExecutor, OpenAIFunctionsAgent
from langchain.agents.openai_functions_agent.agent_token_buffer_memory import (
    AgentTokenBufferMemory,
)
from langchain.callbacks.base import Callbacks
from langchain.chat_models import AzureChatOpenAI
from langchain.prompts import MessagesPlaceholder
from langchain.prompts.chat import BaseMessagePromptTemplate
from langchain.schema import SystemMessage

from .src import CustomFirestoreChatMessageHistory, CustomOpenAIFunctionsTool

HUMAN_MESSAGE_TEMPLATE = "..."


class CRMAgent:
    tool_classes: list[Type[CustomOpenAIFunctionsTool]]
    system_message_template: str

    def __init__(self, api_wrapper, crm_user, internal_user):
        self.api_wrapper = api_wrapper
        self.crm_user = crm_user
        self.hints_user = internal_user

        self.llm = AzureChatOpenAI(...)

        chat_memory = CustomFirestoreChatMessageHistory(
            user_id=internal_user["user_id"], session_id=internal_user["integration_id"]
        )
        self.memory = AgentTokenBufferMemory(chat_memory=chat_memory, llm=self.llm)

        self.tools = [
            ToolClass.from_api_wrapper(self.api_wrapper, **self.crm_user)
            for ToolClass in self.tool_classes
        ]

        system_message = self.system_message_template.format(...)
        extra_prompt_messages: list[BaseMessagePromptTemplate] | None = [
            MessagesPlaceholder(variable_name=self.memory.memory_key)
        ]
        self.agent = OpenAIFunctionsAgent.from_llm_and_tools(
            llm=self.llm,
            tools=self.tools,
            extra_prompt_messages=extra_prompt_messages,
            system_message=SystemMessage(content=system_message),
        )

        self.executor = AgentExecutor.from_agent_and_tools(
            agent=self.agent,
            tools=self.tools,
            memory=self.memory,
            handle_parsing_errors=True,
            return_intermediate_steps=True,
            metadata=self.hints_user,
        )

    def run(self, message: str, callbacks: Callbacks) -> str:
        """Run the agent on a human message."""
        human_message = HUMAN_MESSAGE_TEMPLATE.format(
            timestamp=self.api_wrapper.get_current_timestamp(),
            message=message,
        )
        inputs = {"input": human_message}
        return self.executor(inputs, callbacks=callbacks)["output"]

app.py

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Credentials(BaseModel):
    access_token: str


class CRMInput(BaseModel):
    credentials: Credentials
    user_message: str
    pipeline_id: str
    user_id: str
    integration_id: str


@app.post("/crm")
def crm(payload: CRMInput):
    agent = CRMAgent(
        api_wrapper=APIWrapper(payload.credentials.access_token),
        crm_user={
            "default_owner_id": None,
            "default_pipeline_id": payload.pipeline_id,
        },
        internal_user={
            "user_id": payload.user_id,
            "integration_id": payload.integration_id,
        },
    )
    agent.run(payload.user_message, callbacks=None)

TLDR what this code does is:

  • On every request create an APIWrapper for a specific user
  • Fetch their's data from API to create tools (openai functions), so tools are different for each user
  • Fetch their's chat history from Firestore
  • Run agent

Do you have any ideas how to turn this into a langserve project?

pokidyshev avatar Nov 13 '23 17:11 pokidyshev

hi @pokidyshev,

  1. Create a runnable version of your executor. Check LCEL docs on how to do it. You'll likely need to make it configurable to make sure that you can change the memory at run time based on user identity (check example with configurable)
  2. in add_routes there's a per request modifier parameter that you can use to add user specific information to the config.

We don't have good documentation yet to show how to do these, but you can look at implementation in https://github.com/langchain-ai/opengpts for reference

eyurtsev avatar Nov 14 '23 22:11 eyurtsev

Meet same problem and blocked, is it possible to give a sample that can use this memory config through the API?

JevenZhou avatar Nov 15 '23 14:11 JevenZhou

@JevenZhou will do -- will try to do it this week

eyurtsev avatar Nov 16 '23 21:11 eyurtsev

@eyurtsev Thanks! Looking forward to it.

pokidyshev avatar Nov 17 '23 12:11 pokidyshev

Haven't gotten around to full example yet, but we added this to the code base last week which should be fairly helpful: https://api.python.langchain.com/en/latest/schema.runnable/langchain.schema.runnable.history.RunnableWithMessageHistory.html

eyurtsev avatar Nov 20 '23 16:11 eyurtsev

Hi, @eyurtsev! Thanks for the update!

I've managed to attach message history based on session_id by using RunnableWithMessageHistory. Though I had to patch it so it saves intermediate steps too.

I'm now stuck with customizing agent tools based on user's access_token. I need to create a new instance of APIWrapper(access_token) on each request and then create a new set of tools from this instance and pass them to the agent. Any ideas how that can be achieved?

Now my code looks like this:

def init_chat_history(destination: str, session_id: str) -> BaseChatMessageHistory:
    return FirestoreChatMessageHistory(
        destination=destination,
        session_id=session_id,
        max_messages=5,
    )


system_message = SYSTEM_MESSAGE.format(warning="", custom_prompt="")
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_message),
        MessagesPlaceholder(variable_name="history"),
        ("human", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ]
)

# TODO: tools must be updated on each request
api_wrapper = PipedriveAPIWrapper(access_token)
tools = init_tools(api_wrapper, TOOLS, include_custom_fields=True)

llm = create_azure_gpt4()
llm_with_tools = llm.bind(functions=[format_tool_to_openai_function(t) for t in tools])

agent = (
    {
        "input": lambda x: x["input"],
        "history": lambda x: x["history"],
        "agent_scratchpad": lambda x: format_to_openai_function_messages(
            x["intermediate_steps"]
        ),
    }
    | prompt
    | llm_with_tools
    | OpenAIFunctionsAgentOutputParser()
)

executor = AgentExecutor(
    agent=agent,  # type: ignore
    tools=tools,
    max_iterations=15,
    handle_parsing_errors=True,
    return_intermediate_steps=True,
    tags=["pipedrive"],
    # metadata=hints_user,
    verbose=True,
)

executor_with_history = RunnableWithMessageHistory(
    executor,  # type: ignore
    partial(init_chat_history, "pipedrive"),
    history_messages_key="history",
)


class Input(BaseModel):
    input: str


class Output(BaseModel):
    output: str


app = FastAPI(title="LangChain Server", version="1.0")

add_routes(
    app,
    executor_with_history.with_types(input_type=Input, output_type=Output),
    path="/pipedrive",
)

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="localhost", port=8000)

pokidyshev avatar Dec 10 '23 05:12 pokidyshev