启动篇-05PLEG

PLEG 的创建

pkg/kubelet/kubelet.go
// NewMainKubelet (excerpt; the elided parts are shown as "...") is where the
// PLEG is created: the value returned by pleg.NewGenericPLEG is stored in
// klet.pleg.
func NewMainKubelet(...) (*Kubelet, error) {
	...
	// Build the generic PLEG from the kubelet's container runtime, the event
	// channel, the relist duration settings, the pod cache and a real clock.
	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
	...
}

NewGenericPLEG

pkg/kubelet/pleg/generic.go
// NewGenericPLEG instantiates a new GenericPLEG object and returns it as a
// PodLifecycleEventGenerator.
//
// The supplied runtime, eventChannel, relistDuration, cache and clock are
// stored on the new object as-is; podRecords starts out as an empty map.
func NewGenericPLEG(runtime kubecontainer.Runtime, eventChannel chan *PodLifecycleEvent,
	relistDuration *RelistDuration, cache kubecontainer.Cache,
	clock clock.Clock) PodLifecycleEventGenerator {
	generator := new(GenericPLEG)

	// Collaborators handed in by the caller.
	generator.runtime = runtime
	generator.cache = cache
	generator.clock = clock
	generator.eventChannel = eventChannel
	generator.relistDuration = relistDuration

	// Internal bookkeeping begins empty.
	generator.podRecords = make(podRecords)

	return generator
}

PLEG 的启动

pkg/kubelet/kubelet.go
// Run (excerpt; the elided parts are shown as "...") is where the PLEG is
// started: it calls kl.pleg.Start().
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {  
   ...
   // Start the pod lifecycle event generator.
   kl.pleg.Start()  
   ...
}

PLEG.Start

pkg/kubelet/pleg/generic.go
// Start spawns a goroutine that relists periodically.
//
// The whole method runs under runningMu. If the generator is already marked
// as running, Start returns without doing anything; otherwise it marks it as
// running, creates a fresh stop channel, and launches wait.Until with
// g.Relist, the configured RelistPeriod and that stop channel.
func (g *GenericPLEG) Start() {
	g.runningMu.Lock()
	defer g.runningMu.Unlock()

	if g.isRunning {
		// Already started: do not launch a second relist loop.
		return
	}

	g.isRunning = true
	g.stopCh = make(chan struct{})
	go wait.Until(g.Relist, g.relistDuration.RelistPeriod, g.stopCh)
}

PLEG.Relist

pkg/kubelet/pleg/generic.go
// Relist queries the container runtime for the list of pods/containers,
// compares it with the internally recorded pods/containers, and generates
// events accordingly.
//
// The whole call runs under relistLock. If listing pods from the runtime
// fails, the error is logged and the call returns early without updating the
// relist time or any pod records.
func (g *GenericPLEG) Relist() {  
   g.relistLock.Lock()  
   defer g.relistLock.Unlock()  
  
   ctx := context.Background()  
   klog.V(5).InfoS("GenericPLEG: Relisting")  
  
   // Record the interval since the previous relist, if there was one.
   if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {  
      metrics.PLEGRelistInterval.Observe(metrics.SinceInSeconds(lastRelistTime))  
   }  
  
   // Record how long this relist took when the function returns (including
   // the early-return error path below).
   timestamp := g.clock.Now()  
   defer func() {  
      metrics.PLEGRelistDuration.Observe(metrics.SinceInSeconds(timestamp))  
   }()  
  
   // Get all the pods.
   podList, err := g.runtime.GetPods(ctx, true)  
   if err != nil {  
      klog.ErrorS(err, "GenericPLEG: Unable to retrieve pods")  
      return  
   }  
  
   g.updateRelistTime(timestamp)  
  
   pods := kubecontainer.Pods(podList)  
   // Update the running pod and container counts.
   updateRunningPodAndContainerMetrics(pods)  
   g.podRecords.setCurrent(pods)  
  
   // Compare the old and the current pods, and generate events.
   eventsByPodID := map[types.UID][]*PodLifecycleEvent{}  
   for pid := range g.podRecords {  
      oldPod := g.podRecords.getOld(pid)  
      pod := g.podRecords.getCurrent(pid)  
      // Get all containers in the old and the new pod.  
      allContainers := getContainersFromPods(oldPod, pod)  
      for _, container := range allContainers {  
         events := computeEvents(oldPod, pod, &container.ID)  
         for _, e := range events {  
            updateEvents(eventsByPodID, e)  
         }  
      }  
   }  
  
   // needsReinspection is only allocated when the cache is enabled; it is
   // only written to inside cacheEnabled() branches below.
   var needsReinspection map[types.UID]*kubecontainer.Pod  
   if g.cacheEnabled() {  
      needsReinspection = make(map[types.UID]*kubecontainer.Pod)  
   }  
    
   // If there are events associated with a pod, we should update the podCache.
   for pid, events := range eventsByPodID {  
      pod := g.podRecords.getCurrent(pid)  
      if g.cacheEnabled() {  
         // updateCache() will inspect the pod and update the cache.
         // If an error occurs during the inspection, we want PLEG to retry
         // in the next relist. To achieve this, we do not update the
         // podRecord associated with the pod, so that the change will be
         // detected again in the next relist.
         if err, updated := g.updateCache(ctx, pod, pid); err != nil {  
            // Rely on updateCache calling GetPodStatus to log the actual error.
            klog.V(4).ErrorS(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))  
  
            // Make sure we try to reinspect the pod during the next relisting.
            needsReinspection[pid] = pod  
  
            continue  
         } else {  
            // This pod was in the list to reinspect and we did so because it
            // had events, so remove it from the list.
            delete(g.podsToReinspect, pid)  
            // With the EventedPLEG feature gate enabled, skip the record
            // update and event dispatch when updateCache reports no update.
            if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {  
               if !updated {  
                  continue  
               }  
            }  
         }  
      }  

      // Update the internal storage and send out the events.
      g.podRecords.update(pid)  
  
      // Map from containerId to exit code; used as a temporary cache for lookup.
      containerExitCode := make(map[string]int)  
  
      for i := range events {  
         // Filter out events that are not reliable and that no other
         // components use.
         if events[i].Type == ContainerChanged {  
            continue  
         }  
         // Non-blocking send: if the channel cannot accept the event right
         // now, count it as discarded and log instead of blocking the relist.
         select {  
         case g.eventChannel <- events[i]:  
         default:  
            metrics.PLEGDiscardEvents.Inc()  
            klog.ErrorS(nil, "Event channel is full, discard this relist() cycle event")  
         }  
         // Log the exit code of containers when they finish in a particular event.
         if events[i].Type == ContainerDied {  
            // Fill up the containerExitCode map on the first ContainerDied event.
            if len(containerExitCode) == 0 && pod != nil && g.cache != nil {  
               // Get the updated podStatus.
               status, err := g.cache.Get(pod.ID)  
               if err == nil {  
                  for _, containerStatus := range status.ContainerStatuses {  
                     containerExitCode[containerStatus.ID.ID] = containerStatus.ExitCode  
                  }  
               }  
            }  
            // The exit code is only logged when the event data is a string
            // container ID that is present in the map built above.
            if containerID, ok := events[i].Data.(string); ok {  
               if exitCode, ok := containerExitCode[containerID]; ok && pod != nil {  
                  klog.V(2).InfoS("Generic (PLEG): container finished", "podID", pod.ID, "containerID", containerID, "exitCode", exitCode)  
               }  
            }  
         }  
      }  
   }  
  
   if g.cacheEnabled() {  
      // Reinspect any pods that failed inspection during the previous relist.
      if len(g.podsToReinspect) > 0 {  
         klog.V(5).InfoS("GenericPLEG: Reinspecting pods that previously failed inspection")  
         for pid, pod := range g.podsToReinspect {  
            if err, _ := g.updateCache(ctx, pod, pid); err != nil {  
               // Rely on updateCache calling GetPodStatus to log the actual error.  
               klog.V(5).ErrorS(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))  
               needsReinspection[pid] = pod  
            }  
         }  
      }  
  
      // Update the cache timestamp. This needs to happen *after* all pods
      // have had their caches updated properly.
      g.cache.UpdateTime(timestamp)  
   }  
  
   // Make sure we retain the list of pods that need reinspecting the next
   // time relist is called.
   g.podsToReinspect = needsReinspection  
}

最后更新于

这有帮助吗?