
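stream rpc: after the receiver has disconnected, the error codes returned to a sender that keeps operating may not match the interface documentation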

Open iamwuyunfeng opened this issue 3 years ago • 1 comment

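Describe the bug: Suppose the sender and the receiver both have max_body_size set to 64M (the default value of FLAGS_max_body_size). The sender can successfully send a 128M message over a stream, but the receiver fails to receive it and the receiver-side stream is closed. The sender, however, cannot detect that the stream has been closed. Specifically: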

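  1. The on_closed callback of the handler registered by the sender is never called.
  2. brpc::StreamWrite does not return EINVAL, the error code that indicates the stream has been closed; it returns EAGAIN instead. This contradicts the interface documentation, which also says that on EAGAIN the caller can call brpc::StreamWait to wait for the receiver to consume the messages in its buffer.
  3. If the sender goes on to call brpc::StreamWait: with no timeout set, the call blocks forever; with a timeout set, it still does not return EINVAL but waits until the timeout expires and returns ETIMEDOUT. A caller that retries brpc::StreamWait in a loop can therefore loop forever.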
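Until the sender can observe the close, one sender-side guard against the endless retry in item 3 is to cap the number of consecutive timeouts. The following is only a minimal sketch under assumptions: `WaitWithRetryBudget`, `WaitOutcome`, and the `wait_once` callable are hypothetical names that are not part of brpc, and `wait_once` stands in for a caller-written wrapper around brpc::StreamWait with a timeout.

```cpp
#include <cassert>
#include <cerrno>
#include <functional>

// Outcome of a bounded wait on a stream (hypothetical helper type).
enum class WaitOutcome { kWritable, kGaveUp, kError };

// Calls wait_once until it returns something other than ETIMEDOUT, or until
// max_timeouts consecutive timeouts have been seen.
// wait_once is an assumption: it stands in for a wrapper around
// brpc::StreamWait that returns 0 on success or an errno-style code.
inline WaitOutcome WaitWithRetryBudget(const std::function<int()>& wait_once,
                                       int max_timeouts) {
    assert(max_timeouts > 0);
    for (int i = 0; i < max_timeouts; ++i) {
        const int rc = wait_once();
        if (rc == 0) return WaitOutcome::kWritable;       // buffer has room again
        if (rc != ETIMEDOUT) return WaitOutcome::kError;  // e.g. EINVAL: stream closed
    }
    return WaitOutcome::kGaveUp;  // budget exhausted: treat the stream as dead
}
```

On `kGaveUp` the caller would close the stream itself and report a failure instead of retrying. The budget is a heuristic: it cannot tell a slow receiver from a closed one, so it only bounds the damage and does not fix the missing close notification.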

To Reproduce: add the unit test below to brpc_streaming_rpc_unittest.cpp and rebuild.

#include "brpc/protocol.h"

namespace brpc {
    DECLARE_uint64(max_body_size); 
}

TEST_F(StreamingRpcTest, auto_close_if_receive_too_big_data) {
    HandlerControl hc;
    OrderedInputHandler handler(&hc);
    hc.block = true;
    brpc::StreamOptions opt;
    opt.handler = &handler;
    const int N = 10000;
    opt.max_buf_size = sizeof(uint32_t) *  N;
    brpc::Server server;
    MyServiceWithStream service(opt);
    ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
    ASSERT_EQ(0, server.Start(9007, NULL));
    brpc::Channel channel;
    ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL));
    brpc::Controller cntl;
    brpc::StreamId request_stream;
    brpc::StreamOptions request_stream_options;
    request_stream_options.max_buf_size = sizeof(uint32_t) * N;
    ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options));
    brpc::ScopedStream stream_guard(request_stream);
    test::EchoService_Stub stub(&channel);
    stub.Echo(&cntl, &request, &response, NULL);
    ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream=" << request_stream;
    butil::IOBuf out;
    out.append(std::string(brpc::FLAGS_max_body_size, '0'));
    ASSERT_EQ(0, brpc::StreamWrite(request_stream, out));
    ASSERT_EQ(EAGAIN, brpc::StreamWrite(request_stream, out));
    timespec wait_timeout = butil::seconds_to_timespec(1);
    // FIXME: should notice that stream is closed and return EINVAL so we can avoid infinite loop
    while(brpc::StreamWait(request_stream, &wait_timeout) == ETIMEDOUT) {
      usleep(100);
    }
    while (!handler.stopped()) {
        usleep(100);
    }
    ASSERT_FALSE(handler.failed());
    ASSERT_EQ(0, handler.idle_times());
    ASSERT_EQ(0, handler._expected_next_value);
}

Run:

cd build; ./test/brpc_streaming_rpc_unittest --gtest_filter=StreamingRpcTest.auto_close_if_receive_too_big_data

Expected behavior: the unit test added above passes.

Versions
OS: CentOS7
Compiler: gcc-9.3.1
brpc: master (53f6436a63f4b4c23d22b9e07e5d6d9af8af7fe7)
protobuf: 3.6.1

Additional context/screenshots: running the unit test above hangs indefinitely.

[root@orkv-mgr incubator-brpc]# ./build/test/brpc_streaming_rpc_unittest --gtest_filter=StreamingRpcTest.auto_close_if_receive_too_big_data
Running main() from gtest_main.cc
Note: Google Test filter = StreamingRpcTest.auto_close_if_receive_too_big_data
[==========] Running 1 test from 1 test case.
[----------] Global test environment set-up.
[----------] 1 test from StreamingRpcTest
[ RUN ] StreamingRpcTest.auto_close_if_receive_too_big_data
I0804 18:21:34.318899 36258 /root/github/rpc/incubator-brpc/src/brpc/server.cpp:1046] Server[MyServiceWithStream] is serving on port=9007.
I0804 18:21:34.319361 36258 /root/github/rpc/incubator-brpc/src/brpc/server.cpp:1049] Check out http://orkv-mgr:9007 in web browser.
I0804 18:21:34.321214 36261 /root/github/rpc/incubator-brpc/test/brpc_streaming_rpc_unittest.cpp:78] Created response_stream=8589934595
E0804 18:21:34.409823 36290 /root/github/rpc/incubator-brpc/src/brpc/input_messenger.cpp:115] A message from 127.0.0.1:46330(protocol=streaming_rpc) is bigger than 67108864 bytes, the connection will be closed. Set max_body_size to allow bigger messages
W0804 18:21:34.410057 36290 /root/github/rpc/incubator-brpc/src/brpc/input_messenger.cpp:250] Close Socket{id=128 fd=11 addr=127.0.0.1:46330:9007} (0x3664000): too big data
I0804 18:21:34.413154 36265 /root/github/rpc/incubator-brpc/test/brpc_streaming_rpc_unittest.cpp:42] Service side stream: 8589934595 is closed
W0804 18:21:34.413194 36306 /root/github/rpc/incubator-brpc/src/brpc/input_messenger.cpp:214] Fail to read from Socket{id=1 fd=10 addr=127.0.0.1:9007:46330} (0x33ca200): Connection reset by peer
W0804 18:21:34.413346 36272 /root/github/rpc/incubator-brpc/src/brpc/socket.cpp:1616] Fail to keep-write into Socket{id=1 fd=10 addr=127.0.0.1:9007:46330} (0x33ca200): Broken pipe
I0804 18:21:34.513589 36281 /root/github/rpc/incubator-brpc/src/brpc/socket.cpp:2202] Checking Socket{id=1 addr=127.0.0.1:9007} (0x33ca200)
I0804 18:21:34.513832 36300 /root/github/rpc/incubator-brpc/src/brpc/socket.cpp:2262] Revived Socket{id=1 addr=127.0.0.1:9007} (0x33ca200) (Connectable)

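The 128M in the example above may be extreme, but the main risk is this: a sender generally does not know the receiver's FLAGS_max_body_size, so in principle any message it sends could exceed that limit. Yet it is the sender that bears the consequence, because it cannot detect the disconnection and may keep trying to write indefinitely.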
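One possible mitigation on the sender side, which is not proposed in this issue and does not address the missing close notification, is to keep every message below a limit that both ends have agreed on out of band. A minimal sketch, assuming a hypothetical helper `SplitIntoChunks` (not a brpc API) and a receiver that knows how to reassemble the pieces:

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Splits payload into consecutive pieces of at most max_chunk bytes each.
// Hypothetical helper for illustration; each piece would then be appended to
// its own butil::IOBuf and sent with a separate brpc::StreamWrite call.
inline std::vector<std::string> SplitIntoChunks(const std::string& payload,
                                                std::size_t max_chunk) {
    assert(max_chunk > 0);
    std::vector<std::string> chunks;
    for (std::size_t off = 0; off < payload.size(); off += max_chunk) {
        // substr clamps the length to what remains in the string.
        chunks.push_back(payload.substr(off, max_chunk));
    }
    return chunks;
}
```

The chunk size still has to be chosen without knowing the receiver's actual FLAGS_max_body_size, so this only lowers the chance of hitting the limit.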

iamwuyunfeng, Aug 04 '22 10:08

I reran the test above, and it looks like it should pass now. Stepping through it with gdb, I found that SendStreamClose is called. Which commit fixed this?

[ RUN      ] StreamingRpcTest.auto_close_if_receive_too_big_data
I1025 17:04:21.815020 182870     0 /home/zj/src/brpc/src/brpc/server.cpp:1212] Server[MyServiceWithStream] is serving on port=9007.
I1025 17:04:21.816590 182870     0 /home/zj/src/brpc/src/brpc/server.cpp:1215] Check out http://bang:9007 in web browser.
W1025 17:04:21.816755 182870     0 /home/zj/src/brpc/src/brpc/stream.cpp:83] options.min_buf_size is larger than options.max_buf_size, it will be set to 0.
W1025 17:04:21.818237 182874 4294967298 /home/zj/src/brpc/src/brpc/stream.cpp:83] options.min_buf_size is larger than options.max_buf_size, it will be set to 0.
I1025 17:04:21.818269 182874 4294967298 /home/zj/src/brpc/test/brpc_streaming_rpc_unittest.cpp:61] Created response_stream=8589934595
E1025 17:04:21.880653 182875 8589935105 /home/zj/src/brpc/src/brpc/input_messenger.cpp:157] A message from 127.0.0.1:54338(protocol=streaming_rpc) is bigger than 67108864 bytes, the connection will be closed. Set max_body_size to allow bigger messages
W1025 17:04:21.880685 182875 8589935105 /home/zj/src/brpc/src/brpc/input_messenger.cpp:253] Close Socket{id=93 fd=10 addr=127.0.0.1:54338:9007} (0x560bee0b6000): too big data
W1025 17:04:21.880899 182879 8589936900 /home/zj/src/brpc/src/brpc/socket.cpp:1826] Fail to keep-write into Socket{id=1 fd=9 addr=127.0.0.1:9007:54338} (0x560bedd722c0): Connection reset by peer
/home/zj/src/brpc/test/brpc_streaming_rpc_unittest.cpp:622: Failure
Value of: handler.failed()
  Actual: true
Expected: false
I1025 17:04:21.882303 182870     0 /home/zj/src/brpc/src/brpc/server.cpp:1272] Server[MyServiceWithStream] is going to quit
[  FAILED  ] StreamingRpcTest.auto_close_if_receive_too_big_data (76 ms)
[----------] 1 test from StreamingRpcTest (76 ms total)

page4, Oct 25 '24 09:10