userver icon indicating copy to clipboard operation
userver copied to clipboard

SpscQueue race condition bug

Open akhoroshev opened this issue 9 months ago • 2 comments

Воспроизведение:

#include <userver/concurrent/queue.hpp>
#include <userver/utest/utest.hpp>
#include <userver/utils/async.hpp>


#include <iostream>

using namespace userver;

UTEST_MT(Queue, RaceCondition, 16) {
  struct Foo {};

  using ResponseQueue = userver::concurrent::SpscQueue<Foo>;

  auto queue = ResponseQueue::Create();
  auto consumer = queue->GetConsumer();
  auto producer = queue->GetProducer();


  auto background_routine = userver::engine::AsyncNoSpan([consumer = std::move(consumer)]() mutable {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::move(consumer).Reset();
    std::cout << "consumer reseted\n";
  });


  if (producer.Push(Foo{})) {
    std::cout << "first push successed\n";
  } else {
    std::cout << "first push failed\n";
  }

  if (producer.Push(Foo{})) {
    std::cout << "second push successed\n";
  } else {
    std::cout << "second push failed\n";
  }

  background_routine.Get();
}

Чтобы лучше воспроизводилось надо "пропатчиить" это место (суть патча добавить sleep чтобы разломать инварианты)

diff --git a/core/include/userver/concurrent/queue.hpp b/core/include/userver/concurrent/queue.hpp
index 97de7533a..706ba2ba4 100644
--- a/core/include/userver/concurrent/queue.hpp
+++ b/core/include/userver/concurrent/queue.hpp
@@ -404,6 +404,18 @@ class GenericQueue<T, QueuePolicy>::SingleProducerSide final {
 
     used_capacity_.fetch_add(value_size);
     queue_.DoPush(token, std::move(value));
+
+    // Проблема в этом месте. Reset может сделать сброс "следующего" события.
+    //
+    // Пусть начали делать вставку в очередь и одновременно удалять единственный Consumer
+    // Во время начала вставки консьюемер еще был жив (NoMoreConsumers == false)
+    // Успешно вставляем элемент (queue_.DoPush)
+    // Consumer удаляется, при удалении он выставляет event в деструкторе (queue_->MarkConsumerIsDead())
+    // А теперь мы сбрасываем event (non_full_event_.Reset())
+    //
+    // Итого на выходе вернем true, и клиенты опять будут обращаться к Producer и блокироватья навсегда.
+    std::this_thread::sleep_for(std::chrono::seconds(3));
+
     non_full_event_.Reset();
     return true;
   }

По итогу сниппет кода виснет намертво с таким stdout:

[==========] Running 1 test from 1 test suite.
[----------] Global test environment set-up.
[----------] 1 test from Queue
[ RUN      ] Queue.RaceCondition
consumer reseted
first push successed

akhoroshev avatar May 09 '24 19:05 akhoroshev

Вообще есть еще одна претензия к DoPush

https://github.com/userver-framework/userver/blob/develop/core/include/userver/concurrent/queue.hpp#L401 и https://github.com/userver-framework/userver/blob/develop/core/include/userver/concurrent/queue.hpp#L405

не стоило делать одной транзакцией? Если два треда провалятся в DoPush одновременно в итоге может более total_capacity_ элементов оказаться в очереди

akhoroshev avatar May 09 '24 19:05 akhoroshev

не стоило делать одной транзакцией? Если два треда провалятся в DoPush одновременно в итоге может более total_capacity_ элементов оказаться в очереди

"Это не баг, это фича", длина очереди может незначительно превышать лимит под нагрузкой, как и описано в доке

Anton3 avatar May 09 '24 19:05 Anton3