seqan3 icon indicating copy to clipboard operation
seqan3 copied to clipboard

High thread counts lead to non-terminating tests

Open eseiler opened this issue 5 years ago • 4 comments

Platform

  • SeqAn version:
  • Operating system:
  • Compiler:

Description

After https://github.com/seqan/seqan3/pull/2276, tests and snippets using the parallel_queue do not terminate if there are too many threads accessing the queue.

If there are 160 threads, there is one thread trying to close the queue, while there are 159 threads acquiring the same lock to check if the queue is closed.

In the parallel align_pairwise, you can track it to https://github.com/seqan/seqan3/blob/d37ba83a333992a551ec835097e564d8b9a00604/include/seqan3/core/algorithm/detail/algorithm_executor_blocking.hpp#L236 which then calls https://github.com/seqan/seqan3/blob/d37ba83a333992a551ec835097e564d8b9a00604/include/seqan3/core/algorithm/detail/execution_handler_parallel.hpp#L195

How to repeat the problem

Expected behaviour

Actual behaviour

eseiler avatar Nov 24 '20 20:11 eseiler

Can you just try this? Not sure if this is 100% correct, because it was late but maybe it gives us a hint if that is really a starvation problem on the exclusive lock of the shared mutex.

diff --git a/include/seqan3/contrib/parallel/buffer_queue.hpp b/include/seqan3/contrib/parallel/buffer_queue.hpp
index c5a16a44a..be125d2e6 100644
--- a/include/seqan3/contrib/parallel/buffer_queue.hpp
+++ b/include/seqan3/contrib/parallel/buffer_queue.hpp
@@ -168,15 +168,18 @@ public:
         value_type value{};
         for (;;)
         {
-            auto status = try_pop(value);
+            if (!writer_waiting.load())
+            {
+                auto status = try_pop(value);
 
-            if (status == queue_op_status::closed)
-                throw queue_op_status::closed;
-            else if (status == queue_op_status::success)
-                return value;
+                if (status == queue_op_status::closed)
+                    throw queue_op_status::closed;
+                else if (status == queue_op_status::success)
+                    return value;
 
-            assert(status != queue_op_status::full);
-            assert(status == queue_op_status::empty);
+                assert(status != queue_op_status::full);
+                assert(status == queue_op_status::empty);
+            }
             delay.wait(); // pause and then try again.
         }
     }
@@ -188,13 +191,16 @@ public:
         queue_op_status status;
         for (;;)
         {
-            status = try_pop(value);
+            if (!writer_waiting.load())
+            {
+                status = try_pop(value);
 
-            if (status == queue_op_status::closed || status == queue_op_status::success)
-                break;
+                if (status == queue_op_status::closed || status == queue_op_status::success)
+                    break;
 
-            assert(status != queue_op_status::full);
-            assert(status == queue_op_status::empty);
+                assert(status != queue_op_status::full);
+                assert(status == queue_op_status::empty);
+            }
             delay.wait(); // pause and then try again.
         }
         return status;
@@ -214,10 +220,22 @@ public:
     /*!\name State operations
      * \{
      */
-    void close() noexcept
+    void close()
     {
-        std::unique_lock write_lock{mutex};
-        closed_flag = true;
+        if (writer_waiting.exchange(true)) // First writer that closes the queue will continue, the rest returns.
+            return;
+
+        try
+        {
+            std::unique_lock write_lock{mutex};
+            closed_flag = true;
+            writer_waiting.store(false); // reset the lock.
+        }
+        catch (...)
+        {
+            writer_waiting.store(false); // reset the lock.
+            std::rethrow_exception(std::current_exception());
+        }
     }
 
     bool is_closed() const noexcept
@@ -339,6 +357,7 @@ private:
     alignas(std::hardware_destructive_interference_size) std::atomic<size_type>    push_back_position{0};
     alignas(std::hardware_destructive_interference_size) std::atomic<size_type>    pending_push_back_position{0};
     alignas(std::hardware_destructive_interference_size) std::atomic<size_type>    ring_buffer_capacity{0};
+    alignas(std::hardware_destructive_interference_size) std::atomic_bool          writer_waiting{false};
     alignas(std::hardware_destructive_interference_size) bool                      closed_flag{false};
 };
 
@@ -440,7 +459,7 @@ template <typename value_t, typename buffer_t, buffer_queue_policy buffer_policy
 inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_pop(value_t & result)
 {
     // try to extract a value
-    std::shared_lock  read_lock{mutex};
+    std::shared_lock read_lock{mutex};
 
     size_type local_pending_pop_front_position{};
     size_type next_local_pop_front_position{};

rrahn avatar Nov 24 '20 22:11 rrahn

Patch works :)

eseiler avatar Nov 24 '20 23:11 eseiler

We merged the patch, but we still want to evaluate if the patch is the version we want.

marehr avatar Dec 18 '20 20:12 marehr

So, at the moment we have stable tests and the mechanism seems to at least do not cause the threading issues as before. So I suggest we make a separate ticket to discuss the implementation details for the notification mechanism.

rrahn avatar Jan 18 '21 10:01 rrahn