ready not firing
using agenda 2.0.2 on ubuntu 14.04
my code;
'use strict';
const winston = require('winston');
const config = require('config');
const glob = require('glob-promise');
const path = require('path');
const Agenda = require('agenda');
const agenda = new Agenda();
module.exports.initialize = async () => {
try {
// setup the connection string.
const connectionString = `mongodb://${config.database.mongodb.host}/${config.database.mongodb.name}`;
// configure the agenda.
agenda
.database(connectionString, 'tasks')
.processEvery(config.tasks.processEvery) // frequency which agenda will query the database for jobs.
.defaultConcurrency(5) // default number of a specific job that can be running at any given moment.
.maxConcurrency(25) // max number of jobs that can be running at any given moment.
.lockLimit(0) // specifies the max number jobs that can be locked at any given moment.
.defaultConcurrency(10) // specifies the default number of a specific that can be running at any given moment
.defaultLockLifetime(600000); // specifies the default lock lifetime in milliseconds.
// find all available tasks.
let tasks = await glob('src/tasks/definitions/**/*.js');
for (let task of tasks) {
require(path.join(__dirname, '../../', task))(agenda); // eslint-disable-line security/detect-non-literal-require
}
winston.info(`[TASK_MANAGER] loaded ${tasks.length} task definititons.`);
agenda.on('ready', async () => { // wait for mongo connection.
await cancelExisting(); // cancel existing jobs.
await agenda.start(); // start the job processor.
await schedule(); // schedule the tasks.
winston.info('[TASK_MANAGER] scheduled jobs..');
});
// listen for agenda initilization event.
agenda.on('error', (err) => {
winston.debug(`[TASK_MANAGER] error occured while initializing agenda: ${err}.`);
});
// listen for started jobs.
agenda.on('start', (job) => {
winston.info(`[${job.attrs.name}] started..`);
});
// listen for failed jops.
agenda.on('fail', (err, job) => {
winston.error(`[${job.attrs.name}] failed - ${err.message}`);
});
// listen for succeeded jobs.
// agenda.on('success', (job) => {
// winston.info(`[${job.attrs.name}] completed..`);
// });
// watch for graceful shutdowns so that currently running / grabbed jobs are abandoned and can be re-grabbed later again.
process.on('SIGTERM', gracefulExit);
process.on('SIGINT', gracefulExit);
} catch (err) {
throw new Error(err);
}
};
let cancelExisting = async () => {
let removed = await agenda.cancel({});
removed
? winston.debug(`[TASK_MANAGER] cleaned ${removed} existing agenda tasks..`)
: winston.debug(`[TASK_MANAGER] no tasks found that needs to be cleaned..`);
};
let schedule = async () => {
try {
// data processor tasks
if (config.tasks.stats.process.enabled && !config.tasks.stats.rescan.enabled) agenda.every(config.tasks.stats.process.every, 'stats:process'); // only stats:process when stats:rescan is disabled.
if (config.tasks.stats.rescan.enabled) agenda.every(config.tasks.stats.rescan.every, 'stats:rescan');
}
} catch (err) {
throw new Error(`Error scheduling agenda tasks - ${err}.`);
}
};
// stop the agenda on graceful exit.
let gracefulExit = async () => {
try {
await agenda.stop();
winston.info('[TASK_MANAGER] stopped task manager..');
} catch (err) {
throw new Error(`Error stopping task manager - ${err}.`);
}
};
and here is the debug output
agenda:processEvery Agenda.processEvery(NaN) +0ms
agenda:maxConcurrency Agenda.maxConcurrency(25) +0ms
agenda:defaultConcurrency Agenda.defaultConcurrency(10) +0ms
agenda:defaultLockLifetime Agenda.defaultLockLifetime(600000) +0ms
(node:7331) DeprecationWarning: current URL string parser is deprecated, and will be removed in a future version. To use the new parser, pass option { useNewUrlParser: true } to MongoClient.connect.
agenda:database successful connection to MongoDB using collection: [tasks] +0ms
agenda:db_init init database collection using name [tasks] +0ms
agenda:db_init attempting index creation +1ms
agenda:db_init index creation success +10ms
agenda:define job [stats:process] defined with following options:
agenda:define { fn: [AsyncFunction],
agenda:define concurrency: 1,
agenda:define lockLimit: 0,
agenda:define priority: 'high',
agenda:define lockLifetime: 600000,
agenda:define running: 0,
agenda:define locked: 0 } +13ms
agenda:define job [stats:rescan] defined with following options:
agenda:define { fn: [AsyncFunction],
agenda:define concurrency: 1,
agenda:define lockLimit: 0,
agenda:define priority: 'high',
agenda:define lockLifetime: 600000,
agenda:define running: 0,
agenda:define locked: 0 } +2ms
25/12 05:18:42 [7331] - info: [TASK_MANAGER] loaded 12 task definititons.
clearly winston.info('[TASK_MANAGER] scheduled jobs..'); line is not fired...
though it does on my windows setup.
Hey
Looks like you're adding the ready event listener after you do an async operation let tasks = await glob('src/tasks/definitions/**/*.js'); which means you may well miss the event.
If you add the listener first, then you should avoid the race condition.
Hey too, I am struggling too much on missing the ready event in tests cases.
For this kind of event, firing only once at start and useful only as a condition of further operations, can someone list the downsize of using a stateful event handler instead of an event easy to miss ?
I'm talking about a setup like this :
agenda.isReady = false;
agenda.onReady = (listener: (...args: any[]) => void) => {
if (agenda.isReady) return listener();
agenda.on('ready', () => { agenda.isReady = true; listener(); });
};
This way if ready event is already passed, handler synchronously execute.
Could be made a promise auto-resolving instead as well.
Edit: I checked the internals, turns out there is a _ready promise that I can use. This is so much reliable.
The ready event isn't even necessary if you use agenda.mongo(...) and handle the connection yourself :)
Hi,
turns out there is a
_readypromise that I can use. This is so much reliable.
Definitely a hidden gem indeed, thank you so much for having shared it!