Job Manager

Open i-am-the-gabe opened this issue 1 year ago • 7 comments

This implements a job system (hopefully closes #245).

Examples:

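// The `ms` and `h` literals used in these examples require
// `using namespace std::chrono_literals;`.
// If the job manager has at least 3 threads to work with, this
// only takes 1 second.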
{
    auto jm = JobManager(3);
    
    Job sleep ([](){ 
       std::this_thread::sleep_for(1000ms);
    });
    jm.submitJobs ( { sleep, sleep, sleep } );
}
// This, too, should only take 1 second.
{
    auto max_threads = std::thread::hardware_concurrency(); 
    auto jm = JobManager(max_threads);
    
    std::vector< Job > job_list (
       max_threads,
       Job( [](){ std::this_thread::sleep_for(1000ms); } )
    );
     
    jm.submitJobs ( job_list );
}
// This will wait for all jobs to be done before resuming the calling thread.
{
    auto max_threads = std::thread::hardware_concurrency(); 
    auto jm = JobManager(max_threads);
    
    Job sleep ([](){ 
       std::this_thread::sleep_for(1000ms);
    });
    
    jm.submitJobs( { sleep } );
    
    jm.wait();
    std::cout << "\nDone!";
}

Essentially, when a JobManager is constructed with an n_threads argument, it creates a private ThreadPool. That ThreadPool, in turn, creates n_threads Workers.

JobManager::run() spawns a background thread to do the following:

  1. While the job queue is not empty, ask the thread pool for an idle worker.

    If the thread pool can't immediately provide an idle worker, the background thread will wait until one is available.

  2. When an idle worker has been obtained, pop a job from the front of the queue and tell the idle worker to do it.
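
The two steps above can be sketched roughly as follows. This is only an illustration, not the code in this PR: `IdleTracker`, `distribute()` and `runDistributorDemo()` are hypothetical names, the loop runs on the calling thread rather than a background thread, and each "worker" is a short-lived `std::thread` capped by the idle count instead of a persistent Worker.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

using Job = std::function<void()>;

// Hypothetical stand-in for the thread pool: it only tracks how many
// workers are idle and blocks the caller until one is free.
class IdleTracker {
public:
    explicit IdleTracker(int workerCount) : idle(workerCount) {}

    // Blocks until an idle worker slot is available, then claims it.
    void acquire() {
        std::unique_lock<std::mutex> lock(mutex);
        available.wait(lock, [this] { return idle > 0; });
        --idle;
    }

    // Marks a worker slot as idle again and wakes one waiting caller.
    void release() {
        {
            std::lock_guard<std::mutex> lock(mutex);
            ++idle;
        }
        available.notify_one();
    }

private:
    std::mutex mutex;
    std::condition_variable available;
    int idle;
};

// Drains the queue, handing each job to an idle worker slot.
void distribute(std::deque<Job> queue, int workerCount) {
    assert(workerCount > 0);
    IdleTracker pool(workerCount);
    std::vector<std::thread> running;

    while (!queue.empty()) {
        pool.acquire();                      // Step 1: wait for an idle worker.
        Job job = std::move(queue.front());  // Step 2: pop from the front...
        queue.pop_front();
        running.emplace_back([&pool, job = std::move(job)] {
            job();                           // ...and let the worker run it.
            pool.release();
        });
    }

    // Join everything before `pool` goes out of scope.
    for (std::thread &t : running) {
        t.join();
    }
}

// Submits ten counting jobs to three worker slots and returns how many ran.
int runDistributorDemo() {
    std::atomic<int> done{0};
    std::deque<Job> queue;
    for (int i = 0; i < 10; ++i) {
        queue.emplace_back([&done] { ++done; });
    }
    distribute(std::move(queue), 3);
    return done.load();
}
```

At most `workerCount` jobs run at once in this sketch, because `acquire()` blocks the distributor whenever every slot is taken.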

If JobManager is given a list of jobs in its constructor (optional), it immediately starts running the job distributor. Otherwise, JobManager::run() only runs when all of the following conditions are met:

  1. JobManager::submitJobs() has been called.

  2. JobManager::isRunning() returns false.

  3. The job queue is not empty.

This means the only job-performing method you need to worry about is JobManager::submitJobs(), and that adding jobs to an already-running queue is completely safe and intended behaviour:

{
    auto jm = JobManager(std::thread::hardware_concurrency());
    
    Job sleep ([](){ 
       std::this_thread::sleep_for(1h);
    });
    // Since the queue was empty before, this starts the job system, 
    // which will run for an hour.
    jm.submitJobs ( { sleep } ); 
    // Completely safe, and a new instance of sleep has been appended to
    // jm's already-existing queue.
    jm.submitJobs ( { sleep } ); 
}

To do:

[ ] Categories (filter which jobs JobManager::wait() waits for)
[ ] Decentralize job distribution a bit more.

i-am-the-gabe, Mar 29 '23 16:03

Thanks. I'll start reviewing this tonight and push revisions. The Readme is not necessary.

afritz1, Mar 30 '23 00:03

I need permission to write to your branch.

Enumerating objects: 54, done.
Counting objects: 100% (54/54), done.
Delta compression using up to 32 threads
Compressing objects: 100% (46/46), done.
Writing objects: 100% (47/47), 6.26 KiB | 6.26 MiB/s, done.
Total 47 (delta 34), reused 0 (delta 0), pack-reused 0
remote: Resolving deltas: 100% (34/34), completed with 5 local objects.        
remote: error: GH006: Protected branch update failed for refs/heads/Job-System.        
remote: error: At least 1 approving review is required by reviewers with write access.        
To https://github.com/i-am-the-gabe/OpenTESArena.git
 ! [remote rejected]   pr/247 -> Job-System (protected branch hook declined)
error: failed to push some refs to 'https://github.com/i-am-the-gabe/OpenTESArena.git'

afritz1, Mar 30 '23 03:03

My bad, bungled the branch protection. Should work now.

i-am-the-gabe, Mar 30 '23 09:03

Okay, I made some changes to bring it more in line with what I'm looking for. Questions:

  1. Can we fix the 'here be dragons'?
  2. Can we make a submitJob() that takes one job? I think a common use case would be submitJob(...); submitJob(...); submitJob(...); wait();. It can use submitJobs() internally.
  3. Can we allocate the ThreadPool without std::unique_ptr?
  4. Can we fix the TODO on JobManager?

I'll handle # 2.
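
For question 2, a single-job overload that forwards to the batch method might look roughly like this. `MiniJobManager` is a hypothetical stand-in for illustration only, not the JobManager in this PR: it stores jobs in a plain vector and its `wait()` simply runs them on the calling thread.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

using Job = std::function<void()>;

// Hypothetical, single-threaded stand-in for JobManager.
class MiniJobManager {
public:
    // Batch entry point: appends every job to the queue.
    void submitJobs(std::vector<Job> jobs) {
        for (Job &job : jobs) {
            queue.push_back(std::move(job));
        }
    }

    // Single-job convenience wrapper that reuses submitJobs() internally.
    void submitJob(Job job) {
        std::vector<Job> single;
        single.push_back(std::move(job));
        submitJobs(std::move(single));
    }

    // Assumption for this sketch: runs the queued jobs synchronously.
    void wait() {
        for (Job &job : queue) {
            job();
        }
        queue.clear();
    }

    std::size_t pending() const { return queue.size(); }

private:
    std::vector<Job> queue;
};

// Mirrors the submitJob(...); submitJob(...); submitJob(...); wait(); use case.
int runSubmitJobDemo() {
    MiniJobManager jm;
    int counter = 0;
    jm.submitJob([&counter] { counter += 1; });
    jm.submitJob([&counter] { counter += 10; });
    jm.submitJob([&counter] { counter += 100; });
    assert(jm.pending() == 3);
    jm.wait();
    return counter;
}
```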

afritz1, Mar 30 '23 14:03

Taking down (4) as we speak.

Edit: Looked into it, and it might not be necessary. It just sounds strange to have a job scheduler manage and request worker threads while being a worker thread itself. It'd also make the wait() implementation that much more confusing, and we need to keep it as simple as possible before categories are implemented.

i-am-the-gabe, Mar 30 '23 15:03

I tried this in Game::loop() but I don't understand why the main thread is blocked until the jobs are done. Maybe I'm misunderstanding locks/mutexes.

	JobManager jobManager;
	jobManager.init(8);

	std::mutex mut;
	auto lamb = [&mut](int i)
	{
		return [&mut, i]()
		{
			std::unique_lock<std::mutex> lock(mut);
			std::cout << "lamb " << i << '\n';
			lock.unlock();

			std::this_thread::sleep_for(std::chrono::milliseconds(100));
		};
	};

	std::vector<Job> jobs;
	for (int i = 0; i < 200; i++)
	{
		jobs.emplace_back(lamb(i));
	}

	jobManager.submitJobs(jobs);

	Job job = [&mut]()
	{
		std::lock_guard<std::mutex> lock(mut);
		std::cout << "Last job" << '\n';
	};

	jobManager.submitJob(job);

	// Primary game loop.
	int frame = 0;
	while (this->running)
	{
		std::unique_lock<std::mutex> lock(mut);
		std::cout << "frame " << frame << '\n';
		lock.unlock();
		frame++;

The output is

...
lamb 197
lamb 198
lamb 199
Last job
frame 0
frame 1
frame 2

But I expected something like

lamb 0
lamb 1
frame 0
lamb 3
lamb 2
...

Edit: looks like if I comment out the submitJob() then it works as expected.

afritz1, Mar 30 '23 15:03

Also, I'm not sure if this is an easy change, but Worker::invoke() should not create a new thread; it should reuse the std::thread it was initialized with. I don't want std::this_thread::get_id() changing for the same worker.
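
One possible shape for that, as a rough sketch only: the worker starts a single thread in its constructor and parks it on a condition variable between jobs. `ReusableWorker`, its `wait()` method and `workerKeepsSameThreadId()` are hypothetical names and do not reflect the Worker class in this PR.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

using Job = std::function<void()>;

// Hypothetical worker that owns exactly one thread for its whole lifetime.
class ReusableWorker {
public:
    ReusableWorker() : thread([this] { loop(); }) {}

    ~ReusableWorker() {
        {
            std::lock_guard<std::mutex> lock(mutex);
            stopping = true;
        }
        wake.notify_all();
        thread.join();
    }

    // Hands a job to the existing thread; blocks while the worker is busy.
    void invoke(Job job) {
        assert(job);  // The job must be callable.
        std::unique_lock<std::mutex> lock(mutex);
        idle.wait(lock, [this] { return !pending; });
        pending = std::move(job);
        lock.unlock();
        wake.notify_all();
    }

    // Blocks until the worker has no job in flight.
    void wait() {
        std::unique_lock<std::mutex> lock(mutex);
        idle.wait(lock, [this] { return !pending; });
    }

private:
    void loop() {
        std::unique_lock<std::mutex> lock(mutex);
        for (;;) {
            wake.wait(lock, [this] { return stopping || pending; });
            if (!pending) {
                return;  // Stopping and nothing left to run.
            }
            Job job = pending;  // `pending` stays set while the job runs.
            lock.unlock();
            job();
            lock.lock();
            pending = nullptr;
            idle.notify_all();
        }
    }

    std::mutex mutex;
    std::condition_variable wake;  // Signals the worker: new job or shutdown.
    std::condition_variable idle;  // Signals callers: the worker is free.
    Job pending;
    bool stopping = false;
    std::thread thread;  // Declared last so the members above exist first.
};

// Runs two jobs on one worker and checks they saw the same thread id.
bool workerKeepsSameThreadId() {
    std::thread::id first;
    std::thread::id second;
    ReusableWorker worker;
    worker.invoke([&first] { first = std::this_thread::get_id(); });
    worker.invoke([&second] { second = std::this_thread::get_id(); });
    worker.wait();
    return first == second && first != std::this_thread::get_id();
}
```

Because the thread is created once and only ever waits or runs jobs, `std::this_thread::get_id()` inside a job stays the same for the lifetime of the worker in this sketch.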

afritz1, Mar 30 '23 16:03