Need a synchronization method for batches of tasks
It frequently occurs that we have loops of the kind
for (int i=0; i<large_number; ++i) {
hpx::async(task, stuff);
}
wait_here_for_tasks_to_complete();
// access some shared structure that is being modified by the tasks
Examples of this can be seen in tests like the future overhead performance test, the network storage tests, and various other places in the code: https://github.com/STEllAR-GROUP/hpx/blob/6df436bf5b260716aad5663c943c2f7f5d76857e/tests/performance/local/future_overhead.cpp
In some ways this is related to fork/join parallelism, since we have to do N lots of work and block on the result. In the future_overhead test we have a wait_each, a wait_all, and a thread counter implementation. The wait_all version is best suited to the HPX way of doing things, but is expensive since we must store the futures and wait on them. The thread counter implementation is intrusive, since it queries the pool at start and at end to ensure no other tasks are running. Another version in my own local tree uses a sliding semaphore, but this requires the user to decorate their task with a decrement of the counter. In other tests we have used a simple atomic counter, and this also requires modification of the user's task to update the value.
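For reference, the wait_all version being criticised stores one future per task, roughly like this (a minimal sketch; task and large_number stand in for the real work):
#include <hpx/include/async.hpp>
#include <hpx/include/lcos.hpp>
#include <vector>

std::vector<hpx::future<void>> futures;
futures.reserve(large_number);
for (int i = 0; i < large_number; ++i) {
    futures.push_back(hpx::async(task));   // one stored future per task
}
hpx::wait_all(futures);   // N futures allocated just to synchronize once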
Ideally, we should be able to add tasks (to a thread pool, for example) and supply a latch so that we can be signalled when the last one has completed.
const int num_tasks = 1000000;
hpx::lcos::local::latch my_latch(num_tasks);
for (int i = 0; i < num_tasks; ++i) {
    // proposed overload: the runtime counts the latch down when each task completes
    hpx::async(my_latch, task, args);
}
my_latch.wait();
An alternative that uses the existing executor-style API:
const int num_tasks = 1000000;
hpx::lcos::local::latch my_latch(num_tasks);
for (int i = 0; i < num_tasks; ++i) {
    // proposed: an executor decorated with the latch does the counting
    hpx::async(an_executor.with(my_latch), task, args);
}
my_latch.wait();
When tasks complete, the existing internal plumbing would decrement the latch.
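By contrast, the closest thing available today is to decorate the task by hand, which is exactly the intrusive boilerplate the proposal would remove (a sketch assuming the count_down/wait interface of hpx::lcos::local::latch, linked further below):
#include <hpx/include/apply.hpp>
#include <hpx/lcos/local/latch.hpp>

hpx::lcos::local::latch my_latch(num_tasks);
for (int i = 0; i < num_tasks; ++i) {
    hpx::apply([&my_latch]() {
        task(args);               // the user's real work
        my_latch.count_down(1);   // manual signalling - this is the intrusive part
    });
}
my_latch.wait();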
I believe something of this kind would be a very useful mechanism to have in our code. I welcome comments on what the best API might be. It could be used in some of the parallel algorithms to remove some wait_all uses.
This is also connected to the idea of N-ary tasks. In that case, the callable object would be promoted (as suggested in #3348) to be created earlier, and things like counters could be stored in a task structure. I didn't want to write this in the issue description, but just tag it here as a random thought.
Note that in the examples I pasted in above, it would actually be nice to have this functionality with hpx::apply, since we don't need to return the futures and wait on them.
I'm not against this idea at all, but I think, for example, the future_overhead test is somewhat artificial, because there we're really just interested in testing task spawning. In practice one wants to avoid spawning that many tasks and would instead use for_loop or for_each:
for_loop(par, 0, num_tasks, [](const int /*don't care about the index*/) { task(args); });
This essentially does what you want, but with chunking. It removes the syntactic overhead of (e.g.) having to use a latch, but adds some with the lambda (is there a cleaner way to write that if task doesn't take an index as its first argument?). Is there an argument that for_loop is not the semantically correct thing to use?
Internally for_loop is creating tasks and then waiting on the futures. I could use the for_loop in my example (and I will for the time being), but I wanted to address the general case where you create a large number of tasks and want to wait on them without storing a vector of futures and using wait_all or something similar.
The idea of the latch is simply to simplify the user experience and to provide a hook that links the scheduler back to the user's code.
A for_loop becomes much harder when we wish to mix saved futures and throw-away tasks as follows (which is my main intention in the long run):
for (int i = 0; i < large_number; ++i) {
    if (i % 10 == 0) {
        // keep a future for every tenth task
        my_futures.emplace_back(hpx::async(latch, blah));
    }
    else {
        // throw-away task: counted by the latch, but no future kept
        hpx::async(latch, task, stuff);
    }
    // we might also spawn tasks without a latch in the same loop,
    // as we don't need them to be counted
}
latch.wait();   // wait here for the counted tasks to complete
for (auto && f : my_futures) {
// do things
}
Here we use a latch for some tasks and no latch for others, saving some futures and not saving others. This pattern keeps cropping up.
@biddisco FWIW, we do have a latch (see here: https://github.com/STEllAR-GROUP/hpx/blob/master/hpx/lcos/local/latch.hpp). Also, if you don't need the future returned by hpx::async, just use hpx::apply (i.e. fire and forget) - that saves you creating the future in the first place.
Yes. I am aware of the existence of the latch - this is what I want to use. My question is about how we could nicely fire off tasks and use a latch to keep track of them, without the user having to do anything to their actual tasks.
> My question is about how we could nicely fire off tasks and use a latch to keep track of them, without the user having to do anything to their actual tasks.
I surmise that hpx::wait_all is not what you want? Would a task_block work for you?
As I wrote above, I would like to mix tasks that are 'important' and should be counted in the latch with tasks that are not important and can just run as they want, but synchronize cheaply on the completion. A task_block would not allow me to mix between the two.
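For context, a task_block joins on everything spawned through it, which is why it cannot express that mix; a rough sketch, assuming the define_task_block interface from hpx/include/parallel_task_block.hpp:
#include <hpx/include/parallel_task_block.hpp>

hpx::parallel::define_task_block(hpx::parallel::execution::par,
    [&](auto& tb) {
        for (int i = 0; i < large_number; ++i)
            tb.run([&] { task(args); });   // every task is counted
    });   // implicit join: returns only when all tb.run tasks have finished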
I think your key point is this:
> synchronize cheaply on the completion
no? Is the code you posted above with the latch your ideal way of writing that, or would you go further if you had anything available? Also, I'm really curious about the actual code where you need this pattern. I think this is an interesting problem, but it's not clear yet how general this pattern might be. You can always write your own for_loop-like thing that uses a latch or semaphore internally to keep track of the tasks, instead of having the user do that. It's also a question of how much performance you actually lose by using the parallel for_loop or even just wait_all/when_all.
It seems I didn't really explain the problem very well - suppose I do the following:
auto f = hpx::parallel::for_loop(
    hpx::parallel::execution::par_task, 0, N,
    [&](int i) {
        hpx::async(task, stuff);   // spawns a task, future discarded
    });
f.get();
This only waits on the loop; it does not wait on the tasks generated inside the loop. The loop will be chunked, and there will be C chunks and C futures waited on, but the N async tasks will not complete until later. What I want, by using a construct of this kind,
const int N = 1000000;
hpx::lcos::local::latch my_latch(N);
for (int i = 0; i < N; ++i) {
    hpx::async(my_latch, task, args);   // proposed overload, as above
}
my_latch.wait();
is to wait on the N tasks so that I know when they have all completed, without having a vector<future> of size N that I then wait on. The specific reason I want it is to implement one of the loops in DCA++, but I have had to use vectors of futures in numerous tests previously.
> Is the code you posted above with the latch your ideal way of writing that, or would you go further if you had anything available?
Well, it seems like a very simple way of doing it, and it could be implemented using a custom applier that is instantiated by an executor that is 'aware' of it. The other obvious ways involve modifying the task itself to signal, but that is intrusive. It is also related to the use of a sliding semaphore in other tests to limit the number of tasks; in those we have to modify the tasks to signal, but with this approach it would be possible to do it via the executor/scheduler and leave the tasks untouched. I believe that would be a cleaner way of handling it.
At the risk of getting sidetracked, why would you call async inside the parallel for loop (if all you want to do is call task(stuff) for each index)?
If you were copying 100 arrays of small data to the GPU, for example, you would asynchronously call copystuff in a loop - but you want to know when the copies complete. Saving 100 futures is OK, but if you could do it without saving any futures, and instead have an executor that signals you when they are complete, then your code is simpler and there is less overhead.
The most consistent and, IMHO, leanest way to accomplish this is via an executor. On the executor side of things you can block until all tasks are completed, and the implementation can then be completely hidden inside that executor.
I agree. This new executor could even be a wrapper around any other (already existing) executor, just adding the ability to wait for all executed tasks. Also, some of our executors already block on destruction. That could be a good starting point.
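To make the wrapper idea concrete, something along these lines could work even without touching the applier (a hypothetical sketch: latching_spawner and its members are invented for illustration and do not model the real HPX executor interface):
#include <hpx/include/apply.hpp>
#include <hpx/lcos/local/latch.hpp>
#include <cstddef>
#include <utility>

struct latching_spawner   // hypothetical name, for illustration only
{
    explicit latching_spawner(std::ptrdiff_t n) : latch_(n) {}

    template <typename F, typename... Ts>
    void post(F&& f, Ts&&... ts)
    {
        // decorate the task: run the user's work, then signal completion
        hpx::apply(
            [this, f = std::forward<F>(f)](auto&&... args) mutable {
                f(std::forward<decltype(args)>(args)...);
                latch_.count_down(1);
            },
            std::forward<Ts>(ts)...);
    }

    void wait() { latch_.wait(); }   // block until all posted tasks are done

    hpx::lcos::local::latch latch_;
};
Usage would be latching_spawner s(num_tasks); s.post(task, args); ... s.wait(); - a real version would instead hook into the executor customisation points so that plain hpx::async and hpx::apply pick it up.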
I am imagining that we have an executor that creates an applier with an additional function object that is called when the task completes. Instead of (or as well as) setting a future, it could decrement a counter/latch/semaphore or suchlike. This could be used for apply or for async (I didn't look at the code for apply recently, so I'm not sure whether it uses an applier or something else).
We would need to add a template param to the applier, and have the usual executors pass a default param that would be the function that sets the future. The latching_executor, or whatever we call it, would decorate this function with its extra features, or replace the future-setting with other operations.
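The decoration could look conceptually like this (all names here are hypothetical; the real applier does not currently expose such a hook):
#include <hpx/lcos/local/latch.hpp>
#include <functional>
#include <utility>

// hypothetical completion hook an applier might invoke when a task finishes
using completion_hook = std::function<void()>;

completion_hook decorate_with_latch(completion_hook base,
    hpx::lcos::local::latch& l)
{
    return [base = std::move(base), &l]() {
        if (base)
            base();        // the default behaviour, e.g. setting the future
        l.count_down(1);   // extra behaviour added by the latching executor
    };
}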
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
Issues #3348 and #3553 should not be closed as they are under consideration at the time of writing.