启动篇-04创建 KubeAPIServer
CreateServerChain
func CreateServerChain(completedOptions completedServerRunOptions)(...){
...
apiExtensionsConfig, err := createAPIExtensionsConfig()
...
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
...
}CreateKubeAPIServer
// CreateKubeAPIServer 创建并连接一个可用的 kube-apiserver
func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
if err != nil {
return nil, err
}
return kubeAPIServer, nil
}New
// New 根据给定的配置创建一个新的 ControlPlane 实例
// 如果某些配置字段未设置,则将其设置为默认值
// 必须指定某些配置字段,包括:KubeletClientConfig
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
if reflect.DeepEqual(c.ExtraConfig.KubeletClientConfig, kubeletclient.KubeletClientConfig{}) {
return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
}
s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
if err != nil {
return nil, err
}
if c.ExtraConfig.EnableLogsSupport {
routes.Logs{}.Install(s.Handler.GoRestfulContainer)
}
// metadata 和 keys 预期只会在重启时更改,因此我们只需立即对其 marshal 并提供缓存的 JSON 即可
md, err := serviceaccount.NewOpenIDMetadata(
c.ExtraConfig.ServiceAccountIssuerURL,
c.ExtraConfig.ServiceAccountJWKSURI,
c.GenericConfig.ExternalAddress,
c.ExtraConfig.ServiceAccountPublicKeys,
)
if err != nil {
// 如果发生错误,则跳过安装 endpoint 并记录错误,但继续进行
// 不返回错误,因为 metadata 响应需要额外的、不兼容的命令行选项验证
msg := fmt.Sprintf("Could not construct pre-rendered responses for"+
" ServiceAccountIssuerDiscovery endpoints. Endpoints will not be"+
" enabled. Error: %v", err)
if c.ExtraConfig.ServiceAccountIssuerURL != "" {
// 如果设置了 issuer URL 并启用了该 feature gate,则用户可能希望启用该特性
// 如果没有启用 feature gate 并且未设置 issuer URL,则用户可能不希望启用该特性
// 我们将前一种情况记录为 Error,后一种情况记录为 Info
klog.Error(msg)
} else {
klog.Info(msg)
}
} else {
routes.NewOpenIDMetadataServer(md.ConfigJSON, md.PublicKeysetJSON).
Install(s.Handler.GoRestfulContainer)
}
// 创建 Instance 实例
m := &Instance{
GenericAPIServer: s,
ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
}
// 装载 legacy API
if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter); err != nil {
return nil, err
}
clientset, err := kubernetes.NewForConfig(c.GenericConfig.LoopbackClientConfig)
if err != nil {
return nil, err
}
// TODO: update to a version that caches success but will recheck on failure, unlike memcache discovery
discoveryClientForAdmissionRegistration := clientset.Discovery()
// 这里的顺序在发现中会被保留
// 如果这些组中存在具有相同名称的资源(例如“deployments.apps”和“deployments.extensions”),则此列表中的顺序决定了未限定资源名称(例如“deployments”)应该优先选择的组
// 本地发现使用此优先级顺序,但它最终在 `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go 中聚合了特定的优先级
restStorageProviders := []RESTStorageProvider{
apiserverinternalrest.StorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
autoscalingrest.RESTStorageProvider{},
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{},
coordinationrest.RESTStorageProvider{},
discoveryrest.StorageProvider{},
networkingrest.RESTStorageProvider{},
noderest.RESTStorageProvider{},
policyrest.RESTStorageProvider{},
rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
schedulingrest.RESTStorageProvider{},
storagerest.RESTStorageProvider{},
flowcontrolrest.RESTStorageProvider{InformerFactory: c.GenericConfig.SharedInformerFactory},
// [basefas] 保持 apps 在 extensions 之后,以便旧版客户端解析共享资源名称的扩展版本
appsrest.StorageProvider{},
admissionregistrationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, DiscoveryClient: discoveryClientForAdmissionRegistration},
eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
resourcerest.RESTStorageProvider{},
}
// 装载 API
if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
return nil, err
}
m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
if err != nil {
return err
}
controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, kubeClient)
// generate a context from stopCh. This is to avoid modifying files which are relying on apiserver
// TODO: See if we can pass ctx to the current method
ctx, cancel := context.WithCancel(context.Background())
go func() {
select {
case <-hookContext.StopCh:
cancel() // stopCh closed, so cancel our context
case <-ctx.Done():
}
}()
// prime values and start listeners
if m.ClusterAuthenticationInfo.ClientCA != nil {
m.ClusterAuthenticationInfo.ClientCA.AddListener(controller)
if controller, ok := m.ClusterAuthenticationInfo.ClientCA.(dynamiccertificates.ControllerRunner); ok {
// runonce to be sure that we have a value.
if err := controller.RunOnce(ctx); err != nil {
runtime.HandleError(err)
}
go controller.Run(ctx, 1)
}
}
if m.ClusterAuthenticationInfo.RequestHeaderCA != nil {
m.ClusterAuthenticationInfo.RequestHeaderCA.AddListener(controller)
if controller, ok := m.ClusterAuthenticationInfo.RequestHeaderCA.(dynamiccertificates.ControllerRunner); ok {
// runonce to be sure that we have a value.
if err := controller.RunOnce(ctx); err != nil {
runtime.HandleError(err)
}
go controller.Run(ctx, 1)
}
}
go controller.Run(ctx, 1)
return nil
})
if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.APIServerIdentity) {
m.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-identity-lease-controller", func(hookContext genericapiserver.PostStartHookContext) error {
kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
if err != nil {
return err
}
leaseName := m.GenericAPIServer.APIServerID
holderIdentity := m.GenericAPIServer.APIServerID + "_" + string(uuid.NewUUID())
controller := lease.NewController(
clock.RealClock{},
kubeClient,
holderIdentity,
int32(IdentityLeaseDurationSeconds),
nil,
IdentityLeaseRenewIntervalPeriod,
leaseName,
metav1.NamespaceSystem,
labelAPIServerHeartbeat)
go controller.Run(hookContext.StopCh)
return nil
})
m.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-identity-lease-garbage-collector", func(hookContext genericapiserver.PostStartHookContext) error {
kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
if err != nil {
return err
}
go apiserverleasegc.NewAPIServerLeaseGC(
kubeClient,
IdentityLeaseGCPeriod,
metav1.NamespaceSystem,
KubeAPIServerIdentityLeaseLabelSelector,
).Run(hookContext.StopCh)
return nil
})
}
m.GenericAPIServer.AddPostStartHookOrDie("start-legacy-token-tracking-controller", func(hookContext genericapiserver.PostStartHookContext) error {
kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
if err != nil {
return err
}
go legacytokentracking.NewController(kubeClient).Run(hookContext.StopCh)
return nil
})
return m, nil
}最后更新于
这有帮助吗?