pq
Adding recurring tasks?
Hello,
is it possible to add recurring tasks, similarly to a cronjob? I scrolled quickly through the issues and the source code and could not find any indication of it. If this functionality does not exist, I'd be interested in ideas for easy workarounds :thinking: .
The use case is a queue of URLs to be crawled from multiple computers regularly (e.g. every day at 1 AM). To make things more difficult, the URLs are crawled at different times (some at 1 AM, others at 2 AM, etc.).
It seems this was implemented in the django-pq library, but that project is unmaintained and has not seen a commit since 2014. Also, your library has been very reliable for me, so it would be nice to use it for this as well :+1:.
Specifications
- Version: 1.9.0
- Python version: 3.8.3
You could use a cronjob to add items to the queue. You can then either use schedule_at to defer execution to a later time or just rely on the cronjob schedule to make the queue item available for work.
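Roughly, the cron-driven script could look like the sketch below (the connection string, the queue name 'crawl' and the URL-to-hour mapping are just placeholders, not part of pq itself). A single cron entry, e.g. "0 0 * * *", runs the script once a day, and schedule_at defers each URL to its own hour:
import datetime

import psycopg2
from pq import PQ

queue = PQ(psycopg2.connect('dbname=crawler'))['crawl']  # placeholder connection and queue name

# URLs and the (UTC) hour at which each should be crawled -- placeholder data
urls = {'https://www.github.com': 1, 'https://www.python.org': 2}

now = datetime.datetime.now(datetime.timezone.utc)
for url, hour in urls.items():
    # defer execution to the given hour via schedule_at; alternatively, run one
    # cron entry per hour and omit schedule_at entirely
    run_at = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    if run_at <= now:
        run_at += datetime.timedelta(days=1)
    queue.put({'url': url}, schedule_at=run_at)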
@ThibTrip I second @malthe here
Add the logic to self-schedule a job in the future and pq will take care of it (if your job does too many things, create a dispatcher worker). :raised_hands:
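Something along these lines (a rough sketch; the connection details, the 'crawl' queue and the print standing in for the actual crawling are placeholders):
import datetime

import psycopg2
from pq import PQ

queue = PQ(psycopg2.connect('dbname=crawler'))['crawl']  # placeholder connection and queue name

while True:
    job = queue.get()  # blocks until a job is due; returns None on timeout
    if job is None:
        continue
    print(f"crawling {job.data['url']}")  # placeholder for the actual work
    # self-schedule the next run 24 hours from now
    queue.put(job.data,
              schedule_at=datetime.datetime.now(datetime.timezone.utc)
              + datetime.timedelta(days=1))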
Thanks for your quick answers :+1: !
@stas This is the solution I went for :see_no_evil: ... Not very proud of it but it seems to work well. I am unsure whether timezones are well respected with my code though :thinking: . pq does not seem to save the timezone for schedule_at (even though I always provide schedule_at as a datetime with a UTC timezone).
import datetime
import pytz
from croniter import croniter # pip install croniter
from loguru import logger # pip install loguru
def requeue_job(queue, job, delete_job=False, engine=None):
    """
    Requeues a job created with the use of the library pq.
    This is a workaround for making recurring tasks.

    The job must fulfill the following conditions:

    * have a key called "cron" with a valid cron expression in its data
    * have a datetime for its schedule_at attribute

    **WARNING**: we assume the timezone of the datetime is UTC (pq does not
    seem to save the TZ information in the attribute schedule_at).

    Parameters
    ----------
    queue : pq.Queue
    job : pq.Job
    delete_job : bool, default False
        Whether to delete the job that's been passed (happens after requeuing it).
        If True, an engine must be provided.
    engine : sqlalchemy.engine.base.Engine or None, default None
        Needed if delete_job is True.

    Examples
    --------
    >>> import datetime
    >>>
    >>> # let's assume you have defined a queue somewhere
    >>> queue.put(data={'url': 'https://www.github.com', 'cron': '0 2 * * *'},
    ...           schedule_at=datetime.datetime.now().astimezone(datetime.timezone.utc))
    >>>
    >>> job = queue.get()
    >>> # do something...
    >>> requeue_job(queue=queue, job=job)
    """
    # verify data
    ## type
    data = job.data
    if not isinstance(data, dict):
        raise TypeError(f'Expecting job.data to be a dict. Received type {type(data)} instead')
    ## content
    if 'cron' not in data:
        raise ValueError('Key "cron" is missing from job.data')
    if not job.schedule_at:
        raise ValueError('schedule_at must have been provided for the job (it is None)!')

    # make sure schedule_at is timezone-aware (assumed to be UTC, see warning above)
    schedule_at = job.schedule_at
    if schedule_at.tzinfo is None:
        schedule_at = pytz.utc.localize(schedule_at)

    # get the next schedule based on the cron expression, using the (localized)
    # previous schedule as the base time
    cron, start_time = data['cron'], schedule_at
    cron_iter = croniter(expr_format=cron, start_time=start_time)
    schedule_at = cron_iter.get_next(datetime.datetime)
    logger.debug(f'Requeuing job {job.id}. Next execution: {schedule_at} '
                 f'(base_time: {start_time}, cron: {cron})')

    # requeue the job with the same data and the new schedule
    new_job_id = queue.put(job.data, schedule_at=schedule_at)

    # delete the old job row (assumes the queue's table is named after the queue)
    if delete_job:
        if not engine:
            raise ValueError('I need a sqlalchemy engine to drop jobs! You have not provided any')
        engine.execute(f'DELETE FROM "{queue.name}" WHERE id=%(job_id)s;', job_id=job.id)

    return new_job_id
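For completeness, here is roughly how I call it from a worker loop (the connection details, the 'crawl' queue and the print standing in for the actual crawling are placeholders):
import psycopg2
import sqlalchemy
from pq import PQ

queue = PQ(psycopg2.connect('dbname=crawler'))['crawl']     # placeholder connection and queue name
engine = sqlalchemy.create_engine('postgresql:///crawler')  # placeholder DSN

while True:
    job = queue.get()  # blocks until a job is due; returns None on timeout
    if job is None:
        continue
    print(f"crawling {job.data['url']}")  # placeholder for the actual work
    requeue_job(queue=queue, job=job, delete_job=True, engine=engine)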
I think this is a nice solution for the case where you want to schedule a subsequent run only if the queue item is being "worked".