SAQ Integration

Open grigi opened this issue 1 year ago • 3 comments

Problem Statement

We recently migrated to SAQ (https://github.com/tobymao/saq) as our asyncio task runner. It's been working great, but we don't have adequate visibility in Sentry.

Solution Brainstorm

An integration for SAQ built into Sentry-Python.

If wanted I could try and build a basic Sentry integration for SAQ and post it as a PR.

grigi avatar Feb 12 '24 11:02 grigi

If wanted I could try and build a basic Sentry integration for SAQ and post it as a PR.

@grigi Yes, we are always open to PRs if you would like to try!

I will put this issue on our internal backlog, but since our team is quite busy at the moment with some larger projects, it would probably take us some time to create this integration. If you would like to open a PR, that would probably greatly speed up the process 🙂

szokeasaurusrex avatar Feb 12 '24 13:02 szokeasaurusrex

Thanks @szokeasaurusrex

I wrote a quick integration for SAQ (based on the ARQ one) that suits our needs. It's relatively simple, and it works in some local dev testing. I tried to keep the style the same as what was there. Other than this (and defining the consts correctly), what else would be needed for a PR? I have no idea what level of testing would be needed for this to be accepted upstream. Apologies for the lazy dump here.

One thing to note: SAQ only accepts keyword arguments for functions, and all arguments/responses must be JSON serialisable. So I'm not handling *args, as that's not supported. This might be bad practice, so I have no objections to putting those back. I just removed anything that I wasn't using directly.

SaqIntegration
from __future__ import absolute_import

import sys

from sentry_sdk import Hub
from sentry_sdk._compat import reraise
from sentry_sdk._types import TYPE_CHECKING
from sentry_sdk.consts import OP
from sentry_sdk.hub import _should_send_default_pii
from sentry_sdk.integrations import DidNotEnable, Integration
from sentry_sdk.integrations.logging import ignore_logger
from sentry_sdk.utils import SENSITIVE_DATA_SUBSTITUTE, capture_internal_exceptions, event_from_exception, parse_version

try:
    from saq import __version__ as SAQ_VERSION
    from saq.queue import JobError, Queue
    from saq.worker import Worker
except ImportError:
    raise DidNotEnable("SAQ is not installed")

if TYPE_CHECKING:
    from typing import Any, Callable, Coroutine, Dict, Optional, Union

    from saq.job import Job
    from sentry_sdk._types import Event, EventProcessor, ExcInfo, Hint

OP.QUEUE_SUBMIT_SAQ = "queue.submit.saq"
OP.QUEUE_TASK_SAQ = "queue.task.saq"


class SaqIntegration(Integration):
    identifier = "saq"

    @staticmethod
    def setup_once():
        # type: () -> None

        try:
            version = parse_version(SAQ_VERSION)

        except (TypeError, ValueError):
            # Leave version defined so the check below raises DidNotEnable
            # instead of a NameError.
            version = None

        if version is None:
            raise DidNotEnable("Unparsable SAQ version: {}".format(SAQ_VERSION))

        if version < (0, 12):
            raise DidNotEnable("SAQ 0.12 or newer required.")

        patch_queue_enqueue()
        patch_worker_start()

        ignore_logger("saq")


def patch_queue_enqueue():
    # type: () -> None
    old_queue_enqueue = Queue.enqueue

    async def _sentry_queue_enqueue(self, job_or_func, **kwargs):
        # type: (Queue, Union[str, Job], **Any) -> Optional[Job]
        hub = Hub.current

        if hub.get_integration(SaqIntegration) is None:
            return await old_queue_enqueue(self, job_or_func, **kwargs)

        description = job_or_func if isinstance(job_or_func, str) else job_or_func.function
        with hub.start_span(op=OP.QUEUE_SUBMIT_SAQ, description=description):
            return await old_queue_enqueue(self, job_or_func, **kwargs)

    Queue.enqueue = _sentry_queue_enqueue


def _capture_exception(exc_info):
    # type: (ExcInfo) -> None
    hub = Hub.current

    if hub.scope.transaction is not None:
        if exc_info[0] == JobError:
            hub.scope.transaction.set_status("aborted")
            return

        hub.scope.transaction.set_status("internal_error")

    event, hint = event_from_exception(
        exc_info,
        client_options=hub.client.options if hub.client else None,
        mechanism={"type": SaqIntegration.identifier, "handled": False},
    )
    hub.capture_event(event, hint=hint)


def _make_event_processor(ctx, **kwargs):
    # type: (Dict[Any, Any], **Any) -> EventProcessor
    def event_processor(event, hint):
        # type: (Event, Hint) -> Optional[Event]

        hub = Hub.current

        with capture_internal_exceptions():
            job = ctx["job"]  # type: Job

            if hub.scope.transaction is not None:
                hub.scope.transaction.name = job.function
                event["transaction"] = job.function

            job.next_retry_delay()

            tags = event.setdefault("tags", {})
            tags["saq_task_id"] = job.id
            # job.retries is the configured maximum; job.attempts counts actual runs.
            tags["saq_task_retry"] = job.attempts > 1
            extra = event.setdefault("extra", {})
            extra["saq-job"] = {
                "task": job.function,
                "kwargs": (
                    kwargs if _should_send_default_pii() else SENSITIVE_DATA_SUBSTITUTE
                ),
                "attempts": job.attempts,
                "retries": job.retries,
                "queue": job.queue.name if job.queue else "default"
            }

        return event

    return event_processor


def _wrap_function(coroutine):
    # type: (Callable) -> Any
    async def _sentry_coroutine(ctx, **kwargs):
        # type: (Dict[Any, Any], **Any) -> Any
        hub = Hub.current
        if hub.get_integration(SaqIntegration) is None:
            return await coroutine(ctx, **kwargs)

        hub.scope.add_event_processor(
            _make_event_processor(ctx, **kwargs)
        )

        try:
            result = await coroutine(ctx, **kwargs)
        except Exception:
            exc_info = sys.exc_info()
            _capture_exception(exc_info)
            reraise(*exc_info)

        return result

    return _sentry_coroutine


def patch_worker_start():
    # type: () -> None
    old_worker_start = Worker.start

    def _sentry_worker_start(self):
        # type: (Worker) -> Coroutine
        hub = Hub.current

        if hub.get_integration(SaqIntegration) is None:
            return old_worker_start(self)

        self.functions = {
            name: _wrap_function(func) for name, func in self.functions.items()
        }

        return old_worker_start(self)

    Worker.start = _sentry_worker_start
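
For anyone who wants to try the wrapping pattern without installing SAQ or Sentry, here is a minimal stdlib-only sketch of what `_wrap_function` does with keyword-only tasks. The names `wrap_task`, `captured_events`, `add`, `fail` and `run_demo` are made up for illustration, and the list is only a stand-in for sending an event to Sentry; none of this is SAQ or sentry-python API.

```python
import asyncio

# Stand-in for events that a real integration would send to Sentry.
captured_events = []


def wrap_task(func):
    """Wrap a keyword-only task so failures are recorded, then re-raised."""

    async def wrapper(ctx, **kwargs):
        try:
            return await func(ctx, **kwargs)
        except Exception as exc:
            captured_events.append(
                {"task": func.__name__, "kwargs": kwargs, "error": repr(exc)}
            )
            raise  # the task runner still needs to see the failure

    return wrapper


# Hypothetical tasks: a context dict first, then keyword arguments only.
async def add(ctx, *, a, b):
    return a + b


async def fail(ctx, *, reason):
    raise ValueError(reason)


def run_demo():
    """Run one passing and one failing task through the wrapper."""
    captured_events.clear()
    total = asyncio.run(wrap_task(add)({}, a=1, b=2))
    try:
        asyncio.run(wrap_task(fail)({}, reason="boom"))
    except ValueError:
        pass
    return total, list(captured_events)
```

Calling `run_demo()` runs both tasks through the wrapper and returns the successful result together with the recorded failure. Because the wrapper only forwards `**kwargs`, there is no `*args` path to handle, which matches the note above.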

grigi avatar Feb 13 '24 10:02 grigi

@grigi Awesome, thanks for giving this a shot! 🚀

We definitely will need tests for this -- I'd look at how the tests for similar integrations (like ARQ for instance) are done and try to replicate that level of coverage. In general, you should make sure that all the different scenarios the integration supports are tested somehow (errors are being captured, error events have all the extra stuff the event processor is supposed to add, creating transactions works, testing different ways of enqueuing a task if that's something SAQ supports, etc.). But you can already open a draft PR with this and add the tests later.
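
As a rough idea of the shape such tests could take, here is a self-contained sketch that checks the "extra stuff" an event processor adds. It deliberately avoids the real libraries: `FakeJob`, `make_event_processor` and `FILTERED` are hypothetical stand-ins modelled on the code posted above, not the sentry-python test fixtures or the SAQ `Job` class, so a real test would need to go through the SDK's own test setup instead.

```python
from dataclasses import dataclass

FILTERED = "[Filtered]"  # stand-in for the SDK's sensitive-data placeholder


@dataclass
class FakeJob:
    """Hypothetical stand-in exposing only the attributes the processor reads."""

    id: str
    function: str
    attempts: int = 1
    retries: int = 1


def make_event_processor(job, kwargs, send_pii):
    """Simplified copy of the processor logic, with PII handling made explicit."""

    def event_processor(event, hint):
        tags = event.setdefault("tags", {})
        tags["saq_task_id"] = job.id
        extra = event.setdefault("extra", {})
        extra["saq-job"] = {
            "task": job.function,
            "kwargs": kwargs if send_pii else FILTERED,
            "attempts": job.attempts,
            "retries": job.retries,
        }
        return event

    return event_processor


def test_event_has_job_details():
    job = FakeJob(id="abc", function="send_email", attempts=2, retries=3)
    processor = make_event_processor(job, {"to": "a@example.com"}, send_pii=True)
    event = processor({}, None)
    assert event["tags"]["saq_task_id"] == "abc"
    assert event["extra"]["saq-job"]["task"] == "send_email"
    assert event["extra"]["saq-job"]["kwargs"] == {"to": "a@example.com"}
    assert event["extra"]["saq-job"]["attempts"] == 2


def test_kwargs_filtered_without_pii():
    job = FakeJob(id="abc", function="send_email")
    processor = make_event_processor(job, {"to": "a@example.com"}, send_pii=False)
    event = processor({}, None)
    assert event["extra"]["saq-job"]["kwargs"] == FILTERED
```

The two test functions follow pytest naming, so they would be collected as-is. The same pattern (build a job, run the processor, assert on the event) extends to the error-capture and transaction scenarios listed above.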

sentrivana avatar Feb 14 '24 16:02 sentrivana

@grigi Could you please consider opening a PR with the code from your comment above? We can only add this code to the SDK if it is contributed via a pull request.

szokeasaurusrex avatar Jul 29 '24 12:07 szokeasaurusrex

This issue has gone three weeks without activity. In another week, I will close it.

But! If you comment or otherwise update it, I will reset the clock, and if you remove the label Waiting for: Community, I will leave it alone ... forever!


"A weed is but an unloved flower." ― Ella Wheeler Wilcox 🥀

getsantry[bot] avatar Aug 20 '24 07:08 getsantry[bot]