flask-sqlalchemy
Multiple binds doesn't connect to external transaction in tests
I have a Flask app configured only with multiple binds, using the SQLALCHEMY_BINDS config option. The app runs fine, but I am struggling to get an external transaction to roll back in tests.
I have the following setup:
# Config
SQLALCHEMY_BINDS = {
    'db1': 'sqlite:////my.db'
}

# My model
class MyModel(db.Model):
    __bind_key__ = 'db1'
    id = db.Column(db.Integer, primary_key=True)  # a mapped class needs a primary key
    email = db.Column(db.String)
# Fixtures
import os

import pytest

# create_app and _db (the app's SQLAlchemy instance) come from the application package.

@pytest.fixture(scope='session')
def app():
    """Session wide Flask app"""
    return create_app()

@pytest.fixture(scope='session')
def db(app, request):
    """Session-wide test database."""
    _db.app = app
    _db.create_all()
    yield _db
    _db.drop_all()
    # On Windows, close the file first.
    os.close(app.config['TEST_DB_FD'])
    os.unlink(app.config['TEST_DB'])

@pytest.fixture(scope='function')
def session(db, request):
    """Creates a new database session for a test."""
    # Open a connection and start the external (root) transaction.
    connection = db.engine.connect()
    transaction = connection.begin()
    options = dict(bind=connection, binds={})
    _session = db.create_scoped_session(options=options)
    db.session = _session
    yield _session
    # Teardown: discard the session, then undo everything the test did.
    _session.remove()
    transaction.rollback()
    connection.close()

@pytest.fixture
def data(session):
    session.add(MyModel(email='bob'))
    session.add(MyModel(email='alice'))
    session.commit()
After the test is done, the rows for bob and alice are still in the database.
As discussed in #468, there appears to be an issue with joining a session to an external transaction, which is very useful in tests. I tried the "hack":
class _dict(dict):
    def __nonzero__(self):
        return True
But it still did not work. I also tried using binds={} and passing no binds at all; the behaviour differs, but the rollback still doesn't work. I ended up stepping through with the debugger, and it looks like the external transaction is not detected, so the session creates a new transaction for my fixture and commits it to disk instead of using the external one.
- sqlalchemy==1.2.14
- flask-sqlalchemy==2.3.2
After digging around the codebase, it appears the following happens:
get_bind() returns:
- state.db.get_engine() if bind_key is present in the mapper
- sqlalchemy.orm.session.Session.get_bind() when no bind_key is present or when mapper is None.
This means that although we can pass an external Connection object to the bind parameter, for binds an Engine is returned, which then gets its own connection, bypassing the top-level/external transaction. If binds is specified in options, an Engine is once again returned for every bind in SignallingSession. To avoid this, I had to:
- Create a binds dict mapping mapper -> Connection and pass it to options.
- Monkeypatch the implemented get_bind method to call SessionBase.get_bind() instead, as SQLAlchemy supports binds internally and they are already passed in __init__.
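The underlying effect can be reproduced without Flask-SQLAlchemy at all. The following is only a minimal sketch using the standard-library sqlite3 module, not the library's code: the function name rows_after_rollback, the my_model table, and the temporary database file are all made up for illustration. It shows that work committed on a second, independently opened connection survives a rollback of the "external" transaction, while work done on the shared connection does not.

```python
import os
import sqlite3
import tempfile


def rows_after_rollback(share_connection):
    """Return the rows left in the table after the outer transaction is rolled back."""
    fd, path = tempfile.mkstemp(suffix=".db")
    os.close(fd)
    try:
        # Create the (hypothetical) table up front and commit it.
        setup = sqlite3.connect(path)
        setup.execute("CREATE TABLE my_model (email TEXT)")
        setup.commit()
        setup.close()

        # The "external" connection owned by the test fixture.
        # isolation_level=None turns off implicit BEGINs, so transactions are explicit.
        outer = sqlite3.connect(path, isolation_level=None)
        outer.execute("BEGIN")

        if share_connection:
            # The insert runs inside the outer transaction.
            outer.execute("INSERT INTO my_model VALUES ('bob')")
        else:
            # Analogous to an Engine handing out its own connection:
            # this insert is committed independently of the outer transaction.
            inner = sqlite3.connect(path, isolation_level=None)
            inner.execute("BEGIN")
            inner.execute("INSERT INTO my_model VALUES ('bob')")
            inner.execute("COMMIT")
            inner.close()

        # What the fixture teardown does.
        outer.execute("ROLLBACK")
        rows = outer.execute("SELECT email FROM my_model").fetchall()
        outer.close()
        return rows
    finally:
        os.unlink(path)
```

Calling rows_after_rollback(share_connection=True) leaves the table empty, while rows_after_rollback(share_connection=False) leaves the bob row behind, which matches the symptom described above.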
I ended up with the following fixture in pytest:
import pytest
from sqlalchemy.orm import Session as SessionBase

@pytest.fixture(scope='function')
def session(db, monkeypatch):
    """Creates a new database session for a test."""
    # Replace get_bind with base implementation.
    monkeypatch.setattr('flask_sqlalchemy.SignallingSession.get_bind', SessionBase.get_bind)
    default_conn = db.engine.connect()
    default_trans = default_conn.begin()
    other_binds = {}
    other_trans = {}
    # Create a mapping between table mapper and Connection object with root transaction.
    for mapper, engine in db.get_binds().items():
        conn = engine.connect()
        other_binds[mapper] = conn
        other_trans[mapper] = conn.begin()
    options = dict(bind=default_conn, binds=other_binds)
    _session = db.create_scoped_session(options=options)
    db.session = _session
    yield _session
    _session.remove()
    # Roll all transactions back.
    for trans in list(other_trans.values()) + [default_trans]:
        trans.rollback()
    # Close all the connections.
    for conn in list(other_binds.values()) + [default_conn]:
        conn.close()
I have tested this locally and there seems to be no negative impact from patching out get_bind. Is the override needed in the codebase?
I've tried your solution and it works fine when you have just one table. But if I have two or more tables in that bind, I get the following error:
sqlalchemy.exc.InvalidRequestError: Session already has a Connection associated for the given Connection's Engine
This is the new model which is just a clone of yours:
class MyOtherModel(db.Model):
    __bind_key__ = 'db1'
    id = db.Column(db.Integer, primary_key=True)  # a mapped class needs a primary key
    email = db.Column(db.String)
And the test that fails:
def test_two_models(session):
    row1 = MyModel(email='[email protected]')
    row2 = MyOtherModel(email='[email protected]')
    session.add(row1)
    session.add(row2)
    session.commit()  # <-- It fails here
    assert len(MyModel.query.all()) == 1
    assert len(MyOtherModel.query.all()) == 1
Can you check if it fails for you too?
@MarcAragones No, we have had this working for a while now without issues. However, my original answer only works for a single entry in binds. Here is a modified version that handles all binds:
@pytest.fixture(scope='function')
def session(db, monkeypatch):
    """Database session for a test that will be cleared at the end."""
    monkeypatch.setattr('flask_sqlalchemy.SignallingSession.get_bind', SessionBase.get_bind)
    default_conn = db.engine.connect()
    default_trans = default_conn.begin()
    other_binds = {}
    conns = {}
    # Create a connection and a root transaction so that we can roll it back after the test.
    for mapper, engine in db.get_binds().items():
        # Reuse the same connection for the same engine,
        # otherwise, different binds will throw an error, because mappers use the same engine.
        if engine not in conns:
            conn_ = engine.connect()
            conns[engine] = (conn_, conn_.begin())
        other_binds[mapper] = conns[engine][0]
    options = dict(bind=default_conn, binds=other_binds)
    _session = db.create_scoped_session(options=options)
    db.session = _session
    yield _session
    _session.remove()
    # Roll back each root transaction, then close its connection.
    for conn_ in list(conns.values()) + [(default_conn, default_trans)]:
        conn_[1].rollback()
        conn_[0].close()
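The key change in this version is that each engine is connected only once, and every mapper that shares that engine is pointed at the same Connection. The sketch below isolates that bookkeeping so it can be run on its own. It uses small stand-in classes (FakeEngine, FakeConnection, FakeTransaction) and a made-up helper name, connect_once_per_engine, none of which exist in SQLAlchemy or Flask-SQLAlchemy; they only mimic the connect(), begin(), rollback() and close() calls used in the fixture.

```python
class FakeTransaction:
    """Stand-in for a root transaction (hypothetical, not a SQLAlchemy class)."""

    def __init__(self):
        self.rolled_back = False

    def rollback(self):
        self.rolled_back = True


class FakeConnection:
    """Stand-in for a Connection (hypothetical)."""

    def __init__(self, engine):
        self.engine = engine
        self.closed = False

    def begin(self):
        return FakeTransaction()

    def close(self):
        self.closed = True


class FakeEngine:
    """Stand-in for an Engine that counts how often connect() is called."""

    def __init__(self, name):
        self.name = name
        self.connect_calls = 0

    def connect(self):
        self.connect_calls += 1
        return FakeConnection(self)


def connect_once_per_engine(table_to_engine):
    """Return (binds, conns): one shared connection and root transaction per engine."""
    conns = {}   # engine -> (connection, root transaction)
    binds = {}   # table/mapper -> connection
    for table, engine in table_to_engine.items():
        if engine not in conns:
            conn = engine.connect()
            conns[engine] = (conn, conn.begin())
        binds[table] = conns[engine][0]
    return binds, conns


# Two tables on the same bind share one engine, as MyModel and MyOtherModel do.
db1 = FakeEngine("db1")
binds, conns = connect_once_per_engine({"my_model": db1, "my_other_model": db1})
```

With two tables on one engine, connect() is called once and both entries in binds refer to the same connection object, so the session is never handed two different connections for the same engine.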
The previous solution fixed my error. However, now I am facing another issue: I need to execute a raw SQL query:
def test_two_models(session):
    row1 = MyModel(email='[email protected]')
    row2 = MyOtherModel(email='[email protected]')
    session.add(row1)
    session.add(row2)
    session.commit()
    assert len(session.execute('SELECT * FROM my_model_table').all()) == 1
Again, I get the same error:
sqlalchemy.exc.InvalidRequestError: Session already has a Connection associated for the given Connection's Engine
If I try to use the session's connection, I don't retrieve any row:
def test_two_models(session):
    row1 = MyModel(email='[email protected]')
    row2 = MyOtherModel(email='[email protected]')
    session.add(row1)
    session.add(row2)
    session.commit()
    connection = session.get_bind()
    assert len(connection.execute('SELECT * FROM my_model_table').all()) == 1
    # assert 0 == 1
Hi @MarcAragones I'm facing the same challenge when executing raw SQL. Did you find a solution?
Based on some of the investigation here, this should be fixed by #1087. It changes how binds work so that a different engine and metadata pair is used per bind key, and the session is implemented closer to how the base session works.
Thanks for the change. #1087 looks to be a significant change; I'm looking forward to reading more about it and trying it.