Allow Instrumentation
I would like to be able to monitor database queries via OpenTracing. This is relatively straightforward to do with SQLAlchemy, which provides events one can listen and react to whenever a query is about to be submitted to the database and when execution finishes (Example).
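For illustration, a minimal sketch of the kind of hooks I mean, assuming a global tracer is already configured (the engine URL is just a placeholder):

import opentracing
from sqlalchemy import create_engine, event

engine = create_engine("postgresql://localhost/example")  # placeholder

@event.listens_for(engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    # Start a span just before the statement is sent to the database.
    scope = opentracing.tracer.start_active_span("query", finish_on_close=True)
    scope.span.set_tag(opentracing.tags.DATABASE_STATEMENT, statement)
    conn.info.setdefault("tracing_scopes", []).append(scope)

@event.listens_for(engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    # Finish the span once execution is done.
    scopes = conn.info.get("tracing_scopes")
    if scopes:
        scopes.pop().close()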
I've not been able to find anything similar in databases. Looking at the source code, the only way I can see to introduce instrumentation would be to implement my own backend, which I'd much prefer to avoid.
I would appreciate it if you would consider adding callbacks like SQLAlchemy's to allow such instrumentation.
hey @dneuhaeuser-zalando, it's an interesting task indeed!
I'd say that databases is rather loosely coupled with SQLAlchemy engines, so it likely won't work out of the box.
We can probably handle this at a slightly lower level, but it might take some time and I can't give you an ETA at the moment.
Also, it's quite a good candidate for a third-party lib.
WRT custom backends, they're currently not supported, but that might happen sooner.
We have addressed this for now by monkey patching. It's not elegant and probably somewhat fragile across upgrades, but it does the job:
import opentracing

# NOTE: `span` (used as a decorator below) is our own tracing helper, defined
# elsewhere in our codebase; it runs the wrapped coroutine inside an active span
# with the given tags and, with pass_span=True, passes the span in as a keyword
# argument.


def patch_databases():
    """
    Applies a monkey patch to implement opentracing instrumentation.

    This needs to be called before an instance of the `Database` class is first
    created.
    """
    from databases.core import Transaction
    from databases.backends.postgres import PostgresConnection, Record

    # Common tags applied to every span we create.
    tags = {
        opentracing.tags.COMPONENT: "databases",
        opentracing.tags.SPAN_KIND: opentracing.tags.SPAN_KIND_RPC_CLIENT,
        opentracing.tags.DATABASE_TYPE: "sql",
    }

    def _set_db_instance(span):
        from blacklist.database.core import database

        url = database.url.obscure_password
        span.set_tag(opentracing.tags.DATABASE_INSTANCE, url)

    @span(tags=tags.copy(), pass_span=True)
    async def fetch_all(self, query, *, span):
        _set_db_instance(span)
        assert self._connection is not None, "Connection is not acquired"
        query, args, result_columns = self._compile(query)
        span.set_tag(opentracing.tags.DATABASE_STATEMENT, query)
        rows = await self._connection.fetch(query, *args)
        return [Record(row, result_columns, self._dialect) for row in rows]

    @span(tags=tags.copy(), pass_span=True)
    async def fetch_one(self, query, *, span):
        _set_db_instance(span)
        assert self._connection is not None, "Connection is not acquired"
        query, args, result_columns = self._compile(query)
        span.set_tag(opentracing.tags.DATABASE_STATEMENT, query)
        row = await self._connection.fetchrow(query, *args)
        if row is None:
            return None
        return Record(row, result_columns, self._dialect)

    @span(tags=tags.copy(), pass_span=True)
    async def execute(self, query, *, span):
        _set_db_instance(span)
        assert self._connection is not None, "Connection is not acquired"
        query, args, result_columns = self._compile(query)
        span.set_tag(opentracing.tags.DATABASE_STATEMENT, query)
        return await self._connection.fetchval(query, *args)

    @span(tags=tags.copy(), pass_span=True)
    async def execute_many(self, queries, *, span):
        # This is generally not called directly but through
        # `Database.execute_many`, which ensures that each query in `queries`
        # is actually the same if we ignore differences in values, so having a
        # single span should make sense.
        _set_db_instance(span)
        assert self._connection is not None, "Connection is not acquired"
        # asyncpg uses prepared statements under the hood, so we just
        # loop through multiple executes here, which should all end up
        # using the same prepared statement.
        for single_query in queries:
            span.set_tag(opentracing.tags.DATABASE_STATEMENT, single_query)
            single_query, args, result_columns = self._compile(single_query)
            await self._connection.execute(single_query, *args)

    @span(tags=tags.copy(), pass_span=True)
    async def iterate(self, query, *, span):
        _set_db_instance(span)
        assert self._connection is not None, "Connection is not acquired"
        query, args, result_columns = self._compile(query)
        span.set_tag(opentracing.tags.DATABASE_STATEMENT, query)
        async for row in self._connection.cursor(query, *args):
            yield Record(row, result_columns, self._dialect)

    # Replace the backend methods with the instrumented versions.
    PostgresConnection.fetch_all = fetch_all
    PostgresConnection.fetch_one = fetch_one
    PostgresConnection.execute = execute
    PostgresConnection.execute_many = execute_many
    PostgresConnection.iterate = iterate

    transaction_start = Transaction.start

    async def start(self):
        # Open a span covering the whole transaction; it is closed in __aexit__.
        self._opentracing_scope = opentracing.tracer.start_active_span(
            "transaction", tags=tags.copy(), finish_on_close=True
        )
        return await transaction_start(self)

    transaction_commit = Transaction.commit

    async def commit(self):
        await transaction_commit(self)
        self._opentracing_scope.span.set_tag("commit", True)

    transaction_rollback = Transaction.rollback

    async def rollback(self):
        await transaction_rollback(self)
        self._opentracing_scope.span.set_tag("rollback", True)

    transaction_aexit = Transaction.__aexit__

    async def __aexit__(self, exc_type=None, exc_value=None, traceback=None):
        if exc_type is not None:
            span = self._opentracing_scope.span
            span.set_tag(opentracing.tags.ERROR, True)
            span.log_kv(
                {
                    opentracing.logs.EVENT: opentracing.tags.ERROR,
                    opentracing.logs.MESSAGE: str(exc_value),
                    opentracing.logs.ERROR_OBJECT: exc_value,
                    opentracing.logs.ERROR_KIND: exc_type,
                    opentracing.logs.STACK: traceback,
                }
            )
        await transaction_aexit(
            self, exc_type=exc_type, exc_value=exc_value, traceback=traceback
        )
        self._opentracing_scope.close()

    Transaction.start = start
    Transaction.commit = commit
    Transaction.rollback = rollback
    Transaction.__aexit__ = __aexit__
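For completeness: `span` above is our own decorator, not part of databases or opentracing. A rough sketch of what it could look like (names and details are ours and only illustrative):

import functools
import inspect
import opentracing


def span(*, tags=None, pass_span=False, operation_name=None):
    """Run the wrapped coroutine or async generator inside an active span."""
    def decorator(func):
        name = operation_name or func.__name__

        if inspect.isasyncgenfunction(func):
            # `iterate` is an async generator, so it needs its own wrapper.
            @functools.wraps(func)
            async def agen_wrapper(*args, **kwargs):
                with opentracing.tracer.start_active_span(
                    name, tags=dict(tags or {})
                ) as scope:
                    if pass_span:
                        kwargs["span"] = scope.span
                    async for item in func(*args, **kwargs):
                        yield item
            return agen_wrapper

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            with opentracing.tracer.start_active_span(
                name, tags=dict(tags or {})
            ) as scope:
                if pass_span:
                    kwargs["span"] = scope.span
                return await func(*args, **kwargs)
        return wrapper

    return decorator

At application startup we then just apply the patch before creating the Database instance (placeholder URL):

import databases

patch_databases()  # must run before the first Database instance is created
database = databases.Database("postgresql://localhost/example")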