blog icon indicating copy to clipboard operation
blog copied to clipboard

动手写RPC框架 - GeeRPC第二天 支持并发与异步的客户端 | 极客兔兔

Open geektutu opened this issue 3 years ago • 58 comments

https://geektutu.com/post/geerpc-day2.html

7天用 Go语言/golang 从零实现 RPC 框架 GeeRPC 教程(7 days implement golang remote procedure call framework from scratch tutorial),动手写 RPC 框架,参照 golang 标准库 net/rpc 的实现,实现了服务端(server)、支持异步和并发的客户端(client)、消息编码与解码(message encoding and decoding)、服务注册(service register)、支持 TCP/Unix/HTTP 等多种传输协议。第二天实现了一个支持异步(asynchronous)和并发(concurrent)的客户端。

geektutu avatar Oct 08 '20 02:10 geektutu

// client.go
func (client *Client) Go(serviceMethod string, args, reply interface{}, done chan *Call) *Call {
	if done == nil {
		done = make(chan *Call, 10)
	} else if cap(done) == 0 {
		log.Panic("rpc client: done channel is unbuffered")
	}

done = make(chan *Call, 10) 官方库这里对chan的处理感觉很奇怪呀,为啥要buffered大小是10个,会有这种场景么

furthergo avatar Oct 10 '20 12:10 furthergo

@furthergo 我看到这里的时候也觉得蛮奇怪的,buffer 设置为 1,不要阻塞 call 的返回,理论上就OK了,我觉得 10 只是随手写的一个数字吧。

geektutu avatar Oct 10 '20 12:10 geektutu

client.GO这个异步接口该怎么调用呢

yangchen97 avatar Oct 22 '20 03:10 yangchen97

@yangchen97

Go 返回了 call,call.Done 是一个信道(chan),没有什么特别的地方,按照普通信道来处理就好了。比如直接阻塞就是同步调用,类似于 Call 里的做法,如果不想阻塞,新启动一个协程等待结果,其他函数继续往下执行。

比如:

call := client.Go( ... )
# 新启动协程,异步等待
go func(call *Call) {
	select {
		<-call.Done:
			# do something
		<-otherChan:
			# do something
	}
}(call)

otherFunc() # 不阻塞,继续执行其他函数。

geektutu avatar Oct 22 '20 03:10 geektutu

@geektutu 明白了。还想再问一个问题, client.Go() 函数里的 client.send() ,是否应该为 go client.send() ?我认为返回 call 不需要等待 client.send() 执行完。

yangchen97 avatar Oct 22 '20 07:10 yangchen97

@yangchen97

我觉得你的理解是对的,我看了下,确实没有等待 send() 完成的必要。我给 golang 标准库提了一个 PR,看看官方的解释是什么样的~

geektutu avatar Oct 24 '20 06:10 geektutu

@yangchen97

我觉得你的理解是对的,我看了下,确实没有等待 send() 完成的必要。我给 golang 标准库提了一个 PR,看看官方的解释是什么样的~

go的网络IO本身就是异步的,加上这个go 提前返回也没必要吧。如果真想这么做, go func() { call := client.Go(...) } () ?

qingyunha avatar Oct 25 '20 07:10 qingyunha

@yangchen97 @qingyunha 在这个地方,网络IO是同步等待的,不是异步的。请求发送成功,send() 才会返回。不过 golang 的回复是 net/rpc 已经冻结了,不再接受新的特性了。但是就这个点而言,如果网络情况不太好的情况下,发送请求耗时很长,确实对性能会有一定的影响。

Rob Pike Patchset 1 15:54 As the documentation says, The net/rpc package is frozen and is not accepting new features.

异步请求确实有很多种其他的方式,但是 client.Go 的好处在于参数 done chan *Call 可以自定义缓冲区的大小,可以给多个 client.Go 传入同一个 chan 对象,从而控制异步请求并发的数量。其他方式就需要自己控制和实现了。

geektutu avatar Oct 25 '20 08:10 geektutu

client.Dial方法中的defer不是很明白,此处的执行顺序是1.NewClient 2.defer 3.return 这个顺序吗

andcarefree avatar Dec 11 '20 15:12 andcarefree

@andcarefree 这是 Go defer 的运行机制,在 return 语句之后,函数退出之前执行,defer 执行时,返回值已经被赋值了。

你可以写个简单的函数验证下:

func test() (ans int) {
	defer func() {
		fmt.Println(ans)
	}()
	return 10
}

func main() {
	test()
}

输出是 10。

geektutu avatar Dec 11 '20 15:12 geektutu

@andcarefree 这是 Go defer 的运行机制,在 return 语句之后,函数退出之前执行,defer 执行时,返回值已经被赋值了。

你可以写个简单的函数验证下:

func test() (ans int) {
	defer func() {
		fmt.Println(ans)
	}()
	return 10
}

func main() {
	test()
}

输出是 10。

多谢解惑

andcarefree avatar Dec 11 '20 16:12 andcarefree

@furthergo 我看到这里的时候也觉得蛮奇怪的,buffer 设置为 1,不要阻塞 call 的返回,理论上就OK了,我觉得 10 只是随手写的一个数字吧。

如果客户端大量基于这个chan的rpc异步请求,那么1显然是不够的,10是比较合理的。

chinawilon avatar Feb 24 '21 11:02 chinawilon

为什么方法的返回值(仅单个返回值)不需要的时候要用_抛弃,而不是直接调用不取返回值呢

chenshuidejidan avatar Mar 10 '21 09:03 chenshuidejidan

seq, err := client.registerCall(call) if err != nil { call.Error = err call.done() return }

请问这个seq不是当前call的seq加1吗 这样的话 if err := client.cc.Write(&client.header, call.Args); err != nil { call := client.removeCall(seq) 这里remove的seq不应该是seq-1吗

echo-li1024 avatar Mar 10 '21 13:03 echo-li1024

type Call struct {
	Seq           uint64
	ServiceMethod string      // format "<service>.<method>"
	Args          interface{} // arguments to the function
	Reply         interface{} // reply from the function
	Error         error       // if error occurs, it will be set
	Done          chan *Call  // Strobes when call is complete.
}

type Header struct {
	ServiceMethod string 
	Seq           uint64 
	Error         string 

想问一下,为什么Call与Header有公共字段,而不直接在Call中用Header呢

JesseStutler avatar Mar 12 '21 09:03 JesseStutler

我也与@JesseStutler有同样的问题 seq, err := client.registerCall(call) if err != nil { call.Error = err call.done() return } // prepare request header client.header.ServiceMethod = call.ServiceMethod client.header.Seq = seq client.header.Error = "" 在send()方法里面,client.pending中是可以有多个call的,为啥client.header字段需要跟随call的变化而变化呀

IAOTW avatar Mar 14 '21 08:03 IAOTW

唉 决定了 下个月就转行卖烧烤去了 再会了 同志们

wilgx0 avatar Mar 16 '21 08:03 wilgx0

@yudidi 请问五个条件的出处是哪里呢?

不要纠结了 一起卖烧烤去

wilgx0 avatar Mar 17 '21 04:03 wilgx0

@ls8725 我也与@JesseStutler有同样的问题 seq, err := client.registerCall(call) if err != nil { call.Error = err call.done() return } // prepare request header client.header.ServiceMethod = call.ServiceMethod client.header.Seq = seq client.header.Error = "" 在send()方法里面,client.pending中是可以有多个call的,为啥client.header字段需要跟随call的变化而变化呀

因为每个请求的ServiceMethod和Seq是不一样的,一方面服务端需要根据ServiceMethod来调用相应的方法 另一面 客户端中的func (client *Client) receive() 接收功能 需要根据Seq来判断服务器端的响应消息

wilgx0 avatar Mar 17 '21 09:03 wilgx0

//registerCall:将参数 call 添加到 client.pending 中,并更新 client.seq
func (c *Client) registerCall(call *Call) (uint64, error) {
        
	// todo client前面被锁住了,这里为什么还需要再锁一次呢?
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closing || c.shutdown {
		return 0, ErrShutDown
	}
	call.Seq = c.seq
	c.pending[call.Seq] = call
	c.seq++
	return call.Seq, nil
}

调用registerCall时,client刚被锁住,在registerCall中再锁一次是否多余?

junxxie avatar Mar 28 '21 13:03 junxxie

func (client *Client) terminateCalls(err error) { client.sending.Lock() defer client.sending.Unlock() client.mu.Lock() defer client.mu.Unlock() client.shutdown = true for _, call := range client.pending { call.Error = err call.done() } }

请问为什么这里要使用sending锁呢?对call.Done()这个环节应该不需要使用sending锁吧。

receive函数中的 if err := client.cc.Write(&client.header, call.Args); err != nil { call := client.removeCall(seq) // call may be nil, it usually means that Write partially failed, // client has received the response and handled if call != nil { call.Error = err call.done() } } } call.done()就没有加sending。

我的理解是client向远端发送报文的时候才需要加sending锁

GodXuebi avatar May 11 '21 08:05 GodXuebi

@ls8725 我也与@JesseStutler有同样的问题 seq, err := client.registerCall(call) if err != nil { call.Error = err call.done() return } // prepare request header client.header.ServiceMethod = call.ServiceMethod client.header.Seq = seq client.header.Error = "" 在send()方法里面,client.pending中是可以有多个call的,为啥client.header字段需要跟随call的变化而变化呀

我觉得client.header字段的目的是减少内存申请。一般来说,每次调用cc.write都需要新建一个Header类型的临时变量。尽管客户端是多协程的,但sending互斥量确保了每次只有一个协程可以调用cc.write方法,因此可以直接使用client.header字段。这也就是sending注释中protect following的含义。

shengxiang19 avatar May 27 '21 14:05 shengxiang19

@JesseStutler

type Call struct {
	Seq           uint64
	ServiceMethod string      // format "<service>.<method>"
	Args          interface{} // arguments to the function
	Reply         interface{} // reply from the function
	Error         error       // if error occurs, it will be set
	Done          chan *Call  // Strobes when call is complete.
}

type Header struct {
	ServiceMethod string 
	Seq           uint64 
	Error         string 

想问一下,为什么Call与Header有公共字段,而不直接在Call中用Header呢

我对于这里的理解是, 确实存在公共字段, 不重用 Header 理由不是代码原因, 而是语义原因, 因为 Header 的存在是为了通讯, 而 Call 本身不涉及通讯, 他只是客户端发起的调用, 客户端理论上来说是不知道底层是如何调用的, 所以这里没有使用 Header , 而且使用了 Header 的话, 在初始化 Call 的时候, 代码也会复杂一点.

gu18168 avatar Jun 04 '21 08:06 gu18168

@chinawilon

@furthergo 我看到这里的时候也觉得蛮奇怪的,buffer 设置为 1,不要阻塞 call 的返回,理论上就OK了,我觉得 10 只是随手写的一个数字吧。

如果客户端大量基于这个chan的rpc异步请求,那么1显然是不够的,10是比较合理的。 基于同一个chan的话就需要客户端传入chan,客户端如果传入了chan,就不会走初始化为10的逻辑了,所以我觉得你这个逻辑不太成立

cuglaiyp avatar Jun 21 '21 11:06 cuglaiyp

		call := client.removeCall(h.Seq)
		switch {
		case call == nil:
			// it usually means that Write partially failed
			// and call was already removed.
			err = client.cc.ReadBody(nil)
		case h.Error != "":
			call.Error = fmt.Errorf(h.Error)
			err = client.cc.ReadBody(nil)
			call.done()
		default:
			err = client.cc.ReadBody(call.Reply)
			if err != nil {
				call.Error = errors.New("reading body " + err.Error())
			}
			call.done()
		}

有个地方很奇怪,在removeCall中已经把对应的call从pending中删除了,那么为什么还需要调用call.done呢,就算执行了client.terminateCalls也通知不到啊

nanfeng1999 avatar Jul 02 '21 04:07 nanfeng1999

@yzy-github

		call := client.removeCall(h.Seq)
		switch {
		case call == nil:
			// it usually means that Write partially failed
			// and call was already removed.
			err = client.cc.ReadBody(nil)
		case h.Error != "":
			call.Error = fmt.Errorf(h.Error)
			err = client.cc.ReadBody(nil)
			call.done()
		default:
			err = client.cc.ReadBody(call.Reply)
			if err != nil {
				call.Error = errors.New("reading body " + err.Error())
			}
			call.done()
		}

有个地方很奇怪,在removeCall中已经把对应的call从pending中删除了,那么为什么还需要调用call.done呢,就算执行了client.terminateCalls也通知不到啊

这里只是从服务器返回的信息当中拿到第一个被处理的call 解码后call.done是为了通知调用者这个call已经完成了

walkmiao avatar Jul 06 '21 11:07 walkmiao

@chinawilon

@furthergo 我看到这里的时候也觉得蛮奇怪的,buffer 设置为 1,不要阻塞 call 的返回,理论上就OK了,我觉得 10 只是随手写的一个数字吧。

如果客户端大量基于这个chan的rpc异步请求,那么1显然是不够的,10是比较合理的。

@GodXuebi func (client *Client) terminateCalls(err error) { client.sending.Lock() defer client.sending.Unlock() client.mu.Lock() defer client.mu.Unlock() client.shutdown = true for _, call := range client.pending { call.Error = err call.done() } }

请问为什么这里要使用sending锁呢?对call.Done()这个环节应该不需要使用sending锁吧。

receive函数中的 if err := client.cc.Write(&client.header, call.Args); err != nil { call := client.removeCall(seq) // call may be nil, it usually means that Write partially failed, // client has received the response and handled if call != nil { call.Error = err call.done() } } } call.done()就没有加sending。

我的理解是client向远端发送报文的时候才需要加sending锁

这里获取sending锁可以防止新的Call被注册到pending map中。

Jaime1129 avatar Aug 24 '21 13:08 Jaime1129

var _ io.Closer = (*Client)(nil) 你好,这段代码是什么意思呢?在网上也没找到太多的信息。

zyb284629791 avatar Sep 03 '21 14:09 zyb284629791

另外,client中的mu锁和sending为什么要分开,也没有太搞懂。我理解大多数时候都是用mu锁是因为client可以复用,为了避免相互干扰。但在send的时候使用sending锁,如果这时候有其他协程调用了close之类的方法又怎么处理呢?

zyb284629791 avatar Sep 03 '21 14:09 zyb284629791

var _ io.Closer = (*Client)(nil) 你好,这段代码是什么意思呢?在网上也没找到太多的信息。

断言client实现了closer接口,go里面这个写法很常见

andcarefree avatar Sep 03 '21 15:09 andcarefree