
Support success event for batched tasks

Open srinivaas14 opened this issue 3 years ago • 9 comments

Hi @clokep , thanks for the package. I'm interested to know a bit more about the class SimpleRequest. As mentioned in its docstring, SimpleRequest generally should have the same properties as :class:`~celery.worker.request.Request`, but in the code SimpleRequest has only a subset of the properties of :class:`~celery.worker.request.Request`. So if I want to use some of the methods (e.g. send_event) or properties of :class:`~celery.worker.request.Request`, what would be the suggested/best way to do that with SimpleRequest?

My use case: I want to send send_event('task-succeeded') after processing and saving (in the result backend) the result of each request (received from the list of batched requests), so that Flower (a monitoring tool) or any other event listener can mark the request as successfully processed. Otherwise the request stays in the pending state.

srinivaas14 avatar May 27 '21 08:05 srinivaas14

Hi @clokep , thanks for the package.

You're welcome!

I'm interested to know a bit more about the class SimpleRequest. As mentioned in its docstring, SimpleRequest generally should have the same properties as :class:`~celery.worker.request.Request`, but in the code SimpleRequest has only a subset of the properties of :class:`~celery.worker.request.Request`.

I recently tried to make this a bit clearer, but the main difference is that Request is not serializable, while SimpleRequest is. (This is necessary because the prefork pool pickles data when sending from the main process to the worker processes -- celery itself only sends a dictionary, not an object, see the code around https://github.com/celery/celery/blob/8d6778810c5153c9e4667eed618de2d0bf72663e/celery/worker/request.py#L317-L382).

It is probably missing a few properties as those aren't necessary to actually execute a task. I'm not sure it really makes sense for this class to have any methods on it though.

So if I want to use some of the methods (e.g. send_event) or properties of :class:`~celery.worker.request.Request`, what would be the suggested/best way to do that with SimpleRequest?

If there are "simple" properties missing we should just add them; that should be straightforward. (Which ones in particular do you need?)

My use case: I want to send send_event('task-succeeded') after processing and saving (in the result backend) the result of each request (received from the list of batched requests), so that Flower (a monitoring tool) or any other event listener can mark the request as successfully processed. Otherwise the request stays in the pending state.

For the send_event use-case, do you know where that gets fired from for a normal Celery task? It sounds like this might just be missing in the way tasks get called in celery-batches?

We do fire some events already:

https://github.com/clokep/celery-batches/blob/29d5f56bfa0ed085fff0b3c4808b1bccd6693906/celery_batches/__init__.py#L305-L309

Celery seems to fire this by calling on_success in one of two places (or both?).

But I'm unsure where the appropriate place to fire success is. A few options spring to mind, but there might be others:

  • Fire the success signal in the callback of the Batch task (like around lines 308-309 in the above).
  • Leave it to the task to call the event manually (similar to how we do it with mark_as_done).
  • A bigger refactoring where the result of each task request is handled automatically so the Batch task knows if each individual request succeeded or not (this sounds hard and not backwards compatible).

Sorry for the long answer! I need to look up again what those events mean, but I think one of those is a reasonable option. 😄

clokep avatar May 27 '21 12:05 clokep

@clokep , thanks for your informative answer.

Currently I'm calling request.send_event("task-succeeded", result=response) after saving the request's result in the backend (i.e. after mark_as_done).
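To illustrate the save-then-notify ordering described here, a minimal runnable sketch follows. The result backend and event dispatcher are stubbed out with plain Python objects (`FakeRequest` and `flush_one` are hypothetical names for illustration, not celery-batches API), and it assumes the request object exposes a send_event method, which is exactly the open question in this thread:

```python
# Minimal sketch of the per-request flow: store the result first
# (mimicking mark_as_done), then emit the success event so monitors
# like Flower see the request leave the pending state.
events = []   # stand-in for the worker's event stream
backend = {}  # stand-in for the result backend

class FakeRequest:
    """Stand-in for a request object that exposes send_event."""

    def __init__(self, task_id):
        self.id = task_id

    def send_event(self, event_type, **fields):
        events.append((event_type, self.id, fields))

def flush_one(request, result):
    backend[request.id] = result  # save first (mark_as_done)
    request.send_event("task-succeeded", result=result)  # then notify

flush_one(FakeRequest("abc123"), {"status": "ok"})
```

The point of the ordering is that by the time any listener reacts to the success event, the result is already retrievable from the backend.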

But I'm also checking the celery code to see how it calls this event with the appropriate data (like task runtime, received time, start time, etc.).

srinivaas14 avatar May 27 '21 13:05 srinivaas14

Currently I'm calling request.send_event("task-succeeded", result=response) after saving the request's result in the backend (i.e. after mark_as_done).

When you say "currently" are you using Celery batches now?

But I'm also checking the celery code to see how it calls this event with the appropriate data (like task runtime, received time, start time, etc.).

Great. 👍 Hopefully it isn't too hard to make it work.

clokep avatar May 27 '21 16:05 clokep

(I've also retitled this, hopefully it captures what you were going for.)

clokep avatar May 27 '21 17:05 clokep

Hi @clokep , I've checked the celery code related to the following events during the task life-cycle.

  • Received
  • Started
  • Success
  • Failure (with error traceback)

So to emit these events for the requests currently being processed in batches, I've made the following updates to the task class Batches(Task):

  • Event : Task Received
# emits task received signal
from celery import signals

def Strategy(self, task, app, consumer):
    eventer = consumer.event_dispatcher  # the worker's event dispatcher

    def task_message_handler(message, body, ack, reject, callbacks, **kw):
        events = eventer and eventer.enabled
        send_event = eventer and eventer.send
        task_sends_events = events and task.send_events

        # ... `request` is built from the incoming message, as in the stock strategy ...

        signals.task_received.send(sender=consumer, request=request)  # emits task_received signal
        if task_sends_events:
            send_event(
                "task-received",
                uuid=request.id,
                name=request.name,
                args=request.argsrepr,
                kwargs=request.kwargsrepr,
                root_id=request.root_id,
                parent_id=request.parent_id,
                retries=request.request_dict.get("retries", 0),
                eta=request.eta and request.eta.isoformat(),
                expires=request.expires and request.expires.isoformat(),
            )

  • Event : Started

def flush(self, requests):

    def on_accepted(pid, time_accepted):
        # `acks_late` is defined in the enclosing flush() scope, as in the
        # existing implementation.
        [req.acknowledge() for req in acks_late[False]]
        for req in requests:
            req.start_time = time_accepted
            req.send_event("task-started")  # emits the task-started event


  • Event : Success & Failure
from time import monotonic
from billiard.einfo import ExceptionInfo

def flush(self, requests):

    def on_return(results):
        """Handler for the value returned by the task function (the consumer of batched requests)."""
        if isinstance(results, list):
            [req.acknowledge() for req in acks_late[True]]
            for index, result in enumerate(results):
                request = requests[index]  # input request, part of the batch
                request_runtime = monotonic() - request.start_time
                if isinstance(result, Exception):
                    # Wrap exceptions raised by the task function with billiard's
                    # ExceptionInfo so they are compatible with celery's
                    # `on_failure` handler.
                    excep = ExceptionInfo((type(result), result, result.__traceback__))
                    failed__retval__runtime = (1, excep, request_runtime)
                else:
                    failed__retval__runtime = (0, result, request_runtime)
                # Emits the success/failure event, which is propagated by celery.
                request.on_success(failed__retval__runtime)

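The branching in on_return above boils down to building a (failed, retval, runtime) tuple per result. Here is a self-contained sketch of just that logic, with billiard's ExceptionInfo wrapper replaced by the bare exception so it runs without a worker (`classify_result` is a hypothetical name, not part of celery or celery-batches):

```python
from time import monotonic

def classify_result(result, start_time):
    # Mirrors the tuple on_return builds per request:
    # (failed, retval, runtime). billiard's ExceptionInfo wrapper is
    # omitted here so the sketch runs standalone.
    runtime = monotonic() - start_time
    if isinstance(result, Exception):
        return (1, result, runtime)
    return (0, result, runtime)

start = monotonic()
ok = classify_result("done", start)
err = classify_result(ValueError("boom"), start)
```

The first element is what request.on_success later uses to decide between firing the success and failure paths.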

@clokep , can you please review and give feedback, so I can update the code as per your suggestions? Let me know if any further info is required.

srinivaas14 avatar May 31 '21 04:05 srinivaas14

@srinivaas14 The Received and Started changes look pretty reasonable, I'm a bit more skeptical about the Success/Failure ones though since I don't think we require that any particular results be returned from the task.

It would probably be easier to look at a pull request to really see the changes (and let CI run).

clokep avatar Jun 03 '21 17:06 clokep

Hi @clokep , thanks for the comments.

I'm a bit more skeptical about the Success/Failure ones though since I don't think we require that any particular results be returned from the task.

These Success/Failure events are for the batched requests that we are executing in our task. These events will be useful to track the state and result of each request (which is part of the batched request list) in monitoring tools like Flower. Otherwise the request will remain in the 'Started' state even after we save (i.e. mark_as_done) the request's result to the results backend.

It would probably be easier to look at a pull request to really see the changes (and let CI run)

Sure, I'll raise a pull request with the above mentioned changes

srinivaas14 avatar Jun 04 '21 07:06 srinivaas14


I'm a bit more skeptical about the Success/Failure ones though since I don't think we require that any particular results be returned from the task.

These Success/Failure events are for the batched requests that we are executing in our task. These events will be useful to track the state and result of each request (which is part of the batched request list) in monitoring tools like Flower. Otherwise the request will remain in the 'Started' state even after we save (i.e. mark_as_done) the request's result to the results backend.

Right, I think it is good to send the events, but I think the implementation given above won't work with the way BatchTask needs to be written. It seems to depend on that returning an ordered list of results, but maybe I'm misunderstanding.

clokep avatar Jun 04 '21 18:06 clokep

but I think the implementation given above won't work with the way BatchTask needs to be written. It seems to depend on that returning an ordered list of results, but maybe I'm misunderstanding.

Hi @clokep , yes, you are correct about the Success/Failure events implementation. Here I'm assuming that the task will return an ordered list of results. As you mentioned, I'll try to re-implement this to be compatible with the current BatchTask implementation.

Apart from this, I've raised a pull request, which adds the task-received and task-started events.

Let me know if any further info is required.

srinivaas14 avatar Jun 07 '21 05:06 srinivaas14