blog icon indicating copy to clipboard operation
blog copied to clipboard

Kubernetes 的 apiserver 实现 - 服务启动

Open junnplus opened this issue 5 years ago • 5 comments

Library for writing a Kubernetes-style API server.

apiserver 是基于 go-restful 实现的一个 API server,下面是一个 demo 例子,我们将通过它来了解 apiserver 的服务启动过程。

// apiserver-sample

package main

import (
	"fmt"
	"net"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/serializer"
	genericserver "k8s.io/apiserver/pkg/server"
	genericoptions "k8s.io/apiserver/pkg/server/options"
)

var (
	// Scheme defines methods for serializing and deserializing API objects.
	Scheme = runtime.NewScheme()
	// Codecs provides methods for retrieving codecs and serializers for specific
	// versions and content types.
	Codecs = serializer.NewCodecFactory(Scheme)
)

func main() {
	genericConfig := genericserver.NewConfig(Codecs)
	secureServingOption := NewSecureServingOptions()
	if err := secureServingOption.WithLoopback().ApplyTo(
		&genericConfig.SecureServing, &genericConfig.LoopbackClientConfig); err != nil {
		fmt.Println(err)
		return
	}

	completedConfig := genericConfig.Complete(nil)
	apiserver, err := completedConfig.New("apiserver", genericserver.NewEmptyDelegate())
	if err != nil {
		fmt.Println(err)
		return
	}

	stopCh := genericserver.SetupSignalHandler()
	fmt.Println("serving...")
	apiserver.SecureServingInfo.Serve(apiserver.Handler, apiserver.ShutdownTimeout, stopCh)
	<-stopCh
}

代码有几个主要的结构体:

  • genericConfig 对应的 Config 结构体
  • genericConfig.SecureServing 对应的 SecureServingInfo 结构体
  • apiserver 对应的 GenericAPIServer 结构体

junnplus avatar May 11 '20 02:05 junnplus

SecureServingInfo

SecureServingInfo 在 apiserver 中的作用与 http.ListenAndServe 类似。

// pkg/server/deprecated_insecure_serving.go

type SecureServingInfo struct {
	// Listener is the secure server network listener.
	Listener net.Listener

	// Cert is the main server cert which is used if SNI does not match. Cert must be non-nil and is
	// allowed to be in SNICerts.
	Cert dynamiccertificates.CertKeyContentProvider
...
}

// Serve runs the secure http server. It fails only if certificates cannot be loaded or the initial listen call fails.
// The actual server loop (stoppable by closing stopCh) runs in a go routine, i.e. Serve does not block.
// It returns a stoppedCh that is closed when all non-hijacked active requests have been processed.
func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, error) {
	if s.Listener == nil {
		return nil, fmt.Errorf("listener must not be nil")
	}

	tlsConfig, err := s.tlsConfig(stopCh)
	if err != nil {
		return nil, err
	}

	secureServer := &http.Server{
		Addr:           s.Listener.Addr().String(),
		Handler:        handler,
		MaxHeaderBytes: 1 << 20,
		TLSConfig:      tlsConfig,
	}
...
	return RunServer(secureServer, s.Listener, shutdownTimeout, stopCh)
}

SecureServingInfo 结构体持有 net.Listener 对象,Serve 方法构建 http.Server 对象并开启服务处理请求。

SecureServingInfo 提供了基于 HTTPS 的安全服务,相对的,apiserver 在之前的版本也支持非安全的 HTTP。

// pkg/server/deprecated_insecure_serving.go

// DeprecatedInsecureServingInfo is the main context object for the insecure http server.
// HTTP does NOT include authentication or authorization.
// You shouldn't be using this.  It makes sig-auth sad.
type DeprecatedInsecureServingInfo struct {
	// Listener is the secure server network listener.
	Listener net.Listener
	// optional server name for log messages
	Name string
}

从命名上可以知道 DeprecatedInsecureServingInfo 是已经弃用的结构体,不推荐使用。

You shouldn't be using this. It makes sig-auth sad.

SecureServingInfo 可以通过 SecureServingOptions 结构来构建。

// apiserver-sample

func NewSecureServingOptions() *genericoptions.SecureServingOptions {
	o := genericoptions.SecureServingOptions{
		BindAddress: net.ParseIP("0.0.0.0"),
		BindPort:    6443,
		Required:    true,
		ServerCert: genericoptions.GeneratableKeyCert{
			PairName:      "apiserver",
			CertDirectory: "/var/run/apiserver",
		},
	}
	if err := o.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{net.ParseIP("127.0.0.1")}); err != nil {
		fmt.Println(err)
	}
	return &o
}

// secureServingOption := NewSecureServingOptions()
// secureServingOption.ApplyTo(&secureServingInfo)

junnplus avatar May 11 '20 02:05 junnplus

Config / CompletedConfig

Config 作为 apiserver 的配置结构:

// pkg/server/config.go

// Config is a structure used to configure a GenericAPIServer.
// Its members are sorted roughly in order of importance for composers.
type Config struct {
	// SecureServing is required to serve https
	SecureServing *SecureServingInfo
...
	// ExternalAddress is the host name to use for external (public internet) facing URLs (e.g. Swagger)
	// Will default to a value based on secure serving info and available ipv4 IPs.
	ExternalAddress string
...
}

// NewConfig returns a Config struct with the default values
func NewConfig(codecs serializer.CodecFactory) *Config {
	defaultHealthChecks := []healthz.HealthChecker{healthz.PingHealthz, healthz.LogHealthz}
	return &Config{
		Serializer:                  codecs,
		BuildHandlerChainFunc:       DefaultBuildHandlerChain,
...
	}
}

NewConfig 函数可以创建一个具有默认值的 Config,这边先不具体展开讲解每个配置。 通过 Complete 方法可以完整的填充 Config 结构。

// pkg/server/config.go

// Complete fills in any fields not set that are required to have valid data and can be derived
// from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver.
func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedConfig {
	if len(c.ExternalAddress) == 0 && c.PublicAddress != nil {
		c.ExternalAddress = c.PublicAddress.String()
	}

	// if there is no port, and we listen on one securely, use that one
	if _, _, err := net.SplitHostPort(c.ExternalAddress); err != nil {
		if c.SecureServing == nil {
			klog.Fatalf("cannot derive external address port without listening on a secure port.")
		}
		_, port, err := c.SecureServing.HostPort()
		if err != nil {
			klog.Fatalf("cannot derive external address from the secure port: %v", err)
		}
		c.ExternalAddress = net.JoinHostPort(c.ExternalAddress, strconv.Itoa(port))
	}
...

Complete 方法填充的字段数据可以从其他字段派生出来。 像 ExternalAddress 这个外部访问的地址配置,可以从 SecureServing 这个结构体获得。

completedConfig 作为 Config 结构的完整表示。

// pkg/server/config.go
type completedConfig struct {
	*Config

	//===========================================================================
	// values below here are filled in during completion
	//===========================================================================

	// SharedInformerFactory provides shared informers for resources
	SharedInformerFactory informers.SharedInformerFactory
}

type CompletedConfig struct {
	// Embed a private pointer that cannot be instantiated outside of this package.
	*completedConfig
}

junnplus avatar May 11 '20 02:05 junnplus

GenericAPIServer

GenericAPIServer 是 apiserver 提供服务的结构体,维护 apiserver 的状态。

// pkg/server/genericapiserver.go

// GenericAPIServer contains state for a Kubernetes cluster api server.
type GenericAPIServer struct {
...

	// Handler holds the handlers being used by this API server
	Handler *APIServerHandler

	// delegationTarget is the next delegate in the chain. This is never nil.
	delegationTarget DelegationTarget

}

GenericAPIServer 的属性这边只解释列出来几个:

  • Handler 指针是 apiserver 的服务处理入口,维护着 http 处理函数。
  • delegationTarget 是作为委托对象存在,GenericAPIServer 自身也是一个可委托对象。

通过 completedConfig 可以创建一个新的 GenericAPIServer。

// pkg/server/config.go

// New creates a new server which logically combines the handling chain with the passed server.
// name is used to differentiate for logging. The handler chain in particular can be difficult as it starts delgating.
// delegationTarget may not be nil.
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
...

	handlerChainBuilder := func(handler http.Handler) http.Handler {
		return c.BuildHandlerChainFunc(handler, c.Config)
	}
	apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())

	s := &GenericAPIServer{
...
		delegationTarget:           delegationTarget,
...

		Handler: apiServerHandler,

		listedPathProvider: apiServerHandler,
...
	}

...

	s.listedPathProvider = routes.ListedPathProviders{s.listedPathProvider, delegationTarget}

	installAPI(s, c.Config)
...
	return s, nil
}

New 方法接收一个 name 和 delegationTarge 委托对象,创建 APIServerHandler 来构造 GenericAPIServer。

APIServerHandler

APIServerHandler 是通过 NewAPIServerHandler 函数创建的。

// pkg/server/handler.go
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
	nonGoRestfulMux := mux.NewPathRecorderMux(name)
	if notFoundHandler != nil {
		nonGoRestfulMux.NotFoundHandler(notFoundHandler)
	}

	gorestfulContainer := restful.NewContainer()
	gorestfulContainer.ServeMux = http.NewServeMux()
	gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
	gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
		logStackOnRecover(s, panicReason, httpWriter)
	})
	gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
		serviceErrorHandler(s, serviceErr, request, response)
	})

	director := director{
		name:               name,
		goRestfulContainer: gorestfulContainer,
		nonGoRestfulMux:    nonGoRestfulMux,
	}

	return &APIServerHandler{
		FullHandlerChain:   handlerChainBuilder(director),
		GoRestfulContainer: gorestfulContainer,
		NonGoRestfulMux:    nonGoRestfulMux,
		Director:           director,
	}
}

NewAPIServerHandler 创建的 APIServerHandler 由多种 http.Handler 组成:

  • GoRestfulContainer,基于 go-restful 的 container,
  • NonGoRestfulMux,apiserver 内部实现的 PathRecorderMux,处理非 restful 的接口。

以及作为两者路由分发器的 director 和 apiserver 完整处理链 FullHandlerChain。

APIServerHandler 的 http 处理函数是以 FullHandlerChain 开始的:

// pkg/server/handler.go
// ServeHTTP makes it an http.Handler
func (a *APIServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	a.FullHandlerChain.ServeHTTP(w, r)
}

FullHandlerChain 处理链是由 DefaultBuildHandlerChain(director) 构造的。

// pkg/server/config.go

func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
	handler := genericapifilters.WithAuthorization(apiHandler, c.Authorization.Authorizer, c.Serializer)
	if c.FlowControl != nil {
		handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl)
	} else {
		handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
	}
	handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
	handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
	failedHandler := genericapifilters.Unauthorized(c.Serializer, c.Authentication.SupportsBasicAuth)
	failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyChecker)
	handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
	handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
	handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
	handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
	handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
	if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
		handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
	}
	handler = genericfilters.WithPanicRecovery(handler)
	return handler
}

DefaultBuildHandlerChain 构造出来的处理链是以各种过滤器开头(包含鉴权、流控等),director 结尾的链。

我们再来看下 director 的分发请求功能。

// pkg/server/handler.go

type director struct {
	name               string
	goRestfulContainer *restful.Container
	nonGoRestfulMux    *mux.PathRecorderMux
}

func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	path := req.URL.Path

	// check to see if our webservices want to claim this path
	for _, ws := range d.goRestfulContainer.RegisteredWebServices() {
		switch {
		case ws.RootPath() == "/apis":
			// if we are exactly /apis or /apis/, then we need special handling in loop.
			// normally these are passed to the nonGoRestfulMux, but if discovery is enabled, it will go directly.
			// We can't rely on a prefix match since /apis matches everything (see the big comment on Director above)
			if path == "/apis" || path == "/apis/" {
				klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
				// don't use servemux here because gorestful servemuxes get messed up when removing webservices
				// TODO fix gorestful, remove TPRs, or stop using gorestful
				d.goRestfulContainer.Dispatch(w, req)
				return
			}

		case strings.HasPrefix(path, ws.RootPath()):
			// ensure an exact match or a path boundary match
			if len(path) == len(ws.RootPath()) || path[len(ws.RootPath())] == '/' {
				klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
				// don't use servemux here because gorestful servemuxes get messed up when removing webservices
				// TODO fix gorestful, remove TPRs, or stop using gorestful
				d.goRestfulContainer.Dispatch(w, req)
				return
			}
		}
	}

	// if we didn't find a match, then we just skip gorestful altogether
	klog.V(5).Infof("%v: %v %q satisfied by nonGoRestful", d.name, req.Method, path)
	d.nonGoRestfulMux.ServeHTTP(w, req)
}

所以,apiserver 的 http 处理过程是这样的: FullHandlerChain -> Director -> {GoRestfulContainer,NonGoRestfulMux}

installAPI

installAPI 注册通用的 API 接口。

func installAPI(s *GenericAPIServer, c *Config) {
	if c.EnableIndex {
		routes.Index{}.Install(s.listedPathProvider, s.Handler.NonGoRestfulMux)
	}
	if c.EnableProfiling {
		routes.Profiling{}.Install(s.Handler.NonGoRestfulMux)
		if c.EnableContentionProfiling {
			goruntime.SetBlockProfileRate(1)
		}
		// so far, only logging related endpoints are considered valid to add for these debug flags.
		routes.DebugFlags{}.Install(s.Handler.NonGoRestfulMux, "v", routes.StringFlagPutHandler(logs.GlogSetter))
	}
	if c.EnableMetrics {
		if c.EnableProfiling {
			routes.MetricsWithReset{}.Install(s.Handler.NonGoRestfulMux)
		} else {
			routes.DefaultMetrics{}.Install(s.Handler.NonGoRestfulMux)
		}
	}

	routes.Version{Version: c.Version}.Install(s.Handler.GoRestfulContainer)

	if c.EnableDiscovery {
		s.Handler.GoRestfulContainer.Add(s.DiscoveryGroupManager.WebService())
	}
}

根据 Config 的配置加载多个 apiserver 基础的接口。

  • EnableIndex 对应的 /index 接口
  • EnableProfiling 对应的 /debug/* 接口
  • EnableMetrics 对应的 /metrics 接口
  • Version 对应的 /version 接口
  • EnableDiscovery 对应的 /apis 接口

前面四个基础接口都定义在 pkg/server/routes routes 包里面,分别被注册到 NonGoRestfulMux 和 GoRestfulContainer 这两个路由注册器上。

junnplus avatar May 11 '20 02:05 junnplus

image

junnplus avatar May 11 '20 02:05 junnplus

到这里可以利用 SecureServingInfo 和 APIServerHandler 来启动服务处理请求。 完整的 demo 代码见 gist

# curl -k https://127.0.0.1:6443
{
  "paths": [
    "/apis",
    "/metrics"
  ]
}

GenericAPIServer 还提供了 PrepareRun 的方法来启动 API 安装,通过封装的 preparedGenericAPIServer.Run() 来启动服务。

apiserver.PrepareRun().Run(stopCh)
// pkg/server/genericapiserver.go
// preparedGenericAPIServer is a private wrapper that enforces a call of PrepareRun() before Run can be invoked.
type preparedGenericAPIServer struct {
	*GenericAPIServer
}

// Run spawns the secure http server. It only returns if stopCh is closed
// or the secure port cannot be listened on initially.
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
	delayedStopCh := make(chan struct{})

	// close socket after delayed stopCh
	err := s.NonBlockingRun(delayedStopCh)
	if err != nil {
		return err
	}

	<-stopCh
...
}

// NonBlockingRun spawns the secure http server. An error is
// returned if the secure port cannot be listened on.
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {

	// Use an internal stop channel to allow cleanup of the listeners on error.
	internalStopCh := make(chan struct{})
	var stoppedCh <-chan struct{}
	if s.SecureServingInfo != nil && s.Handler != nil {
		var err error
		stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)
	}
...
}

PrepareRun 在启动服务之前额外处理了 API 安装。

// pkg/server/genericapiserver.go
// PrepareRun does post API installation setup steps. It calls recursively the same function of the delegates.
func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
	s.delegationTarget.PrepareRun()
...

	s.installHealthz()
	s.installLivez()

	s.installReadyz()
...

	return preparedGenericAPIServer{s}
}

可见,PrepareRun 安装了 GenericAPIServer 自身服务的一些检查接口:

  • installHealthz 安装 /healthz 接口
  • installLivez 安装 /livez 接口
  • installReadyz 安装 /readyz 接口

以及委托对象的 PrepareRun。

以 installHealthz 为例:

// pkg/server/healthz.go
// installHealthz creates the healthz endpoint for this server
func (s *GenericAPIServer) installHealthz() {
	s.healthzLock.Lock()
	defer s.healthzLock.Unlock()
	s.healthzChecksInstalled = true
	healthz.InstallHandler(s.Handler.NonGoRestfulMux, s.healthzChecks...)
}

func InstallHandler(mux mux, checks ...HealthChecker) {
	InstallPathHandler(mux, "/healthz", checks...)
}

检查的处理逻辑来自 GenericAPIServer.healthzChecks,也就是来自 Config 配置中的 HealthzChecks。

// NewConfig returns a Config struct with the default values
func NewConfig(codecs serializer.CodecFactory) *Config {
	defaultHealthChecks := []healthz.HealthChecker{healthz.PingHealthz, healthz.LogHealthz}
	return &Config{
		HealthzChecks:               append([]healthz.HealthChecker{}, defaultHealthChecks...),
		ReadyzChecks:                append([]healthz.HealthChecker{}, defaultHealthChecks...),
		LivezChecks:                 append([]healthz.HealthChecker{}, defaultHealthChecks...),
...
}

默认情况下,/healthz、/livez 和 /readyz 这三个接口都检查了:

  • healthz.PingHealthz,简单的 ping 实现;
  • healthz.LogHealthz,检查 log 是否阻塞。

apiserver.PrepareRun().Run(stopCh) 方式启动服务,我们可以看到下面的 API:

# curl -k https://127.0.0.1:6443
{
  "paths": [
    "/apis",
    "/healthz",
    "/healthz/log",
    "/healthz/ping",
    "/livez",
    "/livez/log",
    "/livez/ping",
    "/metrics",
    "/readyz",
    "/readyz/log",
    "/readyz/ping",
    "/readyz/shutdown"
  ]
}

junnplus avatar May 11 '20 03:05 junnplus