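Stream RPC: after the receiver has disconnected, further operations on the sender may return error codes that do not match the API documentation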
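Describe the bug: Suppose both the sender and the receiver use a max_body_size of 64M (the default value of FLAGS_max_body_size). The sender can successfully send a 128M message through a stream, but the receiver fails to receive it and its side of the stream is closed. The sender, however, has no way to notice that the stream has been closed. Specifically:
- The on_closed callback of the handler registered on the sender side is never called.
- brpc::StreamWrite does not return EINVAL, the error code that indicates a closed stream; it returns EAGAIN instead. This contradicts the API documentation, which also says that on EAGAIN the caller may call brpc::StreamWait to wait for the receiver to consume the messages in the buffer.
- If the caller then calls brpc::StreamWait, it blocks forever when no timeout is set. When a timeout is set, it still does not return EINVAL but returns ETIMEDOUT once the timeout expires, so a user who keeps retrying brpc::StreamWait in a loop may loop forever.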
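Until the sender can observe the close, a caller can at least bound how long it retries. The sketch below is a hypothetical helper (WriteWithBoundedWait and WriteOutcome are not brpc APIs). It takes the write and wait operations as callables that are assumed to return brpc-style codes (0, EAGAIN, EINVAL, ETIMEDOUT), so it compiles without brpc, and it gives up after a fixed number of waits instead of spinning on ETIMEDOUT forever.

```cpp
#include <cassert>
#include <cerrno>
#include <functional>

// Result of a sender-side write that refuses to wait forever.
enum class WriteOutcome { kOk, kClosed, kGaveUp, kError };

// Hypothetical helper, not part of brpc.
// |write| is assumed to behave like brpc::StreamWrite: 0 on success,
// EAGAIN when the send buffer is full, EINVAL when the stream is closed.
// |wait_once| is assumed to behave like one brpc::StreamWait call with a
// fresh deadline: 0 when buffer space is available, ETIMEDOUT when the
// deadline passes, EINVAL when the stream is closed.
// At most |max_waits| waits are performed, so a peer that silently went
// away costs a bounded amount of time instead of an endless retry loop.
WriteOutcome WriteWithBoundedWait(const std::function<int()>& write,
                                  const std::function<int()>& wait_once,
                                  int max_waits) {
    assert(max_waits >= 0);
    for (int waits = 0;; ++waits) {
        const int rc = write();
        if (rc == 0) return WriteOutcome::kOk;
        if (rc == EINVAL) return WriteOutcome::kClosed;
        if (rc != EAGAIN) return WriteOutcome::kError;
        // Buffer is full: give up once the wait budget is spent.
        if (waits >= max_waits) return WriteOutcome::kGaveUp;
        const int wrc = wait_once();
        if (wrc == EINVAL) return WriteOutcome::kClosed;
        if (wrc != 0 && wrc != ETIMEDOUT) return WriteOutcome::kError;
        // 0 or ETIMEDOUT: retry the write on the next iteration.
    }
}
```

With brpc, `write` would presumably wrap `brpc::StreamWrite(stream_id, buf)` and `wait_once` would wrap `brpc::StreamWait(stream_id, &deadline)` with a deadline computed anew for each call (for example via `butil::seconds_from_now(1)`). The caller then has to treat `kGaveUp` like a closed stream; this is a workaround for the missing EINVAL, not a fix.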
To Reproduce: add the following unit test to brpc_streaming_rpc_unittest.cpp and rebuild.
#include "brpc/protocol.h"
namespace brpc {
DECLARE_uint64(max_body_size);
}
TEST_F(StreamingRpcTest, auto_close_if_receive_too_big_data) {
    HandlerControl hc;
    OrderedInputHandler handler(&hc);
    hc.block = true;
    brpc::StreamOptions opt;
    opt.handler = &handler;
    const int N = 10000;
    opt.max_buf_size = sizeof(uint32_t) * N;
    brpc::Server server;
    MyServiceWithStream service(opt);
    ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
    ASSERT_EQ(0, server.Start(9007, NULL));
    brpc::Channel channel;
    ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL));
    brpc::Controller cntl;
    brpc::StreamId request_stream;
    brpc::StreamOptions request_stream_options;
    request_stream_options.max_buf_size = sizeof(uint32_t) * N;
    ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options));
    brpc::ScopedStream stream_guard(request_stream);
    test::EchoService_Stub stub(&channel);
    stub.Echo(&cntl, &request, &response, NULL);
    ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream=" << request_stream;
    // The payload alone is FLAGS_max_body_size bytes; together with the frame
    // metadata the message exceeds the receiver's limit.
    butil::IOBuf out;
    out.append(std::string(brpc::FLAGS_max_body_size, '0'));
    ASSERT_EQ(0, brpc::StreamWrite(request_stream, out));
    ASSERT_EQ(EAGAIN, brpc::StreamWrite(request_stream, out));
    // StreamWait() takes an absolute due time, so compute a fresh deadline
    // one second from now before every wait.
    timespec due_time = butil::seconds_from_now(1);
    // FIXME: should notice that stream is closed and return EINVAL so we can avoid infinite loop
    while (brpc::StreamWait(request_stream, &due_time) == ETIMEDOUT) {
        usleep(100);
        due_time = butil::seconds_from_now(1);
    }
    while (!handler.stopped()) {
        usleep(100);
    }
    ASSERT_FALSE(handler.failed());
    ASSERT_EQ(0, handler.idle_times());
    ASSERT_EQ(0, handler._expected_next_value);
}
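The second parameter of brpc::StreamWait is named due_time and, as far as I can tell from brpc/stream.h, it is an absolute deadline rather than a relative timeout. A constant such as butil::seconds_to_timespec(1) therefore denotes one second after the Unix epoch, a moment long past, so every wait times out at once. The standalone sketch below illustrates the difference; DeadlineFromNow and Before are hypothetical helpers, and the use of CLOCK_REALTIME is an assumption about which clock the deadline refers to.

```cpp
#include <time.h>

// Hypothetical helper mirroring the idea of butil::seconds_from_now():
// an absolute CLOCK_REALTIME deadline |seconds| after the current moment.
timespec DeadlineFromNow(long seconds) {
    timespec ts;
    clock_gettime(CLOCK_REALTIME, &ts);
    ts.tv_sec += seconds;
    return ts;
}

// True if |a| is strictly earlier than |b|.
bool Before(const timespec& a, const timespec& b) {
    return a.tv_sec < b.tv_sec ||
           (a.tv_sec == b.tv_sec && a.tv_nsec < b.tv_nsec);
}
```

Read as an absolute time, `{1, 0}` is always before "now", while `DeadlineFromNow(1)` is always after it.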
Run:
cd build; ./test/brpc_streaming_rpc_unittest --gtest_filter=StreamingRpcTest.auto_close_if_receive_too_big_data
Expected behavior: the unit test added above passes.
Versions: OS: CentOS7, Compiler: gcc-9.3.1, brpc: master (53f6436a63f4b4c23d22b9e07e5d6d9af8af7fe7), protobuf: 3.6.1
Additional context/screenshots: after starting, the unit test above hangs forever:
[root@orkv-mgr incubator-brpc]# ./build/test/brpc_streaming_rpc_unittest --gtest_filter=StreamingRpcTest.auto_close_if_receive_too_big_data
Running main() from gtest_main.cc
Note: Google Test filter = StreamingRpcTest.auto_close_if_receive_too_big_data
[==========] Running 1 test from 1 test case.
[----------] Global test environment set-up.
[----------] 1 test from StreamingRpcTest
[ RUN ] StreamingRpcTest.auto_close_if_receive_too_big_data
I0804 18:21:34.318899 36258 /root/github/rpc/incubator-brpc/src/brpc/server.cpp:1046] Server[MyServiceWithStream] is serving on port=9007.
I0804 18:21:34.319361 36258 /root/github/rpc/incubator-brpc/src/brpc/server.cpp:1049] Check out http://orkv-mgr:9007 in web browser.
I0804 18:21:34.321214 36261 /root/github/rpc/incubator-brpc/test/brpc_streaming_rpc_unittest.cpp:78] Created response_stream=8589934595
E0804 18:21:34.409823 36290 /root/github/rpc/incubator-brpc/src/brpc/input_messenger.cpp:115] A message from 127.0.0.1:46330(protocol=streaming_rpc) is bigger than 67108864 bytes, the connection will be closed. Set max_body_size to allow bigger messages
W0804 18:21:34.410057 36290 /root/github/rpc/incubator-brpc/src/brpc/input_messenger.cpp:250] Close Socket{id=128 fd=11 addr=127.0.0.1:46330:9007} (0x3664000): too big data
I0804 18:21:34.413154 36265 /root/github/rpc/incubator-brpc/test/brpc_streaming_rpc_unittest.cpp:42] Service side stream: 8589934595 is closed
W0804 18:21:34.413194 36306 /root/github/rpc/incubator-brpc/src/brpc/input_messenger.cpp:214] Fail to read from Socket{id=1 fd=10 addr=127.0.0.1:9007:46330} (0x33ca200): Connection reset by peer
W0804 18:21:34.413346 36272 /root/github/rpc/incubator-brpc/src/brpc/socket.cpp:1616] Fail to keep-write into Socket{id=1 fd=10 addr=127.0.0.1:9007:46330} (0x33ca200): Broken pipe
I0804 18:21:34.513589 36281 /root/github/rpc/incubator-brpc/src/brpc/socket.cpp:2202] Checking Socket{id=1 addr=127.0.0.1:9007} (0x33ca200)
I0804 18:21:34.513832 36300 /root/github/rpc/incubator-brpc/src/brpc/socket.cpp:2262] Revived Socket{id=1 addr=127.0.0.1:9007} (0x33ca200) (Connectable)
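The 128M in the example above may be extreme, but the main risk of this problem is the following: the sender generally does not know the receiver's FLAGS_max_body_size, so in principle any message it sends may exceed that limit, and yet it is the sender that bears the consequences (because it cannot detect that the connection has been closed, it may keep trying to write indefinitely).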
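Since the sender generally cannot see the receiver's FLAGS_max_body_size, one possible application-level workaround (not something brpc does for you) is to cap the size of each message passed to brpc::StreamWrite at a conservative value well below the default limit of 67108864 bytes seen in the log, leaving headroom for the frame metadata, and to reassemble the pieces on the receiving side. The sketch below shows only the splitting step on a std::string; SplitIntoPieces is a hypothetical name and the piece size is the caller's own assumption.

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical helper, not part of brpc: cut |payload| into consecutive
// pieces of at most |max_piece| bytes each, so that every piece can be sent
// as its own stream message. The receiver is assumed to reassemble them.
std::vector<std::string> SplitIntoPieces(const std::string& payload,
                                         std::size_t max_piece) {
    if (max_piece == 0) {
        // A zero piece size would never make progress.
        throw std::invalid_argument("max_piece must be positive");
    }
    std::vector<std::string> pieces;
    for (std::size_t off = 0; off < payload.size(); off += max_piece) {
        // substr() clamps the length at the end of the string.
        pieces.push_back(payload.substr(off, max_piece));
    }
    return pieces;
}
```

With butil::IOBuf, the same loop could presumably be written with `IOBuf::cutn`, which moves up to n bytes from the front of one IOBuf into another.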
I reran the test above and it looks like it should pass now. Stepping through it with gdb, I saw that SendStreamClose is called. Could you tell me which commit fixed this?
[ RUN ] StreamingRpcTest.auto_close_if_receive_too_big_data
I1025 17:04:21.815020 182870 0 /home/zj/src/brpc/src/brpc/server.cpp:1212] Server[MyServiceWithStream] is serving on port=9007.
I1025 17:04:21.816590 182870 0 /home/zj/src/brpc/src/brpc/server.cpp:1215] Check out http://bang:9007 in web browser.
W1025 17:04:21.816755 182870 0 /home/zj/src/brpc/src/brpc/stream.cpp:83] options.min_buf_size is larger than options.max_buf_size, it will be set to 0.
W1025 17:04:21.818237 182874 4294967298 /home/zj/src/brpc/src/brpc/stream.cpp:83] options.min_buf_size is larger than options.max_buf_size, it will be set to 0.
I1025 17:04:21.818269 182874 4294967298 /home/zj/src/brpc/test/brpc_streaming_rpc_unittest.cpp:61] Created response_stream=8589934595
E1025 17:04:21.880653 182875 8589935105 /home/zj/src/brpc/src/brpc/input_messenger.cpp:157] A message from 127.0.0.1:54338(protocol=streaming_rpc) is bigger than 67108864 bytes, the connection will be closed. Set max_body_size to allow bigger messages
W1025 17:04:21.880685 182875 8589935105 /home/zj/src/brpc/src/brpc/input_messenger.cpp:253] Close Socket{id=93 fd=10 addr=127.0.0.1:54338:9007} (0x560bee0b6000): too big data
W1025 17:04:21.880899 182879 8589936900 /home/zj/src/brpc/src/brpc/socket.cpp:1826] Fail to keep-write into Socket{id=1 fd=9 addr=127.0.0.1:9007:54338} (0x560bedd722c0): Connection reset by peer
/home/zj/src/brpc/test/brpc_streaming_rpc_unittest.cpp:622: Failure
Value of: handler.failed()
Actual: true
Expected: false
I1025 17:04:21.882303 182870 0 /home/zj/src/brpc/src/brpc/server.cpp:1272] Server[MyServiceWithStream] is going to quit
[ FAILED ] StreamingRpcTest.auto_close_if_receive_too_big_data (76 ms)
[----------] 1 test from StreamingRpcTest (76 ms total)