databases icon indicating copy to clipboard operation
databases copied to clipboard

Allow Instrumentation

Open dneuhaeuser-zalando opened this issue 5 years ago • 2 comments

I would like to be able to monitor database queries via opentracing. This is relatively straightforward to do with SQLAlchemy, which provides events one can listen for and react to whenever a query is about to be submitted to the database and when execution finishes (Example).

I've not been able to find anything similar in databases. Looking at the source code, the only way I can see to introduce instrumentation, would be to implement my own backend. I'd much prefer to avoid that.

I would appreciate it if you would consider adding callbacks like SQLAlchemy's to allow such instrumentation.

dneuhaeuser-zalando avatar Mar 23 '20 16:03 dneuhaeuser-zalando

hey @dneuhaeuser-zalando, it's an interesting task indeed! I might say that databases has rather a loose coupling with sqlalchemy engines, so it likely won't work out of the box.

We can probably deal with it a bit lower level, but it might take some time and I can't tell you any ETA at the moment.

Also, it's quite a good candidate for a third-party library.

With regard to custom backends, they're currently not available, but this might happen sooner.

gvbgduh avatar Mar 26 '20 12:03 gvbgduh

We have addressed this now by monkey patching. Not elegant and probably somewhat fragile when upgrading but does the job:

def patch_databases():
    """
    Applies a monkey patch to implement opentracing instrumentation.

    This needs to be called before an instance of the `Database` class is first
    created.

    The patch replaces the query methods of `PostgresConnection`
    (`fetch_all`, `fetch_one`, `execute`, `execute_many`, `iterate`) with
    re-implementations that tag a span with the compiled SQL statement, and
    wraps `Transaction.start`, `commit`, `rollback` and `__aexit__` so that
    each transaction runs inside an active "transaction" span.

    `opentracing` and the `span` decorator are expected to be available in the
    enclosing module; they are not imported here.

    NOTE(review): the transaction span is only closed in `__aexit__`. A
    transaction driven manually via `start()` followed by `commit()` or
    `rollback()` (without `async with`) appears to leave the span open --
    confirm whether that usage needs to be supported.
    """
    from databases.core import Transaction
    from databases.backends.postgres import PostgresConnection, Record

    # Tags shared by every span created by this patch. Each consumer receives
    # its own copy so that tags set on one span cannot leak into another.
    tags = {
        opentracing.tags.COMPONENT: "databases",
        opentracing.tags.SPAN_KIND: opentracing.tags.SPAN_KIND_RPC_CLIENT,
        opentracing.tags.DATABASE_TYPE: "sql",
    }

    def _set_db_instance(span):
        """Tag `span` with the database URL, with the password obscured."""
        # Imported at call time rather than at patch time -- presumably to
        # avoid a circular import with the module that defines `database`.
        from blacklist.database.core import database

        url = database.url.obscure_password
        span.set_tag(opentracing.tags.DATABASE_INSTANCE, url)

    @span(tags=tags.copy(), pass_span=True)
    async def fetch_all(self, query, *, span):
        """Run `query` and return all rows as `Record`s, tagging `span`."""
        _set_db_instance(span)
        assert self._connection is not None, "Connection is not acquired"
        query, args, result_columns = self._compile(query)
        span.set_tag(opentracing.tags.DATABASE_STATEMENT, query)
        rows = await self._connection.fetch(query, *args)
        return [Record(row, result_columns, self._dialect) for row in rows]

    @span(tags=tags.copy(), pass_span=True)
    async def fetch_one(self, query, *, span):
        """Run `query` and return the first row as a `Record`, or `None`."""
        _set_db_instance(span)
        assert self._connection is not None, "Connection is not acquired"
        query, args, result_columns = self._compile(query)
        span.set_tag(opentracing.tags.DATABASE_STATEMENT, query)
        row = await self._connection.fetchrow(query, *args)
        if row is None:
            return None
        return Record(row, result_columns, self._dialect)

    @span(tags=tags.copy(), pass_span=True)
    async def execute(self, query, *, span):
        """Run `query` and return the value produced by `fetchval`."""
        _set_db_instance(span)
        assert self._connection is not None, "Connection is not acquired"
        query, args, _ = self._compile(query)
        span.set_tag(opentracing.tags.DATABASE_STATEMENT, query)
        return await self._connection.fetchval(query, *args)

    @span(tags=tags.copy(), pass_span=True)
    async def execute_many(self, queries, *, span):
        """Run every query in `queries` under a single span."""
        # This is generally not called directly but through
        # `Database.execute_many` which ensures that each query in queries is
        # actually the same, if we ignore differences in values. So having a
        # single span should make sense.

        _set_db_instance(span)
        assert self._connection is not None, "Connection is not acquired"
        # asyncpg uses prepared statements under the hood, so we just
        # loop through multiple executes here, which should all end up
        # using the same prepared statement.
        for single_query in queries:
            # Compile first so the tag holds the SQL string, consistent with
            # the other methods, rather than the uncompiled query object.
            single_query, args, _ = self._compile(single_query)
            span.set_tag(opentracing.tags.DATABASE_STATEMENT, single_query)
            await self._connection.execute(single_query, *args)

    # NOTE(review): this is an async generator; whether the `span` decorator
    # keeps the span open while rows are being consumed cannot be confirmed
    # from here.
    @span(tags=tags.copy(), pass_span=True)
    async def iterate(self, query, *, span):
        """Run `query` and yield each row as a `Record`."""
        _set_db_instance(span)
        assert self._connection is not None, "Connection is not acquired"
        query, args, result_columns = self._compile(query)
        span.set_tag(opentracing.tags.DATABASE_STATEMENT, query)
        async for row in self._connection.cursor(query, *args):
            yield Record(row, result_columns, self._dialect)

    PostgresConnection.fetch_all = fetch_all
    PostgresConnection.fetch_one = fetch_one
    PostgresConnection.execute = execute
    PostgresConnection.execute_many = execute_many
    PostgresConnection.iterate = iterate

    # Keep references to the original methods so the wrappers can delegate.
    transaction_start = Transaction.start

    async def start(self):
        """Open a "transaction" span, then start the real transaction."""
        scope = opentracing.tracer.start_active_span(
            "transaction", tags=tags.copy(), finish_on_close=True
        )
        self._opentracing_scope = scope
        try:
            return await transaction_start(self)
        except BaseException:
            # `__aexit__` is not reached when starting fails, so close the
            # scope here; otherwise it would stay active indefinitely.
            scope.span.set_tag(opentracing.tags.ERROR, True)
            scope.close()
            raise

    transaction_commit = Transaction.commit

    async def commit(self):
        """Commit the transaction and mark the span as committed."""
        await transaction_commit(self)
        self._opentracing_scope.span.set_tag("commit", True)

    transaction_rollback = Transaction.rollback

    async def rollback(self):
        """Roll back the transaction and mark the span as rolled back."""
        await transaction_rollback(self)
        self._opentracing_scope.span.set_tag("rollback", True)

    transaction_aexit = Transaction.__aexit__

    async def __aexit__(self, exc_type=None, exc_value=None, traceback=None):
        """Record any in-flight exception on the span, then close it."""
        if exc_type is not None:
            span = self._opentracing_scope.span
            span.set_tag(opentracing.tags.ERROR, True)
            span.log_kv(
                {
                    opentracing.logs.EVENT: opentracing.tags.ERROR,
                    opentracing.logs.MESSAGE: str(exc_value),
                    opentracing.logs.ERROR_OBJECT: exc_value,
                    opentracing.logs.ERROR_KIND: exc_type,
                    opentracing.logs.STACK: traceback,
                }
            )

        try:
            await transaction_aexit(
                self, exc_type=exc_type, exc_value=exc_value, traceback=traceback
            )
        finally:
            # Close the scope even if the commit or rollback itself raises,
            # so the span is always finished and deactivated.
            self._opentracing_scope.close()

    Transaction.start = start
    Transaction.commit = commit
    Transaction.rollback = rollback
    Transaction.__aexit__ = __aexit__

dneuhaeuser-zalando avatar Apr 14 '20 16:04 dneuhaeuser-zalando