
Replace existing queue item by singletonKey instead of discarding the new item?

Open khromov opened this issue 10 months ago • 4 comments

👋 We have a queue that works on incoming news articles. Sometimes, an updated version of an article comes in before we have time to process the old version. Since there is no longer a point in processing an old version of the article, we would like to discard the old entry (as long as it's not in active state already).

We can easily create a singletonKey by article id, but singletonKey appears to discard the new queue item rather than replacing the old one.

I've gone up and down in the docs but I don't see an option to configure this, and I also don't see any way to query the pg-boss SQL tables through the library. Perhaps it would be possible to expose executeSql so we can easily run a query that checks for existing jobs for the same article ID and cancel them? Something like:

const findQuery = `
  SELECT id FROM pgboss.job
  WHERE name = 'process-article'
  AND state = 'created'
  AND data->>'articleId' = $1`;

// Using pg-boss's direct db access
const { rows } = await boss.db.executeSql(findQuery, [articleId]);

khromov avatar Feb 25 '25 10:02 khromov

We ended up doing a manual query to solve this:

const articleId = '1234';
const existingJobsQuery = `
  SELECT id 
  FROM pgboss.job 
  WHERE name = $1 
  AND (state = 'created' OR state = 'retry') 
  AND data->>'articleId' = $2
`;

// A normal pg query helper (our own code, not a pg-boss API)
const existingJobs = await query(existingJobsQuery, [
  queueName,
  articleId,
]);

if (existingJobs && existingJobs.rows.length > 0) {
  console.log(
    `Found ${existingJobs.rows.length} existing jobs for article ${articleId}. Canceling them.`,
  );

  for (const job of existingJobs.rows) {
    await boss.cancel(job.id);
    console.log(`Canceled job ${job.id} for article ${articleId}`);
  }
}
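
The same lookup-and-cancel steps can be wrapped in a reusable function. This is only a sketch: `cancelPendingJobs` is a hypothetical name, `query` is assumed to be any `(sql, params) => Promise<{ rows }>` helper, and `boss.cancel(id)` mirrors the call used above (a later comment in this thread passes the queue name first, so the signature may depend on the pg-boss version).

```javascript
// Hypothetical helper, not part of pg-boss.
// Cancels queued (created or retry) jobs for one article and returns their ids.
// Jobs that are already active are not selected by the query, so they are left alone.
async function cancelPendingJobs({ query, boss, queueName, articleId }) {
  const { rows } = await query(
    `SELECT id
     FROM pgboss.job
     WHERE name = $1
       AND state IN ('created', 'retry')
       AND data->>'articleId' = $2`,
    // data->>'articleId' yields text, so compare against a string
    [queueName, String(articleId)],
  );

  const canceled = [];
  for (const { id } of rows) {
    // Assumption: cancel(id) as in the snippet above; adjust for your version.
    await boss.cancel(id);
    canceled.push(id);
  }
  return canceled;
}
```

Note that the SELECT and the cancel calls are separate statements, so a job could still move to active in between. That seemed acceptable for this use case, but it is not an atomic replace.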

khromov avatar Feb 28 '25 14:02 khromov

I'm really surprised there is no "replaceJob" feature. The current debounce doesn't behave like a real debounce.

aristofun avatar Jul 12 '25 18:07 aristofun

Yes, this debounce method is confusing. The docs are explicit about the details, but the usual behaviour of debouncing is the one described in this ticket, not the one in the pg-boss docs.

Should we at least add a replaceJob or sendOrReplace?

pomarec avatar Sep 30 '25 15:09 pomarec

I'm going for the @khromov solution, helped by #596

const jobs = await boss.getJobsBySingletonKey(name, key)
await boss.cancel(name, jobs.map((j) => j.id))
await boss.send({
  name: name,
  data: data,
  options: {
    startAfter: delay,
    singletonKey: key,
  },
})
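
Wrapped as a function, the three calls above could look like the sketch below. `sendOrReplace` is a hypothetical name borrowed from the suggestion earlier in this thread; `getJobsBySingletonKey`, `cancel(name, ids)` and the object form of `send` are used exactly as in the snippet above and are assumed to behave as shown there.

```javascript
// Hypothetical wrapper, not part of pg-boss.
// Cancels whatever jobs are found for the singleton key, then sends the new one.
async function sendOrReplace(boss, { name, key, data, delay }) {
  // Assumption: resolves to an array of jobs that each have an `id`.
  const jobs = await boss.getJobsBySingletonKey(name, key);

  // Skip the cancel call entirely when there is nothing to replace.
  if (jobs.length > 0) {
    await boss.cancel(name, jobs.map((j) => j.id));
  }

  // Resolves to whatever boss.send resolves to (the new job id, if one was created).
  return boss.send({
    name,
    data,
    options: { startAfter: delay, singletonKey: key },
  });
}
```

Usage would be something like `await sendOrReplace(boss, { name: 'process-article', key: articleId, data, delay: 30 })`. The lookup, cancel and send are separate calls, so this is not atomic, and whether already active jobs get canceled too depends on what `getJobsBySingletonKey` returns.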

pomarec avatar Oct 01 '25 06:10 pomarec