django-q2
A couple of questions
Hello, I've been using django-q2 on my first-ever Django project, and it's been a treat so far; it's so much easier to set up than Celery. Thanks for this package!
I have a few questions I'd like to ask:
1. Is there a way for me to make the hook (callback) function run in the main process and not in the worker process?
2. Is there a way for me to check whether a task exists before it has completed, via the task id? When I use get_task, I noticed that it returns a Task object only if the task has completed.
3. Follow-up on 2: is it possible to get the status of the task (whether it's in the queue and, if so, at what position, and whether it's currently running)?
4. Is there a way for me to specify which cluster and queue an async_task should run on?
5. Can I set a priority for a task? For example, if I have 100 tasks waiting and some are less important than others, can I give them priority?
6. Can I have a single task wait on multiple queues and let the first one that becomes available pick it up?
7. Can I limit the TTL of the results in the DB to x amount of time?
8. When the timeout is reached, does the worker kill the task? Or will it keep running and just not wait for or store the results?
9. Can I limit resource usage for each worker? For example, if I want each worker to use at most 4 GB of RAM and only 15% of the CPU, can I achieve that somehow?
I know that's a lot of questions, but I want to know if these features exist before I try to find workarounds, which will probably be way less efficient than ones that are built into the package.
Thanks in advance!!
1. Not really. When you create a task, you get an `id`, and with that `id` you can poll the task until it is completed: https://django-q2.readthedocs.io/en/master/tasks.html#django_q.result. The worker runs in a different process, and it can't trigger something in another process.
2. The task id isn't saved in the database until the task is done running, so no.
3. No, not right now.
4. Yes, you can pass `q_options={'cluster': 'something'}` to `async_task` to push it to a specific cluster.
5. No, tasks are run in the order they are put in the queue (FIFO). As an alternative, you could create multiple clusters and use one cluster for high-priority tasks.
6. No, I don't think that's possible.
7. No, you can only limit based on the number of saved results.
8. It will kill the task.
9. You can't set hard limits, but for RAM you can set a maximum after which the worker will restart: https://django-q2.readthedocs.io/en/master/configure.html#max-rss
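Since hooks always run in a cluster worker, one way to approximate a hook in the web process is to poll for the result from a background thread of that process, which is also what the wrapper code later in this thread does. A minimal sketch follows; `watch_task` is a hypothetical helper (not part of django-q2), and it assumes a `fetch_result(task_id)` callable that returns `None` while the task is pending, such as `django_q.tasks.result`. A task whose real return value is `None` cannot be told apart from a pending one this way.

```python
import threading
import time
from typing import Any, Callable, Optional


def watch_task(
    task_id: str,
    fetch_result: Callable[[str], Optional[Any]],
    callback: Callable[[Any], None],
    timeout: float = 60.0,
    interval: float = 0.5,
) -> threading.Thread:
    """Poll for a task's result in a daemon thread of the *current* process.

    fetch_result(task_id) must return None while the task is still pending.
    callback(result) runs in this process (not in a cluster worker) once a
    result appears; if `timeout` seconds pass first, the callback is skipped.
    """

    def _poll() -> None:
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            value = fetch_result(task_id)
            if value is not None:
                callback(value)
                return
            time.sleep(interval)

    thread = threading.Thread(target=_poll, daemon=True)
    thread.start()
    return thread
```

In a Django project this could be called as `watch_task(task_id, result, on_done)` right after `async_task()`, with `result` imported from `django_q.tasks`. Each watched task costs one thread and one database query per `interval`, so this suits a modest number of in-flight tasks.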
Okay, I'm going to try to implement some of this myself as an additional layer. Can I share the results here to get suggestions on how to improve it? Suggestions from someone who maintains or collaborates on the package would be amazing...
Sure, I can take a look, no problem.
Okay, so this is a work in progress, but... this is what I've come up with so far:
```python
import asyncio
import traceback
from contextlib import asynccontextmanager
from threading import Thread
from time import sleep, time
from typing import Any, AsyncIterator, Callable, Iterator

from asgiref.sync import sync_to_async
from django.conf import settings
from django_q.humanhash import uuid
from django_q.tasks import Task, async_task
from readerwriterlock import rwlock


class QueueIsFull(Exception):
    """Raised when the queue is full and attempting to add a new task."""


class BadConfiguration(Exception):
    """Raised when the MyTask initialization is incorrect."""


def thread_safe_iterator(iterable: list) -> Iterator[Any]:
    """
    Thread-safe iterator over a shared list.

    The list is copied under a reader lock, and the lock is released before
    yielding, so the loop body can take the writer lock (e.g. to remove the
    current item) without deadlocking or skipping elements.
    """
    with Manager.read_write_lock.gen_rlock():
        snapshot = list(iterable)
    yield from snapshot


@asynccontextmanager
async def get_lock(lock: rwlock.RWLockFairD._aReader | rwlock.RWLockFairD._aWriter) -> AsyncIterator[None]:
    """
    Acquire a reader or writer lock without blocking the event loop.

    Pass a fresh handle from gen_rlock()/gen_wlock() on every call: a handle
    tracks its own locked state, so it must not be shared by concurrent holders.
    """
    await sync_to_async(lock.acquire, thread_sensitive=False)()
    try:
        yield
    finally:
        lock.release()


class MyTask:
    """The task wrapper."""

    def __init__(self, func, _kwargs: dict | None, *args, **kwargs):
        """
        Initialize a task.

        Args:
            func (callable): The function to be executed as a task.
            _kwargs (dict): Additional keyword arguments for task configuration.
            *args: Positional arguments to be passed to the function.
            **kwargs: Additional keyword arguments to be passed to the function.

        Raises:
            BadConfiguration: If task initialization is incorrect or missing required parameters.
        """
        if _kwargs is None:
            _kwargs = dict()
        self.date_created = time()
        self.start_time: None | float = None
        self.end_time: None | float = None
        self.func: Callable = func
        self.args: tuple = args
        self.kwargs: dict = kwargs
        self.task: Task | None = None
        # Manager-side id: the key in Manager.jobs, returned to the caller.
        self.task_id: str | None = kwargs.pop("task_id", None)
        # Id that django-q assigns when the task is actually queued (see run_tasks).
        self.q_task_id: str | None = None
        self.priority: int = _kwargs.get("priority", 0)
        self.ttl: int = _kwargs.get("ttl", Manager.ttl)
        self.callback: Callable | None = _kwargs.get("threaded_hook", None)
        self.result: Any = None
        self.status: str = "pending"
        self.check_auth: bool = _kwargs.get("check_auth", True)
        self.by_user_pk: int | None = _kwargs.get("user_pk", None)
        self.is_auth: bool = _kwargs.get("is_auth", False)
        self.by_session: str | None = _kwargs.get("session_key", None)
        self.timeout: int = _kwargs.get("timeout", Manager.num_timeout)
        cleanup = _kwargs.get("cleanup", None)
        self.cleanup: Callable | None = cleanup if callable(cleanup) else None
        self.cleaned = False
        if hasattr(self.func, "cleanup") and callable(self.func.cleanup) and self.cleanup is None:
            self.cleanup = self.func.cleanup

        if self.check_auth and not self.is_auth and self.by_session is None:
            raise BadConfiguration("Task must be authenticated.")
        if self.func is None or not callable(self.func):
            raise BadConfiguration("Task must have a function.")
        if self.task_id is None:
            raise BadConfiguration("Task must have a task id.")
        # Check the raw value: self.cleanup is already None when it isn't callable.
        if cleanup is not None and not callable(cleanup):
            raise BadConfiguration("Cleanup must be a callable.")

    def run_cleanup(self) -> None:
        """Run the cleanup callable at most once, in a background thread."""
        if self.cleanup is None or self.cleaned:
            return
        self.cleaned = True
        if hasattr(self.func, "is_cleaned_up"):
            self.func.is_cleaned_up = True
        Thread(target=self.cleanup, daemon=True).start()

    async def get_result(self) -> Any:
        """
        Get a task result asynchronously.

        Returns:
            Any: The result of the task, or -1 if the task timed out.
        """
        while self.status != "completed":
            timed_out = self.start_time is not None and time() - self.start_time > self.timeout
            if timed_out or self.status == "timeout":
                return -1
            await asyncio.sleep(Manager.task_tick)
        return self.result

    def is_submitted_by(self, user_pk: int | None, session_key: str) -> bool:
        """
        Check if the task was submitted by the user or session.

        Args:
            user_pk (int): The user primary key.
            session_key (str): The session key.

        Returns:
            bool: True if the task was submitted by the user or session, False otherwise.
        """
        return (self.is_auth and self.by_user_pk == user_pk) or self.by_session == session_key


class Manager:
    """
    Task manager for handling tasks.

    Attributes:
        num_workers (int): Number of worker processes for task execution.
        num_timeout (int): Timeout duration for tasks in seconds.
        manager_tick (int | float): Interval for manager tick in seconds.
        task_tick (float): Interval for task tick in seconds.
        max_queue (int): Maximum limit for the task queue.
        ttl (int): Time to live for task results in seconds.
        delete_from_db (bool): Whether tasks are deleted from the database once their results expire.
        free_workers (int): Number of free execution slots (num_workers + 1).
        jobs (dict): Dictionary to store task objects.
        pending_tasks (list): List of pending tasks.
        running_tasks (list): List of running tasks.
        completed_tasks (list): List of completed tasks.
        is_running (bool): Flag to indicate if the manager is running.
        read_write_lock (rwlock.RWLockFairD): Reader-writer lock for thread-safe access.
        need_sort (bool): Set when pending_tasks must be re-sorted by priority.

    If the ASGI server runs multiple worker processes, each process starts its
    own manager, and the worker and queue limits are divided by
    settings.Q_CLUSTER['web_workers'].
    """

    web_instances = settings.Q_CLUSTER.get("web_workers", 1)
    num_workers: int = settings.Q_CLUSTER.get("workers", 4) // web_instances
    if num_workers < 1:
        num_workers = 1
    num_timeout: int = settings.Q_CLUSTER.get("timeout", 80)
    manager_tick: int | float = settings.Q_CLUSTER.get("manager_tick", 0.3)
    task_tick: float = settings.Q_CLUSTER.get("task_tick", 0.7)
    max_queue: int = settings.Q_CLUSTER.get("queue_limit", 50) // web_instances
    if max_queue < 10:
        max_queue = 10
    ttl: int = settings.Q_CLUSTER.get("result_ttl", 4)
    delete_from_db: bool = settings.Q_CLUSTER.get("delete_from_db", True)
    free_workers: int = num_workers + 1
    jobs: dict = dict()
    pending_tasks: list = []
    running_tasks: list = []
    completed_tasks: list = []
    is_running: bool = False
    # Generate a fresh handle per use with gen_rlock()/gen_wlock().
    read_write_lock: rwlock.RWLockFairD = rwlock.RWLockFairD()
    need_sort: bool = False

    @staticmethod
    async def add_task(func: Callable, _kwargs: dict, *args, **kwargs) -> str:
        """
        Add a task to the queue.

        Args:
            func (callable): The function to be executed as a task.
            _kwargs (dict): Task configuration. Recognized keys (all optional):
                priority (int): Priority of the task; higher == more important.
                ttl (int): Time to live of the task results. Defaults to `Manager.ttl`.
                threaded_hook (callable): Callback run in this (web) process, in a new thread, once the task is done.
                check_auth (bool): Whether the task must be tied to a user or session. Defaults to True.
                user_pk (int): The user primary key, used together with `is_auth`.
                session_key (str): The session key. Required if `check_auth` is True and `is_auth` is False.
                is_auth (bool): Whether the submitting user is authenticated.
                timeout (int): Task timeout. Defaults to `Manager.num_timeout`.
                cleanup (callable): Cleanup function run when the task is removed or its results expire.
            *args: Positional arguments to be passed to the function.
            **kwargs: Additional keyword arguments to be passed to the function.

        Raises:
            QueueIsFull: If the queue is full and unable to add a new task.
            BadConfiguration: If task initialization is incorrect or missing required parameters.

        Returns:
            str: The unique identifier of the added task.
        """
        async with get_lock(Manager.read_write_lock.gen_rlock()):
            if len(Manager.pending_tasks) + len(Manager.running_tasks) >= Manager.max_queue:
                raise QueueIsFull("Queue is full, try again later...")
        if not Manager.is_running:
            Manager.initiate_manager()
        task_id = uuid()[1]
        # MyTask pops task_id, so it is not forwarded to the task function.
        kwargs["task_id"] = task_id
        task = MyTask(func, _kwargs, *args, **kwargs)
        async with get_lock(Manager.read_write_lock.gen_wlock()):
            Manager.pending_tasks.append(task)
            Manager.need_sort = True
            Manager.jobs[task_id] = task
        return task_id

    @staticmethod
    def sort_tasks() -> None:
        """Sort pending tasks: highest priority first, then oldest first."""
        with Manager.read_write_lock.gen_wlock():
            # Reset the flag under the lock so a concurrent add_task isn't lost.
            Manager.need_sort = False
            Manager.pending_tasks.sort(key=lambda x: (-x.priority, x.date_created))

    @staticmethod
    def run_tasks() -> None:
        """
        Hand pending tasks to the django-q cluster while slots are free.
        """
        while Manager.free_workers > 0:
            with Manager.read_write_lock.gen_wlock():
                if not Manager.pending_tasks:
                    break
                task = Manager.pending_tasks.pop(0)
            # async_task() returns the id django-q gives the queued task;
            # check_tasks() needs it to find the stored result.
            task.q_task_id = async_task(task.func, *task.args, timeout=task.timeout, **task.kwargs)
            task.status = "running"
            # With only the extra slot left, the task may wait in the broker,
            # so its timeout clock is started later by check_tasks().
            if Manager.free_workers != 1:
                task.start_time = time()
            with Manager.read_write_lock.gen_wlock():
                Manager.running_tasks.append(task)
            Manager.free_workers -= 1

    @staticmethod
    async def remove_task(task_id: str, session_key: str, user_pk: int | None) -> None:
        """
        Remove a task from the queue.

        Args:
            task_id (str): The unique identifier of the task.
            session_key (str): The session key of the user.
            user_pk (int): The primary key of the user.
        """
        task = Manager.jobs.get(task_id, None)
        if task is None or task.status == "running":
            return
        if task.check_auth and not task.is_submitted_by(user_pk, session_key):
            return
        task.run_cleanup()
        async with get_lock(Manager.read_write_lock.gen_wlock()):
            try:
                Manager.pending_tasks.remove(task)
            except ValueError:
                pass
            Manager.jobs.pop(task_id, None)

    @staticmethod
    def check_tasks() -> None:
        """
        Check tasks status.
        - Completed tasks are moved to the completed tasks list.
        - Timed out tasks are moved to the completed tasks list.
        - Completed tasks are removed from the completed tasks list after time to live expires.
        """
        for task in thread_safe_iterator(Manager.running_tasks):
            actual_task = Task.get_task(task.q_task_id)
            current_time = time()
            if actual_task is not None and actual_task.stopped:
                task.end_time = current_time
                task.result = actual_task.result
                with Manager.read_write_lock.gen_wlock():
                    Manager.completed_tasks.append(task)
                    Manager.running_tasks.remove(task)
                task.status = "completed"
                Manager.free_workers += 1
                if callable(task.callback):
                    Thread(target=task.callback, args=(task.result,), daemon=True).start()
            elif task.start_time is not None and current_time - task.start_time > task.timeout:
                task.end_time = current_time
                with Manager.read_write_lock.gen_wlock():
                    Manager.completed_tasks.append(task)
                    Manager.running_tasks.remove(task)
                task.status = "timeout"
                Manager.free_workers += 1
            elif task.start_time is None and Manager.free_workers > 1:
                task.start_time = current_time

        current_time = time()
        expired_tasks = [
            task for task in thread_safe_iterator(Manager.completed_tasks)
            if current_time - task.end_time > task.ttl
        ]
        for task in expired_tasks:
            task.run_cleanup()
            with Manager.read_write_lock.gen_wlock():
                Manager.completed_tasks.remove(task)
                Manager.jobs.pop(task.task_id, None)
            if Manager.delete_from_db and task.q_task_id is not None:
                Manager.remove_from_db(task.q_task_id)

    @staticmethod
    def initiate_manager(is_threaded: bool = False) -> None:
        """
        Initiate the task manager.
        Only the first call starts the loop; later calls return immediately.
        The manager runs in a separate daemon thread.

        Args:
            is_threaded (bool, optional): Indicates whether the manager is running in a separate thread. Defaults to False.
        """
        if not is_threaded:
            if Manager.is_running:
                return
            # Set the flag before starting the thread so that later calls
            # return early instead of starting a second loop.
            Manager.is_running = True
            Thread(target=Manager.initiate_manager, args=(True,), daemon=True).start()
            return
        while True:
            try:
                # Sort first so a newly added high-priority task is dispatched first.
                if Manager.need_sort:
                    Manager.sort_tasks()
                Manager.run_tasks()
                Manager.check_tasks()
            except Exception:
                # Keep the loop alive; otherwise is_running stays True with no manager.
                print("Error in the task manager loop:\n", traceback.format_exc())
            sleep(Manager.manager_tick)

    @staticmethod
    async def get_task(task_id: str, session_key: str, user_pk: int | None) -> MyTask | int:
        """
        Get a task by task ID.

        Args:
            task_id (str): The unique identifier of the task.
            session_key (str): The session key of the user.
            user_pk (int): The primary key of the user.

        Returns:
            MyTask | int: The task object, -1 if no such task exists, or -2 if it
            was not submitted by this user or session.
        """
        async with get_lock(Manager.read_write_lock.gen_rlock()):
            task = Manager.jobs.get(task_id, None)
        if task is None:
            return -1
        if task.check_auth and not task.is_submitted_by(user_pk, session_key):
            return -2
        return task

    @staticmethod
    async def get_task_results(task: MyTask) -> Any:
        """
        Get task results asynchronously.

        Args:
            task (MyTask): The task object.

        Returns:
            Any: The result of the task.
        """
        return await task.get_result()

    @staticmethod
    async def get_queue_position(task: MyTask) -> int:
        """
        Get task position in the queue asynchronously.

        Args:
            task (MyTask): The task object.

        Returns:
            int: The 0-based position of the task in the queue, or -1 if it is not pending.
        """
        async with get_lock(Manager.read_write_lock.gen_rlock()):
            try:
                return Manager.pending_tasks.index(task)
            except ValueError:
                return -1

    @staticmethod
    def remove_from_db(task_id: str) -> None:
        """
        Remove a task from the database.

        Args:
            task_id (str): The django-q id of the task.
        """
        try:
            Task.objects.filter(id=task_id).delete()
        except Exception:
            print("Error while removing from the database:\n", traceback.format_exc())
```
I've yet to implement or fine-tune some methods, but this is my general direction. Any input would be appreciated!
Okay, so I've updated it a few times. I think my locking and timing still need a little work, but I feel like this is a good enough start. The reason I made it all async is that I use ASGI instead of WSGI. Would love feedback :-)