CGraph icon indicating copy to clipboard operation
CGraph copied to clipboard

超时机制导致element 切割两份commit,出现循环依赖

Open yeshenyong opened this issue 10 months ago • 2 comments

简单模仿样例

#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>

class ThreadPool {
public:
    // 构造函数,启动工作线程
    ThreadPool() : done(false), worker_thread(&ThreadPool::worker_thread_func, this) {}

    // 禁止拷贝构造函数和拷贝赋值操作符
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    // 析构函数,结束工作线程
    ~ThreadPool() {
        // 通知工作线程退出
        {
            std::lock_guard<std::mutex> lock(queue_mutex);
            done = true;
        }
        // 唤醒所有等待线程
        condition.notify_all();
        if (worker_thread.joinable()) {
            worker_thread.join();
        }
    }

    // 添加任务到线程池
    template <typename FunctionType>
    std::future<typename std::result_of<FunctionType()>::type> submit(FunctionType f) {
        typedef typename std::result_of<FunctionType()>::type result_type;

        // 封装任务成std::packaged_task
        std::packaged_task<result_type()> task(std::move(f));
        std::future<result_type> res(task.get_future());
        {
            // 将任务添加到队列中
            std::lock_guard<std::mutex> lock(queue_mutex);
            if (done) {
                throw std::runtime_error("submit on stopped ThreadPool");
            }
            tasks.emplace(std::move(task));
        }
        // 通知一个等待中的线程
        condition.notify_one();
        return res;
    }

private:
    // 工作线程函数
    void worker_thread_func() {
        while (true) {
            std::packaged_task<void()> task;
            {
                std::unique_lock<std::mutex> lock(queue_mutex);
                condition.wait(lock, [this]() { return done || !tasks.empty(); });
                if (done && tasks.empty()) {
                    // 如果完成标志已设置且任务队列为空,则退出线程
                    return;
                }
                task = std::move(tasks.front());
                tasks.pop();
            }
            // 执行任务
            task();
        }
    }

private:
    std::atomic<bool> done; // 线程池完成标志
    std::mutex queue_mutex;  // 任务队列互斥锁
    std::condition_variable condition;  // 任务队列条件变量
    std::queue<std::packaged_task<void()>> tasks; // 任务队列
    std::thread worker_thread; // 工作线程
};

// 以下是使用ThreadPool的示例代码

#include <iostream>
ThreadPool pool;

void timeoutTask() {
    // 睡眠2s
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "Executing timeoutTask on thread " << std::this_thread::get_id() << std::endl;
}

void exampleTask() {
    std::cout << "Executing exampleTask on thread " << std::this_thread::get_id() << std::endl;
    // 此处模仿CGraph 运行机制 = GElement::Run() + GElement 超时机制
    auto future = pool.submit(timeoutTask);
    auto futStatus = future.wait_for(std::chrono::seconds(5)); // 等待1s,如果超时,则打印超时信息
    std::cout << "exampleTask finished on thread " << std::this_thread::get_id() << std::endl;
    if (futStatus == std::future_status::timeout) {
        std::cout << "exampleTask timeout on thread " << std::this_thread::get_id() << std::endl;
    }
}

int main() {
    auto future = pool.submit(exampleTask);
    future.wait(); // 如果需要,这里可以等待任务完成
    return 0;
}

涉及CGraph 代码


// GElement.cpp
CStatus GElement::asyncRun() {
    CGRAPH_FUNCTION_BEGIN
    CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(!isAsync(), "[" + name_ + "] cannot async run.")

    async_result_ = thread_pool_->commit([this] {  // 二次commit
        return run();
    }, CGRAPH_POOL_TASK_STRATEGY);

    auto futStatus = async_result_.wait_for(std::chrono::milliseconds(timeout_));
    if (std::future_status::ready == futStatus) {
        status = getAsyncResult();
    } else {
        CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION( GElementTimeoutStrategy::AS_ERROR == timeout_strategy_,    \
        "[" + name_ + "] running time more than [" + std::to_string(timeout_) + "]ms")
        cur_state_.store(GElementState::TIMEOUT, std::memory_order_release);
    }

    CGRAPH_FUNCTION_END
}
// GDynamicEngine.cpp
CVoid GDynamicEngine::process(GElementPtr element, CBool affinity) {
    if (unlikely(cur_status_.isErr() || element->done_)) {
        /**
         * 如果已经有异常逻辑,
         * 或者传入的element,是已经执行过的了(理论上不会出现这种情况,由于提升性能的原因,取消了atomic计数的逻辑,故添加这一处判定,防止意外情况)
         * 则直接停止当前流程
         */
        return;
    }

    const auto& execute = [this, element] {
        const CStatus& curStatus = element->fatProcessor(CFunctionType::RUN);
        if (unlikely(curStatus.isErr())) {
            // 当且仅当整体状正常,且当前状态异常的时候,进入赋值逻辑。确保不重复赋值
            cur_status_ += curStatus;
        }
        afterElementRun(element);
    };

    if (affinity
        && CGRAPH_DEFAULT_BINDING_INDEX == element->getBindingIndex()) {
        // 如果 affinity=true,表示用当前的线程,执行这个逻辑。以便增加亲和性
        execute();
    } else {
        thread_pool_->commit(execute, calcIndex(element)); // 一次commit
    }
}

yeshenyong avatar Apr 02 '24 09:04 yeshenyong

批主想知道,你打算如何修改这一块的内容呢?

ChunelFeng avatar Apr 16 '24 16:04 ChunelFeng

您好,我是叶燊泳,已收到你的邮件。

yeshenyong avatar Apr 16 '24 16:04 yeshenyong

您好,我是叶燊泳,已收到你的邮件。

yeshenyong avatar Jul 06 '24 17:07 yeshenyong