smux
smux copied to clipboard
看了好多遍都没看懂,这个地方没有错误吗
这个问题我想了很久也没想明白,恳请大侠指点一下。 因为我不相信这个程序这里会有错误。太抓狂了。
// session.go
// shaper shapes the sending sequence among streams
func (s *Session) shaperLoop() {
var reqs shaperHeap
var next writeRequest
var chWrite chan writeRequest
for {
if len(reqs) > 0 {
chWrite = s.writes
next = heap.Pop(&reqs).(writeRequest) // 从队列里取一个数据准备发送
} else {
chWrite = nil
}
select {
case <-s.die:
return
case r := <-s.shaper: //有新的数据,就把刚刚从队列里取出的数据放回去,问题是放回去顺序不就乱了吗
if chWrite != nil { // next is valid, reshape
heap.Push(&reqs, next)
}
heap.Push(&reqs, r)
case chWrite <- next:
}
}
}
我的理解 有prio = 1 的一种数据 a1 a2 a3 a4 a5,prio =2 的一种数据 b1,b2,b3,b4,b5 。只要保证 prio相同的数据的发送顺序即可。a、b两种数据可以优先发送a数据。上面的操作把数据取出来,再放回去不是打算了数据a这种数据的顺序了吗? 我写了个小程序测试了一下。程序和结果如下:
package main
import (
"container/heap"
"fmt"
"math/rand"
"time"
)
type writeRequest struct {
prio int
data string
}
type shaperHeap []writeRequest
func (h shaperHeap) Len() int { return len(h) }
func (h shaperHeap) Less(i, j int) bool { return h[i].prio < h[j].prio }
func (h shaperHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *shaperHeap) Push(x interface{}) { *h = append(*h, x.(writeRequest)) }
func (h *shaperHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
type Session struct {
die chan struct{}
shaper chan writeRequest // a shaper for writing
writes chan writeRequest
}
func NewSession() *Session {
s := &Session{}
s.writes = make(chan writeRequest)
s.shaper = make(chan writeRequest)
s.die = make(chan struct{})
return s
}
func (s *Session) Consume() {
for {
time.Sleep(300 * time.Millisecond)
select {
case <-s.die:
return
case wr := <-s.writes:
fmt.Printf("[-]% d %s\n", wr.prio, wr.data)
}
}
}
func (s *Session) Write(wr writeRequest) {
select {
case s.shaper <- wr:
fmt.Printf("[+] %d %s\n", wr.prio, wr.data)
case <-s.die:
return
}
}
// shaper shapes the sending sequence among streams
func (s *Session) shaperLoop() {
var reqs shaperHeap
var next writeRequest
var chWrite chan writeRequest
for {
if len(reqs) > 0 {
chWrite = s.writes
next = heap.Pop(&reqs).(writeRequest)
} else {
chWrite = nil
}
select {
case <-s.die:
return
case r := <-s.shaper:
if chWrite != nil { // next is valid, reshape
heap.Push(&reqs, next)
}
heap.Push(&reqs, r)
case chWrite <- next:
}
}
}
func main() {
sess := NewSession()
go sess.shaperLoop()
go sess.Consume()
rand.Seed(time.Now().UnixNano())
go func() {
a1 := writeRequest{1, "a1"}
a2 := writeRequest{1, "a2"}
a3 := writeRequest{1, "a3"}
a4 := writeRequest{1, "a4"}
a5 := writeRequest{1, "a5"}
b1 := writeRequest{2, "b1"}
b2 := writeRequest{2, "b2"}
b3 := writeRequest{2, "b3"}
b4 := writeRequest{2, "b4"}
b5 := writeRequest{2, "b5"}
sa := []writeRequest{a1, a2, a3, a4, a5}
sb := []writeRequest{b1, b2, b3, b4, b5}
_ = sa
_ = sb
for {
x := rand.Intn(2)
// fmt.Println(x)
if len(sa) == 0 && len(sb) == 0 {
break
}
if x == 0 {
if len(sa) > 0 {
aa := sa[0]
sa = sa[1:]
sess.Write(aa)
}
} else {
if len(sb) > 0 {
bb := sb[0]
sb = sb[1:]
sess.Write(bb)
}
}
}
}()
time.Sleep(30 * time.Second)
}
结果是这样的:
[+] 1 a1
[+] 2 b1
[+] 2 b2
[+] 1 a2
[+] 1 a3
[+] 2 b3
[+] 2 b4
[+] 1 a4
[+] 1 a5
[+] 2 b5
[-] 1 a2
[-] 1 a5
[-] 1 a4
[-] 1 a3
[-] 1 a1
[-] 2 b2
[-] 2 b3
[-] 2 b5
[-] 2 b4
[-] 2 b1
每个流都是独立的,所以流之间乱序没有关系,这反而是shaper的目的,如果是单一流,可能确实肯定存在一个prio变量overflow导致的乱序问题。
又看了下代码在每个stream内部,使用的是numWritten作为writeRequest结构体中的prio,所以在stream内部怎么重新排列都可以保证发送的顺序。 这个问题可以关闭了。
感谢您的回复,您的代码让我学到了很多!
func (s *Stream) writeV2(b []byte) (n int, err error) {
...
for len(bts) > 0 {
sz := len(bts)
if sz > s.frameSize {
sz = s.frameSize
}
frame.data = bts[:sz]
bts = bts[sz:]
n, err := s.sess.writeFrameInternal(frame, deadline, uint64(atomic.LoadUint32(&s.numWritten)))
atomic.AddUint32(&s.numWritten, uint32(sz))
sent += n
if err != nil {
return sent, err
}
}
}
嗯,这个的prio溢出可能性是很高的,还是需要修复一下。
这个overflow的问题怎么保证呢,每个stream 发送 1 << 32 字节后就不行了。
因为stream.Write本身是阻塞函数,那么出现这个情况只可能是发送一个刚好在 1<<32附近的数据导致乱序,那么只需要做 无符号判断即可。
我怎么觉得这里的判断有问题呢? ··· // shaper.go package smux
func _itimediff(later, earlier uint32) int32 { return (int32)(later - earlier) } ···
比如说某stream用来传输一个大文件。
在这个stream 里,存在三个writeRequest 在队列里,分别为: prio = (1 << 32)- 32768 *2 prio = (1 << 32)- 32768 prio = (1 << 32)+ 1024
等待发送,最后一个溢出了。这样判断还还能保证顺序吗?
使用发送的字节数作为prio 真的合理吗? 假如长期存在以下两个stream。 另一个streamB 存在很久了,且用来发送大文件,那么它的prio增长就很快。 一个streamA是新创建的,每次只发送很小的数据,那么这个stream 里的prio增长就比较慢。
在 shaperHeap 中存在很多这样的数据。 reshape的时候,是不是Stream B 一直得不到发送的机会?
我怎么觉得这里的判断有问题呢? ··· // shaper.go package smux
func _itimediff(later, earlier uint32) int32 { return (int32)(later - earlier) } ···
比如说某stream用来传输一个大文件。
在这个stream 里,存在三个writeRequest 在队列里,分别为: prio = (1 << 32)- 32768 *2 prio = (1 << 32)- 32768 prio = (1 << 32)+ 1024
等待发送,最后一个溢出了。这样判断还还能保证顺序吗?
成立,你可以做 go test -v -run Shaper
使用发送的字节数作为prio 真的合理吗? 假如长期存在以下两个stream。 另一个streamB 存在很久了,且用来发送大文件,那么它的prio增长就很快。 一个streamA是新创建的,每次只发送很小的数据,那么这个stream 里的prio增长就比较慢。
在 shaperHeap 中存在很多这样的数据。 reshape的时候,是不是Stream B 一直得不到发送的机会?
数值32bit问题不大,很快就会回绕。
package smux
import (
"container/heap"
"testing"
)
func TestShaper(t *testing.T) {
w1 := writeRequest{prio: 10}
w2 := writeRequest{prio: 2048}
w3 := writeRequest{prio: (1 << 32) - 32768*2}
w4 := writeRequest{prio: (1 << 32) - 32768}
prioOverflow := uint32((1 << 32) - 32768)
prioOverflow += 32768
prioOverflow += 1024
w5 := writeRequest{prio: prioOverflow}
var reqs shaperHeap
heap.Push(&reqs, w5)
heap.Push(&reqs, w4)
heap.Push(&reqs, w3)
heap.Push(&reqs, w2)
heap.Push(&reqs, w1)
var lastPrio = reqs[0].prio
for len(reqs) > 0 {
w := heap.Pop(&reqs).(writeRequest)
if int32(w.prio-lastPrio) < 0 {
t.Fatal("incorrect shaper priority")
}
t.Log("prio:", w.prio)
lastPrio = w.prio
}
}
go test -run Shaper -v === RUN TestShaper shaper_test.go:43: prio: 4294901760 shaper_test.go:43: prio: 4294934528 shaper_test.go:43: prio: 10 shaper_test.go:43: prio: 1024 shaper_test.go:43: prio: 2048 --- PASS: TestShaper (0.00s) PASS ok _/Users/jason/code/smux 1.744s
您的意思是只要保证溢出数据附近的数据顺序正确就可以是吧。 感谢您的回复,我是个业余程序员,读代码也是技能,恕我愚钝,耽误您时间。 之前看第一版的学到了很多知识,比如分包组包、流复用,也用在了自己写的工具上,这个版本总算也快搞明白了。 感谢无私共享和指教!
有两个问题请您解答一下:
- stream.Write 是阻塞的,func (s *Session) writeFrameInternal()也是阻塞的.就是说某一个stream 在这个 shaperHeap []writeRequest 队列里,最多只有一个cmdPSH数据,(可以同时有cmdNOP、cmdUPD)我理解的没错吧?
- 这个结构体里writeRequest.result 为什么要用带缓存的channel呢?
// internal writeFrame version to support deadline used in keepalive
func (s *Session) writeFrameInternal(f Frame, deadline <-chan time.Time, prio uint32) (int, error) {
req := writeRequest{
prio: prio,
frame: f,
result: make(chan writeResult, 1), //为什么要用带缓存的channel呢?这个地方直接make(chan writeResult)会影响速度吗?
}
有两个问题请您解答一下:
- stream.Write 是阻塞的,func (s *Session) writeFrameInternal()也是阻塞的.就是说某一个stream 在这个 shaperHeap []writeRequest 队列里,最多只有一个cmdPSH数据,(可以同时有cmdNOP、cmdUPD)我理解的没错吧? 队列里可以有多个,阻塞是函数不保证立即返回。
- 这个结构体里writeRequest.result 为什么要用带缓存的channel呢?
// internal writeFrame version to support deadline used in keepalive func (s *Session) writeFrameInternal(f Frame, deadline <-chan time.Time, prio uint32) (int, error) { req := writeRequest{ prio: prio, frame: f, result: make(chan writeResult, 1), //为什么要用带缓存的channel呢?这个地方直接make(chan writeResult)会影响速度吗? }
结果就一个,避免阻塞啊。
即使把所有的cmdPSH 数据的writeResult 的prio 设置为1,也能保证数据发送顺序
stream.Write 是阻塞的,Session.writeFrameInternal()也是阻塞的.就是说某一个stream 在这个 shaperHeap []writeRequest 队列里,最多只有一个cmdPSH数据,(可以同时有cmdNOP、cmdUPD)我理解的没错吧? 队列里可以有多个,阻塞是函数不保证立即返回。
队列里可以有多个,阻塞是函数不保证立即返回。
我做了个测试,某一个stream 里(sid相同)的数据,没有并发调用 Write(不应该并发调用吧?) 函数的情况下,实际上同一时刻只有一个cmdPSH数据 在shaperHeap队列里。即使把所有的cmdPSH 数据的writeResult 的prio 设置为1,也能保证数据发送顺序,不会出错。
您说的队列里可以有多个
- 在多个stream里队列里肯定有多个
- 同一个stream里,可以有多个的情况是,还有 cmdNOP、cmdUPD等数据。
把shaper.go中比较函数改成随机的,删除shaper_test.go,也能测试通过。
//shaper.go
package smux
import "math/rand"
func _itimediff(later, earlier uint32) int32 {
x := rand.Intn(2)
return int32(x)
// return (int32)(later - earlier)
}
··· jason@MacBook-Air ~/g/s/g/x/smux (master)> go test -v === RUN TestAllocGet --- PASS: TestAllocGet (0.00s) === RUN TestAllocPut --- PASS: TestAllocPut (0.00s) === RUN TestAllocPutThenGet --- PASS: TestAllocPutThenGet (0.00s) === RUN TestConfig mux_test.go:23: keep-alive interval must be positive mux_test.go:32: keep-alive timeout must be larger than keep-alive interval mux_test.go:40: max frame size must be positive mux_test.go:48: max frame size must not be larger than 65535 mux_test.go:56: max receive buffer must be positive mux_test.go:64: max stream buffer must be positive mux_test.go:73: max stream buffer must not be larger than max receive buffer --- PASS: TestConfig (0.00s) === RUN TestEcho --- PASS: TestEcho (0.01s) === RUN TestWriteTo --- PASS: TestWriteTo (0.03s) === RUN TestWriteToV2 --- PASS: TestWriteToV2 (0.02s) === RUN TestGetDieCh --- PASS: TestGetDieCh (0.00s) === RUN TestSpeed session_test.go:315: 127.0.0.1:56096 127.0.0.1:56095 session_test.go:336: time for 16MB rtt 59.936929ms --- PASS: TestSpeed (0.06s) === RUN TestParallel session_test.go:374: created 501 streams --- PASS: TestParallel (1.49s) === RUN TestParallelV2 session_test.go:408: created 605 streams --- PASS: TestParallelV2 (1.48s) === RUN TestCloseThenOpen --- PASS: TestCloseThenOpen (0.00s) === RUN TestSessionDoubleClose --- PASS: TestSessionDoubleClose (0.00s) === RUN TestStreamDoubleClose --- PASS: TestStreamDoubleClose (0.00s) === RUN TestConcurrentClose --- PASS: TestConcurrentClose (0.00s) === RUN TestTinyReadBuffer --- PASS: TestTinyReadBuffer (0.01s) === RUN TestIsClose --- PASS: TestIsClose (0.00s) === RUN TestKeepAliveTimeout --- PASS: TestKeepAliveTimeout (3.00s) === RUN TestKeepAliveBlockWriteTimeout --- PASS: TestKeepAliveBlockWriteTimeout (3.00s) === RUN TestServerEcho --- PASS: TestServerEcho (0.02s) === RUN TestSendWithoutRecv --- PASS: TestSendWithoutRecv (0.00s) === RUN TestWriteAfterClose --- PASS: TestWriteAfterClose (0.00s) === RUN TestReadStreamAfterSessionClose session_test.go:703: EOF --- PASS: TestReadStreamAfterSessionClose (0.00s) === RUN TestWriteStreamAfterConnectionClose --- PASS: TestWriteStreamAfterConnectionClose (0.00s) === RUN TestNumStreamAfterClose --- PASS: TestNumStreamAfterClose (0.00s) === RUN TestRandomFrame --- PASS: TestRandomFrame (0.01s) === RUN TestWriteFrameInternal --- PASS: TestWriteFrameInternal (1.01s) === RUN TestReadDeadline --- PASS: TestReadDeadline (0.00s) === RUN TestWriteDeadline --- PASS: TestWriteDeadline (0.00s) PASS ok github.com/xtaci/smux 13.257s ···
都是正数,当然能过哈
无符号数转换为带符号数,比如uint32 1<<32 - 1 ,cast成 int32,就会负了。
都是正数,当然能过哈
//shaper.go
package smux
import "math/rand"
func _itimediff(later, earlier uint32) int32 {
x := rand.Intn(2)
return int32(x) // 这里返回0 或者1
// return (int32)(later - earlier)
}
//shaper.go 这里比较的是大于0
func (h shaperHeap) Less(i, j int) bool { return _itimediff(h[j].prio, h[i].prio) > 0 }
因为 你的测试函数,TestWriteToV2,这里有比较发送和接受的数据是否一致,能测试通过说明数据顺序是正确的。原因就是某一个stream 在这个 shaperHeap []writeRequest 队列里,最多只有一个cmdPSH数据,
func TestWriteToV2(t *testing.T) {
......
if bytes.Compare(sndbuf, rcvbuf.Bytes()) != 0 {
t.Fatal("mismatched echo bytes")
}
}
我的意思是说把s.numWritten 作为 writeFrameInternal函数的prio参数,并没有发挥作用啊。
func (s *Stream) writeV2(b []byte) (n int, err error) {
...
for len(bts) > 0 {
sz := len(bts)
if sz > s.frameSize {
sz = s.frameSize
}
frame.data = bts[:sz]
bts = bts[sz:]
n, err := s.sess.writeFrameInternal(frame, deadline, uint64(atomic.LoadUint32(&s.numWritten)))
atomic.AddUint32(&s.numWritten, uint32(sz))
sent += n
if err != nil {
return sent, err
}
}
}