
A question about coroutine scheduling

Open coolhuhu opened this issue 1 year ago • 0 comments

Background

I am learning coroutines. Following the examples shipped with co_context, I wrote an echo_client and an echo_server; the echo_client is modeled on the muduo example. I wanted to use them to compare performance against muduo, but the results are not what I expected.

Problem description

My environment: Alibaba Cloud ECS instance, 2 vCPUs / 2 GiB RAM, Ubuntu 22.04, g++ 11.4.0.

The muduo implementation is at https://github.com/chenshuo/muduo/tree/master/examples/pingpong

Running it directly with the script that muduo provides, the results are:

pingpong_server: no process found
Bufsize: 16384 Threads: 1 Sessions: 1
20240423 08:05:48.429949Z 630711 INFO pid = 630711, tid = 630711 - server.cc:38
20240423 08:05:49.430760Z 630713 INFO pid = 630713, tid = 630713 - client.cc:197
20240423 08:05:49.431014Z 630713 WARN all connected - client.cc:122
20240423 08:05:59.430831Z 630713 WARN stop - client.cc:158
20240423 08:05:59.431107Z 630713 WARN all disconnected - client.cc:130
20240423 08:05:59.431125Z 630713 WARN 4261986304 total bytes read - client.cc:139
20240423 08:05:59.431132Z 630713 WARN 260131 total messages read - client.cc:140
20240423 08:05:59.431138Z 630713 WARN 16384 average message size - client.cc:141
20240423 08:05:59.431149Z 630713 WARN 406.4546875 MiB/s throughput - client.cc:143
Bufsize: 16384 Threads: 1 Sessions: 10
20240423 08:06:04.435839Z 630754 INFO pid = 630754, tid = 630754 - server.cc:38
20240423 08:06:05.437393Z 630757 INFO pid = 630757, tid = 630757 - client.cc:197
20240423 08:06:05.437965Z 630757 WARN all connected - client.cc:122
20240423 08:06:15.437484Z 630757 WARN stop - client.cc:158
20240423 08:06:15.437957Z 630757 WARN all disconnected - client.cc:130
20240423 08:06:15.437972Z 630757 WARN 17275011072 total bytes read - client.cc:139
20240423 08:06:15.437974Z 630757 WARN 1054383 total messages read - client.cc:140
20240423 08:06:15.438011Z 630757 WARN 16384 average message size - client.cc:141
20240423 08:06:15.438021Z 630757 WARN 1647.4734375 MiB/s throughput - client.cc:143
Bufsize: 16384 Threads: 1 Sessions: 100
20240423 08:06:20.442012Z 630820 INFO pid = 630820, tid = 630820 - server.cc:38
20240423 08:06:21.442723Z 630823 INFO pid = 630823, tid = 630823 - client.cc:197
20240423 08:06:21.447357Z 630823 WARN all connected - client.cc:122
20240423 08:06:31.443814Z 630823 WARN stop - client.cc:158
20240423 08:06:31.445412Z 630823 WARN all disconnected - client.cc:130
20240423 08:06:31.445427Z 630823 WARN 19154436096 total bytes read - client.cc:139
20240423 08:06:31.445430Z 630823 WARN 1169094 total messages read - client.cc:140
20240423 08:06:31.445431Z 630823 WARN 16384 average message size - client.cc:141
20240423 08:06:31.445446Z 630823 WARN 1826.709375 MiB/s throughput - client.cc:143
Bufsize: 16384 Threads: 1 Sessions: 1000
20240423 08:06:36.449339Z 630884 INFO pid = 630884, tid = 630884 - server.cc:38
20240423 08:06:37.450658Z 630889 INFO pid = 630889, tid = 630889 - client.cc:197
20240423 08:06:37.495573Z 630889 WARN all connected - client.cc:122
20240423 08:06:47.451004Z 630889 WARN stop - client.cc:158
20240423 08:06:47.484683Z 630889 WARN all disconnected - client.cc:130
20240423 08:06:47.484710Z 630889 WARN 12080676864 total bytes read - client.cc:139
20240423 08:06:47.484859Z 630889 WARN 737346 total messages read - client.cc:140
20240423 08:06:47.484863Z 630889 WARN 16384 average message size - client.cc:141
20240423 08:06:47.484912Z 630889 WARN 1152.103125 MiB/s throughput - client.cc:143
Bufsize: 16384 Threads: 1 Sessions: 10000
20240423 08:06:52.490363Z 630949 INFO pid = 630949, tid = 630949 - server.cc:38
20240423 08:06:53.490886Z 630963 INFO pid = 630963, tid = 630963 - client.cc:197
20240423 08:06:53.909448Z 630963 WARN all connected - client.cc:122
20240423 08:07:03.537228Z 630963 WARN stop - client.cc:158
20240423 08:07:03.933908Z 630963 WARN all disconnected - client.cc:130
20240423 08:07:03.934199Z 630963 WARN 10123739136 total bytes read - client.cc:139
20240423 08:07:03.934213Z 630963 WARN 617904 total messages read - client.cc:140
20240423 08:07:03.934215Z 630963 WARN 16384 average message size - client.cc:141
20240423 08:07:03.934220Z 630963 WARN 965.475 MiB/s throughput - client.cc:143

My echo_client and echo_server programs are attached at the end. Their results are:

echo_server: no process found
Bufsize: 16384 Threads: 1 Sessions: 1
Tue Apr 23 16:20:17 2024 client start...
all connected
stop.
all disconnected
4071440384 total bytes read
248501 total messages read
16384 bytes per message
388.283 MiB/s throughput
Tue Apr 23 16:20:27 2024 client end...

Bufsize: 16384 Threads: 1 Sessions: 10
Tue Apr 23 16:20:33 2024 client start...
all connected
stop.
all disconnected
15644426240 total bytes read
954860 total messages read
16384 bytes per message
1491.97 MiB/s throughput
Tue Apr 23 16:20:43 2024 client end...

Bufsize: 16384 Threads: 1 Sessions: 100
Tue Apr 23 16:20:49 2024 client start...
all connected
stop.
all disconnected
17007493120 total bytes read
1038055 total messages read
16384 bytes per message
1621.96 MiB/s throughput
Tue Apr 23 16:20:59 2024 client end...

Bufsize: 16384 Threads: 1 Sessions: 1000
Tue Apr 23 16:21:05 2024 client start...
all connected
stop.
all disconnected
8327217152 total bytes read
508253 total messages read
16384 bytes per message
794.145 MiB/s throughput
Tue Apr 23 16:21:15 2024 client end...

Bufsize: 16384 Threads: 1 Sessions: 10000
Tue Apr 23 16:21:21 2024 client start...
stop.
all disconnected
259922837504 total bytes read
15864431 total messages read
16384 bytes per message
24788.2 MiB/s throughput
Tue Apr 23 16:26:12 2024 client end...

Each concurrency level is tested for 10 s.
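
To make the comparison easier to read, here are the throughput numbers side by side (the 10000-session co_context figure is meaningless: the client took almost five minutes to finish instead of 10 s, but the throughput is still divided by the nominal 10 s window):

Sessions   muduo (MiB/s)   co_context (MiB/s)
1          406.45          388.28
10         1647.47         1491.97
100        1826.71         1621.96
1000       1152.10         794.15
10000      965.48          24788.2 (invalid)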

Comparing the muduo results with those of my program: at 1000 concurrent sessions the co_context version already shows a throughput drop, and at 10000 sessions Client::handleTimeout takes far too long to run (see the timestamps of the last run above: the client only finished at 16:26:12, almost five minutes after printing "stop.").

My guess is that the culprit is the std::mutex that Session::stop() holds across a co_await, but I am not sure and would appreciate an explanation. If the mutex is indeed the cause, how should I rewrite the program?
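
For concreteness, here is a minimal sketch of the rewrite I have in mind, assuming everything stays on the single io_context thread used in this test, so the critical section is already serialized and the lock can simply be dropped (I have not verified that this removes the slowdown):

co_context::task<> Session::stop()
{
    // No std::mutex held across the suspension point: all sessions run on
    // one io_context thread, so this section is serialized anyway. In a
    // multi-threaded setup a coroutine-aware lock would be needed instead,
    // since a blocking lock held across co_await can stall the worker thread.
    co_await socket_.shutdown_write();
    client_->onDisconnect();
}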

Separately, as a newcomer to coroutines, I am confused about how scheduling works. In co_context:

  1. When I co_spawn a new coroutine, when does it actually get scheduled? Is there something like a queue underneath, dispatching coroutines in FIFO order? (I wrote a small probe to observe this; see the sketch after this list.)
  2. When I co_await an operation, the current coroutine suspends and yields control. Once the operation completes, will the coroutine be resumed promptly?
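
To observe the dispatch order, I put together the tiny probe below. It is only a sketch relying on the same io_context / co_spawn / task API that my programs above already use; whether the printed order really reflects a FIFO queue is exactly what I am asking.

#include <co_context/net.hpp>
#include <iostream>
using namespace co_context;

task<> worker(int id)
{
    // Printed when this coroutine is first resumed, so the output order
    // shows the order in which co_spawn'ed coroutines are dispatched.
    std::cout << "worker " << id << " resumed\n";
    co_return;
}

task<> spawner()
{
    for (int i = 0; i < 4; ++i)
    {
        co_spawn(worker(i)); // queued here; does it run now or later?
    }
    std::cout << "spawner done spawning\n";
    co_return;
}

int main()
{
    io_context ctx;
    ctx.co_spawn(spawner());
    ctx.start();
    ctx.join(); // the context keeps running; stop with Ctrl-C
}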

My implementation

The test script:

#!/bin/sh

killall echo_server
timeout=${timeout:-10}
bufsize=${bufsize:-16384}
nothreads=1

for nosessions in 1 10 100 1000 10000; do
  sleep 5
  echo "Bufsize: $bufsize Threads: $nothreads Sessions: $nosessions"
  taskset -c 0 ./echo_server 33333 & srvpid=$!
  sleep 1
  taskset -c 1 ./echo_client 127.0.0.1 33333 $nothreads $bufsize $nosessions $timeout
  kill -9 $srvpid
done
// echo_server.cpp

#include <co_context/net.hpp>
#include <iostream>
#include <cstring>       // strerror
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/tcp.h> // TCP_NODELAY
using namespace co_context;

task<> session(int sockfd)
{
    // Use the raw fd directly; also wrapping it in a co_context::socket here
    // would risk a double close when the wrapper is destroyed.
    constexpr int len = 16384;
    char buf[len];

    while (true)
    {
        auto nr = co_await lazy::recv(sockfd, buf);
        if (nr <= 0) {
            // Peer closed the connection (or recv failed): close and finish.
            co_await lazy::close(sockfd);
            co_return;
        }

        // Echo the received bytes back to the client.
        co_await lazy::send(sockfd, {buf, static_cast<size_t>(nr)});
    }
}

task<> server(const uint16_t port)
{
    /*
        Set up the listening socket by hand; to keep things simple, some error
        handling is omitted. Alternatively, the acceptor class provided by
        co_context could be used directly.
    */
    inet_address listen_addr{port};
    const int listen_sockfd = ::socket(listen_addr.family(), SOCK_STREAM | SOCK_CLOEXEC, IPPROTO_TCP);
    if (listen_sockfd < 0) {
        std::cerr << "socket error: " << strerror(errno) << std::endl;
        co_return;
    }
    
    int optval = 1;
    if (::setsockopt(listen_sockfd, SOL_SOCKET, SO_REUSEADDR, 
        &optval, static_cast<socklen_t>(sizeof(optval))) < 0) {
        std::cerr << "setsockopt SO_REUSEADDR error: " << strerror(errno) << std::endl;
        co_return;
    }

    optval = 1;
    if (::setsockopt(
            listen_sockfd, IPPROTO_TCP, TCP_NODELAY, &optval,
            static_cast<socklen_t>(sizeof optval)
        ) < 0) {
        std::cout << "setsockopt TCP_NODELAY error: " << strerror(errno) << std::endl;
        co_return;
    }

    if (::bind(listen_sockfd, listen_addr.get_sockaddr(), listen_addr.length()) != 0) {
        std::cerr << "bind error: " << strerror(errno) << std::endl;
        co_return;
    }

    if (::listen(listen_sockfd, SOMAXCONN) != 0) {
        std::cerr << "listen error: " << strerror(errno) << std::endl;
        co_return;
    }

    for (;;) {
        int sockfd = co_await lazy::accept(listen_sockfd, nullptr, nullptr, 0);
        if (sockfd < 0) {
            std::cerr << "accept error: " << strerror(errno) << std::endl;
            co_return;
        }
        co_spawn(session(sockfd));
    }
}


/*
    ./echo_server 33333
*/
int main(int argc, char *argv[])
{
    if (argc != 2) {
        std::cerr << "Usage: " << argv[0] << " <port>\n";
        return 1;
    }

    uint16_t port = static_cast<uint16_t>(std::atoi(argv[1]));
    io_context ctx;
    ctx.co_spawn(server(port));
    ctx.start();
    ctx.join();
    return 0;
}
// echo_client.cpp

#include <co_context/net.hpp>
#include <iostream>
#include <unistd.h>
#include <sys/socket.h>
#include <string>
#include <atomic>
#include <vector>
#include <chrono>
#include <cstdio> // fprintf
#include <ctime>  // asctime, localtime
#include <memory> // unique_ptr
#include <mutex>

using Second = int;

class Client;

class Session
{
public:
    Session(co_context::io_context *io_context,
            Client *client,
            const co_context::inet_address &addr) : io_context_(io_context),
                                                    client_(client),
                                                    serverAddr_(addr),
                                                    socket_(::socket(serverAddr_.family(), SOCK_STREAM | SOCK_CLOEXEC, IPPROTO_TCP)),
                                                    bytesRead_(0),
                                                    byteWritten_(0),
                                                    messagesRead_(0)
    {
    }

    co_context::task<> start();

    co_context::task<> onMessage();

    co_context::task<> stop();

    int64_t bytesRead() const { return bytesRead_; }

    int64_t messagesRead() const { return messagesRead_; }

private:
    co_context::io_context *io_context_;
    Client *client_;
    co_context::inet_address serverAddr_;
    co_context::socket socket_;
    std::vector<char> buffers_;
    int64_t bytesRead_;
    int64_t byteWritten_;
    int64_t messagesRead_;
    std::mutex mutex_;
};

class Client
{
public:
    Client(co_context::io_context *io_context,
           const co_context::inet_address &serverAddr,
           int blockSize,
           int sessionCount,
           Second timeout, int threadCount) : io_context_(io_context),
                                             addr_(serverAddr),
                                             blockSize_(blockSize),
                                             sessionCount_(sessionCount),
                                             timeout_(timeout),
                                             numConnected_(0)
    {
        if (threadCount > 1)
        {
            // TODO: multi-threaded client
        }

        /* build the payload that every session will send */
        for (int i = 0; i < blockSize; ++i)
        {
            message_.push_back(static_cast<char>(i % 128));
        }
    }

    co_context::task<> start()
    {

        for (int i = 0; i < sessionCount_; ++i)
        {
            Session *session = new Session(io_context_, this, addr_);
            io_context_->co_spawn(session->start());
            // co_await session->start();
            sessions_.emplace_back(session);
        }  

        co_await handleTimeout();
    }

    const std::string &message() const { return message_; }

    int blockSize() const { return blockSize_; }

    void onConnect()
    {
        // std::cout << numConnected_ << " connected" << std::endl;
        if (++numConnected_ == sessionCount_)
        {
            std::cout << "all connected" << std::endl;
        }
    }

    void onDisconnect()
    {
        // std::cout << numConnected_ << " disconnected" << std::endl;
        if (--numConnected_ == 0)
        {
            std::cout << "all disconnected" << std::endl;

            int64_t totalBytesRead = 0;
            int64_t totalMessagesRead = 0;

            for (const auto &session : sessions_)
            {
                totalBytesRead += session->bytesRead();
                totalMessagesRead += session->messagesRead();
            }

            std::cout << totalBytesRead << " total bytes read" << std::endl;
            std::cout << totalMessagesRead << " total messages read" << std::endl;
            std::cout << static_cast<double>(totalBytesRead) / static_cast<double>(totalMessagesRead)
                      << " bytes per message" << std::endl;
            std::cout << static_cast<double>(totalBytesRead) / (timeout_ * 1024 * 1024)
                      << " MiB/s throughout" << std::endl;

            auto now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
            std::cout << std::asctime(std::localtime(&now)) << " client end..." << std::endl;
        }
    }

private:
    co_context::task<> handleTimeout()
    {
        auto deadline = std::chrono::steady_clock::now();
        deadline += std::chrono::seconds(timeout_);
        co_await co_context::timeout_at(deadline);

        std::cout << "stop." << std::endl;
        for (auto &session : sessions_)
        {
            co_await session->stop();
        }
    }

private:
    co_context::io_context *io_context_;
    const co_context::inet_address addr_;
    int blockSize_;
    int sessionCount_;
    Second timeout_;
    std::atomic<int> numConnected_;
    std::string message_;
    std::vector<std::unique_ptr<Session>> sessions_;
};


co_context::task<> Session::start()
{
    int ret = co_await socket_.connect(serverAddr_);
    if (ret < 0)
    {
        std::cerr << "connect error" << std::endl;
        co_return; // bail out instead of continuing with a dead socket
    }

    socket_.set_tcp_no_delay(true);
    client_->onConnect();

    io_context_->co_spawn(onMessage());
    
    // FIXME: send error ?
    co_await socket_.send(client_->message());
}

co_context::task<> Session::onMessage()
{
    char buf[16384];

    while (true)
    {
        int nr = co_await socket_.recv(buf);
        if (nr <= 0)
        {
            co_return;
        }
        ++messagesRead_;
        bytesRead_ += nr;

        int ns = co_await socket_.send(buf, static_cast<size_t>(nr));
        if (ns <= 0)
        {
            co_return;
        }
        byteWritten_ += ns;
    }
}

co_context::task<> Session::stop()
{
    {
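        // NOTE: this is the "std::mutex held across co_await" pattern that my
        // question above refers to; left as written on purpose.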
        std::unique_lock<std::mutex> locker(mutex_);
        co_await socket_.shutdown_write();
    }
    
    client_->onDisconnect();
}


/*
    ./echo_client 127.0.0.1 33333 1 16384 100 10
*/
int main(int argc, char *argv[])
{
    auto now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
    std::cout << std::asctime(std::localtime(&now)) << " client start..." << std::endl;

    if (argc != 7)
    {
        fprintf(stderr, "Usage: client <host_ip> <port> <threads> <blocksize> <sessions> <time>\n");
    }
    else
    {
        const char *ip = argv[1];
        uint16_t port = static_cast<uint16_t>(atoi(argv[2]));
        int threadCount = atoi(argv[3]);
        int blockSize = atoi(argv[4]);
        int sessionCount = atoi(argv[5]);
        int timeout = atoi(argv[6]);

        co_context::io_context io_ctx;
        co_context::inet_address serverAddr(ip, port);
        
        Client client(&io_ctx, serverAddr, blockSize, sessionCount, timeout, threadCount);
        io_ctx.co_spawn(client.start());
        io_ctx.start();
        io_ctx.join();
    }
}

coolhuhu · Apr 23 '24 08:04