rq-scheduler
Need better documentation how to start Scheduler
I put a simple task in a file (scheduler.py):
from redis import Redis
from rq_scheduler import Scheduler
from datetime import datetime
from jobs import log_info
#scheduler = Scheduler(connection=Redis())
scheduler = Scheduler(connection=Redis(host='localhost', port=6379, db=0))
scheduler.schedule(
    scheduled_time=datetime.now(),
    func=log_info,
    args=[],
    kwargs={},
    interval=5,
    repeat=None
)
When launching rqscheduler it does nothing. The worker is running and executing jobs when executed from the django shell with log_info.delay().
I tried:
rqscheduler --host localhost --port 6379 --db 0
rqscheduler -H localhost -p 6379 -d 0 --path /home/foo/scheduler.py
rqscheduler -H localhost -p 6379 -d 0 --path /home/foo/
You'll have to pass the datetime in UTC. I do agree that the documentation could be much improved.
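To illustrate the UTC point, here is a minimal standard-library sketch that turns a timezone-aware local time into the naive UTC datetime that the examples in this thread pass as scheduled_time. The helper name to_naive_utc and the sample date are made up for illustration and are not part of rq-scheduler.

```python
from datetime import datetime, timedelta, timezone


def to_naive_utc(moment: datetime) -> datetime:
    """Convert a timezone-aware datetime to a naive datetime expressed in UTC."""
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return moment.astimezone(timezone.utc).replace(tzinfo=None)


# Hypothetical example: 14:30 in a UTC+02:00 zone is 12:30 in UTC.
local_time = datetime(2024, 1, 15, 14, 30, tzinfo=timezone(timedelta(hours=2)))
first_run = to_naive_utc(local_time)
print(first_run)  # 2024-01-15 12:30:00
```

On a server whose clock is already set to UTC, datetime.now() and datetime.utcnow() give the same wall-clock value, so the conversion only matters when the local zone differs.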
Server time is set to UTC so it is the same. How do I link/connect the Scheduler object and rqscheduler? Or asked differently how does rqscheduler know where to find the file containing Scheduler?
Can the scheduler be stopped "safely"? For example, the RQ workers have a warm shutdown so that the workers wait until the currently running task is finished.
Hi,
@ui I am having an issue when stopping the scheduler. When I try to start a new one after stopping it, I get an error: "There's already an active RQ scheduler". Looking at the code, there is a scheduler key stored in Redis... https://github.com/ui/rq-scheduler/blob/master/rq_scheduler/scheduler.py#L32 Could we get more information about how the birth and death of the scheduler work?
Thanks Michael
I'm absolutely swamped this week, will try to address your questions this weekend
@selwin Hi Selwin, any chance you will have a moment soon? Thanks
Sorry for the delay. When the scheduler is started, register_birth() is called and it sets the rq:scheduler key, which automatically expires.
Similar to RQ, this library also intercepts SIGINT and SIGTERM as a cue to stop, in which case it clears the rq:scheduler key.
At the moment, if SIGINT is received when the scheduler is busy (although the time window is usually very small), there's a chance of data loss. Ways to get around this:
- Use a --burst mode as I outlined here.
- Add a status property to Scheduler so that if SIGINT is received when scheduling is in progress, the Scheduler knows to finish its current process before quitting.
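The second idea could look roughly like the sketch below: the signal handler only records the stop request, and the loop checks it between units of work, so a batch that is already in progress finishes first. This is an illustration of the idea only, not rq-scheduler's code; StoppableLoop, request_stop and the batch names are all hypothetical.

```python
import signal


class StoppableLoop:
    """Hypothetical stand-in for a scheduler loop; not rq-scheduler's code."""

    def __init__(self):
        self.busy = False            # True while a batch is being processed
        self.stop_requested = False  # set by the signal handler
        self.completed = []

    def request_stop(self, signum=None, frame=None):
        # Only record the request; the loop decides when it is safe to exit.
        self.stop_requested = True

    def install_signal_handlers(self):
        # Must be called from the main thread of a real process.
        signal.signal(signal.SIGINT, self.request_stop)
        signal.signal(signal.SIGTERM, self.request_stop)

    def run(self, batches, on_item=lambda item: None):
        for batch in batches:
            if self.stop_requested:
                break  # stop only between batches, never in the middle of one
            self.busy = True
            try:
                for item in batch:
                    on_item(item)
                    self.completed.append(item)
            finally:
                self.busy = False
        return self.completed


loop = StoppableLoop()


def interrupt_on_a1(item):
    # Simulate a stop request arriving while the first batch is half done.
    if item == "a1":
        loop.request_stop()


result = loop.run([["a1", "a2"], ["b1"]], on_item=interrupt_on_a1)
print(result)  # ['a1', 'a2']
```

The first batch completes even though the stop request arrived in the middle of it, and the second batch is never started.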
I would still be interested in how to get rqscheduler to process my jobs :-)
Found a combination of the answers in this SO helpful: https://stackoverflow.com/questions/17537771/where-do-i-register-an-rq-scheduler-job-in-a-django-app/32033454#32033454
Short answer: You can put it in apps.py, e.g.
__init__.py
default_app_config = 'tools.apps.ToolsAppConfig'
apps.py
from django.apps import AppConfig
from redis import Redis
from rq_scheduler import Scheduler
from datetime import datetime

class ToolsAppConfig(AppConfig):
    name = "tools"

    def ready(self):
        # Imported here so the app registry is loaded before the view module
        from tools.views import readmail
        scheduler = Scheduler(connection=Redis(host='localhost', port=6379, db=0))
        scheduler.schedule(
            scheduled_time=datetime.utcnow(),  # Time for first execution, in UTC timezone
            func=readmail,                     # Function to be queued
            interval=60,                       # Time before the function is called again, in seconds
            repeat=10                          # Repeat this number of times (None means repeat forever)
        )
@phoebebright I have a configuration similar to the one you suggested, but the mails are being sent repeatedly. I set the interval to 86400 (i.e. 24 hours), yet mails still go out every second. Any help?
@kevart Did you ever get this to work for you? Unfortunately, it seems there's no one willing to explain how to get this to actually consume jobs, as I am getting AttributeError: 'module' object has no attribute 'func' errors while running rq worker.
In case someone needs an answer to @crazywizard254's comment, this can be caused by your method (func) not being included in INSTALLED_APPS. For example, you are trying to call a method in myapp.utils, but myapp.utils is not specifically added to INSTALLED_APPS; you probably only have myapp in INSTALLED_APPS.
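Some background that may help when debugging this kind of error: as far as I understand, RQ stores the job's function as a dotted-path string, and the worker imports it by name when the job runs, so the function has to be importable from the worker process. The sketch below is a simplified illustration of that lookup, not RQ's actual code; resolve_dotted_path is a hypothetical helper.

```python
import importlib


def resolve_dotted_path(path: str):
    """Import `package.module.attribute` and return the attribute."""
    module_name, _, attribute = path.rpartition(".")
    module = importlib.import_module(module_name)
    # Raises AttributeError if the module has no such name.
    return getattr(module, attribute)


# A name that exists resolves to the real function object.
join = resolve_dotted_path("os.path.join")
print(join("a", "b"))

# A name that does not exist fails in the same way as the error quoted above.
try:
    resolve_dotted_path("os.path.func")
except AttributeError as error:
    print(error)
```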
Registering it in a Django AppConfig (or anywhere else in the standard Django import path) is problematic because of #121. I chose to build a custom management command for the scheduler and do the setup there.
import datetime
import logging

import django_rq
from django_rq.management.commands import rqscheduler

scheduler = django_rq.get_scheduler()
log = logging.getLogger(__name__)

def clear_scheduled_jobs():
    # Delete any existing jobs in the scheduler when the app starts up
    for job in scheduler.get_jobs():
        log.debug("Deleting scheduled job %s", job)
        job.delete()

def register_scheduled_jobs():
    # do your scheduling here
    scheduler.schedule(...)

class Command(rqscheduler.Command):
    def handle(self, *args, **kwargs):
        # This is necessary to prevent dupes
        clear_scheduled_jobs()
        register_scheduled_jobs()
        super(Command, self).handle(*args, **kwargs)
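To see why the clear step prevents dupes, here is a small sketch that uses an in-memory stand-in instead of Redis. InMemoryScheduler, send_report and the helper functions are hypothetical and only mimic the shape of the calls above; they are not the rq-scheduler API.

```python
class InMemoryScheduler:
    """Hypothetical stand-in that keeps jobs in a list instead of Redis."""

    def __init__(self):
        self.jobs = []

    def schedule(self, func, interval):
        self.jobs.append({"func": func, "interval": interval})

    def get_jobs(self):
        return list(self.jobs)

    def delete(self, job):
        self.jobs.remove(job)


def send_report():
    """Placeholder job function."""


def register(scheduler):
    scheduler.schedule(send_report, interval=60)


def clear_and_register(scheduler):
    # Same order as the handle() method above: clear first, then register.
    for job in scheduler.get_jobs():
        scheduler.delete(job)
    register(scheduler)


naive = InMemoryScheduler()
careful = InMemoryScheduler()
for _ in range(3):  # three simulated process restarts
    register(naive)
    clear_and_register(careful)

print(len(naive.get_jobs()), len(careful.get_jobs()))  # 3 1
```

Registering on every start without clearing leaves one extra copy of the job per restart, while clearing first keeps exactly one.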
@ipmb Thank you for that! Creating a management command solved all the issues I've been having trying to get this to function!
If anyone is coming across this with the same problem, I originally thought @ipmb was saying I needed to run this management command in addition to the ./manage.py rqscheduler management command, but I realized after reading the code more closely that this replaces the ./manage.py rqscheduler command, by subclassing it and adding the scheduler.schedule calls. Elegant solution!
Are we supposed to put this code in apps.py in our app? I'm using Docker, and there I'm running the workers:
rq_worker:
  build: .
  environment:
    REDIS_URL: "redis"
  command: python manage.py rqworker src_low src_high src_default
  volumes:
    - .:/code
  depends_on:
    - db
    - redis
But the problem is that the function is never called. Here is the code:
# app/management/commands/foo.py
import datetime
import logging

import django_rq
from django_task.task_command import TaskCommand

from src.settings import QUEUE_DEFAULT

scheduler = django_rq.get_scheduler(QUEUE_DEFAULT)
log = logging.getLogger(__name__)

def clear_scheduled_jobs():
    # Delete any existing jobs in the scheduler when the app starts up
    jobs = scheduler.get_jobs()
    i = 0
    for job in jobs:
        # print(job.func_name)
        job.delete()
        i += 1
    print('total:', i)

def test():
    with open('test.txt', 'w') as fp:
        fp.write('test')

def register_scheduled_jobs():
    scheduler.schedule(datetime.datetime.utcnow(), test, interval=60*5)

class Command(TaskCommand):
    def add_arguments(self, parser):
        super().add_arguments(parser)
        # parser.add_argument('test', type=int)

    def handle(self, *args, **options):
        clear_scheduled_jobs()
        register_scheduled_jobs()
Besides, I'm using django_task. I also tried:
docker-compose exec web python manage.py foo
development, mode applying static root
total: 1
but still nothing; the function is never called.
I was not running the Django scheduler worker in the background. Now it is working fine.
rq_default_scheduler:
  build: .
  environment:
    REDIS_URL: "redis"
  command: python manage.py rqscheduler --queue src_default
  volumes:
    - .:/code
  depends_on:
    - db
    - redis
Hmm, I am still wrapping my head around this solution. How do you register the custom Command so that 'python manage.py rqscheduler' picks it up? I tried to put 'import custom.Command' in __init__.py and that didn't work.
I also tried to import it in apps/__init__.py or manage.py and got 'django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.'
@cyruszhang To use my solution, you'll need to create a custom management command: https://docs.djangoproject.com/en/dev/howto/custom-management-commands/
You would then run that command instead of rqscheduler, as noted here: https://github.com/rq/rq-scheduler/issues/51#issuecomment-536652049
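For anyone unsure where such a command lives: per the Django how-to linked above, commands are discovered under management/commands/ inside an installed app, and the file name becomes the command name. The sketch below only creates that file layout in a temporary directory. The app name myapp and the command name runscheduler are hypothetical, and the Command subclass itself would go into the created file.

```python
import tempfile
from pathlib import Path


def scaffold_command(app_dir: Path, command_name: str) -> Path:
    """Create <app_dir>/management/commands/<command_name>.py with package markers."""
    commands_dir = app_dir / "management" / "commands"
    commands_dir.mkdir(parents=True, exist_ok=True)
    (app_dir / "management" / "__init__.py").touch()
    (commands_dir / "__init__.py").touch()
    command_file = commands_dir / f"{command_name}.py"
    command_file.touch()
    return command_file


with tempfile.TemporaryDirectory() as root:
    created = scaffold_command(Path(root) / "myapp", "runscheduler")
    relative = created.relative_to(root).as_posix()
    print(relative)  # myapp/management/commands/runscheduler.py
```

With that layout, and with myapp listed in INSTALLED_APPS, the command would be started with python manage.py runscheduler in place of python manage.py rqscheduler.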
Got it! thanks @ipmb
Can we schedule a cron job with result_ttl?
scheduler.cron(cron_string, func=function_test, result_ttl=0)
Just leaving my comment here because I found this issue useful.
Rather than overriding the scheduler command, I created a separate management command for destroying old scheduled jobs and creating new ones. The new command has almost identical logic to the overridden command class created by @ipmb
The reason I separated it into another management command is that our system scales to multiple running instances of the same application, and each of them runs an instance of rqscheduler. This means that at release time, with the overridden scheduler command, each scheduler could be destroying and creating new jobs at the same time. This could get a little weird if the timing was right and multiple schedulers created new jobs at the same time (I think that you could end up with multiple redundant scheduled jobs).
With the separate management command that destroys and re-creates all scheduled jobs, I have more control over when (and how many times) that function runs. In my case, I include it in the "release" stage on Heroku, but this could apply to different stages of deployment, depending on what system you are using for application hosting.