启动篇-04Kubelet.Run
Kubelet.Run
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
ctx := context.Background()
if kl.logServer == nil {
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.kubeClient == nil {
klog.InfoS("No API server defined - no node status update will be sent")
}
// 启动 cloudResourceSyncManager
if kl.cloudResourceSyncManager != nil {
go kl.cloudResourceSyncManager.Run(wait.NeverStop)
}
// 启动包括 imageManager、serverCertificateManager、oomWatcher 和 resourceAnalyzer
// imageManager: 负责镜像垃圾回收
// serverCertificateManager: 证书管理
// oomWatcher: 监控内存是否 OOM
// resourceAnalyzer: 监控资源使用情况
if err := kl.initializeModules(); err != nil {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
klog.ErrorS(err, "Failed to initialize internal modules")
os.Exit(1)
}
// 启动 volumeManager
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
if kl.kubeClient != nil {
// 启动两个 go-routine 更新状态
//
// 第一个在每个 nodeStatusUpdateFrequency 周期(默认是 10s)向 apiserver 报告,以提供周期状态
// 第二个用来在初始化期间提供更及时的状态更新,以及在节点准备就绪后向 apiserver 提供一次更新,然后退出
go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)
go kl.fastStatusUpdateOnce()
// 启动 nodeLeaseController
go kl.nodeLeaseController.Run(wait.NeverStop)
}
// 更新容器运行时状态
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
// 设置 iptables 规则
if kl.makeIPTablesUtilChains {
kl.initNetworkUtil()
}
// 启动 statusManager
kl.statusManager.Start()
// 如果设置了 RuntimeClasses ,就启动 runtimeClassManager
if kl.runtimeClassManager != nil {
kl.runtimeClassManager.Start(wait.NeverStop)
}
// 启动 PLEG(pod lifecycle event generator)
kl.pleg.Start()
// Start eventedPLEG only if EventedPLEG feature gate is enabled.
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
kl.eventedPleg.Start()
}
// 同步变化变更的循环
kl.syncLoop(ctx, updates, kl)
}Kubelet.syncLoop
// syncLoop 是 kubelet 用来处理变更的主循环
// 它监听来自三个渠道(file、apiserver 和 http)的更改,并他们合并
// 对于检测到的任何变更,将对期望状态和运行状态进行同步
// 如果未检测配置更改,则将在每个同步周期同步最后一个已知的期望状态
// 是一个 for 循环,不会退出
func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
klog.InfoS("Starting kubelet main sync loop")
// syncTicker 会唤醒 kubelet 来检查是否有 pod worker 需要同步
// 一个一秒钟的周期就足够了,因为同步间隔默认为 10 秒
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()
housekeepingTicker := time.NewTicker(housekeepingPeriod)
defer housekeepingTicker.Stop()
plegCh := kl.pleg.Watch()
const (
base = 100 * time.Millisecond
max = 5 * time.Second
factor = 2
)
duration := base
// Responsible for checking limits in resolv.conf
// The limits do not have anything to do with individual pods
// Since this is called in syncLoop, we don't need to call it anywhere else
if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
kl.dnsConfigurer.CheckLimitsForResolvConf()
}
for {
// 报错时通过指数退避计时来重试
if err := kl.runtimeState.runtimeErrors(); err != nil {
klog.ErrorS(err, "Skipping pod synchronization")
// exponential backoff
time.Sleep(duration)
duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
continue
}
// 成功则重置计时间隔
duration = base
kl.syncLoopMonitor.Store(kl.clock.Now())
if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
kl.syncLoopMonitor.Store(kl.clock.Now())
}
}Kubelet.syncLoopIteration
// syncLoopIteration 负责从各个 channel 中读取信息,并将 pod 分发到指定的 handler
// 包括:
// 1. configCh: 从这个 channel 读取配置变更事件
// 2. handler: 使用 SyncHandler 分发 pod
// 3. syncCh: 从这个 channel 读取周期同步事件
// 4. housekeepingCh: 从这个 channel 读取 housekeeping 事件
// 5. plegCh: 从这个 channel 读取 PLEG 更新情况
//
// 同时还包括 liveness manager 的更新事件
//
// 具体的工作流就是从 channel 中读取消息,处理对应事件,然后在 sync loop monitor 中更新时间戳
//
// 这里需要注意的是,如果同时有多个 channel 可以读取,select 中的 case 语句是以伪随机顺序进行评估的
// 换句话说,case 以随机顺序执行,如果有多个 channel 可以读取,不能假设 case 语句是按顺序执行的
//
// 根据这一点,实际上没有特定的顺序,不同的 channel 处理以下事情:
//
// - configCh: 将需要修改的 pod 分发到事件类型对应的 handler 中
// - plegCh: 更新 runtime 缓存,同步 pod
// - syncCh: 同步所有待同步的 pod
// - housekeepingCh: 触发 pod 的清理
// - health manager: 同步运行失败或健康检测失败的 pod
func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
// 从配置源更新;将其分发到正确的 handler
if !open {
klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
return false
}
switch u.Op {
case kubetypes.ADD:
klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
// kubelet 重启后,会将所有现有的 pod 视为新 pod 添加到 ADD 中
// 这些 pod 会通过 admission 过程,并 **有可能** 被拒绝
// 一旦我们有 checkpointing,这个问题就可以解决了
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
// DELETE 会被视为 UPDATE,因为是优雅删除
handler.HandlePodUpdates(u.Pods)
case kubetypes.SET:
klog.ErrorS(nil, "Kubelet does not support snapshot update")
default:
klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
}
// 将处理后的资源添加到 sourcesReady 中
kl.sourcesReady.AddSource(u.Source)
case e := <-plegCh:
// 过滤掉不需要同步的事件
if isSyncPodWorthy(e) {
// PLEG event for a pod; sync it.
if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
handler.HandlePodSyncs([]*v1.Pod{pod})
} else {
// If the pod no longer exists, ignore the event.
klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
}
}
// 如果容器死亡,则触发容器清理
if e.Type == pleg.ContainerDied {
if containerID, ok := e.Data.(string); ok {
kl.cleanUpContainersInPod(e.ID, containerID)
}
}
case <-syncCh:
// Sync pods waiting for sync
// 同步所有待同步的 pod
podsToSync := kl.getPodsToSync()
if len(podsToSync) == 0 {
break
}
klog.V(4).InfoS("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", klog.KObjSlice(podsToSync))
handler.HandlePodSyncs(podsToSync)
case update := <-kl.livenessManager.Updates():
// liveness 的事件,如果检测结果为失败,则触发探针同步
if update.Result == proberesults.Failure {
handleProbeSync(kl, update, handler, "liveness", "unhealthy")
}
case update := <-kl.readinessManager.Updates():
// readiness 的事件
ready := update.Result == proberesults.Success
// 设置容器的 readiness 状态
kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
// readiness 状态标志,然后触发探针同步
status := ""
if ready {
status = "ready"
}
handleProbeSync(kl, update, handler, "readiness", status)
case update := <-kl.startupManager.Updates():
// startup 的事件
started := update.Result == proberesults.Success
// 设置容器的 startup 状态
kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
// startup 状态标志,然后触发探针同步
status := "unhealthy"
if started {
status = "started"
}
handleProbeSync(kl, update, handler, "startup", status)
case <-housekeepingCh:
// 判断是否所有资源都已经准备好
if !kl.sourcesReady.AllReady() {
// 如果资源还没有准备好,或者 volume manager 还没有同步状态,则跳过 housekeeping
// 因为我们可能会意外地从未准备好的资源中删除 pod
klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
} else {
start := time.Now()
klog.V(4).InfoS("SyncLoop (housekeeping)")
// 执行 pod 清理
if err := handler.HandlePodCleanups(ctx); err != nil {
klog.ErrorS(err, "Failed cleaning pods")
}
duration := time.Since(start)
if duration > housekeepingWarningDuration {
klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than 15s", "seconds", duration.Seconds())
}
klog.V(4).InfoS("SyncLoop (housekeeping) end")
}
}
return true
}最后更新于
这有帮助吗?