[MIT 6.824 分布式系统课程] Lab2 Raft 心得

Raft struct 的成员

1 Raft 节点的角色,可以使用Go常量

// Roles a Raft peer can be in. The values are assigned by iota in
// declaration order.
const (
	Follower  = iota // 0
	Candidate        // 1
	Leader           // 2
)

2 使用Buffered Channel进行异步通信,比如等待心跳包结果等情况,我定义了如下Channel

	chanHeartbeat chan bool // a heartbeat was received
	chanWinVote chan bool	// the election was won
	chanGrantVote chan bool // a vote was granted

        chanApply chan ApplyMsg // channel used for committing

Make()中需要初始化Channel为Buffered Channel

	// Each signalling channel is created with a buffer of 10.
	rf.chanWinVote = make(chan bool, 10)
	rf.chanGrantVote = make(chan bool, 10)
	rf.chanHeartbeat = make(chan bool, 10)

3 Raft struct大部分成员都是论文的Figure 2的内容

4 发送broadcastRequestVote后,需要进行voteCount 计数,以确定是否赢得Vote,所以 Raft struct需要增加voteCount成员



mu        sync.Mutex          // Lock to protect shared access to this peer's state



加锁时要格外小心,确保不会发生死锁;能不加锁就不加锁。此外,测试时使用 go test -race 来检测是否存在数据竞争

善用 go 协程机制




// In RequestVote, the check for whether the candidate's log is up-to-date
// should be implemented as follows.
// The vote is granted only if this peer has not voted yet (or already voted
// for this candidate) AND the candidate's last entry has a higher term, or
// the same term with an index >= rf.getLastIndex().
if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) &&
			(args.LastLogTerm > rf.logs[rf.getLastIndex()].Term ||
				(args.LastLogTerm == rf.logs[rf.getLastIndex()].Term && args.LastLogIndex >= rf.getLastIndex())) {
		// Raft determines which of two logs is more up-to-date by comparing the
		// index and term of the last entries in the logs. If the logs have last
		// entries with different terms, then the log with the later term is
		// more up-to-date. If the logs end with the same term, then whichever
		// log is longer is more up-to-date.
		reply.VoteGranted = true
		rf.votedFor = args.CandidateId
	} else {
		reply.VoteGranted = false
// AppendEntries is the receiver side of the AppendEntries RPC. It signals
// chanHeartbeat, adopts args.Term when it is larger than rf.currentTerm,
// and then either rejects the request (stale term, or no matching entry at
// args.PrevLogIndex) or merges args.Entries into rf.logs and advances
// rf.commitIndex. The outcome is reported through reply (Term, Success,
// NextTryIndex).
// NOTE(review): as shown here the snippet has lost several closing braces;
// restore them before compiling.
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	/*	Receiver implementation:
	1. Reply false if term < currentTerm (§5.1)
	2. Reply false if log doesn’t contain an entry at prevLogIndex
	whose term matches prevLogTerm (§5.3)
	3. If an existing entry conflicts with a new one (same index
	but different terms), delete the existing entry and all that
	follow it (§5.3)
	4. Append any new entries not already in the log
	5. If leaderCommit > commitIndex, set commitIndex =
		min(leaderCommit, index of last new entry)	*/
	// NOTE(review): the heartbeat signal is sent before the term check below,
	// so it also fires for requests carrying an older term — confirm intended.
	rf.chanHeartbeat <- true

	if args.Term > rf.currentTerm {
		rf.log("AppendEntries Term Bigger, convert to follower")
		rf.role = Follower
		rf.currentTerm = args.Term
		rf.votedFor = -1
	DPrintf("1 commitIndex %d lastApplied %d LastIndex %d", rf.commitIndex, rf.lastApplied, rf.getLastIndex())
	reply.Term = rf.currentTerm
	reply.NextTryIndex = -1	// Defaults to -1, meaning "index--"; to keep the logic simple, every case other than a log mismatch just decrements.
	if args.Term < rf.currentTerm {
		reply.Success = false
	} else if len(rf.logs) <= args.PrevLogIndex || rf.logs[args.PrevLogIndex].Term != args.PrevLogTerm {
		// Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
		if len(rf.logs) <= args.PrevLogIndex{
			DPrintf("logsLen(%d) prevLogIndex(%d)",
				len(rf.logs), args.PrevLogIndex)
		} else {
			DPrintf("logsLen(%d) prevLogIndex(%d) logsTerm(%d) prevLogTerm(%d)",
				len(rf.logs), args.PrevLogIndex, rf.logs[args.PrevLogIndex].Term, args.PrevLogTerm)
		if len(rf.logs) > args.PrevLogIndex {
			// Term mismatch: walk backwards from PrevLogIndex-1 past the
			// preceding entries that share the conflicting entry's term, so the
			// leader can back up over the whole term at once.
			term := rf.logs[args.PrevLogIndex].Term
			for reply.NextTryIndex = args.PrevLogIndex - 1;
				reply.NextTryIndex > 0 && rf.logs[reply.NextTryIndex].Term == term; reply.NextTryIndex-- {}
		} else {
			// Log too short: go by the current log length (getLastIndex() + 1).
			reply.NextTryIndex = rf.getLastIndex() + 1
		reply.Success = false
	} else {
		// If an existing entry conflicts with a new one (same index but different terms),
		// delete the existing entry and all that follow it.
		// ------------------------
		// The if here is crucial. If the follower has all the entries the leader sent,
		// the follower MUST NOT truncate its log. Any elements following the entries
		// sent by the leader MUST be kept. This is because we could be receiving an
		// outdated AppendEntries RPC from the leader, and truncating the log would
		// mean “taking back” entries that we may have already told the leader that
		// we have in our log.
		// ------- wrong code -------
		// preLogs := rf.logs[:args.PrevLogIndex + 1]
		// preLogs = append(preLogs, args.Entries...)
		// rf.log(fmt.Sprintf("%d", preLogs))
		// rf.logs = preLogs
		// --------------------------
		rf.log(fmt.Sprintf("Log append: rfLogs(%d) entries(%d) preLogIndex(%d)",
			rf.logs, args.Entries, args.PrevLogIndex))
		for i := 0; i < len(args.Entries); i++ {
			if i + args.PrevLogIndex + 2 > len(rf.logs) {
				// Entries extends past the end of rf.logs, so append directly.
				rf.logs = append(rf.logs, args.Entries[i])
			} else if rf.logs[i + args.PrevLogIndex + 1].Term != args.Entries[i].Term {
				// Conflicting entry: truncate from this position, then append.
				rf.logs = rf.logs[:i + args.PrevLogIndex + 1]
				rf.logs = append(rf.logs, args.Entries[i])
		rf.log(fmt.Sprintf("Log append result: rfLogs(%d)", rf.logs))
		if args.LeaderCommit > rf.commitIndex {
			// commitIndex = min(LeaderCommit, getLastIndex()).
			if args.LeaderCommit > rf.getLastIndex() {
				rf.commitIndex = rf.getLastIndex()
			} else {
				rf.commitIndex = args.LeaderCommit
			if rf.commitIndex > rf.lastApplied {
				// rf.commit runs in its own goroutine.
				go rf.commit()
		reply.Success = true
	DPrintf("2 commitIndex %d lastApplied %d LastIndex %d", rf.commitIndex, rf.lastApplied, rf.getLastIndex())
	rf.log(fmt.Sprintf("reply AppendEntries from %d: %t", args.LeaderId, reply.Success))


// Also: the handling after sendAppendEntries returns successfully.
	if reply.Success {
		// If successful: update nextIndex and matchIndex for follower (§5.3)
		// Note: do not simply set this to rf.getLastIndex(). sendAppendEntries is
		// asynchronous, so the latest index may have changed since the request was
		// sent; derive it from what this request actually carried instead.
		rf.matchIndex[server] = args.PrevLogIndex + len(args.Entries)
		rf.nextIndex[server] = rf.matchIndex[server] + 1


可以给 Raft struct 定义一个 log 方法,打印一些基本信息(如 Term 等),这样就不用每次都手动拼接。例如

func (rf *Raft) log(message string) {
	// 封装日志方法
	if Debug > 0 {
		log.Printf("node(%d)role(%d)term(%d)commitIndex(%d)logs(%d): %s",, rf.role, rf.currentTerm, rf.commitIndex, rf.logs, message)





	// Election timeout: the paper says random in 150–300ms; the test environment is 1000ms.
	// As written, this returns a random duration from 500ms up to (but not including) 650ms.
	electionTimeout := func() time.Duration {
		return time.Duration(500 + rand.Intn(150)) * time.Millisecond
	// broadcastTime << electionTimeout << MTBF (mean time between failures)
	// "<<" denotes an order-of-magnitude difference.
	heartBeatInterval := time.Duration(50) * time.Millisecond


// Main loop: each iteration reads rf.role and runs the branch for that role.
// NOTE(review): this excerpt is truncated (closing braces are missing, the
// rf.votedFor assignment has no right-hand side, and the Leader case has no
// body); it is illustrative only.
for {
		rf.log("loop start")
		role := rf.role
		switch role {
		case Follower:
			// Respond to RPCs from candidates and leaders
			// If election timeout elapses without receiving AppendEntries RPC from current leader
			// or granting vote to candidate: convert to candidate
			select {
				case <-rf.chanHeartbeat:
				case <-rf.chanGrantVote:
				case <-time.After(electionTimeout()):
					rf.log("follower no heartbeat/vote and timeout, convert to candidate")
					rf.role = Candidate
		case Candidate:
			// On conversion to candidate, start election:
			// - Increment currentTerm
			// - Vote for self
			// - Reset election timer
			// - Send RequestVote RPCs to all other servers
			// If votes received from majority of servers: become leader
			// If AppendEntries RPC received from new leader: convert to follower
			// If election timeout elapses: start new election
			rf.votedFor =
			rf.voteCount = 1	// count our own vote
			go rf.broadcastRequestVote()
			select {
			case <-rf.chanWinVote:
				if len(rf.chanHeartbeat) > 0 {
				rf.log("win vote, convert to leader")
				if rf.role != Leader {
					// chanWinVote and chanHeartbeat can both have a message at the same
					// time, in which case Go picks one of the ready cases at random.
					// The message left in chanWinVote would then interfere with the next election.
					// Temporary workaround: role is set at the same time chanWinVote is
					// signalled, and it is verified a second time here.
					rf.log("chanWinVote has outdated message, ignore it.")
				// Initialize the leader-only state.
				rf.nextIndex = make([]int, len(rf.peers))
				rf.matchIndex = make([]int, len(rf.peers))
				for i := range rf.peers {
					rf.nextIndex[i] = rf.getLastIndex() + 1
					rf.matchIndex[i] = 0
			case <-rf.chanHeartbeat:
				rf.log("received heartbeat, convert to follower")
				rf.role = Follower
			case <-time.After(electionTimeout()):
				rf.log("election timeout, skip")
		case Leader:
			// Upon election: send initial empty AppendEntries RPCs (heartbeat) to each server;
			// 		repeat during idle periods to prevent election timeouts (§5.2)
			// If command received from client: append entry to local log,
			// 		respond after entry applied to state machine (§5.3)
			// If last log index ≥ nextIndex for a follower:
			// 		send AppendEntries RPC with log entries starting at nextIndex
			// - If successful: update nextIndex and matchIndex for follower (§5.3)
			// - If AppendEntries fails because of log inconsistency:
			// 		decrement nextIndex and retry (§5.3)
			// If there exists an N such that N > commitIndex, a majority
			// 		of matchIndex[i] ≥ N, and log[N].term == currentTerm:
			// 		set commitIndex = N (§5.3, §5.4).



ninehills avatar Feb 28 '18 09:02 ninehills


ninehills avatar Feb 28 '18 09:02 ninehills

选举状态机的实现里, 在sleep心跳时长的时候, 没有办法接受处理AppendEntriesRPC

hd5970 avatar Jul 31 '18 02:07 hd5970

选举状态机的实现里, 在sleep心跳时长的时候, 没有办法接受处理AppendEntriesRPC

broadcastAppendEntries()里会用协程的方式启动多个 sendAppendEntries,Reply由此处理。

ninehills avatar Feb 15 '19 06:02 ninehills