beanie
Using transactions in order to ensure data consistency
Hey, first of all, I want to say thank you very much for all of your hard work on this library! It's great! :blush:
Problem
I want to implement a system like this:
Domain Models
from typing import List, Optional

from pydantic import BaseModel


class PostDetails(BaseModel):
    slug: str
    title: str
    description: Optional[str] = None
    body: str
    tags: List[str] = []


class Post(PostDetails):
    _id: str


class CategoryDetails(BaseModel):
    slug: str
    title: str
    description: Optional[str] = None


class Category(CategoryDetails):
    _id: str
Beanie Implementation
from typing import AsyncIterable, List, Optional
from beanie import Document, Indexed
from bson import ObjectId
from pymongo.client_session import ClientSession
IndexedStrField = Indexed(str, unique=True)
class PostDocument(Document):
    slug: IndexedStrField  # Question 1
    title: str
    description: Optional[str]
    body: str
    tags: List[str]
    categories: List[ObjectId]

    def to_post_model(self) -> Post:
        return Post(
            _id=str(self.id),
            slug=self.slug,
            title=self.title,
            description=self.description,
            body=self.body,
            tags=self.tags,
        )
class CategoryDocument(Document):
    slug: IndexedStrField
    title: str
    description: Optional[str]
    parent_category: Optional[ObjectId]

    def to_category_model(self) -> Category:
        return Category(
            _id=str(self.id),
            slug=self.slug,
            title=self.title,
            description=self.description,
        )

    async def get_subcategories_recursively(
        self, session: Optional[ClientSession] = None
    ) -> AsyncIterable["CategoryDocument"]:
        async for subcategory in CategoryDocument.find(
            CategoryDocument.parent_category == self.id, session=session
        ):
            yield subcategory
            async for subsubcategory in subcategory.get_subcategories_recursively(
                session
            ):
                yield subsubcategory

    async def get_posts(
        self, session: Optional[ClientSession] = None
    ) -> AsyncIterable[PostDocument]:
        async for post in PostDocument.find(
            PostDocument.categories == self.id, session=session
        ):  # Question 2
            yield post

    async def delete_all_posts(self, session: Optional[ClientSession] = None) -> None:
        async for post in self.get_posts(session):
            await post.delete(session=session)

    async def delete_recursively(self, session: Optional[ClientSession] = None) -> None:
        await self.delete_all_posts(session)
        async for subcategory in self.get_subcategories_recursively(session):
            await subcategory.delete_all_posts(session)
            await subcategory.delete(session=session)
Naive Usage
category: CategoryDocument = ...
await category.delete_recursively()
Questions
- About Question 1: mypy is not satisfied with this statement. Do you know what I should do to solve this?
- About Question 2: is it going to correctly query all of the posts containing category.id in their categories field?
- My main question: I want to make my await category.delete_recursively() atomic, in order to ensure data consistency. I thought about transactions. What should I do?
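On Question 2, here is a small sketch of the matching rule the query relies on. MongoDB's equality match on an array field also matches documents whose array contains the value as an element. The sketch assumes that `PostDocument.categories == self.id` ends up as a plain equality filter on `categories`; it does not touch the database, and `field_matches` plus the sample data are hypothetical, written only to illustrate the rule.

```python
from typing import Any, Dict, List


def field_matches(field_value: Any, query_value: Any) -> bool:
    """Mimic MongoDB equality matching for one field (illustration only)."""
    if isinstance(field_value, list):
        # An array field matches when the whole array equals the value
        # or when any single element equals it.
        return field_value == query_value or query_value in field_value
    return field_value == query_value


# Hypothetical posts; plain strings stand in for ObjectId values.
posts: List[Dict[str, Any]] = [
    {"slug": "a", "categories": ["c1", "c2"]},
    {"slug": "b", "categories": ["c3"]},
    {"slug": "c", "categories": []},
]

matching = [p["slug"] for p in posts if field_matches(p["categories"], "c1")]
print(matching)
```

Under that assumption, only the post whose `categories` list contains the given id is returned.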
Possible Solution
Will this work for me?
client: AsyncIOMotorClient = ...  # as declared before
async with await client.start_session() as s:
    async with s.start_transaction():
        await category.delete_recursively(s)
Another question: is it necessary to pass the session to all of these methods?
Thank you very very much! :smile:
Hi, I would love to know if the method I presented is correct, because if so, I think it can be used by a lot of people. Thank you very much :)
Hey, sorry, I missed your issue.
- Unfortunately, I don't know. mypy is tricky; I spent about 50% of the whole development time fixing issues with it and still have no idea about the nature of some of them :)
- It looks correct.
- Yup, it should work as you wrote. I plan to add a transaction context manager to the document class so this can be written less verbosely, but there are some tricks with it. Also yup: for now you should pass the session to each method. I don't know whether I should add an autodetector for sessions, because with it, it would be impossible to run queries inside the session context manager without using that session. But it would be comfier, I agree.
Thank you, @roman-right! I implemented a context manager like this:
from contextlib import asynccontextmanager
from typing import AsyncIterator

from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient


class MongoClient:
    _client: AsyncIOMotorClient
    _database: str

    def __init__(self, connection_string: str, database: str) -> None:
        self._client = AsyncIOMotorClient(connection_string)
        self._database = database

    async def initialize(self) -> None:
        await init_beanie(database=self._client[self._database], document_models=[PostDocument, CategoryDocument])

    @asynccontextmanager
    async def start_transaction(self) -> AsyncIterator["MongoRepository"]:
        # The session and the transaction both end when the caller's block exits.
        async with await self._client.start_session() as session:
            async with session.start_transaction():
                yield MongoRepository(session)
where MongoRepository is just a class that uses beanie:
class MongoRepository(Repository):
    _session: ClientSession

    def __init__(self, session: ClientSession):
        self._session = session

    async def create_post(self, details: PostDetails) -> Post:
        doc = PostDocument(
            slug=details.slug,
            title=details.title,
            description=details.description,
            body=details.body,
            tags=details.tags,
            categories=[],
        )
        await doc.insert(session=self._session)
        return doc.to_post_model()

    async def get_all_posts(self) -> AsyncIterable[Post]:
        async for doc in PostDocument.find_all(session=self._session):
            yield doc.to_post_model()

    # more methods...
and usage via
client = MongoClient("...", "...")
await client.initialize()
async with client.start_transaction() as repo:
    # get_all_posts() is an async generator, so iterate it instead of awaiting it
    print([post async for post in repo.get_all_posts()])
    # more operations...
What do you say?
It looks neat. How do you control when the transaction ends? Does it end when the endpoint returns its result, or does it live for the lifetime of some other object?
@mr-blue-sky Your solution looks very neat. I am looking for ways to 'dynamically' append operations to a session/transaction in MongoDB.
I wonder how you close the transaction once all operations are complete?
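On when the transaction closes: with a context manager like the one above, it ends when the `async with` block exits. PyMongo documents that leaving a `start_transaction()` block normally commits and leaving it with an exception aborts, and Motor follows the same pattern. The sketch below imitates that behavior with a hypothetical `FakeSession` that only records events, so it runs without a database; `transaction` and `run` are illustrative names, not library functions.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List


class FakeSession:
    """Hypothetical stand-in that records what happens to the transaction."""

    def __init__(self) -> None:
        self.events: List[str] = []

    @asynccontextmanager
    async def start_transaction(self) -> AsyncIterator["FakeSession"]:
        self.events.append("begin")
        try:
            yield self
        except BaseException:
            # Any exception leaving the block aborts the transaction.
            self.events.append("abort")
            raise
        else:
            # A clean exit from the block commits it.
            self.events.append("commit")


@asynccontextmanager
async def transaction(session: FakeSession) -> AsyncIterator[FakeSession]:
    # Same shape as the wrapper in the thread: yield inside the transaction block.
    async with session.start_transaction():
        yield session


async def run(fail: bool) -> List[str]:
    session = FakeSession()
    try:
        async with transaction(session):
            session.events.append("delete post")
            if fail:
                raise RuntimeError("boom")
    except RuntimeError:
        pass
    return session.events


if __name__ == "__main__":
    print(asyncio.run(run(fail=False)))
    print(asyncio.run(run(fail=True)))
```

So operations can be appended freely while the block is open; the transaction is tied to the block's scope rather than to an explicit close call.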
This issue is stale because it has been open 30 days with no activity.
This issue was closed because it has been stalled for 14 days with no activity.
I'd like to ping this back up - @mr-blue-sky I really dig this approach - any downsides you've encountered?