django-celery-results
PENDING Tasks not added to database
Hi,
New tasks that have been pushed with .delay(*args) do not show up as PENDING in the django-celery-results database (when all workers are offline).
(SUCCESS / FAILURE works just fine.)
Is this a misconfiguration on my side? I am a bit confused by this.
settings.py
'django_celery_results' is in INSTALLED_APPS.
##### CELERY ######
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/Berlin'
CELERY_IMPORTS = ("myapp.tasks", )
CELERY_TASK_TRACK_STARTED = True
# celery setting.
CELERY_CACHE_BACKEND = 'default'
# django setting.
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.db.DatabaseCache',
        'LOCATION': 'my_cache_table',
    }
}
celery.py
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django apps.
app.autodiscover_tasks()
tasks.py
from celery import shared_task

@shared_task(queue='queue1')
def add(x, y):
    return x + y
Push a task:
from myapp.tasks import add
add.delay(1, 2)
Start a worker:
celery -A myproject worker -l INFO -Q queue1
As a workaround, I tried writing a wrapper so that when tasks are pushed to the broker, we manually add the TaskResult object with status=PENDING.
This is only "kinda" a workaround, though, because it is problematic to get the task's name from the AsyncResult.
For this I have seen that there is a Celery option mentioned:
result_extended = True
which should save more information to the AsyncResult._meta.
However, the task name is still nowhere to be found in the AsyncResult object.
I added result_extended = True
into settings.py
##### CELERY ######
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/Berlin'
CELERY_IMPORTS = ("demoapp.tasks", )
CELERY_TASK_TRACK_STARTED = True
CELERY_RESULT_EXTENDED = True
# celery setting.
CELERY_CACHE_BACKEND = 'default'
# django setting.
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.db.DatabaseCache',
        'LOCATION': 'my_cache_table',
    }
}
and, to make sure, I also added it in myproject/celery.py:
app = Celery('myproject', result_extended=True)
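Even so, a quick check suggests why the name never shows up for a pending task: the extended attributes are read from the backend's stored result meta, and nothing is stored until a worker picks the task up (a hedged illustration; exact behavior may vary by Celery version):
from myapp.tasks import add

result = add.delay(1, 2)
# With result_extended=True, AsyncResult exposes .name / .args / .kwargs,
# but they come from the stored meta. While the task is still PENDING
# nothing has been stored, so this prints None.
print(result.name)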
myapp/views.py
from myapp import tasks
from django.views.generic import TemplateView
from django_celery_results.models import TaskResult

def call_task(task_name):
    return getattr(tasks, task_name)

class CreateTaskView(TemplateView):
    template_name = 'myapp/index.html'
    extra_context = {}

    def get(self, request, *args, **kwargs):
        result = call_task('add').delay(1, 2)
        result_meta = result._get_task_meta()
        print(result_meta)
        TaskResult.objects.create(
            task_id=result.id,
            task_name=result_meta.get("task_name"),
            status=result.status,
        )
        self.extra_context['result'] = result
        return super().get(request)
@urzbs curious, why isn't creating the TaskResult with PENDING status the default behavior when the delay() call succeeds?
That's exactly what we did in my second comment; however, getting the task's name from the AsyncResult and saving it to the TaskResult is a problem.
Therefore I hoped that result_extended would work properly; then I could do it as described.
Using inspect or piecing the task name together manually is not suitable and feels extremely dirty when you have a complex task hierarchy.
For us it is important that certain tasks can never be put in the queue twice; for example, imagine a task that creates a sha512 checksum of 100 TB of data. We need absolute consistency for the task's name as well.
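To illustrate why the name matters so much, the guard we would like to write looks roughly like this (a hedged sketch; the helper name is made up, and it only works if PENDING rows with reliable task names exist in the first place):
from celery import states
from django_celery_results.models import TaskResult

# Hypothetical guard: skip dispatch when a task with the same name
# is already waiting or running.
def delay_unless_queued(task, *args, **kwargs):
    already_queued = TaskResult.objects.filter(
        task_name=task.name,
        status__in=[states.PENDING, states.STARTED],
    ).exists()
    if already_queued:
        return None
    return task.delay(*args, **kwargs)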
I have the same issue.
@amirhoseinbidar how did you solve this problem?
@barik94 I didn't solve it; instead I used the Redis cache to log task progress, with the task ID as the key... You can have a dedicated model for the log if you want to observe progress from the admin.
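Roughly like this, if it helps (a hedged sketch of the approach described above; it assumes the 'default' Django cache is backed by Redis, and the key format is made up):
from django.core.cache import caches

redis_cache = caches['default']  # assumes this cache is backed by Redis

# Hypothetical progress logger keyed by task ID.
def log_progress(task_id, current, total):
    redis_cache.set(
        f'task-progress:{task_id}',
        {'current': current, 'total': total},
        timeout=3600,
    )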
The filters in the Django admin listing allow you to filter for pending tasks; however, tasks are not actually there to select while they are still waiting in the execution queue.
Is this the expected behavior with django-celery-results?
I solved it using celery signals.
from celery import states
from celery.signals import before_task_publish
from django_celery_results.models import TaskResult

@before_task_publish.connect
def create_task_result_on_publish(sender=None, headers=None, body=None, **kwargs):
    if "task" not in headers:
        return

    TaskResult.objects.store_result(
        "application/json",
        "utf-8",
        headers["id"],
        None,
        states.PENDING,
        task_name=headers["task"],
        task_args=headers["argsrepr"],
        task_kwargs=headers["kwargsrepr"],
    )
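One way to make sure the handler actually gets registered (a sketch; the module and app names are made up) is to import it from an AppConfig.ready() hook, since it touches a Django model:
from django.apps import AppConfig

class MyAppConfig(AppConfig):
    name = 'myapp'

    def ready(self):
        # Importing the module registers the before_task_publish handler.
        from . import signal_handlers  # noqa: F401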
I solved it using celery signals.
Should we consider adding this to the package?
I solved it using celery signals.
A great solution, thanks! Working perfectly with celery==5.2.7 and django-celery-results==2.4.0.
I solved it using celery signals.
Thank you. It's a useful solution.
I solved it using celery signals.
Should we consider adding this to the package?
Yes, this would be a really useful feature.
Hi guys,
Problem:
There seems to be a documentation error: according to the given link, the variable to enable tracking of started tasks is CELERY_TASK_TRACK_STARTED = True
https://docs.celeryq.dev/en/v5.2.7/userguide/configuration.html#std-setting-task_track_started
Solution:
But the actual variable should be
CELERY_TRACK_STARTED = True
You can also verify the applied config values with the snippet below:
from project.celery import app
print(app.conf)
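Or, to check just the settings in question (a small addition; attribute access on app.conf is standard Celery, as far as I know):
from project.celery import app

print(app.conf.task_track_started)
print(app.conf.result_extended)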
Thanks @sandeep7410
The before_task_publish signal does not fire when invoking a task to run once via the django-celery-beat admin. In addition, trying to directly import TaskResult resulted in an import error indicating that apps haven't finished loading yet. I had better luck with this:
# celery.py
from celery import signals, states

def get_task_result_model():
    # Trying to import models at the top of the celery file causes an
    # import error in my app (the app registry isn't ready yet), so the
    # import is deferred and cached on the function itself.
    if get_task_result_model.TaskResultModel is None:
        from django_celery_results.models import TaskResult
        get_task_result_model.TaskResultModel = TaskResult
    return get_task_result_model.TaskResultModel

get_task_result_model.TaskResultModel = None

@signals.task_prerun.connect
def create_task_result_on_task_prerun(task_id, task, args=(), kwargs=None, **other):
    TaskResult = get_task_result_model()
    TaskResult.objects.get_or_create(
        content_type="application/json",
        content_encoding="utf-8",
        task_id=task_id,
        status=states.PENDING,
        task_name=task.name,
        task_args=args,
        task_kwargs=kwargs if kwargs else {},
    )
I solved it using celery signals.
Should we consider adding this to the package?
Thanks for this solution. It got me off to a great start, but I noticed task_args and task_kwargs weren't being encoded. I ended up calling store_result via DatabaseBackend rather than TaskResult to get the encoding logic to happen (among other pieces of missing logic). Below is my tweaked solution (imported in my application's ready() function).
I agree that something like this should be added to the package, or at least put behind a feature toggle, so we don't all have to implement our own varied solutions.
from celery import states
from celery.signals import before_task_publish
from django.conf import settings

from my_app_foo import celery_app

db_result_backend = None
registered_task_names = celery_app.tasks.keys()

def create_task_result_on_publish(sender=None, headers=None, **kwargs):  # noqa: ARG001
    """
    This is a workaround for an issue where django-celery-results
    is not adding PENDING tasks to the database.

    ref: https://github.com/celery/django-celery-results/issues/286
    """
    if "task" not in headers or not db_result_backend or sender not in registered_task_names:
        return

    # Essentially transforms a single level of the headers dictionary
    # into an object with attributes.
    request = type('request', (object,), headers)

    db_result_backend.store_result(
        headers["id"],
        None,
        states.PENDING,
        traceback=None,
        request=request,
    )

celery_backend = getattr(settings, 'CELERY_RESULT_BACKEND', '')
is_django_celery_installed = 'django_celery_results' in getattr(settings, 'INSTALLED_APPS', [])

if is_django_celery_installed and celery_backend == 'django-db':
    # We are good to import DatabaseBackend.
    from django_celery_results.backends.database import DatabaseBackend

    db_result_backend = DatabaseBackend(celery_app)

    # And now register the signal.
    before_task_publish.connect(
        create_task_result_on_publish,
        dispatch_uid='create_task_result_on_publish',
    )
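A quick note on the type('request', (object,), headers) line above, since it can look odd at first: type() with a dict of attributes builds a throwaway class whose class attributes are the header keys, which appears to be just enough duck typing for store_result's request parameter. A standalone illustration (header values here are made up):
headers = {
    "id": "0b5f7a6e-example",
    "task": "myapp.tasks.add",
    "argsrepr": "(1, 2)",
    "kwargsrepr": "{}",
}

# type() returns a new class; the dict keys become class attributes,
# so they can be read with dot access.
request = type('request', (object,), headers)
print(request.task)      # -> myapp.tasks.add
print(request.argsrepr)  # -> (1, 2)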