Startup 02: Configuration Setup and Checks
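The two functions below come from cmd/kubelet/app/server.go. Run is a thin wrapper that delegates to the unexported run, which is where the configuration setup and checks covered in this part happen; "..." marks code elided from the listing.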
Run
func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
...
if err := run(ctx, s, kubeDeps, featureGate); err != nil {
return fmt.Errorf("failed to run Kubelet: %w", err)
}
return nil
}
run
func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
// Set the global feature gates from the initialized KubeletServer
err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
if err != nil {
return err
}
// Validate the initialized KubeletServer (feature gates are set first because this validation depends on them)
if err := options.ValidateKubeletServer(s); err != nil {
return err
}
// Warn if MemoryQoS is enabled while cgroups v1 is in use
if utilfeature.DefaultFeatureGate.Enabled(features.MemoryQoS) &&
!isCgroup2UnifiedMode() {
klog.InfoS("Warning: MemoryQoS feature only works with cgroups v2 on Linux, but enabled with cgroups v1")
}
// Acquire the kubelet's lock file; when --exit-on-lock-contention is true, the kubelet exits if lock-file contention occurs
if s.ExitOnLockContention && s.LockFilePath == "" {
return errors.New("cannot exit on lock file contention: no lock file specified")
}
...
// Register the current configuration with the /configz endpoint
err = initConfigz(&s.KubeletConfiguration)
if err != nil {
klog.ErrorS(err, "Failed to register kubelet configuration with configz")
}
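// The registered configuration can then be read back through the API server proxy,
// e.g. kubectl get --raw "/api/v1/nodes/<node-name>/proxy/configz"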
// Show metrics that were hidden in the specified version
if len(s.ShowHiddenMetricsForVersion) > 0 {
metrics.SetShowHidden()
}
// A series of checks follows
// Check whether the kubelet is running in standalone mode
// In standalone mode the kubelet does not talk to the API server; it is mainly used for debugging the kubelet
standaloneMode := true
if len(s.KubeConfig) > 0 {
standaloneMode = false
}
// If kubeDeps is nil, initialize it with the default dependencies
if kubeDeps == nil {
kubeDeps, err = UnsecuredDependencies(s, featureGate)
if err != nil {
return err
}
}
// Check the cloud provider configuration in kubeDeps
if kubeDeps.Cloud == nil {
...
}
// Determine the hostName
hostName, err := nodeutil.GetHostname(s.HostnameOverride)
if err != nil {
return err
}
// Determine the nodeName
nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
if err != nil {
return err
}
// In standalone mode, set all clients to nil
// This shows that the kubelet uses three clients: KubeClient, EventClient and HeartbeatClient
switch {
case standaloneMode:
kubeDeps.KubeClient = nil
kubeDeps.EventClient = nil
kubeDeps.HeartbeatClient = nil
klog.InfoS("Standalone mode, no API client")
case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
// Build the KubeClient configuration
clientConfig, onHeartbeatFailure, err := buildKubeletClientConfig(ctx, s, kubeDeps.TracerProvider, nodeName)
if err != nil {
return err
}
if onHeartbeatFailure == nil {
return errors.New("onHeartbeatFailure must be a valid function other than nil")
}
kubeDeps.OnHeartbeatFailure = onHeartbeatFailure
// Create the KubeClient
kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
if err != nil {
return fmt.Errorf("failed to initialize kubelet client: %w", err)
}
// Create the EventClient
eventClientConfig := *clientConfig
eventClientConfig.QPS = float32(s.EventRecordQPS)
eventClientConfig.Burst = int(s.EventBurst)
kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
if err != nil {
return fmt.Errorf("failed to initialize kubelet event client: %w", err)
}
// Create the HeartbeatClient, with its own Timeout and QPS
heartbeatClientConfig := *clientConfig
heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
// The timeout is the minimum of the lease duration and status update frequency
leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
if heartbeatClientConfig.Timeout > leaseTimeout {
heartbeatClientConfig.Timeout = leaseTimeout
}
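// A negative QPS disables client-go's client-side rate limiting, so heartbeat requests are never throttled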
heartbeatClientConfig.QPS = float32(-1)
kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
if err != nil {
return fmt.Errorf("failed to initialize kubelet heartbeat client: %w", err)
}
}
// Initialize Auth
if kubeDeps.Auth == nil {
auth, runAuthenticatorCAReload, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
if err != nil {
return err
}
kubeDeps.Auth = auth
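// runAuthenticatorCAReload keeps reloading from disk the client CA bundle used to authenticate requests to the kubelet's own API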
runAuthenticatorCAReload(ctx.Done())
}
// Collect the cgroup roots (handed to cAdvisor below)
var cgroupRoots []string
nodeAllocatableRoot := cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupsPerQOS, s.CgroupDriver)
cgroupRoots = append(cgroupRoots, nodeAllocatableRoot)
kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
if err != nil {
klog.InfoS("Failed to get the kubelet's cgroup. Kubelet system container metrics may be missing.", "err", err)
} else if kubeletCgroup != "" {
cgroupRoots = append(cgroupRoots, kubeletCgroup)
}
if s.RuntimeCgroups != "" {
// RuntimeCgroups is optional, so ignore if it isn't specified
cgroupRoots = append(cgroupRoots, s.RuntimeCgroups)
}
if s.SystemCgroups != "" {
// SystemCgroups is optional, so ignore if it isn't specified
cgroupRoots = append(cgroupRoots, s.SystemCgroups)
}
// Set up cAdvisor
if kubeDeps.CAdvisorInterface == nil {
imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.RemoteRuntimeEndpoint)
kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.RemoteRuntimeEndpoint), s.LocalStorageCapacityIsolation)
if err != nil {
return err
}
}
// Setup event recorder if required.
makeEventRecorder(kubeDeps, nodeName)
// Set up the ContainerManager
if kubeDeps.ContainerManager == nil {
if s.CgroupsPerQOS && s.CgroupRoot == "" {
klog.InfoS("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /")
s.CgroupRoot = "/"
}
// Get the machine info
machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo()
if err != nil {
return err
}
// Determine the reserved system CPUs
reservedSystemCPUs, err := getReservedCPUs(machineInfo, s.ReservedSystemCPUs)
if err != nil {
return err
}
if reservedSystemCPUs.Size() > 0 {
// at cmd option validation phase it is tested either --system-reserved-cgroup or --kube-reserved-cgroup is specified, so overwrite should be ok
klog.InfoS("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved and SystemReserved", "kubeReservedCPUs", s.KubeReserved, "systemReservedCPUs", s.SystemReserved)
if s.KubeReserved != nil {
delete(s.KubeReserved, "cpu")
}
if s.SystemReserved == nil {
s.SystemReserved = make(map[string]string)
}
s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())
klog.InfoS("After cpu setting is overwritten", "kubeReservedCPUs", s.KubeReserved, "systemReservedCPUs", s.SystemReserved)
}
// Parse the Kubernetes reserved resources (see the sketch after this listing)
kubeReserved, err := parseResourceList(s.KubeReserved)
if err != nil {
return err
}
// Parse the system reserved resources
systemReserved, err := parseResourceList(s.SystemReserved)
if err != nil {
return err
}
// Hard eviction thresholds
var hardEvictionThresholds []evictionapi.Threshold
// If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
if err != nil {
return err
}
}
// Experimental QoS reserved resources
experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
if err != nil {
return err
}
// CPU manager policy options
var cpuManagerPolicyOptions map[string]string
if utilfeature.DefaultFeatureGate.Enabled(features.CPUManagerPolicyOptions) {
cpuManagerPolicyOptions = s.CPUManagerPolicyOptions
} else if s.CPUManagerPolicyOptions != nil {
return fmt.Errorf("CPU Manager policy options %v require feature gates %q, %q enabled",
s.CPUManagerPolicyOptions, features.CPUManager, features.CPUManagerPolicyOptions)
}
// Topology manager policy options
var topologyManagerPolicyOptions map[string]string
if utilfeature.DefaultFeatureGate.Enabled(features.TopologyManager) {
if utilfeature.DefaultFeatureGate.Enabled(features.TopologyManagerPolicyOptions) {
topologyManagerPolicyOptions = s.TopologyManagerPolicyOptions
} else if s.TopologyManagerPolicyOptions != nil {
return fmt.Errorf("topology manager policy options %v require feature gates %q, %q enabled",
s.TopologyManagerPolicyOptions, features.TopologyManager, features.TopologyManagerPolicyOptions)
}
}
kubeDeps.ContainerManager, err = cm.NewContainerManager(
kubeDeps.Mounter,
kubeDeps.CAdvisorInterface,
cm.NodeConfig{
RuntimeCgroupsName: s.RuntimeCgroups,
SystemCgroupsName: s.SystemCgroups,
KubeletCgroupsName: s.KubeletCgroups,
KubeletOOMScoreAdj: s.OOMScoreAdj,
CgroupsPerQOS: s.CgroupsPerQOS,
CgroupRoot: s.CgroupRoot,
CgroupDriver: s.CgroupDriver,
KubeletRootDir: s.RootDirectory,
ProtectKernelDefaults: s.ProtectKernelDefaults,
NodeAllocatableConfig: cm.NodeAllocatableConfig{
KubeReservedCgroupName: s.KubeReservedCgroup,
SystemReservedCgroupName: s.SystemReservedCgroup,
EnforceNodeAllocatable: sets.NewString(s.EnforceNodeAllocatable...),
KubeReserved: kubeReserved,
SystemReserved: systemReserved,
ReservedSystemCPUs: reservedSystemCPUs,
HardEvictionThresholds: hardEvictionThresholds,
},
QOSReserved: *experimentalQOSReserved,
CPUManagerPolicy: s.CPUManagerPolicy,
CPUManagerPolicyOptions: cpuManagerPolicyOptions,
CPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
ExperimentalMemoryManagerPolicy: s.MemoryManagerPolicy,
ExperimentalMemoryManagerReservedMemory: s.ReservedMemory,
ExperimentalPodPidsLimit: s.PodPidsLimit,
EnforceCPULimits: s.CPUCFSQuota,
CPUCFSQuotaPeriod: s.CPUCFSQuotaPeriod.Duration,
ExperimentalTopologyManagerPolicy: s.TopologyManagerPolicy,
ExperimentalTopologyManagerScope: s.TopologyManagerScope,
ExperimentalTopologyManagerPolicyOptions: topologyManagerPolicyOptions,
},
s.FailSwapOn,
kubeDeps.Recorder,
kubeDeps.KubeClient,
)
if err != nil {
return err
}
}
// Set up the pod startup latency tracker
if kubeDeps.PodStartupLatencyTracker == nil {
kubeDeps.PodStartupLatencyTracker = kubeletutil.NewPodStartupLatencyTracker()
}
// Set up the oomAdjuster
oomAdjuster := kubeDeps.OOMAdjuster
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
klog.InfoS("Failed to ApplyOOMScoreAdj", "err", err)
}
// Initialize the runtime service before calling RunKubelet
err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration, kubeDeps, s.RemoteRuntimeEndpoint, s.RemoteImageEndpoint)
if err != nil {
return err
}
// RunKubelet: enter the next phase
if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
return err
}
// Start the healthz (health check) server
if s.HealthzPort > 0 {
mux := http.NewServeMux()
healthz.InstallHandler(mux)
go wait.Until(func() {
err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
if err != nil {
klog.ErrorS(err, "Failed to start healthz server")
}
}, 5*time.Second, wait.NeverStop)
}
if s.RunOnce {
return nil
}
// If running under systemd, notify it that the kubelet has started
go daemon.SdNotify(false, "READY=1")
select {
case <-done:
break
case <-ctx.Done():
break
}
return nil
}
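A note on the kube-reserved / system-reserved parsing step above: parseResourceList converts the flag maps (for example cpu=500m,memory=1Gi) into a v1.ResourceList. The following is a minimal sketch of that idea, not the real implementation; toResourceList is a hypothetical helper, and the real parseResourceList additionally restricts which resource names may be reserved and rejects negative quantities.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// toResourceList is a simplified, hypothetical sketch of what parseResourceList does:
// it parses each quantity string and builds a v1.ResourceList keyed by resource name.
func toResourceList(m map[string]string) (v1.ResourceList, error) {
	rl := make(v1.ResourceList)
	for name, value := range m {
		q, err := resource.ParseQuantity(value)
		if err != nil {
			return nil, fmt.Errorf("failed to parse quantity %q for resource %q: %w", value, name, err)
		}
		rl[v1.ResourceName(name)] = q
	}
	return rl, nil
}

func main() {
	// Roughly what --kube-reserved=cpu=500m,memory=1Gi looks like after flag parsing.
	rl, err := toResourceList(map[string]string{"cpu": "500m", "memory": "1Gi"})
	if err != nil {
		panic(err)
	}
	fmt.Println("reserved cpu:", rl.Cpu().String(), "reserved memory:", rl.Memory().String())
}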