coffee-resque
coffee-resque copied to clipboard
Worker PID Cleanup on Boot
Thanks for this awesome module!
One of the things that ruby-resque does at boot is inspect the list of workers that redis knows about, check whether any of them used to run on this
server, and check whether they are still running. This is important for knowing the status of your ecosystem (and for allowing the resque web interface to continue to make sense).
In our projects, we use the following methods to clean up pids at boot:
var os = require("os");
var exec = require('child_process').exec;
/**
 * Remove the redis bookkeeping of workers on this host whose process is gone.
 *
 * Keys are expected to look like "resque:worker:<host>:<pid>:<queues>[:...]".
 * For every key whose pid is not in the list of running pids, the key is
 * deleted and "<host>:<pid>:<queues>" is removed from the "resque:workers" set.
 *
 * Must be invoked with `this` bound to an object that provides
 * `getPids(matcher, callback)`, e.g. `worker.pidCleanup(worker, cb)`.
 *
 * @param {Object} worker - exposes `name` (host name used in the redis keys)
 *   and `redis` (client providing `keys`, `del` and `srem`).
 * @param {Function} callback - called exactly once when the cleanup is over;
 *   receives the first error that occurred, if any.
 */
var pidCleanup = function(worker, callback){
  var self = this;
  var hostName = worker.name;
  var redis = worker.redis;
  self.getPids('node', function(error, matchedPids){
    // Without a trustworthy pid list every worker would look dead, so stop here.
    if(error){ return callback(error); }
    // The trailing ":" keeps host "app1" from also matching the keys of "app10".
    var hostKey = "resque:worker:" + hostName + ":*";
    redis.keys(hostKey, function(err, keys){
      if(err){ return callback(err); }
      if(!keys || keys.length === 0){ return callback(); }
      // Count all keys up front so the callback fires once, after the last key,
      // no matter whether the redis callbacks run synchronously or not.
      var pending = keys.length;
      var firstError = null;
      var done = function(stepError){
        if(stepError && !firstError){ firstError = stepError; }
        pending--;
        if(pending === 0){
          if(firstError){ callback(firstError); }else{ callback(); }
        }
      };
      keys.forEach(function(key){
        var parts = key.split(":");
        var pid = parseInt(parts[3], 10);
        var queues = parts[4];
        if(matchedPids.indexOf(pid) >= 0){
          // pid is still running,
          done();
        }else{
          // pid has crashed or been stopped
          console.log("removing previously stopped worker pid from resque: " + pid);
          redis.del(key, function(delError){
            var setEntry = hostName + ":" + pid + ":" + queues;
            // SREM needs the name of the set as its first argument.
            redis.srem("resque:workers", setEntry, function(sremError){
              done(delError || sremError);
            });
          });
        }
      });
    });
  });
};
/**
 * List the pids of running processes whose `ps awx` line matches `matcher`
 * or "resque".
 *
 * @param {string} matcher - text handed to grep; it is placed inside a shell
 *   command, so it must come from trusted code.
 * @param {Function} callback - called with `(error, pids)`, where `pids` is an
 *   array of integers (empty when nothing matched).
 */
var getPids = function(matcher, callback){
  // It's important here to ALSO look for what normal resque would run `grep resque`
  var grepString = 'ps awx | grep "' + String(matcher) + '\\|resque" | grep -v grep';
  exec(grepString, function(error, stdout, stderr){
    var pids = [];
    String(stdout || '').split("\n").forEach(function(line){
      // the pid is the first column of every `ps` row
      var pid = parseInt(line.trim().split(' ')[0], 10);
      // blank lines and rows that do not start with a number are skipped
      if(!isNaN(pid)){ pids.push(pid); }
    });
    // grep exits with status 1 when no line matched; with nothing on stderr
    // that is an empty result rather than a failure.
    var nothingMatched = !!error && error.code === 1 && pids.length === 0 &&
      String(stderr || '').trim().length === 0;
    callback(nothingMatched ? null : error, pids);
  });
};
///////////////////
// Boot Sequence //
///////////////////
// NOTE: `queues`, `processor` and `callback` are not defined in this snippet;
// they are expected to come from the surrounding application.
var worker = require('coffee-resque').connect().worker(queues, processor);
worker.name = os.hostname(); // an important override to match ruby-resque!
// pidCleanup looks up `this.getPids`, so both helpers have to live on the
// worker before it can be called as a method.
worker.getPids = getPids;
worker.pidCleanup = pidCleanup;
worker.pidCleanup(worker, function(){
  callback();
});
I ended up with the following, which not only removes dead workers but also fails stuck jobs, so you can re-queue them from the web interface if you want.
os = require("os")
exec = require("child_process").exec
async = require 'async'
# Remove the redis bookkeeping of dead workers on `hostname` and record the
# jobs they were processing as failed.
#
# Keys are expected to look like "resque:worker:<host>:<pid>:<queues>" (the job
# currently being processed) or "...:<queues>:started" (worker start marker).
#
# hostname - host name as it appears in the worker keys
# redis    - redis client (uses keys, get and multi)
# pids     - array of pids that are still alive; their keys are left untouched
# callback - called with an error, or with no arguments once cleanup is done
pidCleanup = (hostname, redis, pids, callback) ->
  # The trailing ":" keeps host "app1" from also matching the keys of "app10".
  hostKey = "resque:worker:" + hostname + ":*"
  redis.keys hostKey, (err, keys) ->
    return callback(err) if err
    return callback() unless keys.length
    workersToDelete = []
    keysToDelete = []
    jobsToFail = []

    processKey = (key, cb) ->
      parts = key.split(":")
      # Deliberately not named `hostname`: CoffeeScript has no shadowing, so
      # assigning to that name would overwrite the outer parameter.
      [keyHost, pidString, queues] = parts[2..4]
      pid = parseInt(pidString, 10)
      # skip known process ids
      return cb() if pid in pids
      # delete both the "in-process" job keys as well as the "started" keys
      keysToDelete.push key
      workerName = "#{keyHost}:#{pid}:#{queues}"
      # Started seems to mean when the worker was created.
      # If this key doesn't end with :started, then it means it is the job
      # that's in progress.
      isStarted = (parts.length == 6 and parts[5] == 'started')
      if isStarted
        workersToDelete.push workerName
        return cb()
      # in-process job that has to be retrieved so we can fail it
      redis.get key, (err, ipString) ->
        return cb(err) if err
        # The key can vanish between KEYS and GET; then there is no job to fail.
        return cb() unless ipString
        try
          ip = JSON.parse(ipString)
        catch parseError
          # Abort before anything is deleted rather than lose an unreadable job.
          return cb(parseError)
        jobName = ip.payload?['class']
        jobsToFail.push
          queue: ip.queue
          payload: ip.payload
          worker: workerName
          exception: "Error"
          error: "Stuck Job: #{jobName}"
          backtrace: [] # is this necessary?
          failed_at: new Date()
        cb()

    # processKey only collects into the arrays above, so `each` is enough.
    async.each keys, processKey, (err) ->
      return callback(err) if err
      unless keysToDelete.length or workersToDelete.length or jobsToFail.length
        return callback()
      console.log "keysToDelete", keysToDelete
      console.log "workersToDelete", workersToDelete
      console.log "jobsToFail", jobsToFail
      # Queue every change on one MULTI so they are sent to redis together.
      multi = redis.multi()
      for key in keysToDelete
        multi.del key
      for member in workersToDelete
        multi.srem "resque:workers", member
      for job in jobsToFail
        multi.incr "resque:stat:failed"
        jobString = JSON.stringify(job)
        multi.lpush "resque:failed", jobString
      multi.exec (err, results) ->
        return callback(err) if err
        callback()
# List the pids of running processes whose `ps awx` line contains `matcher`.
#
# matcher  - text handed to grep; it is placed inside a shell command, so it
#            must come from trusted code
# callback - called with (err, pids); pids is an array of integers and is
#            empty when nothing matched
getPids = (matcher, callback) ->
  grepString = "ps awx | grep \"#{matcher}\" | grep -v grep"
  exec grepString, (err, stdout, stderr) ->
    pids = []
    for line in String(stdout or "").split("\n")
      # the pid is the first column of every `ps` row
      pid = parseInt(line.trim().split(" ")[0], 10)
      pids.push pid unless isNaN(pid)
    # grep exits with status 1 when no line matched. With nothing on stderr
    # that means "no such process is running", which is a valid (empty)
    # result and must not abort the cleanup.
    nothingMatched = err?.code is 1 and pids.length is 0 and not String(stderr or "").trim()
    err = null if nothingMatched
    callback err, pids
# Look up the pids of running "resque" processes and clean up every worker of
# this host that is not among them.
#
# callback - receives the error of the pid lookup, otherwise whatever
#            pidCleanup reports
cleanDeadWorkers = (callback) ->
  onPids = (lookupError, alivePids) ->
    if lookupError
      callback lookupError
    else
      # NOTE: alivePids might contain more than just job queue worker
      # processes, but that's OK because it is a whitelist only.
      pidCleanup os.hostname(), jobq.redis, alivePids, callback
  getPids "resque", onPids
# When this file is executed directly: clean up dead workers first, then
# start via run(); a cleanup error is rethrown.
if require.main is module
  cleanDeadWorkers (cleanupError) ->
    if cleanupError
      throw cleanupError
    run()
I should mention that we've since moved over to our own resque package; here's what we use for our pid cleanup logic now:
/**
 * Untrack every worker registered for this host whose pid is no longer running.
 *
 * Members of the workers set are expected to look like "<host>:<pid>:<queues>".
 * A "cleaning_worker" event is emitted with `(member, pid)` for each worker
 * that gets removed.
 *
 * @param {Function} [callback] - called once, after all untrack calls have
 *   completed; receives the first error that occurred, if any.
 */
worker.prototype.workerCleanup = function(callback){
  var self = this;
  var finish = function(err){
    if(typeof callback == "function"){ callback(err); }
  };
  self.getPids(function(err, pids){
    // Without a trustworthy pid list every worker would look dead, so stop here.
    if(err){ return finish(err); }
    self.connection.redis.smembers(self.connection.key('workers'), function(err, workers){
      if(err){ return finish(err); }
      var localHost = os.hostname();
      var deadWorkers = (workers || []).filter(function(w){
        var parts = w.split(":");
        var pid = parseInt(parts[1], 10);
        return parts[0] === localHost && pids.indexOf(pid) < 0;
      });
      if(deadWorkers.length === 0){ return finish(); }
      // untrack is asynchronous: report back only after the last one is done
      var remaining = deadWorkers.length;
      var firstError = null;
      deadWorkers.forEach(function(w){
        var parts = w.split(":");
        var pid = parseInt(parts[1], 10);
        self.emit("cleaning_worker", w, pid);
        // the last segment is the queue list, everything before it is the name
        var queues = parts.splice(-1, 1);
        var pureName = parts.join(':');
        self.untrack(pureName, queues, function(untrackError){
          if(untrackError && !firstError){ firstError = untrackError; }
          remaining--;
          if(remaining === 0){ finish(firstError); }
        });
      });
    });
  });
};
/**
 * List the pids of the processes reported by `ps awx`.
 *
 * @param {Function} callback - called with `(error, pids)`, where `pids` is an
 *   array of integers.
 */
worker.prototype.getPids = function(callback){
  exec('ps awx | grep -v grep', function(error, stdout, stderr){
    var pids = [];
    String(stdout || '').split("\n").forEach(function(line){
      // the pid is the first column of every `ps` row
      var pid = parseInt(line.trim().split(' ')[0], 10);
      // rows that do not start with a number (e.g. a column header row or a
      // blank line) would otherwise end up in the list as NaN
      if(!isNaN(pid)){ pids.push(pid); }
    });
    callback(error, pids);
  });
};
and worker.untrack is:
/**
 * Remove a worker from the workers set and delete its per-worker keys.
 *
 * @param {string} name - worker name without the queue list ("<host>:<pid>"
 *   when called from workerCleanup).
 * @param {string|Array} [queues] - queue list of the worker being removed;
 *   it is stringified. Falls back to this worker's own `stringQueues()` when
 *   omitted.
 * @param {Function} [callback] - called with the first error of the SREM/DEL
 *   pair, if any.
 */
worker.prototype.untrack = function(name, queues, callback) {
  var self = this;
  var redis = self.connection.redis;
  // The keys must be built from the queues of the worker being removed, which
  // are not necessarily the queues of the worker doing the cleanup.
  var queueString = (queues === undefined || queues === null) ? self.stringQueues() : String(queues);
  redis.srem(self.connection.key('workers'), (name + ":" + queueString), function(sremError){
    // best effort: still delete the keys even if SREM failed
    redis.del([
      self.connection.key('worker', name, queueString),
      self.connection.key('worker', name, queueString, 'started'),
      self.connection.key('stat', 'failed', name),
      self.connection.key('stat', 'processed', name)
    ], function(delError){
      if(typeof callback == "function"){ callback(sremError || delError); }
    });
  });
};
More details here