pydantic-ai
Ability to Persist Messages in External Stores
I am wondering if there is a plan to add the ability to persist messages in remote stores like Redis or document stores instead of using plain memory to cache the messages [1]. There are scenarios where application memory alone is not sufficient and we need to cache the state in a remote store and pick things up later.
If this is already possible and I just have to add a plugin or dependency to do this, please let me know as well.
Thanks.
[1] https://ai.pydantic.dev/message-history/
I think this is something that could be supported in user code for the most part, though we'd be happy to consider support for loading externally stored message history.
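For reference, a minimal sketch of what that user-code approach can look like today, using the documented message-history serialization helpers; the dict and key scheme below are just stand-ins for an external store such as Redis or Postgres:
from pydantic_ai import Agent
from pydantic_ai.messages import ModelMessagesTypeAdapter

# Stand-in for an external key-value store (Redis, Postgres, a document DB, ...).
store: dict[str, bytes] = {}

agent = Agent('openai:gpt-4o')


def save(conversation_id: str, result) -> None:
    # all_messages_json() serializes the full history to JSON bytes.
    store[conversation_id] = result.all_messages_json()


def load(conversation_id: str):
    raw = store.get(conversation_id)
    # validate_json() turns the stored bytes back into model messages.
    return ModelMessagesTypeAdapter.validate_json(raw) if raw else None


result = agent.run_sync('What is the capital of France?')
save('conv-1', result)

# Later, possibly after a restart: rehydrate and continue the conversation.
history = load('conv-1')
result2 = agent.run_sync('And what is its population?', message_history=history)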
Thank you Sydney for sharing your thoughts. I appreciate it.
I think it would be good to have a sort of interface or abstract parent class that users can implement or inherit from and extend to have their own persistent store (Redis, MongoDB, Postgres, MySQL etc) to cache these messages. This can then be injected in and used internally to cache the message history.
This will help with checkpointing and caching in the event of errors or restarts, so we don't lose all the messages in the history so far. It will also simplify things so that the user does not have to spend too much energy figuring out this persistence implementation.
Those were my thoughts behind this suggestion.
I am open to collaborating to bring this into the framework
Hi @izzyacademy I agree we need this.
I think we need an ABC, with implementations for:
- memory
- file
- sqlite
- postgres
and an example of a custom implementation. I would need very strong evidence that people want another specific database before we implement it.
I think it's best if I have a crack at a first implementation of this, since I'll know how I want it to work.
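For illustration only, here is one rough shape such an ABC and the memory/file implementations could take; the class and method names are placeholders, not a proposed final API:
import json
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any


class MessageStore(ABC):
    @abstractmethod
    async def load(self, conversation_id: str) -> list[Any]:
        """Return all stored messages for a conversation."""

    @abstractmethod
    async def append(self, conversation_id: str, messages: list[Any]) -> None:
        """Append messages to the end of a conversation's history."""

    @abstractmethod
    async def clear(self, conversation_id: str) -> None:
        """Delete a conversation's history."""


class InMemoryMessageStore(MessageStore):
    def __init__(self) -> None:
        self._data: dict[str, list[Any]] = {}

    async def load(self, conversation_id: str) -> list[Any]:
        return list(self._data.get(conversation_id, []))

    async def append(self, conversation_id: str, messages: list[Any]) -> None:
        self._data.setdefault(conversation_id, []).extend(messages)

    async def clear(self, conversation_id: str) -> None:
        self._data.pop(conversation_id, None)


class FileMessageStore(MessageStore):
    def __init__(self, directory: Path) -> None:
        self._dir = directory
        self._dir.mkdir(parents=True, exist_ok=True)

    def _path(self, conversation_id: str) -> Path:
        return self._dir / f'{conversation_id}.json'

    async def load(self, conversation_id: str) -> list[Any]:
        path = self._path(conversation_id)
        return json.loads(path.read_text()) if path.exists() else []

    async def append(self, conversation_id: str, messages: list[Any]) -> None:
        existing = await self.load(conversation_id)
        self._path(conversation_id).write_text(json.dumps(existing + messages))

    async def clear(self, conversation_id: str) -> None:
        self._path(conversation_id).unlink(missing_ok=True)
Taking the conversation id as a method parameter (rather than as store state) lets one store instance serve many conversations; the sketch later in this thread takes the opposite approach and scopes the store to a single conversation.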
@samuelcolvin thanks for the update. I will wait for your initial draft and then share my feedback. After you start with the in-memory and SQLite implementations, I can work on Redis and PostgreSQL.
I believe there will be a strong need for Redis as the synchronous and async libraries have 45M and 800K downloads per month respectively. I will check back later on this.
Hi @izzyacademy @samuelcolvin We are also looking for a similar type of implementation; Redis is preferred for short-term memory, user preferences, user details, etc.
Crew AI has similar - https://docs.crewai.com/concepts/memory (short-term memory, long-term memory, entity memory, and contextual memory)
I looked at mem0, but it requires an LLM and did not fit all our use cases - https://github.com/mem0ai/mem0
@sandeep540 thanks for your comments. I agree. Redis is a great key-value store for this use case.
I recently came across an interesting video by Adam Lucek about implementing different memory types that referenced two academic papers:
- Cognitive Architectures for Language Agents, 2024
- A Survey on the Memory Mechanism of Large Language Model based Agents
The types of memory that were implemented in that video are:
- Working Memory - Current conversation and immediate context
- Episodic Memory - Historical experiences and their takeaways
- Semantic Memory - Knowledge context and factual grounding
- Procedural Memory - The "rules" and "skills" for interaction
Could such memory types be considered for the API/ABC/implementation you are planning?
TBH, I'm not entirely sure what you consider as suitable "memory" to be integrated into PydanticAI, but I'll share some additional references to "long-term" memory information/tools:
- Long-Term Memory Support in LangGraph
- mem0: Memory layer for your AI apps
- Zep: Memory Foundation For Your AI Stack
Like the other external integrations, I think this could be [pydantic-ai-persistence] and we could integrate different data stores that implement the ABC.
Here are my initial thoughts based on the agent or graph workflow interactions with the persistent store
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any
@dataclass
class PersistentStore(ABC):
    """A persistent store ABC."""

    conversation_id: str | None = None
    """This unique string is used to track different interactions with the persistent store.
    A unique one is generated if None is specified.
    """

    @abstractmethod
    async def append_entry(self, entry: Any):
        """Adds a record to the end of the list"""

    @abstractmethod
    async def append_entries(self, entries: list[Any]):
        """Adds records to the end of the list"""

    @abstractmethod
    async def prepend_entry(self, entry: Any):
        """Adds a record to the beginning of the list"""

    @abstractmethod
    async def get_all_entries(self) -> list[Any]:
        """Retrieves all the entries for this conversation"""

    @abstractmethod
    async def get_entries(self, start: int, end: int) -> list[Any]:
        """Retrieves a subset of the entries for this conversation"""

    @abstractmethod
    async def get_first_entry(self) -> Any:
        """Retrieves the first entry for this conversation"""

    @abstractmethod
    async def get_last_entry(self) -> Any:
        """Retrieves the last entry for this conversation"""

    @abstractmethod
    async def clear(self):
        """Wipes the list clean to start from an empty list"""

    @abstractmethod
    async def remove_first_entry(self) -> Any:
        """Removes the first entry for this conversation"""

    @abstractmethod
    async def remove_last_entry(self) -> Any:
        """Removes the last entry for this conversation"""

    @abstractmethod
    async def remove_entries(self, start: int, end: int) -> list[Any]:
        """Removes a subset of the entries for this conversation"""

    @abstractmethod
    async def count(self) -> int:
        """Returns the total number of messages for this conversation"""


class Memory(PersistentStore):
    pass


class PostgreSQLPersistence(PersistentStore):
    pass


class RedisPersistent(PersistentStore):
    pass
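To make the Redis case concrete, here is a rough, non-atomic sketch of how the RedisPersistent stub above could be filled in with redis.asyncio, storing each entry as a JSON string in a Redis list keyed by conversation_id; the key naming and serialization are just illustrative choices:
import json
import uuid
from typing import Any

from redis.asyncio import Redis


class RedisPersistent(PersistentStore):
    """Stores each entry as a JSON string in a Redis list keyed by conversation_id."""

    def __init__(self, redis: Redis, conversation_id: str | None = None):
        self.redis = redis
        self.conversation_id = conversation_id or uuid.uuid4().hex

    @property
    def _key(self) -> str:
        return f'conversation:{self.conversation_id}'

    async def append_entry(self, entry: Any):
        await self.redis.rpush(self._key, json.dumps(entry))

    async def append_entries(self, entries: list[Any]):
        if entries:
            await self.redis.rpush(self._key, *(json.dumps(e) for e in entries))

    async def prepend_entry(self, entry: Any):
        await self.redis.lpush(self._key, json.dumps(entry))

    async def get_all_entries(self) -> list[Any]:
        return [json.loads(e) for e in await self.redis.lrange(self._key, 0, -1)]

    async def get_entries(self, start: int, end: int) -> list[Any]:
        return [json.loads(e) for e in await self.redis.lrange(self._key, start, end)]

    async def get_first_entry(self) -> Any:
        entries = await self.redis.lrange(self._key, 0, 0)
        return json.loads(entries[0]) if entries else None

    async def get_last_entry(self) -> Any:
        entries = await self.redis.lrange(self._key, -1, -1)
        return json.loads(entries[0]) if entries else None

    async def clear(self):
        await self.redis.delete(self._key)

    async def remove_first_entry(self) -> Any:
        entry = await self.redis.lpop(self._key)
        return json.loads(entry) if entry else None

    async def remove_last_entry(self) -> Any:
        entry = await self.redis.rpop(self._key)
        return json.loads(entry) if entry else None

    async def remove_entries(self, start: int, end: int) -> list[Any]:
        # Redis has no single command to delete an arbitrary middle slice,
        # so this rebuilds the list (not atomic; a MULTI/EXEC pipeline would help).
        entries = await self.get_all_entries()
        removed = entries[start:end + 1]
        remaining = entries[:start] + entries[end + 1:]
        await self.clear()
        await self.append_entries(remaining)
        return removed

    async def count(self) -> int:
        return await self.redis.llen(self._key)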
Just be careful not to end with a library that looks more like a platform :)
I think it should be designed in relation with https://github.com/pydantic/pydantic-ai/issues/695
If not technically, then at least with best practices for maintaining persistence when using graphs.
For example, LangChain's best practice is to push all messages into the graph's state, so there is no longer a requirement to store messages at the agent level.
Another thing to consider is that messages may be shared and modified by multiple agents.
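For illustration, a small sketch of that pattern with pydantic-ai agents: the message history lives on a shared state object rather than on any single agent, so multiple agents can read and extend the same conversation (the state dataclass and agent prompts are made up):
from dataclasses import dataclass, field

from pydantic_ai import Agent
from pydantic_ai.messages import ModelMessage

researcher = Agent('openai:gpt-4o', system_prompt='Research the question.')
writer = Agent('openai:gpt-4o', system_prompt='Write a short, clear answer.')


@dataclass
class ChatState:
    """Shared workflow state; the single source of truth for the conversation."""

    messages: list[ModelMessage] = field(default_factory=list)


async def run_workflow(state: ChatState, question: str) -> str:
    # The first agent runs against the shared history and contributes its new messages.
    r1 = await researcher.run(question, message_history=state.messages or None)
    state.messages.extend(r1.new_messages())

    # The second agent sees everything the first one produced.
    r2 = await writer.run('Summarise the findings so far.', message_history=state.messages)
    state.messages.extend(r2.new_messages())

    # state.messages can now be serialized to an external store between runs.
    return r2.output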
Mem0 uses two components to represent memory: (1) short-term memory, and (2) long-term memory. Vector database for the former and traditional database for the latter.
Google, on the other hand, uses three components: (1) core, (2) contextual (long-term) memory, and (3) persistent memory, as outlined in their Titans: Learning to Memorize at Test Time paper. Why three you might ask?
Inspired by human long-term memory system, we design this memory module so an event that violates the expectations (being surprising) is more memorable
To begin persisting messages in external stores, you need to ask yourself:
- How many memory components do you want to use? One (consider Message History), two (consider mem0 architecture), three (consider Titans architecture)
- Which datastores do you want to use? Traditional (consider libSQL or Postgres for local datastore), vector (consider Qdrant, Chroma, FAISS)
Disclaimer: I am not affiliated with these companies but I do have a professional opinion
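To ground the two-component option, here is a toy sketch of a mem0-style split: a bounded short-term buffer kept in process plus a pluggable long-term store that could be backed by any of the datastores above (all names here are illustrative):
from collections import deque
from typing import Any, Protocol


class LongTermStore(Protocol):
    """Anything that can persist and search entries (relational, document, or vector DB)."""

    async def add(self, conversation_id: str, entries: list[Any]) -> None: ...
    async def search(self, conversation_id: str, query: str, limit: int) -> list[Any]: ...


class TwoComponentMemory:
    """Short-term buffer in process plus a pluggable long-term store."""

    def __init__(self, long_term: LongTermStore, conversation_id: str, short_term_size: int = 20):
        self.long_term = long_term
        self.conversation_id = conversation_id
        # Working/short-term memory: only the most recent turns stay in process.
        self.short_term: deque[Any] = deque(maxlen=short_term_size)

    async def remember(self, entry: Any) -> None:
        self.short_term.append(entry)
        # Everything also goes to the long-term store for later recall.
        await self.long_term.add(self.conversation_id, [entry])

    async def build_context(self, query: str, limit: int = 5) -> list[Any]:
        # Context for the next model call: relevant recollections plus recent turns.
        recalled = await self.long_term.search(self.conversation_id, query, limit)
        return recalled + list(self.short_term)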
I see similar AgentMemory usage with different MemoryDB implementations in another agentic framework, Agno (rebranded from Phidata).
Hi, any updates on this work? I will be implementing memory strategies with our pydantic-ai agents soon and I don't want to reinvent the wheel if there's already a good solution available.
I am currently working on a Redis-based state persistence plugin. I have already finished the implementation and just need to write and pass some tests to make it more reliable :)
https://github.com/opsie-dev/pydantic-graph-persistance-redis
I would gladly develop more if needed, and I would love some feedback on the initial implementation. Feel free to ping me when the message history spec is more settled and I will add an implementation for that as well :)
Hi @samuelcolvin,
Like @Faylixe, I have created a pydantic-graph-persistence-postgresql (as of now, private) repository that hosts a Python package to help get pydantic-graphs persisted on PostgreSQL.
I would like to understand where such integrations should live. Is the Pydantic (AI) team open to a design/separation for pydantic-graph similar to that of pydantic-ai, where pydantic-graph ships a "slim" version with only in-memory and file-based persistence, and external stores are supported through optional dependencies such as redis and/or postgresql? If yes, how can one start creating PRs, and what would be the acceptance criteria to get the code into pydantic-graph (e.g., examples, documentation, testing)?
Looking forward to your guidance/suggestions...
Cc: @pinodeca and @AbeOmor
@aytekinar I think for us to upstream work on graph persistence, I'd want it to be compatible with the new beta API. The problem is that having parallel node execution makes it really hard to do graph persistence right.
Also, the most common/important use case for graph persistence is durable execution, which we now have support for through integrations with third-party tools like Temporal, DBOS, etc. Note that durable execution works well for both handling transient errors but also for patterns like human-in-the-loop.
The other obvious use case I'm aware of that isn't as straightforward to handle by just "throwing durable execution at the problem" is resuming a graph run from part-way through. (Though I will note that I believe at least temporal has machinery for this, though I am not very familiar with it; I suspect other durable execution providers have related functionality as well.) This is the strongest argument in favor of adding some form of more advanced graph/agent run persistence but I fear that trying to do that well would end up with us building a pydantic-ai-specific poor excuse for a durable execution framework, which I'm not keen to do.
In short, I think that to justify additional graph-run-persistence work, we would need to:
- Identify use cases that are not better-addressed through the use of existing durable execution frameworks
- Have a proposal for what it looks like to persist graph run state when the graph run involves parallel step execution
Thanks @dmontagu for your thoughts. I don't personally see a groundswell of demand from the community for using a 'durable execution' framework here (but maybe I'm misreading it). Perhaps this push is driven by a strategic partnership for Pydantic Logfire? That's, of course, fine and healthy, as OSS projects need to be funded.
LangGraph checkpoints may be a relevant comparison to what's being asked for here. Many users likely have existing queues/stores that they are comfortable with (e.g. Postgres, DynamoDB, Redis, etc.) and would prefer to leverage those vs. adopting new technology.
@sirianni It actually isn't related to Logfire; it's probably a wash, but if anything I'd guess durable execution slightly reduces the need for observability. Most of the people I have spoken to about this at conferences etc. just want persistence or "checkpoint"-like functionality as a way to deal with application failures (network requests failing, transient DB issues, etc.), and durable execution frameworks are the "right" tool for that. Note that several of these tools are built specifically on top of Postgres (e.g., DBOS, https://github.com/earendil-works/absurd, etc.).
I am not opposed to offering some form of checkpointing functionality, but it's really complicated to get it right when parallel execution is involved without massively sacrificing ergonomics. I just think durable execution is a fairly clear-cut solution here, and I expect tools designed and maintained by dedicated (third-party) teams to be more robust than something we tack onto the framework and only "battle test" against pydantic-ai use cases.
To be clear, I'm totally open to considering approaches to persistence/"snapshotting" graph execution state that do this successfully, I just think durable execution is the practical solution to most practical end-user challenges in the short (and possibly long) term.
The main challenge I see with making graph persistence work is — look at the implementation of pydantic_graph.beta.graph._GraphIterator.iter_graph and the state of that object (specifically, state, active_tasks and active_reducers) and think about how multiple tasks could be in flight that will modify the state object at different points in their execution. When and how do I snapshot things in a way that guarantees consistent execution behavior downstream of a given point in execution? We can snapshot the active_reducers and active_tasks, but really for consistent behavior, we need to snapshot precisely what is going on inside the active_tasks — what lines of code have we reached and what updates to state have been performed, etc.? If a given task makes three changes to state, how many of them have happened by the time we reach a given point? Even if a given task makes only one change to state, how do we ensure that change is executed precisely once when I am resuming from another snapshot? Like, when I took the last snapshot, had active task 1 already run far enough to make its modification to the state value, or not?
I thought about this hard when implementing — one option was to make it so that state can only be updated inside of handlers that are executed in serial (and tied to snapshotting). This has the downside of needing to add a bunch of fairly unintuitive structure to execution if you aren't making use of snapshotting (specifically, having some new mechanism for specifying state updates), which either ends up losing all type safety and looking more or less like what langgraph does for state updates with reducer function annotations etc., or ends up introducing even more complex types and type-checking boilerplate than the library already has, which is already butting up against ergonomics imo (though, and admittedly I'm biased, I think the tradeoff is currently well worth it, but I couldn't come up with a version that was snapshotting-friendly that didn't make me want to puke).
And I'll note that while I'm not familiar with all the durable execution frameworks out there, ensuring you get consistent execution is precisely the use case of temporal — the sandbox ensures deterministic execution even of async tasks and the execution of the full graph run is deterministic other than the "activities", which are basically auto-checkpointed in the temporal database — it basically nukes the problem of execution consistency here.
Hi @izzyacademy @Isopolito, for my use cases, I ended up with something like this: https://github.com/pgalilea/memx/blob/main/examples/py_ai_example.py I tried to organize and package the utils into an extremely small library. The goal was to be extensible, framework-agnostic, and to support both sync and async. I hope it helps!