启动篇 - 05 PLEG
PLEG 的创建
// Excerpt of NewMainKubelet showing where the PLEG is constructed; the rest of
// the function body is elided ("...").
func NewMainKubelet(...) (*Kubelet, error) {
	...
	// Create the generic PLEG. It lists pods through klet.containerRuntime,
	// sends lifecycle events on eventChannel, uses klet.podCache as its pod
	// status cache, and takes its relist period from genericRelistDuration.
	// clock.RealClock{} supplies the timestamps recorded on each relist.
	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
	...
}
NewGenericPLEG
// NewGenericPLEG 实例化一个新的 GenericPLEG 对象并返回它
func NewGenericPLEG(runtime kubecontainer.Runtime, eventChannel chan *PodLifecycleEvent,
relistDuration *RelistDuration, cache kubecontainer.Cache,
clock clock.Clock) PodLifecycleEventGenerator {
return &GenericPLEG{
relistDuration: relistDuration,
runtime: runtime,
eventChannel: eventChannel,
podRecords: make(podRecords),
cache: cache,
clock: clock,
}
}PLEG 的启动
// Excerpt of Kubelet.Run showing where the PLEG is started; the rest of the
// function body is elided ("...").
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	...
	// Launch the PLEG's periodic relist loop (see GenericPLEG.Start).
	kl.pleg.Start()
	...
}
PLEG.Start
// Start 生成一个 goroutine 来定期 relist
func (g *GenericPLEG) Start() {
g.runningMu.Lock()
defer g.runningMu.Unlock()
if !g.isRunning {
g.isRunning = true
g.stopCh = make(chan struct{})
go wait.Until(g.Relist, g.relistDuration.RelistPeriod, g.stopCh)
}
}PLEG.Relist
// Relist queries the container runtime for the current list of pods/containers,
// compares it with the internally recorded pods/containers, and generates the
// corresponding PodLifecycleEvents.
//
// Outline of one relist pass:
//  1. Record relist-interval/duration metrics and fetch all pods from the runtime
//     (NOTE(review): the `true` passed to GetPods presumably requests all pods,
//     including exited containers — confirm against Runtime.GetPods).
//  2. Diff the old and current records container by container and group the
//     resulting events by pod UID.
//  3. If the cache is enabled, refresh the cache for every pod that has events.
//     Pods whose inspection fails keep their old record and are queued for
//     reinspection in the next relist.
//  4. Send each event except ContainerChanged on eventChannel without blocking.
//     Events that do not fit are dropped and counted in PLEGDiscardEvents.
//  5. If the cache is enabled, retry pods that failed inspection in the previous
//     relist, then stamp the cache with this relist's timestamp.
//
// relistLock serializes concurrent Relist calls.
func (g *GenericPLEG) Relist() {
	g.relistLock.Lock()
	defer g.relistLock.Unlock()
	ctx := context.Background()
	klog.V(5).InfoS("GenericPLEG: Relisting")
	if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
		metrics.PLEGRelistInterval.Observe(metrics.SinceInSeconds(lastRelistTime))
	}
	timestamp := g.clock.Now()
	defer func() {
		metrics.PLEGRelistDuration.Observe(metrics.SinceInSeconds(timestamp))
	}()
	// Get all the pods.
	podList, err := g.runtime.GetPods(ctx, true)
	if err != nil {
		klog.ErrorS(err, "GenericPLEG: Unable to retrieve pods")
		return
	}
	// Only reached after a successful GetPods; a failed listing leaves the
	// previous relist time in place.
	g.updateRelistTime(timestamp)
	pods := kubecontainer.Pods(podList)
	// Update the running pod and container count metrics.
	updateRunningPodAndContainerMetrics(pods)
	g.podRecords.setCurrent(pods)
	// Compare the old and the current pods, and generate events.
	eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
	for pid := range g.podRecords {
		oldPod := g.podRecords.getOld(pid)
		pod := g.podRecords.getCurrent(pid)
		// Get all containers in the old and the new pod.
		allContainers := getContainersFromPods(oldPod, pod)
		for _, container := range allContainers {
			events := computeEvents(oldPod, pod, &container.ID)
			for _, e := range events {
				updateEvents(eventsByPodID, e)
			}
		}
	}
	// Allocated only when the cache is enabled. Every write to it below sits
	// inside a cacheEnabled() branch, so the nil map is never written to.
	var needsReinspection map[types.UID]*kubecontainer.Pod
	if g.cacheEnabled() {
		needsReinspection = make(map[types.UID]*kubecontainer.Pod)
	}
	// If there are events associated with a pod, we should update the podCache.
	for pid, events := range eventsByPodID {
		// May be nil when the pod is absent from the current listing
		// (note the pod != nil checks further down).
		pod := g.podRecords.getCurrent(pid)
		if g.cacheEnabled() {
			// updateCache() inspects the pod and updates the cache.
			// If an error occurs during inspection, we want PLEG to retry on the
			// next relist. Therefore we do not update the pod's podRecord, so the
			// change is detected again on the next relist.
			if err, updated := g.updateCache(ctx, pod, pid); err != nil {
				// Rely on updateCache calling GetPodStatus to log the actual error.
				// NOTE(review): pod.Namespace dereferences pod; this assumes
				// updateCache never returns an error for a nil pod — confirm.
				klog.V(4).ErrorS(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))
				// Make sure the pod is inspected again during the next relist.
				needsReinspection[pid] = pod
				continue
			} else {
				// This pod was in the reinspection list and has just been inspected
				// because it had events. Remove it so the reinspection loop below
				// does not inspect it a second time in this relist.
				delete(g.podsToReinspect, pid)
				if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
					// With EventedPLEG enabled, a pod whose cache entry was not
					// updated gets no record update and no events sent.
					if !updated {
						continue
					}
				}
			}
		}
		// Update the internal storage and send out the events.
		g.podRecords.update(pid)
		// Map from container ID to exit code, used as a temporary lookup cache.
		// It is filled lazily and only read by the "container finished" log below.
		containerExitCode := make(map[string]int)
		for i := range events {
			// Filter out unreliable events that no other component consumes.
			if events[i].Type == ContainerChanged {
				continue
			}
			// Non-blocking send: if the channel is full the event is dropped
			// rather than stalling the relist loop.
			select {
			case g.eventChannel <- events[i]:
			default:
				metrics.PLEGDiscardEvents.Inc()
				klog.ErrorS(nil, "Event channel is full, discard this relist() cycle event")
			}
			// Log the exit code of containers that finished, for ContainerDied events.
			if events[i].Type == ContainerDied {
				// Populate containerExitCode on the first ContainerDied event.
				if len(containerExitCode) == 0 && pod != nil && g.cache != nil {
					// Get the updated podStatus.
					status, err := g.cache.Get(pod.ID)
					if err == nil {
						for _, containerStatus := range status.ContainerStatuses {
							containerExitCode[containerStatus.ID.ID] = containerStatus.ExitCode
						}
					}
				}
				// Presumably Data carries the container ID string for
				// ContainerDied events; other payloads are skipped.
				if containerID, ok := events[i].Data.(string); ok {
					if exitCode, ok := containerExitCode[containerID]; ok && pod != nil {
						klog.V(2).InfoS("Generic (PLEG): container finished", "podID", pod.ID, "containerID", containerID, "exitCode", exitCode)
					}
				}
			}
		}
	}
	if g.cacheEnabled() {
		// Reinspect any pods that failed inspection during the previous relist.
		if len(g.podsToReinspect) > 0 {
			klog.V(5).InfoS("GenericPLEG: Reinspecting pods that previously failed inspection")
			for pid, pod := range g.podsToReinspect {
				if err, _ := g.updateCache(ctx, pod, pid); err != nil {
					// Rely on updateCache calling GetPodStatus to log the actual error.
					klog.V(5).ErrorS(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
					needsReinspection[pid] = pod
				}
			}
		}
		// Update the cache timestamp. This must run *after* every pod's cache
		// entry has been updated.
		g.cache.UpdateTime(timestamp)
	}
	// Keep the list of pods needing reinspection for the next Relist call
	// (nil when the cache is disabled).
	g.podsToReinspect = needsReinspection
}
最后更新于
这有帮助吗?