启动篇-07启动 HTTP 服务
Run
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
...
return prepared.Run(stopCh)
}prepared.Run
func (s preparedAPIAggregator) Run(stopCh <-chan struct{}) error {
return s.runnable.Run(stopCh)
}preparedGenericAPIServer.Run
staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go
// Run 生成 https server
// It only returns if stopCh is closed or the secure port cannot be listened on initially.
// 它只有在 stopCh 关闭或者无法监听安全端口时才会返回
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration
shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated
// 在 stopCh 信号触发时,执行 s.Destroy() 函数
defer s.Destroy()
// 生成一个新的 goroutine,用于关闭 MuxAndDiscoveryComplete 信号
// 注册发生在 generic api server 构造期间
// 链中的最后一个 server 从前一个实例聚合信号
go func() {
for _, muxAndDiscoveryCompletedSignal := range s.GenericAPIServer.MuxAndDiscoveryCompleteSignals() {
select {
case <-muxAndDiscoveryCompletedSignal:
continue
case <-stopCh:
klog.V(1).Infof("haven't completed %s, stop requested", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
return
}
}
s.lifecycleSignals.MuxAndDiscoveryComplete.Signal()
klog.V(1).Infof("%s has all endpoints registered and discovery information is complete", s.lifecycleSignals.MuxAndDiscoveryComplete.Name())
}()
go func() {
defer delayedStopCh.Signal()
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", delayedStopCh.Name())
<-stopCh
// 一旦启动关闭,/readyz 应该开始返回失败
// 这为负载均衡器定义了一个窗口,该窗口由 ShutdownDelayDuration 定义,以便检测 /readyz 是否为红色,并停止将流量发送到此服务器
shutdownInitiatedCh.Signal()
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", shutdownInitiatedCh.Name())
time.Sleep(s.ShutdownDelayDuration)
}()
// 在延迟的 stopCh 之后关闭 socket
shutdownTimeout := s.ShutdownTimeout
if s.ShutdownSendRetryAfter {
// 当启用此模式时,我们将执行以下操作:
// - 服务器将继续监听,直到所有现有请求(不包括活动的长时间运行的请求)都被耗尽
// - 一旦耗尽,http Server Shutdown 将被调用,超时时间为 2s
// net/http 等待 1s 以响应 GO_AWAY 帧的对等方,因此我们应该等待至少 2s
shutdownTimeout = 2 * time.Second
klog.V(1).InfoS("[graceful-termination] using HTTP Server shutdown timeout", "ShutdownTimeout", shutdownTimeout)
}
notAcceptingNewRequestCh := s.lifecycleSignals.NotAcceptingNewRequest
drainedCh := s.lifecycleSignals.InFlightRequestsDrained
stopHttpServerCh := make(chan struct{})
go func() {
defer close(stopHttpServerCh)
timeToStopHttpServerCh := notAcceptingNewRequestCh.Signaled()
if s.ShutdownSendRetryAfter {
timeToStopHttpServerCh = drainedCh.Signaled()
}
<-timeToStopHttpServerCh
}()
// 在任何请求到来之前启动 audit backend。这意味着我们必须在 http 服务器开始服务之前调用 Backend.Run
// 否则 Backend.ProcessEvents 调用可能会阻塞。AuditBackend.Run 将在所有正在进行的请求耗尽后停止
if s.AuditBackend != nil {
if err := s.AuditBackend.Run(drainedCh.Signaled()); err != nil {
return fmt.Errorf("failed to run the audit backend: %v", err)
}
}
// 启动 http server
stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout)
if err != nil {
return err
}
httpServerStoppedListeningCh := s.lifecycleSignals.HTTPServerStoppedListening
go func() {
<-listenerStoppedCh
httpServerStoppedListeningCh.Signal()
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", httpServerStoppedListeningCh.Name())
}()
// 一旦 ShutdownDelayDuration 过去并且预关闭钩子完成,我们就不再接受新的请求
preShutdownHooksHasStoppedCh := s.lifecycleSignals.PreShutdownHooksStopped
go func() {
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", notAcceptingNewRequestCh.Name())
defer notAcceptingNewRequestCh.Signal()
// 在关闭处理程序链之前等待延迟的 stopCh
<-delayedStopCh.Signaled()
// 此外,还要等待预关闭钩子也完成,因为其中一些需要发送 API 调用来清理自身(例如,lease reconcilers 从活动服务器中删除自身)
<-preShutdownHooksHasStoppedCh.Signaled()
}()
go func() {
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", drainedCh.Name())
defer drainedCh.Signal()
// 在关闭处理程序链之前等待延迟的 stopCh(在调用 Wait 之后拒绝一切)
<-notAcceptingNewRequestCh.Signaled()
// 等待所有请求完成,这些请求受 RequestTimeout 变量的限制。
// 一旦调用 HandlerChainWaitGroup.Wait,apiserver 将拒绝任何传入请求,该请求将通过 WithWaitGroup 过滤器返回 {503, Retry-After} 响应。
// 相反,我们观察到传入请求会获得“connection refused”错误,这是因为在这一点上,我们已经调用了“Server.Shutdown”,并且 net/http 服务器已停止监听。
// 这导致传入请求获得“connection refused”错误。
// 另一方面,如果启用了“ShutdownSendRetryAfter”,则传入请求将被拒绝返回 {429, Retry-After} ,因为在耗尽了正在进行的请求之后,才会调用“Server.Shutdown”
// TODO: can we consolidate these two modes of graceful termination?
s.HandlerChainWaitGroup.Wait()
}()
klog.V(1).Info("[graceful-termination] waiting for shutdown to be initiated")
<-stopCh
// 直接运行关闭钩子。这包括从 kubernetes 端点注销(如果是 kube-apiserver)
func() {
defer func() {
preShutdownHooksHasStoppedCh.Signal()
klog.V(1).InfoS("[graceful-termination] pre-shutdown hooks completed", "name", preShutdownHooksHasStoppedCh.Name())
}()
err = s.RunPreShutdownHooks()
}()
if err != nil {
return err
}
// 等待所有正在进行的请求耗尽,受 RequestTimeout 变量的限制
<-drainedCh.Signaled()
if s.AuditBackend != nil {
s.AuditBackend.Shutdown()
klog.V(1).InfoS("[graceful-termination] audit backend shutdown completed")
}
// 等待 stoppedCh,当优雅终止(server.Shutdown)完成时关闭
<-listenerStoppedCh
<-stoppedCh
klog.V(1).Info("[graceful-termination] apiserver is exiting")
return nil
}NonBlockingRun
Serve
RunServer
最后更新于
这有帮助吗?