0.6.0-rc7 crashing

kdesjard opened this issue 1 year ago

After making a number of changes to get 0.6.0-rc7 to compile, I've set the storage backend as always (which includes my request type):

.backend(storage.clone())

But now I get:

internal error: entered unreachable code: Worker missing required context: Missing the an entry for `apalis_sql::postgres::PostgresStorage<MyType>`. Did you forget to add `.data(<apalis_sql::postgres::PostgresStorage<MyType>>)

Why do I need to use .data now??

kdesjard avatar Sep 17 '24 11:09 kdesjard

There is always something that slips through a refactor. I will write a quick fix.

geofmureithi avatar Sep 17 '24 16:09 geofmureithi

Ok thanks. I do like the new Request struct layout as I was doing this myself in my own struct. Overall, it's much simpler now.

kdesjard avatar Sep 17 '24 17:09 kdesjard

After consideration, this will possibly be resolved externally. Here are some thoughts:

  1. Adding back an implicit Data layer for the backend would impose a clone on all backends, which is not the case for Redis.
  2. We need to standardize dependency injection; we can't inject the backend sparingly.
  3. Not every job needs access to the storage, so I think an extra .data( line (sketched just below) would not hurt your case.
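
A minimal sketch of what that extra line looks like, assuming the PostgresStorage setup from your snippet (the full builder is spelled out further down the thread):

let worker = WorkerBuilder::new("MyWorker")
    .data(storage.clone())    // exposes Data<PostgresStorage<MyReq>> to job functions
    .backend(storage.clone()) // the same storage still drives the worker
    .build_fn(execute);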

Let me know your thoughts.

geofmureithi avatar Sep 22 '24 08:09 geofmureithi

Ok, if I add the storage via .data, then I get:

Worker missing required context: Missing the an entry for apalis_core::task::task_id::TaskId. Did you forget to add `.data(<apalis_core::task::task_id::TaskId>)

kdesjard avatar Sep 22 '24 10:09 kdesjard

I am updating documentation on this. Since the introduction of Parts, the following are injected directly:

  • TaskId
  • Attempt
  • Namespace
  • Ctx (depending on the Backend)

These are no longer available via Data(T); just inject them directly as T into your job function.
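
For illustration only (a sketch, not the repo's fn-args example; MyReq and the storage are the types from this thread, and the exact extractor set is clarified further below), a handler can then take this shape:

use apalis::prelude::{Data, Error, TaskId, WorkerId};
use apalis_sql::context::SqlContext;
use apalis_sql::postgres::PostgresStorage;

pub async fn execute(
    req: MyReq,                            // the job payload itself
    task_id: TaskId,                       // injected directly from Parts
    ctx: SqlContext,                       // backend context, injected directly
    worker_id: Data<WorkerId>,             // still provided via Data
    storage: Data<PostgresStorage<MyReq>>, // available only if added via .data(...)
) -> Result<(), Error> {
    // ... do the work ...
    Ok(())
}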

geofmureithi avatar Sep 22 '24 11:09 geofmureithi

Let me know if that helps. You can also look at the fn-args example in the v0.6 tree.

geofmureithi avatar Sep 24 '24 07:09 geofmureithi

Creating the worker pool:

let storage: PostgresStorage<MyReq> = PostgresStorage::new(pool.clone());

let worker = WorkerBuilder::new("MyWorker")
    .chain(|srv| srv.layer(TraceLayer::new()))
    .backend(storage.clone())
    .build_fn(execute);

The build_fn handler, modeled on the fn-args example:

pub async fn execute(
    mut proc: MyReq,
    worker_id: Data<WorkerId>,
    _worker_ctx: Context<TokioExecutor>,
    _sqlite: Data<PostgresStorage<MyReq>>,
    task_id: Data<TaskId>,
    _ctx: Data<SqlContext>,

then I get the crash:

thread 'tokio-runtime-worker' panicked at ..cargo/git/checkouts/apalis-2e5337d3a5750988/d62281f/packages/apalis-core/src/worker/mod.rs:468:45:                                                                                                                   
internal error: entered unreachable code: Worker missing required context: Missing the an entry for `apalis_sql::postgres::PostgresStorage<MyReq>`. Did you forget to add `.data(<apalis_sql::postgres::PostgresStorage<MyReq>>) 

kdesjard avatar Sep 25 '24 11:09 kdesjard

You need to do the following:

let storage: PostgresStorage<MyReq> = PostgresStorage::new(pool.clone());

let worker = WorkerBuilder::new("MyWorker")
    .chain(|srv| srv.layer(TraceLayer::new()))
    .data(storage.clone())
    .backend(storage.clone())
    .build_fn(execute);

And for your job fn:

pub async fn execute(
    mut proc: MyReq,
    worker_id: Data<WorkerId>,
    _worker_ctx: Context<TokioExecutor>,
    _sqlite: Data<PostgresStorage<MyReq>>, // We now include this via .data
    task_id: TaskId, // TaskId Injected directly
    _ctx: SqlContext // Ctx is injected directly
)

geofmureithi avatar Sep 26 '24 08:09 geofmureithi

With that setup I get many trait errors like:

72 | .build_fn(execute); | ^^^^^^^^ the trait Service<apalis::prelude::Request<MyReq, SqlContext>> is not implemented for ServiceFn<fn(MyReq, Data<WorkerId>, Context<TokioExecutor>, Data<PostgresStorage<MyReq>>, TaskId, SqlContext) -> impl Future<Output = Result<OverallResult, Error>> {execute}, MyReq, SqlContext, _>, which is required by apalis::prelude::WorkerBuilder<MyReq, SqlContext, PostgresStorage<MyReq>, Stack<Data<PostgresStorage<MyReq>>, Stack<TraceLayer, Identity>>, _>: apalis::prelude::WorkerFactory<_, _, apalis::prelude::ServiceFn<_, _, _, _>>

71 | .backend(storage.clone()) | ^^^^^^^ the trait Service<apalis::prelude::Request<MyReq, SqlContext>> is not implemented for ServiceFn<fn(MyReq, Data<WorkerId>, Context<TokioExecutor>, Data<PostgresStorage<MyReq>>, TaskId, SqlContext) -> impl Future<Output = Result<OverallResult, Error>> {execute}, MyReq, SqlContext, _>

kdesjard avatar Sep 26 '24 09:09 kdesjard

Well, seems I messed up somewhere. Looking at the docs, it seems I am wrong about some of those. Looks like I missed that during the refactor. I will write a patch and clarify.

geofmureithi avatar Sep 26 '24 09:09 geofmureithi

Hey @kdesjard, please check out https://github.com/geofmureithi/apalis/pull/425 and tell me if that branch works for you.

geofmureithi avatar Sep 28 '24 04:09 geofmureithi

Compiles and runs, but now the job gets stuck in the running state.

kdesjard avatar Sep 28 '24 10:09 kdesjard

Ok, interesting. Could you try picking a test from here and providing a failing test?

geofmureithi avatar Sep 30 '24 07:09 geofmureithi

Is this related to https://github.com/geofmureithi/apalis/discussions/422 ?

My build_fn finishes fine, and I see job.done:

DEBUG ThreadId(06) task: apalis::layers::tracing::on_response: job.done done_in=1ms result=OverallResult { ... }

but the status does not change to Done (attempt also equals 0 ?)

    parts: Parts {
        task_id: TaskId(
            Ulid(
                2088692579879522513577402759446946266,
            ),
        ),
        data: Extensions,
        attempt: Attempt(
            0,
        ),
        context: SqlContext {
            status: Running,
            run_at: 2024-09-30T19:53:27.660139Z,
            max_attempts: 1,
            last_error: None,
            lock_at: Some(
                1727726007,
            ),
            lock_by: Some(
                WorkerId {
                    name: "WORKER",
                    instance: None,
                },
            ),
            done_at: None,
        },

kdesjard avatar Sep 30 '24 19:09 kdesjard

Tower consumes the request as self, so after the job is done, the Parts might need to be refreshed via fetch_by_id. Where and when are you debugging the context?
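
For illustration, a rough sketch of re-reading a task after the worker has run it (assuming Storage::fetch_by_id takes a &TaskId and yields the refreshed request; MyReq is the request type from the snippets above):

use apalis::prelude::{Storage, TaskId};
use apalis_sql::postgres::PostgresStorage;

async fn print_status(storage: &mut PostgresStorage<MyReq>, task_id: &TaskId) {
    match storage.fetch_by_id(task_id).await {
        // parts.context is the SqlContext (status, attempts, done_at, ...) as it
        // currently sits in Postgres, not as the already-consumed request saw it.
        Ok(Some(req)) => println!("{:?}", req.parts.context),
        Ok(None) => println!("task not found"),
        Err(e) => eprintln!("fetch failed: {e}"),
    }
}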

geofmureithi avatar Oct 01 '24 06:10 geofmureithi

I submit the job and then run storage.fetch_by_id to poll the task, which is where I'm debugging the context. Even without polling, the DB still shows the status as Running:

             id             | status  | attempts | max_attempts |            run_at             |            done_at            
----------------------------+---------+----------+--------------+-------------------------------+-------------------------------
 01J93SDK5HAJE7KGKC17XE4P6B | Running |        0 |            1 | 2024-10-01 10:18:38.130224+00 | 

kdesjard avatar Oct 01 '24 10:10 kdesjard

Ack on Postgres is chunked and lazy. Can you try sleeping for a second or two?

geofmureithi avatar Oct 01 '24 17:10 geofmureithi

It doesn't matter whether I check or not. I have two modes, sync and async: sync requests poll for completion within a timeout period, while async jobs are essentially fire and forget. Even for async jobs, the DB still shows them as Running even though they did complete.

kdesjard avatar Oct 01 '24 17:10 kdesjard

This is really interesting. I have integration tests checking this and they are passing. I will dive more into it and get back to you.

geofmureithi avatar Oct 02 '24 09:10 geofmureithi

@kdesjard I found the bug: I was missing a critical implementation in the worker setup. Thanks for highlighting it and for always trying the latest version.

Basically the AckLayer was not being called, but now that's fixed with a test added for each storage. Please give it a try before I merge it.

geofmureithi avatar Oct 03 '24 19:10 geofmureithi

Jobs are finishing now, but I'm getting trait errors if I include the TaskId in the build_fn's args.

kdesjard avatar Oct 03 '24 19:10 kdesjard

Could you pull the latest from the 0.6 branch and give it a try? There were some other fixes to FromRequest.

geofmureithi avatar Oct 04 '24 05:10 geofmureithi

All working now, thank you very much.

kdesjard avatar Oct 04 '24 09:10 kdesjard