dubbo-go icon indicating copy to clipboard operation
dubbo-go copied to clipboard

Expose readiness and liveness APIs so that the process's status can be detected by cluster schedulers such as K8S.

Open chickenlj opened this issue 3 years ago • 3 comments

chickenlj avatar Aug 31 '22 07:08 chickenlj

@No-SilverBullet

1kasa avatar May 30 '25 13:05 1kasa

目标:为 dubbo-go 添加 HTTP 健康检查 API,支持 K8S 的 readiness 和 liveness probe。

前置了解:

K8S通过两种探针与健康检查API交互:

  1. Liveness Probe(存活探针)
  • 目的:检查容器是否还活着

  • 失败后果:K8S会重启容器

  • 调用端点:GET http://pod-ip:8080/health/live

  2. Readiness Probe(就绪探针)
  • 目的:检查容器是否准备好接收流量

  • 失败后果:K8S会从Service的endpoint中移除该Pod,停止转发流量

  • 调用端点:GET http://pod-ip:8080/health/ready

  1. 配置扩展

config/metric_config.go:

// MetricsConfig is the config struct for all metrics implementations.
type MetricsConfig struct {
	Enable             *bool             `default:"false" yaml:"enable" json:"enable,omitempty" property:"enable"`
	Port               string            `default:"9090" yaml:"port" json:"port,omitempty" property:"port"`
	Path               string            `default:"/metrics" yaml:"path" json:"path,omitempty" property:"path"`
	Protocol           string            `default:"prometheus" yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
	EnableMetadata     *bool             `default:"false" yaml:"enable-metadata" json:"enable-metadata,omitempty" property:"enable-metadata"`
	EnableRegistry     *bool             `default:"false" yaml:"enable-registry" json:"enable-registry,omitempty" property:"enable-registry"`
	EnableConfigCenter *bool             `default:"false" yaml:"enable-config-center" json:"enable-config-center,omitempty" property:"enable-config-center"`
	Prometheus         *PrometheusConfig `yaml:"prometheus" json:"prometheus" property:"prometheus"`
	Aggregation        *AggregateConfig  `yaml:"aggregation" json:"aggregation" property:"aggregation"`
	// HealthCheck configures the HTTP liveness/readiness endpoints intended
	// for Kubernetes probes.
	HealthCheck *HealthCheckConfig `yaml:"health-check" json:"health-check" property:"health-check"`
	rootConfig  *RootConfig
}

// HealthCheckConfig configures the HTTP health check server that exposes the
// liveness and readiness endpoints.
type HealthCheckConfig struct {
	// Enabled turns the health check server on; it defaults to false.
	// NOTE(review): MetricsConfig spells its switch `enable`; consider
	// aligning this key before it ships.
	Enabled *bool `default:"false" yaml:"enabled" json:"enabled,omitempty" property:"enabled"`
	// Port is the port of the health check HTTP server.
	Port string `default:"8080" yaml:"port" json:"port,omitempty" property:"port"`
	// LivePath is the HTTP path of the liveness endpoint.
	LivePath string `default:"/health/live" yaml:"live-path" json:"live-path,omitempty" property:"live-path"`
	// ReadyPath is the HTTP path of the readiness endpoint.
	ReadyPath string `default:"/health/ready" yaml:"ready-path" json:"ready-path,omitempty" property:"ready-path"`
	// Timeout is a duration string such as "10s".
	Timeout string `default:"10s" yaml:"timeout" json:"timeout,omitempty" property:"timeout"`
}
  2. 添加常量定义

common/constant/key.go:

// URL parameter keys read by the HTTP health check server.
const (
	HealthCheckEnabledKey   = "health-check.enabled"
	HealthCheckPortKey      = "health-check.port"
	HealthCheckLivePathKey  = "health-check.live-path"
	HealthCheckReadyPathKey = "health-check.ready-path"
	HealthCheckTimeoutKey   = "health-check.timeout"

	// Deprecated: use HealthCheckEnabledKey instead.
	HealthCheckHttpEnabledKey = HealthCheckEnabledKey
	// Deprecated: use HealthCheckPortKey instead.
	HealthCheckHttpPortKey = HealthCheckPortKey
)

common/constant/default.go:

// Defaults used by the HTTP health check server when the corresponding
// health-check.* URL parameter is absent.
const (
	HealthCheckDefaultPort      = "8080"
	HealthCheckDefaultLivePath  = "/health/live"
	HealthCheckDefaultReadyPath = "/health/ready"
	HealthCheckDefaultTimeout   = "10s" // parsed with time.ParseDuration
)
  3. 创建 HTTP 健康检查服务

server/health.go:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package server

import (
	"context"
	"encoding/json"
	"errors"
	"net"
	"net/http"
	"time"
)

import (
	"github.com/dubbogo/gost/log/logger"
)

import (
	"dubbo.apache.org/dubbo-go/v3/common"
	"dubbo.apache.org/dubbo-go/v3/common/constant"
	"dubbo.apache.org/dubbo-go/v3/common/extension"
	// NOTE(review): triple_health is not referenced anywhere in this file, so
	// the build fails with "imported and not used"; remove it, or blank-import
	// it if its side effects are actually needed.
	"dubbo.apache.org/dubbo-go/v3/protocol/triple/health/triple_health"
)

// HealthCheckServer serves the liveness and readiness endpoints on its own
// HTTP listener.
type HealthCheckServer struct {
	server *http.Server // created by Start; nil until health checking is enabled and started
	url    *common.URL  // supplies the health check parameters read by Start
}

// HealthResponse is the JSON body written by the probe endpoints.
type HealthResponse struct {
	// Status is the probe verdict; the handlers in this file use "UP" or "DOWN".
	Status string `json:"status"`
	// Timestamp is the response creation time in Unix milliseconds.
	Timestamp int64 `json:"timestamp"`
	// Details carries extra key/value context; omitted from JSON when empty.
	Details map[string]string `json:"details,omitempty"`
}

// NewHealthCheckServer returns a HealthCheckServer that reads its settings
// from url. Nothing is started until Start is called.
func NewHealthCheckServer(url *common.URL) *HealthCheckServer {
	srv := new(HealthCheckServer)
	srv.url = url
	return srv
}

// Start launches the HTTP health check server when the
// constant.HealthCheckEnabledKey URL parameter is true; otherwise it is a no-op.
//
// The listening socket is bound synchronously. A bind failure (for example a
// port already in use) is therefore logged right away, and no shutdown
// callback is registered for a server that never started. Requests are then
// served on a background goroutine until the graceful-shutdown callback stops
// the server.
//
// The configured timeout (constant.HealthCheckTimeoutKey) bounds both the
// graceful shutdown and each read/write phase of a probe request.
func (h *HealthCheckServer) Start() {
	if !h.url.GetParamBool(constant.HealthCheckEnabledKey, false) {
		return
	}

	port := h.url.GetParam(constant.HealthCheckPortKey, constant.HealthCheckDefaultPort)
	livePath := h.url.GetParam(constant.HealthCheckLivePathKey, constant.HealthCheckDefaultLivePath)
	readyPath := h.url.GetParam(constant.HealthCheckReadyPathKey, constant.HealthCheckDefaultReadyPath)
	timeout := h.probeTimeout()

	mux := http.NewServeMux()
	mux.HandleFunc(livePath, h.livenessHandler)
	mux.HandleFunc(readyPath, h.readinessHandler)

	h.server = &http.Server{
		Addr:    ":" + port,
		Handler: mux,
		// Without these, a client that never finishes sending its request (or
		// never reads the response) can hold a connection open indefinitely.
		ReadHeaderTimeout: timeout,
		ReadTimeout:       timeout,
		WriteTimeout:      timeout,
	}

	listener, err := net.Listen("tcp", h.server.Addr)
	if err != nil {
		logger.Errorf("health check server failed to listen on %s: %v", h.server.Addr, err)
		return
	}

	// Register graceful shutdown callback.
	extension.AddCustomShutdownCallback(func() {
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		defer cancel()
		if err := h.server.Shutdown(ctx); err != nil {
			logger.Errorf("health check server shutdown failed, err: %v", err)
		} else {
			logger.Info("health check server gracefully shutdown success")
		}
	})

	logger.Infof("health check endpoints: :%s%s, :%s%s", port, livePath, port, readyPath)
	go func() {
		// Serve always returns a non-nil error; ErrServerClosed is the normal
		// result of Shutdown and is not worth logging.
		if err := h.server.Serve(listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
			logger.Errorf("health check server error: %v", err)
		}
	}()
}

// probeTimeout returns the health check timeout from the URL. It falls back to
// 10s (the value of constant.HealthCheckDefaultTimeout) when the parameter is
// malformed or not positive. A zero duration would otherwise disable the
// server timeouts and make Shutdown give up immediately.
func (h *HealthCheckServer) probeTimeout() time.Duration {
	const fallback = 10 * time.Second
	raw := h.url.GetParam(constant.HealthCheckTimeoutKey, constant.HealthCheckDefaultTimeout)
	d, err := time.ParseDuration(raw)
	if err != nil || d <= 0 {
		logger.Warnf("invalid health check timeout %q, using %s", raw, fallback)
		return fallback
	}
	return d
}

// HealthChecker lets applications plug their own logic into the liveness and
// readiness probes. Whether a process is alive or ready often depends on
// middleware connections or business state the framework cannot see, so the
// decision is delegated to the user.
//
// Each method returns nil when healthy. A non-nil error turns the probe into a
// DOWN/503 response whose "reason" detail is the error's message, so do not
// put sensitive information in it.
type HealthChecker interface {
	// CheckLiveness reports whether the process is alive. Kubernetes restarts
	// the container when the liveness probe fails. Only return an error for
	// states the process cannot recover from on its own.
	CheckLiveness(ctx context.Context) error
	// CheckReadiness reports whether the process can accept traffic. While it
	// fails, Kubernetes removes the pod from the Service endpoints.
	CheckReadiness(ctx context.Context) error
}

// healthChecker is the user-supplied checker; nil keeps the default behavior.
var healthChecker HealthChecker

// SetHealthChecker installs the checker consulted by the liveness and
// readiness endpoints. Passing nil restores the default: liveness is always
// UP, and readiness depends only on the framework's own check.
//
// Call it before the health check server starts. The checker is read without
// synchronization by concurrently running probe handlers.
func SetHealthChecker(checker HealthChecker) {
	healthChecker = checker
}

// livenessHandler answers the liveness probe. Only GET is accepted. The
// process is reported UP unless a HealthChecker is installed and its
// CheckLiveness returns an error.
func (h *HealthCheckServer) livenessHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	var err error
	if checker := healthChecker; checker != nil {
		// r.Context() is canceled when the prober's connection closes, so a
		// checker that honors ctx can stop early.
		err = checker.CheckLiveness(r.Context())
	}
	h.writeProbeResult(w, "liveness", err)
}

// readinessHandler answers the readiness probe. Only GET is accepted. The
// service is DOWN while the framework's own check (isServiceReady) fails.
// Once that check passes, an installed HealthChecker's CheckReadiness decides.
func (h *HealthCheckServer) readinessHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	if !h.isServiceReady() {
		h.writeResponse(w, "DOWN", http.StatusServiceUnavailable, map[string]string{
			"check":  "readiness",
			"reason": "service not ready",
		})
		return
	}

	var err error
	if checker := healthChecker; checker != nil {
		err = checker.CheckReadiness(r.Context())
	}
	h.writeProbeResult(w, "readiness", err)
}

// writeProbeResult writes UP/200 when err is nil. Otherwise it writes DOWN/503
// with err's message as the "reason" detail.
func (h *HealthCheckServer) writeProbeResult(w http.ResponseWriter, check string, err error) {
	if err != nil {
		h.writeResponse(w, "DOWN", http.StatusServiceUnavailable, map[string]string{
			"check":  check,
			"reason": err.Error(),
		})
		return
	}
	h.writeResponse(w, "UP", http.StatusOK, map[string]string{
		"check": check,
	})
}

// isServiceReady reports the framework's own readiness.
//
// TODO: not implemented yet; it always reports ready, so readiness is
// currently decided by the installed HealthChecker alone.
func (h *HealthCheckServer) isServiceReady() bool {
	return true
}

// writeResponse sends a JSON HealthResponse carrying status, the current time
// and details, with httpStatus as the status code. An encoding failure is only
// logged, because the status line has already been sent by then.
func (h *HealthCheckServer) writeResponse(w http.ResponseWriter, status string, httpStatus int, details map[string]string) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(httpStatus)

	body := HealthResponse{
		Status:    status,
		Timestamp: time.Now().UnixMilli(),
		Details:   details,
	}
	if err := json.NewEncoder(w).Encode(&body); err != nil {
		logger.Errorf("failed to encode health response: %v", err)
	}
}

检查逻辑(讨论)

  1. 集成到现有系统

metrics/health/health_registry.go:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package health

import (
	"dubbo.apache.org/dubbo-go/v3/common"
	"dubbo.apache.org/dubbo-go/v3/metrics"
	"dubbo.apache.org/dubbo-go/v3/server"
)

// init registers a metrics collector callback. Each time it is invoked, the
// callback builds a health check server from the URL it receives and starts
// it; the metric registry argument is not used.
func init() {
	metrics.AddCollector(func(_ metrics.MetricRegistry, url *common.URL) {
		server.NewHealthCheckServer(url).Start()
	})
}

然后在 imports/imports.go 中导入:

_ "dubbo.apache.org/dubbo-go/v3/metrics/health"

用户使用案例...

solisamicus avatar Sep 26 '25 11:09 solisamicus

目标:为 dubbo-go 添加 HTTP 健康检查 API,支持 K8S 的 readiness 和 liveness probe。

前置了解:

K8S通过两种探针与健康检查API交互:

  1. Liveness Probe(存活探针)
  • 目的:检查容器是否还活着
  • 失败后果:K8S会重启容器
  • 调用端点:GET http://pod-ip:8080/health/live
  1. Readiness Probe(就绪探针)
  • 目的:检查容器是否准备好接收流量
  • 失败后果:K8S会从Service的endpoint中移除该Pod,停止转发流量
  • 调用端点:GET http://pod-ip:8080/health/ready
  1. 配置扩展

config/metric_config.go:

// MetricsConfig This is the config struct for all metrics implementation type MetricsConfig struct { Enable *bool default:"false" yaml:"enable" json:"enable,omitempty" property:"enable" Port string default:"9090" yaml:"port" json:"port,omitempty" property:"port" Path string default:"/metrics" yaml:"path" json:"path,omitempty" property:"path" Protocol string default:"prometheus" yaml:"protocol" json:"protocol,omitempty" property:"protocol" EnableMetadata *bool default:"false" yaml:"enable-metadata" json:"enable-metadata,omitempty" property:"enable-metadata" EnableRegistry *bool default:"false" yaml:"enable-registry" json:"enable-registry,omitempty" property:"enable-registry" EnableConfigCenter *bool default:"false" yaml:"enable-config-center" json:"enable-config-center,omitempty" property:"enable-config-center" Prometheus *PrometheusConfig yaml:"prometheus" json:"prometheus" property:"prometheus" Aggregation *AggregateConfig yaml:"aggregation" json:"aggregation" property:"aggregation" // added health check configuration HealthCheck *HealthCheckConfig yaml:"health-check" json:"health-check" property:"health-check" rootConfig *RootConfig }

type HealthCheckConfig struct { Enabled *bool default:"false" yaml:"enabled" json:"enabled,omitempty" property:"enabled" Port string default:"8080" yaml:"port" json:"port,omitempty" property:"port" LivePath string default:"/health/live" yaml:"live-path" json:"live-path,omitempty" property:"live-path" ReadyPath string default:"/health/ready" yaml:"ready-path" json:"ready-path,omitempty" property:"ready-path" Timeout string default:"10s" yaml:"timeout" json:"timeout,omitempty" property:"timeout" } 2. 添加常量定义

common/constant/key.go:

const ( HealthCheckHttpEnabledKey = "health-check.enabled" HealthCheckHttpPortKey = "health-check.port" HealthCheckLivePathKey = "health-check.live-path" HealthCheckReadyPathKey = "health-check.ready-path" HealthCheckTimeoutKey = "health-check.timeout" ) common/constant/default.go:

const ( HealthCheckDefaultPort = "8080" HealthCheckDefaultLivePath = "/health/live" HealthCheckDefaultReadyPath = "/health/ready" HealthCheckDefaultTimeout = "10s" ) 3. 创建 HTTP 健康检查服务

server/health.go:

/*

  • Licensed to the Apache Software Foundation (ASF) under one or more
  • contributor license agreements. See the NOTICE file distributed with
  • this work for additional information regarding copyright ownership.
  • The ASF licenses this file to You under the Apache License, Version 2.0
  • (the "License"); you may not use this file except in compliance with
  • the License. You may obtain a copy of the License at
  • http://www.apache.org/licenses/LICENSE-2.0
    
  • Unless required by applicable law or agreed to in writing, software
  • distributed under the License is distributed on an "AS IS" BASIS,
  • WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  • See the License for the specific language governing permissions and
  • limitations under the License. */

package server

import ( "context" "encoding/json" "net/http" "time" )

import ( "github.com/dubbogo/gost/log/logger" )

import ( "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/protocol/triple/health/triple_health" )

// HealthCheckServer HTTP health check server type HealthCheckServer struct { url *common.URL server *http.Server }

// HealthResponse health check response type HealthResponse struct { Status string json:"status" Timestamp int64 json:"timestamp" Details map[string]string json:"details,omitempty" }

// NewHealthCheckServer creates a new health check server func NewHealthCheckServer(url *common.URL) *HealthCheckServer { return &HealthCheckServer{url: url} }

// Start starts the health check server func (h *HealthCheckServer) Start() { if !h.url.GetParamBool(constant.HealthCheckEnabledKey, false) { return }

mux := http.NewServeMux() port := h.url.GetParam(constant.HealthCheckPortKey, constant.HealthCheckDefaultPort) livePath := h.url.GetParam(constant.HealthCheckLivePathKey, constant.HealthCheckDefaultLivePath) readyPath := h.url.GetParam(constant.HealthCheckReadyPathKey, constant.HealthCheckDefaultReadyPath)

mux.HandleFunc(livePath, h.livenessHandler) mux.HandleFunc(readyPath, h.readinessHandler)

h.server = &http.Server{Addr: ":" + port, Handler: mux}

// Register graceful shutdown callback extension.AddCustomShutdownCallback(func() { timeoutStr := h.url.GetParam(constant.HealthCheckTimeoutKey, constant.HealthCheckDefaultTimeout) timeout, err := time.ParseDuration(timeoutStr) if err != nil { timeout = 10 * time.Second } ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() if err := h.server.Shutdown(ctx); err != nil { logger.Errorf("health check server shutdown failed, err: %v", err) } else { logger.Info("health check server gracefully shutdown success") } })

logger.Infof("health check endpoints: %s%s, %s%s", port, livePath, port, readyPath) go func() { if err := h.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { logger.Errorf("health check server error: %v", err) } }() }

// livenessHandler liveness probe handler func (h *HealthCheckServer) livenessHandler(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return }

// liveness check - whether the process is alive h.writeResponse(w, "UP", http.StatusOK, map[string]string{ "check": "liveness", }) }

// readinessHandler readiness probe handler func (h *HealthCheckServer) readinessHandler(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return }

// readiness check - whether the service is ready if h.isServiceReady() { h.writeResponse(w, "UP", http.StatusOK, map[string]string{ "check": "readiness", }) } else { h.writeResponse(w, "DOWN", http.StatusServiceUnavailable, map[string]string{ "check": "readiness", "reason": "service not ready", }) } }

// isServiceReady checks if the service is ready func (h *HealthCheckServer) isServiceReady() bool { // todo return true }

// writeResponse writes the response func (h *HealthCheckServer) writeResponse(w http.ResponseWriter, status string, httpStatus int, details map[string]string) { response := HealthResponse{ Status: status, Timestamp: time.Now().UnixMilli(), Details: details, }

w.Header().Set("Content-Type", "application/json") w.WriteHeader(httpStatus)

if err := json.NewEncoder(w).Encode(response); err != nil { logger.Errorf("failed to encode health response: %v", err) } } 检查逻辑(讨论)

  1. 集成到现有系统

metrics/health/health_registry.go:

/*

  • Licensed to the Apache Software Foundation (ASF) under one or more
  • contributor license agreements. See the NOTICE file distributed with
  • this work for additional information regarding copyright ownership.
  • The ASF licenses this file to You under the Apache License, Version 2.0
  • (the "License"); you may not use this file except in compliance with
  • the License. You may obtain a copy of the License at
  • http://www.apache.org/licenses/LICENSE-2.0
    
  • Unless required by applicable law or agreed to in writing, software
  • distributed under the License is distributed on an "AS IS" BASIS,
  • WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  • See the License for the specific language governing permissions and
  • limitations under the License. */

package health

import ( "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/metrics" "dubbo.apache.org/dubbo-go/v3/server" )

func init() { metrics.AddCollector(func(registry metrics.MetricRegistry, url *common.URL) { healthServer := server.NewHealthCheckServer(url) healthServer.Start() }) } 然后在 imports/imports.go 中导入:

_ "dubbo.apache.org/dubbo-go/v3/metrics/health" 用户使用案例...

// isServiceReady checks if the service is ready func (h *HealthCheckServer) isServiceReady() bool { // todo return true } 这里最好设计成interface或者回调,支持用户自定义健康检查逻辑。主要原因是liveness探针是有风险的,直接影响pod重启,因此最好开放给用户。而且本来就绪与否和用户依赖的中间件甚至业务逻辑都是有关系的。

No-SilverBullet avatar Sep 30 '25 07:09 No-SilverBullet