Kubernetes scheduler 源码阅读
Kubernetes scheduler 源码阅读
本文基于 Kubernetes 1.20 版本。
Kubernetes 调度器负责将 Pod 调度到集群内的节点上,它监听 API Server,查询还未分配 Node 的 Pod,然后根据调度策略为这些 Pod 分配节点(更新 Pod 的 NodeName 字段)。
0. 调度框架简介
调度框架是 Kubernetes Scheduler 的一种可插入架构,可以简化调度器的自定义。 它向现有的调度器增加了一组新的“插件” API。插件被编译到调度器程序中。 这些 API 允许大多数调度功能以插件的形式实现,同时使调度“核心”保持简单且可维护。
调度框架定义了一些扩展点。调度器插件注册后在一个或多个扩展点处被调用。 这些插件中的一些可以改变调度决策,而另一些仅用于提供信息。
每次调度一个 Pod 的尝试都分为两个阶段,即 调度周期 和 绑定周期。
- 调度周期为 Pod 选择一个节点,绑定周期将该决策应用于集群。 调度周期和绑定周期一起被称为“调度上下文”。
- 调度周期是串行运行的,而绑定周期可以并发运行。
- 如果确定 Pod 不可调度或者存在内部错误,则可以终止调度周期或绑定周期。 Pod 将返回队列并重试。

以上简介内容来自官方文档,下面开始看源码。
Framework是一个接口,要实现此接口需要实现上面提到的各扩展点的方法:
framework 结构实现了 Framework 接口:
type framework struct {
registry Registry
snapshotSharedLister schedulerlisters.SharedLister
waitingPods *waitingPodsMap
pluginNameToWeightMap map[string]int
queueSortPlugins []QueueSortPlugin
preFilterPlugins []PreFilterPlugin
filterPlugins []FilterPlugin
preScorePlugins []PreScorePlugin
scorePlugins []ScorePlugin
reservePlugins []ReservePlugin
preBindPlugins []PreBindPlugin
bindPlugins []BindPlugin
postBindPlugins []PostBindPlugin
unreservePlugins []UnreservePlugin
permitPlugins []PermitPlugin
clientSet clientset.Interface
informerFactory informers.SharedInformerFactory
volumeBinder *volumebinder.VolumeBinder
metricsRecorder *metricsRecorder
// Indicates that RunFilterPlugins should accumulate all failed statuses and not return
// after the first failure.
runAllFilters bool
}
1. 启动 kube-scheduler 进程
首先找到 scheduler 的入口:cmd/kube-scheduler/scheduler.go 中的 main 函数,代码编译后,可通过终端命令启动程序。
command := app.NewSchedulerCommand()
// ...
if err := command.Execute(); err != nil {
os.Exit(1)
}
进入下一个函数:cmd/kube-scheduler/app/server.go 中的 Run 函数:
Run 函数做的是基于给定的配置信息和 registryOptions 运行 scheduler,只有在出错或终端命令终止时退出:
// 因为 runCommand() 中设置了 context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
简单看一下 Scheduler 里面几个关键字段,先不必深入细节:
// 监控未调度的 Pod, 找到合适的节点, 并将绑定信息写回 API Server.
type Scheduler struct {
// 通过 SchedulerCache 所做的变更会被 NodeLister 和 Algorithm 观察到.
SchedulerCache internalcache.Cache
Algorithm core.ScheduleAlgorithm
podConditionUpdater podConditionUpdater
// 用来驱逐 Pods 和更新抢占者 Pod 的 NominatedNode 字段.
podPreemptor podPreemptor
// 函数实现应该是阻塞的, 直到有可用的 Pod 才返回.
// 此函数不使用 channel, 因为调度 Pod 会花一些时间, Pod 放在 channel 中可能会时间过长而 stale.
NextPod func() *framework.PodInfo
Error func(*framework.PodInfo, error)
// Close this to shut down the scheduler.
StopEverything <-chan struct{}
// 处理 Pod PVC/PV 的绑定
VolumeBinder *volumebinder.VolumeBinder
// 是否禁用 Pod 抢占.
DisablePreemption bool
// 待调度 Pod 的队列, 后面会详细介绍
SchedulingQueue internalqueue.SchedulingQueue
// Profiles are the scheduling profiles.
Profiles profile.Map
scheduledPodsHasSynced func() bool
}
回到主线,Run 函数:
// 初始化调度框架的插件注册表
// ...
// 创建 Scheduler 结构
sched, err := scheduler.New(cc.Client,
cc.InformerFactory,
cc.PodInformer,
recorderFactory,
ctx.Done(),
scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithBindTimeoutSeconds(cc.ComponentConfig.BindTimeoutSeconds),
scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
)
// 准备事件广播器
// 设置 healthz 检查并启动该服务
// 另起一个 goroutine 运行 informers
// ...
// 运行 scheduler
sched.Run(ctx)
sched.Run 方法开始进行 Pod 调度:
// 执行监听和调度, 等待缓存同步完毕后, 阻塞的执行调度,直到 context cancel。
func (sched *Scheduler) Run(ctx context.Context) {
if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
return
}
// Pod 队列操作, 里面起了两个 goroutine
sched.SchedulingQueue.Run()
// sched.scheduleOne 实现 Pod 的调度逻辑
wait.UntilWithContext(ctx, sched.scheduleOne, 0)
sched.SchedulingQueue.Close()
}
下面分开看 Pod 队列的操作和 Pod 调度。
2. 优先级队列 PriorityQueue
sched.SchedulingQueue.Run() 方法将 Pod 在不同的队列中进行移动。SchedulingQueue 是一个 interface,实现此接口的队列用来存储等待调度的 Pod。
实现此接口的结构是 PriorityQueue,它主要包含三个队列:
- activeQ:调度器从 activeQ 队列寻找 Pod 来调度,队列最前面的 Pod 的优先级最高。
- podBackoffQ:Pod 调度失败后放入此队列,里面的 Pod 按退避到期顺序 (即重试时间) 排序。在调度器从 activeQ 寻找 Pod 之前,会从 podBackoffQ 中弹出退避完成 (即到达重试时间) 的 Pod 进行重新调度。
- unschedulableQ:是一个 map,用来存储那些已尝试并确定无法调度的 Pod。
简单看一下数据结构:
type PriorityQueue struct {
lock sync.RWMutex // 此结构是非线程安全的
// Heap 是一个 map+slice 实现的队列.
activeQ *heap.Heap
podBackoffQ *heap.Heap
unschedulableQ *UnschedulablePodsMap
// nominatedPods 是一个 map, 用来存储被提名的在节点上运行的 Pod.
nominatedPods *nominatedPodMap
// 表示调度周期的序号, 当有一个 Pod 从 activeQ 弹出时将加 1.
schedulingCycle int64
// 收到一个移动请求时, 缓存调度周期的序号.
// 当收到移动请求时, 如果尝试去调度在这个调度周期之前和之中的不可调度的 Pod, 它们将被放回到 activeQueue.
moveRequestCycle int64
}
实际执行的是 PriorityQueue 的 Run 方法,它负责将 Pod 在三个队列中进行移动:
// wait.Until 函数来保证 flushBackoffQCompleted 和 flushUnschedulableQLeftover 两个方法失败时会不断重试.
func (p *PriorityQueue) Run() {
// 每 1 秒执行一次 flushBackoffQCompleted 方法, 直到收到停止信号.
go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
// 每 30 秒执行一次 flushUnschedulableQLeftover 方法, 直到收到停止信号.
go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}
PriorityQueue.flushBackoffQCompleted 方法做的事情是把 BackoffQ 里面到达重试时间的 Pod 放回到 activeQ:
func (p *PriorityQueue) flushBackoffQCompleted() {
// 省略加解锁代码(什么情况下会发生锁竞争?)
for {
rawPodInfo := p.podBackoffQ.Peek()
// ...
// 查看队列头部的 Pod 是否到达重新调度时间 (根据尝试次数计算), 未到达则 return.
// 这里是先查看 Pod 符合 pop 的条件后才执行真正的 pop 动作.
// ...
_, err := p.podBackoffQ.Pop()
// 入 activeQ 队列
p.activeQ.Add(rawPodInfo)
// 唤醒所有等待从 activeQ 队列 pop Pod 的 goroutine (sched.scheduleOne 方法会调用 sched.NextPod() 来获取 Pod)
defer p.cond.Broadcast()
}
}
PriorityQueue.flushUnschedulableQLeftover 方法会把在 unschedulableQ 队列中存放时间超过 unschedulableQTimeInterval (60 秒) 的 Pod 移到 podBackoffQ 或 activeQ。
func (p *PriorityQueue) flushUnschedulableQLeftover() {
// 加锁
// ...
// 遍历 map 寻找到达重是时间的 Pod 放入 podsToMove
// ...
if len(podsToMove) > 0 {
// 主要逻辑, 正在等待 backoff 时间的 Pod 会放入 podBackoffQ, 否则放入 activeQ
// UnschedulableTimeout 表示一个事件, 用来统计 metrics
p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
}
}
接下来看 Pod 真正的调度逻辑。
3. Pod 调度
Pod 的调度逻辑在 Scheduler.scheduleOne 方法中实现。大体思路是先找到合适的节点,缓存必要信息,假定 Pod 已经运行在该节点上,真正的绑定操作是异步进行的。
看一下它的调度逻辑:
-
从队列中获取一个待调度的 Pod。
-
获取此 Pod 所属调度器的 Profile,包括根据给定的配置创建的 Framework:
// pkg/scheduler/profile/profile.go type Profile struct { framework.Framework Recorder events.EventRecorder } // pkg/scheduler/apis/config/types.go type KubeSchedulerProfile struct { SchedulerName string // 调度器要用到的插件 Plugins *Plugins PluginConfig []PluginConfig } -
创建一个 CycleState 结构供插件读写数据,各插件的数据可以互相读写。
// pkg/scheduler/framework/v1alpha1/cycle_state.go type CycleState struct { mx sync.RWMutex storage map[StateKey]StateData // if recordPluginMetrics is true, PluginExecutionDuration will be recorded for this cycle. recordPluginMetrics bool } -
调用
sched.Algorithm.Schedule()方法,经过调用一系列插件的过滤及打分,得到一个符合的节点。- 运行 Prefilter 插件集。
- 调用
findNodesThatFitPod()方法,经过 Filter 和 Extender 的过滤,得到符合条件的节点列表。 - 运行 Prescore 插件集,都成功后进行后续逻辑。
- 调用
prioritizeNodes()方法执行 Score 插件集,也可以运行任何的 extender。- 每个插件的分数加在一起,就是一个节点的总分。
- 返回 NodeScore 列表。
- 选择一个得分最高的节点。
-
复制一份 Pod 信息,假定该 Pod 已经运行在选定的节点上,即使还没有绑定它们。这样调度器可以继续调度其它 Pod,而无需等待绑定操作完成(绑定操作是异步进行的)。
-
调用
AssumePodVolumes()方法缓存 Pod 的节点选择。如果需要 PVC 绑定,则只在内存中缓存。- AssumePodVolumes 将把缓存的匹配 PV 和 PVC 提供给 podBindingCache 中的所选节点。
- 用新的预绑定 PV 更新 pvCache。
- 用新的带 annotations 集合的 PVC 更新 pvcCache。
- 用为 PV 和 PVC 缓存的 API 更新再次更新 podBindingCache 。
-
运行 Reserve 插件。
-
调用 assume 方法来把 assumedPod 缓存起来,缓存前它会设置
assumed.Spec.NodeName = scheduleResult.SuggestedHost,即所谓的 Pod 绑定信息。 -
调用
RunPermitPlugins()方法运行 Permit 插件集。Permit 插件用于防止或延迟 Pod 的绑定。- 如果返回的不是 Success 或 Wait,将不会继续执行剩下的 Permit 插件并返回错误。
- 如果有任一插件返回的是 Wait,在所有插件运行完后,会创建一个 waitingPod(已经开始) 并将其放入 waitingPods map 里(后面会用到),随后返回。
- 如果都返回 Success,则继续后面的异步绑定操作。
-
起一个 goroutine 进行绑定:
- 执行 WaitOnPermit 方法在 Permit 阶段等待 Pod。
- 绑定 Volumes。
- 它使用前面假定的绑定更新 API,并等待 PV 控制器完成绑定操作。
- 如果绑定失败,触发 un-reserve 插件来清除 Reserved Pod 的相关状态。
- 执行 prebind 插件。同上面一样,失败后会触发 un-reserve 插件来清除 Reserved Pod 的相关状态。
- 执行 bind 操作。
- 绑定的优先级:先执行 extender 再执行 framework 的 Bind 插件。其实就是把 Pod 与 Node 的绑定信息发送给 API Server 处理。
- 绑定成功后,会调用
finishBinding()方法使缓存的 Pod 过期。
- 如果绑定失败,触发 un-reserve 插件来清除 Reserved Pod 的相关状态。
- 执行 Postbind 插件。
至此,调度一个 Pod 的逻辑就梳理完毕了,不过,还有一些细节需要再梳理。