arq
Question: cron jobs with parameters
Hi, I wanted to start using arq's cron functionality instead of crontab, but realized that it's not possible (or I can't find how) to pass parameters to the function that I want to execute.
In crontab, I can declare a couple of cronjobs like:
0 */4 * * * python my_script.py --foo=foo
0 */8 * * * python my_script.py --foo=bar
And I would like to do something similar with arq, which I'd expect to look something like:
CronWorkerSettings = {
    **BaseWorkerSettings,
    'queue_name': settings.queue_cron,
    'cron_jobs': [
        cron(
            my_script_func,
            ctx={'foo': 'foo'},
            hour={0, 4, 8, 12, 16, 20},
            minute=1,
        ),
        cron(
            my_script_func,
            ctx={'foo': 'bar'},
            hour={0, 8, 16},
            minute=1,
        ),
    ],
}
Or maybe, instead of ctx, allow passing params as keyword arguments (foo='bar'), as is already possible with ArqRedis.enqueue_job().
I'm surprised that it's not possible since to me it's a quite common use case. I know
Question: is it possible somehow? Is it somewhere in the roadmap to implement this?
It's pretty simple to implement now with partial.
If that's really not satisfactory, and you wanted to add it as an explicit feature, PR welcome.
Hi. We're still having issues with using partial.
We are using arq==0.22 and Python 3.8.7:
CronWorkerSettings = {
    **BaseWorkerSettings,
    'queue_name': 'cron_queue',
    'cron_jobs': [
        cron(
            partial(cron_test, 'shannon'),
            minute=1,
        ),
    ],
}

async def cron_test(name):
    print(f'hello {name}')
If we use partial with a positional argument, the cron job fails with: TypeError: cron_test() takes 1 positional argument but 2 were given
If I use keyword arguments instead, I get: TypeError: cron_test() got multiple values for argument 'name'
I understand what this error means, but I don't think the problem is how I am calling partial, because this works fine:
hello_shannon = partial(cron_test, 'shannon')
asyncio.run(hello_shannon())
So why is the worker trying to pass extra arguments to the function?
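For context: arq calls every job coroutine with the worker's context dict as its first positional argument, so the partial above ends up receiving 'shannon' plus that context. A minimal sketch that reproduces the first error without arq installed (the empty `ctx` dict is only a stand-in for what the worker would pass):

```python
import asyncio
from functools import partial


async def cron_test(name):
    return f'hello {name}'


hello_shannon = partial(cron_test, 'shannon')

# Called with no extra arguments, as in the snippet above: works.
direct = asyncio.run(hello_shannon())

# Called the way a worker calls a job, with a context as the first
# positional argument. `ctx` is a stand-in, not a real worker context.
ctx = {}
try:
    hello_shannon(ctx)  # becomes cron_test('shannon', ctx)
    error = None
except TypeError as exc:
    error = str(exc)

print(direct)
print(error)
```

The keyword variant fails for the same reason: the context is bound to the first parameter, `name`, and then `name=` is supplied a second time.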
Okay, you'll need your own partial function, something like:
from functools import wraps

def partial(f, *args, **kwargs):
    @wraps(f)
    def partial_function(ctx):
        return f(*args, **kwargs)
    return partial_function
I agree though that a proper way to do this would be better.
Guys, it's been a year already, please take part in this issue
Guy, it’s been 4 hours already, please submit your pull request 🙃
For anyone else stumbling across this issue, for async functions it needs to be a bit different:
from functools import wraps

def partial(f, *args, **kwargs):
    """
    Wrapper for cron functions to inject correlation_id
    """
    @wraps(f)
    async def partial_function(ctx):
        return await f(*args, **kwargs)
    return partial_function
And
cron_jobs = [
    cron(partial(your_task, {}, cid=uuid4().hex), hour={1}, minute={12}),
]
In my case I'm passing a UUID, but you can of course pass whatever parameters you want.
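Note that the wrapper above discards the `ctx` the worker passes in and hands the task an empty dict instead. If the task needs the real context, a variant that forwards it could look like the sketch below. The name `with_params` and the sample `your_task` signature are made up for illustration, and the example runs without arq:

```python
import asyncio
import inspect
from functools import wraps


def with_params(f, *args, **kwargs):
    """Bind extra arguments to an async job while forwarding the worker ctx."""
    @wraps(f)
    async def job(ctx):
        # ctx comes from the caller (the worker); bound arguments follow it.
        return await f(ctx, *args, **kwargs)
    return job


async def your_task(ctx, cid):
    # Hypothetical task body: just return what it received.
    return ctx, cid


job = with_params(your_task, cid='abc123')

# Simulate the worker calling the job with a context dict.
result = asyncio.run(job({'job_id': 'demo'}))
print(result)
print(inspect.iscoroutinefunction(job))
```

One thing to keep in mind with either version: an expression such as `uuid4().hex` inside the `cron_jobs` list is evaluated once, when the settings are defined, so every run of that cron job receives the same value.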
I tested the following situation with partial:
from functools import wraps

def partial(f, *args, **kwargs):
    """
    Wrapper for cron functions to inject correlation_id
    """
    @wraps(f)
    async def partial_function(ctx):
        return await f(*args, **kwargs)
    return partial_function

async def my_test_cron_job(ctx, store_id):
    print(ctx)
    print(store_id)

class WorkerSettings:
    cron_jobs = [
        cron(
            partial(my_test_cron_job, {}, store_id='826b2e76-be55-42d0-9022-7f7c26b430ab'),
            hour=at_every_x_hour(1),
            minute=at_every_y_minutes(1),
            run_at_startup=True,
            job_id='id1',
        ),
        cron(
            partial(my_test_cron_job, {}, store_id='33beef9c-dfd3-42b7-a78b-706f29f8fcbe'),
            hour=at_every_x_hour(1),
            minute=at_every_y_minutes(1),
            run_at_startup=True,
            job_id='id2',
        ),
    ]

run_worker(WorkerSettings)
Only the last job's parameter is printed, and it is printed twice:
{}
33beef9c-dfd3-42b7-a78b-706f29f8fcbe
{}
33beef9c-dfd3-42b7-a78b-706f29f8fcbe
What did I do wrong, or is this a bug?
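One possible explanation, not verified against a specific arq version: `wraps(f)` copies the wrapped function's `__qualname__` onto every wrapper, and when `cron()` is not given an explicit `name`, arq appears to derive the job name from that attribute. Both cron jobs would then share one name, and the later one would replace the earlier one wherever jobs are looked up by name. The sketch below only demonstrates the name collision in plain Python. The `registry` dict is a stand-in for that lookup, not arq's actual code:

```python
from functools import wraps


def partial(f, *args, **kwargs):
    @wraps(f)
    async def partial_function(ctx):
        return await f(*args, **kwargs)
    return partial_function


async def my_test_cron_job(ctx, store_id):
    return store_id


job_a = partial(my_test_cron_job, {}, store_id='store-a')
job_b = partial(my_test_cron_job, {}, store_id='store-b')

# wraps() makes both wrappers report the same qualified name.
names = [job_a.__qualname__, job_b.__qualname__]

# Stand-in for a name-keyed job lookup: the second entry overwrites the first.
registry = {}
for job in (job_a, job_b):
    registry['cron:' + job.__qualname__] = job

print(names)
print(len(registry))
```

If this is the cause, giving each job a distinct name should help, for example `cron(partial(...), name='cron:store-a', ...)`, assuming your arq version accepts the `name` keyword.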