启动篇-02:配置设置与检查

Run

cmd/kubelet/app/server.go
// Run is the exported entry point shown in this excerpt. After the elided
// section (`...`), it delegates to run with the same arguments and wraps any
// error run returns with the prefix "failed to run Kubelet"; on success it
// returns nil.
func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
   ...
   // Wrap with %w so callers can still inspect the underlying error.
   if err := run(ctx, s, kubeDeps, featureGate); err != nil {
      return fmt.Errorf("failed to run Kubelet: %w", err)
   }
   return nil
}

run

cmd/kubelet/app/server.go
// run performs the kubelet's configuration set-up and pre-flight checks, fills
// in any dependencies in kubeDeps that the caller left unset (API clients,
// auth, cAdvisor, container manager, pod startup latency tracker), and then
// hands off to RunKubelet.
//
// Lines shown as `...` are elided in this excerpt.
//
// Most set-up failures are returned as a non-nil error; a few (configz
// registration, kubelet cgroup lookup, OOM score adjustment) are only logged.
// When s.RunOnce is set the function returns nil right after the healthz
// set-up; otherwise it blocks until done or ctx.Done() fires and then
// returns nil.
func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
   // Set the global feature gates from the initialized KubeletServer.
   err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
   if err != nil {
      return err
   }
   // Validate the initialized KubeletServer (feature gates are set first
   // because this validation depends on them).
   if err := options.ValidateKubeletServer(s); err != nil {
      return err
   }

   // Warn when MemoryQoS is enabled while the node is not in cgroups v2
   // unified mode; this only logs and does not stop start-up.
   if utilfeature.DefaultFeatureGate.Enabled(features.MemoryQoS) &&
      !isCgroup2UnifiedMode() {
      klog.InfoS("Warning: MemoryQoS feature only works with cgroups v2 on Linux, but enabled with cgroups v1")
   }
   // Kubelet lock file handling: --exit-on-lock-contention lets the kubelet
   // exit on lock file contention, which is meaningless without a lock file
   // path, so that combination is rejected here.
   if s.ExitOnLockContention && s.LockFilePath == "" {
      return errors.New("cannot exit on lock file contention: no lock file specified")
   }
   ...

   // Register the current configuration with the /configz endpoint.
   // A failure here is logged but does not abort start-up.
   err = initConfigz(&s.KubeletConfiguration)
   if err != nil {
      klog.ErrorS(err, "Failed to register kubelet configuration with configz")
   }
   // Enable hidden metrics when a version was given via
   // ShowHiddenMetricsForVersion.
   if len(s.ShowHiddenMetricsForVersion) > 0 {
      metrics.SetShowHidden()
   }

   // Run a series of checks.
   // Determine whether the kubelet runs in standalone mode: it does when no
   // kubeconfig is supplied. In this mode the kubelet gets no API clients
   // (see the switch below).
   standaloneMode := true
   if len(s.KubeConfig) > 0 {
      standaloneMode = false
   }

   // If the caller passed no dependencies, build the default set.
   if kubeDeps == nil {
      kubeDeps, err = UnsecuredDependencies(s, featureGate)
      if err != nil {
         return err
      }
   }

   // Cloud provider set-up for kubeDeps (body elided in this excerpt).
   if kubeDeps.Cloud == nil {
      ...
   }

   // Resolve the host name, honoring s.HostnameOverride.
   hostName, err := nodeutil.GetHostname(s.HostnameOverride)
   if err != nil {
      return err
   }
   // Derive the node name from the cloud provider and the host name.
   nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
   if err != nil {
      return err
   }

   // In standalone mode all clients are set to nil.
   // The kubelet uses three clients: KubeClient, EventClient and
   // HeartbeatClient.
   switch {
   case standaloneMode:
      kubeDeps.KubeClient = nil
      kubeDeps.EventClient = nil
      kubeDeps.HeartbeatClient = nil
      klog.InfoS("Standalone mode, no API client")

   // If any one of the three clients is missing, all three are (re)built
   // from a single freshly built client config.
   case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
      // Build the client configuration for the kubelet.
      clientConfig, onHeartbeatFailure, err := buildKubeletClientConfig(ctx, s, kubeDeps.TracerProvider, nodeName)
      if err != nil {
         return err
      }
      if onHeartbeatFailure == nil {
         return errors.New("onHeartbeatFailure must be a valid function other than nil")
      }
      kubeDeps.OnHeartbeatFailure = onHeartbeatFailure

      // Create the KubeClient.
      kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
      if err != nil {
         return fmt.Errorf("failed to initialize kubelet client: %w", err)
      }

      // Create the EventClient from a copy of the config, with its own
      // QPS/Burst taken from the event settings.
      eventClientConfig := *clientConfig
      eventClientConfig.QPS = float32(s.EventRecordQPS)
      eventClientConfig.Burst = int(s.EventBurst)
      kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
      if err != nil {
         return fmt.Errorf("failed to initialize kubelet event client: %w", err)
      }


      // Create the heartbeat client from a copy of the config, with its own
      // Timeout and QPS.
      heartbeatClientConfig := *clientConfig
      heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
      // The timeout is the minimum of the lease duration and status update frequency
      leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
      if heartbeatClientConfig.Timeout > leaseTimeout {
         heartbeatClientConfig.Timeout = leaseTimeout
      }

      // NOTE(review): a negative QPS presumably disables client-side rate
      // limiting for heartbeats — confirm against the client-go rest.Config docs.
      heartbeatClientConfig.QPS = float32(-1)
      kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
      if err != nil {
         return fmt.Errorf("failed to initialize kubelet heartbeat client: %w", err)
      }
   }

   // Initialize Auth if the caller did not provide it, and start the CA
   // reload routine returned by BuildAuth, bound to ctx's cancellation.
   if kubeDeps.Auth == nil {
      auth, runAuthenticatorCAReload, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
      if err != nil {
         return err
      }
      kubeDeps.Auth = auth
      runAuthenticatorCAReload(ctx.Done())
   }

   // Collect the cgroup roots that are later handed to cAdvisor: the node
   // allocatable root, the kubelet's own cgroup, and the optional runtime and
   // system cgroups.
   var cgroupRoots []string
   nodeAllocatableRoot := cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupsPerQOS, s.CgroupDriver)
   cgroupRoots = append(cgroupRoots, nodeAllocatableRoot)
   kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
   if err != nil {
      // Best effort: a lookup failure is logged and start-up continues.
      klog.InfoS("Failed to get the kubelet's cgroup. Kubelet system container metrics may be missing.", "err", err)
   } else if kubeletCgroup != "" {
      cgroupRoots = append(cgroupRoots, kubeletCgroup)
   }

   if s.RuntimeCgroups != "" {
      // RuntimeCgroups is optional, so ignore if it isn't specified
      cgroupRoots = append(cgroupRoots, s.RuntimeCgroups)
   }

   if s.SystemCgroups != "" {
      // SystemCgroups is optional, so ignore if it isn't specified
      cgroupRoots = append(cgroupRoots, s.SystemCgroups)
   }

   // Set up cAdvisor if the caller did not provide one.
   if kubeDeps.CAdvisorInterface == nil {
      imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.RemoteRuntimeEndpoint)
      kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.RemoteRuntimeEndpoint), s.LocalStorageCapacityIsolation)
      if err != nil {
         return err
      }
   }

   // Setup event recorder if required.
   makeEventRecorder(kubeDeps, nodeName)

   // Set up the ContainerManager if the caller did not provide one.
   if kubeDeps.ContainerManager == nil {
      if s.CgroupsPerQOS && s.CgroupRoot == "" {
         klog.InfoS("--cgroups-per-qos enabled, but --cgroup-root was not specified.  defaulting to /")
         s.CgroupRoot = "/"
      }

      // Fetch machine information from cAdvisor.
      machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo()
      if err != nil {
         return err
      }
      // Resolve the CPUs reserved for the system from s.ReservedSystemCPUs.
      reservedSystemCPUs, err := getReservedCPUs(machineInfo, s.ReservedSystemCPUs)
      if err != nil {
         return err
      }
      if reservedSystemCPUs.Size() > 0 {
         // at cmd option validation phase it is tested either --system-reserved-cgroup or --kube-reserved-cgroup is specified, so overwrite should be ok
         klog.InfoS("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved and SystemReserved", "kubeReservedCPUs", s.KubeReserved, "systemReservedCPUs", s.SystemReserved)
         // Drop the "cpu" entry from KubeReserved and replace the one in
         // SystemReserved with the number of reserved CPUs.
         if s.KubeReserved != nil {
            delete(s.KubeReserved, "cpu")
         }
         if s.SystemReserved == nil {
            s.SystemReserved = make(map[string]string)
         }
         s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())
         klog.InfoS("After cpu setting is overwritten", "kubeReservedCPUs", s.KubeReserved, "systemReservedCPUs", s.SystemReserved)
      }

      // Parse the resources reserved for Kubernetes components.
      kubeReserved, err := parseResourceList(s.KubeReserved)
      if err != nil {
         return err
      }
      // Parse the resources reserved for the system.
      systemReserved, err := parseResourceList(s.SystemReserved)
      if err != nil {
         return err
      }
      // Hard eviction thresholds.
      var hardEvictionThresholds []evictionapi.Threshold
      // If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
      if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
         hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
         if err != nil {
            return err
         }
      }
      // Experimental QoS resource reservation.
      experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
      if err != nil {
         return err
      }

      // CPU manager policy options: used only when the CPUManagerPolicyOptions
      // feature gate is enabled; supplying them with the gate off is an error.
      var cpuManagerPolicyOptions map[string]string
      if utilfeature.DefaultFeatureGate.Enabled(features.CPUManagerPolicyOptions) {
         cpuManagerPolicyOptions = s.CPUManagerPolicyOptions
      } else if s.CPUManagerPolicyOptions != nil {
         return fmt.Errorf("CPU Manager policy options %v require feature gates %q, %q enabled",
            s.CPUManagerPolicyOptions, features.CPUManager, features.CPUManagerPolicyOptions)
      }

      // Topology manager policy options: only examined when the
      // TopologyManager gate is enabled; within that, supplying options
      // without the TopologyManagerPolicyOptions gate is an error.
      var topologyManagerPolicyOptions map[string]string
      if utilfeature.DefaultFeatureGate.Enabled(features.TopologyManager) {
         if utilfeature.DefaultFeatureGate.Enabled(features.TopologyManagerPolicyOptions) {
            topologyManagerPolicyOptions = s.TopologyManagerPolicyOptions
         } else if s.TopologyManagerPolicyOptions != nil {
            return fmt.Errorf("topology manager policy options %v require feature gates %q, %q enabled",
               s.TopologyManagerPolicyOptions, features.TopologyManager, features.TopologyManagerPolicyOptions)
         }
      }

      // Build the container manager from the values gathered above.
      kubeDeps.ContainerManager, err = cm.NewContainerManager(
         kubeDeps.Mounter,
         kubeDeps.CAdvisorInterface,
         cm.NodeConfig{
            RuntimeCgroupsName:    s.RuntimeCgroups,
            SystemCgroupsName:     s.SystemCgroups,
            KubeletCgroupsName:    s.KubeletCgroups,
            KubeletOOMScoreAdj:    s.OOMScoreAdj,
            CgroupsPerQOS:         s.CgroupsPerQOS,
            CgroupRoot:            s.CgroupRoot,
            CgroupDriver:          s.CgroupDriver,
            KubeletRootDir:        s.RootDirectory,
            ProtectKernelDefaults: s.ProtectKernelDefaults,
            NodeAllocatableConfig: cm.NodeAllocatableConfig{
               KubeReservedCgroupName:   s.KubeReservedCgroup,
               SystemReservedCgroupName: s.SystemReservedCgroup,
               EnforceNodeAllocatable:   sets.NewString(s.EnforceNodeAllocatable...),
               KubeReserved:             kubeReserved,
               SystemReserved:           systemReserved,
               ReservedSystemCPUs:       reservedSystemCPUs,
               HardEvictionThresholds:   hardEvictionThresholds,
            },
            QOSReserved:                              *experimentalQOSReserved,
            CPUManagerPolicy:                         s.CPUManagerPolicy,
            CPUManagerPolicyOptions:                  cpuManagerPolicyOptions,
            CPUManagerReconcilePeriod:                s.CPUManagerReconcilePeriod.Duration,
            ExperimentalMemoryManagerPolicy:          s.MemoryManagerPolicy,
            ExperimentalMemoryManagerReservedMemory:  s.ReservedMemory,
            ExperimentalPodPidsLimit:                 s.PodPidsLimit,
            EnforceCPULimits:                         s.CPUCFSQuota,
            CPUCFSQuotaPeriod:                        s.CPUCFSQuotaPeriod.Duration,
            ExperimentalTopologyManagerPolicy:        s.TopologyManagerPolicy,
            ExperimentalTopologyManagerScope:         s.TopologyManagerScope,
            ExperimentalTopologyManagerPolicyOptions: topologyManagerPolicyOptions,
         },
         s.FailSwapOn,
         kubeDeps.Recorder,
         kubeDeps.KubeClient,
      )

      if err != nil {
         return err
      }
   }

   // Set up the pod startup latency tracker if the caller did not provide one.
   if kubeDeps.PodStartupLatencyTracker == nil {
      kubeDeps.PodStartupLatencyTracker = kubeletutil.NewPodStartupLatencyTracker()
   }

   // Apply the configured OOM score adjustment via the OOMAdjuster.
   // A failure is logged and start-up continues.
   // NOTE(review): the first argument 0 presumably targets the current
   // process — confirm against the OOMAdjuster implementation.
   oomAdjuster := kubeDeps.OOMAdjuster
   if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
      klog.InfoS("Failed to ApplyOOMScoreAdj", "err", err)
   }

   // Initialize the runtime service before calling RunKubelet.
   err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration, kubeDeps, s.RemoteRuntimeEndpoint, s.RemoteImageEndpoint)
   if err != nil {
      return err
   }

   // RunKubelet: hand off to the next start-up stage.
   if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
      return err
   }

   // Start the healthz server when a port is configured. ListenAndServe is
   // wrapped in wait.Until, so if it returns it is retried after 5 seconds.
   if s.HealthzPort > 0 {
      mux := http.NewServeMux()
      healthz.InstallHandler(mux)
      go wait.Until(func() {
         err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
         if err != nil {
            klog.ErrorS(err, "Failed to start healthz server")
         }
      }, 5*time.Second, wait.NeverStop)
   }

   // In run-once mode there is nothing to wait for.
   if s.RunOnce {
      return nil
   }

   // If systemd is in use, notify it that the kubelet has started.
   go daemon.SdNotify(false, "READY=1")

   // Block until done or the context is cancelled.
   // NOTE(review): done is not declared in the visible excerpt; presumably it
   // is created in the elided lock-file section above — confirm.
   select {
   case <-done:
      break
   case <-ctx.Done():
      break
   }

   return nil
}

最后更新于

这有帮助吗?