arq
Question: cron jobs with parameters
Hi, I wanted to start using arq cron functionality instead of crontab but realized that it's not possible (or I can't find how) to pass parameters to the function that I want to execute.
In crontab, I can declare a couple of cronjobs like:
0 */4 * * * python my_script.py --foo=foo
0 */8 * * * python my_script.py --foo=bar
And I would like to do something similar with ARQ, which I guess I expect to look something like
CronWorkerSettings = {
    **BaseWorkerSettings,
    'queue_name': settings.queue_cron,
    'cron_jobs': [
        cron(
            my_script_func,
            ctx={'foo': 'foo'},
            hour={0, 4, 8, 12, 16, 20},
            minute=1,
        ),
        cron(
            my_script_func,
            ctx={'foo': 'bar'},
            hour={0, 8, 16},
            minute=1,
        ),
    ],
}
Maybe instead of ctx, cron() could accept parameters as keyword arguments such as foo='bar', the way ArqRedis.enqueue_job() does.
I'm surprised that it's not possible since to me it's a quite common use case. I know
Question: is it possible somehow? Is it somewhere in the roadmap to implement this?
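For reference, the `*/4` and `*/8` hour steps from the crontab lines above map onto the explicit hour sets used in the proposed settings. A minimal sketch of that expansion; `every_n_hours` is a hypothetical helper, not part of arq:

```python
from typing import Set


def every_n_hours(step: int) -> Set[int]:
    """Expand a crontab-style '*/step' hour field into an explicit set of hours."""
    if not 1 <= step <= 23:
        raise ValueError('step must be between 1 and 23')
    return set(range(0, 24, step))


print(sorted(every_n_hours(4)))  # [0, 4, 8, 12, 16, 20]
print(sorted(every_n_hours(8)))  # [0, 8, 16]
```

Note that the crontab lines fire at minute 0, whereas the proposed settings use minute=1.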
It's pretty simple to implement now with partial.
If that's really not satisfactory and you want to add it as an explicit feature, a PR is welcome.
Hi. We're still having issues with using partial.
We are using arq==0.22 and python version 3.8.7
CronWorkerSettings = {
    **BaseWorkerSettings,
    'queue_name': 'cron_queue',
    'cron_jobs': [
        cron(
            partial(cron_test, 'shannon'),
            minute=1,
        ),
    ],
}
async def cron_test(name):
    print(f'hello {name}')
If we use partial, the job fails with: TypeError: cron_test() takes 1 positional argument but 2 were given
If I use keyword arguments instead, I get: TypeError: cron_test() got multiple values for argument 'name'
I understand what this kind of error means, but I don't think the problem is how I am calling partial, because this works fine:
hello_shannon = partial(cron_test, 'shannon')
asyncio.run(hello_shannon())
So why is the worker trying to pass extra arguments to the function?
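The short version is that the worker calls every job, cron jobs included, with the job context (ctx) as the first positional argument, so a function bound with functools.partial receives one argument more than it expects. A minimal sketch that reproduces the error without arq; `fake_worker_call` is a hypothetical stand-in for the worker, not arq code:

```python
import asyncio
from functools import partial


async def cron_test(name):
    print(f'hello {name}')


async def fake_worker_call(job):
    # Stand-in for the worker: it passes a context dict as the first positional argument.
    ctx = {}
    return await job(ctx)


job = partial(cron_test, 'shannon')

# Calling the partial directly works, as in the post above.
asyncio.run(job())

# Calling it the way a worker would adds ctx after 'shannon' and fails.
error_message = None
try:
    asyncio.run(fake_worker_call(job))
except TypeError as exc:
    error_message = str(exc)
    print(error_message)
```

Binding with a keyword (`partial(cron_test, name='shannon')`) fails for the same reason: ctx lands in the `name` slot positionally, which then clashes with the keyword.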
Okay, you'll need your own partial function, something like:
from functools import wraps
def partial(f, *args, **kwargs):
    @wraps(f)
    def partial_function(ctx):
        return f(*args, **kwargs)
    return partial_function
I agree though that a proper way to do this would be better.
Guys, it's been a year already, please take part in this issue
Guy, it’s been 4 hours already, please submit your pull request 🙃
For anyone else stumbling across this issue, for async functions it needs to be a bit different:
from functools import wraps
def partial(f, *args, **kwargs):
    """
    Wrapper for cron functions to inject correlation_id
    """
    @wraps(f)
    async def partial_function(ctx):
        return await f(*args, **kwargs)
    return partial_function
And:
cron_jobs = [
    cron(partial(your_task, {}, cid=uuid4().hex), hour={1}, minute={12}),
]
In my case I'm passing a UUID, but you can of course pass whatever parameters you want.
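The wrapper above throws away the ctx it receives from the worker, which is why an empty dict is passed in its place. If the task needs the real context, a variant can forward it instead. A sketch under that assumption; `with_args` is a hypothetical name, and the dict passed at the end merely stands in for whatever the worker would supply:

```python
import asyncio
from functools import wraps


def with_args(f, *args, **kwargs):
    """Bind extra arguments to an async job while forwarding the worker's ctx."""
    @wraps(f)
    async def wrapper(ctx):
        # ctx comes from the caller (the worker); the bound arguments follow it.
        return await f(ctx, *args, **kwargs)
    return wrapper


async def your_task(ctx, cid):
    return f"job_id={ctx.get('job_id')} cid={cid}"


job = with_args(your_task, cid='abc123')

# Simulate the worker calling the job with a context dict.
result = asyncio.run(job({'job_id': 'demo'}))
print(result)  # job_id=demo cid=abc123
```

Either way, bound arguments are evaluated once, when the settings are built, so `uuid4().hex` in the example above yields the same value on every run of the job.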
I tested the following situation with partial:
from functools import wraps

def partial(f, *args, **kwargs):
    """
    Wrapper for cron functions to inject correlation_id
    """
    @wraps(f)
    async def partial_function(ctx):
        return await f(*args, **kwargs)
    return partial_function

async def my_test_cron_job(ctx, store_id):
    print(ctx)
    print(store_id)

class WorkerSettings:
    cron_jobs = [
        cron(
            partial(my_test_cron_job, {}, store_id='826b2e76-be55-42d0-9022-7f7c26b430ab'),
            hour=at_every_x_hour(1),
            minute=at_every_y_minutes(1),
            run_at_startup=True,
            job_id='id1',
        ),
        cron(
            partial(my_test_cron_job, {}, store_id='33beef9c-dfd3-42b7-a78b-706f29f8fcbe'),
            hour=at_every_x_hour(1),
            minute=at_every_y_minutes(1),
            run_at_startup=True,
            job_id='id2',
        ),
    ]

run_worker(WorkerSettings)
Only the last job's parameter gets printed, and it is printed twice:
{}
33beef9c-dfd3-42b7-a78b-706f29f8fcbe
{}
33beef9c-dfd3-42b7-a78b-706f29f8fcbe
What did I do wrong, or is this a bug?
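A likely explanation, offered as an assumption rather than a confirmed diagnosis: `@wraps(f)` copies `__qualname__` from my_test_cron_job onto both wrappers, and arq appears to derive a cron job's name from that attribute when no name is given, so both jobs end up registered under one name and the later one wins. The sketch below only demonstrates the collision in plain Python; the `registry` dict is a stand-in for however the worker keys its functions:

```python
from functools import wraps


def partial(f, *args, **kwargs):
    @wraps(f)
    async def partial_function(ctx):
        return await f(*args, **kwargs)
    return partial_function


async def my_test_cron_job(ctx, store_id):
    print(ctx)
    print(store_id)


job_a = partial(my_test_cron_job, {}, store_id='store-a')
job_b = partial(my_test_cron_job, {}, store_id='store-b')

# Both wrappers report the wrapped function's qualified name.
print(job_a.__qualname__, job_b.__qualname__)

# Stand-in registry keyed by name: the second entry overwrites the first.
registry = {}
for job in (job_a, job_b):
    registry['cron:' + job.__qualname__] = job
print(len(registry))  # 1
```

If that is indeed the cause, giving each job a distinct name should keep them apart. I believe cron() accepts a `name` keyword for this, but check it against your arq version.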