opentelemetry-python-contrib

SQLAlchemyInstrumentor - calling `instrument` for multiple engines only instruments the first engine

Open dfaivre-pcs opened this issue 1 month ago • 5 comments

What problem do you want to solve?

Adding SQLAlchemy engines one at a time with this pattern (i.e., calling SQLAlchemyInstrumentor().instrument(engine=engine_1, ...) multiple times) does not seem to add the second engine to instrumentation:

instrumentation/opentelemetry-instrumentation-sqlalchemy/tests/test_sqlalchemy.py

def test_instrument_two_engines(self):
    engine_1 = create_engine("sqlite:///:memory:")
    engine_2 = create_engine("sqlite:///:memory:")

    SQLAlchemyInstrumentor().instrument(
        engine=engine_1,
        tracer_provider=self.tracer_provider,
    )
    cnx_1 = engine_1.connect()
    cnx_1.execute("SELECT 1 + 1;").fetchall()

    SQLAlchemyInstrumentor().instrument(
        engine=engine_2,
        tracer_provider=self.tracer_provider,
    )
    cnx_2 = engine_2.connect()
    cnx_2.execute("SELECT 1 + 1;").fetchall()

    spans = self.memory_exporter.get_finished_spans()

    self.assertEqual(len(spans), 2)
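
For context, my (possibly wrong) understanding of why the second call is a no-op: SQLAlchemyInstrumentor() appears to hand back a singleton, and the base instrument() short-circuits once _is_instrumented_by_opentelemetry is set, so the engine_2 kwargs are never processed. A minimal sketch, assuming that behavior:

from sqlalchemy import create_engine
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

engine_1 = create_engine("sqlite:///:memory:")
engine_2 = create_engine("sqlite:///:memory:")

# Every construction appears to return the same singleton instance.
assert SQLAlchemyInstrumentor() is SQLAlchemyInstrumentor()

SQLAlchemyInstrumentor().instrument(engine=engine_1)  # instruments engine_1
# The guard flag is now set, so this call just logs a warning along the lines of
# "Attempting to instrument while already instrumented" and returns without
# ever looking at engine_2.
SQLAlchemyInstrumentor().instrument(engine=engine_2)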

Describe the solution you'd like

I'm lazily initializing multiple SQLAlchemy engines in my app, each pointing at a different DB. I'd like to be able to call SQLAlchemyInstrumentor().instrument(...) directly with different engines multiple times and have it work.
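
In other words, something like the following sketch, where tracer_provider is whatever provider the app already configures. (As far as I can tell there is also an engines=[...] parameter for instrumenting several engines in a single call, but that only helps when every engine already exists up front, which lazy initialization rules out.)

# Desired behavior: each call adds tracing for exactly the engine it is given.
SQLAlchemyInstrumentor().instrument(engine=engine_1, tracer_provider=tracer_provider)

# ... later, once the second DB is actually needed ...
SQLAlchemyInstrumentor().instrument(engine=engine_2, tracer_provider=tracer_provider)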

Describe alternatives you've considered

For now I have a placeholder/naive/not-code-reviewed wrapper (generated with Claude Code) that seems to allow multiple engines:

"""SQLAlchemy OpenTelemetry instrumentation for multiple async engines."""

import logging
import weakref
from typing import Any, ClassVar

import structlog
from opentelemetry import trace
from opentelemetry.instrumentation.sqlalchemy import (  # type: ignore[import-untyped]
    SQLAlchemyInstrumentor,
)
from opentelemetry.instrumentation.sqlalchemy.engine import (  # type: ignore[import-untyped]
    EngineTracer,
)
from opentelemetry.semconv._incubating.attributes import (  # type: ignore[import-untyped]
    db_attributes,
)
from sqlalchemy import Engine
from sqlalchemy.ext.asyncio import AsyncEngine

from plu.pcs.apps.services.web.backend.observability.logger import get_logger

# Stdlib logger for library-level class (consistent with BaseInstrumentor)
_LOG = logging.getLogger(__name__)

# Attribute name for db.name in logs (matches OpenTelemetry semantic conventions)
DB_NAME_LOG_ATTR = "db.name"


class DatabaseNameEngineTracer(EngineTracer):  # type: ignore[misc]
    """EngineTracer that adds database name to all spans.

    The base EngineTracer extracts db.name from URL.database, but ODBC connection
    strings (mssql+aioodbc:///?odbc_connect=...) don't expose the database in the
    URL object. This subclass stores the database name and adds it to all spans.

    Also handles connections_usage=None by overriding pool event methods to be no-ops.
    """

    def __init__(
        self,
        tracer: Any,
        engine: Any,
        connections_usage: Any,
        database_name: str,
        enable_commenter: bool = False,
        commenter_options: dict[str, Any] | None = None,
        enable_attribute_commenter: bool = False,
    ) -> None:
        """Initialize tracer with database name.

        Args:
            tracer: OpenTelemetry tracer instance.
            engine: SQLAlchemy engine to trace.
            connections_usage: Connections usage metric (can be None to skip metrics).
            database_name: Database name to add to spans.
            enable_commenter: Enable SQL commenter.
            commenter_options: Commenter options.
            enable_attribute_commenter: Include commenter in span attributes.

        """
        super().__init__(  # pyright: ignore[reportUnknownMemberType]
            tracer,
            engine,
            connections_usage,
            enable_commenter=enable_commenter,
            commenter_options=commenter_options,
            enable_attribute_commenter=enable_attribute_commenter,
        )
        self._database_name = database_name

    def _add_idle_to_connection_usage(self, value: int) -> None:
        """Override to handle None connections_usage."""
        if self.connections_usage is not None:
            super()._add_idle_to_connection_usage(value)  # pyright: ignore[reportUnknownMemberType]

    def _add_used_to_connection_usage(self, value: int) -> None:
        """Override to handle None connections_usage."""
        if self.connections_usage is not None:
            super()._add_used_to_connection_usage(value)  # pyright: ignore[reportUnknownMemberType]

    def _before_cur_exec(
        self,
        conn: Any,
        cursor: Any,
        statement: Any,
        params: Any,
        context: Any,
        executemany: Any,
    ) -> Any:
        """Hook before cursor execution - creates span with db.name attribute.

        Calls base implementation then adds db.name to the span if set.
        """
        result: tuple[Any, Any] = super()._before_cur_exec(  # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
            conn, cursor, statement, params, context, executemany
        )

        # Add database name to span if we have one and span exists
        if self._database_name and hasattr(context, "_otel_span"):
            span = context._otel_span
            if span and span.is_recording():
                span.set_attribute(db_attributes.DB_NAME, self._database_name)

        # Bind database name to structlog context for logs during query execution
        if self._database_name:
            structlog.contextvars.bind_contextvars(**{DB_NAME_LOG_ATTR: self._database_name})

        return result

    def _after_cur_exec(
        self,
        conn: Any,
        cursor: Any,
        statement: Any,
        params: Any,
        context: Any,
        executemany: Any,
    ) -> None:
        """Hook after cursor execution - cleans up context.

        Calls base implementation then unbinds db.name from structlog context.
        """
        super()._after_cur_exec(  # pyright: ignore[reportUnknownMemberType, reportAttributeAccessIssue]
            conn, cursor, statement, params, context, executemany
        )

        # Unbind database name from structlog context
        if self._database_name:
            structlog.contextvars.unbind_contextvars(DB_NAME_LOG_ATTR)


class PluSQLAlchemyInstrumentor(SQLAlchemyInstrumentor):
    """SQLAlchemy instrumentor with PLU-specific enhancements.

    Enhancements over base SQLAlchemyInstrumentor:
    - Supports instrumenting multiple engines (base uses singleton pattern)
    - Adds db.name to all spans/logs (ODBC connection strings don't expose it)
    """

    _instrumented_engines: ClassVar[weakref.WeakSet[Engine]] = weakref.WeakSet()

    @classmethod
    def reset_instrumented_engines(cls) -> None:
        """Clear tracked engines. Useful for testing."""
        cls._instrumented_engines = weakref.WeakSet()

    def instrument(
        self,
        *,
        database_name: str = "",
        engine: Engine | None = None,
        **kwargs: Any,
    ) -> None:
        """Instrument an engine, allowing multiple engines.

        Tracks engines via WeakSet to warn on duplicates. When engines are
        garbage collected, they are automatically removed from tracking.

        Args:
            database_name: Name of database for spans/logging.
            engine: SQLAlchemy engine to instrument.
            **kwargs: Passed to base instrument().

        """
        if engine is not None:
            if engine in self._instrumented_engines:
                _LOG.warning(
                    "Engine for %s is already instrumented, skipping",
                    database_name or engine,
                )
                return
            self._instrumented_engines.add(engine)

        # Reset flag so base.instrument() doesn't hit the guard
        self._is_instrumented_by_opentelemetry = False

        # If we have a database_name, use our custom tracer directly
        if database_name and engine is not None:
            self._instrument_with_database_name(engine, database_name, **kwargs)
        else:
            # Fallback to base for full logic
            super().instrument(engine=engine, **kwargs)

    def _instrument_with_database_name(
        self,
        engine: Engine,
        database_name: str,
        **kwargs: Any,
    ) -> None:
        """Instrument engine with custom tracer that includes database name.

        Args:
            engine: SQLAlchemy engine to instrument.
            database_name: Database name to add to all spans.
            **kwargs: Additional options (tracer_provider, commenter settings).

        """
        tracer_provider = kwargs.get("tracer_provider")
        tracer = trace.get_tracer(
            __name__,
            tracer_provider=tracer_provider,
        )

        # Create our custom tracer with database name
        DatabaseNameEngineTracer(
            tracer,
            engine,
            connections_usage=None,  # Skip metrics for now
            database_name=database_name,
            enable_commenter=kwargs.get("enable_commenter", False),
            commenter_options=kwargs.get("commenter_options"),
            enable_attribute_commenter=kwargs.get("enable_attribute_commenter", False),
        )


def instrument_async_engine(engine: AsyncEngine, *, database_name: str = "") -> None:
    """Instrument an async SQLAlchemy engine for OpenTelemetry tracing.

    Args:
        engine: Async SQLAlchemy engine to instrument.
        database_name: Optional name for logging.
    """
    logger = get_logger(__name__)

    logger.info(
        "db.engine.instrumenting",
        database=database_name,
        message=f"Instrumenting SQLAlchemy engine for {database_name or 'unknown'}",
    )

    # Use sync_engine for async engines (required for event listener registration)
    PluSQLAlchemyInstrumentor().instrument(
        engine=engine.sync_engine,
        database_name=database_name,
    )

    logger.info(
        "db.engine.instrumented",
        database=database_name,
        message=f"SQLAlchemy engine instrumented for {database_name or 'unknown'}",
    )
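
For reference, a hypothetical usage of the wrapper above (the URLs and database names below are made up; the real app creates these engines lazily):

from sqlalchemy.ext.asyncio import create_async_engine

# Hypothetical async engines standing in for the app's lazily created connections.
orders_engine = create_async_engine("sqlite+aiosqlite:///:memory:")
billing_engine = create_async_engine("sqlite+aiosqlite:///:memory:")

instrument_async_engine(orders_engine, database_name="orders")
instrument_async_engine(billing_engine, database_name="billing")
# Spans (and structlog context) from both engines now carry db.name.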

Additional Context

I'm probably using the SQLAlchemy instrumentation wrong and missing something obvious - feel free to yell at me if I should have read the docs/source code more closely. I have a lot of dev experience, but not much with Python/OTel...

Would you like to implement a fix?

Yes


dfaivre-pcs · Nov 26 '25 20:11