langchain icon indicating copy to clipboard operation
langchain copied to clipboard

add dynamodb based memory class

Open KBB99 opened this issue 2 years ago • 22 comments

Hello, I have created a serverless memory class that uses DynamoDB that worked successfully on my local machine. However when I was trying to add boto3 as a dependency using poetry I ran into some issues. If someone could help me with that it would be great! The rest of the class is solid and working as expected and can be tested as explained in the serverless memory documentation I added.

KBB99 avatar Feb 15 '23 03:02 KBB99

are all these files suppose to be here? theres conversation/memory and then three more files in serverless/

Good point I've removed all the unnecessary files and changes. Please let me know if anything else is unclear.

KBB99 avatar Feb 16 '23 15:02 KBB99

Any updates on this? Looking forward to some DynamodDB based memory class which will be especially useful when using in combination with Langchain running on AWS Lambda.

hajowieland avatar Mar 21 '23 17:03 hajowieland

I was actually thinking to implement something of a ~"DBMemory" module where the memory would be stored in an external database (whether serverless or not is a different story) and read it from there at every message. In my case I need it for Redis.

If that is implemented to be general enough (like it was recommended above), then later any database could be added as a backend, including serverless ones like DynamoDB .

This is all needed to be able to have an application which handles multiple chat sessions in parallel and also the application can be scaled out as the chat sessions' state is stored in an external database (like Redis or DynamoDB) independently from the application. See a similar use-case described here.

@hwchase17, are you okey with me making a PR for such a new memory module - initially supporting Redis.

Then potentially this DynamoDB one could be just a subset of that.

zoltan-fedor avatar Mar 21 '23 20:03 zoltan-fedor

@zoltan-fedor I added what was recommend above as you can see here. I made a ServerlessMemory class that is then used with the DynamoDB buffer. You should therefore be able to create a Redis buffer (with read, write, and clear operations) and just plug it in as the ServerlessMemory buffer.

If it essentially allows any database, including SQL and No-SQL, then we can change the name to something like DatabaseMemory.

KBB99 avatar Mar 21 '23 21:03 KBB99

Thanks @KBB99, I understand, but I would rather use the standard ConversationBufferMemory and ConversationStringBuffer with a buffer stored in a database than a newly created ServerlessMemory, which - in case of non-serverless databases - is not serverless. :-)

I don't think there is a need for a separate ServerlessMemory, just the current ConversationBufferMemory and ConversationStringBuffer need to be extended to write into a database ABC class, which then can be subclassed for DynamoDB, Redis or any other database.

If you can make this PR to be more general and integrate it into the current ConversationBufferMemory and ConversationStringBuffer methods, abstract out the database class, then I think that could work.

zoltan-fedor avatar Mar 21 '23 21:03 zoltan-fedor

Thanks @KBB99, I understand, but I would rather use the standard ConversationBufferMemory and ConversationStringBuffer with a buffer stored in a database than a newly created ServerlessMemory, which - in case of non-serverless databases - is not serverless. :-)

I don't think there is a need for a separate ServerlessMemory, just the current ConversationBufferMemory and ConversationStringBuffer need to be extended to write into a database ABC class, which then can be subclassed for DynamoDB, Redis or any other database.

If you can make this PR to be more general and integrate it into the current ConversationBufferMemory and ConversationStringBuffer methods, abstract out the database class, then I think that could work.

Oh I see what you mean. I will dig deeper and try to implement.

KBB99 avatar Mar 21 '23 21:03 KBB99

I am throwing together an implementation like that myself - as I need Redis support urgently, which I can make into a PR and then you can take a look or decide to use that for DynamoDB. If you want I can include your DynamoDB database connector code in that, so it should work out of the box for you.

zoltan-fedor avatar Mar 21 '23 21:03 zoltan-fedor

@zoltan-fedor @KBB99 Agree with some of the comments that you have. Here is a proposal I had in mind. Let me know what you think, this should work for both DynamoDB or Redis.

Current Design

class ChatMessageHistory(BaseModel):
    messages: List[BaseMessage] = Field(default_factory=list)

    def add_user_message(self, message: str) -> None:
        self.messages.append(HumanMessage(content=message))

    def add_ai_message(self, message: str) -> None:
        self.messages.append(AIMessage(content=message))

    def clear(self) -> None:
        self.messages = []

The current design relies on the ChatMessageHistory to store the conversation history in the messages which is a simple list.

Proposed Design

Introduce a new interface for a message store, that provides methods similar to ChatMessageHistory but allows implementors to replace the default primitive list with a data store, for example in this case backing a NoSQL database.

from abc import ABC, abstractmethod

class MessageStore(ABC):
    session_id: str
    
    @abstractmethod
    def read(self) -> List[BaseMessage]:
        ...
    @abstractmethod
    def add_user_message(self, message: HumanMessage) -> None:
        ...
    @abstractmethod
    def add_ai_message(self, message: AIMessage) -> None:
        ...
    @abstractmethod
    def clear(self) -> None:
        ...

Change the implementation in ChatMessageHistory to refer to a MessageStore rather than messages as list. Update the methods that refer to the messages directly to refer the corresponding methods in the store.

class ChatMessageHistory(BaseModel):
    messages: List[BaseMessage]
    store: MessageStore = Field(default_factory=DefaultMessageStore)

    @property
    def messages(self) -> List[BaseMessage]:
        self.store.read()    

    def add_user_message(self, message: str) -> None:
        self.store.add_user_message(HumanMessage(content=message))

    def add_ai_message(self, message: str) -> None:
        self.store.add_ai_message(AIMessage(content=message))

    def clear(self) -> None:
        self.store.clear()

To continue using the list as memory for messages, create a DefaultMessageStore.

class DefaultMessageStore(MessageStore):
    messages: List[BaseMessage] = Field(default_factory=list)
    
    def read(self):
        return self.messages
    
    def add_user_message(self, message: HumanMessage):
        self.messages.append(message)
        
    def add_ai_message(self, message: AIMessage):
        self.messages.append(message)   
        
    def clear(self):
        self.messages = []

Existing memory classes, for example ConversationBufferMemory will continue to work without any more changes. Users who want to use a persistent store with ConversationBufferMemory can just create a new store, and ChatMessageHistory with this new store.

Here is an example implementation that serializes and stores messages in DynamoDB.

class DynamoDBMessageStore(MessageStore):
    def __init__(self, table_name, session_id: str):
        client = boto3.resource("dynamodb")
        self.table = client.Table(table_name)
        self.session_id = session_id

    def _read(self) -> List[Dict]:
        try: 
            response = self.table.get_item(
                Key={'SessionId': self.session_id}
            )
        except ClientError as error:
            if error.response['Error']['Code'] == 'ResourceNotFoundException':
                logger.warn("No record found with session id: %s", self.session_id)
            else:
                logger.error(error)
        
        if response and 'Item' in response:
            return response['Item']['History']
        
        return []

    def read(self) -> List[BaseMessage]:
        items = self._read()
        messages = messages_from_dict(items)
        return messages
    
    def add_user_message(self, message: HumanMessage) -> None:
        self._add_message(message)

    def add_ai_message(self, message: AIMessage) -> None:
       self._add_message(message)

    def _add_message(self, message: BaseMessage) -> None:
        messages = self._read()
        _message = _message_to_dict(message)
        messages.append(_message)
        self._write(self.session_id, messages)

    def _write(self, session_id: str, messages: List[Dict]):
        try:
            self.table.put_item(
                Item={
                    'SessionId': session_id,
                    'History': messages
                }
            )
        except ClientError as err:
            logger.error(err)
    
    def clear(self):
        try:
            self.table.delete_item(
                Key={'SessionId': self.session_id}
            )
        except ClientError as err:
            logger.error(err)

Here is a new ChatMessageHistory that uses the DynamoDB store.

store = DynamoDBMessageStore(
    table_name="table-name",
    session_id="session-id"
)

message_store = ChatMessageHistory(store=store)
memory = ConversationBufferMemory(chat_memory=message_store)

3coins avatar Mar 21 '23 21:03 3coins

@3coins, I mostly agree and started to implement a similar pattern.

The part where I differ is that the existing ConversationBufferMemory and ConversationStringBufferMemory should be extended so it supports both the current operation (storing the session in memory) and also a database (in your implementation called MessageStore) as a backend. This way we don't need to ask the users to implement their own BufferMemory for the database they want to use.

This way all new type of memory in the future - they should all automatically work with any database implementation that follows that ABC.

EDIT: I have read your proposal again and it seems that you might be addressing this by "Users who want to use a persistent store with ConversationBufferMemory can just create a new store, and ChatMessageHistory with this new store."

zoltan-fedor avatar Mar 21 '23 22:03 zoltan-fedor

@zoltan-fedor

EDIT: I have read your proposal again and it seems that you might be addressing this by "Users who want to use a persistent store with ConversationBufferMemory can just create a new store, and ChatMessageHistory with this new store."

Correct, with the way ChatMessageHistory was introduced in the recent changes, the intent seemed to be that history object was separated from the actual buffer class. What I am proposing is an extension of that, and separating the actual store from the ChatMessageHistory; it keeps the existing buffers behave as is, but allows to change the backend storage by just passing the right store, favoring composition over inheritance.With this approach, you can interchangaebly use stores and buffer classes, without recreating new extensions of memory classes.

3coins avatar Mar 21 '23 22:03 3coins

@zoltan-fedor I also want to add, that we can also add common store classes, so users don't have to create them, for example RedisMessageStore, DynamoDBMessageStore etc.

3coins avatar Mar 21 '23 22:03 3coins

@3coins, then this is VERY close to what I have started to implement already. Let me change the class names and some other things and make a PR about this.

zoltan-fedor avatar Mar 21 '23 22:03 zoltan-fedor

@3coins, Sorry, one thing. The proposed MessageStore class is per chat session, which will not be a good idea if we are opening a separate database connection for each session, rather we should have one db connection and all sessions should be using the same, so I would propose to create a MessageDB on top of the MessageStore, which would hold the database connection object and perform the base read/write/delete operation.

Your proposal:

from abc import ABC, abstractmethod

class MessageStore(ABC):
    session_id: str
    
    @abstractmethod
    def read(self) -> List[BaseMessage]:
        ...
    @abstractmethod
    def add_user_message(self, message: HumanMessage) -> None:
        ...
    @abstractmethod
    def add_ai_message(self, message: AIMessage) -> None:
        ...
    @abstractmethod
    def clear(self) -> None:
        ...

My proposal:

from abc import ABC, abstractmethod

class MessageDB(ABC):
    @abstractmethod
    def __init__(self, table_name) -> None:
        """Initialize the database."""

    @abstractmethod
    def read(self, session_id: str) -> str:
        ...

    @abstractmethod
    def append(self, session_id: str, message: Union[HumanMessage, AIMessage]) -> None:
        ...

    @abstractmethod
    def clear(self, session_id: str) -> None:
        ...

class MessageStore(ABC):
    session_id: str
    message_db: Optional[MessageDB] = None
    
    @abstractmethod
    def read(self) -> List[BaseMessage]:
        ...

    @abstractmethod
    def add_user_message(self, message: HumanMessage) -> None:
        ...
    @abstractmethod
    def add_ai_message(self, message: AIMessage) -> None:
        ...
    @abstractmethod
    def clear(self) -> None:
        ...

zoltan-fedor avatar Mar 21 '23 22:03 zoltan-fedor

Updated the implementation to that suggested by @3coins . @zoltan-fedor if you want you can try to further improve the implementation by adding the MessageDB.

Shout out to 3coins for providing the updated implementation!

KBB99 avatar Mar 22 '23 00:03 KBB99

@zoltan-fedor I agree that database connection should be cached, which can be handled outside of the message store, my suggestion is to refrain from adding another interface layer to handle this though, and have different implementations handle it on their own. There are libraries, for example SQLAlchemy that already provide an abstraction for managing connections, so adding another layer is kind of redundant in those cases. Hope that helps!

3coins avatar Mar 22 '23 00:03 3coins

@KBB99 You current changes doesn't have all the changes suggested in my previous comments. If you follow the design proposal, I pretty much have all the code changes required and is smaller in size than the ones from the langchain-aws-template repo.

3coins avatar Mar 22 '23 00:03 3coins

@3coins, I have included the database connection abstraction, so it can be cached and not recreated for each session (as the MessageStore is session-specific, the database connection can be shared across session). I know that most db libraries would handle that automatically, but I am not sure that all will, hence I thought it made sense separating it out not knowing that what future database clients will be added.

See my implementation at https://github.com/hwchase17/langchain/compare/master...zoltan-fedor:langchain:feature-memory-buffer-db

If you really want, I can remove the db abstraction.

Example use:

from langchain.memory.message_stores import DBMessageStore
from langchain.memory.message_stores.message_db import RedisMessageDB
from langchain.memory.chat_memory import ChatMessageHistory
redis_message_db = RedisMessageDB(url='redis://:PASSWORD@localhost:6379/0')  # this can be reused between session
store = DBMessageStore(message_db=redis_message_db, session_id='my-session')  # this is session-specific
message_store = ChatMessageHistory(store=store)
memory = ConversationBufferMemory(memory_key="chat_history", chat_memory=message_store, return_messages=True)

I have tested it locally on Redis with multiple sessions running in parallel, it worked well.

zoltan-fedor avatar Mar 22 '23 00:03 zoltan-fedor

@zoltan-fedor Thanks for making the updates. I still feel that having DB class is adding too many layers that shouldn't be part of langchain, but would like to get opinion from others as well.

@hwchase17 I know you must be very busy, can you help review this change, or help us get in touch with other community members who are familiar with the codebase. Having persistent messages is a common use case that many applications will need, so moving this forward will help a lot of users. Thanks for all the help 🙏 .

3coins avatar Mar 22 '23 03:03 3coins

Thanks @3coins.

Let's see what @hwchase17 says.

If he prefers then I am okey with collapsing the DB class into the per-session message store and relying purely on the db drivers to do the right thing and reuse db connections instead of recreating them for each session & message.

zoltan-fedor avatar Mar 22 '23 03:03 zoltan-fedor

I have prettied up my code and made a PR out of it, so it is easier to review and comment on it. See https://github.com/hwchase17/langchain/pull/1895

zoltan-fedor avatar Mar 22 '23 18:03 zoltan-fedor

Sounds good zoltan. I will test your implementation with DynamoDB and let you know how it goes.

KBB99 avatar Mar 22 '23 22:03 KBB99

@KBB99, I just pushed a big refactor, see https://github.com/hwchase17/langchain/pull/1895. So if you are testing, please pull the latest version. I have tested the Redis side and that looks good, but couldn't test the DynamoDB one.

zoltan-fedor avatar Mar 22 '23 23:03 zoltan-fedor

Closing pull request as 1895 was merged. Thank you for your help @zoltan-fedor @3coins and @hwchase17 !

KBB99 avatar Mar 29 '23 20:03 KBB99