Expose readiness and liveness APIs so that the process's status can be detected by a scheduling cluster such as Kubernetes (K8s).
@No-SilverBullet
目标:为 dubbo-go 添加 HTTP 健康检查 API,支持 K8S 的 readiness 和 liveness probe。
前置了解:
K8S通过两种探针与健康检查API交互:
- Liveness Probe(存活探针)
  - 目的:检查容器是否还活着
  - 失败后果:K8S会重启容器
  - 调用端点:GET http://pod-ip:8080/health/live
- Readiness Probe(就绪探针)
  - 目的:检查容器是否准备好接收流量
  - 失败后果:K8S会从Service的endpoint中移除该Pod,停止转发流量
  - 调用端点:GET http://pod-ip:8080/health/ready
- 配置扩展
config/metric_config.go:
// MetricsConfig is the config struct shared by all metrics implementations.
// Each exported field is bound to the yaml/json/property key named in its tag;
// where a `default` tag is present it gives the value used when the key is unset.
type MetricsConfig struct {
	Enable             *bool             `default:"false" yaml:"enable" json:"enable,omitempty" property:"enable"`
	Port               string            `default:"9090" yaml:"port" json:"port,omitempty" property:"port"`
	Path               string            `default:"/metrics" yaml:"path" json:"path,omitempty" property:"path"`
	Protocol           string            `default:"prometheus" yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
	EnableMetadata     *bool             `default:"false" yaml:"enable-metadata" json:"enable-metadata,omitempty" property:"enable-metadata"`
	EnableRegistry     *bool             `default:"false" yaml:"enable-registry" json:"enable-registry,omitempty" property:"enable-registry"`
	EnableConfigCenter *bool             `default:"false" yaml:"enable-config-center" json:"enable-config-center,omitempty" property:"enable-config-center"`
	Prometheus         *PrometheusConfig `yaml:"prometheus" json:"prometheus" property:"prometheus"`
	Aggregation        *AggregateConfig  `yaml:"aggregation" json:"aggregation" property:"aggregation"`

	// HealthCheck is the newly added HTTP health check section, read from the
	// "health-check" key nested under the metrics configuration.
	HealthCheck *HealthCheckConfig `yaml:"health-check" json:"health-check" property:"health-check"`

	rootConfig *RootConfig
}
// HealthCheckConfig describes the HTTP health check endpoints.
// The `default` tag on each field states the value used when the key is unset.
type HealthCheckConfig struct {
	// Enabled is bound to the "enabled" key; its default is "false".
	Enabled *bool `default:"false" yaml:"enabled" json:"enabled,omitempty" property:"enabled"`

	// Port is bound to the "port" key; its default is "8080".
	Port string `default:"8080" yaml:"port" json:"port,omitempty" property:"port"`

	// LivePath is bound to the "live-path" key; its default is "/health/live".
	LivePath string `default:"/health/live" yaml:"live-path" json:"live-path,omitempty" property:"live-path"`

	// ReadyPath is bound to the "ready-path" key; its default is "/health/ready".
	ReadyPath string `default:"/health/ready" yaml:"ready-path" json:"ready-path,omitempty" property:"ready-path"`

	// Timeout is bound to the "timeout" key and holds a duration string;
	// its default is "10s".
	Timeout string `default:"10s" yaml:"timeout" json:"timeout,omitempty" property:"timeout"`
}
- 添加常量定义
common/constant/key.go:
// Parameter keys of the HTTP health check settings. Every key lives under the
// "health-check." prefix.
const (
	HealthCheckHttpEnabledKey = "health-check.enabled"
	HealthCheckHttpPortKey    = "health-check.port"
	HealthCheckLivePathKey    = "health-check.live-path"
	HealthCheckReadyPathKey   = "health-check.ready-path"
	HealthCheckTimeoutKey     = "health-check.timeout"
)
common/constant/default.go:
// Fallback values used when the corresponding health check parameter is unset.
const (
	HealthCheckDefaultPort      = "8080"
	HealthCheckDefaultLivePath  = "/health/live"
	HealthCheckDefaultReadyPath = "/health/ready"
	HealthCheckDefaultTimeout   = "10s"
)
- 创建 HTTP 健康检查服务
server/health.go:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package server
import (
"context"
"encoding/json"
"net/http"
"time"
)
import (
"github.com/dubbogo/gost/log/logger"
)
import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/protocol/triple/health/triple_health"
)
// HealthCheckServer serves the liveness and readiness endpoints over HTTP.
type HealthCheckServer struct {
	// url carries the health check parameters (enable flag, port, paths, timeout).
	url *common.URL

	// server is the underlying HTTP server; it stays nil until Start creates it.
	server *http.Server
}
// HealthResponse is the JSON body returned by the health check endpoints.
type HealthResponse struct {
	// Status is the probe result reported to the caller (the handlers use
	// "UP" and "DOWN").
	Status string `json:"status"`

	// Timestamp is the time the response was built, in Unix milliseconds.
	Timestamp int64 `json:"timestamp"`

	// Details holds optional key/value context; it is omitted from the JSON
	// output when empty.
	Details map[string]string `json:"details,omitempty"`
}
// NewHealthCheckServer builds a HealthCheckServer bound to url.
// Only the url field is populated here; the HTTP server itself is created
// later by Start.
func NewHealthCheckServer(url *common.URL) *HealthCheckServer {
	srv := new(HealthCheckServer)
	srv.url = url
	return srv
}
// Start launches the HTTP health check server in a background goroutine.
//
// It returns without starting anything unless the URL parameter
// constant.HealthCheckHttpEnabledKey is true. The listen port and the two
// probe paths are read from the URL and fall back to the HealthCheckDefault*
// constants. A shutdown callback is registered through
// extension.AddCustomShutdownCallback so that the server is stopped with
// http.Server.Shutdown, bounded by the configured timeout.
func (h *HealthCheckServer) Start() {
	// The keys declared in common/constant/key.go are HealthCheckHttpEnabledKey
	// and HealthCheckHttpPortKey; the un-prefixed names are not defined there.
	if !h.url.GetParamBool(constant.HealthCheckHttpEnabledKey, false) {
		return
	}

	port := h.url.GetParam(constant.HealthCheckHttpPortKey, constant.HealthCheckDefaultPort)
	livePath := h.url.GetParam(constant.HealthCheckLivePathKey, constant.HealthCheckDefaultLivePath)
	readyPath := h.url.GetParam(constant.HealthCheckReadyPathKey, constant.HealthCheckDefaultReadyPath)

	mux := http.NewServeMux()
	mux.HandleFunc(livePath, h.livenessHandler)
	mux.HandleFunc(readyPath, h.readinessHandler)

	h.server = &http.Server{
		Addr:    ":" + port,
		Handler: mux,
		// Bound the time spent reading request headers so that a stalled
		// client cannot hold a connection open indefinitely.
		ReadHeaderTimeout: 5 * time.Second,
	}

	// Register graceful shutdown callback.
	extension.AddCustomShutdownCallback(func() {
		timeoutStr := h.url.GetParam(constant.HealthCheckTimeoutKey, constant.HealthCheckDefaultTimeout)
		timeout, err := time.ParseDuration(timeoutStr)
		if err != nil || timeout <= 0 {
			// A malformed value used to be ignored silently, and a
			// non-positive one would create an already-expired context.
			logger.Warnf("invalid health check timeout %q, falling back to 10s", timeoutStr)
			timeout = 10 * time.Second
		}

		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		defer cancel()

		if err := h.server.Shutdown(ctx); err != nil {
			logger.Errorf("health check server shutdown failed, err: %v", err)
		} else {
			logger.Info("health check server gracefully shutdown success")
		}
	})

	logger.Infof("health check endpoints: %s%s, %s%s", port, livePath, port, readyPath)

	go func() {
		// ErrServerClosed is the normal result of Shutdown and is not an error.
		if err := h.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			logger.Errorf("health check server error: %v", err)
		}
	}()
}
// livenessHandler answers the liveness probe.
//
// Only GET is accepted; any other method receives 405 together with an Allow
// header listing GET. A GET always yields 200 with status "UP": being able to
// answer at all is the liveness signal.
func (h *HealthCheckServer) livenessHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		// A 405 response must advertise the supported methods in Allow.
		w.Header().Set("Allow", http.MethodGet)
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// liveness check - whether the process is alive
	h.writeResponse(w, "UP", http.StatusOK, map[string]string{
		"check": "liveness",
	})
}
// readinessHandler answers the readiness probe.
//
// Only GET is accepted; any other method receives 405 together with an Allow
// header listing GET. When isServiceReady reports false the response is 503
// with status "DOWN" and a reason; otherwise it is 200 with status "UP".
func (h *HealthCheckServer) readinessHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		// A 405 response must advertise the supported methods in Allow.
		w.Header().Set("Allow", http.MethodGet)
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// readiness check - whether the service is ready
	if !h.isServiceReady() {
		h.writeResponse(w, "DOWN", http.StatusServiceUnavailable, map[string]string{
			"check":  "readiness",
			"reason": "service not ready",
		})
		return
	}

	h.writeResponse(w, "UP", http.StatusOK, map[string]string{
		"check": "readiness",
	})
}
// isServiceReady reports whether the service should be considered ready.
//
// NOTE: this is currently a placeholder that always returns true, so the
// readiness endpoint cannot report "DOWN" until real checks are added.
func (h *HealthCheckServer) isServiceReady() bool {
// TODO: implement the actual readiness checks.
return true
}
// writeResponse serializes a HealthResponse as JSON onto w.
//
// The Content-Type header is set before the status line is written. The body
// carries the given status, the current time in Unix milliseconds and the
// optional details map. An encoding failure is logged; nothing is returned.
func (h *HealthCheckServer) writeResponse(w http.ResponseWriter, status string, httpStatus int, details map[string]string) {
	payload := HealthResponse{Status: status, Timestamp: time.Now().UnixMilli(), Details: details}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(httpStatus)

	encodeErr := json.NewEncoder(w).Encode(payload)
	if encodeErr == nil {
		return
	}
	logger.Errorf("failed to encode health response: %v", encodeErr)
}
检查逻辑(讨论)
- 集成到现有系统
metrics/health/health_registry.go:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package health
import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/metrics"
"dubbo.apache.org/dubbo-go/v3/server"
)
// init registers a collector callback with the metrics package. When the
// callback runs it creates a health check server for the given URL and starts it.
//
// NOTE(review): the server is therefore only started when the metrics package
// invokes its collectors — confirm that this also happens when metrics
// themselves are disabled.
func init() {
	metrics.AddCollector(func(_ metrics.MetricRegistry, url *common.URL) {
		server.NewHealthCheckServer(url).Start()
	})
}
然后在 imports/imports.go 中导入:
_ "dubbo.apache.org/dubbo-go/v3/metrics/health"
用户使用案例...
目标:为 dubbo-go 添加 HTTP 健康检查 API,支持 K8S 的 readiness 和 liveness probe。
前置了解:
K8S通过两种探针与健康检查API交互:
- Liveness Probe(存活探针)
- 目的:检查容器是否还活着
- 失败后果:K8S会重启容器
- 调用端点:GET http://pod-ip:8080/health/live
- Readiness Probe(就绪探针)
- 目的:检查容器是否准备好接收流量
- 失败后果:K8S会从Service的endpoint中移除该Pod,停止转发流量
- 调用端点:GET http://pod-ip:8080/health/ready
- 配置扩展
config/metric_config.go:
// MetricsConfig This is the config struct for all metrics implementation type MetricsConfig struct { Enable *bool
default:"false" yaml:"enable" json:"enable,omitempty" property:"enable"Port stringdefault:"9090" yaml:"port" json:"port,omitempty" property:"port"Path stringdefault:"/metrics" yaml:"path" json:"path,omitempty" property:"path"Protocol stringdefault:"prometheus" yaml:"protocol" json:"protocol,omitempty" property:"protocol"EnableMetadata *booldefault:"false" yaml:"enable-metadata" json:"enable-metadata,omitempty" property:"enable-metadata"EnableRegistry *booldefault:"false" yaml:"enable-registry" json:"enable-registry,omitempty" property:"enable-registry"EnableConfigCenter *booldefault:"false" yaml:"enable-config-center" json:"enable-config-center,omitempty" property:"enable-config-center"Prometheus *PrometheusConfigyaml:"prometheus" json:"prometheus" property:"prometheus"Aggregation *AggregateConfigyaml:"aggregation" json:"aggregation" property:"aggregation"// added health check configuration HealthCheck *HealthCheckConfigyaml:"health-check" json:"health-check" property:"health-check"rootConfig *RootConfig }type HealthCheckConfig struct { Enabled *bool
default:"false" yaml:"enabled" json:"enabled,omitempty" property:"enabled"Port stringdefault:"8080" yaml:"port" json:"port,omitempty" property:"port"LivePath stringdefault:"/health/live" yaml:"live-path" json:"live-path,omitempty" property:"live-path"ReadyPath stringdefault:"/health/ready" yaml:"ready-path" json:"ready-path,omitempty" property:"ready-path"Timeout stringdefault:"10s" yaml:"timeout" json:"timeout,omitempty" property:"timeout"} 2. 添加常量定义common/constant/key.go:
const ( HealthCheckHttpEnabledKey = "health-check.enabled" HealthCheckHttpPortKey = "health-check.port" HealthCheckLivePathKey = "health-check.live-path" HealthCheckReadyPathKey = "health-check.ready-path" HealthCheckTimeoutKey = "health-check.timeout" ) common/constant/default.go:
const ( HealthCheckDefaultPort = "8080" HealthCheckDefaultLivePath = "/health/live" HealthCheckDefaultReadyPath = "/health/ready" HealthCheckDefaultTimeout = "10s" ) 3. 创建 HTTP 健康检查服务
server/health.go:
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package server
import ( "context" "encoding/json" "net/http" "time" )
import ( "github.com/dubbogo/gost/log/logger" )
import ( "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/protocol/triple/health/triple_health" )
// HealthCheckServer HTTP health check server type HealthCheckServer struct { url *common.URL server *http.Server }
// HealthResponse health check response type HealthResponse struct { Status string
json:"status"Timestamp int64json:"timestamp"Details map[string]stringjson:"details,omitempty"}// NewHealthCheckServer creates a new health check server func NewHealthCheckServer(url *common.URL) *HealthCheckServer { return &HealthCheckServer{url: url} }
// Start starts the health check server func (h *HealthCheckServer) Start() { if !h.url.GetParamBool(constant.HealthCheckEnabledKey, false) { return }
mux := http.NewServeMux() port := h.url.GetParam(constant.HealthCheckPortKey, constant.HealthCheckDefaultPort) livePath := h.url.GetParam(constant.HealthCheckLivePathKey, constant.HealthCheckDefaultLivePath) readyPath := h.url.GetParam(constant.HealthCheckReadyPathKey, constant.HealthCheckDefaultReadyPath)
mux.HandleFunc(livePath, h.livenessHandler) mux.HandleFunc(readyPath, h.readinessHandler)
h.server = &http.Server{Addr: ":" + port, Handler: mux}
// Register graceful shutdown callback extension.AddCustomShutdownCallback(func() { timeoutStr := h.url.GetParam(constant.HealthCheckTimeoutKey, constant.HealthCheckDefaultTimeout) timeout, err := time.ParseDuration(timeoutStr) if err != nil { timeout = 10 * time.Second } ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() if err := h.server.Shutdown(ctx); err != nil { logger.Errorf("health check server shutdown failed, err: %v", err) } else { logger.Info("health check server gracefully shutdown success") } })
logger.Infof("health check endpoints: %s%s, %s%s", port, livePath, port, readyPath) go func() { if err := h.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { logger.Errorf("health check server error: %v", err) } }() }
// livenessHandler liveness probe handler func (h *HealthCheckServer) livenessHandler(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return }
// liveness check - whether the process is alive h.writeResponse(w, "UP", http.StatusOK, map[string]string{ "check": "liveness", }) }
// readinessHandler readiness probe handler func (h *HealthCheckServer) readinessHandler(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return }
// readiness check - whether the service is ready if h.isServiceReady() { h.writeResponse(w, "UP", http.StatusOK, map[string]string{ "check": "readiness", }) } else { h.writeResponse(w, "DOWN", http.StatusServiceUnavailable, map[string]string{ "check": "readiness", "reason": "service not ready", }) } }
// isServiceReady checks if the service is ready func (h *HealthCheckServer) isServiceReady() bool { // todo return true }
// writeResponse writes the response func (h *HealthCheckServer) writeResponse(w http.ResponseWriter, status string, httpStatus int, details map[string]string) { response := HealthResponse{ Status: status, Timestamp: time.Now().UnixMilli(), Details: details, }
w.Header().Set("Content-Type", "application/json") w.WriteHeader(httpStatus)
if err := json.NewEncoder(w).Encode(response); err != nil { logger.Errorf("failed to encode health response: %v", err) } } 检查逻辑(讨论)
- 集成到现有系统
metrics/health/health_registry.go:
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package health
import ( "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/metrics" "dubbo.apache.org/dubbo-go/v3/server" )
func init() { metrics.AddCollector(func(registry metrics.MetricRegistry, url *common.URL) { healthServer := server.NewHealthCheckServer(url) healthServer.Start() }) } 然后在 imports/imports.go 中导入:
_ "dubbo.apache.org/dubbo-go/v3/metrics/health" 用户使用案例...
// isServiceReady checks if the service is ready
func (h *HealthCheckServer) isServiceReady() bool {
	// todo
	return true
}
这里最好设计成 interface 或者回调,支持用户自定义健康检查逻辑。主要原因是 liveness 探针是有风险的,会直接影响 Pod 重启,因此最好开放给用户。而且服务就绪与否,本来就和用户依赖的中间件甚至业务逻辑有关系。