Version 0.17.8 broke Celery tasks with custom Task class
I am using the tenant-schemas-celery package, which generates its own TenantTask class (extends celery.app.task.Task) in its own CeleryApp (extends celery.Celery). After updating sentry-sdk from 0.17.7 to 0.17.8, the TenantTask's tenant context switching stopped working. I suspect this is because of the following change in 0.17.8:
diff --git a/sentry_sdk/integrations/celery.py b/sentry_sdk/integrations/celery.py
index 1a11d4a..2b51fe1 100644
--- a/sentry_sdk/integrations/celery.py
+++ b/sentry_sdk/integrations/celery.py
@@ -60,9 +60,8 @@ class CeleryIntegration(Integration):
# Need to patch both methods because older celery sometimes
# short-circuits to task.run if it thinks it's safe.
task.__call__ = _wrap_task_call(task, task.__call__)
task.run = _wrap_task_call(task, task.run)
- task.apply_async = _wrap_apply_async(task, task.apply_async)
# `build_tracer` is apparently called for every task
# invocation. Can't wrap every celery task for every invocation
# or we will get infinitely nested wrapper functions.
@@ -71,8 +70,12 @@ class CeleryIntegration(Integration):
return _wrap_tracer(task, old_build_tracer(name, task, *args, **kwargs))
trace.build_tracer = sentry_build_tracer
+ from celery.app.task import Task # type: ignore
+
+ Task.apply_async = _wrap_apply_async(Task.apply_async)
+
_patch_worker_exit()
# This logger logs every status of every task that ran on the worker.
# Meaning that every task's breadcrumbs are full of stuff like "Task
I think this problem has to do with this new way of importing celery.app.task.Task. Even though TenantTask extends celery.app.task.Task, this change somehow broke the TenantTask logic.
I'm not sure which package this should be fixed in, but I'm a bit skeptical about this new import in sentry-sdk, so I'm reporting it here.
Here is my celery.py:
from tenant_schemas_celery.app import CeleryApp as TenantAwareCeleryApp
app = TenantAwareCeleryApp()
app.config_from_object('django.conf:settings')
app.autodiscover_tasks()
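(For context, the _wrap_apply_async used in the diff above returns a plain wrapper function, roughly along the lines of the simplified sketch below. This is not the SDK's actual code, which also does tracing-related work inside the wrapper.)

import functools

def _wrap_apply_async(f):
    # Simplified: the real wrapper does Sentry-specific work before delegating.
    @functools.wraps(f)
    def apply_async(*args, **kwargs):
        return f(*args, **kwargs)
    return apply_async

# 0.17.7 and earlier patched each task instance (with a different signature):
#     task.apply_async = _wrap_apply_async(task, task.apply_async)
# 0.17.8 patches the method once, on the celery.app.task.Task class:
#     Task.apply_async = _wrap_apply_async(Task.apply_async)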
This is curious, because the way we monkeypatch should generally work fine with subclasses: TenantTask does not override apply_async, and the other methods it does override are not called by us. I think there is some magic going on in Celery that I overlooked.
This change was not super mission-critical; I only did it because patching instance methods seemed unnecessary. I'll take a look next week unless a second user runs into this problem as well.
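(To illustrate the expectation here: patching a method on a base class is normally visible to subclasses that do not override it. Minimal stand-in example, not Celery or SDK code:)

import functools

class Base:
    def apply_async(self):
        return "original"

class Child(Base):  # does not override apply_async
    pass

def wrap(f):
    @functools.wraps(f)
    def wrapper(self):
        return "wrapped " + f(self)
    return wrapper

Base.apply_async = wrap(Base.apply_async)

print(Child().apply_async())  # "wrapped original" -- the subclass picks up the patch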
I think I just ran into this as well with Flask... here's my code:
in base.py
import celery
from config.celery import DefaultCeleryConfig
from tasks.error_handling import RetryError
# https://flask.palletsprojects.com/en/1.1.x/patterns/celery/
# Run Celery task inside Flask context
# pylint: disable=W0223
class TaskWithFlaskContext(celery.Task):  # noqa: W0223
    autoretry_for = (RetryError,)
    retry_backoff = DefaultCeleryConfig.RETRY_BACKOFF_DELAY_SECONDS
    retry_backoff_max = DefaultCeleryConfig.RETRY_BACKOFF_MAX_SECONDS
    max_retries = DefaultCeleryConfig.MAX_RETRIES
    retry_jitter = DefaultCeleryConfig.RETRY_JITTER

    def __call__(self, *args, **kwargs):
        from config.main import setup_application

        app = setup_application()
        with app.app_context():
            return self.run(*args, **kwargs)
in worker.py
celery = Celery("worker", task_cls="tasks.base:TaskWithFlaskContext")
Judging by the change mentioned by OP, I would assume this code
from celery.app.task import Task # type: ignore
Task.apply_async = _wrap_apply_async(Task.apply_async)
is patching a class that we're not using (or that gets replaced).
In this documentation, https://flask.palletsprojects.com/en/1.1.x/patterns/celery/#configure,
you can see that the class is being replaced with celery.Task = ContextTask:
from celery import Celery

def make_celery(app):
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery
Thanks, this snippet helps
Seems like there's a fix coming already, but I'll just leave a note here that this also broke our Celery tasks, which are triggered via celery.task.periodic_task. We don't have any subclassing going on.
@clintmod (and others) can you verify whether it was actually 0.17.8 that broke your application? I am not able to reproduce issues with custom task classes. I have not tried setting up a Django app with tenant-schemas-celery yet.
It would be greatly appreciated if somebody could give me a reproducible example in the form of a repo where all dependencies are pinned.
@untitaker yes, periodic tasks are broken by 0.17.8. A periodic task created with @periodic_task by default has the class PeriodicTask (a subclass of Task).
Aye, 0.17.7 works ok and with 0.17.8 it starts breaking.
We don't use tenant-schemas-celery, so the issue should be reproducible with plain Django + Celery.
Which Celery version is this now? I specifically tried periodic_task now and I am still not able to reproduce. Are there Django-specific bits I have to take into account? I didn't see task subclasses in djcelery.
Once again: please, somebody give me a test app.
Test case that passes under Celery 4 and 5 (Celery 3 not tested):
diff --git a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py
index 32b3021..f939e99 100644
--- a/tests/integrations/celery/test_celery.py
+++ b/tests/integrations/celery/test_celery.py
@@ -22,7 +22,7 @@ def connect_signal(request):
@pytest.fixture
-def init_celery(sentry_init, request):
+def init_celery(sentry_init, request, tmpdir):
def inner(propagate_traces=True, backend="always_eager", **kwargs):
sentry_init(
integrations=[CeleryIntegration(propagate_traces=propagate_traces)],
@@ -51,19 +51,20 @@ def init_celery(sentry_init, request):
request.addfinalizer(lambda: Hub.main.bind_client(None))
# Once we drop celery 3 we can use the celery_worker fixture
+ schedule_filename = str(tmpdir.join("celerybeat-schedule"))
if VERSION < (5,):
- worker_fn = worker.worker(app=celery).run
+ worker_fn = lambda: worker.worker(app=celery).run(beat=True, schedule_filename=schedule_filename)
else:
from celery.bin.base import CLIContext
worker_fn = lambda: worker.worker(
obj=CLIContext(app=celery, no_color=True, workdir=".", quiet=False),
- args=[],
+ args=['-B', '-s', schedule_filename],
)
worker_thread = threading.Thread(target=worker_fn)
worker_thread.daemon = True
- worker_thread.start()
+ return celery, worker_thread
else:
raise ValueError(backend)
@@ -307,8 +308,10 @@ def test_retry(celery, capture_events):
@pytest.mark.forked
-def test_redis_backend_trace_propagation(init_celery, capture_events_forksafe, tmpdir):
- celery = init_celery(traces_sample_rate=1.0, backend="redis", debug=True)
+def test_redis_backend_trace_propagation(init_celery, capture_events_forksafe):
+ from celery.schedules import crontab
+ celery, worker_thread = init_celery(traces_sample_rate=1.0, backend="redis", debug=True)
+ worker_thread.start()
events = capture_events_forksafe()
@@ -354,6 +357,33 @@ def test_redis_backend_trace_propagation(init_celery, capture_events_forksafe, t
assert not runs
+@pytest.mark.forked
+def test_periodic_task(init_celery, capture_events_forksafe):
+ from celery.task.base import periodic_task
+ from celery.schedules import crontab
+ celery, worker_thread = init_celery(backend="redis", debug=True)
+
+ runs = []
+
+ @periodic_task(run_every=1)
+ def periodic_task():
+ runs.append(1)
+ 1 / 0
+
+ celery.conf.timezone = 'UTC'
+
+ events = capture_events_forksafe()
+
+ worker_thread.start()
+
+ event = events.read_event()
+ (exception,) = event["exception"]["values"]
+ assert exception["type"] == "ZeroDivisionError"
+
+ # assert that process forked
+ assert not runs
+
+
@pytest.mark.forked
@pytest.mark.parametrize("newrelic_order", ["sentry_first", "sentry_last"])
def test_newrelic_interference(init_celery, newrelic_order, celery_invocation):
It seems that version 0.17.7 was also broken, but the problem didn't appear in my local dev environment. After I updated our stage and production environments to 0.17.7, our Celery tasks stopped working because our TenantTask was not handling the context switching anymore. Downgrading to 0.17.6 fixed it. The changelog of 0.17.7 has this:
Fix a bug under Celery 4.2+ that may have caused disjoint traces or missing transactions.
EDIT: It seems that 0.17.7 did something to the task headers. tenant-schemas-celery adds the tenant schema name to the headers, so these changes probably somehow broke the headers with a custom Task class: https://github.com/getsentry/sentry-python/compare/0.17.6...0.17.7#diff-bbcc458d29b0f1880cf429abbf8a94a5e8517f61072945d783e44fec4969a10bL96
This is with Celery==4.2.2. I can try to provide a test app, but it will take a couple of weeks until I have time to try it out.
The tasks that were failing were using:
from celery.schedules import crontab
from celery.task import periodic_task
...
def repeating_task(task_func):
    run_every_5min = crontab(minute="*/5")
    return periodic_task(run_every=run_every_5min)(task_func)

@repeating_task
def run_tasks():
    ...
I pinned to 0.17.7 and tracing started working again
I have picked this up and I was able to replicate the issue. It seems that the issue occurs with the older / deprecated Task class in celery/task/base.py, which happens to be the base class for PeriodicTask, for example.
(I suspect the same is occurring with tenants-schema-celery but I can't be sure since I have not tested that myself yet)
https://github.com/celery/celery/blob/4.3/celery/task/base.py#L126-L128
I believe we are only patching the new Task class https://github.com/celery/celery/blob/4.3/celery/app/task.py#L143-L145
So invoking apply_async on a PeriodicTask, for example, does not go through our patched apply_async. I am currently looking into this and will keep everyone updated.
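(A stand-in illustration of that hypothesis. These are not Celery's real classes, and it assumes the deprecated compat class supplies its own apply_async machinery rather than inheriting the patched attribute:)

import functools

class AppTask:  # stands in for celery.app.task.Task (the class 0.17.8 patches)
    def apply_async(self, *args, **kwargs):
        return "queued"

class CompatTask(AppTask):  # stands in for the deprecated celery.task.base.Task
    def apply_async(self, *args, **kwargs):  # assumed: compat class provides its own
        return "queued via compat path"

def _wrap_apply_async(f):
    @functools.wraps(f)
    def apply_async(*args, **kwargs):
        print("sentry wrapper ran")
        return f(*args, **kwargs)
    return apply_async

AppTask.apply_async = _wrap_apply_async(AppTask.apply_async)  # what 0.17.8 does

AppTask().apply_async()     # wrapper runs
CompatTask().apply_async()  # wrapper never runs: the compat attribute shadows the patch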
This issue has gone three weeks without activity. In another week, I will close it.
But! If you comment or otherwise update it, I will reset the clock, and if you label it Status: Backlog or Status: In Progress, I will leave it alone ... forever!
"A weed is but an unloved flower." ― Ella Wheeler Wilcox 🥀
It looks like the problem here is still open. @clintmod @arcukwasshka @ur001 @user37596 @ahmedetefy could any of you please verify whether this problem still exists with the newest Sentry SDK and a current Celery? Thanks!
This issue has gone three weeks without activity. In another week, I will close it.
But! If you comment or otherwise update it, I will reset the clock, and if you label it Status: Backlog or Status: In Progress, I will leave it alone ... forever!
"A weed is but an unloved flower." ― Ella Wheeler Wilcox 🥀
I believe this is still happening
sentry-sdk==1.5.12
celery[redis]>=4.3,<4.4
on a Celery task with the @periodic_task decorator. When apply_async is called within the decorator, it gives me a SchedulingError:
Couldn't apply scheduled task my_periodic_task: unbound method apply_async() must be called with Task instance as first argument (got TaskType instance instead)
Is there any remediation for this?
Pinning the version to 0.17.7 seems to work for now with this setup (or using celery[redis] < 4.3), but if you have any updates, they would be much appreciated.
@sl0thentr0py, is there a fix planned for this?
Hello @abhishek-choudhury-callhub! We have now moved this issue up in prioritization and will make a plan for when we can squeeze this fix in. I will keep you posted!
Hi @antonpirker! Can we have an estimate of when the fix can be deployed?
@abhishek-choudhury-callhub there are multiple problems in this issue:
- tenant-schemas-celery and custom Task classes
- @periodic_task decorators, which have been deprecated in Celery for a long time, so you should just be using the newer APIs (see the sketch after this comment)
Which one are you interested in?
This issue is very old, so please give me exact versions and the problem you are specifically facing.
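(For reference, the non-deprecated equivalent of the @periodic_task examples above uses the beat schedule. A rough sketch, with an illustrative app and broker:)

from celery import Celery
from celery.schedules import crontab

app = Celery("worker", broker="redis://localhost:6379/0")

@app.task
def run_tasks():
    ...

# Replaces @periodic_task(run_every=crontab(minute="*/5")):
app.conf.beat_schedule = {
    "run-tasks-every-5-min": {
        "task": run_tasks.name,  # the registered task name
        "schedule": crontab(minute="*/5"),
    },
}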
So I finally spent some time trying to repro this today, but was unable to. @fredbaa since yours was the last comment, I tried this setup with Django/Celery/Redis.
https://github.com/sl0thentr0py/sentry_sample_apps/tree/main/python/django
Celery is on 4.3.1 and I used the @periodic_task decorator here. I don't see the error you reported, and both exception handling and tracing work correctly.
Let me know how I can repro what you are seeing if you want me to investigate this further.
@sl0thentr0py basically set up 2 Celery tasks, one that uses @periodic_task and one that just uses the default @task. Add a call to the latter task using apply_async inside the periodic task (make sure this runs on the set schedule, not manually). Is this the setup that you mentioned?
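(For clarity, the setup described above would look roughly like this; module and broker names are illustrative, and it assumes Celery 4.x, where the deprecated decorator still exists:)

from celery import Celery
from celery.schedules import crontab
from celery.task import periodic_task  # deprecated API, removed in Celery 5

app = Celery("repro", broker="redis://localhost:6379/0")

@app.task
def plain_task():
    print("plain task ran")

@periodic_task(run_every=crontab(minute="*/1"))
def scheduled_task():
    # apply_async is invoked from inside the beat-scheduled task,
    # not triggered manually from a shell
    plain_task.apply_async()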
@fredbaa ok, I tried that too here and it also works; I can't repro what you're seeing.
@fredbaa could you send us a minimal project that can reproduce this?
This issue has gone three weeks without activity. In another week, I will close it.
But! If you comment or otherwise update it, I will reset the clock, and if you label it Status: Backlog or Status: In Progress, I will leave it alone ... forever!
"A weed is but an unloved flower." ― Ella Wheeler Wilcox 🥀
Any chance this will be fixed?
Please see the discussion above and give me something to reproduce; there is nothing to fix otherwise.
I'll try to make a minimal project for repro purposes when I have the time, but so far this PR fixed the problem for me: https://github.com/getsentry/sentry-python/pull/1151
So the issue is real.
I too have run across the same issue. I upgraded Sentry and suddenly had all kinds of issues with tasks. Here is the exception; I resolved it via the patch in this PR commit:
https://github.com/getsentry/sentry-python/pull/1151/commits/3358c4daf80aee3cbede623f6db17bf7026089bd
Message
TypeError: unbound method apply_async() must be called with Task instance as first argument (got TaskType instance instead)
Stack Trace (most recent call first)
TypeError: unbound method apply_async() must be called with Task instance as first argument (got TaskType instance instead)
  File "django/core/handlers/exception.py", line 41, in inner
    response = get_response(request)
  File "django/core/handlers/base.py", line 249, in _legacy_get_response
    response = self._get_response(request)
  File "django/core/handlers/base.py", line 187, in _get_response
    response = self.process_exception_by_middleware(e, request)
  File "django/core/handlers/base.py", line 185, in _get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
  File "sentry_sdk/integrations/django/views.py", line 79, in sentry_wrapped_callback
    return callback(request, *args, **kwargs)
  File "django/views/decorators/csrf.py", line 58, in wrapped_view
    return view_func(*args, **kwargs)
  File "switchboard/views.py", line 2455, in recording_complete
    expires=datetime.utcnow() + timedelta(days=1)
  File "sentry_sdk/integrations/celery.py", line 127, in apply_async
    return f(*args, **kwargs)