django-celery-results

PENDING Tasks not added to database

Open • urzbs opened this issue 3 years ago • 15 comments

Hi,

New tasks pushed with .delay(*args) do not show up as PENDING in the django-celery-results database when all workers are offline.

(SUCCESS and FAILURE results are stored just fine.)

Is this a misconfiguration on my side? I'm a bit confused by this.
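Some context on why no row appears: Celery documents the PENDING state as "waiting for execution or unknown", meaning any task id the result backend has no record of is reported as PENDING. A task sitting in the broker queue therefore has no database row yet; rows are only written when a worker reports a state such as STARTED (with task_track_started enabled), SUCCESS or FAILURE. The toy model below illustrates that rule. It is a sketch, not Celery or django-celery-results code, and a plain dict stands in for the TaskResult table.

```python
# Toy model, not Celery code: a dict stands in for the TaskResult table,
# which only gets a row once a worker reports a state for the task.
PENDING = "PENDING"


def reported_state(task_id, stored_rows):
    """Return the state a result backend reports for task_id.

    Follows Celery's documented rule: a task id with no stored result is
    reported as PENDING, even though no row exists for it.
    """
    row = stored_rows.get(task_id)
    return PENDING if row is None else row["status"]


stored_rows = {}  # all workers offline: nothing has been written yet
print(reported_state("a1b2", stored_rows))  # PENDING, yet stored_rows is empty

stored_rows["a1b2"] = {"status": "SUCCESS"}  # a worker ran the task
print(reported_state("a1b2", stored_rows))  # SUCCESS
```

So a PENDING status read through AsyncResult does not imply that anything was saved; that is why the admin shows nothing for queued tasks.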

settings.py

'django_celery_results' is in INSTALLED_APPS.



##### CELERY ######
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/Berlin'
CELERY_IMPORTS = ("myapp.tasks", )
CELERY_TASK_TRACK_STARTED = True

# celery setting.
CELERY_CACHE_BACKEND = 'default'

# django setting.
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.db.DatabaseCache',
        'LOCATION': 'my_cache_table',
    }
}

celery.py

import os
from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()

tasks.py

from celery import shared_task

@shared_task(queue='queue1')
def add(x, y):
    return x + y

Push the task:

from myapp.tasks import add
add.delay(1,2)

Start the worker:

celery -A myproject worker -l INFO -Q queue1

urzbs • Feb 08 '22

As a workaround, I tried writing a wrapper that manually adds a TaskResult object with status=PENDING whenever a task is pushed to the broker.

This is "kind of" a workaround, but it is problematic to get the task's name from the AsyncResult.

For this, I found the Celery option result_extended = True, which should save more information into AsyncResult._meta (see the result_extended setting in the Celery configuration docs).

However, the task name is still nowhere to be found in the AsyncResult object.

I added result_extended = True to settings.py (as CELERY_RESULT_EXTENDED):

##### CELERY ######
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/Berlin'
CELERY_IMPORTS = ("demoapp.tasks", )
CELERY_TASK_TRACK_STARTED = True
CELERY_RESULT_EXTENDED = True

# celery setting.
CELERY_CACHE_BACKEND = 'default'

# django setting.
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.db.DatabaseCache',
        'LOCATION': 'my_cache_table',
    }
}

and, to be sure, I also passed it to the app in myproject/celery.py:

app = Celery('myproject', result_extended=True) 

myapp/views.py

from myapp import tasks
from django.views.generic import TemplateView
from django_celery_results.models import TaskResult

# Create your views here.

def call_task(task_name):
    return getattr(tasks, task_name)

class CreateTaskView(TemplateView):
    template_name = 'myapp/index.html'

    def get(self, request, *args, **kwargs):
        result = call_task('add').delay(1, 2)
        result_meta = result._get_task_meta()
        print(result_meta)
        # task_name is usually empty here: nothing is stored for this id yet
        TaskResult.objects.create(task_id=result.id, task_name=result_meta.get("task_name"), status=result.status)
        # Build a per-request context instead of mutating a class-level
        # extra_context dict, which would be shared by every request.
        context = self.get_context_data(result=result, **kwargs)
        return self.render_to_response(context)
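One way around the task-name problem, sketched under assumptions: a Celery task object already knows its registered name as `task.name`, so a wrapper does not have to recover it from the AsyncResult. Generating the task id up front and passing it to `apply_async(task_id=...)` also lets the PENDING record be written before the message is published, so a fast worker cannot store SUCCESS for that id first (which would make a later `TaskResult.objects.create` fail, since `task_id` is unique). The function name and the `record_pending` callback are hypothetical.

```python
import uuid


def apply_async_with_pending_row(task, record_pending, args=(), kwargs=None, **options):
    """Write a PENDING record for a Celery task, then publish the task.

    `record_pending` is a hypothetical callback (for example one that creates
    a django_celery_results TaskResult row). The task id is generated here so
    the record exists before any worker can store a result for that id.
    """
    task_id = str(uuid.uuid4())
    # task.name is the registered task name, e.g. "myapp.tasks.add";
    # "PENDING" is the same string as celery.states.PENDING.
    record_pending(task_id=task_id, task_name=task.name, status="PENDING")
    # Celery's Task.apply_async accepts an explicit task_id; extra options
    # such as queue="queue1" are passed through unchanged.
    return task.apply_async(args=args, kwargs=kwargs or {}, task_id=task_id, **options)
```

In the view, `record_pending` could be something like `lambda **f: TaskResult.objects.get_or_create(task_id=f["task_id"], defaults={"task_name": f["task_name"], "status": f["status"]})`. The trade-off of writing the row first is that a failed publish leaves a stale PENDING row behind.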

urzbs • Feb 16 '22

@urzbs, I'm curious: why isn't creating the TaskResult with PENDING status the default behavior once the delay() call succeeds?

jezeniel • Mar 09 '22

That's exactly what we did in my second comment; however, getting the task's name from the AsyncResult and saving it to the TaskResult is a problem.

That's why I hoped result_extended would work properly; then I could do it as described.

Using inspect or assembling the task name manually is not suitable and feels extremely dirty when you have a complex task hierarchy.

For us it is important that certain tasks can never be queued twice; for example, imagine a task that computes a SHA-512 checksum of 100 TB of data. We need absolute consistency for the task name as well.

urzbs • Mar 09 '22

I have the same issue.

amirhoseinbidar • Aug 24 '22

@amirhoseinbidar, how did you solve this problem?

barik94 • Sep 28 '22

@barik94 I didn't solve it; instead, I used the Redis cache to log task progress with the task ID as the key. You can have a dedicated model for the log if you want to observe progress from the admin.
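A sketch of the cache-based progress log described above, assuming a Django-style cache object (anything with `set(key, value, timeout)`, such as `django.core.cache.cache` configured with a Redis backend). The function name and the `task-progress:` key prefix are hypothetical.

```python
import time

ONE_DAY = 24 * 60 * 60


def log_task_progress(cache, task_id, state, **details):
    """Store the latest progress entry for task_id in a cache.

    `cache` is assumed to offer Django's cache API, set(key, value, timeout);
    the "task-progress:" key prefix is an arbitrary, hypothetical convention.
    Returns the key so callers can read the entry back with cache.get(key).
    """
    key = f"task-progress:{task_id}"
    entry = {"state": state, "updated_at": time.time(), **details}
    cache.set(key, entry, ONE_DAY)
    return key
```

The publishing code could call it with state "PENDING" right after queuing, and the task itself could call it again as work proceeds; a view can then read the same key to show progress.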

amirhoseinbidar • Sep 28 '22

The filters in the Django admin listing let you select PENDING tasks. However, tasks never actually show up under that filter while they are waiting in the execution queue.

Is this the expected behavior with django-celery-results?

gustavo-sdo • Sep 30 '22

I solved it using Celery signals:

from celery import states
from celery.signals import before_task_publish
from django_celery_results.models import TaskResult


@before_task_publish.connect
def create_task_result_on_publish(sender=None, headers=None, body=None, **kwargs):
    if "task" not in headers:
        return

    TaskResult.objects.store_result(
        "application/json",
        "utf-8",
        headers["id"],
        None,
        states.PENDING,
        task_name=headers["task"],
        task_args=headers["argsrepr"],
        task_kwargs=headers["kwargsrepr"],
    )
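The handler above reads everything it needs from the message headers (with Celery's message protocol 2, `id`, `task`, `argsrepr` and `kwargsrepr` travel there). Pulling that extraction into a plain function makes it testable without a broker or database. A sketch; `pending_fields_from_headers` is a hypothetical helper name, and the extra `not headers` check guards against the receiver being called with its `headers=None` default.

```python
def pending_fields_from_headers(headers):
    """Return the fields to store for a just-published task, or None.

    Reads the same protocol-2 header keys as the before_task_publish handler:
    "id", "task", "argsrepr" and "kwargsrepr". Messages without a "task"
    header (or with no headers at all) are skipped.
    """
    if not headers or "task" not in headers:
        return None
    return {
        "task_id": headers["id"],
        "task_name": headers["task"],
        "task_args": headers.get("argsrepr"),
        "task_kwargs": headers.get("kwargsrepr"),
    }
```

When it returns a dict, the receiver can pass those values on to `TaskResult.objects.store_result(...)` exactly as the original handler does.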

rubimpassos • Oct 14 '22

Quoting @rubimpassos's before_task_publish solution above.

Should we consider adding this to the package?

auvipy • Oct 15 '22

Quoting @rubimpassos: "I solved it using celery signals."

A great solution, thanks! It works perfectly with celery==5.2.7 and django-celery-results==2.4.0.

gustavo-sdo • Oct 15 '22

Quoting @rubimpassos's before_task_publish solution above.

Thank you, it's a useful solution.

itisnotyourenv • Oct 25 '22

Quoting @rubimpassos's before_task_publish solution above and @auvipy's question: "should we consider adding to this package?"

Yes, this would be a really useful feature.

ottorei • Nov 25 '22

Hi Guys,

Problem: There seems to be a documentation error. According to the linked page, the variable that enables tracking of started tasks is CELERY_TASK_TRACK_STARTED = True: https://docs.celeryq.dev/en/v5.2.7/userguide/configuration.html#std-setting-task_track_started

Solution: the variable that actually works is CELERY_TRACK_STARTED = True.

You can verify the applied config values with the snippet below:

from project.celery import app

print(app.conf)
# or check the single effective value:
print(app.conf.task_track_started)

Thanks @sandeep7410

MohammedAliD • Jun 02 '23

The before_task_publish signal does not fire when a task is triggered to run once from the django-celery-beat admin. In addition, importing TaskResult directly resulted in an import error saying that apps haven't finished loading yet. I had better luck with this:

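# celery.py
from celery import signals, states


def get_task_result_model():
    # Import lazily: importing django_celery_results.models at the top of
    # this file raises an "apps aren't loaded yet" error in my app.
    if get_task_result_model.TaskResultModel is None:
        from django_celery_results.models import TaskResult

        get_task_result_model.TaskResultModel = TaskResult
    return get_task_result_model.TaskResultModel


# Initialise the cache slot; without it the first call raises AttributeError.
get_task_result_model.TaskResultModel = None


@signals.task_prerun.connect
def create_task_result_on_task_prerun(task_id, task, args=(), kwargs=None, **other):
    # task_prerun is dispatched in the worker right before the task executes.
    TaskResult = get_task_result_model()
    # task_id is unique, so look up by it alone; the rest only applies on create.
    TaskResult.objects.get_or_create(
        task_id=task_id,
        defaults={
            "content_type": "application/json",
            "content_encoding": "utf-8",
            "status": states.PENDING,
            "task_name": task.name,
            "task_args": args,
            "task_kwargs": kwargs if kwargs else {},
        },
    )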
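The `get_task_result_model` helper caches a lazily imported model on a function attribute. An alternative sketch of the same lazy-import idea uses `importlib` with `functools.lru_cache`; `lazy_attr` is a hypothetical name. Python already caches imported modules in `sys.modules`, so the main benefit is deferring the import until the handler first runs, by which point the worker has normally finished loading Django apps.

```python
import functools
import importlib


@functools.lru_cache(maxsize=None)
def lazy_attr(module_path, attr_name):
    """Import module_path on first use and return attr_name from it.

    Calling this inside a signal handler instead of at module level defers
    the import until the handler first runs. lru_cache remembers the result,
    so later calls return the same object without repeating the lookup.
    """
    module = importlib.import_module(module_path)
    return getattr(module, attr_name)
```

Inside the handler this would read `TaskResult = lazy_attr("django_celery_results.models", "TaskResult")`.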

jjorissen52 • Jul 27 '23

Quoting @rubimpassos's before_task_publish solution above and @auvipy's question: "should we consider adding to this package?"

Thanks for this solution. It got me off to a great start, but I noticed task_args and task_kwargs weren't being encoded. I ended up calling store_result via DatabaseBackend rather than TaskResult so that the backend's encoding logic (among other pieces of missing logic) gets applied. Below is my tweaked solution (imported in my application's ready() function).

I agree that something like this should be added to the package, or at least put behind a feature toggle, so we don't all have to implement our own custom solutions.

from celery import states
from celery.signals import before_task_publish
from django.conf import settings

from my_app_foo import celery_app


db_result_backend = None
registered_task_names = celery_app.tasks.keys()


def create_task_result_on_publish(sender=None, headers=None, **kwargs):  # noqa: ARG001
    """
    This is a workaround for an issue where django-celery-results
    is not adding PENDING tasks to the database.

    # ref: https://github.com/celery/django-celery-results/issues/286
    """

    if "task" not in headers or not db_result_backend or sender not in registered_task_names:
        return

    # essentially transforms a single-level of the headers dictionary
    # into an object with properties
    request = type('request', (object,), headers)

    db_result_backend.store_result(
        headers["id"],
        None,
        states.PENDING,
        traceback=None,
        request=request,
    )


celery_backend = getattr(settings, 'CELERY_RESULT_BACKEND', '')
is_django_celery_installed = 'django_celery_results' in getattr(settings, 'INSTALLED_APPS', [])

if is_django_celery_installed and celery_backend == 'django-db':
    # We are good to import DatabaseBackend
    from django_celery_results.backends.database import DatabaseBackend

    db_result_backend = DatabaseBackend(celery_app)
    # And now register the signal
    before_task_publish.connect(create_task_result_on_publish, dispatch_uid='create_task_result_on_publish')
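The `type('request', (object,), headers)` line works because it defines a throwaway class whose class attributes are the header values, so lookups such as `request.task` or `getattr(request, 'argsrepr', None)` succeed. `types.SimpleNamespace` gives the same attribute access with an ordinary instance. A small comparison sketch with made-up header values; `headers_as_request` is a hypothetical name, and swapping it in assumes the backend only reads the request through attribute access.

```python
from types import SimpleNamespace


def headers_as_request(headers):
    """Return an object exposing each header key as an attribute.

    Equivalent for attribute reads to type('request', (object,), headers),
    but builds an ordinary instance instead of defining a new class.
    """
    return SimpleNamespace(**headers)


headers = {"id": "abc", "task": "myapp.tasks.add", "argsrepr": "(1, 2)"}
as_class = type("request", (object,), headers)
as_instance = headers_as_request(headers)
print(as_class.task, as_instance.task)  # myapp.tasks.add myapp.tasks.add
print(getattr(as_instance, "hostname", None))  # None: missing keys need a default
```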

mghantous • Nov 01 '23