tornado-sqlalchemy
tornado-sqlalchemy copied to clipboard
Is it OK that Session creation/cleanup is done in main thread?
Currently session creation/commit/close is done in tornado main thread. Although I am not sure how sqlalchemy works exactly, I feel that there can be blocking IO operation in session create/commit/close functions. At least, in commit function, framework should communicate with DB via TCP connection if there is any thing dirty.
I think it might be better to do these operations in thread pool. Is there any reason why you do in in main thread? Is it guaranteed that sqlalchemy does not have any IO operation while session creation and close as long as users commit actively?
I guess it depends for how long those operations are blocking. Given that a session kind of maps to a database transaction, there's at least some IO involved. In practice though, session create/commit/close operations tend to be quite fast. Definitely fast enough to not be the bottleneck. If you have an example where that's not the case, please let me know and we can discuss it together.
Although in general, I'm open to moving those operations to be put behind await. I don't think it's as simple as moving them to a thread pool, because it would require changing things at a lower level, possibly using some async adapter for SQLAlchemy, something like sqlalchemy-aio.
I am quite new to sqlalchemy and don't have experience about how heavy session create/commit/close operations are. I was just searching for the coding pattern for using tornado and sqlalchemy together. Your project helped me to understand how to use it a lot.
I think I will use your project and see how it goes. If create/commit/close are needed to be fully async, I will let you know and discuss how we can improve it.
If you drop context manage feature, I think you can easily make it run session operations in a thread pool. But, as you mentioned, it is a matter of how heavy they are. Calling them in a thread pool is expensive and might be inefficient. So, actual example is needed.
Would it be too bad if the framework calls commit functions every time query is called in thread pool?
If it is insert query, I believe actual writing will happen when commit is called. So, calling with commit function with dirty model object will be a bit heavy. We can let lib user to call commit function when they implement query function. But, if it is repeated coding pattern, would it be better to be handled by the framework?
# wrapper runs in thread pool.
def wrapper(self, query)
try:
query()
except Exception:
session.rollback()
raise
else:
session.commit()
The context manager and self.session both do effectively what your wrapper function is doing. self.session will commit (or rollback) and eventually close the session as the web request is processed (check the SessionMixin class). And the context manager does that in a specific scope defined by the user.
So, calling with commit function with dirty model object will be a bit heavy.
Unless you're inserting hundreds or thousands of objects (or a huge object), it probably won't be too heavy.
I mean running session.commit in thread pool together with query function as this operation is expected to have IO operation. It can be done by adding a simple wrapper like above.
Current version is running session.commit in the tornado main thread. If the query is insert, I think commit is heavier than add, isn't it?
Let me write a simple wrapper and send PR.
Before you start working on a PR, could you please run some numbers to check how much time would really be spent in a commit?
Maybe write a short script which inserts/queries a bunch of objects (at least in the thousands) and measure how much time the session spends in querying and how much time in committing.
If commits really turn out to be super slow, we can then discuss what exactly to implement in the PR. But let's discuss things once before you start working on this.
import time
import hashlib
from absl import app, flags
from tornado_sqlalchemy import SQLAlchemy
from sqlalchemy import Column, Integer, String, Index
db = SQLAlchemy()
FLAGS = flags.FLAGS
USER_PASSWORD_SALT = '12345678990'
class User(db.Model):
__tablename__ = 'user'
__table_args__ = (Index('ix_auth_pair', 'email', 'password_hashed'), )
id = Column(Integer, primary_key=True, nullable=False)
account_id = Column(Integer, unique=True, nullable=False)
email = Column(String(256), unique=True, nullable=False)
password_hashed = Column(String(80), nullable=False)
@staticmethod
def hash_password(password: str):
return hashlib.sha256((password + USER_PASSWORD_SALT).encode()).hexdigest()
def bootstrap():
db.configure(
url='mysql://root:suspicious-engelbart@{}/test?charset=utf8mb4'.format(
FLAGS.database_address),
engine_options={
'echo': False
}
)
db.drop_all()
db.create_all()
session_open_sec = 0
session_add_sec = 0
session_commit_sec = 0
session_close_sec = 0
for idx in range(1000):
t1 = time.time()
session = db.sessionmaker()
t2 = time.time()
email = email='a%[email protected]' % idx
session.add(
User(email=email,
password_hashed=User.hash_password('1000'),
account_id=9800100 + idx),
)
t3 = time.time()
session.commit()
t4 = time.time()
session.close()
t5 = time.time()
session_open_sec += (t2 - t1)
session_add_sec += (t3 - t2)
session_commit_sec += (t4 - t3)
session_close_sec += (t5 - t4)
print("session_open: %6.3f" % session_open_sec)
print("session_add: %6.3f" % session_add_sec)
print("session_commit %6.3f" % session_commit_sec)
print("session_close %6.3f" % session_close_sec)
def main(argv):
bootstrap()
if __name__ == '__main__':
flags.DEFINE_string(
'database_address',
'127.0.0.1:3306',
'Request Gateway Address')
app.run(main)
I measured the time spent in each function for insert query. I ran with mysql:8.0.18 docker running in the local machine.
Commit function takes most of the time.
$ python test.py --database_address=127.0.0.1
session_open: 0.112
session_add: 0.112
session_commit 2.392
session_close 0.050
I added select query and measure time as well.
import time
import hashlib
from absl import app, flags
from tornado_sqlalchemy import SQLAlchemy
from sqlalchemy import Column, Integer, String, Index, bindparam
from sqlalchemy.ext import baked
db = SQLAlchemy()
bakery = baked.bakery()
FLAGS = flags.FLAGS
USER_PASSWORD_SALT = '12345678990'
class User(db.Model):
__tablename__ = 'user'
__table_args__ = (Index('ix_auth_pair', 'email', 'password_hashed'), )
id = Column(Integer, primary_key=True, nullable=False)
account_id = Column(Integer, unique=True, nullable=False)
email = Column(String(256), unique=True, nullable=False)
password_hashed = Column(String(80), nullable=False)
@staticmethod
def hash_password(password: str):
return hashlib.sha256((password + USER_PASSWORD_SALT).encode()).hexdigest()
def test_insert():
session_open_sec = 0
session_add_sec = 0
session_commit_sec = 0
session_commit2_sec = 0
session_close_sec = 0
for idx in range(1000):
t1 = time.time()
session = db.sessionmaker()
t2 = time.time()
email = email='a%[email protected]' % idx
session.add(
User(email=email,
password_hashed=User.hash_password('1000'),
account_id=9800100 + idx),
)
t3 = time.time()
session.commit()
t4 = time.time()
session.commit()
t5 = time.time()
session.close()
t6 = time.time()
session_open_sec += (t2 - t1)
session_add_sec += (t3 - t2)
session_commit_sec += (t4 - t3)
session_commit2_sec += (t5 - t4)
session_close_sec += (t6 - t5)
print("\n== Insert Query ==")
print("session_open: %6.3f" % session_open_sec)
print("session_add: %6.3f" % session_add_sec)
print("session_commit: %6.3f" % session_commit_sec)
print("session_commit 2nd: %6.3f" % session_commit2_sec)
print("session_close: %6.3f" % session_close_sec)
def test_select():
session_open_sec = 0
session_query_sec = 0
session_commit_sec = 0
session_commit2_sec = 0
session_close_sec = 0
for idx in range(1000):
t1 = time.time()
session = db.sessionmaker()
t2 = time.time()
email = email='a%[email protected]' % idx
password_hashed=User.hash_password('1000')
query = bakery(lambda session: session.query(User))
query += lambda q: q.filter(User.email == bindparam('email'))
query += lambda q: q.filter(User.password_hashed == bindparam('password_hashed'))
user = query(session).params(email=email, password_hashed=password_hashed).one()
assert user.email == email and user.password_hashed == password_hashed
t3 = time.time()
session.commit()
t4 = time.time()
session.commit()
t5 = time.time()
session.close()
t6 = time.time()
session_open_sec += (t2 - t1)
session_query_sec += (t3 - t2)
session_commit_sec += (t4 - t3)
session_commit2_sec += (t5 - t4)
session_close_sec += (t6 - t5)
print("\n== Select Query ==")
print("session_open: %6.3f" % session_open_sec)
print("session_query: %6.3f" % session_query_sec)
print("session_commit: %6.3f" % session_commit_sec)
print("session_commit 2nd: %6.3f" % session_commit2_sec)
print("session_close: %6.3f" % session_close_sec)
def main(argv):
db.configure(
url='mysql://root:suspicious-engelbart@{}/test?charset=utf8mb4'.format(
FLAGS.database_address),
engine_options={
'echo': False
}
)
db.drop_all()
db.create_all()
test_insert()
test_select()
if __name__ == '__main__':
flags.DEFINE_string(
'database_address',
'127.0.0.1:3306',
'database address')
app.run(main)
I ran with mysql:8.0.18 docker running in the local machine. Here is the result.
$ python test.py
== Insert Query ==
session_open: 0.111
session_add: 0.115
session_commit: 2.447
session_commit 2nd: 0.072
session_close: 0.042
== Select Query ==
session_open: 0.078
session_query: 0.885
session_commit: 0.433
session_commit 2nd: 0.061
session_close: 0.029
Nice. Thanks for running those numbers.
For inserts, it might make sense to provide such a wrapper ("might", because I wonder how often do web request handlers need to insert thousands of database objects). For querying, I'm somehow not completely convinced, but if it's a common pattern, it might be worth considering.
What exact changes would you like to propose?
I realized that it is not an easy problem. Because of lazy loading, every step can be blocking if model object is accessed. So, I think the whole block within context manager should be run in thread pool.
What I suggest is to make a decorator similar to run_on_executor decorator in tornado.
def _query_wrapper(self, db: 'SQLAlchemy', query: Callable, *args: Any, **kwargs: Any):
session = db.sessionmaker()
try:
res = query(self, session, *args, **kwargs)
session.commit()
return res
except Exception:
logging.exception('query exception')
session.rollback()
raise
finally:
session.close()
# decorator for member method
# "db" should be set as property in the object
def query_on_executor(fn: Callable):
@functools.wraps(fn)
def wrapper(self: Any, *args: Any, **kwargs: Any) -> asyncio.Future:
db = getattr(self, "db", None)
if not db:
raise MissingDatabaseSetting()
return as_future(lambda: _query_wrapper(self, db, fn, *args, **kwargs))
return wrapper
I'm assuming that the fn callable is a Query object? What happens if it already has a session attached? The _query_wrapper function would make another session, no?