sig-kubernetes
sig-kubernetes copied to clipboard
client-go源码issue
例子理解
我们从一段最简单的代码入手,这段代码块主要是监听了namespace,对于有新的namespace添加,就打印namespace的名字,我们看看K8S源码,看看这个黑盒子实际在在K8S发生了什么动作?
func main() {
// in cluster get config
config, err := rest.InClusterConfig()
if err != nil {
panic(err.Error())
}
// cilentset
clientset, err := kubernetes.NewForConfig(config)
...
// listen namespace informer for AddFunc
stopCh := make(chan struct{})
defer close(stopCh)
shareInformers := informers.NewSharedInformerFactory(clientset, time.Second)
informer := shareInformers.Core().V1().Namespaces().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func (obj interface{}) {
nObj := obj.(v1.Object)
log.Printf("New namespaces add %s", nObj.GetName())
},
})
informer.Run(stopCh)
}
NewSharedInformerFactory
我们先看看这段代码首先使用了rest去获取kubeconfig, 然后初始化clientset对象, 接下来我们使用NewSharedInformerFactory
工厂来获得shareInformers, 然后再使用调用Informer
来创建真正的Informer
我们可以先来看看namespace资源实现了Informer机制的代码块, 实现了Informer和Lister方法(K8S内部的每一个资源都实现了Informer机制)
代码块staging/src/k8s.io/client-go/informers/core/v1/namespace.go
type NamespaceInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.NamespaceLister
}
我们看看NewSharedInformerFactory
工厂, NewFilteredSharedInformerFactory 工厂来获得factory 对象,这个对象中的 informers是一个map 类型的数据结构, key 为资源类型,而 value 便是关注该资源类型的 Informer。
QA
其实我看不出来同一类资源Informer是共享一个Reflector的...
代码块staging/src/k8s.io/client-go/informers/factory.go
func NewFilteredSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory {
return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions))
}
// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
factory := &sharedInformerFactory{
client: client,
namespace: v1.NamespaceAll,
defaultResync: defaultResync,
informers: make(map[reflect.Type]cache.SharedIndexInformer),
startedInformers: make(map[reflect.Type]bool),
customResync: make(map[reflect.Type]time.Duration),
}
// Apply all options
for _, opt := range options {
factory = opt(factory)
}
return factory
}
AddEventHandler
继续,我们的代码现在走到了AddEventHandler
, 可以看到这个接口实现了三个方法,我们的代码块只使用了其中的OnAddd去接受添加事件的触发
代码块staging/src/k8s.io/client-go/tools/cache/controller.go
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
Informer.Run
最后我们通过informer.Run(stopCh)
来执行,这段代码比较重要
代码块staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// 首先初始化了一个fifo的队列,我们可以看看下面DeltaFIFO的说明
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
// 我们把我们的deltafifo的队列以及listerWatcher等去初始化一个Controller所需要的配置:cfg对象,可以看下方的controller章节
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
// 创建了一个controller 实例
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// 启动cacheMutationDetector
// 启动了processor
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
s.controller.Run(stopCh)
}
DeltaFIFO
拆开理解,FIFO是一个先进先出的队列,然后Delta是一个资源对象的存储,可以保存资源对象的操作类型。
我们来看看DeltaFIFO的数据结构:
queue字段存储资源对象的key, 该key可以通过下面的KeyOf
方法来得到,而items字段通过map数据结构的方式存储
type DeltaFIFO struct {
lock sync.RWMutex
cond sync.Cond
items map[string]Deltas
queue []string
populated bool
initialPopulationCount int
keyFunc KeyFunc
knownObjects KeyListerGetter
closed bool
closedLock sync.Mutex
}
func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
if d, ok := obj.(Deltas); ok {
if len(d) == 0 {
return "", KeyError{obj, ErrZeroLengthDeltasObject}
}
obj = d.Newest().Object
}
if d, ok := obj.(DeletedFinalStateUnknown); ok {
return d.Key, nil
}
return f.keyFunc(obj)
}
从代码块staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
可以看出这个DeltaFIFO具有队列操作的级别方法,例如Added(添加)/Deleted等操作如下面代码块所示,理解一下,就是提供了生产者/消费者模式,,可以看看下面生产者消费者的小节。
QA
生产者是Reflector调用的Add方法,消费者是Controller调用的Pop方法 这里我没看出来这个顺序???咋看出来的
func (f *DeltaFIFO) Add(obj interface{}) error {
...
}
// Update is just like Add, but makes an Updated Delta.
func (f *DeltaFIFO) Update(obj interface{}) error {
...
}
func (f *DeltaFIFO) Delete(obj interface{}) error {
...
}
DeltaFIFO生产者消费者方法
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
// 队列是空得时候,就阻塞
for len(f.queue) == 0 {
...
f.cond.Wait()
}
// 从头部取第一个数据
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
// Item may have been deleted subsequently.
continue
}
delete(f.items, id)
// process 提供了 EventHandler 注册和事件分发的功能,由上层消费者进行处理
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
return item, err
}
}
Controller
上方informer.Run的时候,初始化了Controller所需要配置Config的对象,我们看看Controller配置的
代码块staging/src/k8s.io/client-go/tools/cache/controller.go
type Config struct {
// The queue for your objects - has to be a DeltaFIFO due to
// assumptions in the implementation. Your Process() function
// should accept the output of this Queue's Pop() method.
Queue
// Something that can list and watch your objects.
ListerWatcher
Process ProcessFunc
ObjectType runtime.Object
FullResyncPeriod time.Duration
ShouldResync ShouldResyncFunc
RetryOnError bool
}
QA
我从最简单的代码块,仍然是没办法像下图1 里面的顺序这样走下去,我的顺序更多是API Server-->ClientSet-->ShareInformerFactory-->AddEvent...
第一周的笔记比较惨,真的我没法按书里的顺序看下去。。按书的顺序我没法形成一个flow chart
回答你第一个问题 shareInformers.Core().V1().Namespaces().Informer() 调用的方法如下
//创建informer
func (f *podInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}
// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerType := reflect.TypeOf(obj)
informer, exists := f.informers[informerType]
//如果已经存在, 就不会再创建新的
if exists {
return informer
}
resyncPeriod, exists := f.customResync[informerType]
if !exists {
resyncPeriod = f.defaultResync
}
informer = newFunc(f.client, resyncPeriod)
f.informers[informerType] = informer
return informer
}
https://blog.ihypo.net/15763910382218.html 可能会对你的理解有所帮助
NewSharedInformerFactoryWithOptions中会返回一个SharedInformerFactory其中有一个informers 是map[reflect.Type]cache.SharedIndexInformer 那么对应类型的informer(这段代码在staging/src/k8s.io/client-go/informers/factory.go),
//对应类型的对应sharedIndexInformer
type sharedIndexInformer struct {
indexer Indexer
controller Controller //这里的controller 值得注意
/*
type Controller interface {
Run(stopCh <-chan struct{})
HasSynced() bool
LastSyncResourceVersion() string
}
*/
processor *sharedProcessor
cacheMutationDetector MutationDetector
// This block is tracked to handle late initialization of the controller
listerWatcher ListerWatcher
objectType runtime.Object
// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
// shouldResync to check if any of our listeners need a resync.
resyncCheckPeriod time.Duration
// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
// value).
defaultEventHandlerResyncPeriod time.Duration
// clock allows for testability
clock clock.Clock
started, stopped bool
startedLock sync.Mutex
// blockDeltas gives a way to stop all event distribution so that a late event handler
// can safely join the shared informer.
blockDeltas sync.Mutex
}
type controller struct {
config Config
reflector *Reflector
reflectorMutex sync.RWMutex
clock clock.Clock
}
//controller的run实际是调用了reflector 的run方法 所以我理解在informer启动的时候会调用controller的run,那么controller的run会调用reflector的run方法,具体informer的run可以看最后面
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
defer wg.Wait()
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
}
// Start initializes all requested informers.
//此段代码在client-go/informers/factory.go
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
for informerType, informer := range f.informers {
//如果informer是 false 也就是未启动状态 那么!false=true
if !f.startedInformers[informerType] {
//启动这个informer
go informer.Run(stopCh)
//启动之后做标记
f.startedInformers[informerType] = true
}
}
}
type SharedInformer interface {
...
Run(stopCh <-chan struct{})
...
}
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
...
s.controller.Run(stopCh)
//所以这里也是调用了controller的run ,所以我理解一个type 对应一个informer 这里共享 reflector
}
队列这块的话 书中也提到了限速队列以及延迟对了和正常的队列
这块我找到了client-go/util/workqueue/ 这下面会有队列的一些代码