rust-scheduled-executor

What is the best way to join this pool? #question

Open inv2004 opened this issue 8 years ago • 8 comments

Hello, are you going to implement .join() like other executors? If not, how would you recommend joining the pool to the main thread?

Thank you,

inv2004, Sep 22 '17 05:09

Hi, each task is registered to be executed "forever", so the thread pool will never terminate by itself. What would be your use case for a join function? Maybe something equivalent can be implemented.

fede1024, Sep 22 '17 06:09

I found that the ability to stop the threads in the pool is even more important. I have several threads which collect internet data and serialize it to a file. If I interrupt the process in the middle of a write, the data is corrupted.

I was thinking of waiting for Ctrl+C in the main thread (I did not find a good library compatible with Windows) and sending a termination request to the threads when the signal arrives. After that I have to wait until the threads have terminated.
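The shutdown pattern described above can be sketched with the standard library alone. This is only an illustration under stated assumptions, not this crate's API: plain threads stand in for the pool, a shared AtomicBool plays the role of the termination request, a fixed sleep stands in for waiting on Ctrl+C, and the function name collect_until_stopped is hypothetical.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Spawns `n` workers that repeat a simulated write until asked to stop,
/// then joins them. Returns how many complete writes each worker finished.
/// (Hypothetical helper; plain std threads, not the scheduled executor.)
fn collect_until_stopped(n: usize, run_for: Duration) -> Vec<u32> {
    // Shared flag: the "termination request" sent from the main thread.
    let stop = Arc::new(AtomicBool::new(false));

    let workers: Vec<thread::JoinHandle<u32>> = (0..n)
        .map(|_| {
            let stop = Arc::clone(&stop);
            thread::spawn(move || {
                let mut writes = 0u32;
                // The flag is only checked between writes, so a write that
                // has started is never cut off in the middle.
                while !stop.load(Ordering::SeqCst) {
                    thread::sleep(Duration::from_millis(10)); // simulated fetch + write
                    writes += 1;
                }
                writes
            })
        })
        .collect();

    // Assumption: stand-in for blocking until Ctrl+C is received.
    thread::sleep(run_for);
    stop.store(true, Ordering::SeqCst);

    // join() returns only after the worker's closure has returned.
    workers
        .into_iter()
        .map(|w| w.join().expect("worker panicked"))
        .collect()
}
```

Calling collect_until_stopped(3, Duration::from_millis(50)) returns one counter per worker once all three have exited their loops. The point of the design is that the workers decide when it is safe to stop, so the main thread never interrupts a write.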

inv2004, Sep 26 '17 14:09

I see. The underlying thread pool doesn't provide this mechanism; however, I added a very simple mechanism to stop scheduled tasks (see TaskHandle).

One way would be to stop all your tasks using their task handles, then wait for any currently running task to finish and terminate the program. If you don't want to add a sleep, I think a RwLock could be used: the read lock can be taken every time a task is executed, and the write lock only when all the tasks have been stopped. This ensures that once the write lock is taken, no task is still running and no further tasks will be executed.
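A minimal sketch of the RwLock idea, under some loud assumptions: plain std threads stand in for scheduled tasks, an AtomicBool stands in for stopping a task through its handle, and the names run_and_shutdown and gate are hypothetical. It does not use this crate's API; it only shows why taking the write lock doubles as a wait for in-flight runs.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;

/// Runs `workers` periodic tasks, requests a stop, then takes the write
/// lock. Returns (runs started, runs finished) as seen under that lock.
/// (Hypothetical helper; std threads simulate the scheduled tasks.)
fn run_and_shutdown(workers: usize) -> (usize, usize) {
    let gate = Arc::new(RwLock::new(()));
    let stop = Arc::new(AtomicBool::new(false)); // stand-in for stopping via a handle
    let started = Arc::new(AtomicUsize::new(0));
    let finished = Arc::new(AtomicUsize::new(0));

    for _ in 0..workers {
        let gate = Arc::clone(&gate);
        let stop = Arc::clone(&stop);
        let started = Arc::clone(&started);
        let finished = Arc::clone(&finished);
        thread::spawn(move || loop {
            {
                // Read side: held for the whole duration of one task run.
                let _running = gate.read().unwrap();
                if stop.load(Ordering::SeqCst) {
                    break;
                }
                started.fetch_add(1, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(20)); // simulated work
                finished.fetch_add(1, Ordering::SeqCst);
            }
            thread::sleep(Duration::from_millis(5)); // interval between runs
        });
    }

    thread::sleep(Duration::from_millis(100)); // let the tasks run for a while
    stop.store(true, Ordering::SeqCst);

    // Write side: granted only when no read guard is held, so every run
    // that started has finished, and later runs will see the stop flag.
    let _exclusive = gate.write().unwrap();
    (started.load(Ordering::SeqCst), finished.load(Ordering::SeqCst))
}
```

After run_and_shutdown(4) returns, the two counters are equal, because a run can only be counted as started while its read guard is held. No polling loop or sleep is needed on the shutdown path.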

fede1024, Oct 16 '17 20:10

@fede1024 Looks like that is what I wanted. Will check soon. Thank you.

inv2004, Oct 17 '17 00:10

Thank you. I just checked it, and it works exactly like I wanted:

    // Run the event loop until the stream yields its first item.
    let (_, _) = core.run(stream.into_future()).ok().unwrap();

    // Ask every scheduled task to stop.
    task_handles.iter()
        .inspect(|_| info!("Stopping thread"))
        .for_each(|x| x.stop());

    // Poll until every handle reports that it has stopped.
    while !task_handles.iter().all(|x| x.stopped()) {
        thread::sleep(Duration::from_secs(1));
    }

    ^C2017-12-01 19:23:51.136572530 [main (0)] <WARN> Received SIGNAL
    2017-12-01 19:23:51.136941289 [main (0)] <INFO> Stopping thread
    2017-12-01 19:23:51.137146396 [main (0)] <INFO> Stopping thread

The only question (optional): is it possible to extract any info from TaskHandle, for better logging and control? Or to execute a callback from the thread on .stop()?

Thank you,

-- added -- I just reopened the issue for this question. Will close it soon.

inv2004, Dec 01 '17 19:12

I'm glad it worked for you. What kind of information would you like to extract from the TaskHandle?

fede1024, Dec 01 '17 19:12

@fede1024 I am not sure, but I suppose it would be perfect to have fn stop(complete_fn: F), or maybe a stop future, which would execute complete_fn right before the thread exits. (Would it run on the same thread as scheduled_fn?)

Regards,

inv2004, Dec 01 '17 20:12

@fede1024 I did an additional check:

fn save(...) {
  info!("start");
  // Simulate a slow write.
  thread::sleep(Duration::from_secs(5));
  info!("end");
}

and, after I receive the signal, I do:

    task_handles.iter()
      .inspect(|_| info!("Stopping thread"))
      .for_each(|x| x.stop());

    while task_handles.iter().any(|x| !x.stopped()) {
        info!("Wait to stop");
        thread::sleep(Duration::from_secs(1));
    }

In the logs I see "start", then "Stopping thread", and no "Wait to stop", so the program is over. It looks like x.stopped() returns true, but the thread has not really stopped yet.

Is it possible?

Thank you,

inv2004, Dec 05 '17 07:12