celery-haystack

Separate Task/Queue for SearchIndex

Open jobelenus opened this issue 8 years ago • 26 comments

I have a lot of real-time indexing going on in my application. There is one model that has a higher priority for indexing, but its updates are a minority of the indexing jobs going on in the background. Other jobs are ahead of them in the queue, so they sit there waiting.

I can solve this problem by creating a separate route for a task of my own that extends your task and does nothing different. The problem is how to get the signal processor to launch that task instead of your default task. I see the optional task path arg and the setting for the task, but enqueue doesn't pass that argument.

I was hoping to avoid copying that method in my own signal processor just to change one line for passing an argument. Could there be a class attribute on the signal processor that represents the task path?

Or do you have a better proposal for solving this specific problem? I can of course do the work and submit a PR - I'm just hoping to get some confirmation here that this is the desired approach and that the PR would be accepted and released fairly quickly.

jobelenus avatar Jul 27 '16 15:07 jobelenus

@jobelenus I don't have any experience using celery-haystack, but I have plenty using celery. I handle different priority queues in some of my own applications, so I can't imagine it being a huge problem here - I just need to know where to start looking. Can you point me to where the current bottleneck is and where you'd expect the change to take place?

SteveByerly avatar Jul 27 '16 15:07 SteveByerly

Sure. Starting in the signal processor: https://github.com/django-haystack/celery-haystack/blob/develop/celery_haystack/signals.py#L44

And this is the def enqueue_task: https://github.com/django-haystack/celery-haystack/blob/develop/celery_haystack/utils.py#L41

Where def get_update_task(task_path=None): is the method signature (that defaults to the setting: CELERY_HAYSTACK_DEFAULT_TASK).
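
To make the "dotted path with a fallback to a setting" idea concrete, here is a minimal sketch. It is not the celery-haystack implementation: resolve_dotted_path and DEFAULT_TASK_PATH are made-up names, and the default points at a standard-library class only so the snippet runs without Celery installed.

```python
import importlib

# Hypothetical stand-in for the CELERY_HAYSTACK_DEFAULT_TASK setting.
# It points at a stdlib class so the sketch runs without Celery.
DEFAULT_TASK_PATH = "collections.OrderedDict"


def resolve_dotted_path(task_path=None):
    """Import and return the object named by a dotted path.

    Falls back to DEFAULT_TASK_PATH when no path is given.
    """
    path = task_path or DEFAULT_TASK_PATH
    module_name, _, attr_name = path.rpartition(".")
    if not module_name:
        # A bare word such as "priority" names no module, so reject it.
        raise ValueError("%r is not a dotted path" % path)
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)
```

Calling resolve_dotted_path() with no argument returns the default object, while a bare word with no dots raises ValueError instead of being imported.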

Let me know if that's confusing.

jobelenus avatar Jul 27 '16 15:07 jobelenus

Since celery doesn't support message priority I have to separate out the messages into different queues and then allocate the amount of resources I want to each queue. Correct?

jobelenus avatar Jul 27 '16 21:07 jobelenus

Yes. You'll basically want to send your special indexes to a different queue(s) than the rest so they end up getting processed faster.

I've briefly looked at the code and I'm not sure of the best approach. One option would be to define which queue to use on a per-index basis, defaulting to the CELERY_HAYSTACK_QUEUE setting. Perhaps have an attribute or method on CelerySearchIndex that is retrieved in the signal processor's enqueue and passed into enqueue_task as a kwarg.
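
A rough sketch of that idea, with no Celery or Haystack involved. Everything here is hypothetical: the queue attribute, BaseIndex (a stand-in for CelerySearchIndex), ArticleIndex, and both functions are invented for illustration, and DEFAULT_QUEUE stands in for the CELERY_HAYSTACK_QUEUE setting.

```python
# Hypothetical stand-in for the CELERY_HAYSTACK_QUEUE setting.
DEFAULT_QUEUE = "default"


class BaseIndex:
    """Stand-in for CelerySearchIndex; None means 'use the default queue'."""
    queue = None


class CustomerIndex(BaseIndex):
    # Per-index override (the attribute name is an assumption).
    queue = "priority"


class ArticleIndex(BaseIndex):
    # No override, so updates for this index use the default queue.
    pass


def enqueue_task(action, identifier, queue=None):
    """Build the options a task would be sent with (no broker involved)."""
    return {"args": (action, identifier), "queue": queue or DEFAULT_QUEUE}


def enqueue_for_index(index, action, identifier):
    """Read the per-index queue and pass it through as a keyword argument."""
    return enqueue_task(action, identifier, queue=getattr(index, "queue", None))
```

With this shape, only indexes that set the attribute leave the default queue, so existing projects would behave as before.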

What were you thinking your approach would be?

SteveByerly avatar Jul 27 '16 22:07 SteveByerly

That is precisely what I was thinking. More than happy to do the work. Just wanted to make sure that's an approach you all want to support/accept into the project.

jobelenus avatar Jul 27 '16 22:07 jobelenus

I think it's a reasonable addition. Go for it.

SteveByerly avatar Jul 27 '16 22:07 SteveByerly

Hrmm, this is odd... while testing I'm getting this trace. I've forked off master and made my changes, but something seems to be not quite right with the app config? celery_haystack is still in INSTALLED_APPS, so I'm not sure what I'm missing here:

File "/srv/lenspro/env/local/lib/python2.7/site-packages/celery_haystack/utils.py", line 36, in enqueue_task
if settings.CELERY_HAYSTACK_QUEUE:
out: AttributeError: 'Settings' object has no attribute 'CELERY_HAYSTACK_QUEUE'

Edit: I thought I figured out what was going on and imported the celery_haystack conf settings in my own application task, but it didn't quite do the trick:

from celery_haystack.tasks import CeleryHaystackSignalHandler
from celery_haystack.conf import settings
class CustomerCeleryHaystackSignalHandler(CeleryHaystackSignalHandler):
    pass

I can see that CeleryHaystackSignalHandler.using is filled in, but the setting it is filled in with (settings.CELERY_HAYSTACK_DEFAULT_ALIAS) doesn't exist. Where is the magic I am missing?

Edit2: Actually even without that settings import my own task CustomerCeleryHaystackSignalHandler.using is filled in. I wonder what would have changed that the utils.py file now has bad settings? I don't think it was anything I changed, you can see my commit here: https://github.com/jobelenus/celery-haystack/commit/425f5bef8e6bc1356d89f4369f243368401a52f7

I happen to be in Django 1.9.6 if that makes a difference, I see the travis config has 1.9

jobelenus avatar Jul 28 '16 17:07 jobelenus

It looks like an issue with app loading and your new tasks. When I run celery_haystack locally, I can import the settings fine.

If you add CELERY_HAYSTACK_QUEUE setting in your own settings, what happens? If it works fine, then I think something needs to be loaded/moved.

SteveByerly avatar Jul 28 '16 19:07 SteveByerly

I can do from celery_haystack.conf import settings but that settings dict doesn't have any CELERY_X.... settings in it. Where is the magic that loads the CeleryHaystack AppConf into the settings? Because that is what it seems I am missing/cannot find/don't see anywhere else in the project.

When I had celery_haystack installed via pip from pypi there wasn't a problem. I branched from master, is there something on master that's wrong and I should be using something else?

jobelenus avatar Jul 28 '16 19:07 jobelenus

I believe it gets loaded when django does its magic app loading. I believe Celery also does some loading of tasks too.

What happens when you manually add the fully qualified setting (CELERY_HAYSTACK_QUEUE) to conf.py? Does it work/any new error?

SteveByerly avatar Jul 28 '16 19:07 SteveByerly

Well of course that will work if I add the setting -- but that isn't really a solution to the problem, and there are at least 3 settings that the utils.py file needs. What is really weird is that the indexes.py file seems to work fine with the app loading, but not utils. That's what confuses me the most about this.

jobelenus avatar Jul 28 '16 19:07 jobelenus

I thought perhaps this would fix it, but it doesn't: https://github.com/jobelenus/celery-haystack/commit/2479424f326f97e6752a9c8fee4c14bb74ef3a53 (i.e. my theory was that the relative import was picking up my app's settings file instead of celery_haystack's)

jobelenus avatar Jul 28 '16 19:07 jobelenus

Can you post the new code you added to your own project? Either here or a gist?

SteveByerly avatar Jul 28 '16 19:07 SteveByerly

Sure, it's really not much. I'm not even using the new feature yet.

lenspro.tasks.py:

+from celery_haystack.tasks import CeleryHaystackSignalHandler
+class CustomerCeleryHaystackSignalHandler(CeleryHaystackSignalHandler):
+    pass

lenspro.settings.py:

CELERY_ROUTES = {
...
+    'lenspro.tasks.CustomerCeleryHaystackSignalHandler': {'queue': 'customerindex'}
}

lenspro.search_indexes.py: this already existed

class CustomerIndex(CelerySearchIndex, indexes.Indexable):
    text = indexes.CharField(document=True, use_template=True)
    customer_id = indexes.IntegerField(model_attr='id', null=False)

    def get_model(self):
        return User

I see the error when I create a new User. I didn't even put the task_path attribute into my index, so something else is going on here. I'm installing it from my requirements.txt file with this entry: git+https://github.com/jobelenus/celery-haystack.git@searchindex-taskpath

jobelenus avatar Jul 28 '16 19:07 jobelenus

Why are you using a custom signal handler if the goal is to define the task queues on the searchindex?

Then instead of defining the routes, you can define the queues, though I don't think you have to - they'll get added by celery auto-magically:

from kombu import Exchange, Queue

CELERY_QUEUES = (
    Queue('default', Exchange('default', durable=True), routing_key='default'),
    Queue('priority', Exchange('priority', durable=True), routing_key='priority'),
)

SteveByerly avatar Jul 28 '16 19:07 SteveByerly

So that I can do this: {'queue': 'customerindex'}. Right now all the indexing jobs go into the CeleryHaystackSignalHandler task, and since all the model index updates go through it I cannot prioritize any of them -- they are all the same. So I want to move one model out into a different queue, and I can do that through the CELERY_ROUTES setting.
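
Here is a simplified model of why the subclass matters. It assumes routing is keyed on the task name, and it is not Celery's actual router: queue_for is a made-up helper, and DEFAULT_QUEUE is an assumed default queue name.

```python
# Route table in the shape of the CELERY_ROUTES setting shown in this thread.
ROUTES = {
    "lenspro.tasks.CustomerCeleryHaystackSignalHandler": {"queue": "customerindex"},
}

# Assumed name of the queue used when no route matches.
DEFAULT_QUEUE = "celery"


def queue_for(task_name, routes=ROUTES, default=DEFAULT_QUEUE):
    """Return the queue a task name maps to, or the default queue."""
    route = routes.get(task_name, {})
    return route.get("queue", default)
```

Because the lookup is by task name, every index update sent through the shared task lands in the same queue. A subclass that does nothing new still gets its own name, and therefore its own route.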

But even if I didn't make that class (I'm not even using it yet) this has a setup problem: celery_haystack/utils.py cannot find the settings that ought to be set up via celery_haystack/conf.py through the Django AppConfig system. I'm running everything here through manage.py shell_plus so the AppConfig system is loading things. What am I missing with regards to the setup here?

The weird part is that celery_haystack/signals.py finds those settings just fine. Stumped.

jobelenus avatar Jul 28 '16 19:07 jobelenus

Based on our initial comments, and this change: https://github.com/jobelenus/celery-haystack/commit/425f5bef8e6bc1356d89f4369f243368401a52f7

You would just add task_path = priority into your CustomerIndex

Then, the current signal handler will use the priority queue to handle that task. Does that sound correct?

SteveByerly avatar Jul 28 '16 19:07 SteveByerly

No. priority is not a valid task_path. Task path is a dot string pointing to a celery task: https://github.com/django-haystack/celery-haystack/blob/develop/celery_haystack/utils.py#L14

So my index change would be:

class CustomerIndex(CelerySearchIndex, indexes.Indexable):
    task_path = 'lenspro.tasks.CustomerCeleryHaystackSignalHandler'

jobelenus avatar Jul 28 '16 19:07 jobelenus

Ah, right. I was thinking it was much simpler.

Either way, I still think it's an issue with app loading between Django, Celery, and AppConf. Celery wants to create the celery app/tasks, Django wants to import all the installed apps, then AppConf needs to be loaded at some point. I'm assuming there wouldn't be a problem if this project was not depending on AppConf.

My earlier point about adding the setting that's throwing the AttributeError directly to the file was to show that everything runs w/o AppConf.

SteveByerly avatar Jul 28 '16 20:07 SteveByerly

If I do not import CeleryHaystackSignalHandler and create my subclass in my tasks file it works without an error. But the message doesn't get routed to a different celery queue (e.g. it still goes to the default queue with all the other celery_haystack index jobs) which means I cannot prioritize these specific ones.

Why would importing in my app cause a problem within the celery_haystack app?

jobelenus avatar Jul 28 '16 20:07 jobelenus

OK -- small update: I did not realize that the conf.py file was using an outside appconf (I read it quickly and thought it was the Django AppConfig), egg on my face. The issue is the application loading lifecycle. So I created an apps.py file that will load the conf.py file: https://github.com/jobelenus/celery-haystack/blob/d1aacadde8b2d4f04c4941be0d0a8d6a685c5595/celery_haystack/apps.py and changed my INSTALLED_APPS to reference the new AppConfig rather than the module.
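
A toy model of that lifecycle problem, assuming the prefixed defaults only land on the settings object once the module that defines them has been imported. Settings, register_defaults and read_queue are invented for this sketch and are not part of Django or appconf.

```python
class Settings:
    """Stand-in for django.conf.settings: starts with no app-specific values."""


settings = Settings()


def register_defaults(holder, prefix, defaults):
    """Copy prefixed defaults onto the holder unless the project already set them."""
    for name, value in defaults.items():
        full_name = "%s_%s" % (prefix, name)
        if not hasattr(holder, full_name):
            setattr(holder, full_name, value)


def read_queue(holder):
    """Mimics code that reads the setting at call time."""
    return holder.CELERY_HAYSTACK_QUEUE


# Before the defaults are registered the lookup fails, as in the traceback.
try:
    read_queue(settings)
    failed_before = False
except AttributeError:
    failed_before = True

# Registering the defaults (what importing conf.py is assumed to trigger)
# makes the same lookup succeed.
register_defaults(settings, "CELERY_HAYSTACK", {"QUEUE": None, "DEFAULT_ALIAS": None})
```

In this model the same read works or fails depending only on whether registration has already happened, which is why forcing conf.py to load at app startup helps.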

I also needed to change my own tasks file for application loading: I was loading my tasks in my __init__.py file, and moved that to apps.py as well. This makes all the settings work perfectly fine. However, it raises a different problem further down in celery, which I have submitted here: https://github.com/celery/celery/issues/3349

jobelenus avatar Jul 29 '16 16:07 jobelenus

Don't forget to add the following to the __init__.py of each app you're adding an apps.py file to (the value is specific to each app):

default_app_config = 'celery_haystack.apps.CeleryHaystackAppConfig'

https://docs.djangoproject.com/en/1.9/ref/applications/#configuring-applications This may fix your celery issue too.

SteveByerly avatar Jul 29 '16 17:07 SteveByerly

Right, I went with the (now Django-preferred) approach of INSTALLED_APPS = ['celery_haystack.apps.CeleryHaystackAppConfig',] in this case, but for the project itself I would definitely have to add that in __init__.py.

jobelenus avatar Jul 29 '16 17:07 jobelenus

@jobelenus so you got everything up and running? Are there any other changes needed for the PR?

SteveByerly avatar Jul 31 '16 17:07 SteveByerly

OK -- I finally figured out my other problem. I will polish this up and submit a PR

jobelenus avatar Aug 04 '16 16:08 jobelenus

Sorry it's been a while here, I was dealing with other fallout. I noticed that I can't squash the commits for the PR I want to submit (it's much cleaner that way). Can you guys allow that option? If not, that's OK, but I want to send you the PR.

jobelenus avatar Aug 29 '16 21:08 jobelenus