启动篇-03RunKubelet
RunKubelet
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
...
// 上面设置好所需参数,创建并初始化 Kubelet
k, err := createAndInitKubelet(kubeServer,
kubeDeps,
hostname,
hostnameOverridden,
nodeName,
nodeIPs)
if err != nil {
return fmt.Errorf("failed to create kubelet: %w", err)
}
...
// 如果设置只运行一次,运行完就退出,否则执行 startKubelet,进入下一阶段
if runOnce {
if _, err := k.RunOnce(podCfg.Updates()); err != nil {
return fmt.Errorf("runonce failed: %w", err)
}
klog.InfoS("Started kubelet as runonce")
} else {
startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
klog.InfoS("Started kubelet")
}
...
}createAndInitKubelet
func createAndInitKubelet(kubeServer *options.KubeletServer,
kubeDeps *kubelet.Dependencies,
hostname string,
hostnameOverridden bool,
nodeName types.NodeName,
nodeIPs []net.IP) (k kubelet.Bootstrap, err error) {
// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations
k, err = kubelet.NewMainKubelet(&kubeServer.KubeletConfiguration,
kubeDeps,
&kubeServer.ContainerRuntimeOptions,
hostname,
hostnameOverridden,
nodeName,
nodeIPs,
kubeServer.ProviderID,
kubeServer.CloudProvider,
kubeServer.CertDirectory,
kubeServer.RootDirectory,
kubeServer.ImageCredentialProviderConfigFile,
kubeServer.ImageCredentialProviderBinDir,
kubeServer.RegisterNode,
kubeServer.RegisterWithTaints,
kubeServer.AllowedUnsafeSysctls,
kubeServer.ExperimentalMounterPath,
kubeServer.KernelMemcgNotification,
kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
kubeServer.MinimumGCAge,
kubeServer.MaxPerPodContainerCount,
kubeServer.MaxContainerCount,
kubeServer.MasterServiceNamespace,
kubeServer.RegisterSchedulable,
kubeServer.KeepTerminatedPodVolumes,
kubeServer.NodeLabels,
kubeServer.NodeStatusMaxImages,
kubeServer.KubeletFlags.SeccompDefault || kubeServer.KubeletConfiguration.SeccompDefault)
if err != nil {
return nil, err
}
k.BirthCry()
k.StartGarbageCollection()
return k, nil
}NewMainKubelet
// 实例化一个新的 Kubelet 对象以及所有所需的内部模块,这里不应该初始化 Kubelet 及其模块
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
...
) (*Kubelet, error) {
...
return klet, nil
}serviceLister: 获取 service 相关信息
nodeLister: 获取 node 相关信息
oomWatcher: 监控 pod 内存是否发生 OOM
cloudResourceSyncManager: 云资源同步管理器
secretManager: secret 管理器
configMapManager: configMap 管理器
livenessManager: liveness 管理器
readinessManager: readiness 管理器
startupManager: startup 管理器
podManager: pod 管理器,管理 pod 的生命周期
statusManager: status 管理器,与 apiserver 同步 pod 状态
resourceAnalyzer: 用于收集资源使用情况
runtimeService: 用于与 CRI 通信
runtimeClassManager: runtimeClass 管理器
containerLogManager: containerLog 管理器,管理容器日志
reasonCache: 缓存 pod 的最新的启动失败原因
podWorkers: 管理 pod 的同步
runtime: 设置容器运行时
runtimeCache: 容器运行时缓存
StatsProvider: 用于获取 pod 的使用情况,判断使用 Cadvisor 还是 CRI
containerGC: 对容器进行垃圾回收
imageManager: image 管理器,管理容器镜像
probeManager:probe 管理器,管理探针
volumePluginMgr: 管理 volume 插件
pluginManager: plugin 管理器
volumeManager: volume 管理器
evictionManager: eviction 管理器
shutdownManager: shutdown 管理器
usernsManager: userns 管理器
startKubelet
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
// 启动 kubelet,进入下一阶段
// podCfg.Updates() 是一个 Channel,会传递 Pod 的最新信息
go k.Run(podCfg.Updates())
// 启动 kubelet server 提供 API 服务
if enableServer {
go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth, kubeDeps.TracerProvider)
}
// kubelet 的只读 server
if kubeCfg.ReadOnlyPort > 0 {
go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
}
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
go k.ListenAndServePodResources()
}
}最后更新于
这有帮助吗?