sentry-python
sentry-python copied to clipboard
SAQ Integration
Problem Statement
We recently migrated to SAQ (https://github.com/tobymao/saq) as our asyncio task runner. It's been working great, but we don't have adequate visibility into it in Sentry.
Solution Brainstorm
An integration for SAQ built into Sentry-Python.
If wanted I could try and build a basic Sentry integration for SAQ and post it as a PR.
If wanted I could try and build a basic Sentry integration for SAQ and post it as a PR.
@grigi Yes, we are always open to PRs if you would like to try!
I will put this issue on our internal backlog, but since our team is quite busy at the moment with some larger projects, it would probably take us some time to create this integration. If you would like to open a PR, that would probably greatly speed up the process 🙂
Thanks @szokeasaurusrex
I wrote a quick integration for SAQ (based on the ARQ one) that suits our needs. It's relatively simple, and it works with some local dev testing. Tried to keep the style the same as what was there. Other than this (and defining the consts correctly) what else would be needed for a PR? I have no idea what level of testing would be needed for this to be accepted upstream. Apologies for the lazy dump here.
Things to note: SAQ only accepts keyword arguments for task functions, and all arguments/responses must be JSON serialisable, so I'm not handling *args, as that's not supported. This might be bad practice, so no objections to having to put those back. I just removed anything that I wasn't using directly.
SaqIntegration
from __future__ import absolute_import
import sys
from sentry_sdk import Hub
from sentry_sdk._compat import reraise
from sentry_sdk._types import TYPE_CHECKING
from sentry_sdk.consts import OP
from sentry_sdk.hub import _should_send_default_pii
from sentry_sdk.integrations import DidNotEnable, Integration
from sentry_sdk.integrations.logging import ignore_logger
from sentry_sdk.utils import SENSITIVE_DATA_SUBSTITUTE, capture_internal_exceptions, event_from_exception, parse_version
try:
from saq import __version__ as SAQ_VERSION
from saq.queue import JobError, Queue
from saq.worker import Worker
except ImportError:
raise DidNotEnable("SAQ is not installed")
if TYPE_CHECKING:
from typing import Any, Callable, Coroutine, Dict, Optional, Union
from saq.job import Job
from sentry_sdk._types import Event, EventProcessor, ExcInfo, Hint
# Span operation names for SAQ, attached to the imported OP class when this
# module is imported.
OP.QUEUE_SUBMIT_SAQ = "queue.submit.saq"  # op of the span opened around Queue.enqueue
OP.QUEUE_TASK_SAQ = "queue.task.saq"  # NOTE(review): not referenced elsewhere in this snippet
class SaqIntegration(Integration):
    """Sentry integration for the SAQ asyncio task queue.

    Setting it up patches ``Queue.enqueue`` and ``Worker.start`` and passes
    the ``saq`` logger name to ``ignore_logger``.
    """

    identifier = "saq"

    @staticmethod
    def setup_once():
        # type: () -> None
        """Validate the installed SAQ version and install the patches.

        Raises:
            DidNotEnable: if the SAQ version cannot be parsed or is older
                than 0.12.
        """
        try:
            version = parse_version(SAQ_VERSION)
        except (TypeError, ValueError):
            # Bind the name so the check below raises DidNotEnable instead of
            # failing with UnboundLocalError on an unparsable version.
            version = None

        if version is None:
            raise DidNotEnable("Unparsable SAQ version: {}".format(SAQ_VERSION))

        if version < (0, 12):
            raise DidNotEnable("SAQ 0.12 or newer required.")

        patch_queue_enqueue()
        patch_worker_start()

        ignore_logger("saq")
def patch_queue_enqueue():
    # type: () -> None
    """Swap ``Queue.enqueue`` for a wrapper that opens a span per enqueue.

    When the integration is not active on the current hub, the wrapper
    simply delegates to the original ``Queue.enqueue``.
    """
    original_enqueue = Queue.enqueue

    async def _sentry_queue_enqueue(self, job_or_func, **kwargs):
        # type: (Queue, Union[str, Job], **Any) -> Optional[Job]
        current_hub = Hub.current
        integration_active = current_hub.get_integration(SaqIntegration) is not None

        if integration_active:
            # A string is used as the description directly; otherwise the
            # object's ``function`` attribute is used.
            if isinstance(job_or_func, str):
                span_description = job_or_func
            else:
                span_description = job_or_func.function

            span = current_hub.start_span(
                op=OP.QUEUE_SUBMIT_SAQ, description=span_description
            )
            with span:
                return await original_enqueue(self, job_or_func, **kwargs)

        return await original_enqueue(self, job_or_func, **kwargs)

    Queue.enqueue = _sentry_queue_enqueue
def _capture_exception(exc_info):
    # type: (ExcInfo) -> None
    """Report a task exception to Sentry and update the transaction status.

    If a transaction is active on the current scope, a ``JobError`` marks
    it "aborted" and is not reported; any other exception marks it
    "internal_error". Everything that is not skipped is captured as an
    unhandled event.
    """
    current_hub = Hub.current
    transaction = current_hub.scope.transaction

    if transaction is not None:
        is_job_error = exc_info[0] == JobError
        transaction.set_status("aborted" if is_job_error else "internal_error")
        if is_job_error:
            return

    client = current_hub.client
    options = client.options if client else None
    mechanism = {"type": SaqIntegration.identifier, "handled": False}

    event, hint = event_from_exception(
        exc_info, client_options=options, mechanism=mechanism
    )
    current_hub.capture_event(event, hint=hint)
def _make_event_processor(ctx, **kwargs):
    # type: (Dict[Any, Any], **Any) -> EventProcessor
    """Build an event processor that annotates events with SAQ job details.

    Args:
        ctx: the context dict handed to the task; ``ctx["job"]`` is read
            each time an event is processed.
        **kwargs: the keyword arguments the task was called with. They are
            attached to the event only when ``_should_send_default_pii()``
            is true; otherwise ``SENSITIVE_DATA_SUBSTITUTE`` is sent.

    Returns:
        A callable taking ``(event, hint)`` that returns the event. Errors
        raised while annotating are handled by ``capture_internal_exceptions``.
    """

    def event_processor(event, hint):
        # type: (Event, Hint) -> Optional[Event]
        hub = Hub.current

        with capture_internal_exceptions():
            job = ctx["job"]  # type: Job
            if hub.scope.transaction is not None:
                hub.scope.transaction.name = job.function
                event["transaction"] = job.function

            tags = event.setdefault("tags", {})
            tags["saq_task_id"] = job.id
            # ``attempts`` counts how many times the job has been run, whereas
            # ``retries`` is only the configured limit, so the retry flag has
            # to be derived from ``attempts``.
            tags["saq_task_retry"] = job.attempts > 1
            extra = event.setdefault("extra", {})
            extra["saq-job"] = {
                "task": job.function,
                "kwargs": (
                    kwargs if _should_send_default_pii() else SENSITIVE_DATA_SUBSTITUTE
                ),
                "attempts": job.attempts,
                "retries": job.retries,
                "queue": job.queue.name if job.queue else "default",
            }

        return event

    return event_processor
def _wrap_function(coroutine):
    # type: (Callable) -> Any
    """Wrap a SAQ task coroutine so its exceptions are reported to Sentry.

    Args:
        coroutine: the task function, called as ``coroutine(ctx, **kwargs)``.

    Returns:
        An async function with the same call shape. It returns the task's
        result, and re-raises any exception after passing it to
        ``_capture_exception``.
    """

    async def _sentry_coroutine(ctx, **kwargs):
        # type: (Dict[Any, Any], **Any) -> Any
        hub = Hub.current
        if hub.get_integration(SaqIntegration) is None:
            return await coroutine(ctx, **kwargs)

        # Run each job on its own clone of the hub. Registering the event
        # processor on the shared scope would add one more processor per job
        # run and let one job's details end up on another job's events.
        with Hub(hub) as hub:
            hub.scope.add_event_processor(_make_event_processor(ctx, **kwargs))

            try:
                result = await coroutine(ctx, **kwargs)
            except Exception:
                exc_info = sys.exc_info()
                _capture_exception(exc_info)
                reraise(*exc_info)

        return result

    return _sentry_coroutine
def patch_worker_start():
    # type: () -> None
    """Swap ``Worker.start`` for a wrapper that instruments task functions.

    When the integration is active on the current hub, every entry of the
    worker's ``functions`` mapping is replaced by its ``_wrap_function``
    wrapper before the original ``Worker.start`` is called.
    """
    original_start = Worker.start

    def _sentry_worker_start(self):
        # type: (Worker) -> Coroutine
        if Hub.current.get_integration(SaqIntegration) is not None:
            instrumented = {}
            for task_name, task_func in self.functions.items():
                instrumented[task_name] = _wrap_function(task_func)
            self.functions = instrumented

        return original_start(self)

    Worker.start = _sentry_worker_start
from __future__ import absolute_import
import sys
from sentry_sdk import Hub
from sentry_sdk._compat import reraise
from sentry_sdk._types import TYPE_CHECKING
from sentry_sdk.consts import OP
from sentry_sdk.hub import _should_send_default_pii
from sentry_sdk.integrations import DidNotEnable, Integration
from sentry_sdk.integrations.logging import ignore_logger
from sentry_sdk.utils import SENSITIVE_DATA_SUBSTITUTE, capture_internal_exceptions, event_from_exception, parse_version
try:
from saq import __version__ as SAQ_VERSION
from saq.queue import JobError, Queue
from saq.worker import Worker
except ImportError:
raise DidNotEnable("SAQ is not installed")
if TYPE_CHECKING:
from typing import Any, Callable, Coroutine, Dict, Optional, Union
from saq.job import Job
from sentry_sdk._types import Event, EventProcessor, ExcInfo, Hint
# Span operation names for SAQ, attached to the imported OP class when this
# module is imported.
OP.QUEUE_SUBMIT_SAQ = "queue.submit.saq"  # op of the span opened around Queue.enqueue
OP.QUEUE_TASK_SAQ = "queue.task.saq"  # NOTE(review): not referenced elsewhere in this snippet
class SaqIntegration(Integration):
    """Sentry integration for the SAQ asyncio task queue.

    Setting it up patches ``Queue.enqueue`` and ``Worker.start`` and passes
    the ``saq`` logger name to ``ignore_logger``.
    """

    identifier = "saq"

    @staticmethod
    def setup_once():
        # type: () -> None
        """Validate the installed SAQ version and install the patches.

        Raises:
            DidNotEnable: if the SAQ version cannot be parsed or is older
                than 0.12.
        """
        try:
            version = parse_version(SAQ_VERSION)
        except (TypeError, ValueError):
            # Bind the name so the check below raises DidNotEnable instead of
            # failing with UnboundLocalError on an unparsable version.
            version = None

        if version is None:
            raise DidNotEnable("Unparsable SAQ version: {}".format(SAQ_VERSION))

        if version < (0, 12):
            raise DidNotEnable("SAQ 0.12 or newer required.")

        patch_queue_enqueue()
        patch_worker_start()

        ignore_logger("saq")
def patch_queue_enqueue():
    # type: () -> None
    """Swap ``Queue.enqueue`` for a wrapper that opens a span per enqueue.

    When the integration is not active on the current hub, the wrapper
    simply delegates to the original ``Queue.enqueue``.
    """
    original_enqueue = Queue.enqueue

    async def _sentry_queue_enqueue(self, job_or_func, **kwargs):
        # type: (Queue, Union[str, Job], **Any) -> Optional[Job]
        current_hub = Hub.current
        integration_active = current_hub.get_integration(SaqIntegration) is not None

        if integration_active:
            # A string is used as the description directly; otherwise the
            # object's ``function`` attribute is used.
            if isinstance(job_or_func, str):
                span_description = job_or_func
            else:
                span_description = job_or_func.function

            span = current_hub.start_span(
                op=OP.QUEUE_SUBMIT_SAQ, description=span_description
            )
            with span:
                return await original_enqueue(self, job_or_func, **kwargs)

        return await original_enqueue(self, job_or_func, **kwargs)

    Queue.enqueue = _sentry_queue_enqueue
def _capture_exception(exc_info):
    # type: (ExcInfo) -> None
    """Report a task exception to Sentry and update the transaction status.

    If a transaction is active on the current scope, a ``JobError`` marks
    it "aborted" and is not reported; any other exception marks it
    "internal_error". Everything that is not skipped is captured as an
    unhandled event.
    """
    current_hub = Hub.current
    transaction = current_hub.scope.transaction

    if transaction is not None:
        is_job_error = exc_info[0] == JobError
        transaction.set_status("aborted" if is_job_error else "internal_error")
        if is_job_error:
            return

    client = current_hub.client
    options = client.options if client else None
    mechanism = {"type": SaqIntegration.identifier, "handled": False}

    event, hint = event_from_exception(
        exc_info, client_options=options, mechanism=mechanism
    )
    current_hub.capture_event(event, hint=hint)
def _make_event_processor(ctx, **kwargs):
    # type: (Dict[Any, Any], **Any) -> EventProcessor
    """Build an event processor that annotates events with SAQ job details.

    Args:
        ctx: the context dict handed to the task; ``ctx["job"]`` is read
            each time an event is processed.
        **kwargs: the keyword arguments the task was called with. They are
            attached to the event only when ``_should_send_default_pii()``
            is true; otherwise ``SENSITIVE_DATA_SUBSTITUTE`` is sent.

    Returns:
        A callable taking ``(event, hint)`` that returns the event. Errors
        raised while annotating are handled by ``capture_internal_exceptions``.
    """

    def event_processor(event, hint):
        # type: (Event, Hint) -> Optional[Event]
        hub = Hub.current

        with capture_internal_exceptions():
            job = ctx["job"]  # type: Job
            if hub.scope.transaction is not None:
                hub.scope.transaction.name = job.function
                event["transaction"] = job.function

            tags = event.setdefault("tags", {})
            tags["saq_task_id"] = job.id
            # ``attempts`` counts how many times the job has been run, whereas
            # ``retries`` is only the configured limit, so the retry flag has
            # to be derived from ``attempts``.
            tags["saq_task_retry"] = job.attempts > 1
            extra = event.setdefault("extra", {})
            extra["saq-job"] = {
                "task": job.function,
                "kwargs": (
                    kwargs if _should_send_default_pii() else SENSITIVE_DATA_SUBSTITUTE
                ),
                "attempts": job.attempts,
                "retries": job.retries,
                "queue": job.queue.name if job.queue else "default",
            }

        return event

    return event_processor
def _wrap_function(coroutine):
    # type: (Callable) -> Any
    """Wrap a SAQ task coroutine so its exceptions are reported to Sentry.

    Args:
        coroutine: the task function, called as ``coroutine(ctx, **kwargs)``.

    Returns:
        An async function with the same call shape. It returns the task's
        result, and re-raises any exception after passing it to
        ``_capture_exception``.
    """

    async def _sentry_coroutine(ctx, **kwargs):
        # type: (Dict[Any, Any], **Any) -> Any
        hub = Hub.current
        if hub.get_integration(SaqIntegration) is None:
            return await coroutine(ctx, **kwargs)

        # Run each job on its own clone of the hub. Registering the event
        # processor on the shared scope would add one more processor per job
        # run and let one job's details end up on another job's events.
        with Hub(hub) as hub:
            hub.scope.add_event_processor(_make_event_processor(ctx, **kwargs))

            try:
                result = await coroutine(ctx, **kwargs)
            except Exception:
                exc_info = sys.exc_info()
                _capture_exception(exc_info)
                reraise(*exc_info)

        return result

    return _sentry_coroutine
def patch_worker_start():
    # type: () -> None
    """Swap ``Worker.start`` for a wrapper that instruments task functions.

    When the integration is active on the current hub, every entry of the
    worker's ``functions`` mapping is replaced by its ``_wrap_function``
    wrapper before the original ``Worker.start`` is called.
    """
    original_start = Worker.start

    def _sentry_worker_start(self):
        # type: (Worker) -> Coroutine
        if Hub.current.get_integration(SaqIntegration) is not None:
            instrumented = {}
            for task_name, task_func in self.functions.items():
                instrumented[task_name] = _wrap_function(task_func)
            self.functions = instrumented

        return original_start(self)

    Worker.start = _sentry_worker_start
@grigi Awesome, thanks for giving this a shot! 🚀
We definitely will need tests for this -- I'd look at how the tests for similar integrations (like ARQ for instance) are done and try to replicate that level of coverage. In general, you should make sure that all the different scenarios the integration supports are tested somehow (errors are being captured, error events have all the extra stuff the event processor is supposed to add, creating transactions works, testing different ways of enqueuing a task if that's something SAQ supports, etc.). But you can already open a draft PR with this and add the tests later.
@grigi Can you please consider opening a PR with the code in your comment, above? We can only add this code to the SDK if it is contributed via a pull request.
This issue has gone three weeks without activity. In another week, I will close it.
But! If you comment or otherwise update it, I will reset the clock, and if you remove the label "Waiting for: Community", I will leave it alone ... forever!
"A weed is but an unloved flower." ― Ella Wheeler Wilcox 🥀