django-q
Multiple Queues?
Hello!
I had a look at Django-Q and didn't find anything about multiple queues. Does this exist, or is it on the roadmap?
Nice work by the way.
Currently there are no multiple queues. Although we do use this in testing by setting the list_key variable, it is not possible for the workers to read from multiple queues based on priority. I'm not a fan of adding features just for the sake of having a named feature, so if you can describe a good use case for what you want, I'll be happy to take a look at it and add it to the roadmap.
Of course: I need to use async tasks for two things:
- Compute long-running processing functions (let's call them A)
- Send e-mails.
But if there are 10 A functions in the queue, the e-mails won't be sent for minutes or hours.
But if you see another way to do this, I'm listening.
This is fairly easy to accomplish; it's just that it would also require separate worker clusters for each queue. In fact, most of the code is already in place to do this, I would just have to make it a little bit more accessible.
It gets complicated when you have only one cluster working from several queues. Of course you can tell the workers to always pull from the higher-priority queue first, but in your case it might happen that all workers are tied up doing long calculations on queue A, so the high-priority queue still has to wait.
Dual queue, dual clusters:
from django_q.brokers import get_broker
from django_q.tasks import async
from django_q.cluster import Cluster
broker_A = get_broker('Priority_A')
broker_B = get_broker('Priority_B')
# queue to Priority_A
async('math.floor', 1.5, broker=broker_A)
# queue to Priority_B
async('math.copysign', 1, -1, broker=broker_B)
# start cluster A
c_a = Cluster(broker_A).start()
# start cluster B
c_b = Cluster(broker_B).start()
Of course this code will get stuck at running the first cluster, because the clusters need to be started in separate threads. That would just need a minor change to the management command to make this work.
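For reference, a minimal sketch of that idea, assuming you want to start both clusters from a single process before any management command change exists: give each cluster its own thread so the second start() call is not blocked by the first.

# Sketch only: start each cluster from its own thread so neither
# start() call blocks the other. Reuses the Priority_A / Priority_B
# brokers from the example above.
import threading

from django_q.brokers import get_broker
from django_q.cluster import Cluster

for queue_name in ('Priority_A', 'Priority_B'):
    cluster = Cluster(get_broker(queue_name))
    threading.Thread(target=cluster.start).start()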
Thank you! I'll test it and let you know.
That works perfectly. I just created a new command called "qclusters":
from optparse import make_option

from django.core.management.base import BaseCommand
from django.utils.translation import ugettext as _

from django_q.cluster import Cluster
from django_q.brokers import get_broker
from my_project.task_managers import clusters_names


class Command(BaseCommand):
    # Translators: help text for qcluster management command
    help = _("Starts MyProject Clusters.")

    option_list = BaseCommand.option_list + (
        make_option('--run-once',
                    action='store_true',
                    dest='run_once',
                    default=False,
                    help='Run once and then stop.'),
    )

    def handle(self, *args, **options):
        for c in clusters_names:
            q = Cluster(get_broker(c))
            q.start()
            if options.get('run_once', False):
                q.stop()
clusters_names is defined like this:
clusters_names = ['email', 'A']
I'm happy that worked out. I've been using separate queues mostly for parallel testing until now. Let me think a bit about how to integrate this into the project. You're running this on the same machine, but I can imagine other people might want a separate cluster instance per queue.
Probably an optional --queue keyword for the qcluster command, which can take either a single queue name or a list, would be the way to go.
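As an illustration only, such a command might look roughly like the sketch below; the --queue flag is hypothetical and not part of django-q, while Cluster and get_broker are existing APIs.

# Hypothetical sketch of a qcluster-style command with a --queue option
# (the flag does not exist in django-q): start one cluster per named queue.
from django.core.management.base import BaseCommand

from django_q.brokers import get_broker
from django_q.cluster import Cluster


class Command(BaseCommand):
    help = "Start one cluster per named queue."

    def add_arguments(self, parser):
        parser.add_argument('--queue', nargs='+', required=True,
                            help='One or more queue names to work from.')

    def handle(self, *args, **options):
        for name in options['queue']:
            Cluster(get_broker(name)).start()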
I'm following the project on GitHub, and if you update anything about it, I'd be happy to test it!
@nicolastobo I tried using your code, but it seems that if I run python manage.py qclusters (your command), the processes can no longer be stopped with Ctrl+C.
@Koed00 I noticed that since Django 1.8, option_list is deprecated and you should override the add_arguments method instead:
def add_arguments(self, parser):
    parser.add_argument(
        '--run-once',
        action='store_true',
        dest='run_once',
        default=False,
        help='Run once and then stop.')
Just wondering if there was any progress on this? I was going to implement the solution @nicolastobo posted, but I noticed #137 (scheduled tasks run once per cluster) and I'm worried there might be other side effects.
Would love to see this solved, as I've got the same problem (long-running tasks block higher-priority tasks in the queue) and I don't want to go back to Celery (!)
Perhaps a simpler solution would be to allow users to optionally assign a task priority as an integer, which causes the workers to always execute the highest-priority tasks first? Would be happy to submit a PR.
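For illustration only, the proposed call site might look something like this; the priority keyword is hypothetical and not part of django-q's async_task API today.

# Hypothetical API sketch -- 'priority' is not an existing django-q option;
# this only illustrates the proposal above.
from django_q.tasks import async_task

async_task('tasks.process_image', 42, priority=1)                # long-running, low priority
async_task('tasks.send_email', 'user@example.com', priority=10)  # would be picked up first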
I'm also looking at this. My use case is splitting tasks between 'immediate' tasks triggered by Django post_save signals, for which I was looking to use Redis, and less 'urgent' things such as daily scheduled tasks, where the additional visibility that the ORM broker gives me is useful and the performance overhead is not an issue.
Also looking for a way to achieve this. My main use case is that I have lots of tasks which are quick to execute (sending emails, sending SMS, etc.) and which I need processed ASAP, and then a few other tasks that are very long-running (5 minutes or so). It's possible for a large number of these long-running tasks to be queued up at the same time, and I want to avoid having all my workers stuck processing them and preventing the smaller tasks from running until all of the long-running tasks are finished.
Also looking for a good way to do this. Same use-case as others.
I'm also looking for a feature to handle this; it's certainly a valid use case. Otherwise you have to try scaling out with more workers, which eventually stops working if they can't process the tasks fast enough.
I also had the requirement to have two queues: one for quick notification tasks and one for long-running management tasks. I wanted them both to use the Django ORM broker.
I created a model that has a one-to-one relation with django_q.OrmQ and stores a name. A NamedBroker only processes tasks with its own name.
To run each cluster:
python manage.py qcluster --settings settings.notification
python manage.py qcluster --settings settings.management
Here is the code:
# models.py
from django.db import models


class OrmQExtension(models.Model):
    """Extends OrmQ with a name field"""

    orm_q = models.OneToOneField("django_q.OrmQ", on_delete=models.CASCADE, related_name="extension")
    name = models.CharField("name", max_length=255)

    def __str__(self):
        return self.name
# broker.py
from time import sleep

from django.utils import timezone

from django_q.brokers.orm import ORM, _timeout
from django_q.conf import Conf


class NamedBroker(ORM):
    def queue_size(self) -> int:
        return (
            self.get_connection()
            .filter(key=self.list_key, lock__lte=_timeout(), extension__name=self.name)
            .count()
        )

    def lock_size(self) -> int:
        return (
            self.get_connection()
            .filter(key=self.list_key, lock__gt=_timeout(), extension__name=self.name)
            .count()
        )

    def purge_queue(self):
        return (
            self.get_connection()
            .filter(key=self.list_key, extension__name=self.name)
            .delete()
        )

    def enqueue(self, task):
        from some_app.models import OrmQExtension  # noqa

        package = self.get_connection().create(
            key=self.list_key, payload=task, lock=_timeout()
        )
        OrmQExtension.objects.create(orm_q=package, name=self.name)
        return package.pk

    def dequeue(self):
        tasks = self.get_connection().filter(
            key=self.list_key, lock__lt=_timeout(), extension__name=self.name
        )[0:Conf.BULK]
        if tasks:
            task_list = []
            for task in tasks:
                if (
                    self.get_connection()
                    .filter(id=task.id, lock=task.lock, extension__name=self.name)
                    .update(lock=timezone.now())
                ):
                    task_list.append((task.pk, task.payload))
                # else don't process, as another cluster has been faster than us on that task
            return task_list
        # empty queue, spare the cpu
        sleep(Conf.POLL)


class NotificationBroker(NamedBroker):
    name = "notification"


class ManagementBroker(NamedBroker):
    name = "management"
# Create a task
from django_q.tasks import async_task
async_task('some.task.function', broker=NotificationBroker())
# Create a schedule
from django_q.models import Schedule
from django_q.tasks import schedule

schedule(
    func="django.core.management.call_command",
    args="('some_command',)",
    name="Some name",
    schedule_type=Schedule.DAILY,
    q_options={"broker": ManagementBroker()},
)
# settings/base.py
...
Q_CLUSTER = {
    "name": "DjangoORM",
    ...
    "orm": "default",
}
# settings/notification.py
from .base import * # noqa
Q_CLUSTER.update({"broker_class": "some_app.broker.NotificationBroker"})
# settings/management.py
from .base import * # noqa
Q_CLUSTER.update({"broker_class": "some_app.broker.ManagementBroker"})
I ran into issues while overriding the ORM class until I added my new instance variables to __getstate__ and __setstate__. Just a heads up for anyone trying this approach to get multiple ORM queues.
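For anyone hitting the same pickling issue, a minimal sketch is below. It assumes your broker subclass stores its queue name as an instance attribute and that the parent broker already defines __getstate__/__setstate__ (as the comment above implies); the exact contents of the parent's state depend on your django-q version, so extend them rather than replace them. MyNamedBroker here is a hypothetical variant, not the NamedBroker above.

# Sketch only: carry a custom instance attribute through pickling when
# the broker is handed to worker processes.
from django_q.brokers.orm import ORM


class MyNamedBroker(ORM):
    def __init__(self, name, *args, **kwargs):
        self.name = name
        super().__init__(*args, **kwargs)

    def __getstate__(self):
        # bundle the parent's state with our extra attribute
        return super().__getstate__(), self.name

    def __setstate__(self, state):
        parent_state, self.name = state
        super().__setstate__(parent_state)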
Hi everyone, I'm not sure if my problem can be solved with background tasks, since I'm new to them. My problem is that I want to create a task per user (the task is importing data, which takes a lot of time). Please let me know if you have any updates on this, thank you very much.
Is there not a simple way to do this? Such as:
async_task('tasks.send_email', label=emails)
async_task('tasks.process_image', label=images)
python manage.py qcluster --label=emails
python manage.py qcluster --label=images
Why is this not a thing? It seems like basic functionality to not want every task in the same processing queue (especially if you have one server processing one queue and another server processing a different queue).
(Note: @allcaps' code works fine, but it seems like a ton of extra work to do something so basic.)
It would also be nice to have a qcluster that processes all labels, or a list of multiple labels (or even all labels except some, etc.), basically the same way Laravel queues work.
Would love the functionality that @Shadercloud mentioned above. It would be really handy for splitting multiple tasks across different servers, with long-running, power-hungry tasks on a high-powered machine and smaller/shorter tasks on a lower-powered machine.
@Koed00 Are you still interested in this functionality?