langserve
langserve copied to clipboard
Need help to migrate my custom agent to LCEL to use with langserve
Hi, team-langchain,
I have an agent that uses memory, user-authentication as well as function calling. I'd like to migrate it to langserve in production but couldn't find anything as complex as my case in the examples in docs. So I got stuck and need help. Could you please give me an advise how to convert this code to LCEL?
agent.py:
from typing import Type
from langchain.agents import AgentExecutor, OpenAIFunctionsAgent
from langchain.agents.openai_functions_agent.agent_token_buffer_memory import (
AgentTokenBufferMemory,
)
from langchain.callbacks.base import Callbacks
from langchain.chat_models import AzureChatOpenAI
from langchain.prompts import MessagesPlaceholder
from langchain.prompts.chat import BaseMessagePromptTemplate
from langchain.schema import SystemMessage
from .src import CustomFirestoreChatMessageHistory, CustomOpenAIFunctionsTool
HUMAN_MESSAGE_TEMPLATE = "..."
class CRMAgent:
tool_classes: list[Type[CustomOpenAIFunctionsTool]]
system_message_template: str
def __init__(self, api_wrapper, crm_user, internal_user):
self.api_wrapper = api_wrapper
self.crm_user = crm_user
self.hints_user = internal_user
self.llm = AzureChatOpenAI(...)
chat_memory = CustomFirestoreChatMessageHistory(
user_id=internal_user["user_id"], session_id=internal_user["integration_id"]
)
self.memory = AgentTokenBufferMemory(chat_memory=chat_memory, llm=self.llm)
self.tools = [
ToolClass.from_api_wrapper(self.api_wrapper, **self.crm_user)
for ToolClass in self.tool_classes
]
system_message = self.system_message_template.format(...)
extra_prompt_messages: list[BaseMessagePromptTemplate] | None = [
MessagesPlaceholder(variable_name=self.memory.memory_key)
]
self.agent = OpenAIFunctionsAgent.from_llm_and_tools(
llm=self.llm,
tools=self.tools,
extra_prompt_messages=extra_prompt_messages,
system_message=SystemMessage(content=system_message),
)
self.executor = AgentExecutor.from_agent_and_tools(
agent=self.agent,
tools=self.tools,
memory=self.memory,
handle_parsing_errors=True,
return_intermediate_steps=True,
metadata=self.hints_user,
)
def run(self, message: str, callbacks: Callbacks) -> str:
"""Run the agent on a human message."""
human_message = HUMAN_MESSAGE_TEMPLATE.format(
timestamp=self.api_wrapper.get_current_timestamp(),
message=message,
)
inputs = {"input": human_message}
return self.executor(inputs, callbacks=callbacks)["output"]
app.py
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Credentials(BaseModel):
access_token: str
class CRMInput(BaseModel):
credentials: Credentials
user_message: str
pipeline_id: str
user_id: str
integration_id: str
@app.post("/crm")
def crm(payload: CRMInput):
agent = CRMAgent(
api_wrapper=APIWrapper(payload.credentials.access_token),
crm_user={
"default_owner_id": None,
"default_pipeline_id": payload.pipeline_id,
},
internal_user={
"user_id": payload.user_id,
"integration_id": payload.integration_id,
},
)
agent.run(payload.user_message, callbacks=None)
TLDR what this code does is:
- On every request create an APIWrapper for a specific user
- Fetch their's data from API to create tools (openai functions), so tools are different for each user
- Fetch their's chat history from Firestore
- Run agent
Do you have any ideas how to turn this into a langserve project?
hi @pokidyshev,
- Create a runnable version of your executor. Check LCEL docs on how to do it. You'll likely need to make it
configurable
to make sure that you can change the memory at run time based on user identity (check example withconfigurable
) - in
add_routes
there's a per request modifier parameter that you can use to add user specific information to theconfig
.
We don't have good documentation yet to show how to do these, but you can look at implementation in https://github.com/langchain-ai/opengpts for reference
Meet same problem and blocked, is it possible to give a sample that can use this memory config through the API?
@JevenZhou will do -- will try to do it this week
@eyurtsev Thanks! Looking forward to it.
Haven't gotten around to full example yet, but we added this to the code base last week which should be fairly helpful: https://api.python.langchain.com/en/latest/schema.runnable/langchain.schema.runnable.history.RunnableWithMessageHistory.html
Hi, @eyurtsev! Thanks for the update!
I've managed to attach message history based on session_id
by using RunnableWithMessageHistory
. Though I had to patch it so it saves intermediate steps too.
I'm now stuck with customizing agent tools based on user's access_token
. I need to create a new instance of APIWrapper(access_token)
on each request and then create a new set of tools from this instance and pass them to the agent. Any ideas how that can be achieved?
Now my code looks like this:
def init_chat_history(destination: str, session_id: str) -> BaseChatMessageHistory:
return FirestoreChatMessageHistory(
destination=destination,
session_id=session_id,
max_messages=5,
)
system_message = SYSTEM_MESSAGE.format(warning="", custom_prompt="")
prompt = ChatPromptTemplate.from_messages(
[
("system", system_message),
MessagesPlaceholder(variable_name="history"),
("human", "{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad"),
]
)
# TODO: tools must be updated on each request
api_wrapper = PipedriveAPIWrapper(access_token)
tools = init_tools(api_wrapper, TOOLS, include_custom_fields=True)
llm = create_azure_gpt4()
llm_with_tools = llm.bind(functions=[format_tool_to_openai_function(t) for t in tools])
agent = (
{
"input": lambda x: x["input"],
"history": lambda x: x["history"],
"agent_scratchpad": lambda x: format_to_openai_function_messages(
x["intermediate_steps"]
),
}
| prompt
| llm_with_tools
| OpenAIFunctionsAgentOutputParser()
)
executor = AgentExecutor(
agent=agent, # type: ignore
tools=tools,
max_iterations=15,
handle_parsing_errors=True,
return_intermediate_steps=True,
tags=["pipedrive"],
# metadata=hints_user,
verbose=True,
)
executor_with_history = RunnableWithMessageHistory(
executor, # type: ignore
partial(init_chat_history, "pipedrive"),
history_messages_key="history",
)
class Input(BaseModel):
input: str
class Output(BaseModel):
output: str
app = FastAPI(title="LangChain Server", version="1.0")
add_routes(
app,
executor_with_history.with_types(input_type=Input, output_type=Output),
path="/pipedrive",
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="localhost", port=8000)