# Add support for embedded storage
## Rationale
The kvrocks-controller previously depended on external storage systems such as Apache ZooKeeper or etcd for metadata management and leader election. This dependency adds operational complexity and forces users to run and maintain an extra service. This proposal alleviates these issues by integrating an embedded storage solution.
## Implementation Overview
The detailed design can be reviewed in the proposal document.
Key components include:
- storage/persistence/embedded/embedded.go
- storage/persistence/embedded/raft.go
### Embedded System
The `Embedded` struct houses the application logic that manipulates the metadata:

```go
type Embedded struct {
	kv             map[string][]byte
	kvMu           sync.RWMutex
	snapshotter    *snap.Snapshotter
	node           *raftNode
	myID           string
	PeerIDs        []string
	quitCh         chan struct{}
	leaderChangeCh <-chan bool
	proposeCh      chan string
	confChangeCh   chan raftpb.ConfChange
}
```
The `kv` map serves as the primary data structure, akin to the key-value store that etcd or ZooKeeper previously provided. Data operations are handled as follows (a sketch of both paths follows the `Propose` method below):

- Reading data (`Get`, `Exists`) queries the `kv` map directly.
- Writing data (`Set`, `Delete`) goes through the `Propose` method rather than modifying `kv` directly.
```go
func (e *Embedded) Propose(k string, v []byte) {
	var buf strings.Builder
	if err := gob.NewEncoder(&buf).Encode(persistence.Entry{Key: k, Value: v}); err != nil {
		logger.Get().With(zap.Error(err)).Error("Failed to propose changes")
	}
	e.proposeCh <- buf.String()
}
```
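As a minimal sketch of how these paths fit together (the exact signatures and the `ErrKeyNotFound` sentinel are assumptions for illustration, not the code from this PR):

```go
// Reads take the read lock and query the kv map directly.
func (e *Embedded) Get(ctx context.Context, key string) ([]byte, error) {
	e.kvMu.RLock()
	defer e.kvMu.RUnlock()
	value, ok := e.kv[key]
	if !ok {
		return nil, ErrKeyNotFound // hypothetical sentinel error
	}
	return value, nil
}

// Writes never touch e.kv directly: they are proposed to raft and applied
// later by the commit loop shown below, after replication.
func (e *Embedded) Set(ctx context.Context, key string, value []byte) error {
	e.Propose(key, value)
	return nil
}

// Deletes propose a nil value; the commit loop turns nil into a map delete.
func (e *Embedded) Delete(ctx context.Context, key string) error {
	e.Propose(key, nil)
	return nil
}
```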
A background goroutine continuously reads from the `commitCh` channel, which receives commits published by `raftNode`:
```go
for c := range commitCh {
	// A nil commit is the signal to (re)load state from a snapshot.
	if c == nil {
		snapshot, err := e.loadSnapshot()
		if err != nil {
			logger.Get().With(zap.Error(err)).Error("Failed to load snapshot")
		}
		if snapshot != nil {
			logger.Get().Sugar().Infof("Loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
			if err := e.recoverFromSnapshot(snapshot.Data); err != nil {
				logger.Get().With(zap.Error(err)).Error("Failed to recover snapshot")
			}
		}
		continue
	}
	// Apply each committed entry to the in-memory kv map; a nil value
	// marks a deletion.
	for _, data := range c.data {
		var entry persistence.Entry
		dec := gob.NewDecoder(bytes.NewBufferString(data))
		if err := dec.Decode(&entry); err != nil {
			logger.Get().With(zap.Error(err)).Error("Failed to decode message")
		}
		e.kvMu.Lock()
		if entry.Value == nil {
			delete(e.kv, entry.Key)
		} else {
			e.kv[entry.Key] = entry.Value
		}
		e.kvMu.Unlock()
	}
	// Signal raftNode that this batch has been fully applied.
	close(c.applyDoneC)
}
```
Communication between the `Embedded` system and `raftNode` occurs via the `proposeCh` and `commitCh` channels.
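Modeled on etcd's raftexample, the wiring between the two components might look like the following sketch (the `newRaftNode` signature and the `readCommits` helper are assumptions):

```go
// A sketch of how Embedded and raftNode could be wired together.
func newEmbedded(id string, peers []string) *Embedded {
	proposeCh := make(chan string)
	confChangeCh := make(chan raftpb.ConfChange)

	// raftNode consumes proposals from proposeCh and publishes committed
	// entries on commitCh; snapshotterReady delivers the snapshotter once
	// the WAL has been replayed.
	commitCh, errorCh, snapshotterReady := newRaftNode(id, peers, proposeCh, confChangeCh)

	e := &Embedded{
		kv:           make(map[string][]byte),
		snapshotter:  <-snapshotterReady,
		myID:         id,
		PeerIDs:      peers,
		proposeCh:    proposeCh,
		confChangeCh: confChangeCh,
	}
	go e.readCommits(commitCh, errorCh) // runs the commit loop shown above
	return e
}
```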
### Raft Node
`raftNode` is implemented in `raft.go`. It initializes its state when created:
```go
if !fileutil.Exist(rc.snapDir) {
	if err := os.Mkdir(rc.snapDir, 0750); err != nil {
		logger.Get().With(zap.Error(err)).Fatal("Cannot create directory for snapshot")
	}
}
rc.snapshotter = snap.New(logger.Get(), rc.snapDir)

oldwal := wal.Exist(rc.walDir)
rc.wal = rc.replayWAL()
```
State recovery happens before regular operation begins: the node restores from the latest snapshot and replays the WAL for entries recorded after it, as sketched below.
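Following the raftexample pattern, `replayWAL` might look roughly like this (`loadSnapshot` and `openWAL` are assumed helpers):

```go
func (rc *raftNode) replayWAL() *wal.WAL {
	snapshot := rc.loadSnapshot() // newest snapshot on disk, or nil
	w := rc.openWAL(snapshot)     // open the WAL at the snapshot position
	_, st, ents, err := w.ReadAll()
	if err != nil {
		logger.Get().With(zap.Error(err)).Fatal("Failed to read WAL")
	}
	rc.raftStorage = raft.NewMemoryStorage()
	if snapshot != nil {
		rc.raftStorage.ApplySnapshot(*snapshot)
	}
	rc.raftStorage.SetHardState(st)
	// Entries recorded after the snapshot are re-committed and re-applied
	// through the normal Ready loop.
	rc.raftStorage.Append(ents)
	return w
}
```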
After recovery, the node establishes network communication with its peers:
```go
rc.transport = &rafthttp.Transport{
	Logger:             logger.Get(),
	ID:                 types.ID(rc.id),
	ClusterID:          0x1000,
	Raft:               rc,
	ServerStats:        stats.NewServerStats("", ""),
	LeaderStats:        stats.NewLeaderStats(zap.NewExample(), strconv.Itoa(rc.id)),
	ErrorC:             make(chan error),
	DialRetryFrequency: 1,
}
if err := rc.transport.Start(); err != nil {
	logger.Get().With(zap.Error(err)).Panic("Failed to start raft HTTP server")
}
for i := range rc.peers {
	if i+1 != rc.id {
		rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
	}
}
go rc.serveRaft()
```
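`serveRaft` itself is not shown above; in the raftexample pattern it serves the transport's HTTP handler so peers can exchange raft messages. A rough sketch, with URL handling simplified:

```go
func (rc *raftNode) serveRaft() {
	u, err := url.Parse(rc.peers[rc.id-1]) // this node's own peer URL
	if err != nil {
		logger.Get().With(zap.Error(err)).Fatal("Failed to parse peer URL")
	}
	ln, err := net.Listen("tcp", u.Host)
	if err != nil {
		logger.Get().With(zap.Error(err)).Fatal("Failed to listen for raft traffic")
	}
	// rafthttp.Transport.Handler() returns the peer-to-peer raft endpoints.
	srv := &http.Server{Handler: rc.transport.Handler()}
	if err := srv.Serve(ln); err != nil {
		logger.Get().With(zap.Error(err)).Fatal("Raft HTTP server stopped")
	}
}
```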
With `go rc.serveChannels()`, the system enters the critical event loop, which is split across two main goroutines.
#### Receiving proposals from proposeCh

This goroutine handles incoming proposals and configuration changes:
```go
go func() {
	confChangeCount := uint64(0)
	for rc.proposeC != nil && rc.confChangeC != nil {
		select {
		case prop, ok := <-rc.proposeC:
			if !ok {
				rc.proposeC = nil
			} else {
				// blocks until accepted by raft state machine
				rc.node.Propose(context.TODO(), []byte(prop))
			}
		case cc, ok := <-rc.confChangeC:
			if !ok {
				rc.confChangeC = nil
			} else {
				confChangeCount++
				cc.ID = confChangeCount
				rc.node.ProposeConfChange(context.TODO(), cc)
			}
		}
	}
	// client closed channel; shutdown raft if not already
	close(rc.stopCh)
}()
```
#### Event loop on raft state machine updates

This loop processes raft state machine updates and manages storage interactions:
```go
for {
	select {
	case <-ticker.C:
		rc.node.Tick()

	// store raft entries to wal, then publish over commit channel
	case rd := <-rc.node.Ready():
		if rd.SoftState != nil {
			isLeader := rd.RaftState == raft.StateLeader
			rc.leader.Store(rd.Lead)
			if rc.isLeader.CAS(!isLeader, isLeader) {
				rc.leaderChangeCh <- isLeader
			}
		}
		// Must save the snapshot file and WAL snapshot entry before saving any other entries
		// or hardstate to ensure that recovery after a snapshot restore is possible.
		if !raft.IsEmptySnap(rd.Snapshot) {
			rc.saveSnap(rd.Snapshot)
		}
		rc.wal.Save(rd.HardState, rd.Entries)
		// Load snapshot to memory
		if !raft.IsEmptySnap(rd.Snapshot) {
			rc.raftStorage.ApplySnapshot(rd.Snapshot)
			// Notify Embedded to load snapshot
			rc.publishSnapshot(rd.Snapshot)
		}
		// Append entries
		rc.raftStorage.Append(rd.Entries)
		// Send some metadata required by the etcd/raft framework
		rc.transport.Send(rc.processMessages(rd.Messages))
		// Send commits to Embedded
		applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
		if !ok {
			rc.stop()
			return
		}
		rc.maybeTriggerSnapshot(applyDoneC)
		rc.node.Advance()

	case err := <-rc.transport.ErrorC:
		rc.writeError(err)
		return

	case <-rc.stopCh:
		rc.stop()
		return
	}
}
```
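`maybeTriggerSnapshot` bounds WAL growth: once enough entries have been applied since the last snapshot, it serializes the `kv` map, persists it, and compacts the raft log. A sketch under the raftexample assumptions (`snapCount`, `snapshotCatchUpEntriesN`, and `getSnapshot` are assumed names):

```go
func (rc *raftNode) maybeTriggerSnapshot(applyDoneC <-chan struct{}) {
	if rc.appliedIndex-rc.snapshotIndex <= rc.snapCount {
		return
	}
	// Wait until Embedded has applied all entries up to appliedIndex.
	if applyDoneC != nil {
		<-applyDoneC
	}
	data, err := rc.getSnapshot() // e.g. a gob-encoded copy of the kv map
	if err != nil {
		logger.Get().With(zap.Error(err)).Panic("Failed to marshal state")
	}
	snapshot, err := rc.raftStorage.CreateSnapshot(rc.appliedIndex, &rc.confState, data)
	if err != nil {
		logger.Get().With(zap.Error(err)).Panic("Failed to create snapshot")
	}
	if err := rc.saveSnap(snapshot); err != nil {
		logger.Get().With(zap.Error(err)).Panic("Failed to save snapshot")
	}
	// Keep a tail of entries behind the snapshot so slow followers can
	// catch up without needing a full snapshot transfer.
	compactIndex := uint64(1)
	if rc.appliedIndex > snapshotCatchUpEntriesN {
		compactIndex = rc.appliedIndex - snapshotCatchUpEntriesN
	}
	if err := rc.raftStorage.Compact(compactIndex); err != nil {
		logger.Get().With(zap.Error(err)).Panic("Failed to compact raft log")
	}
	rc.snapshotIndex = rc.appliedIndex
}
```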
I think this should be a plug-in: users could choose etcd, ZooKeeper, raft, or even MySQL and other external storage. Reference: https://github.com/apache/dolphinscheduler/tree/dev/dolphinscheduler-registry/dolphinscheduler-registry-plugins
I see. However, dolphinscheduler uses pom.xml profiles to define which code should be included and excluded. If we really want this behaviour, we would probably need to modify the Makefile and use Go build tags (a hypothetical sketch is shown below).

@git-hulk What do you think?
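For illustration only, a Go build tag could gate each backend; the file below is hypothetical and not part of this PR:

```go
//go:build !embedded

// storage/persistence/etcd/etcd.go (hypothetical): this backend compiles
// only when the "embedded" tag is absent, so
//
//	go build -tags embedded ./...
//
// would exclude the etcd backend, while a plain "go build" keeps it.
package etcd
```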
It should be fine to include the plugins at build time and let users choose which engine to use via the configuration file. But from my personal perspective, I'd prefer encouraging users to use raft + embedded storage instead of an external service once it's ready.
@ptyin This requirement does not have to be completed in this PR. Maybe I will send a proposal after I write a complete plan. Thank you for your suggestion.
Closed this PR since this work has been done in #222.