Question: is upsert possible with this library.
First Check
- [X] I added a very descriptive title to this issue.
- [X] I used the GitHub search to find a similar issue and didn't find it.
- [X] I searched the SQLModel documentation, with the integrated search.
- [X] I already searched in Google "How to X in SQLModel" and didn't find any information.
- [X] I already read and followed all the tutorial in the docs and didn't find an answer.
- [X] I already checked if it is not related to SQLModel but to Pydantic.
- [X] I already checked if it is not related to SQLModel but to SQLAlchemy.
Commit to Help
- [X] I commit to help with one of those options 👆
Example Code
from typing import Optional
from sqlmodel import Field, SQLModel
class Hero(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str
secret_name: str
hero_1 = Hero(id=7, name="Deadpond", secret_name="Dive Wilson")
Description
Let say I have an instance of a model
I want to upsert hero_1 into the database: insert if this id does not exist yet, otherwise update. Is there a pattern to do that?
Operating System
Linux, macOS
Operating System Details
No response
SQLModel Version
0.0.4
Python Version
3.9.6
Additional Context
No response
Yes, it is You can find an example of implementation (combined with FastAPI) here
@pcorbel thanks, that's a way of working around the problem, but not very performant and also not efficient to have to write this code for each insert..
SQLITE and PostgreSQL both have upsert and there has been a "INSERT OR REPLACE" in sqlite3 for a long time.
Of course it's still early in the game for this library, but it would be nice if there was an upsert parameter to add.
@mborus this is one way of doing a bulk_upsert, though it has the internal overhead of requiring a select for each record. (maybe someone can improve on that?) It could also be improved with iterators instead of lists.
It will work on any model object representing a table and any list of column attributes/names you designate to match against. The benefit of this version is you don't have to rewrite if for different situations, it should work universally. Uncomment the print statements to verify it is working as required.
My vote is for a dedicated upsert also, but in the mean time...
from typing import Union, Sequence, List, Dict
from sqlmodel import Session, SQLModel, and_, create_engine, or_, select
def bulk_upsert(data: Union[Dict, Sequence[Dict]], model: str, columns: Union[str, List[str]]) -> None:
""" Given a dict record or list of records, update existing records on specified columns, or
insert new record. This works by building clauses and reducing them into the statement.
Usage:
data = {"name": John, "ticket": "A95134"}
# or
data = [{"name": John, "ticket": "A95134"}, {"name": Jane, "ticket": "S87221"}]"
# perform upsert matching on a single column
bulk_upsert(data, "Entries", "ticket")
# perform upsert matching on multiple columns
bulk_upsert(data, "Entries", ["name", "ticket"])
Parameters:
:param data: Union[Dict, Sequence[Dict]]: A dictionary or list of dictionary representing
records in a table.
:param model: str: The name of the SQLModel representing a table.
:param columns: Union[str, List[str]]: Column(s) to match on for upsert.
:return: None: Takes internal action; bulk upsert.
"""
# ensure listyness
if isinstance(data, Dict):
data = [data]
if isinstance(columns, str):
columns = [columns]
# open a session with the database engine
with Session(engine) as session:
# get specified model
obj = getattr(models, model)
for record in data:
# get obj columns and designate matches
match_on = [getattr(obj, col) == record.get(col) for col in columns]
# reduce clauses into statement
statement = select(obj).where(and_(*match_on))
# check for existing record
upsert = session.exec(statement).first()
# prepare the upsert
if upsert:
_ = [setattr(upsert, key, record[key]) for key in record]
# print(f"Upserting {upsert.id} with {record}")
else:
upsert = obj(**record)
# print(f"Inserting {record}")
# add to the session
session.add(upsert)
# commit session
session.commit()
Relevant SO discussion concerning SQLAlchemy's facility to accomplish upserts. How to do an upsert with SqlAlchemy?
Another relevant SO answer in the same question. This one is interesting because it adds functionality to the model at time of definition. I imagine building something like this that is used as a base model would make the behavior available by default for all models. https://stackoverflow.com/a/67969355/1663382
I implemented something that allows modifying the SQL on the go:
from uuid import UUID
from datetime import datetime
from typing import Optional
from sqlalchemy import Column, BigInteger, ForeignKey
from sqlmodel import Field, SQLModel, UniqueConstraint, create_engine, Session, select, delete, insert
class Files(SQLModel, table=True):
__table_args__ = dict(info=dict(magic_insert=True))
id: Optional[int] = Field(default=None, sa_column=Column(BigInteger(), primary_key=True, autoincrement=True))
hash: str = Field(unique=True)
@compiles(Insert, "postgresql")
def visit_insert_postgresql(element, compiler, **kw):
sql: str = compiler.visit_insert(element, **kw)
magic_insert = element.entity_description["table"].info.get("magic_insert", False)
if magic_insert:
returning_idx = sql.find(" RETURNING")
if returning_idx != -1:
sql = sql[:returning_idx] + " ON CONFLICT DO NOTHING" + sql[returning_idx:]
else:
sql += " ON CONFLICT DO NOTHING"
print(f"visit_insert_postgresql: {sql}")
return sql
But this fails because of the RETURNS statement and I know absolutely nothing about postgres so I gave up and switched to manually writing my bulk insert statements. Note that the trick is to use __table_args__ and set the (unused?) info field to include the magic_insert boolean.
Might be useful for someone who wants to implement this properly 🤷🏻♂️ The 'upsert' or 'insert if not exists' operations seem extremely fundamental to me, so too bad they are missing. It would not be possible for me to do a select for 10 million rows, just to know if they were already inserted...
i have a system that supports sqlite and postgresql databases. i started with the simple select, if None: session.add, commit flow. as load went up the race condition where the row would be inserted by another request in between the select and the commit began to cause errors.
a solution that works well for me is:
- insert with "on conflict ignore"
- select with "for update"
- update
- commit
this is slightly less performant than doing an on_conflict_update, but for my scenario it's a good trade-off, ie. the cost of the additional update in trade for what i view as simpler code that is also less database-type dependent.
note that i'm using AsyncSession, but this should work with sync Session as well.
# db.py
from sqlalchemy.dialects import postgresql
from sqlmodel import Field, SQLModel
from sqlmodel.ext.asyncio.session import AsyncSession
async def insert_if_not_exists(session: AsyncSession, model: SQLModel) -> bool:
"""
Inserts the provided record if a row with the same primary key(s) does already exist in the table.
Returns True if the record was inserted, False if it already existed.
"""
# the postgresql.insert function is used to generate an INSERT statement with an ON CONFLICT DO NOTHING clause.
# note that sqlite also supports ON CONFLICT DO NOTHING, so this works with both database types.
statement = (
postgresql.insert(model.__class__).values(**model.model_dump(exclude_unset=True)).on_conflict_do_nothing()
)
conn = await session.connection()
result = await conn.execute(statement)
return result.rowcount > 0
class User(SQLModel, table=True):
user_id: str = Field(primary_key=True)
name: str
example usage for upsert equivalent behavior:
async def add_or_update_user(
session: AsyncSession,
user_id: str,
user_name: str
) -> None:
inserted = await db.insert_if_not_exists(session, db.User(user_id=user_id, name=user_name))
if inserted:
return await session.commit()
user = (
await session.exec(select(db.User).where(db.User.user_id == user_id).with_for_update())
).one()
user.name = user_name
session.add(user)
await session.commit()