0.6.0-rc7 crashing
After making a number of changes to get 0.6.0-rc7 to compile, I've set the storage backend as always (which includes my request type):
.backend(storage.clone())
But now I get:
internal error: entered unreachable code: Worker missing required context: Missing the an entry for `apalis_sql::postgres::PostgresStorage<MyType>`. Did you forget to add `.data(<apalis_sql::postgres::PostgresStorage<MyType>>)
Why do I need to use `.data` now??
There is always something that slips through a refactor. I will write a quick fix.
Ok, thanks. I do like the new `Request` struct layout, as I was doing this myself in my own struct. Overall, it's much simpler now.
After consideration, this will possibly be resolved externally. Here are some thoughts:
- Adding back an implicit `DataLayer` for the backend would impose a clone on all backends, which is not the case for Redis.
- We need to standardize dependency injection; we can't inject the backend sparingly.
- Not every job needs access to the storage, so I think an extra `.data(...)` line would not hurt your case.
Let me know your thoughts.
Ok, if I add the storage via `.data`, then I get:
Worker missing required context: Missing the an entry for apalis_core::task::task_id::TaskId. Did you forget to add `.data(<apalis_core::task::task_id::TaskId>)
I am updating the documentation on this.
Since the introduction of `Parts`, the following are injected directly:

- `TaskId`
- `Attempt`
- `Namespace`
- `Ctx` (dependent on the `Backend`)

These are no longer available via `Data<T>`; just inject them directly as `T` into your job function.
Let me know if that helps. You can also look at the fn-args example in the v0.6 tree.
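For example, a handler could look roughly like this (a minimal sketch; `MyJob` and `handle` are placeholder names, and error handling is elided):

```rust
use apalis::prelude::*;
use apalis_sql::context::SqlContext;
use serde::{Deserialize, Serialize};

// Placeholder request type for the sketch.
#[derive(Debug, Serialize, Deserialize)]
struct MyJob;

async fn handle(
    job: MyJob,
    task_id: TaskId,  // injected directly, not via Data<TaskId>
    attempt: Attempt, // injected directly
    _ctx: SqlContext, // the backend-dependent Ctx, injected directly
) -> Result<(), Error> {
    println!("{job:?}: task {task_id:?}, attempt {attempt:?}");
    Ok(())
}
```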
Creating the worker pool:
```rust
let storage: PostgresStorage<MyReq> = PostgresStorage::new(pool.clone());
let worker = WorkerBuilder::new("MyWorker")
    .chain(|srv| srv.layer(TraceLayer::new()))
    .backend(storage.clone())
    .build_fn(execute);
```
The `build_fn` handler, following the fn-args example:
```rust
pub async fn execute(
    mut proc: MyReq,
    worker_id: Data<WorkerId>,
    _worker_ctx: Context<TokioExecutor>,
    _sqlite: Data<PostgresStorage<MyReq>>,
    task_id: Data<TaskId>,
    _ctx: Data<SqlContext>,
```
then I get the crash:
```
thread 'tokio-runtime-worker' panicked at ..cargo/git/checkouts/apalis-2e5337d3a5750988/d62281f/packages/apalis-core/src/worker/mod.rs:468:45:
internal error: entered unreachable code: Worker missing required context: Missing the an entry for `apalis_sql::postgres::PostgresStorage<MyReq>`. Did you forget to add `.data(<apalis_sql::postgres::PostgresStorage<MyReq>>)
```
You need to do the following:
```rust
let storage: PostgresStorage<MyReq> = PostgresStorage::new(pool.clone());
let worker = WorkerBuilder::new("MyWorker")
    .chain(|srv| srv.layer(TraceLayer::new()))
    .data(storage.clone())
    .backend(storage.clone())
    .build_fn(execute);
```
And for your job fn:
```rust
pub async fn execute(
    mut proc: MyReq,
    worker_id: Data<WorkerId>,
    _worker_ctx: Context<TokioExecutor>,
    _sqlite: Data<PostgresStorage<MyReq>>, // we now include this via .data
    task_id: TaskId,                       // TaskId is injected directly
    _ctx: SqlContext,                      // Ctx is injected directly
)
```
With that setup I get many trait errors like:
```
72 |     .build_fn(execute);
   |      ^^^^^^^^ the trait Service<apalis::prelude::Request<MyReq, SqlContext>> is not implemented for ServiceFn<fn(MyReq, Data<WorkerId>, Context<TokioExecutor>, Data<PostgresStorage<MyReq>>, TaskId, SqlContext) -> impl Future<Output = Result<OverallResult, Error>> {execute}, MyReq, SqlContext, _>, which is required by apalis::prelude::WorkerBuilder<MyReq, SqlContext, PostgresStorage<MyReq>, Stack<Data<PostgresStorage<MyReq>>, Stack<TraceLayer, Identity>>, _>: apalis::prelude::WorkerFactory<_, _, apalis::prelude::ServiceFn<_, _, _, _>>

71 |     .backend(storage.clone())
   |      ^^^^^^^ the trait Service<apalis::prelude::Request<MyReq, SqlContext>> is not implemented for ServiceFn<fn(MyReq, Data<WorkerId>, Context<TokioExecutor>, Data<PostgresStorage<MyReq>>, TaskId, SqlContext) -> impl Future<Output = Result<OverallResult, Error>> {execute}, MyReq, SqlContext, _>
```
Well, seems I fkd up somewhere. Looking at the docs, it seems I am wrong about some of those; looks like I missed that during the refactor. I will write a patch and clarify.
Hey @kdesjard, please check out https://github.com/geofmureithi/apalis/pull/425 and tell me if that branch works for you.
Compiles and runs, but now the job gets stuck in the Running state.
Ok, interesting. Could you pick a test from here and provide a failing one?
Is this related to https://github.com/geofmureithi/apalis/discussions/422?
My `build_fn` finishes fine, and I see `job.done`:
DEBUG ThreadId(06) task: apalis::layers::tracing::on_response: job.done done_in=1ms result=OverallResult { ... }
but the status does not change to Done (attempt also equals 0?):
```
parts: Parts {
    task_id: TaskId(
        Ulid(
            2088692579879522513577402759446946266,
        ),
    ),
    data: Extensions,
    attempt: Attempt(
        0,
    ),
    context: SqlContext {
        status: Running,
        run_at: 2024-09-30T19:53:27.660139Z,
        max_attempts: 1,
        last_error: None,
        lock_at: Some(
            1727726007,
        ),
        lock_by: Some(
            WorkerId {
                name: "WORKER",
                instance: None,
            },
        ),
        done_at: None,
    },
}
```
Tower consumes the request as `self`, so after the job is done, the `Parts` might need to be refreshed via `fetch_by_id`.
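Something along these lines, for example (a rough sketch, not tested; `MyReq` is your request type, and I'm assuming the `Storage` trait is in scope for `fetch_by_id`):

```rust
use apalis::prelude::*;
use apalis_sql::postgres::PostgresStorage;

// Sketch: re-fetch the request after submission to see the latest Parts.
// `MyReq` here is the request type from the setup above.
async fn refresh(
    storage: &mut PostgresStorage<MyReq>,
    task_id: &TaskId,
) -> Result<(), sqlx::Error> {
    if let Some(req) = storage.fetch_by_id(task_id).await? {
        // `parts` carries the refreshed TaskId, Attempt, and SqlContext.
        println!("refreshed parts: {:?}", req.parts);
    }
    Ok(())
}
```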
Where and when are you debugging the context?
I submit the job and run `storage.fetch_by_id` to poll the task, which is where I'm debugging the context. Even without polling, the DB still has the status as Running:
```
             id             | status  | attempts | max_attempts |            run_at             | done_at
----------------------------+---------+----------+--------------+-------------------------------+---------
 01J93SDK5HAJE7KGKC17XE4P6B | Running |        0 |            1 | 2024-10-01 10:18:38.130224+00 |
```
Ack on Postgres is chunked and lazy. Can you try sleeping for a second or two?
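For example, inside the polling code (a sketch; the two-second delay is arbitrary, and `storage`/`task_id` come from the surrounding setup):

```rust
use std::time::Duration;

// Sketch: give the chunked/lazy ack a moment to flush before re-reading.
tokio::time::sleep(Duration::from_secs(2)).await;
if let Some(req) = storage.fetch_by_id(&task_id).await? {
    println!("status after sleep: {:?}", req.parts.context);
}
```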
It doesn't matter whether I check or not. I have two modes, sync and async: sync requests poll for completion within a timeout period, while async jobs are essentially fire and forget. So even for async jobs, the DB still shows them as Running even though they did complete.
This is really interesting. I have integration tests checking this and they are passing. I will dig deeper into it and get back to you.
@kdesjard I found the bug: I was missing a critical implementation in the worker setup. Thanks for highlighting it, and for always trying the latest version.
Basically the `AckLayer` was not being called, but that's now fixed, with a test added for each storage. Please give it a try before I merge it.
Jobs are finishing now, but I'm getting trait errors if I include the `TaskId` in `build_fn`'s args.
Could you pull the latest from the 0.6 branch and give it a try? There were some other fixes on `FromRequest`.
All working now, thank you very much.