wait_dequeue occasionally not working properly
// Members of class CatAccountService:
// Consumer queue: producers enqueue filled tasks here and the consumer thread
// blocks on it with wait_dequeue().
moodycamel::BlockingConcurrentQueue<CatTaskField *> account_queue_con_task;
// Producer queue: pool of blank tasks, initialised with a fixed number of
// elements (128). Producers try_dequeue() a blank task from it, and the
// consumer enqueues each task back here after resetting it.
moodycamel::ConcurrentQueue<CatTaskField *> account_queue_pro_task;
In the producer thread: (1) dequeue a blank element from account_queue_pro_task; (2) fill it with data and enqueue it into account_queue_con_task.
code below:
/// Fans one market-data tick out to every account whose trading service is running.
///
/// For each such account a blank CatTaskField is taken from the account's
/// account_queue_pro_task pool, tagged CAT_RSP_ON_TICK, filled with a copy of the
/// tick and enqueued on account_queue_con_task for the consumer thread. If no
/// blank task can be obtained within kMaxDequeueAttempts tries, the tick is
/// dropped for that account and an error is logged.
///
/// @param ctf Task carrying the incoming tick; only pDepthMarketData is read.
///            A null pointer is logged and ignored.
void CatCore::on_tick(CatTaskField *ctf) {
    // Upper bound on try_dequeue attempts per account before giving up.
    constexpr int kMaxDequeueAttempts = 128;

    if (ctf == nullptr) {
        cat_console_logger->error("CatCore::on_tick() ctf is nullptr!");
        return;
    }

    for (auto &account : vAccountsInfo) {
        // Only accounts whose trading service has started take part.
        if (!account->get_start_trading_service_flag()) {
            continue;
        }

        CatTaskField *ts_data = nullptr;
        bool succeeded = false;
        for (int try_count = 1; !succeeded && try_count <= kMaxDequeueAttempts; ++try_count) {
            succeeded = account->account_queue_pro_task.try_dequeue(ts_data);
            cat_console_logger->debug("CatCore::on_tick() succeeded:{} account->account_queue_pro_task.size_approx:{}",
                                      succeeded, account->account_queue_pro_task.size_approx());
            // Report exhaustion only when the final attempt failed as well, so a
            // success on the last try is not logged as an error.
            if (!succeeded && try_count == kMaxDequeueAttempts) {
                cat_console_logger->error("CatCore::on_tick() try_count:{} account->account_queue_pro_task.size_approx:{}",
                                          try_count, account->account_queue_pro_task.size_approx());
            }
        }

        if (!succeeded || ts_data == nullptr) {
            cat_console_logger->error("CatCore::on_tick() ts_data is nullptr!");
            continue;
        }

        ts_data->TaskType = CAT_RSP_ON_TICK;
        memcpy(&ts_data->pDepthMarketData, &ctf->pDepthMarketData, sizeof(CThostFtdcDepthMarketDataField));
        const bool enqueue_flag = account->account_queue_con_task.enqueue(ts_data);
        cat_console_logger->info("CatCore::on_tick() enqueue_flag:{} account->account_queue_con_task.size_approx:{}",
                                 enqueue_flag, account->account_queue_con_task.size_approx());

        if (!enqueue_flag) {
            // The consumer will never see this task, so nobody would recycle it:
            // blank it and hand it back to the pool ourselves, otherwise the pool
            // permanently shrinks by one element.
            memset(ts_data, 0x00, sizeof(CatTaskField));
            const bool recycled = account->account_queue_pro_task.enqueue(ts_data);
            cat_console_logger->error("CatCore::on_tick() enqueue to account_queue_con_task failed, recycled:{}", recycled);
        }
    }
}
Consumer thread: (1) dequeue an element from account_queue_con_task; (2) consume it, then recycle the element by enqueueing it back into account_queue_pro_task.
/// Consumer thread body: blocks on account_queue_con_task, handles each task
/// by TaskType, zeroes it and returns it to account_queue_pro_task.
///
/// A null task pointer acts as the stop signal. On exit the function clears
/// b_start_trading_service, sets b_exit_trading_serv and notifies
/// cv_data_exit_trading_serv.
void CatAccountService::th_handle_task_queue() {
    // NOTE(review): not used in the visible code; presumably consumed by the
    // task handlers elided from the switch below -- confirm.
    int strategy_id;

    cat_console_logger->info("CatAccountService::th_handle_task_queue() IN :)");
    this->b_start_trading_service = true;

    for (;;) {
        CatTaskField *ctf = nullptr;
        this->account_queue_con_task.wait_dequeue(ctf);

        // A null element ends the loop.
        if (ctf == nullptr) {
            cat_console_logger->info("CatAccountService::th_handle_task_queue() ctf is NULLPTR!!! EXIT :)");
            break;
        }

        switch (ctf->TaskType)
        {
        // do something
        default: {
        } break;
        }

        // Blank the task before it goes back to the producers' pool.
        memset(ctf, 0x00, sizeof(CatTaskField));
        bool enqueue_flag = this->account_queue_pro_task.enqueue(ctf);
        cat_console_logger->info("CatAccountService::th_handle_task_queue() enqueue_flag:{} account->account_queue_pro_task.size_approx:{}", enqueue_flag, this->account_queue_pro_task.size_approx());
    }

    this->b_start_trading_service = false;
    b_exit_trading_serv = true;
    this->cv_data_exit_trading_serv.notify_one();
    cat_console_logger->info("CatAccountService::th_handle_task_queue() EXIT :)");
}
Most of the time it works normally, but occasionally a consumer thread stops consuming, as if it were stuck, while the producer side keeps filling up the consumer queue. I don't know how to solve it.
below is some log I got:
I'm having trouble making sense of the logs:
- The con queue size jumps from 7 to 14 in the logs -- how is this possible with only one producer thread?
- The producer thread is logging on both dequeue then enqueue, but only the enqueue ones are appearing in the first two screenshots. Are these screenshots from different runs at different verbosity levels?
The sizes as seen by different threads can be out of date with respect to the other, so the last screenshot doesn't necessarily indicate a problem. The producer thread may take everything out of the pro queue and put them back in the con queue. We don't see if the consumer thread then picks up those elements or not.
Can you please narrow this down to a self-contained reproducible example?
- The con queue size jumps from 7 to 14 in the logs -- how is this possible with only one producer thread?
: Sorry, I forgot to mention that I have another thread that also enqueues task elements into account_queue_con_task.
- The producer thread is logging on both dequeue then enqueue, but only the enqueue ones are appearing in the first two screenshots. Are these screenshots from different runs at different verbosity levels?
: Whenever I want to enqueue a task element into account_queue_con_task, I first try_dequeue a blank one from account_queue_pro_task. I don't want to call malloc at run time, because this is a trading system and latency is important. I reorganized the log myself to make the problem easier for you to see.
Can you please narrow this down to a self-contained reproducible example?
I reserve 128 CatTaskField objects in advance and enqueue them into account_queue_pro_task. At run time, when I want to put a task into account_queue_con_task, I take a blank element from account_queue_pro_task. I do this only to avoid calling malloc at run time.
// Members of class CatAccountService:
// Consumer queue: producers enqueue filled tasks here and the consumer thread
// blocks on it with wait_dequeue().
moodycamel::BlockingConcurrentQueue<CatTaskField *> account_queue_con_task;
// Producer queue: pool of blank tasks, initialised with a fixed number of
// elements (128). Producers try_dequeue() a blank task from it, and the
// consumer enqueues each task back here after resetting it.
moodycamel::ConcurrentQueue<CatTaskField *> account_queue_pro_task;
pre init account_queue_pro_task:
// CAT_ACCOUNT_TASK_QUEUE_MAXIMUM:128
this->v_bulk_tasks.reserve(CAT_ACCOUNT_TASK_QUEUE_MAXIMUM);
for (int i = 0; i < CAT_ACCOUNT_TASK_QUEUE_MAXIMUM; ++i) {
this->v_bulk_tasks.emplace_back();
this->account_queue_pro_task.enqueue(&this->v_bulk_tasks[i]);
}
producer-1 thread (on tick, handle market data):
/// Fans one market-data tick out to every account whose trading service is running.
///
/// For each such account a blank CatTaskField is taken from the account's
/// account_queue_pro_task pool, tagged CAT_RSP_ON_TICK, filled with a copy of the
/// tick and enqueued on account_queue_con_task for the consumer thread. If no
/// blank task can be obtained within kMaxDequeueAttempts tries, the tick is
/// dropped for that account and an error is logged.
///
/// @param ctf Task carrying the incoming tick; only pDepthMarketData is read.
///            A null pointer is logged and ignored.
void CatCore::on_tick(CatTaskField *ctf) {
    // Upper bound on try_dequeue attempts per account before giving up.
    constexpr int kMaxDequeueAttempts = 128;

    if (ctf == nullptr) {
        cat_console_logger->error("CatCore::on_tick() ctf is nullptr!");
        return;
    }

    for (auto &account : vAccountsInfo) {
        // Only accounts whose trading service has started take part.
        if (!account->get_start_trading_service_flag()) {
            continue;
        }

        CatTaskField *ts_data = nullptr;
        bool succeeded = false;
        for (int try_count = 1; !succeeded && try_count <= kMaxDequeueAttempts; ++try_count) {
            succeeded = account->account_queue_pro_task.try_dequeue(ts_data);
            cat_console_logger->debug("CatCore::on_tick() succeeded:{} account->account_queue_pro_task.size_approx:{}",
                                      succeeded, account->account_queue_pro_task.size_approx());
            // Report exhaustion only when the final attempt failed as well, so a
            // success on the last try is not logged as an error.
            if (!succeeded && try_count == kMaxDequeueAttempts) {
                cat_console_logger->error("CatCore::on_tick() try_count:{} account->account_queue_pro_task.size_approx:{}",
                                          try_count, account->account_queue_pro_task.size_approx());
            }
        }

        if (!succeeded || ts_data == nullptr) {
            cat_console_logger->error("CatCore::on_tick() ts_data is nullptr!");
            continue;
        }

        ts_data->TaskType = CAT_RSP_ON_TICK;
        memcpy(&ts_data->pDepthMarketData, &ctf->pDepthMarketData, sizeof(CThostFtdcDepthMarketDataField));
        const bool enqueue_flag = account->account_queue_con_task.enqueue(ts_data);
        cat_console_logger->info("CatCore::on_tick() enqueue_flag:{} account->account_queue_con_task.size_approx:{}",
                                 enqueue_flag, account->account_queue_con_task.size_approx());

        if (!enqueue_flag) {
            // The consumer will never see this task, so nobody would recycle it:
            // blank it and hand it back to the pool ourselves, otherwise the pool
            // permanently shrinks by one element.
            memset(ts_data, 0x00, sizeof(CatTaskField));
            const bool recycled = account->account_queue_pro_task.enqueue(ts_data);
            cat_console_logger->error("CatCore::on_tick() enqueue to account_queue_con_task failed, recycled:{}", recycled);
        }
    }
}
producer-2 thread (on rsp order, handle trading flow data):
/// Order-return callback: wraps the order update in a pooled CatTaskField and
/// forwards it to the account service's consumer thread.
///
/// A blank task is taken from cat_account_serv->account_queue_pro_task (at most
/// kMaxDequeueAttempts tries), filled in, and enqueued on
/// cat_account_serv->account_queue_con_task. If no blank task is available the
/// update is dropped and an error is logged.
///
/// @param pOrder Order update; a null pointer is ignored.
void CatTd::OnRtnOrder(CThostFtdcOrderField *pOrder) {
    // Upper bound on try_dequeue attempts before giving up.
    constexpr int kMaxDequeueAttempts = 128;

    if (pOrder == nullptr) {
        return;
    }

    CatTaskField *ts_data = nullptr;
    bool succeeded = false;
    for (int try_count = 1; !succeeded && try_count <= kMaxDequeueAttempts; ++try_count) {
        succeeded = this->cat_account_serv->account_queue_pro_task.try_dequeue(ts_data);
        cat_console_logger->debug("CatTd::OnRtnOrder() dequeue ts_data succeeded:{} account_queue_pro_task size:{}",
                                  succeeded, this->cat_account_serv->account_queue_pro_task.size_approx());
        // Report exhaustion only when the final attempt failed as well, so a
        // success on the last try is not logged as an error.
        if (!succeeded && try_count == kMaxDequeueAttempts) {
            cat_console_logger->error("CatTd::OnRtnOrder() try_count:{} account->account_queue_pro_task.size_approx:{}",
                                      try_count, this->cat_account_serv->account_queue_pro_task.size_approx());
        }
    }

    if (!succeeded || ts_data == nullptr) {
        cat_console_logger->error("CatTd::OnRtnOrder() ts_data is nullptr!");
        return;
    }

    switch (/**/)
    {
    // do something
    default: {
    } break;
    }

    // enqueue task to consumer
    const bool enqueue_flag = this->cat_account_serv->account_queue_con_task.enqueue(ts_data);
    if (!enqueue_flag) {
        // The consumer will never see this task, so nobody would recycle it:
        // blank it and hand it back to the pool ourselves, otherwise the pool
        // permanently shrinks by one element.
        memset(ts_data, 0x00, sizeof(CatTaskField));
        const bool recycled = this->cat_account_serv->account_queue_pro_task.enqueue(ts_data);
        cat_console_logger->error("CatTd::OnRtnOrder() enqueue to account_queue_con_task failed, recycled:{}", recycled);
    }
}
consumer thread (handle task):
/// Consumer thread body: blocks on account_queue_con_task, handles each task
/// by TaskType, zeroes it and returns it to account_queue_pro_task.
///
/// A null task pointer acts as the stop signal. On exit the function clears
/// b_start_trading_service, sets b_exit_trading_serv and notifies
/// cv_data_exit_trading_serv.
void CatAccountService::th_handle_task_queue() {
    // NOTE(review): not used in the visible code; presumably consumed by the
    // task handlers elided from the switch below -- confirm.
    int strategy_id;

    cat_console_logger->info("CatAccountService::th_handle_task_queue() IN :)");
    this->b_start_trading_service = true;

    for (;;) {
        CatTaskField *ctf = nullptr;
        this->account_queue_con_task.wait_dequeue(ctf);

        // A null element ends the loop.
        if (ctf == nullptr) {
            cat_console_logger->info("CatAccountService::th_handle_task_queue() ctf is NULLPTR!!! EXIT :)");
            break;
        }

        switch (ctf->TaskType)
        {
        // do something
        default: {
        } break;
        }

        // Blank the task before it goes back to the producers' pool.
        memset(ctf, 0x00, sizeof(CatTaskField));
        bool enqueue_flag = this->account_queue_pro_task.enqueue(ctf);
        cat_console_logger->info("CatAccountService::th_handle_task_queue() enqueue_flag:{} account->account_queue_pro_task.size_approx:{}", enqueue_flag, this->account_queue_pro_task.size_approx());
    }

    this->b_start_trading_service = false;
    b_exit_trading_serv = true;
    this->cv_data_exit_trading_serv.notify_one();
    cat_console_logger->info("CatAccountService::th_handle_task_queue() EXIT :)");
}
Here is a simple demo, but the problem is difficult to reproduce; I am still testing it.
#include <iostream>
#include <vector>
#include <thread>
#include <atomic>
#include <cstring>
#include <chrono>
#include "blockingconcurrentqueue.h"
#include "concurrentqueue.h"
/// Unit of work passed between the producer threads and the consumer thread.
struct TaskField {
    /// Identifies the kind of task; 0 means "blank" (the consumer resets the
    /// task to all zeroes before recycling it).
    int TaskType = 0;
};
/// Self-contained reproduction of the two-queue task pipeline.
///
/// A fixed pool of TaskField objects lives in v_bulk_tasks. Pointers to blank
/// objects wait in pro_queue; each of the two producer threads takes a blank
/// object, fills it and passes it through con_queue to the single consumer
/// thread, which resets it and returns it to pro_queue.
class TaskManager {
public:
    TaskManager() {
        // Reserve up front: the queues hold raw pointers into this vector, so
        // it must not reallocate while the pool is being filled.
        v_bulk_tasks.reserve(TASK_QUEUE_MAXIMUM);
        for (int i = 0; i < TASK_QUEUE_MAXIMUM; ++i) {
            v_bulk_tasks.emplace_back();
            pro_queue.enqueue(&v_bulk_tasks[i]);
        }
    }

    // The worker threads keep a pointer to this object, so it must stay where
    // it is: forbid copying (which also suppresses the implicit move).
    TaskManager(const TaskManager &) = delete;
    TaskManager &operator=(const TaskManager &) = delete;

    /// Launches the two producer threads and the consumer thread, detached.
    /// The TaskManager must outlive them (in this demo it lives for the whole
    /// process).
    void start() {
        std::thread t1(&TaskManager::on_tick, this);
        std::thread t2(&TaskManager::on_order, this);
        std::thread t3(&TaskManager::handle_task, this);
        t1.detach();
        t2.detach();
        t3.detach();
    }

private:
    static constexpr int TASK_QUEUE_MAXIMUM = 10;

    std::vector<TaskField> v_bulk_tasks;                         // backing storage of the pool
    moodycamel::BlockingConcurrentQueue<TaskField *> con_queue;  // filled tasks, producers -> consumer
    moodycamel::ConcurrentQueue<TaskField *> pro_queue;          // blank tasks, consumer -> producers

    /// Shared producer loop: take a blank task from the pool, tag it with
    /// task_type, hand it to the consumer, then sleep for period.
    /// name is the prefix used in the log lines.
    void produce(const char *name, int task_type, std::chrono::milliseconds period) {
        while (true) {
            TaskField *ts_data = nullptr;
            // The pool may be momentarily empty; give up the time slice
            // instead of spinning hot until the consumer recycles a task.
            while (!pro_queue.try_dequeue(ts_data)) {
                std::this_thread::yield();
            }
            if (ts_data != nullptr) {
                ts_data->TaskType = task_type;
                const bool enqueue_flag = con_queue.enqueue(ts_data);
                std::cout << name << "() enqueue_flag: " << enqueue_flag
                          << " con_queue.size_approx: " << con_queue.size_approx() << std::endl;
                if (!enqueue_flag) {
                    // The consumer will never see this task; return it to the
                    // pool so the pool does not shrink.
                    *ts_data = TaskField{};
                    pro_queue.enqueue(ts_data);
                }
            } else {
                std::cerr << name << "() ts_data is nullptr!" << std::endl;
            }
            std::this_thread::sleep_for(period);
        }
    }

    /// Producer 1: emits a task of type 1 every 500 ms.
    void on_tick() { produce("on_tick", 1, std::chrono::milliseconds(500)); }

    /// Producer 2: emits a task of type 2 every 200 ms.
    void on_order() { produce("on_order", 2, std::chrono::milliseconds(200)); }

    /// Consumer: blocks on con_queue, prints each task, resets it and returns
    /// it to pro_queue. A null task pointer ends the loop.
    void handle_task() {
        while (true) {
            TaskField *ctf = nullptr;
            con_queue.wait_dequeue(ctf);
            if (ctf == nullptr) {
                std::cout << "handle_task() ctf is NULLPTR!!! EXIT :)" << std::endl;
                break;
            }
            std::cout << "handle_task() TaskType: " << ctf->TaskType << std::endl;
            // reset data
            *ctf = TaskField{};
            // recycle to producer queue
            const bool enqueue_flag = pro_queue.enqueue(ctf);
            std::cout << "handle_task() enqueue_flag: " << enqueue_flag
                      << " pro_queue.size_approx: " << pro_queue.size_approx() << std::endl;
        }
    }
};
/// Demo entry point: starts the pipeline and keeps the process alive.
int main() {
    TaskManager manager;
    manager.start();

    // start() detaches its threads, so the main thread just sleeps in
    // one-second naps and never returns.
    const std::chrono::milliseconds nap(1000);
    for (;;) {
        std::this_thread::sleep_for(nap);
    }
    return 0;
}
Thanks for the standalone example. What am I looking for in the output? It seems to run without hanging locally.
Thanks for your reply. I tested with this small demo and ran it for a long time, but the bug did not reappear. You can close the issue for now. I will open a new issue when I can reproduce the bug.