How to process/retrieve the result of a task and how to pass a non serializable context to the task?

Open aurelien-clu opened this issue 1 year ago • 7 comments

Hello,

Thank you for your work. :)

I am wondering the following things:

  • how could someone retrieve the result of a task?
  • how could someone run a callback or call another service with the result of a task?
  • how could someone use clients or database pool from within a run function?

(Feel free to tell me if you would like me to split this into multiple issues.)

E.g. I would like to store the result inside the database or transmit the result to another service (in-memory) without doing an API call to localhost if I am running the workers within an API. And I don't want to build the client/db pool for every task from environment variables.

async fn run(&self, queue: &mut dyn AsyncQueueable) -> Result<(), FangError>

The return type of run is Result<(), FangError>, so I cannot write my own AsyncWorker<AQueue> or AsyncWorkerPool<AQueue> that handles the result of a task.

And since a task must be serializable, I cannot give it clients (database or otherwise) that I do not want to be serializable but still want to access in the run function.

I think the only way would be to access a global reference to a singleton or something similar, e.g.:

async fn run(&self, _queue: &mut dyn AsyncQueueable) -> Result<(), FangError> {
    // do stuff
    let result = ...;
    let pool: Pool<Postgres> = get_global_postgres_pool().expect("Postgres pool is not initialized");
    pool.acquire().await?.execute("<INSERT result into table>").await?;
    Ok(())
}

Though I am not sure that the compiler would allow me to do so.
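
For what it's worth, the compiler does accept a process-wide singleton if it lives in a std::sync::OnceLock. Below is a minimal sketch of the idea, using only the standard library. FakePool, init_global_pool, get_global_pool and run_task are hypothetical names for illustration: FakePool stands in for a real Pool<Postgres>, and run_task stands in for the body of run. None of this is part of fang's API.

```rust
use std::sync::{Mutex, OnceLock};

/// Hypothetical stand-in for a non-serializable resource such as `Pool<Postgres>`.
struct FakePool {
    rows: Mutex<Vec<String>>,
}

impl FakePool {
    /// Pretend to INSERT a row.
    fn insert(&self, row: &str) {
        self.rows.lock().unwrap().push(row.to_string());
    }

    /// Return a copy of the most recently inserted row, if any.
    fn last_row(&self) -> Option<String> {
        self.rows.lock().unwrap().last().cloned()
    }
}

/// Process-wide slot for the pool. It can be written exactly once.
static POOL: OnceLock<FakePool> = OnceLock::new();

/// Call once at startup, before any worker is started.
fn init_global_pool() -> &'static FakePool {
    POOL.get_or_init(|| FakePool { rows: Mutex::new(Vec::new()) })
}

/// Returns `None` if `init_global_pool` has not been called yet.
fn get_global_pool() -> Option<&'static FakePool> {
    POOL.get()
}

/// Stand-in for the body of a task's `run` method (assumption: the real
/// method is async and returns `Result<(), FangError>`).
fn run_task(input: u32) -> Result<(), String> {
    let result = input * 2; // "do stuff"
    let pool = get_global_pool().ok_or("pool is not initialized")?;
    pool.insert(&format!("result={result}"));
    Ok(())
}
```

The task itself stays serializable because the pool is never a field of the task; the cost is hidden global state, which makes testing and running several differently configured pools harder.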

  1. Did I miss something?
  2. Or do you currently use a workaround to achieve this behavior?
  3. Or would that kind of feature be out of scope of fang and you don't need this?

Maybe it is part or could be part of the following discussion https://github.com/ayrat555/fang/discussions/101 ?

I understand that this could/would considerably complicate the implementation, but no harm in asking 😇.

aurelien-clu avatar Jan 09 '24 13:01 aurelien-clu

Seems doable to have a global pool, e.g. https://stackoverflow.com/a/63153898

Still interested in your take on this ;)

aurelien-clu avatar Jan 09 '24 13:01 aurelien-clu

Hello, Have you figured out how to retrieve the result of a task? Thanks

PainOchoco avatar Mar 11 '24 11:03 PainOchoco

More specifically I would want the task to output a stream of data

PainOchoco avatar Mar 11 '24 11:03 PainOchoco

It would be nice if we could pass an MPSC sender instance through the task's run method. It would allow data streaming between the consumer and the senders (workers).

Let me know if there's already a way to do that; I can't find a way to pass a sender instance to the tasks.
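
To make the idea concrete, here is one possible sketch that keeps the task serializable: the task carries only a plain stream id, and the sender is looked up in a process-local registry. It only works when the workers run in the same process as the consumer. All names (SENDERS, open_stream, close_stream, StreamingTask, run_streaming_task) are hypothetical and not part of fang, and run_streaming_task stands in for the body of run.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Mutex, OnceLock};

/// Process-local registry mapping a serializable id to a non-serializable sender.
static SENDERS: OnceLock<Mutex<HashMap<u64, Sender<String>>>> = OnceLock::new();

fn registry() -> &'static Mutex<HashMap<u64, Sender<String>>> {
    SENDERS.get_or_init(|| Mutex::new(HashMap::new()))
}

/// Consumer side: register a stream and keep the receiving end.
fn open_stream(stream_id: u64) -> Receiver<String> {
    let (tx, rx) = channel();
    registry().lock().unwrap().insert(stream_id, tx);
    rx
}

/// Drop the registered sender so the receiver sees the end of the stream.
fn close_stream(stream_id: u64) {
    registry().lock().unwrap().remove(&stream_id);
}

/// Hypothetical task: every field is plain data, so it could derive
/// Serialize/Deserialize in a real project.
struct StreamingTask {
    stream_id: u64,
    chunks: u32,
}

/// Stand-in for the body of `run`: look the sender up by id and stream chunks.
fn run_streaming_task(task: &StreamingTask) -> Result<(), String> {
    let tx = registry()
        .lock()
        .unwrap()
        .get(&task.stream_id)
        .cloned()
        .ok_or("unknown stream id")?;
    for i in 0..task.chunks {
        tx.send(format!("chunk {i}")).map_err(|e| e.to_string())?;
    }
    Ok(())
}
```

The consumer would call open_stream before enqueueing the task and read from the returned receiver; a task whose id is not registered fails with an error instead of panicking.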

PainOchoco avatar Mar 12 '24 11:03 PainOchoco

There is currently no way to pass a non-serializable object to a task.

One workaround is to make the task's fields something serializable, such as database pool settings or a connection URL, so that the worker can store whatever you want in the database. After that, add another task that fetches from the database at a regular interval.

The main problem is that this signature

async fn run<T>(&self, queue: &mut dyn AsyncQueueable) -> Result<T, FangError>

cannot be implemented yet, I think (I am not sure, because Rust has changed since I implemented it).

The reason is that async traits did not allow generics, at least at the time we wrote the implementation.

pxp9 avatar Apr 01 '24 14:04 pxp9

I have been researching this, and I think this change could be possible since Rust 1.75.

As soon as the SQLite and MySQL PR closes #141, I will open an experimental PR exploring this.
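
As a rough illustration of what Rust 1.75 makes possible: async fn is now allowed directly in traits, so a task trait can return a typed result. The sketch below uses an associated Output type rather than a generic run<T>, and a tiny hand-written block_on so that it needs no async runtime. TypedTask, Sum and run_and_handle are hypothetical names, not fang's API. One caveat: a trait with async fn cannot be used as a dyn trait object, so this would not drop into code that works with boxed trait objects without further changes.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical trait (not fang's `AsyncRunnable`): each task names its own result type.
trait TypedTask {
    type Output;
    async fn run(&self) -> Result<Self::Output, String>;
}

/// Example task that sums its input.
struct Sum {
    values: Vec<i64>,
}

impl TypedTask for Sum {
    type Output = i64;

    async fn run(&self) -> Result<i64, String> {
        Ok(self.values.iter().sum())
    }
}

/// Worker-side helper: run the task, hand the result to a callback, then return it.
async fn run_and_handle<T: TypedTask>(
    task: &T,
    on_result: impl FnOnce(&T::Output),
) -> Result<T::Output, String> {
    let out = task.run().await?;
    on_result(&out);
    Ok(out)
}

/// Waker that does nothing; good enough for futures that never really wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor for the sketch: polls in a loop until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

In a real project the block_on helper would be replaced by the runtime already in use; it is only here to keep the example self-contained.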

pxp9 avatar Apr 12 '24 09:04 pxp9

Seems doable to have a global pool, e.g. https://stackoverflow.com/a/63153898

Still interested in your take on this ;)

A global is not a good choice.

sealter avatar Apr 21 '24 13:04 sealter