A question about coroutine scheduling
Background
I am learning about coroutines. Following the examples in this repository, I wrote an echo_client and an echo_server with the co_context library; the echo_client borrows from the pingpong example in muduo. I wanted to compare the performance of the two, but the results are not what I expected.
Problem description
My environment: Alibaba Cloud ECS instance, 2 vCPUs / 2 GiB RAM, Ubuntu 22.04, g++ 11.4.0.
The muduo implementation is at https://github.com/chenshuo/muduo/tree/master/examples/pingpong
Running it directly with the script provided by muduo gives the following results:
pingpong_server: no process found
Bufsize: 16384 Threads: 1 Sessions: 1
20240423 08:05:48.429949Z 630711 INFO pid = 630711, tid = 630711 - server.cc:38
20240423 08:05:49.430760Z 630713 INFO pid = 630713, tid = 630713 - client.cc:197
20240423 08:05:49.431014Z 630713 WARN all connected - client.cc:122
20240423 08:05:59.430831Z 630713 WARN stop - client.cc:158
20240423 08:05:59.431107Z 630713 WARN all disconnected - client.cc:130
20240423 08:05:59.431125Z 630713 WARN 4261986304 total bytes read - client.cc:139
20240423 08:05:59.431132Z 630713 WARN 260131 total messages read - client.cc:140
20240423 08:05:59.431138Z 630713 WARN 16384 average message size - client.cc:141
20240423 08:05:59.431149Z 630713 WARN 406.4546875 MiB/s throughput - client.cc:143
Bufsize: 16384 Threads: 1 Sessions: 10
20240423 08:06:04.435839Z 630754 INFO pid = 630754, tid = 630754 - server.cc:38
20240423 08:06:05.437393Z 630757 INFO pid = 630757, tid = 630757 - client.cc:197
20240423 08:06:05.437965Z 630757 WARN all connected - client.cc:122
20240423 08:06:15.437484Z 630757 WARN stop - client.cc:158
20240423 08:06:15.437957Z 630757 WARN all disconnected - client.cc:130
20240423 08:06:15.437972Z 630757 WARN 17275011072 total bytes read - client.cc:139
20240423 08:06:15.437974Z 630757 WARN 1054383 total messages read - client.cc:140
20240423 08:06:15.438011Z 630757 WARN 16384 average message size - client.cc:141
20240423 08:06:15.438021Z 630757 WARN 1647.4734375 MiB/s throughput - client.cc:143
Bufsize: 16384 Threads: 1 Sessions: 100
20240423 08:06:20.442012Z 630820 INFO pid = 630820, tid = 630820 - server.cc:38
20240423 08:06:21.442723Z 630823 INFO pid = 630823, tid = 630823 - client.cc:197
20240423 08:06:21.447357Z 630823 WARN all connected - client.cc:122
20240423 08:06:31.443814Z 630823 WARN stop - client.cc:158
20240423 08:06:31.445412Z 630823 WARN all disconnected - client.cc:130
20240423 08:06:31.445427Z 630823 WARN 19154436096 total bytes read - client.cc:139
20240423 08:06:31.445430Z 630823 WARN 1169094 total messages read - client.cc:140
20240423 08:06:31.445431Z 630823 WARN 16384 average message size - client.cc:141
20240423 08:06:31.445446Z 630823 WARN 1826.709375 MiB/s throughput - client.cc:143
Bufsize: 16384 Threads: 1 Sessions: 1000
20240423 08:06:36.449339Z 630884 INFO pid = 630884, tid = 630884 - server.cc:38
20240423 08:06:37.450658Z 630889 INFO pid = 630889, tid = 630889 - client.cc:197
20240423 08:06:37.495573Z 630889 WARN all connected - client.cc:122
20240423 08:06:47.451004Z 630889 WARN stop - client.cc:158
20240423 08:06:47.484683Z 630889 WARN all disconnected - client.cc:130
20240423 08:06:47.484710Z 630889 WARN 12080676864 total bytes read - client.cc:139
20240423 08:06:47.484859Z 630889 WARN 737346 total messages read - client.cc:140
20240423 08:06:47.484863Z 630889 WARN 16384 average message size - client.cc:141
20240423 08:06:47.484912Z 630889 WARN 1152.103125 MiB/s throughput - client.cc:143
Bufsize: 16384 Threads: 1 Sessions: 10000
20240423 08:06:52.490363Z 630949 INFO pid = 630949, tid = 630949 - server.cc:38
20240423 08:06:53.490886Z 630963 INFO pid = 630963, tid = 630963 - client.cc:197
20240423 08:06:53.909448Z 630963 WARN all connected - client.cc:122
20240423 08:07:03.537228Z 630963 WARN stop - client.cc:158
20240423 08:07:03.933908Z 630963 WARN all disconnected - client.cc:130
20240423 08:07:03.934199Z 630963 WARN 10123739136 total bytes read - client.cc:139
20240423 08:07:03.934213Z 630963 WARN 617904 total messages read - client.cc:140
20240423 08:07:03.934215Z 630963 WARN 16384 average message size - client.cc:141
20240423 08:07:03.934220Z 630963 WARN 965.475 MiB/s throughput - client.cc:143
My echo_client and echo_server (full source at the end of this post) produce the following results:
echo_server: no process found
Bufsize: 16384 Threads: 1 Sessions: 1
Tue Apr 23 16:20:17 2024 client start...
all connected
stop.
all disconnected
4071440384 total bytes read
248501 total messages read
16384 bytes per message
388.283 MiB/s throughout
Tue Apr 23 16:20:27 2024 client end...
Bufsize: 16384 Threads: 1 Sessions: 10
Tue Apr 23 16:20:33 2024 client start...
all connected
stop.
all disconnected
15644426240 total bytes read
954860 total messages read
16384 bytes per message
1491.97 MiB/s throughout
Tue Apr 23 16:20:43 2024 client end...
Bufsize: 16384 Threads: 1 Sessions: 100
Tue Apr 23 16:20:49 2024 client start...
all connected
stop.
all disconnected
17007493120 total bytes read
1038055 total messages read
16384 bytes per message
1621.96 MiB/s throughout
Tue Apr 23 16:20:59 2024 client end...
Bufsize: 16384 Threads: 1 Sessions: 1000
Tue Apr 23 16:21:05 2024 client start...
all connected
stop.
all disconnected
8327217152 total bytes read
508253 total messages read
16384 bytes per message
794.145 MiB/s throughout
Tue Apr 23 16:21:15 2024 client end...
Bufsize: 16384 Threads: 1 Sessions: 10000
Tue Apr 23 16:21:21 2024 client start...
stop.
all disconnected
259922837504 total bytes read
15864431 total messages read
16384 bytes per message
24788.2 MiB/s throughout
Tue Apr 23 16:26:12 2024 client end...
Each concurrency level is tested for 10 s.
Comparing the muduo results with mine above: at 1000 concurrent sessions the co_context-based test program already shows lower throughput, and at 10000 sessions Client::handleTimeout takes far too long to finish (see the timestamps of the last run above: the client starts at 16:21:21 but does not end until 16:26:12, even though the test duration is 10 s).
My guess is that the problem is Session::stop() holding a std::mutex across a co_await, but I am not sure and would appreciate an explanation. If the mutex is indeed the cause, how should I change the program?
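If it is the mutex, the change I have in mind is simply to drop the std::mutex, on the assumption that everything here runs on a single io_context thread so no cross-thread protection is needed. A minimal sketch of that change to the Session::stop() shown at the end of this post (I am not sure it is correct, which is part of my question):

// Sketch only: Session::stop() without the std::mutex, assuming the single
// io_context used below means no lock is needed here at all.
co_context::task<> Session::stop()
{
    co_await socket_.shutdown_write(); // no lock held across the co_await
    client_->onDisconnect();
}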
Separately, as someone new to coroutines, I am also unsure about how scheduling works. In co_context:
- When I co_spawn a new coroutine, when does it actually get scheduled? Is there something like a queue underneath that dispatches coroutines in FIFO order? (See the small sketch after this list for what I mean.)
- When I co_await an operation, the current coroutine suspends and yields control. Once that operation completes, is the coroutine resumed promptly?
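To make the first question concrete, here is a minimal sketch (illustration only, not part of my benchmark code; it uses only the io_context / co_spawn / task pieces shown below): in parent(), does the coroutine handed to co_spawn run before or after the line that follows the co_spawn call?

// scheduling question, minimal sketch
#include <co_context/net.hpp>
#include <iostream>
using namespace co_context;

task<> spawned()
{
    std::cout << "spawned coroutine runs" << std::endl;
    co_return;
}

task<> parent()
{
    co_spawn(spawned());                          // queued somewhere? FIFO?
    std::cout << "after co_spawn" << std::endl;   // or does spawned() run before this line?
    co_return;
}

int main()
{
    io_context ctx;
    ctx.co_spawn(parent());
    ctx.start();
    ctx.join(); // like the programs below, this never exits on its own
    return 0;
}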
My implementation
The test script:
#!/bin/sh
killall echo_server
timeout=${timeout:-10}
bufsize=${bufsize:-16384}
nothreads=1
for nosessions in 1 10 100 1000 10000; do
sleep 5
echo "Bufsize: $bufsize Threads: $nothreads Sessions: $nosessions"
taskset -c 0 ./echo_server 33333 & srvpid=$!
sleep 1
taskset -c 1 ./echo_client 127.0.0.1 33333 $nothreads $bufsize $nosessions $timeout
kill -9 $srvpid
done
// echo_server.cpp
#include <co_context/net.hpp>
#include <cerrno>        // errno
#include <cstdlib>       // std::atoi
#include <cstring>       // strerror
#include <iostream>
#include <netinet/in.h>  // IPPROTO_TCP
#include <netinet/tcp.h> // TCP_NODELAY
#include <sys/socket.h>
#include <unistd.h>
using namespace co_context;
task<> session(int sockfd)
{
co_context::socket sock{sockfd};
constexpr int len = 16384;
char buf[len];
while (true)
{
auto nr = co_await lazy::recv(sockfd, buf);
if (nr <= 0) {
co_await lazy::close(sockfd);
co_return;
}
co_await lazy::send(sockfd, {buf, static_cast<size_t>(nr)});
}
}
task<> server(const uint16_t port)
{
/*
Uses the helpers provided by co_context.
To keep things simple I skip some error handling; the acceptor class
provided by co_context could also be used directly here.
*/
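// For reference, a rough sketch of the acceptor-based alternative mentioned
// above, based on the library's echo example (from memory; the exact
// signatures may differ):
//
//   acceptor ac{inet_address{port}};
//   for (int fd; (fd = co_await ac.accept()) >= 0;) {
//       co_spawn(session(fd));
//   }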
inet_address listen_addr{port};
const int listen_sockfd = ::socket(listen_addr.family(), SOCK_STREAM | SOCK_CLOEXEC, IPPROTO_TCP);
if (listen_sockfd < 0) {
std::cerr << "socket error: " << strerror(errno) << std::endl;
co_return;
}
int optval = 1;
if (::setsockopt(listen_sockfd, SOL_SOCKET, SO_REUSEADDR,
&optval, static_cast<socklen_t>(sizeof(optval))) < 0) {
std::cerr << "setsockopt SO_REUSEADDR error: " << strerror(errno) << std::endl;
co_return;
}
optval = 1;
if (::setsockopt(
listen_sockfd, IPPROTO_TCP, TCP_NODELAY, &optval,
static_cast<socklen_t>(sizeof optval)
) < 0) {
std::cout << "setsockopt TCP_NODELAY error: " << strerror(errno) << std::endl;
co_return;
}
if (::bind(listen_sockfd, listen_addr.get_sockaddr(), listen_addr.length()) != 0) {
std::cerr << "bind error: " << strerror(errno) << std::endl;
co_return;
}
if (::listen(listen_sockfd, SOMAXCONN) != 0) {
std::cerr << "listen error: " << strerror(errno) << std::endl;
co_return;
}
for (; ;) {
int sockfd = co_await lazy::accept(listen_sockfd, nullptr, nullptr, 0);
if (sockfd < 0) {
std::cerr << "accept error: " << strerror(errno) << std::endl;
co_return;
}
co_spawn(session(sockfd));
}
}
/*
./echo_server 33333
*/
int main(int argc, char *argv[])
{
if (argc != 2) {
std::cerr << "Usage: " << argv[0] << " <port>\n";
return 1;
}
uint16_t port = static_cast<uint16_t>(std::atoi(argv[1]));
io_context ctx;
ctx.co_spawn(server(port));
ctx.start();
ctx.join();
return 0;
}
// echo_client.cpp
#include <co_context/net.hpp>
#include <atomic>
#include <chrono>
#include <cstdio>        // fprintf
#include <cstdlib>       // atoi
#include <ctime>         // std::asctime, std::localtime
#include <iostream>
#include <memory>        // std::unique_ptr
#include <mutex>
#include <netinet/in.h>  // IPPROTO_TCP
#include <string>
#include <sys/socket.h>
#include <unistd.h>
#include <vector>
using Second = int;
class Client;
class Session
{
public:
Session(co_context::io_context *io_context,
Client *client,
const co_context::inet_address &addr) : io_context_(io_context),
client_(client),
serverAddr_(addr),
socket_(::socket(serverAddr_.family(), SOCK_STREAM | SOCK_CLOEXEC, IPPROTO_TCP)),
bytesRead_(0),
byteWritten_(0),
messagesRead_(0)
{
}
co_context::task<> start();
co_context::task<> onMessage();
co_context::task<> stop();
int64_t bytesRead() const { return bytesRead_; }
int64_t messagesRead() const { return messagesRead_; }
private:
co_context::io_context *io_context_;
Client *client_;
co_context::inet_address serverAddr_;
co_context::socket socket_;
std::vector<char> buffers_;
int64_t bytesRead_;
int64_t byteWritten_;
int64_t messagesRead_;
std::mutex mutex_;
};
class Client
{
public:
Client(co_context::io_context *io_context,
const co_context::inet_address &serverAddr,
int blockSize,
int sessionCount,
Second timeout, int threadCount) : io_context_(io_context),
addr_(serverAddr),
blockSize_(blockSize),
sessionCount_(sessionCount),
timeout_(timeout),
numConntected_(0)
{
if (threadCount > 1)
{
// TODO: multi-threaded mode
}
/* Build the payload that each session ping-pongs. */
for (int i = 0; i < blockSize; ++i)
{
message_.push_back(static_cast<char>(i % 128));
}
}
co_context::task<> start()
{
for (int i = 0; i < sessionCount_; ++i)
{
Session *session = new Session(io_context_, this, addr_);
io_context_->co_spawn(session->start());
// co_await session->start();
sessions_.emplace_back(session);
}
co_await handleTimeout();
}
const std::string &message() const { return message_; }
int blockSize() const { return blockSize_; }
void onConnect()
{
// std::cout << numConntected_ << " connected" << std::endl;
if (++numConntected_ == sessionCount_)
{
std::cout << "all connected" << std::endl;
}
}
void onDisconnect()
{
// std::cout << numConntected_ << " disconnected" << std::endl;
if (--numConntected_ == 0)
{
std::cout << "all disconnected" << std::endl;
int64_t totalBytesRead = 0;
int64_t totalMessagesRead = 0;
for (const auto &session : sessions_)
{
totalBytesRead += session->bytesRead();
totalMessagesRead += session->messagesRead();
}
std::cout << totalBytesRead << " total bytes read" << std::endl;
std::cout << totalMessagesRead << " total messages read" << std::endl;
std::cout << static_cast<double>(totalBytesRead) / static_cast<double>(totalMessagesRead)
<< " bytes per message" << std::endl;
std::cout << static_cast<double>(totalBytesRead) / (timeout_ * 1024 * 1024)
<< " MiB/s throughout" << std::endl;
auto now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
std::cout << std::asctime(std::localtime(&now)) << " client end..." << std::endl;
}
}
private:
co_context::task<> handleTimeout()
{
auto deadline = std::chrono::steady_clock::now();
deadline += std::chrono::seconds(timeout_);
co_await co_context::timeout_at(deadline);
std::cout << "stop." << std::endl;
for (auto &session : sessions_)
{
co_await session->stop();
}
}
private:
co_context::io_context *io_context_;
const co_context::inet_address addr_;
int blockSize_;
int sessionCount_;
Second timeout_;
std::atomic<int> numConntected_;
std::string message_;
std::vector<std::unique_ptr<Session>> sessions_;
};
co_context::task<> Session::start()
{
// FIXME: connect error ?
int ret = co_await socket_.connect(serverAddr_);
if (ret < 0)
{
std::cerr << "connect error" << std::endl;
}
socket_.set_tcp_no_delay(true);
client_->onConnect();
io_context_->co_spawn(onMessage());
// FIXME: send error ?
int sn = co_await socket_.send(client_->message());
}
co_context::task<> Session::onMessage()
{
char buf[16384];
while (true)
{
int nr = co_await socket_.recv(buf);
if (nr <= 0)
{
co_return;
}
++messagesRead_;
bytesRead_ += nr;
int ns = co_await socket_.send(buf, static_cast<size_t>(nr));
if (ns <= 0)
{
co_return;
}
byteWritten_ += ns;
}
}
co_context::task<> Session::stop()
{
{
std::unique_lock<std::mutex> locker(mutex_);
co_await socket_.shutdown_write();
}
client_->onDisconnect();
}
/*
./echo_client 127.0.0.1 33333 1 16384 100 10
*/
int main(int argc, char *argv[])
{
auto now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
std::cout << std::asctime(std::localtime(&now)) << " client start..." << std::endl;
if (argc != 7)
{
fprintf(stderr, "Usage: client <host_ip> <port> <threads> <blocksize> <sessions> <time>\n");
}
else
{
const char *ip = argv[1];
uint16_t port = static_cast<uint16_t>(atoi(argv[2]));
int threadCount = atoi(argv[3]);
int blockSize = atoi(argv[4]);
int sessionCount = atoi(argv[5]);
int timeout = atoi(argv[6]);
co_context::io_context io_ctx;
co_context::inet_address serverAddr(ip, port);
Client client(&io_ctx, serverAddr, blockSize, sessionCount, timeout, threadCount);
io_ctx.co_spawn(client.start());
io_ctx.start();
io_ctx.join();
}
}