workflow icon indicating copy to clipboard operation
workflow copied to clipboard

机器学习场景的使用的最佳实践是什么(实用案例推荐)

Open rockyzhengwu opened this issue 2 years ago • 18 comments

场景

服务端是一个 PyTorch C++ 实现的 CTR 预估的服务,预估之前需要去 KV(类似 redis) 里读取特征作为 CTR 预估的输入。一个客户端请求可能读一次,也可能读两次 KV 。

详细流程可能是这样: 拿到请求体 -> 解析 -> 读 kv -> 产生模型输入 -> CTR 模型 -> calibration 模型 -> 其他逻辑 -> 返回 。

现在的实现

看了下 demo 然后就写了大概这样的逻辑

static WFFacilities::WaitGroup wait_group(1);

void sig_handler(int signo) { wait_group.done(); }

int main(int argc, char *argv[]) {
  // ....   something init

  unsigned short port = 8083;
  signal(SIGINT, sig_handler);
  signal(SIGTERM, sig_handler);
  WFHttpServer server(process);
  if (server.start(port) == 0) {
    wait_group.wait();
    server.stop();
  } else {
    perror("Cannot start server");
    exit(1);
  }
  return 0;
}

void process(WFHttpTask *server_task) {
  protocol::HttpRequest *req = server_task->get_req();
  protocol::HttpResponse *resp = server_task->get_resp();
  long long seq = server_task->get_task_seq();
  
  resp->set_http_version("HTTP/1.1");
  resp->set_status_code("200");

  std::string body;
  auto uri = req->get_request_uri();
  if (std::strcmp(uri, "/cvr") == 0) {
    const void *body;
    size_t size;
    req->get_parsed_body(&body, &size);
    std::string req_body = static_cast<const char *>(body);
    std::string response_body = "";
    MODL_MANAGER->predict(std::move(req_body),  response_body);
    resp->append_output_body(response_body.data(), response_body.size());
  } else {
    resp->set_status_code("404");
  }
}

遇到的问题

由于在 MODL_MANAGER->predict(std::move(req_body), response_body); 里面实现了等待读取 KV 的逻辑和模型预估的部分。但读 KV 有可能会卡住(看了很久还不是很理解这个问题)。 想不占线程的读 KV,不知道怎么实现合适。一番讨论发现从我开始用 workflow 的时候就好像用错了,似乎在最开始就应该把逻辑分成不同的 task 。所以想是不是有什么最佳的实践。

补充一下, workflow 线上用了一年多了,在我要读 kv 之前都没任何问题, 50 ms 的预估时间,之前尝试过很多都不能很好的解决超时, workflow 可以..... 目前 p99 30ms cpu 打满基本不超时。

rockyzhengwu avatar Jun 08 '22 15:06 rockyzhengwu

你好。这个里面好几个问题。我一个个回复。 首先,如果你的整个predict过程是一个纯计算,那么,最好的实现是吧predict包装成一个计算任务,并且push_back到server task所在的series。这么做比直接在process里计算要好一些,可以让计算运行在计算线程,而不是占用网络线程。示例:

void predict_fn(const void *body, size_t size, std::string *response_body)
{
    ....
}

void process(WFHttpTask *task)
{
    void *body;
    size_t size;

    task->get_req()->get_parsed_body(&body, &size);
    std::string *response_body = new std::string;
    task->user_data = response_body;
    task->set_callback([](WFHttpTask *task) { delete (std::string *)task->user_data; });

    WFGoTask *predict_task = WFTaskFactory::create_go_task("predict", predict_fn, body, size, response_body);
    predict_task->set_callback([task](WFGoTask *predict_task) {
        std::string *response_body = (std::string *)task->user_data;
        task->get_resp()->append_output_body_nocopy(response_body->c_str(), response_body->size());
    });
    series_of(task)->push_back(predict_task);
}

很多用户误认为process函数结束就是服务处理流程结束了。其实,服务处理流程是series而不是process函数,否者,我们无法实现全异步的server了。这个示例,我们产生一个计算任务(predict_task)并添加到series,在计算任务的callback里填写了resp。由于想要最佳实践,这里用了append_output_body_nocopy,并且在server task的callback里释放response_body。 这里就涉及了多个调用的时机。predict_fn调用显然不在process函数线程里,而是process函数结束之后,在某个计算线程里被调用。而predict_task的callback在predict_fn之后在同一个线程里被执行,我们在这里填写resp,并且使用nocopy接口,因为response_body在回复结束之前,都还没有被delete。 还有一个函数,就是server task的callback。server task的callback是回复结束之后被调用(这和http client task一致,都是http交互完成调用callback),在这里我们释放了response_body。 当然,如果不那么要求性能,这个实现可以简单一些。比如不用_nocopy接口。

Barenboim avatar Jun 08 '22 17:06 Barenboim

第二个问题,你需要在计算中插入通讯。我不知道你的kv是什么协议的通讯,我们就假设是redis(自定义协议的话可以自己实现,参考相关文档)。如果"解析"部分计算量不大,直接在process里算就可以了。我们假设这个计算量确实不大,那么,现在就是先访问redis再predict计算,然后填写回复。可以这么实现:

void predict_fn(const void *body, size_t size, std::string *response_body)
{
    ....
}

void redis_callback(WFRedisTask *task)
{ 
    protocol::RedisResponse *resp = task->get_resp();
    // read body and size from resp.

    WFGoTask *predict_task = WFTaskFactory::create_go_task("predict", predict_fn, body, size, response_body);
    predict_task->set_callback([](WFGoTask *predict_task) {
        WFHttpTask *server_task = (WFHttpTask *)series_of(predict_task)->get_context(); // 取回server task
        std::string *response_body = (std::string *)server_task->user_data;
        server_task->get_resp()->append_output_body_nocopy(response_body->c_str(), response_body->size());
    });

    series_of(task)->push_back(predict_task); // 把计算任务放进series
}

void process(WFHttpTask *task)
{
    SeriesWork *series = series_of(task);

    std::string *response_body = new std::string;
    task->user_data = response_body;
    task->set_callback([](WFHttpTask *task) { delete (std::string *)task->user_data; });

    WFRedisTask *redis_task = WFTaskFactory::create_redis_task(url, 0, redis_callback);
    redis_task->get_req()->set_request(....);

    series->set_context(task);    // 把server_task指针放在series上下文
    series->push_back(redis_task);
}

这个就构成了一个网络任务+计算任务的处理流程。整个过程无需等待,也没有占有网络线程进行复杂的计算。如果在redis任务之前,还需要进行比较复杂的计算,同理增加一个计算任务即可。 但是,我们发现,如果处理流程过于复杂,你就不的不让每个任务callback里发起下一步操作。这会让代码很零碎。特别是,当A任务是功能X的最后一个任务,任务B是功能Y的第一个任务。因为X功能之后执行Y,就需要让A的callback去创建B,这是很不合理的。那么,对于这种复杂的需求,就可以使用我们的模块任务。

Barenboim avatar Jun 08 '22 17:06 Barenboim

当若干个任务完成一个特定的功能,就可以使用模块任务了。例如,我们认为redis和predict构成一个完整的功能,那么上例中最不合理的就是让predict_task的callback去填写response。用模块可以这样改造:

void predict_fn(const void *body, size_t size, std::string *response_body)
{
    ....
}

void redis_callback(WFRedisTask *task)
{ 
    protocol::RedisResponse *resp = task->get_resp();
    // read body and size from resp.

    std::string *response_body = new std::string;
    WFGoTask *predict_task = WFTaskFactory::create_go_task("predict", predict_fn, body, size, response_body);
    // 不再需要给predict_task设置callback了。耦合性降低。计算结果放在series context。
    series_of(task)->set_context(response_body);
    series_of(task)->push_back(predict_task); // 把计算任务放进series。这个series其实是module内的sub_series
}

void process(WFHttpTask *task)
{
    WFRedisTask *redis_task = WFTaskFactory::create_redis_task(url, 0, redis_callback);
    WFModuleTask *module = WFTaskFactory::create_module_task(redis_task, [task](WFModuleTask *mod) {
        std::string *response_body = mod->sub_series()->get_context();  // 这里是sub_series不是series_of
        task->user_data = response_body;
        task->set_callback([](WFHttpTask *task) { delete (std::string *)task->user_data; });
        task->get_resp()->append_output_body_nocopy(response_body->c_str(), response_body->size());
    });
    series_of(task)->push_back(module);
}

可以看一下module的文档:https://github.com/sogou/workflow/blob/master/docs/about-module.md

Barenboim avatar Jun 08 '22 18:06 Barenboim

总结

  • server处理流程是整个series,不是process函数。不要在process函数里等待或进行复杂计算。
  • 如果处理流程是一个纯计算,包装成计算任务push到series里,在任务callback里填写resp。
  • 如果是计算和通讯结合,那么,在每一个任务callback,创建并push下一个任务,最后填写resp。
  • 逻辑复杂的server,可以使用模块任务(WFModuleTask)进行模块级封装,降低任务之间的耦合。

Barenboim avatar Jun 08 '22 18:06 Barenboim

@Barenboim 明白了,感谢这么优秀的项目,特别耐心的解答

rockyzhengwu avatar Jun 09 '22 03:06 rockyzhengwu

@rockyzhengwu 我之前也遇到过类似的需求,如果模型处理时间超过一定范围就进行其他的处理逻辑。我想到的解决思路是首先设置一个counter任务当作塞子用来唤醒后续的任务流,接着在模型计算任务函数中设置一个定时任务(定时时间为超时时间),然后在模型计算任务的callback函数和定时任务的callback函数中分别count之前的counter,这样无论是计算函数先完成还是定时任务先完成均可以顺利进入counter的callback函数。在counter的callback函数中push back你需要的后续流程就可以了。

大致的代码差不多长这个样子,不知道能不能解决你的问题哈:)

void do_segmentation() {
    auto segmentation_timeout_timer = WFTaskFactory::create_timer_task(20, 0,[&](const WFTimerTask* task) {
        LOG(INFO) << "segmentation task timeout at:" << Timestamp::now().to_format_str();
        WFTaskFactory::count_by_name("count");
    });
    segmentation_timeout_timer->start();
    LOG(INFO) << "start doing segmentation: " << Timestamp::now().to_format_str();
    std::this_thread::sleep_for(std::chrono::seconds(25));
}

int main(int argc, char** argv) {

    WFFacilities::WaitGroup wait_group(1);
    auto* series = Workflow::create_series_work(
            WFTaskFactory::create_empty_task(),
            [&](const SeriesWork* work){
                LOG(INFO) << "Series complete at: " << Timestamp::now().to_format_str();
            });

    WFTimerTask* get_image_data = WFTaskFactory::create_timer_task(
            1000000 * 5, [&](const WFTimerTask* task) {
                LOG(INFO) << "successfully get image data at: " << Timestamp::now().to_format_str();
            });
    WFCounterTask *counter = WFTaskFactory::create_counter_task("count", 1, [&](const WFCounterTask* task) {
        LOG(INFO) << "counter task complete at: " << Timestamp::now().to_format_str();
        auto* postprocess_work = WFTaskFactory::create_go_task("postprocess", [&]() {
            LOG(INFO) << "do postprocess task at: " << Timestamp::now().to_format_str();
        });
        *series_of(task) << postprocess_work;
    });
    counter->start();

    auto* segmentaiton_work = WFTaskFactory::create_go_task("do_segmentation", do_segmentation);
    segmentaiton_work->set_callback([&](const WFGoTask* task) {
        LOG(INFO) << "complete segmentation at:" << Timestamp::now().to_format_str();
        WFTaskFactory::count_by_name("count");
    });

    *series << get_image_data;
    *series << segmentaiton_work;
    series->start();
    
    LOG(INFO) << "start whole process at: " << Timestamp::now().to_format_str();
    wait_group.wait();
    return 1;
}

@Barenboim 也请大佬帮忙看看这样写对不对,或者有没有更好的解决方案。

还有个小疑问,就是更加理想的状态是不应该在超时之后就取消计算任务,但是我查了现在我的文档,好像一旦start的任务是不能cancle的,这样如果我的计算任务一直卡死的话,这个线程是不是就被一直占用,浪费了。

MaybeShewill-CV avatar Jun 09 '22 13:06 MaybeShewill-CV

先回答小疑问。运行中的计算任务是你的一个函数,显然框架无法中断你执行中的函数。这个我们也许可以考虑搞一个进程任务(感觉有点像actor模式了),时间到了直接kill听起来很刺激,起码是个理论上可行的方案。 我们内部有workflow的低代码框架,就是可以通过一个配置自动生成workflow代码。这里可以给计算任务设置超时或中断。但这个肯定需要在函数里加入检查点,让函数可以中途自行返回。代码写成死循环是没有办法的。 actor模式可以想一下。

Barenboim avatar Jun 09 '22 15:06 Barenboim

@Barenboim 感谢解答。下来去了解下actor哈:)

MaybeShewill-CV avatar Jun 09 '22 15:06 MaybeShewill-CV

@rockyzhengwu 我之前也遇到过类似的需求,如果模型处理时间超过一定范围就进行其他的处理逻辑。我想到的解决思路是首先设置一个counter任务当作塞子用来唤醒后续的任务流,接着在模型计算任务函数中设置一个定时任务(定时时间为超时时间),然后在模型计算任务的callback函数和定时任务的callback函数中分别count之前的counter,这样无论是计算函数先完成还是定时任务先完成均可以顺利进入counter的callback函数。在counter的callback函数中push back你需要的后续流程就可以了。

大致的代码差不多长这个样子,不知道能不能解决你的问题哈:)

void do_segmentation() {
    auto segmentation_timeout_timer = WFTaskFactory::create_timer_task(20, 0,[&](const WFTimerTask* task) {
        LOG(INFO) << "segmentation task timeout at:" << Timestamp::now().to_format_str();
        WFTaskFactory::count_by_name("count");
    });
    segmentation_timeout_timer->start();
    LOG(INFO) << "start doing segmentation: " << Timestamp::now().to_format_str();
    std::this_thread::sleep_for(std::chrono::seconds(25));
}

int main(int argc, char** argv) {

    WFFacilities::WaitGroup wait_group(1);
    auto* series = Workflow::create_series_work(
            WFTaskFactory::create_empty_task(),
            [&](const SeriesWork* work){
                LOG(INFO) << "Series complete at: " << Timestamp::now().to_format_str();
            });

    WFTimerTask* get_image_data = WFTaskFactory::create_timer_task(
            1000000 * 5, [&](const WFTimerTask* task) {
                LOG(INFO) << "successfully get image data at: " << Timestamp::now().to_format_str();
            });
    WFCounterTask *counter = WFTaskFactory::create_counter_task("count", 1, [&](const WFCounterTask* task) {
        LOG(INFO) << "counter task complete at: " << Timestamp::now().to_format_str();
        auto* postprocess_work = WFTaskFactory::create_go_task("postprocess", [&]() {
            LOG(INFO) << "do postprocess task at: " << Timestamp::now().to_format_str();
        });
        *series_of(task) << postprocess_work;
    });
    counter->start();

    auto* segmentaiton_work = WFTaskFactory::create_go_task("do_segmentation", do_segmentation);
    segmentaiton_work->set_callback([&](const WFGoTask* task) {
        LOG(INFO) << "complete segmentation at:" << Timestamp::now().to_format_str();
        WFTaskFactory::count_by_name("count");
    });

    *series << get_image_data;
    *series << segmentaiton_work;
    series->start();
    
    LOG(INFO) << "start whole process at: " << Timestamp::now().to_format_str();
    wait_group.wait();
    return 1;
}

@Barenboim 也请大佬帮忙看看这样写对不对,或者有没有更好的解决方案。

还有个小疑问,就是更加理想的状态是不应该在超时之后就取消计算任务,但是我查了现在我的文档,好像一旦start的任务是不能cancle的,这样如果我的计算任务一直卡死的话,这个线程是不是就被一直占用,浪费了。

你这个程序写太乱了,有点看不懂。一般就是通过一个目标值为1的counter,如果上一个计算完成,或timer到期都count一下,让流程可以继续。 感觉你这个demo里,用了太多的timer和sleep来模拟读库和计算,很难一眼看出哪里是用来控制超时,哪里是用来模拟读库的延迟,只能说不是一个好的demo。 另外,如果用运算符重载,可以直接*series << get_image_data << segmentaiton_work,也可以**task << postprocess_work。

Barenboim avatar Jun 10 '22 09:06 Barenboim

@Barenboim 是的哈 用了太多timer和sleep来模拟,确实有点乱:) **series << xx << xx 学到了, 哈哈哈:)

MaybeShewill-CV avatar Jun 10 '22 13:06 MaybeShewill-CV

@Barenboim 是的哈 用了太多timer和sleep来模拟,确实有点乱:) **series << xx << xx 学到了, 哈哈哈:)

不是**series哈,是**task。*task得到task的引用,task对象的'*'运算符,得到task所在series的引用,所以可以: **task << task1 << task2

Barenboim avatar Jun 10 '22 13:06 Barenboim

@Barenboim 嗯嗯 哈哈哈 键盘敲错了 ==!

MaybeShewill-CV avatar Jun 10 '22 16:06 MaybeShewill-CV

@MaybeShewill-CV 你看一下这个新功能:https://github.com/sogou/workflow/pull/938

class WFTaskFactory
{
    /* Create 'Go' task with running time limit in seconds plus nanoseconds.
     * If time exceeded, state WFT_STATE_ABORTED will be got in callback. */
    template<class FUNC, class... ARGS>
    static WFGoTask *create_timedgo_task(time_t seconds, long nanoseconds,
                                         const std::string& queue_name,
                                         FUNC&& func, ARGS&&... args);
};

Barenboim avatar Jun 13 '22 12:06 Barenboim

@Barenboim 好滴,感觉这样很方便了,可以傻瓜式编程了。明天抽空试试看:)

MaybeShewill-CV avatar Jun 13 '22 12:06 MaybeShewill-CV

@Barenboim 抽空测试了一个demo,没有问题,用起来非常方便,赞。下来会再多测测,有问题再反馈哈:)

MaybeShewill-CV avatar Jun 14 '22 12:06 MaybeShewill-CV

@Barenboim 抽空测试了一个demo,没有问题,用起来非常方便,赞。下来会再多测测,有问题再反馈哈:)

主要是从你的demo上看出来,计算时间限制好像用户非常有需求,所以就想到了实现这个功能。然后发现go task上加这个功能非常的自然,因为任务的类型是确定的。在实现的过程中,也帮我们优化了一些核心代码。果然用户是我们发展的原动力。

Barenboim avatar Jun 14 '22 15:06 Barenboim

@MaybeShewill-CV 现在thread task也支持加超时了。

template<class INPUT, class OUTPUT>
class WFThreadTaskFactory
{
public:
    static T *create_thread_task(time_t seconds, long nanoseconds,
                                 const std::string& queue_name,
                                 std::function<void (INPUT *, OUTPUT *)> routine,
                                 std::function<void (T *)> callback);
};

Barenboim avatar Oct 20 '22 09:10 Barenboim

@Barenboim 点赞! 回去看看之前几个thread_task的任务能不能更新下:)

MaybeShewill-CV avatar Oct 20 '22 09:10 MaybeShewill-CV