framework
framework copied to clipboard
Keep track of celery task timings
Feature description
Because Memcached (the current result backend) is volatile, certain task results are disappearing. It might therefore be useful for the framework to keep track of certain tasks itself. We would still rely on celery and the result backend for the results, but by capturing celery's task events we would be able to add more information about each task, such as timings, state, etc.
Might prove to be helpful for https://github.com/openvstorage/framework/issues/1726 too
After some experimenting I managed to get something working that prints the task information, so here it is for later use:
import datetime
# Module-level dicts; nothing in the visible code reads or writes them yet.
# NOTE(review): judging by the names they are presumably placeholders mapping a
# task id to the time the task was started / added - confirm before relying on them.
taskId_startTime = {}
taskId_addedTime = {}
# Required changes for celery monitor:
# celery.conf.CELERY_SEND_EVENTS = True
# celery.conf.CELERY_SEND_TASK_SENT_EVENT = True
class CeleryMonitor(object):
    """Print information about celery tasks by listening to the app's task events.

    Handlers are registered for the 'task-received', 'task-succeeded' and
    'task-failed' events; each one feeds the event into a celery ``State``
    object and prints details of the task the event refers to.

    The celery app has to be configured to send events (the notes accompanying
    this class list ``CELERY_SEND_EVENTS`` and ``CELERY_SEND_TASK_SENT_EVENT``).
    """

    def __init__(self, celery_app):
        """Store the celery application whose events will be monitored.

        :param celery_app: celery application instance; it must expose
                           ``events.State``, ``events.Receiver`` and ``connection()``
        """
        self.celery_app = celery_app

    def monitor(self):
        """Open a broker connection and print task information as events arrive.

        The receiver is started with ``limit=None`` and ``timeout=None``.
        NOTE(review): with those arguments the call presumably blocks until the
        process is interrupted - confirm against the celery documentation.
        """
        # In-memory view of the cluster that is updated by every event fed into it.
        state = self.celery_app.events.State()

        # All prints below are single-argument calls so that the output is the
        # same whether print is a statement (Python 2) or a function (Python 3).

        def announce_task_succeeded(event):
            state.event(event)
            task = state.tasks.get(event['uuid'])
            print('%s %s' % ('runtime: ', task.runtime))
            print('%s %s' % ('time of completion: ', datetime.datetime.now()))

        def announce_failed_tasks(event):
            state.event(event)
            # task name is sent only with -received event, and state
            # will keep track of this for us.
            task = state.tasks.get(event['uuid'])
            print('TASK FAILED: %s[%s] %s' % (task.name, task.uuid, task.info(),))

        def announce_task_received(event):
            print('received')
            state.event(event)
            task = state.tasks.get(event['uuid'])
            print('%s %s' % ('Got me task matey: ', task.uuid))

        def announce(event):
            # Catch-all handler; not registered by default (see the note below).
            print('Announcing')
            state.event(event)

        with self.celery_app.connection() as connection:
            handlers = {'task-succeeded': announce_task_succeeded,
                        'task-failed': announce_failed_tasks,
                        'task-received': announce_task_received}
            # To trace every event type instead, use: handlers = {'*': announce}
            recv = self.celery_app.events.Receiver(connection, handlers=handlers)
            recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
    # Script entry point: load the project's celery app and start monitoring it.
    from ovs.celery_run import celery

    CeleryMonitor(celery).monitor()