Write an RPC Framework by Hand - GeeRPC Day 4: Timeout Handling (timeout) | 极客兔兔
https://geektutu.com/post/geerpc-day4.html
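A 7-day tutorial on implementing the RPC framework GeeRPC from scratch in Go/golang (7 days implement golang remote procedure call framework from scratch tutorial). You build an RPC framework by hand, modeled on the Go standard library's net/rpc: a server, a client that supports asynchronous and concurrent calls, message encoding and decoding, service registration, and support for multiple transports such as TCP/Unix/HTTP. Day 4 gives the framework the ability to handle timeouts (timeout processing).
Awesome work, boss!!!
@wilgx0 Haha, the series just went live and you've already reached day four. Thanks for your steady support~
Following you all the way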
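Hi, I noticed that in day4-timeout, running go test -v
occasionally hangs with no response. I've opened an issue in the project, please take a look: issue#26
The goroutine that handleRequest starts seems like it would leak in the timeout case? I haven't tested it, it just looks that way.
The call to handleRequest in serveCodec is missing an argument...
Also this pattern: case <-called: <-sent }. What happens if sent blocks...
@FinaLone The goroutine that handleRequest starts seems like it would leak in the timeout case? I haven't tested it, it just looks that way.
It should leak: after the timeout the sends on those channels block. Making them buffered channels should keep them from blocking.
@liyuxuan89 I don't quite get why it leaks. Do we need to add <-called and <-sent at the end of the case <-time.After(timeout) branch?
@ls8725 @liyuxuan89 I don't quite get why it leaks. Do we need to add <-called and <-sent at the end of the case <-time.After(timeout) branch?
After the timeout, nobody receives from the unbuffered channels sent and called, so the goroutine blocks on its send and can never exit.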
In func dialTimeout, how is the goroutine that creates the client reclaimed? Does it ever get closed? If it doesn't, won't that waste memory?
ch := make(chan clientResult)
go func() {
	client, err := f(conn, opt)
	ch <- clientResult{client: client, err: err}
}()
if opt.ConnectTimeout == 0 {
	result := <-ch
	return result.client, result.err
}
select {
case <-time.After(opt.ConnectTimeout):
	return nil, fmt.Errorf("rpc client: connect timeout: expect within %s", opt.ConnectTimeout)
case result := <-ch:
	return result.client, result.err
}
If time.After(opt.ConnectTimeout) fires first, then when the goroutine later reaches ch <- clientResult{client: client, err: err}, it blocks there forever because ch is unbuffered, and the goroutine leaks.
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup, timeout time.Duration) {
	defer wg.Done()
	called := make(chan struct{})
	sent := make(chan struct{})
	go func() {
		err := req.svc.call(req.mtype, req.argv, req.replyv)
		select {
		case called <- struct{}{}:
		case:
			return
		}
		if err != nil {
			req.h.Error = err.Error()
			server.sendResponse(cc, req.h, invalidRequest, sending)
			sent <- struct{}{}
			return
		}
		server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
		sent <- struct{}{}
	}()
	if timeout == 0 {
		<-called
		<-sent
		return
	}
	select {
	case <-time.After(timeout):
		req.h.Error = fmt.Sprintf("rpc server: request handle timeout: expect within %s", timeout)
		server.sendResponse(cc, req.h, invalidRequest, sending)
	case <-called:
		<-sent
	}
}
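Adding a select inside the goroutine fixes the goroutine leak.
https://www.cnblogs.com/luoming1224/p/11174927.html The memory leak comes from the for loop in serveCodec: every go server.handleRequest(cc, req, sending, wg) it starts runs a select with time.After.
@1911860538 https://www.cnblogs.com/luoming1224/p/11174927.html The memory leak comes from the for loop in serveCodec: every go server.handleRequest(cc, req, sending, wg) it starts runs a select with time.After.
Hi, I think time.After is acceptable in this code, since every request needs its own independent timeout. What the people above were discussing is that ch is unbuffered, which keeps the goroutine from exiting normally. For example: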
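package main

import "time"

func main() {
	ch := make(chan int)
	// start a goroutine
	go func() {
		// comment out this line and there is no timeout, so the goroutine exits normally
		time.Sleep(2 * time.Second)
		println("end to sleep")
		ch <- 1 // blocks forever: after the timeout nobody receives from ch
		println("channel send succeeded")
	}()
	select {
	case <-time.After(1 * time.Second):
		println("connection timeout")
	case <-ch:
		println("completed normally")
	}
	// keep main alive long enough to see the goroutine never get past the send
	time.Sleep(3 * time.Second)
}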
When checking for a connection-creation timeout, why do we also time out the newClient function in addition to using net.DialTimeout? If net.DialTimeout succeeds, can newClient still block?
@GodXuebi
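Adding a select inside the goroutine fixes the goroutine leak.
Isn't that bare case: a problem?
@gnehcein The call to handleRequest in serveCodec is missing an argument...
serveCodec should take one more parameter, the opt negotiated with the client, so the server knows which handling timeout to apply.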
	defer wg.Done()
	called := make(chan struct{})
	sent := make(chan struct{})
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func(ctx context.Context) {
		err := req.svc.call(req.mtype, req.argv, req.replyv)
		called <- struct{}{}
		if err != nil {
			req.h.Error = err.Error()
			server.sendResponse(cc, req.h, invalidRequest, sending)
			sent <- struct{}{}
			return
		}
		server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
		sent <- struct{}{}
		select {
		case <-ctx.Done():
			return
		}
	}(ctx)
	if timeout == 0 {
		<-called
		<-sent
		return
	}
	select {
	case <-time.After(timeout):
		// If call only finishes after the timeout, this branch has already returned and nobody receives from called, so the goroutine leaks
		req.h.Error = fmt.Sprintf("rpc server: request handle timeout: expect within %s", timeout)
		server.sendResponse(cc, req.h, invalidRequest, sending)
	case <-called:
		<-sent
	}
}
Also add a context to tell the child goroutine to exit.
@liujing-siyang
Also add a context to tell the child goroutine to exit.
If it blocks at called <- struct{}{}, does the select on ctx further down still do anything?
// Client.dialTimeout()
go func() {
	client, err := f(conn, opt)
	ch <- clientResult{
		client: client,
		err:    err,
	}
}()
If dialTimeout returns on timeout, this goroutine leaks as well.
@GodXuebi
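Adding a select inside the goroutine fixes the goroutine leak.
I think this still has a problem: if the select in the goroutine runs before the select below, it tries to send on called before anyone is ready to receive, so it just returns. We shouldn't rely on guessing which of the two runs first.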
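A small sketch of the race described here: a select with a default branch performs a non-blocking send, which succeeds only if a receiver is already waiting at that instant. trySend is a hypothetical helper. If the worker reaches its select before the handler reaches its own, the signal is dropped and the handler then waits out the full timeout.

```go
package main

import (
	"fmt"
	"time"
)

// trySend performs a non-blocking send: it succeeds only if a receiver is
// already waiting (or the buffer has room) at the moment select runs.
func trySend(ch chan<- struct{}) bool {
	select {
	case ch <- struct{}{}:
		return true
	default:
		return false
	}
}

func main() {
	called := make(chan struct{}) // unbuffered, as in handleRequest
	// The worker finishes first and signals before the handler is listening.
	ok := trySend(called)
	// The handler only starts waiting now, so it sees a timeout even though
	// the work had already completed.
	select {
	case <-called:
		fmt.Println("called")
	case <-time.After(50 * time.Millisecond):
		fmt.Println("timeout, signal delivered:", ok) // timeout, signal delivered: false
	}
}
```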
@PeiLeizzz
@liujing-siyang
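Also add a context to tell the child goroutine to exit.
If it blocks at called <- struct{}{}, does the select on ctx further down still do anything?
I think so too. It stays blocked at called <- struct{}{} and never gets any further.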
var finish = make(chan struct{})
defer close(finish)
go func() {
	err := req.svc.call(req.mtype, req.argv, req.replyv)
	select {
	case <-finish:
		close(called)
		close(sent)
		return
	case called <- struct{}{}:
		if err != nil {
			req.h.Error = err.Error()
			server.sendResponse(c, req.h, invalidRequest, lock)
			sent <- struct{}{}
			return
		}
		server.sendResponse(c, req.h, req.replyv.Interface(), lock)
		sent <- struct{}{}
	}
}()
func (client *Client) receive() {
	var err error
	for err == nil {
		var h codec.Header
		if err = client.cc.ReadHeader(&h); err != nil {
			break
		}
		call := client.removeCall(h.Seq)
		switch {
		case call == nil:
			// it usually means that Write partially failed
			// and call was already removed.
			err = client.cc.ReadBody(nil)
		case h.Error != "":
			call.Error = errors.New(h.Error)
			err = client.cc.ReadBody(nil)
			call.done()
		default:
			err = client.cc.ReadBody(call.Reply)
			if err != nil {
				call.Error = errors.New("reading body " + err.Error())
			}
			call.done()
		}
	}
	// an error occurred, so terminate all pending calls
	client.terminateCalls(err)
}
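Normally sendResponse should indeed be called only once. But on the receiving side, the client calls removeCall for every call it gets a response for, so a second response with the same seq finds call == nil, call.done() is never invoked, and nothing blocks. So it doesn't seem to cause any real harm.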
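A minimal model of the argument above, assuming a simplified pending-call table (pending, register, and deliver are hypothetical, loosely mirroring removeCall and receive): the first response for a seq removes the call, and a second response for the same seq finds nothing and is dropped. In the real client the dropped body still has to be read with ReadBody(nil) so the stream stays in sync.

```go
package main

import (
	"fmt"
	"sync"
)

// pending is a hypothetical, simplified model of the client's table of
// in-flight calls, keyed by sequence number.
type pending struct {
	mu    sync.Mutex
	calls map[uint64]chan string
}

func (p *pending) register(seq uint64) <-chan string {
	p.mu.Lock()
	defer p.mu.Unlock()
	ch := make(chan string, 1)
	p.calls[seq] = ch
	return ch
}

// removeCall deletes and returns the call, or nil if it is already gone.
func (p *pending) removeCall(seq uint64) chan string {
	p.mu.Lock()
	defer p.mu.Unlock()
	ch := p.calls[seq]
	delete(p.calls, seq)
	return ch
}

// deliver mimics one iteration of receive(): a response whose call has
// already been removed is dropped instead of completing anything.
func (p *pending) deliver(seq uint64, body string) bool {
	ch := p.removeCall(seq)
	if ch == nil {
		return false
	}
	ch <- body
	return true
}

func main() {
	p := &pending{calls: make(map[uint64]chan string)}
	reply := p.register(1)
	fmt.Println(p.deliver(1, "timeout error")) // true
	fmt.Println(p.deliver(1, "late result"))   // false
	fmt.Println(<-reply)                       // timeout error
}
```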
@ShiMaRing
This one looks correct so far. The original code leaks goroutines on both the server and the client; the client side looks like this:
finish := make(chan struct{})
defer close(finish)
go func() {
	client, err := f(conn, opt)
	select {
	case <-finish:
		close(ch)
		return
	case ch <- clientResult{client: client, err: err}:
	}
}()
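Folks, about scenario 2 of the second test case: after I renamed the test function TestClient_Call to TestClientCall, scenario 2 started failing and the process just exited. Has anyone else run into this?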
type clientResult struct {
	client *Client
	err    error
}
type newClientFunc func(conn net.Conn, opt *Option) (client *Client, err error)
func dialTimeout(f newClientFunc, network, address string, opts ...*Option) (client *Client, err error) {
	opt, err := parseOptions(opts...)
	if err != nil {
		return nil, err
	}
	conn, err := net.DialTimeout(network, address, opt.ConnectTimeout)
	if err != nil {
		return nil, err
	}
	// close the connection if an error occurred
	defer func() {
		if err != nil {
			_ = conn.Close()
		}
	}()
	ch := make(chan clientResult)
	go func() {
		client, err := f(conn, opt)
		ch <- clientResult{client: client, err: err}
	}()
	if opt.ConnectTimeout == 0 {
		result := <-ch
		return result.client, result.err
	}
	select {
	case <-time.After(opt.ConnectTimeout):
		return nil, fmt.Errorf("rpc client: connect timeout: expect within %s", opt.ConnectTimeout)
	case result := <-ch:
		return result.client, result.err
	}
}
I'd like to ask why client has to be wrapped in a clientResult struct here. Wouldn't a channel of type *Client work just as well?
ch := make(chan *Client)
...
case client = <-ch:
	return client, err
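A sketch of why the struct is there: the constructor returns two values, and a chan *Client can carry only one. In the suggested version, the err returned in case client = <-ch is the outer err, which is still nil at that point, so a failed constructor comes back as (nil, nil); assigning the outer err from inside the goroutine instead would be a data race. Client, newClient, create, and createLossy below are hypothetical placeholders.

```go
package main

import (
	"errors"
	"fmt"
)

// Client is a hypothetical placeholder for GeeRPC's *Client.
type Client struct{}

type clientResult struct {
	client *Client
	err    error
}

// newClient is a hypothetical constructor that may fail.
func newClient(fail bool) (*Client, error) {
	if fail {
		return nil, errors.New("handshake failed")
	}
	return &Client{}, nil
}

// create sends both return values through one channel.
func create(fail bool) (*Client, error) {
	ch := make(chan clientResult, 1)
	go func() {
		c, err := newClient(fail)
		ch <- clientResult{client: c, err: err}
	}()
	r := <-ch
	return r.client, r.err
}

// createLossy follows the chan *Client suggestion: the constructor's error
// has nowhere to go, so a failure comes back as (nil, nil).
func createLossy(fail bool) (client *Client, err error) {
	ch := make(chan *Client, 1)
	go func() {
		c, _ := newClient(fail) // the error is dropped here
		ch <- c
	}()
	client = <-ch
	return client, err // err is still nil
}

func main() {
	c, err := create(true)
	fmt.Println(c == nil, err) // true handshake failed
	c, err = createLossy(true)
	fmt.Println(c == nil, err) // true <nil>
}
```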
Could handleRequest be improved like this?
func (s *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup, timeout time.Duration) {
	defer wg.Done()
	// The tutorial's version has a problem: it can leak goroutines.
	// Here sync.Once guarantees that s.sendResponse is called only once.
	done := make(chan struct{})
	var sendOnce sync.Once
	go func() {
		var errStr string
		var body interface{}
		defer func() {
			sendOnce.Do(func() {
				req.h.Error = errStr
				s.sendResponse(cc, req.h, body, sending)
			})
			close(done)
		}()
		err := req.svc.call(req.mtype, req.argv, req.replyv)
		if err != nil {
			errStr = err.Error()
			body = invalidRequest
			return
		}
		body = req.replyv.Interface()
	}()
	if timeout == 0 {
		<-done
		return
	}
	select {
	case <-done:
		return
	case <-time.After(timeout):
		sendOnce.Do(func() {
			req.h.Error = fmt.Sprintf("rpc server: request handle timeout: expect within %s", timeout)
			s.sendResponse(cc, req.h, invalidRequest, sending)
		})
	}
}
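@wbjy In func dialTimeout, how is the goroutine that creates the client reclaimed? Does it ever get closed? If it doesn't, won't that waste memory?
Goroutines don't close on their own. The approaches I know of control a goroutine's lifetime with a chan or a context.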