framework icon indicating copy to clipboard operation
framework copied to clipboard

Keep track of celery task timings

Open JeffreyDevloo opened this issue 7 years ago • 0 comments

Feature description

Because certain task results are disappearing due to Memcached (the current result backend) being volatile, it might be useful for the framework to keep track of certain tasks itself. We would still rely on Celery and the result backend for the results, but we would be able to record more information about each task (timings, state, ...) by capturing the events.

Might prove to be helpful for https://github.com/openvstorage/framework/issues/1726 too

After some testing, I managed to get something working with prints, so here it is for later use:

import datetime
# Module-level lookup tables keyed by task id. Nothing in the visible code
# reads or writes them yet; presumably they are meant to hold the time a task
# was added and the time it started -- TODO confirm before relying on them.
taskId_addedTime = {}
taskId_startTime = {}

# Required changes for celery monitor:
# celery.conf.CELERY_SEND_EVENTS = True
# celery.conf.CELERY_SEND_TASK_SENT_EVENT = True

class CeleryMonitor(object):
    """
    Print information about Celery tasks by listening to the events of a Celery application.

    The application has to be configured to send events (CELERY_SEND_EVENTS and
    CELERY_SEND_TASK_SENT_EVENT set to True) for the handlers to receive anything.
    """

    def __init__(self, celery_app):
        """
        :param celery_app: Celery application whose events are captured
        """
        self.celery_app = celery_app

    def monitor(self, catch_all=False):
        """
        Capture events of the Celery application and print details about them.

        The receiver is started without a limit or a timeout, so this call keeps
        capturing until the receiver itself stops.

        :param catch_all: When True, a single handler is registered for every event type ('*')
                          instead of the dedicated succeeded/failed/received handlers
        :type catch_all: bool
        :return: None
        """
        state = self.celery_app.events.State()

        # Every print below passes exactly one argument so that the output is the
        # same whether print is a statement (Python 2) or a function (Python 3).

        def announce_task_succeeded(event):
            state.event(event)
            task = state.tasks.get(event['uuid'])
            print('runtime:  %s' % (task.runtime,))
            print('time of completion:  %s' % (datetime.datetime.now(),))

        def announce_failed_tasks(event):
            state.event(event)
            # task name is sent only with -received event, and state
            # will keep track of this for us.
            task = state.tasks.get(event['uuid'])
            print('TASK FAILED: %s[%s] %s' % (task.name, task.uuid, task.info(),))

        def announce_task_received(event):
            print('received')
            state.event(event)
            task = state.tasks.get(event['uuid'])
            print('Got me task matey:  %s' % (task.uuid,))

        def announce_any_event(event):
            # Only feeds the state; not every event type has to carry a task uuid
            print('Announcing')
            state.event(event)

        if catch_all:
            handlers = {'*': announce_any_event}
        else:
            handlers = {'task-succeeded': announce_task_succeeded,
                        'task-failed': announce_failed_tasks,
                        'task-received': announce_task_received}

        with self.celery_app.connection() as connection:
            recv = self.celery_app.events.Receiver(connection, handlers=handlers)
            recv.capture(limit=None, timeout=None, wakeup=True)


if __name__ == '__main__':
    # The OVS Celery application is imported here, so it is only loaded when
    # this file is executed as a script.
    from ovs.celery_run import celery
    CeleryMonitor(celery).monitor()

JeffreyDevloo avatar Nov 16 '17 12:11 JeffreyDevloo