arq icon indicating copy to clipboard operation
arq copied to clipboard

Question: cron jobs with parameters

Open nterevinto-adjust opened this issue 3 years ago • 7 comments

Hi, I wanted to start using arq cron functionality instead of crontab but realized that it's not possible (or I can't find how) to pass parameters to the function that I want to execute.

In crontab, I can declare a couple of cronjobs like:

0 */4 * * * python my_script.py --foo=foo
0 */8 * * * python my_script.py --foo=bar

And I would like to do something similar with ARQ, which I guess I expect to look something like

CronWorkerSettings = {
    **BaseWorkerSettings,
    'queue_name': settings.queue_cron,
    'cron_jobs': [
        cron(
            my_script_func,
			ctx={'foo': 'foo'}, 
            hour={0, 4, 8, 12, 16, 20},
            minute=1,
        ),
        cron(
            my_script_func,
			ctx={'foo': 'bar'},
            hour={0, 8, 16},
            minute=1,
        ),
    ],
}

Maybe instead of ctx, allow to pass params as foo=bar like it's possible in ArqRedis.enqueue_job()

I'm surprised that it's not possible since to me it's a quite common use case. I know

Question: is it possible somehow? Is it somewhere in the roadmap to implement this?

nterevinto-adjust avatar Sep 09 '21 13:09 nterevinto-adjust

It's pretty simple to implement now with partial.

If that's really not satisfactory, and you wanted to add it as an explicit feature, pr welcome.

samuelcolvin avatar Sep 09 '21 16:09 samuelcolvin

Hi. We're still having issues with using partial

We are using arq==0.22 and python version 3.8.7

CronWorkerSettings = {
    **BaseWorkerSettings,
    'queue_name': 'cron_queue',
    'cron_jobs': [
        cron(
            partial(cron_test, 'shannon'),
            minute=1,
        ),
}

async def cron_test(name):
    print(f'hello {name}')

If we use partial we get a TypeError cron test failed, TypeError: cron_test() takes 1 positional argument but 2 were given

If I use keyword arguments I get TypeError: cron_test() got multiple values for argument 'name'

I understand where this error comes from when partial but I know it's not how I am calling partial because this works fine

hello_shannon = partial(cron_test, 'shannon')
asyncio.run(hello_shannon())

So why is the worker trying to pass extra arguments to the function?

s-quinn avatar Sep 14 '21 07:09 s-quinn

okay, you'll need your own partial method, something like

from functools import wraps

def partial(f, *args, **kwargs):
    @wraps(f)
    def partial_function(ctx):
        return f(*args, **kwargs)
    
    return partial_function

I agree though that a proper way to do this would be better.

samuelcolvin avatar Sep 14 '21 07:09 samuelcolvin

Guys, it's been a year already, please take part in this issue

DenysMoskalenko avatar Aug 26 '22 19:08 DenysMoskalenko

Guy, it’s been 4 hours already, please submit your pull request 🙃

JonasKs avatar Aug 27 '22 00:08 JonasKs

For anyone else stumbling across this issue, for async functions it needs to be a bit different:

from functools import wraps

def partial(f, *args, **kwargs):
    """
    Wrapper for cron functions to inject corellation_id
    """

    @wraps(f)
    async def partial_function(ctx):
        return await f(*args, **kwargs)

    return partial_function

And

        cron_jobs = [
            cron(partial(your_task, {}, cid=uuid4().hex), hour={1}, minute={12}),

In my case I'm passing a UUID, but you can of course pass whatever parameters you want.

waza-ari avatar Dec 14 '22 19:12 waza-ari

I tested following situation with partial:

from functools import wraps

def partial(f, *args, **kwargs): """ Wrapper for cron functions to inject corellation_id """

@wraps(f)
async def partial_function(ctx):
    return await f(*args, **kwargs)

return partial_function

async def my_test_cron_job(ctx, store_id): print(ctx) print(store_id)

class WorkerSettings: cron_jobs = [cron(partial(my_test_cron_job, {}, store_id='826b2e76-be55-42d0-9022-7f7c26b430ab'), hour=at_every_x_hour(1), minute=at_every_y_minutes(1), run_at_startup=True, job_id='id1'), cron(partial(my_test_cron_job, {}, store_id='33beef9c-dfd3-42b7-a78b-706f29f8fcbe'), hour=at_every_x_hour(1), minute=at_every_y_minutes(1), run_at_startup=True, job_id='id2'), ]

run_worker(WorkerSettings)

Only the last job parameter will printed twice?

{} 33beef9c-dfd3-42b7-a78b-706f29f8fcbe {} 33beef9c-dfd3-42b7-a78b-706f29f8fcbe

What did I wrong or is this an error?

fmatten avatar Feb 14 '24 14:02 fmatten