feature request: worker's job filter
To build a QoS system, it would be great if the work() method supported a filter option.
This new parameter would let the user write to a single queue/table while two different workers process that same queue at different paces:
- e.g. queue icecream
  - worker1 consumes icecream jobs with taste: cioco and processes them with newJobCheckInterval: 200
  - worker2 consumes icecream jobs with taste: bubble and processes them with newJobCheckInterval: 2000
❗️ It is up to the user to write filters that together cover all the jobs; otherwise a job that no filter matches, such as one with taste: lemon, will be archived automatically.
Example:
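const PgBoss = require('pg-boss');

const queueName = 'icecream'; // queue from the example above

(async function () {
  try {
    await buildConsumer();
  } catch (error) {
    console.log({ globalErr: error.message });
  }
})();

// Job handler passed to work(); here it only logs what it receives
async function executeJob (job) {
  console.log('Processing job', job);
}

async function buildConsumer () {
  const boss = new PgBoss({
    user: 'postgres',
    password: 'postgres',
    noScheduling: true,
  });

  await boss.start(); // the instance must be started before calling work()

  await boss.work(
    queueName,
    {
      teamSize: 2,
      newJobCheckInterval: 1000,
      filter: { // 🚀 proposed option, not supported yet
        jobFilter: `data ->> 'body' = $1`,
        jobParams: ['body'],
      }
    },
    executeJob
  );
  console.log('Waiting for jobs');
}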
This should generate a query like this in the fetch function:
https://github.com/timgit/pg-boss/blob/523c36ba7bf285456c2358b710f13d8d77d34b3a/src/plans.js#L353
WITH nextJob as (
  SELECT id
  FROM pgboss.job j
  WHERE state < 'active'
    AND name LIKE $1
    AND startAfter < now()
+   AND data ->> 'body' = $3
  ORDER BY priority desc, createdOn, id
  LIMIT $2
  FOR UPDATE SKIP LOCKED
)
UPDATE pgboss.job j SET
  state = 'active',
  startedOn = now(),
  retryCount = CASE WHEN state = 'retry' THEN retryCount + 1 ELSE retryCount END
FROM nextJob
WHERE j.id = nextJob.id
RETURNING j.id, name, data, EXTRACT(epoch FROM expireIn) as expire_in_seconds
Note:
- this query should also be passed to the executeSql function when the user supplies a custom db
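In the query above, the user's filter is written with $1 but ends up as $3, because the fetch query already uses $1 and $2. A minimal sketch of how that renumbering could work, assuming the filter shape from the example (jobFilter, jobParams). The helper names shiftPlaceholders and buildFilteredFetch are hypothetical and not part of pg-boss:

```javascript
// Hypothetical helper: shifts every $n placeholder in a user-supplied filter
// by `offset`, so it does not collide with placeholders the query already uses.
function shiftPlaceholders(jobFilter, offset) {
  return jobFilter.replace(/\$(\d+)/g, (_, n) => `$${Number(n) + offset}`);
}

// Hypothetical helper: returns the extra WHERE clause and the merged value list.
// `baseValues` are the values the fetch query already binds (name, limit).
function buildFilteredFetch(baseValues, filter) {
  if (!filter) {
    return { clause: '', values: baseValues };
  }
  const clause = `AND ${shiftPlaceholders(filter.jobFilter, baseValues.length)}`;
  return { clause, values: [...baseValues, ...filter.jobParams] };
}

const { clause, values } = buildFilteredFetch(
  ['icecream', 2],
  { jobFilter: "data ->> 'body' = $1", jobParams: ['body'] }
);
console.log(clause); // AND data ->> 'body' = $3
console.log(values); // [ 'icecream', 2, 'body' ]
```

This sketch rewrites every $n it finds, including one that appears inside a quoted SQL string, so a real implementation would need to be more careful there.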
What do you think?
You can already do this using wildcards. Set all workers to a wildcard by default, such as icecream.*; then you can replace the wildcard with a specific flavor.
Sorry, I don't get it. Would the producer that sends the message have to know the consumer's queue in this case?
My target would be a simple producer that does not know how many consumers the backend has.
Queue patterns use the * character to match 0 or more characters. For example, a job from queue status-report-12345 would be fetched with pattern status-report-* or even stat*5.
For example, a producer would use the flavor as part of the queue name, such as icecream.vanilla and icecream.chocolate. A consumer using work('icecream.*') would get both flavors, but another consumer using work('icecream.vanilla') would not get chocolate.
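To make the matching rule concrete, here is a small sketch that mimics "* matches 0 or more characters" in plain JavaScript. The function matchesQueuePattern is hypothetical and only illustrates the rule; it is not how pg-boss implements it (the fetch query shown earlier uses name LIKE $1 in SQL):

```javascript
// Illustrative only: returns true when `queueName` matches `pattern`,
// where each * in the pattern stands for zero or more characters.
function matchesQueuePattern(pattern, queueName) {
  const escaped = pattern
    .split('*')
    // Escape regex metacharacters in the literal parts (e.g. the dot).
    .map((part) => part.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'))
    .join('.*');
  return new RegExp(`^${escaped}$`).test(queueName);
}

console.log(matchesQueuePattern('status-report-*', 'status-report-12345')); // true
console.log(matchesQueuePattern('stat*5', 'status-report-12345')); // true
console.log(matchesQueuePattern('icecream.*', 'icecream.chocolate')); // true
console.log(matchesQueuePattern('icecream.vanilla', 'icecream.chocolate')); // false
```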
The proposed solution assumes that I have control over the producer, which is not the case 😞
Would you be willing to accept a PR with such a feature?
Yes, sounds good
This would be useful. I didn't see a test case for queue layouts like work('icecream.*.toppings.none').
Since wildcards have been dropped in v10, my original response is no longer valid. The most feasible option seems to be some sort of additional opt-in filter like originally proposed. The primary issue in using the data payload is fetch performance, since the data column is not indexed. I don't think indexing data is a good idea globally, since the payload could be large, and this would impact write performance as well.
However, this doesn't mean it's a deal-breaker. Now that queues are isolated via partitioning, this opens up the possibility of a diversity of indexing strategies, since indexes in queue A wouldn't have to match indexes in queue B. One way to pull this off would be to add a new option in createQueue().
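As a rough illustration of what such a per-queue index might look like, the sketch below builds a PostgreSQL expression index on a single key of the data payload rather than on the whole column. Everything here is an assumption: the helper buildDataIndexSql, the partition table name job_icecream, and the idea that createQueue() would run a statement like this are all hypothetical, and only the pgboss schema name comes from the query earlier in the thread:

```javascript
// Hypothetical helper: builds a CREATE INDEX statement for one JSON key of
// the data column. Inputs are restricted to simple identifiers because they
// are interpolated into the SQL text.
function buildDataIndexSql(tableName, jsonKey) {
  const identifier = /^[a-z_][a-z0-9_]*$/i;
  if (!identifier.test(tableName) || !identifier.test(jsonKey)) {
    throw new Error('tableName and jsonKey must be simple identifiers');
  }
  const indexName = `${tableName}_data_${jsonKey}_idx`;
  return `CREATE INDEX ${indexName} ON pgboss.${tableName} ((data ->> '${jsonKey}'))`;
}

// 'job_icecream' is a made-up partition name used only for this example.
console.log(buildDataIndexSql('job_icecream', 'taste'));
// CREATE INDEX job_icecream_data_taste_idx ON pgboss.job_icecream ((data ->> 'taste'))
```

Indexing one extracted key keeps the index small compared with indexing the full payload, which is the write-performance concern raised above.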