Replace existing queue item by singletonKey instead of discarding the new item?
👋 We have a queue that works on incoming news articles. Sometimes, an updated version of an article comes in before we have time to process the old version. Since there is no longer a point in processing an old version of the article, we would like to discard the old entry (as long as it's not in active state already).
We can easily create a singletonKey by article id, but singletonKey appears to discard the new queue item rather than replacing the old one.
I've gone up and down in the docs but I don't see an option to configure this, and I also don't see any way to connect to the pg-boss SQL table directly. Perhaps it would be possible to expose the executeSql so we can easily make a query to check for existing jobs for the same article ID and cancel them? Something like:
const findQuery = `
SELECT id FROM pgboss.job
WHERE name = 'process-article'
AND state = 'created'
AND data->>'articleId' = $1`;
// Using pg-boss's direct db access
const { rows } = await boss.db.executeSql(findQuery, [articleId]);
We ended up doing a manual query to solve this:
const articleId = '1234';
const existingJobsQuery = `
SELECT id
FROM pgboss.job
WHERE name = $1
AND (state = 'created' OR state = 'retry')
AND data->>'articleId' = $2
`;
// A normal pg query
const existingJobs = await query(existingJobsQuery, [
queueName,
articleId,
]);
if (existingJobs && existingJobs.rows.length > 0) {
console.log(
`Found ${existingJobs.rows.length} existing jobs for article ${articleId}. Canceling them.`,
);
for (const job of existingJobs.rows) {
await boss.cancel(job.id);
console.log(`Canceled job ${job.id} for article ${articleId}`);
}
}
I'm really surprised there is no "replaceJob" feature. Current debounce doesn't look like a real debounce.
Yes, this debounce method is confusing. The detail of the doc is explicit but the usual behaviour of debouncing is the one described in this ticket, not in the pgboss doc.
Should we, at least, add a replaceJob or sendOrReplace ?
I'm going for the @khromov solution, helped by #596
const jobs = await boss.getJobsBySingletonKey(name, key)
await boss.cancel(name, jobs.map((j) => j.id))
await boss.send({
name: name,
data: data,
options: {
startAfter: delay,
singletonKey: key,
},
})