celery-batches
Support success event for batched tasks
Hi @clokep, thanks for the package. I'm interested to know a bit more about the class `SimpleRequest`. The docstring for `SimpleRequest` says it "generally should have the same properties as :class:`~celery.worker.request.Request`", but in the code `SimpleRequest` has only a subset of the properties of :class:`~celery.worker.request.Request`.

So if I want to use some of the methods (e.g. `send_event`) or properties of :class:`~celery.worker.request.Request`, what is the suggested/best way to do that with `SimpleRequest`?

My use case: I want to send `send_event('task-succeeded')` after processing and saving (in the result backend) the result of each request (which I received from the list of batched requests), so Flower (a monitoring tool) or any other event listener can mark that the request was successfully processed. Otherwise the request stays in the pending state.
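For context, a Celery `task-succeeded` event is just a dictionary of fields that monitors like Flower consume. Below is a minimal sketch of that payload; the field names follow Celery's task-event documentation, but the helper function itself is illustrative, not a Celery or celery-batches API:

```python
from time import time

def task_succeeded_event(uuid, result, runtime, hostname="worker@example"):
    """Sketch of the payload a 'task-succeeded' event carries.

    Illustrative only: the field names come from Celery's task-event
    documentation, but this helper is not part of Celery or celery-batches.
    """
    return {
        "type": "task-succeeded",
        "uuid": uuid,
        "result": repr(result),  # Celery sends the repr of the result
        "runtime": runtime,
        "hostname": hostname,
        "timestamp": time(),
    }

event = task_succeeded_event("uuid-1", {"ok": True}, runtime=0.25)
```

A monitor keyed on `uuid` can then transition that task out of the pending state when it sees this event.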
> Hi @clokep, thanks for the package.
You're welcome!
> I'm interested to know a bit more about the class `SimpleRequest`. The docstring for `SimpleRequest` says it "generally should have the same properties as :class:`~celery.worker.request.Request`", but in the code `SimpleRequest` has only a subset of the properties of :class:`~celery.worker.request.Request`.
I recently tried to make this a bit clearer, but the main difference is that `Request` is not serializable, while `SimpleRequest` is. (This is necessary because the prefork pool pickles data when sending from the main process to the worker processes -- Celery itself only sends a dictionary, not an object; see the code around https://github.com/celery/celery/blob/8d6778810c5153c9e4667eed618de2d0bf72663e/celery/worker/request.py#L317-L382.)

It is probably missing a few properties, as those aren't necessary to actually execute a task. I'm not sure it really makes sense for this class to have any methods on it, though.
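The serializability difference can be illustrated with plain Python; the two classes below are stand-ins for the real Celery classes, not their actual implementations:

```python
import pickle
import socket

class LiveRequest:
    """Stand-in for celery.worker.request.Request: it holds live resources
    (here a raw socket), so it cannot cross the prefork pickle boundary."""
    def __init__(self):
        self.connection = socket.socket()

class PlainRequest:
    """Stand-in for SimpleRequest: plain data only, so it pickles cleanly."""
    def __init__(self, id, name, args, kwargs):
        self.id, self.name, self.args, self.kwargs = id, name, args, kwargs

# The plain-data object survives the round-trip to a worker process.
restored = pickle.loads(pickle.dumps(PlainRequest("uuid-1", "tasks.add", (1, 2), {})))
print(restored.name)

# The object holding a live handle does not.
try:
    pickle.dumps(LiveRequest())
except (TypeError, pickle.PicklingError) as exc:
    print("not picklable:", exc)
```

This is why `SimpleRequest` carries data-like properties but not methods tied to worker-side state.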
> So if I want to use some of the methods (e.g. `send_event`) or properties of :class:`~celery.worker.request.Request`, what is the suggested/best way to do that with `SimpleRequest`?
If there are "simple" properties missing we should just add them; that should be straightforward. (Which one in particular do you need?)
> My use case: I want to send `send_event('task-succeeded')` after processing and saving (in the result backend) the result of each request (which I received from the list of batched requests), so Flower (a monitoring tool) or any other event listener can mark that the request was successfully processed. Otherwise the request stays in the pending state.
For the `send_event` use case, do you know where that gets fired from for a normal Celery task? It sounds like this might just be missing in the way tasks get called in celery-batches?
We do fire some events already:
https://github.com/clokep/celery-batches/blob/29d5f56bfa0ed085fff0b3c4808b1bccd6693906/celery_batches/init.py#L305-L309
Celery seems to fire this by calling `on_success` in one of two places (both places?).
But I'm unsure where the appropriate place to fire success is. Some options spring to mind, but there might be others:
- Fire the success signal in the callback of the `Batch` task (like around lines 308-309 in the above).
- Leave it to the task to call the event manually (similar to how we do with `mark_as_done`).
- A bigger refactoring where the result of each task request is handled automatically, so the `Batch` task knows if each individual request succeeded or not (this sounds hard and not backwards compatible).
Sorry for the long answer! I need to look up again what those events mean, but I think one of those is a reasonable option. 😄
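The first two options above can be contrasted in a Celery-free sketch; every name here is illustrative, not a celery-batches API:

```python
fired = []

def on_success(request_id):
    """Stand-in for firing a per-request success event."""
    fired.append(request_id)

# Option 1: the framework fires success from the Batch callback, once per
# request, regardless of what the task body actually did with each request.
def run_batch_framework_driven(task_fn, requests):
    task_fn(requests)
    for req in requests:
        on_success(req)

# Option 2: the task itself fires the event for each request it completed
# (similar to how results are stored manually with mark_as_done today).
def task_driven(requests):
    for req in requests:
        # ... process req ...
        on_success(req)

run_batch_framework_driven(lambda reqs: None, ["a", "b"])
task_driven(["c"])
print(fired)  # ['a', 'b', 'c']
```

The trade-off: option 1 cannot tell whether the task really handled each request, while option 2 puts the burden (and the flexibility) on the task author.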
@clokep, thanks for your informative answer.

Currently I'm calling `request.send_event("task-succeeded", result=response)` after saving the request's result in the backend (i.e. after `mark_as_done`).

But I'm also checking the Celery code to see how it calls this event with the appropriate data (like task runtime, received time, start time, etc.).
> Currently I'm calling `request.send_event("task-succeeded", result=response)` after saving the request's result in the backend (i.e. after `mark_as_done`).
When you say "currently" are you using Celery batches now?
> But I'm also checking the Celery code to see how it calls this event with the appropriate data (like task runtime, received time, start time, etc.).
Great. 👍 Hopefully it isn't too hard to make it work.
(I've also retitled this, hopefully it captures what you were going for.)
Hi @clokep, I've checked the Celery code related to the following events during the task life-cycle:
- Received
- Started
- Success
- Failure (with error traceback)

So to emit these events for the requests that we process in batches, I've made the following updates to the task class `Batches(Task)`:
- Event: Task Received

```python
from celery import signals

def Strategy(self, task, app, consumer):
    # ... existing strategy setup; `eventer` and `request` come from the
    # surrounding code, elided here ...
    def task_message_handler(message, body, ack, reject, callbacks, **kw):
        events = eventer and eventer.enabled
        send_event = eventer and eventer.send
        task_sends_events = events and task.send_events

        # emits the task-received signal
        signals.task_received.send(sender=consumer, request=request)
        if task_sends_events:
            send_event(
                "task-received",
                uuid=request.id,
                name=request.name,
                args=request.argsrepr,
                kwargs=request.kwargsrepr,
                root_id=request.root_id,
                parent_id=request.parent_id,
                retries=request.request_dict.get("retries", 0),
                eta=request.eta and request.eta.isoformat(),
                expires=request.expires and request.expires.isoformat(),
            )
```
- Event: Started

```python
def flush(self, requests):
    # ... existing flush logic ...
    def on_accepted(pid, time_accepted):
        [req.acknowledge() for req in acks_late[False]]
        for req in requests:
            req.start_time = time_accepted
            req.send_event("task-started")  # emits the task-started event
```
- Event: Success & Failure

```python
from time import monotonic

from billiard.einfo import ExceptionInfo

def flush(self, requests):
    # ... existing flush logic ...
    def on_return(results):
        """Handler for the value returned by the task function
        (i.e. the consumer of the batched requests)."""
        if isinstance(results, list):
            [req.acknowledge() for req in acks_late[True]]
            for index, result in enumerate(results):
                # The input request which was part of the batch.
                request = requests[index]
                request_runtime = monotonic() - request.start_time
                if isinstance(result, Exception):
                    type_, _, tb = type(result), result, result.__traceback__
                    # Wrap with billiard's ExceptionInfo utility so exceptions
                    # raised by the task function are compatible with Celery's
                    # `on_failure` handler.
                    excep = ExceptionInfo((type_, _, tb))
                    failed__retval__runtime = (1, excep, request_runtime)
                else:
                    failed__retval__runtime = (0, result, request_runtime)
                # emits the request success/failure event, which is then
                # propagated by Celery
                request.on_success(failed__retval__runtime)
```
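The classification step in `on_return` above can be sketched without any Celery imports, assuming (as the snippet does) an ordered list of per-request results:

```python
from time import monotonic

def classify(results, start_times):
    """Celery-free sketch of the on_return classification: map each
    per-request result to the (failed, retval, runtime) triple that
    Celery's success/failure handling expects."""
    now = monotonic()
    triples = []
    for result, started in zip(results, start_times):
        runtime = now - started
        if isinstance(result, Exception):
            triples.append((1, result, runtime))   # failure path
        else:
            triples.append((0, result, runtime))   # success path
    return triples

start = monotonic()
triples = classify([42, ValueError("boom")], [start, start])
print([flag for flag, _, _ in triples])  # [0, 1]
```

Note that this only works if the task returns exactly one result per request, in order, which is the assumption questioned later in this thread.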
@clokep, can you please review and give feedback, so I can update the code as per your suggestions? Let me know if any further info is required.
@srinivaas14 The Received and Started changes look pretty reasonable; I'm a bit more skeptical about the Success/Failure ones, though, since I don't think we require that any particular results be returned from the task.
It would probably be easier to look at a pull request to really see the changes (and let CI run).
Hi @clokep, thanks for the comments.

> I'm a bit more skeptical about the Success/Failure ones though since I don't think we require that any particular results be returned from the task.

These Success/Failure events are for the batched requests that we execute in our task. They will be useful for tracking the state and result of each request (which is part of the batched request list) in monitoring tools like Flower. Otherwise the request stays in the 'Started' state even after we save (i.e. `mark_as_done`) the request's result to the results backend.

> It would probably be easier to look at a pull request to really see the changes (and let CI run).

Sure, I'll raise a pull request with the above-mentioned changes.
> Hi @clokep, thanks for the comments.
>
> > I'm a bit more skeptical about the Success/Failure ones though since I don't think we require that any particular results be returned from the task.
>
> These Success/Failure events are for the batched requests that we execute in our task. They will be useful for tracking the state and result of each request (which is part of the batched request list) in monitoring tools like Flower. Otherwise the request stays in the 'Started' state even after we save (i.e. `mark_as_done`) the request's result to the results backend.
Right, I think it is good to send the events, but I think the implementation given above won't work with the way `BatchTask` needs to be written. It seems to depend on the task returning an ordered list of results, but maybe I'm misunderstanding.
> but I think the implementation given above won't work with the way `BatchTask` needs to be written. It seems to depend on the task returning an ordered list of results, but maybe I'm misunderstanding.
Hi @clokep, yes, you are correct about the Success/Failure events implementation. Here I'm assuming that the task will return an ordered list of results. As you mentioned, I'll try to re-implement this to be compatible with the current `BatchTask` implementation.

Apart from this, I've raised a pull request which adds the `task-received` and `task-started` events.

Let me know if any further info is required.