mysql-operator
mysql-operator copied to clipboard
orchestrator persistence
Can orchestrator write persistence information to etcd?
package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log"
"time"
"go.etcd.io/etcd/client/v3"
)
// InstanceKey represents the unique identifier for a database instance.
type InstanceKey struct {
Hostname string `json:"hostname"`
}
// Instance represents a database instance in a replication topology.
type Instance struct {
ClusterName string `json:"cluster_name"`
Key InstanceKey `json:"key"`
ReadOnly bool `json:"read_only"`
SlaveLagSeconds sql.NullInt64 `json:"slave_lag_seconds"`
IsUpToDate bool `json:"is_up_to_date"`
IsRecentlyChecked bool `json:"is_recently_checked"`
IsLastCheckValid bool `json:"is_last_check_valid"`
}
// Orchestrator is a simplified struct managing database instances.
type Orchestrator struct {
etcdClient *clientv3.Client
}
// NewOrchestrator initializes an Orchestrator with an etcd client.
func NewOrchestrator(endpoints []string) (*Orchestrator, error) {
// Configure etcd client
cfg := clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
}
client, err := clientv3.New(cfg)
if err != nil {
return nil, fmt.Errorf("failed to connect to etcd: %v", err)
}
return &Orchestrator{etcdClient: client}, nil
}
// Close closes the etcd client connection.
func (o *Orchestrator) Close() {
if o.etcdClient != nil {
o.etcdClient.Close()
}
}
// AddInstance saves an instance to etcd.
func (o *Orchestrator) AddInstance(instance Instance) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Serialize instance to JSON
data, err := json.Marshal(instance)
if err != nil {
return fmt.Errorf("failed to marshal instance: %v", err)
}
// Use hostname as the key for simplicity
key := fmt.Sprintf("/orchestrator/instances/%s", instance.Key.Hostname)
// Write to etcd
_, err = o.etcdClient.Put(ctx, key, string(data))
if err != nil {
return fmt.Errorf("failed to write to etcd: %v", err)
}
log.Printf("Saved instance %s to etcd", instance.Key.Hostname)
return nil
}
// GetInstance retrieves an instance from etcd by hostname.
func (o *Orchestrator) GetInstance(hostname string) (*Instance, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
key := fmt.Sprintf("/orchestrator/instances/%s", hostname)
// Read from etcd
resp, err := o.etcdClient.Get(ctx, key)
if err != nil {
return nil, fmt.Errorf("failed to read from etcd: %v", err)
}
if len(resp.Kvs) == 0 {
return nil, fmt.Errorf("instance %s not found", hostname)
}
// Deserialize JSON to Instance
var instance Instance
if err := json.Unmarshal(resp.Kvs[0].Value, &instance); err != nil {
return nil, fmt.Errorf("failed to unmarshal instance: %v", err)
}
return &instance, nil
}
// ListInstances retrieves all instances from etcd.
func (o *Orchestrator) ListInstances() ([]Instance, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Get all keys under /orchestrator/instances/
resp, err := o.etcdClient.Get(ctx, "/orchestrator/instances/", clientv3.WithPrefix())
if err != nil {
return nil, fmt.Errorf("failed to list instances from etcd: %v", err)
}
var instances []Instance
for _, kv := range resp.Kvs {
var instance Instance
if err := json.Unmarshal(kv.Value, &instance); err != nil {
log.Printf("Failed to unmarshal instance for key %s: %v", kv.Key, err)
continue
}
instances = append(instances, instance)
}
return instances, nil
}
func main() {
// Initialize Orchestrator with etcd endpoints
orch, err := NewOrchestrator([]string{"http://localhost:2379"})
if err != nil {
log.Fatalf("Failed to initialize orchestrator: %v", err)
}
defer orch.Close()
// Create an instance (similar to your snippet)
instance := Instance{
ClusterName: "my_cluster",
Key: InstanceKey{Hostname: "db1.example.com"},
ReadOnly: true,
SlaveLagSeconds: sql.NullInt64{
Valid: false,
Int64: 0,
},
IsUpToDate: true,
IsRecentlyChecked: true,
IsLastCheckValid: true,
}
// Add instance to etcd
if err := orch.AddInstance(instance); err != nil {
log.Fatalf("Failed to add instance: %v", err)
}
// Retrieve and print the instance
retrieved, err := orch.GetInstance("db1.example.com")
if err != nil {
log.Fatalf("Failed to get instance: %v", err)
}
fmt.Printf("Retrieved instance: %+v\n", retrieved)
// List all instances
instances, err := orch.ListInstances()
if err != nil {
log.Fatalf("Failed to list instances: %v", err)
}
for _, inst := range instances {
fmt.Printf("Listed instance: Cluster=%s, Host=%s, UpToDate=%v\n",
inst.ClusterName, inst.Key.Hostname, inst.IsUpToDate)
}
}