pipecat
pipecat copied to clipboard
LangchainProcessor can't use custom invocation parameters as config
This is a strategically important issue for me to use pipecat for good.
pipecat:LangchainProcessor
wraps langchain:RunnableWithHistory
. I need to replace session_id
with two fields user_id
and conversation_id
as shown in the Langchain documentation.
My code
chain_runnable = RunnableWithMessageHistory(
prompted_chat_model,
# lambda session_id: get_session_history(database_label, session_id),
lambda session_id: get_session_history(database_label, user_id, conversation_id),
history_messages_key="chat_history",
input_messages_key="input",
**memory_kwargs,
history_factory_config=[
ConfigurableFieldSpec(
id="user_id",
annotation=str,
name="User ID",
description="Unique identifier for the user.",
default="",
is_shared=True,
),
ConfigurableFieldSpec(
id="conversation_id",
annotation=str,
name="Conversation ID",
description="Unique identifier for the conversation.",
default="",
is_shared=True,
),
],
)
Error
The above code throws
ValueError: Missing keys ['conversation_id', 'user_id'] in config['configurable'] Expected keys are ['conversation_id', 'user_id'].When using via .invoke() or .stream(), pass in a config; e.g., chain.invoke({'input': 'foo'}, {'configurable': {'conversation_id': '[your-value-here]', 'user_id': '[your-value-here]'}})
Tried changing the pipecat code to replace session_id in langchain.py
as:
async def _ainvoke(self, text: str):
logger.debug(f"Invoking chain with {text}")
await self.push_frame(LLMFullResponseStartFrame())
try:
async for token in self._chain.astream(
{self._transcript_key: text},
# config={"configurable": {"session_id": self._participant_id}},
config={"configurable": {
'user_id': self._user_id,
'conversation_id': self._conversation_id
}},
):
await self.push_frame(TextFrame(self.__get_token_value(t
but this throws
ValueError: Expected keys ['conversation_id', 'user_id'] do not match parameter names ['session_id'] of get_session_history.
I could extend that to accept the custom parameters but i was looking for a more general solution which established pipecat contributors like @TomTom101 could think of.
Langchain code
As I understand the langchain code, this is how the custom config can be passed:
langchain_core/runnables/base.py
, line 5242, in astream():
self._merge_configs(config)
which calls langchain_core/runnables/history.py
, lines 533ff:
expected_keys = [field_spec.id for field_spec in self.history_factory_config]
...
message_history = self.get_session_history(
**{key: configurable[key] for key in expected_keys}
)
Pipecat code
So in pipecat/processors/frame_processor.py
, the _ainvoke
method uses only the session_id
async def _ainvoke(self, text: str):
logger.debug(f"Invoking chain with {text}")
await self.push_frame(LLMFullResponseStartFrame())
try:
async for token in self._chain.astream(
{self._transcript_key: text},
config={"configurable": {"session_id": self._participant_id}},
):
Suggested Solution
So _ainvoke
could be extended to pass the the keys from langchain's self.history_factory_config
?
Maybe something along this snippet:
async def _ainvoke(self, text: str):
logger.debug(f"Invoking chain with {text}")
await self.push_frame(LLMFullResponseStartFrame())
if hasattr(self, '_configurable'):
configurable = self._configurable
else:
configurable = {"configurable": {"session_id": self._participant_id}}
try:
async for token in self._chain.astream(
{self._transcript_key: text},
config=configurable
):
await self.push_frame(TextFrame(self.__get_token_value(token)))
OR
is there another way to pass the config
dict for the new parameters to LangchainProcessor?