High thread counts lead to non-terminating tests
Description
After https://github.com/seqan/seqan3/pull/2276, tests and snippets that use the parallel_queue do not terminate when too many threads access the queue.
With 160 threads, one thread tries to close the queue while the remaining 159 threads keep acquiring the same lock to check whether the queue is closed, so the closing thread never obtains the exclusive lock.
In the parallel align_pairwise you can trace it to https://github.com/seqan/seqan3/blob/d37ba83a333992a551ec835097e564d8b9a00604/include/seqan3/core/algorithm/detail/algorithm_executor_blocking.hpp#L236 which then calls https://github.com/seqan/seqan3/blob/d37ba83a333992a551ec835097e564d8b9a00604/include/seqan3/core/algorithm/detail/execution_handler_parallel.hpp#L195
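To make the suspected interaction concrete, here is a minimal, self-contained sketch of the access pattern (simplified, not the actual SeqAn3 code): many consumers repeatedly take the shared lock to poll the queue state while a single thread needs the exclusive lock to close it. On standard library implementations whose `std::shared_mutex` does not prioritise waiting writers, the closing thread can starve indefinitely.

```cpp
#include <shared_mutex>
#include <thread>
#include <vector>

int main()
{
    std::shared_mutex mutex{};
    bool closed_flag{false};

    std::vector<std::thread> consumers{};
    for (int i = 0; i < 159; ++i) // 159 consumers polling the queue state
    {
        consumers.emplace_back([&]()
        {
            for (;;)
            {
                std::shared_lock read_lock{mutex}; // same shared lock as try_pop
                if (closed_flag) // consumers only leave once close() succeeded
                    return;
            }
        });
    }

    {
        // The one closing thread: it needs the exclusive lock, but the shared
        // lock is (almost) never free, so this can block indefinitely if
        // waiting writers are not prioritised.
        std::unique_lock write_lock{mutex};
        closed_flag = true;
    }

    for (auto & consumer : consumers)
        consumer.join();
}
```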
Can you try this? I am not sure it is 100% correct, because it was late, but maybe it gives us a hint whether this really is a starvation problem on the exclusive lock of the shared mutex.
```diff
diff --git a/include/seqan3/contrib/parallel/buffer_queue.hpp b/include/seqan3/contrib/parallel/buffer_queue.hpp
index c5a16a44a..be125d2e6 100644
--- a/include/seqan3/contrib/parallel/buffer_queue.hpp
+++ b/include/seqan3/contrib/parallel/buffer_queue.hpp
@@ -168,15 +168,18 @@ public:
         value_type value{};
         for (;;)
         {
-            auto status = try_pop(value);
+            if (!writer_waiting.load())
+            {
+                auto status = try_pop(value);
 
-            if (status == queue_op_status::closed)
-                throw queue_op_status::closed;
-            else if (status == queue_op_status::success)
-                return value;
+                if (status == queue_op_status::closed)
+                    throw queue_op_status::closed;
+                else if (status == queue_op_status::success)
+                    return value;
 
-            assert(status != queue_op_status::full);
-            assert(status == queue_op_status::empty);
+                assert(status != queue_op_status::full);
+                assert(status == queue_op_status::empty);
+            }
             delay.wait(); // pause and then try again.
         }
     }
@@ -188,13 +191,16 @@ public:
         queue_op_status status;
         for (;;)
         {
-            status = try_pop(value);
+            if (!writer_waiting.load())
+            {
+                status = try_pop(value);
 
-            if (status == queue_op_status::closed || status == queue_op_status::success)
-                break;
+                if (status == queue_op_status::closed || status == queue_op_status::success)
+                    break;
 
-            assert(status != queue_op_status::full);
-            assert(status == queue_op_status::empty);
+                assert(status != queue_op_status::full);
+                assert(status == queue_op_status::empty);
+            }
             delay.wait(); // pause and then try again.
         }
         return status;
@@ -214,10 +220,22 @@ public:
     /*!\name State operations
      * \{
      */
-    void close() noexcept
+    void close()
     {
-        std::unique_lock write_lock{mutex};
-        closed_flag = true;
+        if (writer_waiting.exchange(true)) // First writer that closes the queue will continue, the rest returns.
+            return;
+
+        try
+        {
+            std::unique_lock write_lock{mutex};
+            closed_flag = true;
+            writer_waiting.store(false); // reset the lock.
+        }
+        catch (...)
+        {
+            writer_waiting.store(false); // reset the lock.
+            std::rethrow_exception(std::current_exception());
+        }
     }
 
     bool is_closed() const noexcept
@@ -339,6 +357,7 @@ private:
     alignas(std::hardware_destructive_interference_size) std::atomic<size_type> push_back_position{0};
     alignas(std::hardware_destructive_interference_size) std::atomic<size_type> pending_push_back_position{0};
     alignas(std::hardware_destructive_interference_size) std::atomic<size_type> ring_buffer_capacity{0};
+    alignas(std::hardware_destructive_interference_size) std::atomic_bool writer_waiting{false};
     alignas(std::hardware_destructive_interference_size) bool closed_flag{false};
 };
 
@@ -440,7 +459,7 @@ template <typename value_t, typename buffer_t, buffer_queue_policy buffer_policy>
 inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_pop(value_t & result)
 {
     // try to extract a value
-    std::shared_lock read_lock{mutex};
+    std::shared_lock read_lock{mutex};
 
     size_type local_pending_pop_front_position{};
     size_type next_local_pop_front_position{};
```
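The idea of the patch: close() announces itself through the atomic writer_waiting flag before taking the exclusive lock, and consumers skip the shared lock while the flag is set, which gives the closing thread a window to acquire the mutex. As a rough illustration, a stress test along the following lines should now terminate; the queue alias and the wait_pop()/close() spellings are inferred from the contrib header shown in the diff, so treat the exact names as assumptions:

```cpp
#include <thread>
#include <vector>

#include <seqan3/contrib/parallel/buffer_queue.hpp>

int main()
{
    seqan3::contrib::dynamic_buffer_queue<int> queue{}; // assumed alias from the contrib header

    // 159 consumers that block in wait_pop() until the queue is closed.
    std::vector<std::thread> consumers{};
    for (int i = 0; i < 159; ++i)
    {
        consumers.emplace_back([&queue]()
        {
            int value{};
            while (queue.wait_pop(value) != seqan3::contrib::queue_op_status::closed)
            {} // with the patch, consumers back off while writer_waiting is set
        });
    }

    // The single closing thread can now win the exclusive lock.
    std::thread closer{[&queue]() { queue.close(); }};

    closer.join();
    for (auto & consumer : consumers)
        consumer.join();
}
```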
Patch works :)
We merged the patch, but we still want to evaluate whether this patch is the version we want.
At the moment we have stable tests, and the mechanism at least no longer causes the threading issues we saw before. So I suggest we open a separate ticket to discuss the implementation details of the notification mechanism.