6174.github.io
6174.github.io copied to clipboard
syncthing 源码阅读:events pubsub 模型
syncthing/lib/events/events.go
syncthing 事件管理
入口函数部分看到了 apiSub 和 diskSub
apiSub := events.NewBufferedSubscription(events.Default.Subscribe(events.AllEvents&^events.LocalChangeDetected), 1000)
diskSub := events.NewBufferedSubscription(events.Default.Subscribe(events.LocalChangeDetected), 1000)
syncthing 的 pubsub 模块是通过 events 包来管理的,下面咱来完整的看看 events 的设计
events 包结构
- EventType
- Event
- Logger
- Default -> Logger 实例
- Subscription
- BufferedSubscription
EventType
定义 event 的类型常量和对应的字符串
type EventType int
const (
Ping EventType = 1 << iota
Starting
....
AllEvents = (1 << iota) - 1
)
这里通过 iota 表达式技巧来定义常量,对应
Ping 00000000
Starting 00000010
...
AllEvents 0111...11
常量采用类位运算的方式
1 << iota 表示向左移动 iota 位
mask & eventType != 0 来判断 mask 时候包好对应的事件类型
mask = Starting : Starting 事件 mask = Starting & Ping : Staring 和 Ping 事件 mask = AllEvents&^Starting : Starting 除外的其他事件 (&^ 表示后者先取反再执行 & 运算)
EventType String 方法返回对应常量的字符串 slice
Event
每次时间对应一个 Event 实例 ,结构
type Event struct {
// Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API
// 一个订阅者创建后,后面所有订阅者订阅的事件都会传递给订阅者-对应事件序列,SubscriptionID 是这个序列中事件对应的 ID
SubscriptionID int `json:"id"`
// 事件全局递增 ID
GlobalID int `json:"globalID"`
// 事件创建时间
Time time.Time `json:"time"`
// 事件类型
Type EventType `json:"type"`
// 时间附带数据 (interface{} 为万能类型)
Data interface{} `json:"data"`
}
Logger
Logger 管理了事件的创建和订阅,结构:
type Logger struct {
// 订阅者数组
subs []*Subscription
// 用于产生下次事件的 e.SubscriptionId ,对应每一个 sub 一个 int 值
nextSubscriptionIDs []int
// 事件的全局 ID 管理
nextGlobalID int
// Logger 可能会被多个 goroutine 调用,所以必须要设置互斥
mutex sync.Mutex
}
// Log 创建事件
func (l *Logger) Log(t EventType, data interface{}){}
// Subscribe 方法创建对应事件的订阅者
func (l *Logger) Subscribe(mask EventType) *Subscription {}
// 取消事件的订阅
func (l *Logger) Unsubscribe(s *Subscription) {}
Default
对外暴露的默认 Logger ,调用 Default.Log 创建事件,Default.Subscribe 创建订阅者
Subscription
订阅者
const BufferSize = 64
type Subscription struct {
// 订阅的事件 mask
mask EventType
// 事件 channel , 对应一个有缓冲区的通道,缓冲区大小为 BufferSize
events chan Event
// 接收事件设置一个 timeout ,在 timeout 之内不会重复接收
timeout *time.Timer
}
// Poll returns an event from the subscription or an error if the poll times
// out of the event channel is closed. Poll should not be called concurrently
// from multiple goroutines for a single subscription.
// Poll 方法将本 goroutine 停住,等待一个事件产生,或者超时事件产生
func (s *Subscription) Poll(timeout time.Duration) (Event, error) {}
// 返回事件通道
func (s *Subscription) C() <-chan Event {}
BufferedSubscription
相当于一个订阅者的事件缓冲区(感觉名字应该叫 SubscriptionEventBuffer)
type bufferedSubscription struct {
// 订阅者
sub *Subscription
// 事件缓冲数组
buf []Event
// 缓冲区是有大小限制的,next 每次获得一个事件过后要放在缓冲区的什么位置
next int
// 当前的 event 对应的 SubscriptionID
cur int // Current SubscriptionID
// 多个 goroutine 场景要使用互斥和信号量来
mut sync.Mutex
cond *stdsync.Cond
}
// 创建缓冲区并启动一个 goroutine 运行 pollingLoop
func NewBufferedSubscription(s *Subscription, size int) BufferedSubscription {
bs := &bufferedSubscription{
sub: s,
buf: make([]Event, size),
mut: sync.NewMutex(),
}
bs.cond = stdsync.NewCond(bs.mut)
go bs.pollingLoop()
return bs
}
// pollingLoop 方法会启动一个循环,不断的调用 sub.Poll 方法获取事件
func (s *bufferedSubscription) pollingLoop() {}
// Since 方法会获取 id 后缓冲区内发生的所有事件
func (s *bufferedSubscription) Since(id int, into []Event) []Event {
s.mut.Lock()
defer s.mut.Unlock()
// 无线循环直到等到有发生在 id 后的事件产生
// Q: 为什么没有非要一直等呢,万一一直没有事件发生不是卡死了么
for id >= s.cur {
s.cond.Wait()
}
for i := s.next; i < len(s.buf); i++ {
if s.buf[i].SubscriptionID > id {
into = append(into, s.buf[i])
}
}
for i := 0; i < s.next; i++ {
if s.buf[i].SubscriptionID > id {
into = append(into, s.buf[i])
}
}
return into
}
完整流程
events 模块中最重要的就是 Default 和 BufferedSubscription 了,通过 go 插件可以看到调用方:

- sub = events.Default.Subscribe 创建订阅者
- bsub = events.NewBufferedSubscription 创建一个事件订阅缓冲区
- events.Default.Log 产生事件
- bsub.Since 方法获取缓冲区内的事件列表
请问这是用的什么go插件?
@amozz https://github.com/go-lang-plugin-org/go-lang-idea-plugin