启动篇-07启动 HTTP 服务
Run
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
...
return prepared.Run(stopCh)
}
prepared.Run
func (s preparedAPIAggregator) Run(stopCh <-chan struct{}) error {
return s.runnable.Run(stopCh)
}preparedGenericAPIServer.Run
staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go
// Run starts the HTTPS server and drives the graceful-termination sequence.
// It only returns if stopCh is closed or the secure port cannot be listened on initially.
// It also returns early with an error if the audit backend fails to start or
// if the pre-shutdown hooks report an error.
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
	delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration
	shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated
	// Destroy is deferred, so it runs when Run returns (on every return path),
	// not at the moment stopCh fires.
	defer s.Destroy()
	// Spawn a new goroutine for closing the MuxAndDiscoveryComplete signal.
	// Registration happens during construction of the generic api server;
	// the last server in the chain aggregates signals from the previous instances.
	go func() {
		for _, muxAndDiscoveryCompletedSignal := range s.GenericAPIServer.MuxAndDiscoveryCompleteSignals() {
			select {
			case <-muxAndDiscoveryCompletedSignal:
				continue
			case <-stopCh:
				klog.V(1).Infof("haven't completed %s, stop requested", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
				return
			}
		}
		s.lifecycleSignals.MuxAndDiscoveryComplete.Signal()
		klog.V(1).Infof("%s has all endpoints registered and discovery information is complete", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
	}()
	go func() {
		// Deferred calls run in reverse order: the log line is emitted first,
		// then delayedStopCh is signaled, both after the sleep below.
		defer delayedStopCh.Signal()
		defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", delayedStopCh.Name())
		<-stopCh
		// As soon as shutdown is initiated, /readyz should start returning failure.
		// This gives the load balancer a window, defined by ShutdownDelayDuration, to detect
		// that /readyz is red and stop sending traffic to this server.
		shutdownInitiatedCh.Signal()
		klog.V(1).InfoS("[graceful-termination] shutdown event", "name", shutdownInitiatedCh.Name())
		time.Sleep(s.ShutdownDelayDuration)
	}()
	// Close the socket after the delayed stopCh.
	shutdownTimeout := s.ShutdownTimeout
	if s.ShutdownSendRetryAfter {
		// When this mode is enabled, we do the following:
		// - the server will continue to listen until all existing requests in flight
		//   (not including active long running requests) have been drained.
		// - once drained, http Server Shutdown is invoked with a timeout of 2s;
		//   net/http waits for 1s for the peer to respond to a GO_AWAY frame, so
		//   we should wait for a minimum of 2s.
		shutdownTimeout = 2 * time.Second
		klog.V(1).InfoS("[graceful-termination] using HTTP Server shutdown timeout", "ShutdownTimeout", shutdownTimeout)
	}
	notAcceptingNewRequestCh := s.lifecycleSignals.NotAcceptingNewRequest
	drainedCh := s.lifecycleSignals.InFlightRequestsDrained
	// stopHttpServerCh is closed once the selected lifecycle signal fires:
	// NotAcceptingNewRequest by default, or InFlightRequestsDrained when
	// ShutdownSendRetryAfter is set. It is what stops the HTTP server below.
	stopHttpServerCh := make(chan struct{})
	go func() {
		defer close(stopHttpServerCh)
		timeToStopHttpServerCh := notAcceptingNewRequestCh.Signaled()
		if s.ShutdownSendRetryAfter {
			timeToStopHttpServerCh = drainedCh.Signaled()
		}
		<-timeToStopHttpServerCh
	}()
	// Start the audit backend before any request comes in. This means we must call Backend.Run
	// before the http server starts serving, otherwise the Backend.ProcessEvents call might block.
	// AuditBackend.Run will stop as soon as all in-flight requests are drained.
	if s.AuditBackend != nil {
		if err := s.AuditBackend.Run(drainedCh.Signaled()); err != nil {
			return fmt.Errorf("failed to run the audit backend: %v", err)
		}
	}
	// Start the http server.
	stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout)
	if err != nil {
		return err
	}
	// Forward "listener stopped" into the HTTPServerStoppedListening lifecycle signal.
	httpServerStoppedListeningCh := s.lifecycleSignals.HTTPServerStoppedListening
	go func() {
		<-listenerStoppedCh
		httpServerStoppedListeningCh.Signal()
		klog.V(1).InfoS("[graceful-termination] shutdown event", "name", httpServerStoppedListeningCh.Name())
	}()
	// We don't accept new requests as soon as both ShutdownDelayDuration has
	// elapsed and the pre-shutdown hooks have completed.
	preShutdownHooksHasStoppedCh := s.lifecycleSignals.PreShutdownHooksStopped
	go func() {
		defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", notAcceptingNewRequestCh.Name())
		defer notAcceptingNewRequestCh.Signal()
		// Wait for the delayed stopCh before closing the handler chain.
		<-delayedStopCh.Signaled()
		// Additionally wait for the pre-shutdown hooks to finish as well, since some of them
		// need to send API calls to clean up after themselves (e.g. lease reconcilers removing
		// themselves from the active servers).
		<-preShutdownHooksHasStoppedCh.Signaled()
	}()
	go func() {
		defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", drainedCh.Name())
		defer drainedCh.Signal()
		// Wait for the delayed stopCh before closing the handler chain (it rejects everything after Wait has been called).
		<-notAcceptingNewRequestCh.Signaled()
		// Wait for all requests to finish, which are bounded by the RequestTimeout variable.
		// Once HandlerChainWaitGroup.Wait is invoked, the apiserver is expected to reject any
		// incoming request with a {503, Retry-After} response via the WithWaitGroup filter.
		// On the contrary, we observe that incoming requests get a "connection refused" error;
		// this is because, at this point, we have called "Server.Shutdown" and the net/http
		// server has stopped listening.
		// This causes incoming requests to get a "connection refused" error.
		// On the other hand, if "ShutdownSendRetryAfter" is enabled, incoming requests will be
		// rejected with a {429, Retry-After}, since "Server.Shutdown" will be invoked only after
		// in-flight requests have been drained.
		// TODO: can we consolidate these two modes of graceful termination?
		s.HandlerChainWaitGroup.Wait()
	}()
	klog.V(1).Info("[graceful-termination] waiting for shutdown to be initiated")
	// Block here until shutdown is requested.
	<-stopCh
	// Run the shutdown hooks directly. This includes deregistering from the
	// kubernetes endpoint in case of kube-apiserver.
	func() {
		// The signal is sent from a defer so it fires whether or not the hooks return an error.
		defer func() {
			preShutdownHooksHasStoppedCh.Signal()
			klog.V(1).InfoS("[graceful-termination] pre-shutdown hooks completed", "name", preShutdownHooksHasStoppedCh.Name())
		}()
		err = s.RunPreShutdownHooks()
	}()
	if err != nil {
		return err
	}
	// Wait for all in-flight requests to be drained, bounded by the RequestTimeout variable.
	<-drainedCh.Signaled()
	if s.AuditBackend != nil {
		s.AuditBackend.Shutdown()
		klog.V(1).InfoS("[graceful-termination] audit backend shutdown completed")
	}
	// Wait for stoppedCh, which is closed when the graceful termination (server.Shutdown) is completed.
	<-listenerStoppedCh
	<-stoppedCh
	klog.V(1).Info("[graceful-termination] apiserver is exiting")
	return nil
}NonBlockingRun
// NonBlockingRun 生成安全的 http server
// 如果无法监听安全端口,则返回错误
// 返回的 channel 在终止完成时关闭
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdownTimeout time.Duration) (<-chan struct{}, <-chan struct{}, error) {
// 使用内部停止 channel 允许在错误时清理 listener
internalStopCh := make(chan struct{})
var stoppedCh <-chan struct{}
var listenerStoppedCh <-chan struct{}
if s.SecureServingInfo != nil && s.Handler != nil {
var err error
// 生成安全的 http server
stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.Serve(s.Handler, shutdownTimeout, internalStopCh)
if err != nil {
close(internalStopCh)
return nil, nil, err
}
}
// 现在 listener 已经绑定成功,调用者有责任关闭提供的 channel 以确保清理
go func() {
<-stopCh
close(internalStopCh)
}()
// 运行启动后的 hook
s.RunPostStartHooks(stopCh)
if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil {
klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
}
return stoppedCh, listenerStoppedCh, nil
}Serve
// Serve 运行安全的 http server。它只会在证书无法加载或初始监听调用失败时失败。
// 实际的 server 循环(通过关闭 stopCh 可停止)在 go 程中运行,即 Serve 不会阻塞
// 它返回一个 stoppedCh,该 channel 在所有非劫持的活动请求都已处理时关闭
// 它返回一个 listenerStoppedCh,该 channel 在底层 http Server 停止监听时关闭
func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, <-chan struct{}, error) {
if s.Listener == nil {
return nil, nil, fmt.Errorf("listener must not be nil")
}
tlsConfig, err := s.tlsConfig(stopCh)
if err != nil {
return nil, nil, err
}
secureServer := &http.Server{
Addr: s.Listener.Addr().String(),
Handler: handler,
MaxHeaderBytes: 1 << 20,
TLSConfig: tlsConfig,
IdleTimeout: 90 * time.Second, // matches http.DefaultTransport keep-alive timeout
ReadHeaderTimeout: 32 * time.Second, // just shy of requestTimeoutUpperBound
}
// 在调查的集群中,至少 99% 的序列化资源都小于 256kb
// 这应该足以容纳大多数 API POST 请求,且足够小,以允许每个连接缓冲区的大小乘以 `MaxConcurrentStreams`
const resourceBody99Percentile = 256 * 1024
http2Options := &http2.Server{
IdleTimeout: 90 * time.Second, // matches http.DefaultTransport keep-alive timeout
}
// 从 1MB 的默认值中缩小每个流的缓冲区和最大帧大小,同时仍然可以在单个帧中容纳大多数 API POST 请求
http2Options.MaxUploadBufferPerStream = resourceBody99Percentile
http2Options.MaxReadFrameSize = resourceBody99Percentile
// 使用覆盖的并发流设置,或明确使用默认值 250,以便我们可以适当地调整 MaxUploadBufferPerConnection
if s.HTTP2MaxStreamsPerConnection > 0 {
http2Options.MaxConcurrentStreams = uint32(s.HTTP2MaxStreamsPerConnection)
} else {
http2Options.MaxConcurrentStreams = 250
}
// 将连接缓冲区大小从 1MB 的默认值增加到指定的并发流数量
http2Options.MaxUploadBufferPerConnection = http2Options.MaxUploadBufferPerStream * int32(http2Options.MaxConcurrentStreams)
if !s.DisableHTTP2 {
// 将设置应用于 server
if err := http2.ConfigureServer(secureServer, http2Options); err != nil {
return nil, nil, fmt.Errorf("error configuring http2: %v", err)
}
}
// 使用 tlsHandshakeErrorWriter 处理 tls 握手错误的消息
tlsErrorWriter := &tlsHandshakeErrorWriter{os.Stderr}
tlsErrorLogger := log.New(tlsErrorWriter, "", 0)
secureServer.ErrorLog = tlsErrorLogger
klog.Infof("Serving securely on %s", secureServer.Addr)
return RunServer(secureServer, s.Listener, shutdownTimeout, stopCh)
}RunServer
// RunServer 产生一个 go-routine,一直运行,直到 stopCh 被关闭
// 它返回一个 stoppedCh,该 channel 在所有非劫持的活动请求都已处理时关闭
// 此函数不会阻塞
func RunServer(
server *http.Server,
ln net.Listener,
shutDownTimeout time.Duration,
stopCh <-chan struct{},
) (<-chan struct{}, <-chan struct{}, error) {
if ln == nil {
return nil, nil, fmt.Errorf("listener must not be nil")
}
// Shutdown server gracefully.
serverShutdownCh, listenerStoppedCh := make(chan struct{}), make(chan struct{})
go func() {
defer close(serverShutdownCh)
<-stopCh
ctx, cancel := context.WithTimeout(context.Background(), shutDownTimeout)
server.Shutdown(ctx)
cancel()
}()
go func() {
defer utilruntime.HandleCrash()
defer close(listenerStoppedCh)
var listener net.Listener
listener = tcpKeepAliveListener{ln}
if server.TLSConfig != nil {
listener = tls.NewListener(listener, server.TLSConfig)
}
err := server.Serve(listener)
msg := fmt.Sprintf("Stopped listening on %s", ln.Addr().String())
select {
case <-stopCh:
klog.Info(msg)
default:
panic(fmt.Sprintf("%s due to error: %v", msg, err))
}
}()
return serverShutdownCh, listenerStoppedCh, nil
}最后更新于
这有帮助吗?