sig-kubernetes icon indicating copy to clipboard operation
sig-kubernetes copied to clipboard

client-go源码issue

Open JaneLiuL opened this issue 4 years ago • 2 comments

例子理解

我们从一段最简单的代码入手,这段代码块主要是监听了namespace,对于有新的namespace添加,就打印namespace的名字,我们看看K8S源码,看看这个黑盒子实际在在K8S发生了什么动作?

func main() {
	// in cluster get config
	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err.Error())
	}

	// cilentset
	clientset, err := kubernetes.NewForConfig(config)
...

	// listen namespace informer for AddFunc
	stopCh := make(chan struct{})
	defer close(stopCh)

	shareInformers := informers.NewSharedInformerFactory(clientset, time.Second)
	informer := shareInformers.Core().V1().Namespaces().Informer()

	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func (obj interface{})  {
			nObj := obj.(v1.Object)
			log.Printf("New namespaces add %s", nObj.GetName())
		},
	})
	informer.Run(stopCh)
	
}

NewSharedInformerFactory

我们先看看这段代码首先使用了rest去获取kubeconfig, 然后初始化clientset对象, 接下来我们使用NewSharedInformerFactory工厂来获得shareInformers, 然后再使用调用Informer来创建真正的Informer

我们可以先来看看namespace资源实现了Informer机制的代码块, 实现了Informer和Lister方法(K8S内部的每一个资源都实现了Informer机制)

代码块staging/src/k8s.io/client-go/informers/core/v1/namespace.go

type NamespaceInformer interface {
	Informer() cache.SharedIndexInformer
	Lister() v1.NamespaceLister
}

我们看看NewSharedInformerFactory 工厂, NewFilteredSharedInformerFactory 工厂来获得factory 对象,这个对象中的 informers是一个map 类型的数据结构, key 为资源类型,而 value 便是关注该资源类型的 Informer。

QA

其实我看不出来同一类资源Informer是共享一个Reflector的...

代码块staging/src/k8s.io/client-go/informers/factory.go

func NewFilteredSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory {
	return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions))
}

// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
	factory := &sharedInformerFactory{
		client:           client,
		namespace:        v1.NamespaceAll,
		defaultResync:    defaultResync,
		informers:        make(map[reflect.Type]cache.SharedIndexInformer),
		startedInformers: make(map[reflect.Type]bool),
		customResync:     make(map[reflect.Type]time.Duration),
	}

	// Apply all options
	for _, opt := range options {
		factory = opt(factory)
	}

	return factory
}

AddEventHandler

继续,我们的代码现在走到了AddEventHandler, 可以看到这个接口实现了三个方法,我们的代码块只使用了其中的OnAddd去接受添加事件的触发

代码块staging/src/k8s.io/client-go/tools/cache/controller.go

type ResourceEventHandler interface {
	OnAdd(obj interface{})
	OnUpdate(oldObj, newObj interface{})
	OnDelete(obj interface{})
}

Informer.Run

最后我们通过informer.Run(stopCh)来执行,这段代码比较重要

代码块staging/src/k8s.io/client-go/tools/cache/shared_informer.go

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

    // 首先初始化了一个fifo的队列,我们可以看看下面DeltaFIFO的说明
	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

    // 我们把我们的deltafifo的队列以及listerWatcher等去初始化一个Controller所需要的配置:cfg对象,可以看下方的controller章节
	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process: s.HandleDeltas,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

        // 创建了一个controller 实例
		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

    // 启动cacheMutationDetector
    // 启动了processor	
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
    
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	s.controller.Run(stopCh)
}

DeltaFIFO

拆开理解,FIFO是一个先进先出的队列,然后Delta是一个资源对象的存储,可以保存资源对象的操作类型。

我们来看看DeltaFIFO的数据结构:

queue字段存储资源对象的key, 该key可以通过下面的KeyOf方法来得到,而items字段通过map数据结构的方式存储

type DeltaFIFO struct {
	lock sync.RWMutex
	cond sync.Cond
	items map[string]Deltas    
	queue []string
	populated bool
	initialPopulationCount int
	keyFunc KeyFunc	
	knownObjects KeyListerGetter
	closed     bool
	closedLock sync.Mutex
}

func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
	if d, ok := obj.(Deltas); ok {
		if len(d) == 0 {
			return "", KeyError{obj, ErrZeroLengthDeltasObject}
		}
		obj = d.Newest().Object
	}
	if d, ok := obj.(DeletedFinalStateUnknown); ok {
		return d.Key, nil
	}
	return f.keyFunc(obj)
}

从代码块staging/src/k8s.io/client-go/tools/cache/delta_fifo.go可以看出这个DeltaFIFO具有队列操作的级别方法,例如Added(添加)/Deleted等操作如下面代码块所示,理解一下,就是提供了生产者/消费者模式,,可以看看下面生产者消费者的小节。

QA

生产者是Reflector调用的Add方法,消费者是Controller调用的Pop方法 这里我没看出来这个顺序???咋看出来的

func (f *DeltaFIFO) Add(obj interface{}) error {
...
}

// Update is just like Add, but makes an Updated Delta.
func (f *DeltaFIFO) Update(obj interface{}) error {
...
}

func (f *DeltaFIFO) Delete(obj interface{}) error {	
...
}
DeltaFIFO生产者消费者方法
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
        // 队列是空得时候,就阻塞
		for len(f.queue) == 0 {
...
			f.cond.Wait()
		}
        // 从头部取第一个数据
		id := f.queue[0]
		f.queue = f.queue[1:]
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		item, ok := f.items[id]
		if !ok {
			// Item may have been deleted subsequently.
			continue
		}
		delete(f.items, id)
        // process 提供了 EventHandler 注册和事件分发的功能,由上层消费者进行处理
		err := process(item)
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		return item, err
	}
}

Controller

上方informer.Run的时候,初始化了Controller所需要配置Config的对象,我们看看Controller配置的

代码块staging/src/k8s.io/client-go/tools/cache/controller.go

type Config struct {
	// The queue for your objects - has to be a DeltaFIFO due to
	// assumptions in the implementation. Your Process() function
	// should accept the output of this Queue's Pop() method.
	Queue
	// Something that can list and watch your objects.
	ListerWatcher
	Process ProcessFunc
	ObjectType runtime.Object
	FullResyncPeriod time.Duration
	ShouldResync ShouldResyncFunc
	RetryOnError bool
}

QA

我从最简单的代码块,仍然是没办法像下图1 里面的顺序这样走下去,我的顺序更多是API Server-->ClientSet-->ShareInformerFactory-->AddEvent...

image

第一周的笔记比较惨,真的我没法按书里的顺序看下去。。按书的顺序我没法形成一个flow chart

JaneLiuL avatar Aug 05 '20 07:08 JaneLiuL

回答你第一个问题 shareInformers.Core().V1().Namespaces().Informer() 调用的方法如下

//创建informer
func (f *podInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

	informerType := reflect.TypeOf(obj)
	informer, exists := f.informers[informerType]
       //如果已经存在, 就不会再创建新的
	if exists {
		return informer
	}

	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		resyncPeriod = f.defaultResync
	}

	informer = newFunc(f.client, resyncPeriod)
	f.informers[informerType] = informer

	return informer
}

xyz-li avatar Aug 05 '20 07:08 xyz-li

https://blog.ihypo.net/15763910382218.html 可能会对你的理解有所帮助

NewSharedInformerFactoryWithOptions中会返回一个SharedInformerFactory其中有一个informers 是map[reflect.Type]cache.SharedIndexInformer 那么对应类型的informer(这段代码在staging/src/k8s.io/client-go/informers/factory.go),

//对应类型的对应sharedIndexInformer 
type sharedIndexInformer struct {
	indexer    Indexer
	controller Controller //这里的controller 值得注意
/*
type Controller interface {
	Run(stopCh <-chan struct{})
	HasSynced() bool
	LastSyncResourceVersion() string
}
*/
	processor             *sharedProcessor
	cacheMutationDetector MutationDetector

	// This block is tracked to handle late initialization of the controller
	listerWatcher ListerWatcher
	objectType    runtime.Object

	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
	// shouldResync to check if any of our listeners need a resync.
	resyncCheckPeriod time.Duration
	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
	// value).
	defaultEventHandlerResyncPeriod time.Duration
	// clock allows for testability
	clock clock.Clock

	started, stopped bool
	startedLock      sync.Mutex

	// blockDeltas gives a way to stop all event distribution so that a late event handler
	// can safely join the shared informer.
	blockDeltas sync.Mutex
}
type controller struct {
	config         Config
	reflector      *Reflector
	reflectorMutex sync.RWMutex
	clock          clock.Clock
}

//controller的run实际是调用了reflector 的run方法 所以我理解在informer启动的时候会调用controller的run,那么controller的run会调用reflector的run方法,具体informer的run可以看最后面
func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.clock = c.clock

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group
	defer wg.Wait()

	wg.StartWithChannel(stopCh, r.Run)

	wait.Until(c.processLoop, time.Second, stopCh)
}
// Start initializes all requested informers.
//此段代码在client-go/informers/factory.go
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
   f.lock.Lock()
   defer f.lock.Unlock()

   for informerType, informer := range f.informers {
      //如果informer是 false  也就是未启动状态 那么!false=true
      if !f.startedInformers[informerType] {
         //启动这个informer
         go informer.Run(stopCh)
         //启动之后做标记
         f.startedInformers[informerType] = true
      }
   }
}


type SharedInformer interface {
	...

	Run(stopCh <-chan struct{})
  ...
}


func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
... 
  
	s.controller.Run(stopCh)
  //所以这里也是调用了controller的run ,所以我理解一个type 对应一个informer  这里共享 reflector 
}

队列这块的话 书中也提到了限速队列以及延迟对了和正常的队列

这块我找到了client-go/util/workqueue/ 这下面会有队列的一些代码

strive-after avatar Aug 05 '20 07:08 strive-after