CGraph
CGraph copied to clipboard
超时机制导致element 切割两份commit,出现循环依赖
简单模仿样例
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
class ThreadPool {
public:
// 构造函数,启动工作线程
ThreadPool() : done(false), worker_thread(&ThreadPool::worker_thread_func, this) {}
// 禁止拷贝构造函数和拷贝赋值操作符
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
// 析构函数,结束工作线程
~ThreadPool() {
// 通知工作线程退出
{
std::lock_guard<std::mutex> lock(queue_mutex);
done = true;
}
// 唤醒所有等待线程
condition.notify_all();
if (worker_thread.joinable()) {
worker_thread.join();
}
}
// 添加任务到线程池
template <typename FunctionType>
std::future<typename std::result_of<FunctionType()>::type> submit(FunctionType f) {
typedef typename std::result_of<FunctionType()>::type result_type;
// 封装任务成std::packaged_task
std::packaged_task<result_type()> task(std::move(f));
std::future<result_type> res(task.get_future());
{
// 将任务添加到队列中
std::lock_guard<std::mutex> lock(queue_mutex);
if (done) {
throw std::runtime_error("submit on stopped ThreadPool");
}
tasks.emplace(std::move(task));
}
// 通知一个等待中的线程
condition.notify_one();
return res;
}
private:
// 工作线程函数
void worker_thread_func() {
while (true) {
std::packaged_task<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex);
condition.wait(lock, [this]() { return done || !tasks.empty(); });
if (done && tasks.empty()) {
// 如果完成标志已设置且任务队列为空,则退出线程
return;
}
task = std::move(tasks.front());
tasks.pop();
}
// 执行任务
task();
}
}
private:
std::atomic<bool> done; // 线程池完成标志
std::mutex queue_mutex; // 任务队列互斥锁
std::condition_variable condition; // 任务队列条件变量
std::queue<std::packaged_task<void()>> tasks; // 任务队列
std::thread worker_thread; // 工作线程
};
// 以下是使用ThreadPool的示例代码
#include <iostream>
ThreadPool pool;
void timeoutTask() {
// 睡眠2s
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "Executing timeoutTask on thread " << std::this_thread::get_id() << std::endl;
}
void exampleTask() {
std::cout << "Executing exampleTask on thread " << std::this_thread::get_id() << std::endl;
// 此处模仿CGraph 运行机制 = GElement::Run() + GElement 超时机制
auto future = pool.submit(timeoutTask);
auto futStatus = future.wait_for(std::chrono::seconds(5)); // 等待1s,如果超时,则打印超时信息
std::cout << "exampleTask finished on thread " << std::this_thread::get_id() << std::endl;
if (futStatus == std::future_status::timeout) {
std::cout << "exampleTask timeout on thread " << std::this_thread::get_id() << std::endl;
}
}
int main() {
auto future = pool.submit(exampleTask);
future.wait(); // 如果需要,这里可以等待任务完成
return 0;
}
涉及CGraph 代码
// GElement.cpp
CStatus GElement::asyncRun() {
CGRAPH_FUNCTION_BEGIN
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(!isAsync(), "[" + name_ + "] cannot async run.")
async_result_ = thread_pool_->commit([this] { // 二次commit
return run();
}, CGRAPH_POOL_TASK_STRATEGY);
auto futStatus = async_result_.wait_for(std::chrono::milliseconds(timeout_));
if (std::future_status::ready == futStatus) {
status = getAsyncResult();
} else {
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION( GElementTimeoutStrategy::AS_ERROR == timeout_strategy_, \
"[" + name_ + "] running time more than [" + std::to_string(timeout_) + "]ms")
cur_state_.store(GElementState::TIMEOUT, std::memory_order_release);
}
CGRAPH_FUNCTION_END
}
// GDynamicEngine.cpp
CVoid GDynamicEngine::process(GElementPtr element, CBool affinity) {
if (unlikely(cur_status_.isErr() || element->done_)) {
/**
* 如果已经有异常逻辑,
* 或者传入的element,是已经执行过的了(理论上不会出现这种情况,由于提升性能的原因,取消了atomic计数的逻辑,故添加这一处判定,防止意外情况)
* 则直接停止当前流程
*/
return;
}
const auto& execute = [this, element] {
const CStatus& curStatus = element->fatProcessor(CFunctionType::RUN);
if (unlikely(curStatus.isErr())) {
// 当且仅当整体状正常,且当前状态异常的时候,进入赋值逻辑。确保不重复赋值
cur_status_ += curStatus;
}
afterElementRun(element);
};
if (affinity
&& CGRAPH_DEFAULT_BINDING_INDEX == element->getBindingIndex()) {
// 如果 affinity=true,表示用当前的线程,执行这个逻辑。以便增加亲和性
execute();
} else {
thread_pool_->commit(execute, calcIndex(element)); // 一次commit
}
}
批主想知道,你打算如何修改这一块的内容呢?
您好,我是叶燊泳,已收到你的邮件。
您好,我是叶燊泳,已收到你的邮件。