Add "replace" functionality to cron jobs
Feature description
An option similar to `jobKey` for recurring (crontab) jobs, i.e. the ability to replace an existing job rather than create a duplicate under heavy load.
Motivating example
For example, when a recurring job runs every minute and the next minute arrives before the previous job has had time to start or finish (because of heavy load, or because the worker is busy with other jobs), a new job is created anyway. These duplicates add up quickly and fill the job backlog, slowing the system down more and more over time.
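The backlog arithmetic can be made concrete with a toy TypeScript model. This is not graphile-worker code, and `pendingAfter` is a hypothetical helper. It assumes a single worker, a cron tick every minute, and a job that takes `jobMinutes` minutes to run. Without deduplication every tick adds a job; with deduplication a tick only adds one when nothing is already pending, which is roughly what a job key in `replace` mode would provide.

```ts
// Toy model (hypothetical, not graphile-worker code): one worker, one cron tick
// per minute, each job taking `jobMinutes` minutes.
// dedupe = false: every tick enqueues a new job, so duplicates pile up.
// dedupe = true: a tick only enqueues a job if none is pending, roughly what
// a job key in "replace" mode would give us.
function pendingAfter(minutes: number, jobMinutes: number, dedupe: boolean): number {
  let pending = 0;
  let busyUntil = 0; // minute at which the worker becomes free again
  for (let t = 0; t < minutes; t++) {
    // Cron tick: schedule a job (or, with dedupe, keep the one already pending)
    if (!dedupe || pending === 0) pending++;
    // If the worker is free, it picks up one pending job
    if (t >= busyUntil && pending > 0) {
      pending--;
      busyUntil = t + jobMinutes;
    }
  }
  return pending;
}

console.log(pendingAfter(60, 3, false)); // → 40
console.log(pendingAfter(60, 3, true)); // → 1
```

With a 3-minute job, an hour of ticks leaves 40 duplicates waiting in the plain case, but at most one pending job with deduplication.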
Supporting development
I [tick all that apply]:
- [ ] am interested in building this feature myself
- [ ] am interested in collaborating on building this feature
- [x] am willing to help testing this feature before it's released
- [ ] am willing to write a test-driven test suite for this feature (before it exists)
- [ ] am a Graphile sponsor ❤️
- [ ] have an active support or consultancy contract with Graphile
I think we could just use job_key to implement this. Currently we pass null as the job_key explicitly:
https://github.com/graphile/worker/blob/95d890cc7277d1823d64579e2a68be97283872b5/src/cron.ts#L146
I'm not sure whether that's just because I wasn't expecting this to be needed or whether there was something deeper in mind. I can't think of a particular reason not to allow it, so if someone wants to take this on, I'd start by making the change there and extending `makeJobForItem`/`ParsedCronItem` to carry a job key:
https://github.com/graphile/worker/blob/95d890cc7277d1823d64579e2a68be97283872b5/src/cron.ts#L78-L79
I think I'd add a `?job_key=` option to the crontab options.
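A minimal TypeScript sketch of the change described above. The types and functions here (`ParsedCronItemSketch`, `JobSpecSketch`, `parseJobKey`, `makeJobForItemSketch`) are hypothetical simplifications, not the real definitions in `src/cron.ts`, and `job_key` is only the option name suggested in this comment. The sketch reads an optional `job_key` from the crontab options string and threads it through to the job spec instead of hard-coding `null`.

```ts
// Hypothetical, simplified stand-ins for the real types in src/cron.ts
interface ParsedCronItemSketch {
  task: string;
  jobKey: string | null; // proposed new field; null keeps today's behaviour
}

interface JobSpecSketch {
  task: string;
  payload: unknown;
  jobKey: string | null;
}

// Extract an optional `job_key` from a crontab options string such as "?job_key=foo".
// Returns null when the option is absent.
function parseJobKey(optsString: string): string | null {
  const query = optsString.startsWith('?') ? optsString.slice(1) : optsString;
  for (const pair of query.split('&')) {
    const eq = pair.indexOf('=');
    if (eq === -1) continue; // skip malformed or empty segments
    if (pair.slice(0, eq) === 'job_key') {
      return decodeURIComponent(pair.slice(eq + 1));
    }
  }
  return null;
}

// Instead of always passing null, use the job key carried by the cron item
function makeJobForItemSketch(item: ParsedCronItemSketch, payload: unknown): JobSpecSketch {
  return { task: item.task, payload, jobKey: item.jobKey };
}

const item: ParsedCronItemSketch = { task: 'send_digest', jobKey: parseJobKey('?job_key=digest') };
console.log(makeJobForItemSketch(item, {}).jobKey); // → digest
```

Defaulting to `null` when the option is missing means existing crontab lines would behave exactly as they do today.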
Am I right in thinking that this would also be covered when/if https://github.com/graphile/worker/pull/294 is merged?
@benjie As a temporary workaround until this is supported natively, would the following approach be viable?
```ts
import { run, parseCronItems, Task } from 'graphile-worker';

// Placeholder for the real work (hypothetical; replace with your own logic)
const doSomeActualWork = async (): Promise<void> => {
  // ...
};

// The actual job, which I want deduplicated and never run concurrently
const runJob: Task = async () => {
  await doSomeActualWork();
};

// Cron-triggered scheduling job: it does little on its own, but schedules a
// "regular" job with a jobKey (for deduplication) and a queueName (so jobs in
// that queue run one at a time)
const scheduleJob: Task = async (_payload, helpers) => {
  await helpers.addJob(
    'runJob',
    {},
    {
      maxAttempts: 1,
      jobKeyMode: 'replace',
      queueName: 'test-job-queue',
      jobKey: 'testJobKey',
    },
  );
};

const main = async () => {
  const runner = await run({
    connectionString: '...',
    concurrency: 2,
    noHandleSignals: false,
    pollInterval: 1000,
    taskList: {
      scheduleJob,
      runJob,
    },
    parsedCronItems: parseCronItems([
      {
        identifier: 'scheduleJob',
        pattern: '* * * * *',
        task: 'scheduleJob',
        options: {
          maxAttempts: 1,
          backfillPeriod: 0,
        },
      },
    ]),
  });
  await runner.promise;
};

main().catch((err) => {
  console.error(err);
  process.exit(1);
});
```
What I'm trying to do above is run `doSomeActualWork` every minute while making sure two runs never overlap. If the actual work takes longer than a minute, I want to skip that invocation and check again the next minute, so the job eventually runs at the first time matching the cron expression at which no previously started job is still running. In short, I'm trying to:
a) ensure no two jobs of this kind run concurrently; and
b) prevent backlog buildup: if it's time to run the job but the previous run hasn't finished, skip this attempt and try again at the next time matching the cron expression.
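One way to check whether this workaround meets both goals is a small timing model. This is a sketch, not graphile-worker code, and it rests on two assumptions about graphile-worker's behaviour: (1) with `jobKeyMode: 'replace'`, a pending job with the same key is replaced, but a job that is already running (locked) is left alone and a new pending job is added instead; (2) jobs sharing a `queueName` run one at a time. `runStarts` is a hypothetical helper that returns the minutes at which `runJob` starts, given cron ticks at every whole minute and a fixed job duration.

```ts
// Toy timing model of the cron + jobKey + queueName workaround (not graphile-worker code).
// Assumptions: a running keyed job is never replaced, so a tick during a run leaves
// one pending job behind; the named queue starts that job as soon as the current
// run ends. Cron ticks fire at every whole minute, starting at minute 0.
function runStarts(totalMinutes: number, jobMinutes: number): number[] {
  const starts: number[] = [];
  let start = 0; // the tick at minute 0 finds the queue idle
  while (start < totalMinutes) {
    starts.push(start);
    const end = start + jobMinutes;
    const nextTick = Math.floor(start) + 1; // first cron tick after this start
    // If a tick fired before this run ended, a pending job is already waiting and
    // starts at `end`; otherwise the next run waits for the next tick.
    start = Math.max(end, nextTick);
  }
  return starts;
}

console.log(runStarts(10, 2.5)); // → [ 0, 2.5, 5, 7.5 ]
```

If these assumptions hold, the workaround prevents concurrent runs and bounds the backlog to a single pending job, but it does not quite skip to the next cron tick: with a 2.5-minute job, the queued job starts as soon as the previous run finishes (minutes 2.5, 5, 7.5) rather than at minutes 3, 6 and 9.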
This should be done now: the attached PRs have been merged! 🥳
Indeed!