启动篇-03 创建 APIExtensionsServer

createAPIExtensionsConfig

cmd/kube-apiserver/app/server.go
// Build the configuration for the API extensions server; the call's arguments
// are elided in this excerpt. The resulting apiExtensionsConfig is what
// createAPIExtensionsServer later completes and instantiates.
apiExtensionsConfig, err := createAPIExtensionsConfig(...)

createAPIExtensionsServer

cmd/kube-apiserver/app/server.go
// createAPIExtensionsServer completes apiextensionsConfig and instantiates the
// CustomResourceDefinitions server from it, handing delegateAPIServer to New as
// the delegation target. The parameter list is elided in this excerpt.
//
// It returns whatever New returns: the server, or a nil server plus an error.
func createAPIExtensionsServer(...) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
   return apiextensionsConfig.Complete().New(delegateAPIServer)
}

New

vendor/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go
// New returns a CustomResourceDefinitions instance built from the completed config.
//
// In order, it:
//   - creates the generic API server named "apiextensions-apiserver" on top of delegationTarget;
//   - installs the REST storage for customresourcedefinitions (and its /status subresource)
//     when that resource is enabled;
//   - creates the CRD client and shared informer factory;
//   - builds the CRD handler and mounts it on "/apis" and "/apis/";
//   - creates the CRD controllers and registers the post-start hooks that run them.
//
// Any error from an intermediate step aborts construction and is returned with a nil server.
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
   genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
   if err != nil {
      return nil, err
   }

   // hasCRDInformerSyncedSignal is a signal channel that is closed once the CRD informer has synced.
   // It guarantees that, until the server has installed all known HTTP paths, requests to potential
   // custom resource endpoints get a 503 error instead of a 404.
   hasCRDInformerSyncedSignal := make(chan struct{})
   if err := genericServer.RegisterMuxAndDiscoveryCompleteSignal("CRDInformerHasNotSynced", hasCRDInformerSyncedSignal); err != nil {
      return nil, err
   }

   s := &CustomResourceDefinitions{
      GenericAPIServer: genericServer,
   }

   apiResourceConfig := c.GenericConfig.MergedResourceConfig
   // Create the APIGroupInfo for the apiextensions group.
   apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
   storage := map[string]rest.Storage{}
   // customresourcedefinitions: only wire up storage when the v1 resource is enabled.
   if resource := "customresourcedefinitions"; apiResourceConfig.ResourceEnabled(v1.SchemeGroupVersion.WithResource(resource)) {
      customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
      if err != nil {
         return nil, err
      }
      storage[resource] = customResourceDefinitionStorage
      // The status subresource is built on top of the main CRD storage.
      storage[resource+"/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
   }
   // Register the version's storage map only if at least one resource was enabled above.
   if len(storage) > 0 {
      apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage
   }

   if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
      return nil, err
   }

   // Create the CRD client from the generic server's loopback client config.
   crdClient, err := clientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)
   if err != nil {
      // it's really bad that this is leaking here, but until we can fix the test (which I'm pretty sure isn't even testing what it wants to test),
      // we need to be able to move forward
      return nil, fmt.Errorf("failed to create clientset: %v", err)
   }
   // Create the CRD shared informer factory.
   // NOTE(review): the 5*time.Minute argument is presumably the default resync period — confirm against the factory's API.
   s.Informers = externalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)

   // Create the CRD handler. Requests it does not serve go to the delegation target's
   // unprotected handler, or to a plain 404 handler when the target provides none.
   delegateHandler := delegationTarget.UnprotectedHandler()
   if delegateHandler == nil {
      delegateHandler = http.NotFoundHandler()
   }

   versionDiscoveryHandler := &versionDiscoveryHandler{
      discovery: map[schema.GroupVersion]*discovery.APIVersionHandler{},
      delegate:  delegateHandler,
   }
   groupDiscoveryHandler := &groupDiscoveryHandler{
      discovery: map[string]*discovery.APIGroupHandler{},
      delegate:  delegateHandler,
   }
   // The establishing controller is created before the handler because the handler takes it as an argument.
   establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
   crdHandler, err := NewCustomResourceDefinitionHandler(
      versionDiscoveryHandler,
      groupDiscoveryHandler,
      s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
      delegateHandler,
      c.ExtraConfig.CRDRESTOptionsGetter,
      c.GenericConfig.AdmissionControl,
      establishingController,
      c.ExtraConfig.ServiceResolver,
      c.ExtraConfig.AuthResolverWrapper,
      c.ExtraConfig.MasterCount,
      s.GenericAPIServer.Authorizer,
      c.GenericConfig.RequestTimeout,
      time.Duration(c.GenericConfig.MinRequestTimeout)*time.Second,
      apiGroupInfo.StaticOpenAPISpec,
      c.GenericConfig.MaxRequestBodyBytes,
   )
   if err != nil {
      return nil, err
   }
   // Mount the CRD handler on both the exact "/apis" path and everything under "/apis/".
   s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
   s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
   s.GenericAPIServer.RegisterDestroyFunc(crdHandler.destroy)

   // Create the CRD controllers; they are only started later, from the post-start hook below.
   discoveryController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler, genericServer.AggregatedDiscoveryGroupManager)
   namingController := status.NewNamingConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
   nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
   apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
   finalizingController := finalizer.NewCRDFinalizer(
      s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
      crdClient.ApiextensionsV1(),
      crdHandler,
   )

   // Register the post-start hooks that start the informers and the controllers.
   s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
      s.Informers.Start(context.StopCh)
      return nil
   })
   s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
      // OpenAPIVersionedService and StaticOpenAPISpec are populated in the generic apiserver's PrepareRun().
      // Together they serve the /openapi/v2 endpoint on the generic apiserver. The generic apiserver may
      // choose not to enable OpenAPI by having an empty openAPIConfig, in which case both
      // OpenAPIVersionedService and StaticOpenAPISpec are nil.
      // In that case the CRD OpenAPI controllers are not run.
      if s.GenericAPIServer.StaticOpenAPISpec != nil {
         if s.GenericAPIServer.OpenAPIVersionedService != nil {
            openapiController := openapicontroller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
            go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)
         }

         // The OpenAPI v3 controller additionally requires the OpenAPIV3 feature gate to be enabled.
         if s.GenericAPIServer.OpenAPIV3VersionedService != nil && utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) {
            openapiv3Controller := openapiv3controller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
            go openapiv3Controller.Run(s.GenericAPIServer.OpenAPIV3VersionedService, context.StopCh)
         }
      }

      // Start the controllers, each in its own goroutine, all bound to the hook's stop channel.
      go namingController.Run(context.StopCh)
      go establishingController.Run(context.StopCh)
      go nonStructuralSchemaController.Run(5, context.StopCh)
      go apiApprovalController.Run(5, context.StopCh)
      go finalizingController.Run(5, context.StopCh)

      // Block this hook until either the stop channel or discoverySyncedCh becomes readable,
      // whichever happens first. discoverySyncedCh is handed to the discovery controller.
      discoverySyncedCh := make(chan struct{})
      go discoveryController.Run(context.StopCh, discoverySyncedCh)
      select {
      case <-context.StopCh:
      case <-discoverySyncedCh:
      }

      return nil
   })
   // We don't want to report healthy until we can handle all CRDs that have already been registered.
   // Waiting for the informer to sync ensures that the lister will be valid before we begin.
   // There may still be races for CRDs added after startup, but we won't go healthy until we can
   // handle the ones that already exist.
   s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(context genericapiserver.PostStartHookContext) error {
      // Poll every 100ms until the CRD informer reports synced or the stop channel fires.
      return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
         if s.Informers.Apiextensions().V1().CustomResourceDefinitions().Informer().HasSynced() {
            // Returning true right after the close ends the poll, so the channel is closed only once.
            close(hasCRDInformerSyncedSignal)
            return true, nil
         }
         return false, nil
      }, context.StopCh)
   })

   return s, nil
}

最后更新于

这有帮助吗?