
Specify connection to operation

Open zevisert opened this issue 8 months ago • 4 comments

Is your feature request related to a problem? Please describe.

I can't work with ormar in concurrent contexts very well since databases^0.8, because databases changed (my doing, in https://github.com/encode/databases/pull/546) to no longer share connections across asyncio tasks. Databases now creates a new connection for each asyncio.Task. For me this is a problem when I want to do concurrent work within a transaction.
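To make the behaviour concrete, here is a minimal toy sketch of a per-task connection lookup. `ToyDatabase` is a hypothetical stand-in written for this illustration; it is not the databases implementation, only a model of the "one connection per asyncio.Task" rule described above.

```python
import asyncio
import itertools


class ToyDatabase:
    """Toy stand-in (not databases.Database): hands out one connection id per task."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self._per_task = {}

    def connection(self) -> int:
        # The lookup key is the currently running task, so a new task
        # never sees the connection its parent is using.
        task = asyncio.current_task()
        if task not in self._per_task:
            self._per_task[task] = next(self._ids)
        return self._per_task[task]


async def demo():
    db = ToyDatabase()
    parent = db.connection()

    async def child() -> int:
        return db.connection()

    # gather() wraps each coroutine in its own task.
    children = await asyncio.gather(child(), child())
    return parent, children


parent_conn, child_conns = asyncio.run(demo())
print(parent_conn, child_conns)
```

In this model, anything the parent task started on its connection (such as a transaction) is invisible to the child tasks, because they are handed different connections.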

Describe the solution you'd like

I'd like to be able to specify a databases connection to use for a given operation. Something like this would be great:

  • Model.with(connection=connection).objects.get() or,
  • Model.objects.get(connection=connection) or,
  • and for instances too:
    • Model(...).with(connection=connection).save(), or
    • Model(...).save(connection=connection)
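As a rough illustration of the two call shapes, here is a toy sketch. `ToyModel`, `FakeConnection`, and `using` are hypothetical names invented for this example and are not part of ormar or databases. One practical detail: `with` is a reserved word in Python, so a method literally named `with` cannot be written; the sketch uses `using` as a placeholder name instead.

```python
import asyncio
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class FakeConnection:
    """Hypothetical stand-in for a databases connection; records what ran on it."""
    name: str
    log: List[str] = field(default_factory=list)

    async def execute(self, statement: str) -> None:
        self.log.append(statement)


DEFAULT = FakeConnection("default")


@dataclass
class ToyModel:
    """Toy model (not ormar.Model) showing both proposed call shapes."""
    id: int
    _bound: Optional[FakeConnection] = None

    def using(self, connection: FakeConnection) -> "ToyModel":
        # Bound variant: remember the connection for later operations.
        self._bound = connection
        return self

    async def save(self, connection: Optional[FakeConnection] = None) -> "ToyModel":
        # Explicit argument wins, then the bound connection, then the default.
        conn = connection or self._bound or DEFAULT
        await conn.execute(f"INSERT id={self.id}")
        return self


async def demo():
    tx_conn = FakeConnection("transaction")
    await ToyModel(id=1).save(connection=tx_conn)   # keyword-argument shape
    await ToyModel(id=2).using(tx_conn).save()      # bound shape
    await ToyModel(id=3).save()                     # unchanged default behaviour
    return tx_conn.log, DEFAULT.log


tx_log, default_log = asyncio.run(demo())
```

Either shape keeps today's behaviour when no connection is given, which would make the feature opt-in.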

Describe alternatives you've considered

I don't have any alternative options right now at all.

Additional context

The MVCE below shows this.

To run it:

  1. Run docker run -it --rm -e POSTGRES_PASSWORD=demo-1360 -p 5432:5432 postgres in one terminal
  2. Run python -m venv .demo-1360 && source .demo-1360/bin/activate (Python 3.11+ is needed, since the script uses asyncio.TaskGroup)
  3. Run pip install ormar==0.12.2 asyncpg && pip install databases==0.8.0 (see the note at the end)
  4. Save the file below as ormar-connection.py
  5. Then run python ormar-connection.py --concurrent, which should fail with a foreign key error
    • Running python ormar-connection.py without the flag completes without error
Source for ormar-connection.py
import asyncio
import sys
import databases
import ormar
import sqlalchemy
from sqlalchemy.ext.asyncio import create_async_engine


DATABASE_URL = "postgresql+asyncpg://postgres:demo-1360@localhost:5432"

db = databases.Database(DATABASE_URL)
meta = sqlalchemy.MetaData()


class BaseMeta(ormar.ModelMeta):
    database = db
    metadata = meta


class Department(ormar.Model):
    class Meta(BaseMeta):
        tablename = "departments"

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=100)


class Course(ormar.Model):
    class Meta(BaseMeta):
        tablename = "courses"

    id: int = ormar.Integer(primary_key=True)
    name: str = ormar.String(max_length=100)
    department: Department | None = ormar.ForeignKey(Department)


async def main(parallel: bool):

    async with create_async_engine(DATABASE_URL).begin() as conn:
        await conn.run_sync(meta.drop_all)
        await conn.run_sync(meta.create_all)

    async with db:
        async with db.transaction():
            csse = await Department(
                id=1337,
                name="Computer Science & Software Engineering",
            ).save()

            courses = [
                ("Introduction to Computer Science", 101),
                ("Computer Architecture", 255),
                ("Algorithms and Data Structures:I", 225),
                ("Algorithms and Data Structures:II", 226),
                ("Operating Systems", 360),
                ("Database Systems", 370),
                ("Concurrent Programming and Distributed Systems", 461),
                ("Analysis of Algorithms", 425),
                ("Data Analysis and Pattern Recognition", 535),
            ]

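            if parallel:
                # Each task created here gets its own databases connection (databases 0.8),
                # so these saves run outside the transaction opened above.
                async with asyncio.TaskGroup() as tasks: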
                    for name, id in courses:
                        tasks.create_task(
                            Course(id=id, name=name, department=csse).save()
                        )
            else:
                for name, id in courses:
                    await Course(id=id, name=name, department=csse).save()

        print(await Course.objects.all())


asyncio.run(
    main(
        parallel=bool(
            {"-c", "--concurrent", "--concurrent=true"} & {arg.lower() for arg in sys.argv}
        )
    )
)

The gist is that when this work runs in separate asyncio tasks, databases creates a new connection for each task. That new connection has no active transaction, and since the csse department has not been committed yet, the insert fails with a foreign key error.
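The failure mode can be modelled without a database. The sketch below is a deliberately simplified toy (the `ToyServer` and `ToyConnection` classes are hypothetical and do not reflect how PostgreSQL or databases work internally): rows written inside an open transaction are only visible on the connection that wrote them until they are committed.

```python
class ToyServer:
    """Toy model of committed state; not a real database."""

    def __init__(self) -> None:
        self.departments = set()


class ToyConnection:
    def __init__(self, server: ToyServer) -> None:
        self.server = server
        self.uncommitted = set()  # rows written in this connection's open transaction

    def insert_department(self, dept_id: int) -> None:
        self.uncommitted.add(dept_id)

    def insert_course(self, dept_id: int) -> None:
        # A connection sees committed rows plus its own uncommitted ones.
        visible = self.server.departments | self.uncommitted
        if dept_id not in visible:
            raise LookupError(f"foreign key violation: department {dept_id}")

    def commit(self) -> None:
        self.server.departments |= self.uncommitted
        self.uncommitted.clear()


server = ToyServer()
parent = ToyConnection(server)
parent.insert_department(1337)
parent.insert_course(1337)  # same connection as the department insert: accepted

other = ToyConnection(server)  # what a new asyncio task would be handed
try:
    other.insert_course(1337)
    outcome = "ok"
except LookupError as exc:
    outcome = str(exc)

parent.commit()
other.insert_course(1337)  # accepted once the department is committed
print(outcome)
```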

# excerpt from the complete example in the file above
async with db:
    async with db.transaction():
        csse = await Department(id=1337, name="Computer Science & Software Engineering").save()
        courses = [ ... ]
        async with asyncio.TaskGroup() as tasks:
            for name, id in courses:
                tasks.create_task(Course(id=id, name=name, department=csse).save())

Here I'm just turning the coroutine from .save() into a task as a placeholder for a real world example where there's IO bound work to be done for each "course", and it'd be significantly faster to do that concurrently.
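For cases where the IO-bound part can be separated from the database write, one partial pattern is to run only the IO concurrently and keep the writes in the task that opened the transaction. The sketch below uses hypothetical placeholders (`fetch_details`, `save_row`) in place of real IO and of `Course(...).save()`; it does not help when the database calls themselves have to interleave with the IO, which is the case this request is about.

```python
import asyncio


async def fetch_details(name: str) -> str:
    # Placeholder for real IO-bound work (HTTP call, file read, ...).
    await asyncio.sleep(0.01)
    return name.upper()


async def save_row(saved: list, row: str) -> None:
    # Placeholder for `await Course(...).save()`, awaited from the parent task.
    saved.append(row)


async def main() -> list:
    names = ["Operating Systems", "Database Systems"]

    # Concurrent part: the tasks created by gather() never touch the database.
    details = await asyncio.gather(*(fetch_details(n) for n in names))

    # Sequential part: runs in the task that would hold the transaction.
    saved = []
    for row in details:
        await save_row(saved, row)
    return saved


saved_rows = asyncio.run(main())
```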

If I could just tell ormar which connection to use, this wouldn't be a problem.

Note: I'm filing this as an enhancement rather than a bug. One ormar release requires databases<0.6.3 and another requires databases^0.7.0, while connection use across tasks is fixed in databases 0.8, so ormar doesn't currently experience this, but it will.

zevisert • Jun 19 '24 01:06