启动篇-04Kubelet.Run

Kubelet.Run

pkg/kubelet/kubelet.go
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {  
   ctx := context.Background()  
   if kl.logServer == nil {  
      kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))  
   }  
   if kl.kubeClient == nil {  
      klog.InfoS("No API server defined - no node status update will be sent")  
   }  
   
   // 启动 cloudResourceSyncManager   
   if kl.cloudResourceSyncManager != nil {  
      go kl.cloudResourceSyncManager.Run(wait.NeverStop)  
   }  
  
   // 启动包括 imageManager、serverCertificateManager、oomWatcher 和 resourceAnalyzer   
   // imageManager: 负责镜像垃圾回收  
   // serverCertificateManager: 证书管理  
   // oomWatcher: 监控内存是否 OOM   
   // resourceAnalyzer: 监控资源使用情况  
   if err := kl.initializeModules(); err != nil {  
      kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())  
      klog.ErrorS(err, "Failed to initialize internal modules")  
      os.Exit(1)  
   }  
  

   // 启动 volumeManager   
   go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)  
  
   if kl.kubeClient != nil {       
      // 启动两个 go-routine 更新状态  
      //  
      // 第一个在每个 nodeStatusUpdateFrequency 周期(默认是 10s)向 apiserver 报告,以提供周期状态  
      // 第二个用来在初始化期间提供更及时的状态更新,以及在节点准备就绪后向 apiserver 提供一次更新,然后退出  
      go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)  
      go kl.fastStatusUpdateOnce()  
  
      // 启动 nodeLeaseController
      go kl.nodeLeaseController.Run(wait.NeverStop)  
   }  
   // 更新容器运行时状态  
   go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)  
  
   // 设置 iptables 规则  
   if kl.makeIPTablesUtilChains {  
      kl.initNetworkUtil()  
   }  
  
   // 启动 statusManager   
   kl.statusManager.Start()  
  
   // 如果设置了 RuntimeClasses ,就启动 runtimeClassManager   
   if kl.runtimeClassManager != nil {  
      kl.runtimeClassManager.Start(wait.NeverStop)  
   }  
  
   // 启动 PLEG(pod lifecycle event generator)  
   kl.pleg.Start()  
  
   // Start eventedPLEG only if EventedPLEG feature gate is enabled.  
   if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {  
      kl.eventedPleg.Start()  
   }  
  
   // 同步变化变更的循环  
   kl.syncLoop(ctx, updates, kl)  
}

Kubelet.syncLoop

pkg/kubelet/kubelet.go
// syncLoop 是 kubelet 用来处理变更的主循环  
// 它监听来自三个渠道(file、apiserver 和 http)的更改,并他们合并  
// 对于检测到的任何变更,将对期望状态和运行状态进行同步  
// 如果未检测配置更改,则将在每个同步周期同步最后一个已知的期望状态  
// 是一个 for 循环,不会退出  
func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {  
   klog.InfoS("Starting kubelet main sync loop")    
   // syncTicker 会唤醒 kubelet 来检查是否有 pod worker 需要同步  
   // 一个一秒钟的周期就足够了,因为同步间隔默认为 10 秒  
   syncTicker := time.NewTicker(time.Second)  
   defer syncTicker.Stop()  
   housekeepingTicker := time.NewTicker(housekeepingPeriod)  
   defer housekeepingTicker.Stop()  
   plegCh := kl.pleg.Watch()  
   const (  
      base   = 100 * time.Millisecond  
      max    = 5 * time.Second  
      factor = 2  
   )  
   duration := base  
   // Responsible for checking limits in resolv.conf  
   // The limits do not have anything to do with individual pods   
   // Since this is called in syncLoop, we don't need to call it anywhere else   
   if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {  
      kl.dnsConfigurer.CheckLimitsForResolvConf()  
   }  
  
   for {  
      // 报错时通过指数退避计时来重试  
      if err := kl.runtimeState.runtimeErrors(); err != nil {  
         klog.ErrorS(err, "Skipping pod synchronization")  
         // exponential backoff  
         time.Sleep(duration)  
         duration = time.Duration(math.Min(float64(max), factor*float64(duration)))  
         continue  
      }  
      // 成功则重置计时间隔  
      duration = base  
  
      kl.syncLoopMonitor.Store(kl.clock.Now())  
      if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {  
         break  
      }  
      kl.syncLoopMonitor.Store(kl.clock.Now())  
   }  
}

Kubelet.syncLoopIteration

pkg/kubelet/kubelet.go
// syncLoopIteration 负责从各个 channel 中读取信息,并将 pod 分发到指定的 handler
// 包括:  
//   1. configCh:       从这个 channel 读取配置变更事件  
//   2. handler:        使用 SyncHandler 分发 pod
//   3. syncCh:         从这个 channel 读取周期同步事件  
//   4. housekeepingCh: 从这个 channel 读取 housekeeping 事件  
//   5. plegCh:         从这个 channel 读取 PLEG 更新情况  
//  
// 同时还包括 liveness manager 的更新事件  
//  
// 具体的工作流就是从 channel 中读取消息,处理对应事件,然后在 sync loop monitor 中更新时间戳  
//  
// 这里需要注意的是,如果同时有多个 channel 可以读取,select 中的 case 语句是以伪随机顺序进行评估的  
// 换句话说,case 以随机顺序执行,如果有多个 channel 可以读取,不能假设 case 语句是按顺序执行的  
//  
// 根据这一点,实际上没有特定的顺序,不同的 channel 处理以下事情:  
//  
//  - configCh: 将需要修改的 pod 分发到事件类型对应的 handler 中  
//  - plegCh: 更新 runtime 缓存,同步 pod
//  - syncCh: 同步所有待同步的 pod
//  - housekeepingCh: 触发 pod 的清理  
//  - health manager: 同步运行失败或健康检测失败的 pod
func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,  
   syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {  
   select {  
   case u, open := <-configCh:  
      // 从配置源更新;将其分发到正确的 handler
      if !open {  
         klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")  
         return false  
      }  
  
      switch u.Op {  
      case kubetypes.ADD:  
         klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjSlice(u.Pods))      
         // kubelet 重启后,会将所有现有的 pod 视为新 pod 添加到 ADD 中  
         // 这些 pod 会通过 admission 过程,并 **有可能** 被拒绝  
         // 一旦我们有 checkpointing,这个问题就可以解决了  
         handler.HandlePodAdditions(u.Pods)  
      case kubetypes.UPDATE:  
         klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))  
         handler.HandlePodUpdates(u.Pods)  
      case kubetypes.REMOVE:  
         klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))  
         handler.HandlePodRemoves(u.Pods)  
      case kubetypes.RECONCILE:  
         klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))  
         handler.HandlePodReconcile(u.Pods)  
      case kubetypes.DELETE:  
         klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))  
         // DELETE 会被视为 UPDATE,因为是优雅删除  
         handler.HandlePodUpdates(u.Pods)  
      case kubetypes.SET:  
         klog.ErrorS(nil, "Kubelet does not support snapshot update")  
      default:  
         klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)  
      }  
  
      // 将处理后的资源添加到 sourcesReady 中  
      kl.sourcesReady.AddSource(u.Source)  
  
   case e := <-plegCh:  
      // 过滤掉不需要同步的事件  
      if isSyncPodWorthy(e) {  
         // PLEG event for a pod; sync it.  
         if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {  
            klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)  
            handler.HandlePodSyncs([]*v1.Pod{pod})  
         } else {  
            // If the pod no longer exists, ignore the event.  
            klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)  
         }  
      }  
  
      // 如果容器死亡,则触发容器清理  
      if e.Type == pleg.ContainerDied {  
         if containerID, ok := e.Data.(string); ok {  
            kl.cleanUpContainersInPod(e.ID, containerID)  
         }  
      }  
   case <-syncCh:  
      // Sync pods waiting for sync  
      //  同步所有待同步的 pod      
      podsToSync := kl.getPodsToSync()  
      if len(podsToSync) == 0 {  
         break  
      }  
      klog.V(4).InfoS("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", klog.KObjSlice(podsToSync))  
      handler.HandlePodSyncs(podsToSync)  
   case update := <-kl.livenessManager.Updates():  
      // liveness 的事件,如果检测结果为失败,则触发探针同步  
      if update.Result == proberesults.Failure {  
         handleProbeSync(kl, update, handler, "liveness", "unhealthy")  
      }  
   case update := <-kl.readinessManager.Updates():  
      // readiness 的事件  
      ready := update.Result == proberesults.Success  
      // 设置容器的 readiness 状态  
      kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)  
  
      // readiness 状态标志,然后触发探针同步  
      status := ""  
      if ready {  
         status = "ready"  
      }  
      handleProbeSync(kl, update, handler, "readiness", status)  
   case update := <-kl.startupManager.Updates():  
      // startup 的事件  
      started := update.Result == proberesults.Success  
      // 设置容器的 startup 状态  
      kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)  
  
      // startup 状态标志,然后触发探针同步  
      status := "unhealthy"  
      if started {  
         status = "started"  
      }  
      handleProbeSync(kl, update, handler, "startup", status)  
   case <-housekeepingCh:  
      // 判断是否所有资源都已经准备好  
      if !kl.sourcesReady.AllReady() {  
         // 如果资源还没有准备好,或者 volume manager 还没有同步状态,则跳过 housekeeping         
         // 因为我们可能会意外地从未准备好的资源中删除 pod         
         klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")  
      } else {  
         start := time.Now()  
         klog.V(4).InfoS("SyncLoop (housekeeping)")  
         // 执行 pod 清理  
         if err := handler.HandlePodCleanups(ctx); err != nil {  
            klog.ErrorS(err, "Failed cleaning pods")  
         }  
         duration := time.Since(start)  
         if duration > housekeepingWarningDuration {  
            klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than 15s", "seconds", duration.Seconds())  
         }  
         klog.V(4).InfoS("SyncLoop (housekeeping) end")  
      }  
   }  
   return true  
}

最后更新于

这有帮助吗?