workerpool icon indicating copy to clipboard operation
workerpool copied to clipboard

Pause a workerpool through keyboard

Open Numenorean opened this issue 3 years ago • 16 comments

Hello dear developer, I'm looking for a way to pause a workerpool with pressing some key("P" as an example) and resume work with the same key. And I want to completely stop workerpooI with another key(maybe with "S" key). I have construction as an your example

Numenorean avatar Nov 11 '20 18:11 Numenorean

I am not entirely certain what you are asking for. Detecting user input, or whatever other signal that indicates workers should be paused, must be done by your application. Upon detecting the "P" key pressed, you application should call Pause. Upon detecting the "S" key, you application should call Stop or StopWait

gammazero avatar Nov 11 '20 18:11 gammazero

I am not entirely certain what you are asking for. Detecting user input, or whatever other signal that indicates workers should be paused, must be done by your application. Upon detecting the "P" key pressed, you application should call Pause. Upon detecting the "S" key, you application should call Stop or StopWait

i'm really don't know how to do it, i have tried this, but it have not work for me

       if err := keyboard.Open(); err != nil {
		panic(err)
	}
	defer func() {
		_ = keyboard.Close()
	}()
	ctx, cancel := context.WithCancel(context.Background())
	for i, str := range resources.data {
                char, _, _ := keyboard.GetKey()
	        if char == 112 {
			wp.Pause(ctx)
		} else if char == 115 {
			cancel()
		}
		str := str
		i := i
		wp.Submit(func() {
                    doSomeWork()
                }

Numenorean avatar Nov 11 '20 20:11 Numenorean

Couple things that could be a problem:

  1. If you pause more than once, you are trying to use a context that is already canceled. You need a new context.
  2. The "s" key is being used to resume work (canceling the contest). Perhaps you meant for "s" to stop workers?

Here is a complete working example of the above, with a few changes:

package main

import (
	"context"
	"fmt"

	"github.com/eiannone/keyboard"
	"github.com/gammazero/workerpool"
)

var data = []string{"a", "b", "c", "d", "e", "f", "g"}

func main() {
	wp := workerpool.New(5)

	if err := keyboard.Open(); err != nil {
		panic(err)
	}

	var cancel context.CancelFunc
	defer func() {
		keyboard.Close()
		if cancel != nil {
			cancel()
		}
	}()

	fmt.Println("Press p to pause/resume, s to stop, any other key to do work")
	for _, str := range data {
		char, _, err := keyboard.GetKey()
		if err != nil {
			panic(err)
		}
		switch char {
		case 'p':
			if cancel == nil {
				// pause work
				var ctx context.Context
				ctx, cancel = context.WithCancel(context.Background())
				wp.Pause(ctx)
				fmt.Println("--- paused ---")
			} else {
				// already paused, so resume work
				fmt.Println("--- resuming ---")
				cancel()
				cancel = nil
			}
		case 's':
			wp.StopWait()
			fmt.Println("--- stopped ---")
			return
		}
		str := str
		wp.Submit(func() {
			doSomeWork(str)
		})
	}
	wp.StopWait()
	fmt.Println("no more data")
}

func doSomeWork(s string) {
	fmt.Println("doWork", s)
}

gammazero avatar Nov 12 '20 00:11 gammazero

Couple things that could be a problem:

  1. If you pause more than once, you are trying to use a context that is already canceled. You need a new context.
  2. The "s" key is being used to resume work (canceling the contest). Perhaps you meant for "s" to stop workers?

Here is a complete working example of the above, with a few changes:

package main

import (
	"context"
	"fmt"

	"github.com/eiannone/keyboard"
	"github.com/gammazero/workerpool"
)

var data = []string{"a", "b", "c", "d", "e", "f", "g"}

func main() {
	wp := workerpool.New(5)

	if err := keyboard.Open(); err != nil {
		panic(err)
	}

	var cancel context.CancelFunc
	defer func() {
		keyboard.Close()
		if cancel != nil {
			cancel()
		}
	}()

	fmt.Println("Press p to pause/resume, s to stop, any other key to do work")
	for _, str := range data {
		char, _, err := keyboard.GetKey()
		if err != nil {
			panic(err)
		}
		switch char {
		case 'p':
			if cancel == nil {
				// pause work
				var ctx context.Context
				ctx, cancel = context.WithCancel(context.Background())
				wp.Pause(ctx)
				fmt.Println("--- paused ---")
			} else {
				// already paused, so resume work
				fmt.Println("--- resuming ---")
				cancel()
				cancel = nil
			}
		case 's':
			wp.StopWait()
			fmt.Println("--- stopped ---")
			return
		}
		str := str
		wp.Submit(func() {
			doSomeWork(str)
		})
	}
	wp.StopWait()
	fmt.Println("no more data")
}

func doSomeWork(s string) {
	fmt.Println("doWork", s)
}

thanks, but it isn't what i want. I have been trying to do system like that:

  • user launch the loop with something data
  • program do some work with this data automatically, without user who pressing keys
  • user want to pause a program, he click "p" and program stops work
  • a few moments later user wants to resume work, he click "p" again and program launches from the place that is stopped
  • user want to sleep, he click on "s" and program save all remaining data(i'll do it myself) and completely stops It's not really cos keyboard.GetKey() wait for any key and my loop just stands still

Numenorean avatar Nov 12 '20 14:11 Numenorean

@gammazero could u help me? i have a problem with pausing workerpool. I've solved the problem with pressing keyboard key to pause, but it doesn't work correctly. When i try to pause workerpool with queue ~6k, it waits for complete all tasks. is there any way to limit tasks queue? Smth like limiting queue to workers num, so i will be able to just wait for all workers are finished

Numenorean avatar Nov 18 '20 18:11 Numenorean

When i try to pause workerpool with queue ~6k, it waits for complete all tasks

The pause feature was designed to work by pausing at the point after executing all previously submitted tasks. One reason for this was to avoid doing any additional checks for pause signal in the critical path. I see that for your use case, you would prefer that workerpool pauses without consuming additional queued items. I will evaluate a minimal approach this week.

is there any way to limit tasks queue?

The purpose of workerpool is to accept tasks without ever blocking. If the task queue size is limited, then workerpool will need to block (I do not think discarding tasks is appropriate). My thought on providing this is to have a new flavor of workerpool that is created using a new constructor: workerpool.NewBlocking(maxWorkers, queueSize int). I will also be able to look into this and provide an experimental version this week.

For your use case(s), do you see one or both of these features as useful?

gammazero avatar Nov 18 '20 19:11 gammazero

@gammazero thanks. just for me first feature would be more useful. But now i realise one big problem, when i call pause method it blocks main goroutine because it waits for complete all tasks, so i cannot resume working(function that pause pool doesn't call anyway). Here is example, i believe there is an any solution:

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/Numenorean/keypresses"
	"github.com/gammazero/workerpool"
)

var cancel context.CancelFunc

func checkPauseOrStop(wp *workerpool.WorkerPool) {
	if keypresses.IsKeyPressedGlobal(0x50, false) {
		if cancel == nil {
			// pause work
			var ctx context.Context
			ctx, cancel = context.WithCancel(context.Background())
			wp.Pause(ctx)
			fmt.Println("--- paused ---")
		} else {
			// already paused, so resume work
			fmt.Println("--- resuming ---")
			cancel()
			cancel = nil
		}
	} else if keypresses.IsKeyPressedGlobal(0x53, false) {
		wp.StopWait()
		fmt.Println("--- stopped ---")
		os.Exit(1)
	}
}

func main() {
	wp := workerpool.New(5)

	fmt.Println("Press p to pause/resume, s to stop, any other key to do work")
	for i := 0; i < 0xf4240; i++ {
		checkPauseOrStop(wp)
		fmt.Println("Queue:", wp.WaitingQueueSize())
		wp.Submit(func() {
			doSomeWork(i)
		})
	}
	wp.StopWait()
	fmt.Println("no more data")
}

func doSomeWork(s int) {
	fmt.Println("Work:", s)
	time.Sleep(5 * time.Second)
}

Numenorean avatar Nov 18 '20 20:11 Numenorean

@gammazero i've posted an issue above this post, is there anyway to resolve that?

Numenorean avatar Nov 22 '20 09:11 Numenorean

@Numenorean The way to solve that is for workerpool to implement the pause differently. As I stated above:

The pause feature was designed to work by pausing at the point after executing all previously submitted tasks.

Since you have a large number of tasks already submitted, and each task takes 5 seconds to complete, you will be waiting for a very long time for a call to Pause to return.

If you want the pause to take effect immediately then a different pause implementation is needed; one that does not wait for previously submitted tasks to finish. I will publish an experimental example soon.

gammazero avatar Nov 23 '20 02:11 gammazero

@Numenorean The way to solve that is for workerpool to implement the pause differently. As I stated above:

The pause feature was designed to work by pausing at the point after executing all previously submitted tasks.

Since you have a large number of tasks already submitted, and each task takes 5 seconds to complete, you will be waiting for a very long time for a call to Pause to return.

If you want the pause to take effect immediately then a different pause implementation is needed; one that does not wait for previously submitted tasks to finish. I will publish an experimental example soon.

any news?

Numenorean avatar Dec 31 '20 22:12 Numenorean

Hey man, any news?

Numenorean avatar Feb 20 '21 16:02 Numenorean

any news?

Numenorean avatar Jun 02 '21 13:06 Numenorean

I do have a couple implementations that I can publish in a branch. Here are some variations. Do you need any other than 1?

  1. Pause immediately (before running more queued tasks) and continue to queue submitted tasks
  2. Pause immediately and block new tasks from being submitted
  3. Pause after running all tasks that were already submitted, and continue queuing new tasks
  4. Pause after running all tasks that were already submitted, and block any new tasks

3 and 4 might better be called SyncPause. I am considering that blocking submission of new tasks (2 and 4) could be a parameter to pause. Thoughts?

gammazero avatar Jun 02 '21 18:06 gammazero

continue to queue submitted tasks

Sorry for not replying right away. I don't quite understand if I have, for example, 1 million tasks, after pause -> waiting for all the tasks to be queued -> resume, will I be able to pause it one more time? And what about memory usage, after all the million tasks will be queued?

Numenorean avatar Jul 18 '21 06:07 Numenorean

Hello @gammazero hope u are doing well. Did you eventually published somewhere in a branch option 1 of your latest comment ?

Cheers

Jbaukens avatar Feb 21 '22 00:02 Jbaukens

@Jbaukens, @Numenorean https://github.com/gammazero/workerpool/tree/pause-immediate

gammazero avatar Aug 17 '22 01:08 gammazero