
Write an RPC Framework by Hand - GeeRPC Day 4: Timeout Handling (timeout) | 极客兔兔 (Geektutu)

Open geektutu opened this issue 3 years ago • 33 comments

https://geektutu.com/post/geerpc-day4.html

A tutorial on implementing the RPC framework GeeRPC from scratch in Go (golang) in 7 days (7 days implement golang remote procedure call framework from scratch tutorial). Modeled on the golang standard library's net/rpc, it builds a server, a client that supports asynchronous and concurrent calls, message encoding and decoding, service registration, and support for multiple transport protocols such as TCP/Unix/HTTP. Day 4 adds timeout handling (timeout processing) to the RPC framework.

geektutu · Oct 08 '20

Great work, master!!!

wilgx0 · Oct 09 '20

@wilgx0 Haha, this series only just went live and you've already reached day four. Thanks for your unwavering support~

geektutu · Oct 09 '20

I'll follow you all the way!

wilgx0 · Oct 09 '20

Hi, I found that in day4-timeout, running go test -v occasionally hangs with no response. I've opened an issue in the project, please take a look: issue#26

Phoenix500526 · Dec 07 '20

The goroutine started inside handleRequest seems like it would leak in the timeout case? I haven't tested it, it just looks that way.

FinaLone · Jan 27 '21

serveCodec is missing an argument when it calls handleRequest...

gnehcein · Apr 09 '21

And there's this pattern: case <-called: <-sent }. What happens if sent blocks?

gnehcein · Apr 09 '21

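@FinaLone The goroutine started inside handleRequest seems like it would leak in the timeout case? I haven't tested it, it just looks that way.

It should leak: after the timeout, the sends on the channels block forever. Making them buffered channels should keep them from blocking.

liyuxuan89 · Apr 17 '21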

@liyuxuan89 I don't quite understand why it leaks. Do we need to add <-called and <-sent at the end of case <-time.After(timeout)?

IAOTW · Apr 18 '21

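@ls8725 @liyuxuan89 I don't quite understand why it leaks. Do we need to add <-called and <-sent at the end of case <-time.After(timeout)?

After the timeout, the goroutine can no longer send on the unbuffered channels sent and called, so it stays blocked and never exits.

liyuxuan89 · Apr 20 '21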

In func dialTimeout, how is the goroutine that creates the client reclaimed afterwards? Does it get closed? If it doesn't, won't that waste memory?

wbjy · May 23 '21

ch := make(chan clientResult)
go func() {
	client, err := f(conn, opt)
	ch <- clientResult{client: client, err: err}
}()
if opt.ConnectTimeout == 0 {
	result := <-ch
	return result.client, result.err
}
select {
case <-time.After(opt.ConnectTimeout):
	return nil, fmt.Errorf("rpc client: connect timeout: expect within %s", opt.ConnectTimeout)
case result := <-ch:
	return result.client, result.err
}

If time.After(opt.ConnectTimeout) fires first, the goroutine will later block forever at ch <- clientResult{client: client, err: err}, because ch is unbuffered and nobody receives from it any more. The goroutine leaks.

valiner · Jun 26 '21

func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup, timeout time.Duration) {
	defer wg.Done()
	called := make(chan struct{})
	sent := make(chan struct{})
	go func() {
		err := req.svc.call(req.mtype, req.argv, req.replyv)
		select {
		case called <- struct{}{}:
		default: // the post had `case:` here, which does not compile; `default:` appears to be what was meant
			return
		}

		if err != nil {
			req.h.Error = err.Error()
			server.sendResponse(cc, req.h, invalidRequest, sending)
			sent <- struct{}{}
			return
		}
		server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
		sent <- struct{}{}
	}()

	if timeout == 0 {
		<-called
		<-sent
		return
	}
	select {
	case <-time.After(timeout):
		req.h.Error = fmt.Sprintf("rpc server: request handle timeout: expect within %s", timeout)
		server.sendResponse(cc, req.h, invalidRequest, sending)
	case <-called:
		<-sent
	}
}

Adding a select inside the goroutine solves the goroutine leak.

GodXuebi · Jul 29 '21

https://www.cnblogs.com/luoming1224/p/11174927.html The cause of the memory leak is that serveCodec's for loop runs go server.handleRequest(cc, req, sending, wg), and handleRequest contains a select on time.After.

1911860538 · Sep 01 '21

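@1911860538 https://www.cnblogs.com/luoming1224/p/11174927.html The cause of the memory leak is that serveCodec's for loop runs go server.handleRequest(cc, req, sending, wg), and handleRequest contains a select on time.After.

I think time.After is fine in this code, since each request needs its own independent timeout. What the others above are discussing is that the channel ch has no buffer, so the goroutine can't exit normally. For example: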

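package main

import "time"

func main() {
	ch := make(chan int)
	// Start a goroutine
	go func() {
		// Comment out this line and nothing times out, so the goroutine exits normally
		time.Sleep(2 * time.Second)
		println("end to sleep")
		ch <- 1                           // after the timeout nobody receives, so this blocks forever
		println("channel send succeeded") // never printed in the timeout case
	}()

	select {
	case <-time.After(1 * time.Second):
		println("connect timeout")
	case <-ch:
		println("completed normally")
	}
	// Keep main alive (instead of a busy `for {}` loop) long enough to see
	// that the goroutine never gets past the send
	time.Sleep(3 * time.Second)
}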

xiaoguyu · Oct 06 '21

When checking for a connection timeout, why do we also check whether the newClient function times out, in addition to net.DialTimeout? If net.DialTimeout returns successfully, can newClient still block?

royal-ai · Nov 16 '21

@GodXuebi

Isn't that case: wrong?

cuglaiyp · Dec 31 '21

@gnehcein serveCodec is missing an argument when it calls handleRequest...

serveCodec should take one more parameter, the opt negotiated with the client, to determine the server-side handling timeout.

liujing-siyang · Jan 28 '22

func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup, timeout time.Duration) {
	defer wg.Done()
	called := make(chan struct{})
	sent := make(chan struct{})
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func(ctx context.Context) {
		err := req.svc.call(req.mtype, req.argv, req.replyv)
		called <- struct{}{}
		if err != nil {
			req.h.Error = err.Error()
			server.sendResponse(cc, req.h, invalidRequest, sending)
			sent <- struct{}{}
			return
		}
		server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
		sent <- struct{}{}
		select {
		case <-ctx.Done():
			return
		}
	}(ctx)

	if timeout == 0 {
		<-called
		<-sent
		return
	}
	select {
	case <-time.After(timeout):
		// If call only finishes after the timeout has fired, we have already returned
		// and nobody ever receives from called, so the goroutine leaks
		req.h.Error = fmt.Sprintf("rpc server: request handle timeout: expect within %s", timeout)
		server.sendResponse(cc, req.h, invalidRequest, sending)
	case <-called:
		<-sent
	}
}

Also add a context to tell the child goroutine to exit.

liujing-siyang · Jan 28 '22

@liujing-siyang

If it blocks at called <- struct{}{}, is the select on ctx below of any use at all?

PeiLeizzz · May 05 '22

// Client.dialTimeout()
go func() {
	client, err := f(conn, opt)
	ch <- clientResult{
		client: client,
		err:    err,
	}
}()

If we return on timeout, this goroutine leaks as well.

PeiLeizzz · May 05 '22

@GodXuebi

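I think this still has a problem. If the select inside the goroutine runs before the select below, i.e. it tries to send before the receive on called is ready, it just returns. We shouldn't rely on guessing which of the two runs first.

ShiMaRing · Jul 31 '22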
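The race described here can be shown in isolation: a `select` with a `default` branch completes a send on an unbuffered channel only if a receiver is already blocked on it at that instant. `trySend` is a hypothetical helper for this sketch.

```go
package main

import (
	"fmt"
	"time"
)

// trySend attempts a non-blocking send and reports whether it succeeded.
func trySend(ch chan<- struct{}) bool {
	select {
	case ch <- struct{}{}:
		return true
	default:
		return false // no receiver was ready at this instant
	}
}

func main() {
	ch := make(chan struct{})

	// No receiver yet: the send is skipped even if one would arrive a moment later.
	fmt.Println(trySend(ch)) // false

	// Once a receiver is parked on ch, the same non-blocking send can succeed.
	go func() { <-ch }()
	time.Sleep(10 * time.Millisecond) // crude: give the receiver time to block
	fmt.Println(trySend(ch))
}
```

This is why a worker that uses `default` can silently drop its result when it happens to finish before `handleRequest` reaches its own `select`.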

@PeiLeizzz

If it blocks at called <- struct{}{}, is the select on ctx below of any use at all?

I think so too: it will stay blocked at called <- struct{}{} and never get any further.

ShiMaRing · Jul 31 '22

	var finish = make(chan struct{})
	defer close(finish)

	go func() {
		err := req.svc.call(req.mtype, req.argv, req.replyv)
		select {
		case <-finish:
			close(called)
			close(sent)
			return
		case called <- struct{}{}:
			if err != nil {
				req.h.Error = err.Error()
				server.sendResponse(c, req.h, invalidRequest, lock)
				sent <- struct{}{}
				return
			}
			server.sendResponse(c, req.h, req.replyv.Interface(), lock)
			sent <- struct{}{}
		}
	}()

ShiMaRing · Jul 31 '22

func (client *Client) receive() {
	var err error
	for err == nil {
		var h codec.Header
		if err = client.cc.ReadHeader(&h); err != nil {
			break
		}
		call := client.removeCall(h.Seq)
		switch {
		case call == nil:
			// it usually means that Write partially failed
			// and call was already removed.
			err = client.cc.ReadBody(nil)
		case h.Error != "":
			call.Error = errors.New(h.Error) // h.Error is not a format string, so avoid fmt.Errorf
			err = client.cc.ReadBody(nil)
			call.done()
		default:
			err = client.cc.ReadBody(call.Reply)
			if err != nil {
				call.Error = errors.New("reading body " + err.Error())
			}
			call.done()
		}
	}
	// an error occurred, so terminate all pending calls
	client.terminateCalls(err)
}

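Normally sendResponse should indeed be called only once per request. But the client calls removeCall for each response it receives, so a later response with the same seq finds call == nil and never calls call.done(), which means nothing blocks. So it doesn't seem to do any real harm.

liujing-siyang · Aug 17 '22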
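The reasoning above can be checked with a stripped-down pending-call table: once a sequence number has been removed, a second response for it finds nothing to complete. `pending`, `call`, `register`, and `removeCall` here are hypothetical simplifications, not the tutorial's exact code.

```go
package main

import (
	"fmt"
	"sync"
)

type call struct{ Seq uint64 }

// pending maps sequence numbers to in-flight calls.
type pending struct {
	mu    sync.Mutex
	calls map[uint64]*call
}

func (p *pending) register(c *call) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.calls[c.Seq] = c
}

// removeCall deletes and returns the call for seq, or nil if none is pending.
func (p *pending) removeCall(seq uint64) *call {
	p.mu.Lock()
	defer p.mu.Unlock()
	c := p.calls[seq]
	delete(p.calls, seq)
	return c
}

func main() {
	p := &pending{calls: make(map[uint64]*call)}
	p.register(&call{Seq: 1})

	first := p.removeCall(1)  // first response (e.g. the timeout error)
	second := p.removeCall(1) // duplicate response with the same seq
	fmt.Println(first != nil, second == nil) // true true
}
```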

@ShiMaRing

So far this one looks correct. The original code has goroutine leaks on both the server and the client; the client side can be fixed like this:

finish := make(chan struct{})
defer close(finish)
go func() {
	client, err := f(conn, opt)
	select {
	case <-finish:
		close(ch)
		return
	case ch <- clientResult{client: client, err: err}:
	}
}()

ruokeqx · Aug 21 '22

Folks, about scenario 2 of the second test case: when I renamed the test function TestClient_Call to TestClientCall, scenario 2 stopped passing and the process just exited. Has anyone else run into this?

CSWYF3634076 · Oct 24 '22

type clientResult struct {
	client *Client
	err    error
}

type newClientFunc func(conn net.Conn, opt *Option) (client *Client, err error)

func dialTimeout(f newClientFunc, network, address string, opts ...*Option) (client *Client, err error) {
	opt, err := parseOptions(opts...)
	if err != nil {
		return nil, err
	}
	conn, err := net.DialTimeout(network, address, opt.ConnectTimeout)
	if err != nil {
		return nil, err
	}
	// close the connection if client is nil
	defer func() {
		if err != nil {
			_ = conn.Close()
		}
	}()
	ch := make(chan clientResult)
	go func() {
		client, err := f(conn, opt)
		ch <- clientResult{client: client, err: err}
	}()
	if opt.ConnectTimeout == 0 {
		result := <-ch
		return result.client, result.err
	}
	select {
	case <-time.After(opt.ConnectTimeout):
		return nil, fmt.Errorf("rpc client: connect timeout: expect within %s", opt.ConnectTimeout)
	case result := <-ch:
		return result.client, result.err
	}
}

I'd like to ask why the client has to be wrapped in a clientResult struct here. Couldn't we just use a channel of type *Client?

ch := make(chan *Client)
...
case client = <-ch:
	return client, err

Yuuxi777 · Jan 09 '23
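The struct lets a single receive deliver both return values of `f`. With a channel of `*Client` only, the error from the constructor has nowhere to go; in the proposed snippet, `err` would presumably still be the nil error left over from the successful dial, so a failed handshake could come back as `(nil, nil)`. A small illustration with hypothetical names (`client`, `result`, `build`, `viaStruct`, `viaPointer`):

```go
package main

import (
	"errors"
	"fmt"
)

type client struct{}

// result bundles both return values so they travel through one channel.
type result struct {
	c   *client
	err error
}

// build is a hypothetical constructor that always fails.
func build() (*client, error) { return nil, errors.New("handshake failed") }

// viaStruct sends both values through a channel of result.
func viaStruct() (*client, error) {
	ch := make(chan result, 1)
	go func() {
		c, err := build()
		ch <- result{c: c, err: err}
	}()
	r := <-ch
	return r.c, r.err
}

// viaPointer sends only the *client; build's error is silently dropped.
func viaPointer() (c *client, err error) {
	ch := make(chan *client, 1)
	go func() {
		cl, _ := build() // nowhere to put the error
		ch <- cl
	}()
	c = <-ch
	return c, err // err is still nil here
}

func main() {
	fmt.Println(viaStruct())
	fmt.Println(viaPointer())
}
```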

Could handleRequest be improved like this?

func (s *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup, timeout time.Duration) {
	defer wg.Done()
	// The tutorial's version has a problem: it can leak a goroutine.
	// Here sync.Once ensures s.sendResponse is called only once.
	done := make(chan struct{})
	var sendOnce sync.Once

	go func() {
		var errStr string
		var body interface{}
		defer func() {
			sendOnce.Do(func() {
				req.h.Error = errStr
				s.sendResponse(cc, req.h, body, sending)
			})
			close(done)
		}()
		err := req.svc.call(req.mtype, req.argv, req.replyv)
		if err != nil {
			errStr = err.Error()
			body = invalidRequest
			return
		}
		body = req.replyv.Interface()
	}()

	if timeout == 0 {
		<-done
		return
	}
	select {
	case <-done:
		return
	case <-time.After(timeout):
		sendOnce.Do(func() {
			req.h.Error = fmt.Sprintf("rpc server: request handle timeout: expect within %s", timeout)
			s.sendResponse(cc, req.h, invalidRequest, sending)
		})
	}
}

bwzdxl · Mar 06 '23
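The key property of this version is that `sync.Once` lets whichever path gets there first (the worker or the timeout) send the response, while the other becomes a no-op. A stripped-down check of that property; `raceResponders`, `respond`, and the counter are hypothetical stand-ins for `sendResponse` wrapped in `sendOnce.Do`.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// raceResponders runs the "worker" and "timeout" paths concurrently and
// reports how many responses were actually sent and which one won.
func raceResponders() (int32, string) {
	var once sync.Once
	var sends atomic.Int32
	var sentMsg string
	var wg sync.WaitGroup

	// respond stands in for sendResponse wrapped in sendOnce.Do.
	respond := func(msg string) {
		defer wg.Done()
		once.Do(func() {
			sentMsg = msg
			sends.Add(1)
		})
	}

	wg.Add(2)
	go respond("reply")   // worker path
	go respond("timeout") // timeout path
	wg.Wait()
	return sends.Load(), sentMsg
}

func main() {
	n, msg := raceResponders()
	fmt.Println(n, msg) // 1, followed by whichever path won
}
```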

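@wbjy In func dialTimeout, how is the goroutine that creates the client reclaimed afterwards? Does it get closed? If it doesn't, won't that waste memory?

Goroutines don't close by themselves. The approaches I know of control a goroutine's lifetime with a chan or a context.

NOS-AE · Mar 20 '23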