apalis
apalis copied to clipboard
When returning an error from a job, 'last_error' is not set and 'status' is in Done
Hello, I have been evaluating apalis w/ postgres as an executor and have a reproducible error where failures in a job are not recorded as failed in the jobs
table. As such, I'm not sure with how to proceed with having the job retried on failure.
Consider the following code snippet:
In this snippet I create a message, based on the job/ TokenDetails, and attempt to publish the message to a destination queue. I return OK or an Err based on the result of whether the message was able to be published
impl Job for TokenDetails {
const NAME: &'static str = "apalis::TokenDetails";
}
/// Create the SQS client, and send TokenDetails as the message content
pub async fn send_token_event_example(job: TokenDetails, _ctx: JobContext) -> Result<(), JobError> {
....
match queue_client.send_one(&message).await {
Ok(_) => {
log::info!("Message sent successfully from job");
Ok(())
},
Err(e) => {
//apalis reference: https://github.com/geofmureithi/apalis/blob/master/examples/sentry/src/main.rs#L93
log::error!("Message send failure from job. Is your AWS default environment region correct? Error: {:#?}", e);
Err(JobError::Failed(Box::new(e)))
},
}
}
I can trigger an error in this process by changing my AWS default region. Via logs I confirm this is the case that my behavior is causing an error:
{"v":0,"name":"authn_service","msg":"Error when sending message via SQS lib: <....>}
{"v":0,"name":"authn_service","msg":"Message send failure from job. Is your AWS default environment region correct?"}
But there is no error recorded in jobs
.last_error
job | id | job_type | status | attempts | max_attempts | run_at | last_error | lock_at | lock_by | done_at |
---|---|---|---|---|---|---|---|---|---|---|
... | JID-01HCFZGTDGH674EX2SSAGWCKHZ | apalis::TokenDetails | Done | 0 | 25 | 2023-10-11 18:20:55.604719+00 | 2023-10-11 18:20:55.716267+00 | tasty-orange-0 | 2023-10-11 18:20:56.270764+00 |
For reference, this is how I am creating the apalis worker. futures
are polled later on with the API actix-web server
let worker = Monitor::new()
// Start with an initial worker count of one
.register_with_count(1, move |c| {
WorkerBuilder::new(format!("tasty-orange-{c}"))
// .layer(TraceLayer::new())
.layer(BufferLayer::<JobRequest<TokenDetails>>::new(250))
.with_storage_config(worker_pg.clone(), |cfg| {
cfg
// Set the buffer size to 10 ( Pick 10 jobs per query)
.buffer_size(10)
// Lower the fetch interval because postgres is waiting for notifications
.fetch_interval(Duration::from_millis(200))
})
.build_fn(send_token_event_example)
})
.run();
futures.push(Box::pin(worker));
Currently the functionality is not implemented implicitly (I can add that in 0.5).
To do what you need. Consider using a custom Layer and calling update_with_id
.
OK. I am not committed to marking a job Failed
and having it re-executed implicitly (though, implictly has a nice feel to it). If this has to be done explicitly, that's fine too! I just need to be able to mark a job as Failed
and have it re-executed at all.
Since I failed to find an example for how to do this explicitly in /examples
, I assumed re-executing on failure "just worked" implicitly based on how the result is returned in the sentry example.
I would happily submit a PR back with an example in either case if you wouldn't mind giving me a starting point for re-executing a job on failure, either explicitly or implicitly.
I was able to work around this by adding a cron service, and updating to 0.4.5
, to rerun the individual job logic for each item that hasn't been marked as received by the downstream system.
Haven't had the opportunity to figure out the custom layers & explicit method yet but will update when I can
I encourage you to look at https://github.com/geofmureithi/apalis/blob/master/packages/apalis-core/src/layers/ack/mod.rs Ideally you would want to:
- Create a Retry trait
- Create a
RetryLayer
which would take in a type that implementsRetry
- Implement
Retry
for the different backends
You can start with the first two for the PR and then we can discuss.
To be honest, we might be just needing a combination of Ack and Retry in the same layer.
https://github.com/geofmureithi/apalis/discussions/215#discussioncomment-7774965 Also applies here.
Currently AckLayer
just handles success, This can be easily be added.
This has been resolved and tested in #374