userver
userver copied to clipboard
SpscQueue race condition bug
Воспроизведение:
#include <userver/concurrent/queue.hpp>
#include <userver/utest/utest.hpp>
#include <userver/utils/async.hpp>
#include <iostream>
using namespace userver;
UTEST_MT(Queue, RaceCondition, 16) {
struct Foo {};
using ResponseQueue = userver::concurrent::SpscQueue<Foo>;
auto queue = ResponseQueue::Create();
auto consumer = queue->GetConsumer();
auto producer = queue->GetProducer();
auto background_routine = userver::engine::AsyncNoSpan([consumer = std::move(consumer)]() mutable {
std::this_thread::sleep_for(std::chrono::seconds(1));
std::move(consumer).Reset();
std::cout << "consumer reseted\n";
});
if (producer.Push(Foo{})) {
std::cout << "first push successed\n";
} else {
std::cout << "first push failed\n";
}
if (producer.Push(Foo{})) {
std::cout << "second push successed\n";
} else {
std::cout << "second push failed\n";
}
background_routine.Get();
}
Чтобы лучше воспроизводилось надо "пропатчиить" это место (суть патча добавить sleep чтобы разломать инварианты)
diff --git a/core/include/userver/concurrent/queue.hpp b/core/include/userver/concurrent/queue.hpp
index 97de7533a..706ba2ba4 100644
--- a/core/include/userver/concurrent/queue.hpp
+++ b/core/include/userver/concurrent/queue.hpp
@@ -404,6 +404,18 @@ class GenericQueue<T, QueuePolicy>::SingleProducerSide final {
used_capacity_.fetch_add(value_size);
queue_.DoPush(token, std::move(value));
+
+ // Проблема в этом месте. Reset может сделать сброс "следующего" события.
+ //
+ // Пусть начали делать вставку в очередь и одновременно удалять единственный Consumer
+ // Во время начала вставки консьюемер еще был жив (NoMoreConsumers == false)
+ // Успешно вставляем элемент (queue_.DoPush)
+ // Consumer удаляется, при удалении он выставляет event в деструкторе (queue_->MarkConsumerIsDead())
+ // А теперь мы сбрасываем event (non_full_event_.Reset())
+ //
+ // Итого на выходе вернем true, и клиенты опять будут обращаться к Producer и блокироватья навсегда.
+ std::this_thread::sleep_for(std::chrono::seconds(3));
+
non_full_event_.Reset();
return true;
}
По итогу сниппет кода виснет намертво с таким stdout:
[==========] Running 1 test from 1 test suite.
[----------] Global test environment set-up.
[----------] 1 test from Queue
[ RUN ] Queue.RaceCondition
consumer reseted
first push successed
Вообще есть еще одна претензия к DoPush
https://github.com/userver-framework/userver/blob/develop/core/include/userver/concurrent/queue.hpp#L401 и https://github.com/userver-framework/userver/blob/develop/core/include/userver/concurrent/queue.hpp#L405
не стоило делать одной транзакцией? Если два треда провалятся в DoPush одновременно в итоге может более total_capacity_ элементов оказаться в очереди
не стоило делать одной транзакцией? Если два треда провалятся в DoPush одновременно в итоге может более total_capacity_ элементов оказаться в очереди
"Это не баг, это фича", длина очереди может незначительно превышать лимит под нагрузкой, как и описано в доке