
Consider Raft instead of/in addition to ETCD

Open · araddon opened this issue Nov 07 '17 · 6 comments

It would be great for grid to be easier to get going, with fewer dependencies and less overlap with emerging norms in cloud-native distributed compute.

  1. Fewer dependencies (than etcd) to import/vendor.
  2. Fewer run-time dependencies, i.e. no need to run an etcd server.
  3. Less overlap with cloud-native norms (Kubernetes, Istio/Envoy).

So possibly make etcd one implementation behind a Coordinator interface or something (see the sketch after this list). Currently etcd provides these functions:

  1. Cluster Seed (find and connect to an existing cluster)
  2. Node/Cluster discovery and membership. (add/remove nodes as they come & go)
  3. Mailbox Discovery & Mutex
  4. Durable Storage for https://github.com/lytics/flo
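A minimal sketch of what such a Coordinator interface might capture, mirroring the four functions above (all names here are hypothetical, just to frame the discussion):

    package grid

    import "context"

    // ClusterEvent and Storage are placeholder types for this sketch.
    type ClusterEvent struct {
        Peer   string
        Joined bool // false means the peer left
    }

    type Storage interface{} // durable storage, e.g. for lytics/flo

    // Coordinator abstracts the coordination functions that etcd
    // provides today, so it could be backed by etcd, embedded raft,
    // Kubernetes, etc.
    type Coordinator interface {
        // Seed finds and connects to an existing cluster.
        Seed(ctx context.Context, addrs []string) error
        // Membership reports nodes as they come and go.
        Membership(ctx context.Context) (<-chan ClusterEvent, error)
        // Claim takes a mailbox name as a mutex, failing if
        // another node already holds it.
        Claim(ctx context.Context, mailbox string) error
        // Storage exposes the durable storage.
        Storage() Storage
    }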

Since Grid started, there are a few interesting things to consider in the distributed-compute space:

  • Kubernetes: more and more, Kubernetes may be the new OS, so software may be written and targeted for this OS, i.e., depend on it. Kubernetes could provide some of the discovery that etcd provides (but not at the mailbox level).
  • Envoy & Istio: Istio/Envoy are nice for allowing clients to be thinner by pushing tracing, backoff, and discovery away from the client. Not sure we can push the mailbox mutex all the way down, but possibly with Envoy, similar to the Contour project below?

Research items

  • https://github.com/heptio/contour interesting because it uses Envoy without Istio
  • Serf for Discovery https://github.com/travisjeffery/jocko/tree/master/serf
  • Embedded Raft for Mutex "Mailboxes" https://github.com/travisjeffery/jocko/tree/master/raft & https://github.com/rqlite/rqlite/blob/master/store/store.go

araddon avatar Nov 07 '17 16:11 araddon

I like the idea of embedding raft, as it would simplify our dependencies. I think Istio/Envoy/Kubernetes could be components of embedding raft (discovery/seeding), but before we dive into their functionality we should discuss what it'd take to embed raft in the general sense.

epsniff avatar Nov 07 '17 17:11 epsniff

Questions/Thoughts:
  • If we embed raft, does that mean we'd need to run a minimum number of nodes (3)?
  • How do we deal with large clusters with more than 5 nodes? Do we use multi-raft, or do we do what etcd v2 did, where any node after 5 is just a listener and doesn't participate in elections?
  • Or do we use multi-raft rings https://www.cockroachlabs.com/blog/scaling-raft/ and partition the consensus groups out over nodes? For example, we could create one consensus group per ActorType, etc.
  • During a cold start, each new node will need to discover at least one seed node so that it can join the serf/gossip ring. We could use the Istio/k8s API for this, but I think we should make it pluggable and support any method (a sketch follows this list).
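A minimal sketch of that pluggable hook (the Seeder name and the static implementation are hypothetical):

    package grid

    import "context"

    // Seeder returns the address of at least one live node that a
    // cold-starting node can use to join the serf/gossip ring.
    type Seeder interface {
        Seeds(ctx context.Context) ([]string, error)
    }

    // StaticSeeder is the trivial implementation: a fixed list from
    // a flag or config file. A Kubernetes implementation would list
    // the endpoints of a headless service via the API instead.
    type StaticSeeder []string

    func (s StaticSeeder) Seeds(ctx context.Context) ([]string, error) {
        return s, nil
    }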

epsniff avatar Nov 07 '17 17:11 epsniff

Overall I like the idea. Here are a few thoughts to develop it further:

1.

Right now both the grid Server and Client take an etcd Client as an argument. That client would be replaced with something akin to the DB struct in database/sql.

Specifically it would be a struct, NOT an interface. There would be a driver sub-package that would allow plug-in drivers, just like database/sql/driver or flo/storage/driver.
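For reference, the database/sql/driver pattern reduces to a package-level registry populated by blank imports; a rough sketch of what a grid fsm/driver package could look like (names hypothetical):

    package driver

    import (
        "fmt"
        "sync"
    )

    // Conn stands in for whatever the top-level FSM struct wraps.
    type Conn interface{}

    // Driver is implemented by each backend, e.g. a memraft driver.
    type Driver interface {
        Open(dsn string) (Conn, error)
    }

    var (
        mu      sync.Mutex
        drivers = map[string]Driver{}
    )

    // Register is called from each driver's init function, which is
    // why user code blank-imports the driver package.
    func Register(name string, d Driver) {
        mu.Lock()
        defer mu.Unlock()
        drivers[name] = d
    }

    // Open is what grid.NewFSM would call internally.
    func Open(name, dsn string) (Conn, error) {
        mu.Lock()
        d, ok := drivers[name]
        mu.Unlock()
        if !ok {
            return nil, fmt.Errorf("unknown driver %q", name)
        }
        return d.Open(dsn)
    }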

2.

BUT this replacement for the etcd client should not be considered something like a database; it is NOT a database, it is really a partial exposure of the RAFT FSM.

The struct should be in the top level grid package, so that code using it looks like:

    import (
        ...
        _ "github.com/lytics/grid/fsm/driver/memraft"
    )


    fsm, err := grid.NewFSM(...)
    ...

    client, err := grid.NewClient(fsm, ...)
    ...

    server, err := grid.NewServer(fsm, ...)
    ...

3.

The grid FSM should be able to use, and only use, messages registered with grid, via the Register function:

    // Register our message types.
    grid.Register(SchemaStateMsg{})

This will allow user code to do things like the following:

    // Use the FSM.
    err := fsm.Set(&SchemaStateMsg{ ... })

If the Set method returns without an error, then the caller knows that a quorum of peers has applied the message.

The FSM sends messages as interface{}, not as bytes. This is fundamental to grid: users register message types, and the grid library handles the encoding and decoding of those messages transparently.

In keeping with this philosophy, the grid FSM must do the same.
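A minimal sketch of how the FSM could carry interface{} values through the log using the registered types. JSON stands in here for whatever codec grid actually uses (the real library registers protobuf types), and the envelope and helper names are hypothetical:

    package grid

    import (
        "encoding/json"
        "fmt"
        "reflect"
    )

    // registry maps the type name stored in a log entry back to a
    // Go type, so decode can hand Appliers real structs.
    var registry = map[string]reflect.Type{}

    func Register(v interface{}) {
        t := reflect.TypeOf(v)
        registry[t.Name()] = t
    }

    // envelope is what would actually be written to the RAFT log.
    type envelope struct {
        TypeName string
        Data     json.RawMessage
    }

    // encode assumes v is a pointer, as in fsm.Set(&SchemaStateMsg{}).
    func encode(v interface{}) ([]byte, error) {
        data, err := json.Marshal(v)
        if err != nil {
            return nil, err
        }
        return json.Marshal(envelope{
            TypeName: reflect.TypeOf(v).Elem().Name(),
            Data:     data,
        })
    }

    func decode(b []byte) (interface{}, error) {
        var env envelope
        if err := json.Unmarshal(b, &env); err != nil {
            return nil, err
        }
        t, ok := registry[env.TypeName]
        if !ok {
            return nil, fmt.Errorf("unregistered type %q", env.TypeName)
        }
        v := reflect.New(t).Interface()
        return v, json.Unmarshal(env.Data, v)
    }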

4.

The FSM must be accessible to actors, but should not be retrievable from the Context that actors receive.

The actors cannot get their Client or Server from the Context they receive, and there is an important reason for this.

In trivial cases a program using grid will only communicate within and affect its own namespace, but more realistically it will affect many namespaces.

Because of this it is up to the user to define how they want to code this kind of dependency injection. In our own use of grid we have the concept of providers.

Hence in all three cases, grid.Client, grid.Server, and grid.FSM, it is up to the user to inject these into their actors or code.
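For example, injection might look like this (the Provider and WorkerActor names are hypothetical, and this assumes grid's Actor interface with an Act(context.Context) method):

    import (
        "context"

        "github.com/lytics/grid"
    )

    // Provider bundles the plumbing an actor needs. The user decides
    // what goes in here; grid never exposes these through the
    // actor's Context.
    type Provider struct {
        Client *grid.Client
        FSM    *grid.FSM // the proposed FSM type
    }

    type WorkerActor struct {
        provider *Provider
    }

    // NewWorkerActor injects dependencies at construction time.
    func NewWorkerActor(p *Provider) *WorkerActor {
        return &WorkerActor{provider: p}
    }

    func (a *WorkerActor) Act(ctx context.Context) {
        // Reach grid infrastructure via a.provider, never via ctx.
    }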

5.

The FSM struct would probably have just two methods:

    // Applier applies incoming RAFT events
    // to some state machine. An applier
    // cannot return an error to affect the
    // RAFT log, since it has already been
    // written.
    type Applier interface {
        Apply(v interface{})
    }

    type FSM struct {
        ...
    }

    // Set a value, which will be written to the RAFT
    // log; once committed, each applier will have its
    // Apply method called with the value.
    func (fsm *FSM) Set(v interface{}) error {
        ...
    }

    // Use the applier.
    func (fsm *FSM) Use(a Applier) error {
        ...
    }

  • Why does the FSM not use keys and values like a map?
  • Why does the FSM not have a get method?
  • Why does the FSM not have watch method?

The intent of RAFT is that it can drive arbitrary "state machines" and handle the replication for you; you just provide the state machine.

The grid.FSM could have all sorts of crazy ways to "insert" special state machines into it, but none of that is needed.

User code that wants to "watch" or "get" values simply registers an Applier; every time an update happens it will be called, covering the use cases of gets, watches, and write-ahead-log restorations.
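For example, a "watch" is just an Applier that forwards matching messages onto a channel (a sketch; the Watcher type is hypothetical, and SchemaStateMsg is the type registered earlier):

    // Watcher gives watch semantics on top of Apply by turning
    // applied messages into a channel of updates.
    type Watcher struct {
        C chan *SchemaStateMsg
    }

    func (w *Watcher) Apply(v interface{}) {
        msg, ok := v.(*SchemaStateMsg)
        if !ok {
            return // not a message this watcher cares about
        }
        select {
        case w.C <- msg:
        default:
            // Drop rather than block: Apply sits on the critical
            // path of log application and must return promptly.
        }
    }

Registered via fsm.Use(&Watcher{C: make(chan *SchemaStateMsg, 16)}), the same mechanism covers gets (keep the latest value) and restorations (replay of the log).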

RAFT is suitable for more than simple key-value databases; there is no reason to force the FSM into being just a key-value store. Users that want a key-value database can use the FSM like so:

    type DB map[string]string

    func (db DB) Apply(v interface{}) {
        kv, ok := v.(*KV)
        if !ok {
            // Ignore message types
            // that are not related to
            // this state machine.
            return
        }
        db[kv.Key] = kv.Val
    }

    func main() {
        fsm, err := grid.NewFSM(...)
        ...

        // KV is a protobuf message type.
        err = grid.Register(KV{})
        ...

        // Our "state machine" is a simple map.
        db := DB{}

        // Use the db state machine.
        err = fsm.Use(db)
        ...
    }

    func somewhereElseInTheCode() {
        // fsm here is the same FSM constructed in main.
        err := fsm.Set(&KV{
            Key: "schema-v1",
            Val: "create table ( ... )",
        })
        if err == nil {
            // Quorum of FSM peers have applied the change.
        } else {
            panic(...)
        }
    }

6.

The grid library depends on the registration of peer, actor, and mailbox entries in etcd. It uses each entry as a mutex. The FSM change will totally change the registry package in grid, as it will need to become an Applier, rather than what it is today.

The grid library will need to create a set of messages, something like the following:

    PeerMsg
    ActorMsg
    MailboxMsg

The grid library will need a collection of Appliers that listen for these message types and, from them, provide the same set of capabilities as it currently has.
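A sketch of one such Applier, tracking peer membership (the PeerMsg fields are hypothetical):

    import "sync"

    // PeerRegistry rebuilds cluster membership by applying PeerMsg
    // entries from the RAFT log.
    type PeerRegistry struct {
        mu    sync.Mutex
        peers map[string]bool
    }

    func (r *PeerRegistry) Apply(v interface{}) {
        msg, ok := v.(*PeerMsg)
        if !ok {
            return
        }
        r.mu.Lock()
        defer r.mu.Unlock()
        if msg.Leaving {
            delete(r.peers, msg.Name)
        } else {
            r.peers[msg.Name] = true
        }
    }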

RAFT guarantees that each FSM will receive a write-ahead-log-like sequence of messages. If each FSM applies the messages in order, their states will match; hence they will agree on state.

We will need to implement a mutual exclusion algorithm on top of the RAFT log. In other words, RAFT itself just guarantees that a set of FSMs will see the same sequence of messages via their Apply methods; it does not actually implement mutual exclusion.

I think there are many mutual exclusion algorithms we can implement with that consistent log-application guarantee. The book "Distributed Algorithms" by Wan Fokkink describes three mutual exclusion algorithms, and we may be able to find an even simpler one, since RAFT takes care of delivery in a specific order.
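The simplest candidate may be first-writer-wins over the ordered log: since every FSM applies claims in the same order, all peers agree on which claim came first. A sketch (the MailboxMsg fields are hypothetical):

    // MailboxMutex grants each mailbox name to the first claimant
    // seen in the log. Every peer applies claims in the same order,
    // so every peer computes the same owner.
    type MailboxMutex struct {
        owners map[string]string // mailbox name -> owning peer
    }

    func (m *MailboxMutex) Apply(v interface{}) {
        claim, ok := v.(*MailboxMsg)
        if !ok {
            return
        }
        if _, taken := m.owners[claim.Mailbox]; taken {
            return // an earlier log entry already owns this name
        }
        m.owners[claim.Mailbox] = claim.Peer
    }

The open question is how a losing claimant finds out, since Apply cannot return an error; the hashicorp/raft question below is about exactly that.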

mdmarek avatar Nov 15 '17 17:11 mdmarek

@araddon Have you played with hashicorp/raft enough to know whether the method:

    Apply(l *raft.Log) interface{}

can return an error that will prevent the log entry from getting applied? That would change some of the comments I made above.

mdmarek avatar Nov 15 '17 21:11 mdmarek

Have not played with it, will experiment with that.

araddon avatar Nov 15 '17 23:11 araddon

https://github.com/hashicorp/raft/blob/master/future.go

https://github.com/hashicorp/raft/blob/master/fsm.go#L72
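Reading those two files, it looks like Apply's return value cannot veto the entry (the log is already committed by the time Apply runs), but it is handed back to the proposing node through ApplyFuture.Response(). So a claim can still be rejected at the application level even though the entry stays in the log. A sketch (Snapshot and Restore are omitted, so mutexFSM is not a complete raft.FSM):

    import (
        "errors"
        "time"

        "github.com/hashicorp/raft"
    )

    var errClaimLost = errors.New("mailbox already claimed")

    // mutexFSM sketches a hashicorp/raft FSM. Apply cannot stop the
    // entry from being applied, but its return value reaches the
    // node that proposed the entry.
    type mutexFSM struct {
        owners map[string]bool
    }

    func (f *mutexFSM) Apply(l *raft.Log) interface{} {
        name := string(l.Data)
        if f.owners[name] {
            return errClaimLost // entry stays applied; claim rejected
        }
        f.owners[name] = true
        return nil
    }

    // claim runs on the proposing node.
    func claim(r *raft.Raft, name string) error {
        fut := r.Apply([]byte(name), 5*time.Second)
        if err := fut.Error(); err != nil {
            return err // raft-level failure (not leader, timeout, ...)
        }
        if err, ok := fut.Response().(error); ok {
            return err // application-level rejection from Apply
        }
        return nil
    }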

araddon avatar Nov 18 '17 22:11 araddon

https://github.com/k3s-io/kine has NATS support as of last week.

This gets you etcd with NATS doing the Raft.

It allows global etcd by just using the standard NATS tools.

gedw99 avatar May 17 '23 16:05 gedw99