启动篇-03RunKubelet

RunKubelet

cmd/kubelet/app/server.go
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
	...
	// 上面设置好所需参数,创建并初始化 Kubelet
	k, err := createAndInitKubelet(kubeServer,  
	   kubeDeps,  
	   hostname,  
	   hostnameOverridden,  
	   nodeName,  
	   nodeIPs)  
	if err != nil {  
	   return fmt.Errorf("failed to create kubelet: %w", err)  
	}

	...
	// 如果设置只运行一次,运行完就退出,否则执行 startKubelet,进入下一阶段  
	if runOnce {  
	   if _, err := k.RunOnce(podCfg.Updates()); err != nil {  
	      return fmt.Errorf("runonce failed: %w", err)  
	   }  
	   klog.InfoS("Started kubelet as runonce")  
	} else {  
	   startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)  
	   klog.InfoS("Started kubelet")  
	}
	...
}

createAndInitKubelet

cmd/kubelet/app/server.go
func createAndInitKubelet(kubeServer *options.KubeletServer,  
   kubeDeps *kubelet.Dependencies,  
   hostname string,  
   hostnameOverridden bool,  
   nodeName types.NodeName,  
   nodeIPs []net.IP) (k kubelet.Bootstrap, err error) {
    // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop  
   // up into "per source" synchronizations  
  
   k, err = kubelet.NewMainKubelet(&kubeServer.KubeletConfiguration,  
      kubeDeps,  
      &kubeServer.ContainerRuntimeOptions,  
      hostname,  
      hostnameOverridden,  
      nodeName,  
      nodeIPs,  
      kubeServer.ProviderID,  
      kubeServer.CloudProvider,  
      kubeServer.CertDirectory,  
      kubeServer.RootDirectory,  
      kubeServer.ImageCredentialProviderConfigFile,  
      kubeServer.ImageCredentialProviderBinDir,  
      kubeServer.RegisterNode,  
      kubeServer.RegisterWithTaints,  
      kubeServer.AllowedUnsafeSysctls,  
      kubeServer.ExperimentalMounterPath,  
      kubeServer.KernelMemcgNotification,  
      kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,  
      kubeServer.MinimumGCAge,  
      kubeServer.MaxPerPodContainerCount,  
      kubeServer.MaxContainerCount,  
      kubeServer.MasterServiceNamespace,  
      kubeServer.RegisterSchedulable,  
      kubeServer.KeepTerminatedPodVolumes,  
      kubeServer.NodeLabels,  
      kubeServer.NodeStatusMaxImages,  
      kubeServer.KubeletFlags.SeccompDefault || kubeServer.KubeletConfiguration.SeccompDefault)  
   if err != nil {  
      return nil, err  
   }  
  
   k.BirthCry()  
  
   k.StartGarbageCollection()  
  
   return k, nil  
}

NewMainKubelet

pkg/kubelet/kubelet.go
// 实例化一个新的 Kubelet 对象以及所有所需的内部模块,这里不应该初始化 Kubelet 及其模块
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,  
	...
) (*Kubelet, error) {
	...
    return klet, nil  
}
  • serviceLister: 获取 service 相关信息

  • nodeLister: 获取 node 相关信息

  • oomWatcher: 监控 pod 内存是否发生 OOM

  • cloudResourceSyncManager: 云资源同步管理器

  • secretManager: secret 管理器

  • configMapManager: configMap 管理器

  • livenessManager: liveness 管理器

  • readinessManager: readiness 管理器

  • startupManager: startup 管理器

  • podManager: pod 管理器,管理 pod 的生命周期

  • statusManager: status 管理器,与 apiserver 同步 pod 状态

  • resourceAnalyzer: 用于收集资源使用情况

  • runtimeService: 用于与 CRI 通信

  • runtimeClassManager: runtimeClass 管理器

  • containerLogManager: containerLog 管理器,管理容器日志

  • reasonCache: 缓存 pod 的最新的启动失败原因

  • podWorkers: 管理 pod 的同步

  • runtime: 设置容器运行时

  • runtimeCache: 容器运行时缓存

  • StatsProvider: 用于获取 pod 的使用情况,判断使用 Cadvisor 还是 CRI

  • containerGC: 对容器进行垃圾回收

  • imageManager: image 管理器,管理容器镜像

  • probeManager:probe 管理器,管理探针

  • volumePluginMgr: 管理 volume 插件

  • pluginManager: plugin 管理器

  • volumeManager: volume 管理器

  • evictionManager: eviction 管理器

  • shutdownManager: shutdown 管理器

  • usernsManager: userns 管理器

startKubelet

cmd/kubelet/app/server.go
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {  
   // 启动 kubelet,进入下一阶段
   // podCfg.Updates() 是一个 Channel,会传递 Pod 的最新信息  
   go k.Run(podCfg.Updates())  
  
   // 启动 kubelet server 提供 API 服务  
   if enableServer {  
      go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth, kubeDeps.TracerProvider)  
   }  
   // kubelet 的只读 server   
   if kubeCfg.ReadOnlyPort > 0 {  
      go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))  
   }  
   if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {  
      go k.ListenAndServePodResources()  
   }  
}

最后更新于

这有帮助吗?