blog
blog copied to clipboard
动手写RPC框架 - GeeRPC第二天 支持并发与异步的客户端 | 极客兔兔
https://geektutu.com/post/geerpc-day2.html
7天用 Go语言/golang 从零实现 RPC 框架 GeeRPC 教程(7 days implement golang remote procedure call framework from scratch tutorial),动手写 RPC 框架,参照 golang 标准库 net/rpc 的实现,实现了服务端(server)、支持异步和并发的客户端(client)、消息编码与解码(message encoding and decoding)、服务注册(service register)、支持 TCP/Unix/HTTP 等多种传输协议。第二天实现了一个支持异步(asynchronous)和并发(concurrent)的客户端。
// client.go
func (client *Client) Go(serviceMethod string, args, reply interface{}, done chan *Call) *Call {
if done == nil {
done = make(chan *Call, 10)
} else if cap(done) == 0 {
log.Panic("rpc client: done channel is unbuffered")
}
done = make(chan *Call, 10)
官方库这里对chan的处理感觉很奇怪呀,为啥要buffered大小是10个,会有这种场景么
@furthergo 我看到这里的时候也觉得蛮奇怪的,buffer 设置为 1,不要阻塞 call 的返回,理论上就OK了,我觉得 10 只是随手写的一个数字吧。
client.GO
这个异步接口该怎么调用呢
@yangchen97
Go
返回了 call,call.Done 是一个信道(chan),没有什么特别的地方,按照普通信道来处理就好了。比如直接阻塞就是同步调用,类似于 Call 里的做法,如果不想阻塞,新启动一个协程等待结果,其他函数继续往下执行。
比如:
call := client.Go( ... )
# 新启动协程,异步等待
go func(call *Call) {
select {
<-call.Done:
# do something
<-otherChan:
# do something
}
}(call)
otherFunc() # 不阻塞,继续执行其他函数。
@geektutu
明白了。还想再问一个问题, client.Go()
函数里的 client.send()
,是否应该为 go client.send()
?我认为返回 call 不需要等待 client.send()
执行完。
@yangchen97
我觉得你的理解是对的,我看了下,确实没有等待 send() 完成的必要。我给 golang 标准库提了一个 PR,看看官方的解释是什么样的~
go的网络IO本身就是异步的,加上这个go 提前返回也没必要吧。如果真想这么做, go func() { call := client.Go(...) } ()
?
@yangchen97 @qingyunha 在这个地方,网络IO是同步等待的,不是异步的。请求发送成功,send()
才会返回。不过 golang 的回复是 net/rpc
已经冻结了,不再接受新的特性了。但是就这个点而言,如果网络情况不太好的情况下,发送请求耗时很长,确实对性能会有一定的影响。
Rob Pike Patchset 1 15:54 As the documentation says, The net/rpc package is frozen and is not accepting new features.
异步请求确实有很多种其他的方式,但是 client.Go
的好处在于参数 done chan *Call
可以自定义缓冲区的大小,可以给多个 client.Go
传入同一个 chan 对象,从而控制异步请求并发的数量。其他方式就需要自己控制和实现了。
client.Dial方法中的defer不是很明白,此处的执行顺序是1.NewClient 2.defer 3.return 这个顺序吗
@andcarefree 这是 Go defer 的运行机制,在 return 语句之后,函数退出之前执行,defer 执行时,返回值已经被赋值了。
你可以写个简单的函数验证下:
func test() (ans int) {
defer func() {
fmt.Println(ans)
}()
return 10
}
func main() {
test()
}
输出是 10。
@andcarefree 这是 Go defer 的运行机制,在 return 语句之后,函数退出之前执行,defer 执行时,返回值已经被赋值了。
你可以写个简单的函数验证下:
func test() (ans int) { defer func() { fmt.Println(ans) }() return 10 } func main() { test() }
输出是 10。
多谢解惑
@furthergo 我看到这里的时候也觉得蛮奇怪的,buffer 设置为 1,不要阻塞 call 的返回,理论上就OK了,我觉得 10 只是随手写的一个数字吧。
如果客户端大量基于这个chan的rpc异步请求,那么1显然是不够的,10是比较合理的。
为什么方法的返回值(仅单个返回值)不需要的时候要用_抛弃,而不是直接调用不取返回值呢
seq, err := client.registerCall(call) if err != nil { call.Error = err call.done() return }
请问这个seq不是当前call的seq加1吗 这样的话 if err := client.cc.Write(&client.header, call.Args); err != nil { call := client.removeCall(seq) 这里remove的seq不应该是seq-1吗
type Call struct {
Seq uint64
ServiceMethod string // format "<service>.<method>"
Args interface{} // arguments to the function
Reply interface{} // reply from the function
Error error // if error occurs, it will be set
Done chan *Call // Strobes when call is complete.
}
type Header struct {
ServiceMethod string
Seq uint64
Error string
想问一下,为什么Call与Header有公共字段,而不直接在Call中用Header呢
我也与@JesseStutler有同样的问题 seq, err := client.registerCall(call) if err != nil { call.Error = err call.done() return } // prepare request header client.header.ServiceMethod = call.ServiceMethod client.header.Seq = seq client.header.Error = "" 在send()方法里面,client.pending中是可以有多个call的,为啥client.header字段需要跟随call的变化而变化呀
唉 决定了 下个月就转行卖烧烤去了 再会了 同志们
@yudidi 请问
五个条件
的出处是哪里呢?
不要纠结了 一起卖烧烤去
@ls8725 我也与@JesseStutler有同样的问题 seq, err := client.registerCall(call) if err != nil { call.Error = err call.done() return } // prepare request header client.header.ServiceMethod = call.ServiceMethod client.header.Seq = seq client.header.Error = "" 在send()方法里面,client.pending中是可以有多个call的,为啥client.header字段需要跟随call的变化而变化呀
因为每个请求的ServiceMethod和Seq是不一样的,一方面服务端需要根据ServiceMethod来调用相应的方法 另一面 客户端中的func (client *Client) receive() 接收功能 需要根据Seq来判断服务器端的响应消息
//registerCall:将参数 call 添加到 client.pending 中,并更新 client.seq
func (c *Client) registerCall(call *Call) (uint64, error) {
// todo client前面被锁住了,这里为什么还需要再锁一次呢?
c.mu.Lock()
defer c.mu.Unlock()
if c.closing || c.shutdown {
return 0, ErrShutDown
}
call.Seq = c.seq
c.pending[call.Seq] = call
c.seq++
return call.Seq, nil
}
调用registerCall时,client刚被锁住,在registerCall中再锁一次是否多余?
func (client *Client) terminateCalls(err error) { client.sending.Lock() defer client.sending.Unlock() client.mu.Lock() defer client.mu.Unlock() client.shutdown = true for _, call := range client.pending { call.Error = err call.done() } }
请问为什么这里要使用sending锁呢?对call.Done()这个环节应该不需要使用sending锁吧。
receive函数中的 if err := client.cc.Write(&client.header, call.Args); err != nil { call := client.removeCall(seq) // call may be nil, it usually means that Write partially failed, // client has received the response and handled if call != nil { call.Error = err call.done() } } } call.done()就没有加sending。
我的理解是client向远端发送报文的时候才需要加sending锁
@ls8725 我也与@JesseStutler有同样的问题 seq, err := client.registerCall(call) if err != nil { call.Error = err call.done() return } // prepare request header client.header.ServiceMethod = call.ServiceMethod client.header.Seq = seq client.header.Error = "" 在send()方法里面,client.pending中是可以有多个call的,为啥client.header字段需要跟随call的变化而变化呀
我觉得client.header字段的目的是减少内存申请。一般来说,每次调用cc.write都需要新建一个Header类型的临时变量。尽管客户端是多协程的,但sending互斥量确保了每次只有一个协程可以调用cc.write方法,因此可以直接使用client.header字段。这也就是sending注释中protect following的含义。
@JesseStutler
type Call struct { Seq uint64 ServiceMethod string // format "<service>.<method>" Args interface{} // arguments to the function Reply interface{} // reply from the function Error error // if error occurs, it will be set Done chan *Call // Strobes when call is complete. } type Header struct { ServiceMethod string Seq uint64 Error string
想问一下,为什么Call与Header有公共字段,而不直接在Call中用Header呢
我对于这里的理解是, 确实存在公共字段, 不重用 Header 理由不是代码原因, 而是语义原因, 因为 Header 的存在是为了通讯, 而 Call 本身不涉及通讯, 他只是客户端发起的调用, 客户端理论上来说是不知道底层是如何调用的, 所以这里没有使用 Header , 而且使用了 Header 的话, 在初始化 Call 的时候, 代码也会复杂一点.
@chinawilon
@furthergo 我看到这里的时候也觉得蛮奇怪的,buffer 设置为 1,不要阻塞 call 的返回,理论上就OK了,我觉得 10 只是随手写的一个数字吧。
如果客户端大量基于这个chan的rpc异步请求,那么1显然是不够的,10是比较合理的。 基于同一个chan的话就需要客户端传入chan,客户端如果传入了chan,就不会走初始化为10的逻辑了,所以我觉得你这个逻辑不太成立
call := client.removeCall(h.Seq)
switch {
case call == nil:
// it usually means that Write partially failed
// and call was already removed.
err = client.cc.ReadBody(nil)
case h.Error != "":
call.Error = fmt.Errorf(h.Error)
err = client.cc.ReadBody(nil)
call.done()
default:
err = client.cc.ReadBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()
}
有个地方很奇怪,在removeCall中已经把对应的call从pending中删除了,那么为什么还需要调用call.done呢,就算执行了client.terminateCalls也通知不到啊
@yzy-github
call := client.removeCall(h.Seq) switch { case call == nil: // it usually means that Write partially failed // and call was already removed. err = client.cc.ReadBody(nil) case h.Error != "": call.Error = fmt.Errorf(h.Error) err = client.cc.ReadBody(nil) call.done() default: err = client.cc.ReadBody(call.Reply) if err != nil { call.Error = errors.New("reading body " + err.Error()) } call.done() }
有个地方很奇怪,在removeCall中已经把对应的call从pending中删除了,那么为什么还需要调用call.done呢,就算执行了client.terminateCalls也通知不到啊
这里只是从服务器返回的信息当中拿到第一个被处理的call 解码后call.done是为了通知调用者这个call已经完成了
@chinawilon
@furthergo 我看到这里的时候也觉得蛮奇怪的,buffer 设置为 1,不要阻塞 call 的返回,理论上就OK了,我觉得 10 只是随手写的一个数字吧。
如果客户端大量基于这个chan的rpc异步请求,那么1显然是不够的,10是比较合理的。
@GodXuebi func (client *Client) terminateCalls(err error) { client.sending.Lock() defer client.sending.Unlock() client.mu.Lock() defer client.mu.Unlock() client.shutdown = true for _, call := range client.pending { call.Error = err call.done() } }
请问为什么这里要使用sending锁呢?对call.Done()这个环节应该不需要使用sending锁吧。
receive函数中的 if err := client.cc.Write(&client.header, call.Args); err != nil { call := client.removeCall(seq) // call may be nil, it usually means that Write partially failed, // client has received the response and handled if call != nil { call.Error = err call.done() } } } call.done()就没有加sending。
我的理解是client向远端发送报文的时候才需要加sending锁
这里获取sending锁可以防止新的Call被注册到pending
map中。
var _ io.Closer = (*Client)(nil)
你好,这段代码是什么意思呢?在网上也没找到太多的信息。
另外,client中的mu锁和sending为什么要分开,也没有太搞懂。我理解大多数时候都是用mu锁是因为client可以复用,为了避免相互干扰。但在send的时候使用sending锁,如果这时候有其他协程调用了close之类的方法又怎么处理呢?
var _ io.Closer = (*Client)(nil)
你好,这段代码是什么意思呢?在网上也没找到太多的信息。
断言client实现了closer接口,go里面这个写法很常见