启动篇-07启动 HTTP 服务

Run

cmd/kube-apiserver/app/server.go
// Run is shown here with most of its body elided ("..."). The visible tail
// hands stopCh to prepared.Run and returns whatever error that call produces.
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
	...
	return prepared.Run(stopCh)
}

prepared.Run

vendor/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
// Run delegates to the wrapped runnable, passing stopCh through unchanged and
// returning the delegate's error as-is.
func (s preparedAPIAggregator) Run(stopCh <-chan struct{}) error {
	return s.runnable.Run(stopCh)
}

preparedGenericAPIServer.Run

  • staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go

staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go
// Run spawns the secure http server.
// It only returns if stopCh is closed or the secure port cannot be listened on initially.
//
// It also returns early with an error if the audit backend fails to start or
// if the pre-shutdown hooks report an error.
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
	delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration
	shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated

	// Destroy is deferred, so it runs whenever Run returns (on the error
	// paths as well as after a complete shutdown).
	defer s.Destroy()

	// Spawn a new goroutine for signaling MuxAndDiscoveryComplete.
	// Registration happens during construction of the generic api server;
	// the last server in the chain aggregates the signals of the previous instances.
	go func() {
		for _, muxAndDiscoveryCompletedSignal := range s.GenericAPIServer.MuxAndDiscoveryCompleteSignals() {
			select {
			case <-muxAndDiscoveryCompletedSignal:
				continue
			case <-stopCh:
				klog.V(1).Infof("haven't completed %s, stop requested", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
				return
			}
		}
		s.lifecycleSignals.MuxAndDiscoveryComplete.Signal()
		klog.V(1).Infof("%s has all endpoints registered and discovery information is complete", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
	}()

	// After stopCh closes: signal ShutdownInitiated, sleep for
	// ShutdownDelayDuration, then (via the defers) log and signal delayedStopCh.
	go func() {
		defer delayedStopCh.Signal()
		defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", delayedStopCh.Name())

		<-stopCh

		// As soon as shutdown is initiated, /readyz should start returning failure.
		// This gives the load balancer a window, defined by ShutdownDelayDuration,
		// to detect that /readyz is red and stop sending traffic to this server.
		shutdownInitiatedCh.Signal()
		klog.V(1).InfoS("[graceful-termination] shutdown event", "name", shutdownInitiatedCh.Name())

		time.Sleep(s.ShutdownDelayDuration)
	}()

	// Close the socket after the delayed stopCh.
	shutdownTimeout := s.ShutdownTimeout
	if s.ShutdownSendRetryAfter {
		// When this mode is enabled, we do the following:
		// - the server keeps listening until all existing in-flight requests
		//   (not including active long-running requests) have been drained.
		// - once drained, http Server Shutdown is invoked with a timeout of 2s;
		//   net/http waits 1s for the peer to respond to a GO_AWAY frame, so
		//   we should wait for a minimum of 2s.
		shutdownTimeout = 2 * time.Second
		klog.V(1).InfoS("[graceful-termination] using HTTP Server shutdown timeout", "ShutdownTimeout", shutdownTimeout)
	}

	notAcceptingNewRequestCh := s.lifecycleSignals.NotAcceptingNewRequest
	drainedCh := s.lifecycleSignals.InFlightRequestsDrained
	stopHttpServerCh := make(chan struct{})
	// stopHttpServerCh is closed once the selected lifecycle signal fires:
	// NotAcceptingNewRequest by default, or InFlightRequestsDrained when
	// ShutdownSendRetryAfter is set.
	go func() {
		defer close(stopHttpServerCh)

		timeToStopHttpServerCh := notAcceptingNewRequestCh.Signaled()
		if s.ShutdownSendRetryAfter {
			timeToStopHttpServerCh = drainedCh.Signaled()
		}

		<-timeToStopHttpServerCh
	}()

	// Start the audit backend before any request comes in. This means we must call
	// Backend.Run before the http server starts serving; otherwise the
	// Backend.ProcessEvents call might block. AuditBackend.Run will stop as soon as
	// all in-flight requests are drained.
	if s.AuditBackend != nil {
		if err := s.AuditBackend.Run(drainedCh.Signaled()); err != nil {
			return fmt.Errorf("failed to run the audit backend: %v", err)
		}
	}

	// Start the http server.
	stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout)
	if err != nil {
		return err
	}

	// Relay the closing of listenerStoppedCh into the HTTPServerStoppedListening signal.
	httpServerStoppedListeningCh := s.lifecycleSignals.HTTPServerStoppedListening
	go func() {
		<-listenerStoppedCh
		httpServerStoppedListeningCh.Signal()
		klog.V(1).InfoS("[graceful-termination] shutdown event", "name", httpServerStoppedListeningCh.Name())
	}()

	// We stop accepting new requests as soon as both ShutdownDelayDuration has
	// elapsed and the pre-shutdown hooks have completed.
	preShutdownHooksHasStoppedCh := s.lifecycleSignals.PreShutdownHooksStopped
	go func() {
		defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", notAcceptingNewRequestCh.Name())
		defer notAcceptingNewRequestCh.Signal()

		// Wait for the delayed stopCh before closing the handler chain.
		<-delayedStopCh.Signaled()

		// Additionally wait for the pre-shutdown hooks to finish, as some of them need
		// to send API calls to clean up after themselves (e.g. lease reconcilers
		// removing themselves from the active servers).
		<-preShutdownHooksHasStoppedCh.Signaled()
	}()

	go func() {
		defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", drainedCh.Name())
		defer drainedCh.Signal()

		// Wait for the delayed stopCh before closing the handler chain
		// (it rejects everything after Wait has been called).
		<-notAcceptingNewRequestCh.Signaled()

		// Wait for all requests to finish, which are bounded by the RequestTimeout variable.
		// Once HandlerChainWaitGroup.Wait is invoked, the apiserver is expected to reject any
		// incoming request with a {503, Retry-After} response via the WithWaitGroup filter.
		// Instead, we observe that incoming requests get a "connection refused" error; this is
		// because, at this point, "Server.Shutdown" has already been called and the net/http
		// server has stopped listening.
		// On the other hand, if "ShutdownSendRetryAfter" is enabled, incoming requests are
		// rejected with {429, Retry-After}, since "Server.Shutdown" is only called after the
		// in-flight requests have been drained.
		// TODO: can we consolidate these two modes of graceful termination?
		s.HandlerChainWaitGroup.Wait()
	}()

	klog.V(1).Info("[graceful-termination] waiting for shutdown to be initiated")
	<-stopCh

	// Run the shutdown hooks directly. This includes deregistering from the
	// kubernetes endpoint in case of kube-apiserver.
	// The deferred Signal fires whether or not the hooks returned an error.
	func() {
		defer func() {
			preShutdownHooksHasStoppedCh.Signal()
			klog.V(1).InfoS("[graceful-termination] pre-shutdown hooks completed", "name", preShutdownHooksHasStoppedCh.Name())
		}()
		err = s.RunPreShutdownHooks()
	}()
	// A hook error ends Run immediately, without waiting for the drain below.
	if err != nil {
		return err
	}

	// Wait for all in-flight requests to drain, bounded by the RequestTimeout variable.
	<-drainedCh.Signaled()

	if s.AuditBackend != nil {
		s.AuditBackend.Shutdown()
		klog.V(1).InfoS("[graceful-termination] audit backend shutdown completed")
	}

	// Wait for the listener to stop, then for stoppedCh, which is closed when the
	// graceful termination (server.Shutdown) is complete.
	<-listenerStoppedCh
	<-stoppedCh

	klog.V(1).Info("[graceful-termination] apiserver is exiting")
	return nil
}

NonBlockingRun

staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go
// NonBlockingRun starts the secure http server when both SecureServingInfo and
// Handler are set, then runs the post-start hooks and notifies systemd.
//
// The two returned channels are the ones produced by Serve (stoppedCh and
// listenerStoppedCh, in that order); both are nil when no secure server was
// started. A non-nil error is the error returned by Serve, and in that case
// both channels are nil.
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdownTimeout time.Duration) (<-chan struct{}, <-chan struct{}, error) {
	var (
		servingDoneCh  <-chan struct{}
		listenerDoneCh <-chan struct{}
	)

	// Serve is given a private stop channel instead of stopCh so that it can
	// be closed from here when startup fails.
	serveStopCh := make(chan struct{})

	if s.SecureServingInfo != nil && s.Handler != nil {
		// Bring up the secure http server.
		done, listenerDone, err := s.SecureServingInfo.Serve(s.Handler, shutdownTimeout, serveStopCh)
		if err != nil {
			close(serveStopCh)
			return nil, nil, err
		}
		servingDoneCh, listenerDoneCh = done, listenerDone
	}

	// Startup succeeded: from now on the caller is responsible for closing
	// stopCh, which is relayed into the private stop channel.
	go func() {
		<-stopCh
		close(serveStopCh)
	}()

	// Run the post-start hooks.
	s.RunPostStartHooks(stopCh)

	// A failed systemd notification is only logged; it does not fail startup.
	_, notifyErr := systemd.SdNotify(true, "READY=1\n")
	if notifyErr != nil {
		klog.Errorf("Unable to send systemd daemon successful start message: %v\n", notifyErr)
	}

	return servingDoneCh, listenerDoneCh, nil
}

Serve

staging/src/k8s.io/apiserver/pkg/server/secure_serving.go
// Serve runs the secure http server. It fails if the listener is nil, if the
// TLS configuration cannot be built, or if HTTP/2 cannot be configured.
// The actual server loop (stoppable by closing stopCh) is started through
// RunServer, so Serve itself does not block.
//
// It returns the two channels produced by RunServer: a stoppedCh that is closed
// when all non-hijacked active requests have been processed, and a
// listenerStoppedCh that is closed when the underlying http Server has stopped
// listening.
func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, <-chan struct{}, error) {
	if s.Listener == nil {
		return nil, nil, fmt.Errorf("listener must not be nil")
	}

	tlsConfig, err := s.tlsConfig(stopCh)
	if err != nil {
		return nil, nil, err
	}

	const (
		// Matches the http.DefaultTransport keep-alive timeout.
		keepAliveIdleTimeout = 90 * time.Second

		// At least 99% of serialized resources in surveyed clusters were smaller
		// than 256kb. This should be big enough to accommodate most API POST
		// requests in a single frame, and small enough to allow a per-connection
		// buffer of this size multiplied by `MaxConcurrentStreams`.
		resourceBody99Percentile = 256 * 1024

		// Stream limit used when no override is configured.
		defaultMaxConcurrentStreams = 250
	)

	secureServer := &http.Server{
		Addr:              s.Listener.Addr().String(),
		Handler:           handler,
		MaxHeaderBytes:    1 << 20,
		TLSConfig:         tlsConfig,
		IdleTimeout:       keepAliveIdleTimeout,
		ReadHeaderTimeout: 32 * time.Second, // just shy of requestTimeoutUpperBound
	}

	// Use the overridden concurrent-streams setting, or make the default
	// explicit, so the per-connection buffer can be sized from it.
	maxStreams := uint32(defaultMaxConcurrentStreams)
	if s.HTTP2MaxStreamsPerConnection > 0 {
		maxStreams = uint32(s.HTTP2MaxStreamsPerConnection)
	}

	http2Options := &http2.Server{
		IdleTimeout: keepAliveIdleTimeout,
		// Shrink the per-stream buffer and max frame size from the 1MB default
		// while still accommodating most API POST requests in a single frame.
		MaxUploadBufferPerStream: resourceBody99Percentile,
		MaxReadFrameSize:         resourceBody99Percentile,
		MaxConcurrentStreams:     maxStreams,
		// Grow the connection buffer from the 1MB default to cover the
		// configured number of concurrent streams.
		MaxUploadBufferPerConnection: resourceBody99Percentile * int32(maxStreams),
	}

	if !s.DisableHTTP2 {
		// Apply the settings to the server.
		if err = http2.ConfigureServer(secureServer, http2Options); err != nil {
			return nil, nil, fmt.Errorf("error configuring http2: %v", err)
		}
	}

	// Route the server's error log through tlsHandshakeErrorWriter, which
	// handles the messages for tls handshake errors.
	secureServer.ErrorLog = log.New(&tlsHandshakeErrorWriter{os.Stderr}, "", 0)

	klog.Infof("Serving securely on %s", secureServer.Addr)
	return RunServer(secureServer, s.Listener, shutdownTimeout, stopCh)
}

RunServer

staging/src/k8s.io/apiserver/pkg/server/secure_serving.go
// RunServer 产生一个 go-routine,一直运行,直到 stopCh 被关闭
// 它返回一个 stoppedCh,该 channel 在所有非劫持的活动请求都已处理时关闭
// 此函数不会阻塞
func RunServer(
	server *http.Server,
	ln net.Listener,
	shutDownTimeout time.Duration,
	stopCh <-chan struct{},
) (<-chan struct{}, <-chan struct{}, error) {
	if ln == nil {
		return nil, nil, fmt.Errorf("listener must not be nil")
	}

	// Shutdown server gracefully.
	serverShutdownCh, listenerStoppedCh := make(chan struct{}), make(chan struct{})
	go func() {
		defer close(serverShutdownCh)
		<-stopCh
		ctx, cancel := context.WithTimeout(context.Background(), shutDownTimeout)
		server.Shutdown(ctx)
		cancel()
	}()

	go func() {
		defer utilruntime.HandleCrash()
		defer close(listenerStoppedCh)

		var listener net.Listener
		listener = tcpKeepAliveListener{ln}
		if server.TLSConfig != nil {
			listener = tls.NewListener(listener, server.TLSConfig)
		}

		err := server.Serve(listener)

		msg := fmt.Sprintf("Stopped listening on %s", ln.Addr().String())
		select {
		case <-stopCh:
			klog.Info(msg)
		default:
			panic(fmt.Sprintf("%s due to error: %v", msg, err))
		}
	}()

	return serverShutdownCh, listenerStoppedCh, nil
}

最后更新于

这有帮助吗?