mysql-operator icon indicating copy to clipboard operation
mysql-operator copied to clipboard

orchestrator persistence

Open wangqiang163 opened this issue 1 year ago • 1 comments

Can orchestrator write persistence information to etcd?

wangqiang163 avatar Jul 23 '24 07:07 wangqiang163


package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"go.etcd.io/etcd/client/v3"
)

// InstanceKey represents the unique identifier for a database instance.
type InstanceKey struct {
	Hostname string `json:"hostname"`
}

// Instance represents a database instance in a replication topology.
type Instance struct {
	ClusterName       string        `json:"cluster_name"`
	Key               InstanceKey   `json:"key"`
	ReadOnly          bool          `json:"read_only"`
	SlaveLagSeconds   sql.NullInt64 `json:"slave_lag_seconds"`
	IsUpToDate        bool          `json:"is_up_to_date"`
	IsRecentlyChecked bool          `json:"is_recently_checked"`
	IsLastCheckValid  bool          `json:"is_last_check_valid"`
}

// Orchestrator is a simplified struct managing database instances.
type Orchestrator struct {
	etcdClient *clientv3.Client
}

// NewOrchestrator initializes an Orchestrator with an etcd client.
func NewOrchestrator(endpoints []string) (*Orchestrator, error) {
	// Configure etcd client
	cfg := clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	}
	client, err := clientv3.New(cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to etcd: %v", err)
	}
	return &Orchestrator{etcdClient: client}, nil
}

// Close closes the etcd client connection.
func (o *Orchestrator) Close() {
	if o.etcdClient != nil {
		o.etcdClient.Close()
	}
}

// AddInstance saves an instance to etcd.
func (o *Orchestrator) AddInstance(instance Instance) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Serialize instance to JSON
	data, err := json.Marshal(instance)
	if err != nil {
		return fmt.Errorf("failed to marshal instance: %v", err)
	}

	// Use hostname as the key for simplicity
	key := fmt.Sprintf("/orchestrator/instances/%s", instance.Key.Hostname)

	// Write to etcd
	_, err = o.etcdClient.Put(ctx, key, string(data))
	if err != nil {
		return fmt.Errorf("failed to write to etcd: %v", err)
	}

	log.Printf("Saved instance %s to etcd", instance.Key.Hostname)
	return nil
}

// GetInstance retrieves an instance from etcd by hostname.
func (o *Orchestrator) GetInstance(hostname string) (*Instance, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	key := fmt.Sprintf("/orchestrator/instances/%s", hostname)

	// Read from etcd
	resp, err := o.etcdClient.Get(ctx, key)
	if err != nil {
		return nil, fmt.Errorf("failed to read from etcd: %v", err)
	}

	if len(resp.Kvs) == 0 {
		return nil, fmt.Errorf("instance %s not found", hostname)
	}

	// Deserialize JSON to Instance
	var instance Instance
	if err := json.Unmarshal(resp.Kvs[0].Value, &instance); err != nil {
		return nil, fmt.Errorf("failed to unmarshal instance: %v", err)
	}

	return &instance, nil
}

// ListInstances retrieves all instances from etcd.
func (o *Orchestrator) ListInstances() ([]Instance, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Get all keys under /orchestrator/instances/
	resp, err := o.etcdClient.Get(ctx, "/orchestrator/instances/", clientv3.WithPrefix())
	if err != nil {
		return nil, fmt.Errorf("failed to list instances from etcd: %v", err)
	}

	var instances []Instance
	for _, kv := range resp.Kvs {
		var instance Instance
		if err := json.Unmarshal(kv.Value, &instance); err != nil {
			log.Printf("Failed to unmarshal instance for key %s: %v", kv.Key, err)
			continue
		}
		instances = append(instances, instance)
	}

	return instances, nil
}

func main() {
	// Initialize Orchestrator with etcd endpoints
	orch, err := NewOrchestrator([]string{"http://localhost:2379"})
	if err != nil {
		log.Fatalf("Failed to initialize orchestrator: %v", err)
	}
	defer orch.Close()

	// Create an instance (similar to your snippet)
	instance := Instance{
		ClusterName: "my_cluster",
		Key:         InstanceKey{Hostname: "db1.example.com"},
		ReadOnly:    true,
		SlaveLagSeconds: sql.NullInt64{
			Valid: false,
			Int64: 0,
		},
		IsUpToDate:        true,
		IsRecentlyChecked: true,
		IsLastCheckValid:  true,
	}

	// Add instance to etcd
	if err := orch.AddInstance(instance); err != nil {
		log.Fatalf("Failed to add instance: %v", err)
	}

	// Retrieve and print the instance
	retrieved, err := orch.GetInstance("db1.example.com")
	if err != nil {
		log.Fatalf("Failed to get instance: %v", err)
	}
	fmt.Printf("Retrieved instance: %+v\n", retrieved)

	// List all instances
	instances, err := orch.ListInstances()
	if err != nil {
		log.Fatalf("Failed to list instances: %v", err)
	}
	for _, inst := range instances {
		fmt.Printf("Listed instance: Cluster=%s, Host=%s, UpToDate=%v\n",
			inst.ClusterName, inst.Key.Hostname, inst.IsUpToDate)
	}
}

ljluestc avatar May 24 '25 20:05 ljluestc