Raft算法及其实现(Lab 2A, Lab2B, Lab 2C)

前言

这是分布式计算实验的其中一个小作业,跟随的是MIT-6.824-2022(现在是6.5840)课程的实验进行实现。Lab2主要实现的是一个Raft的功能,其中Lab2有ABCD四个部分,在本篇博客中仅实现了其ABC三个部分,Lab2D部分将随着后续Lab3的实验一同出现。

本系列其他链接

LAB2ABC LAB2D & LAB3 Lab4
本文 还得等等

所述实验均有完整的配有中文注释的代码,已上传至Github,详见lankoestee/MIT-6.824-2022

下面的东西还是有可能存在以下理论错误的,敬请担待。

Raft算法及作业任务基本介绍

Raft是一种用于替代Paxos,Raft的目标是提供更清晰的逻辑分工使得算法本身能被更好地理解,同时它安全性更高,并能提供一些额外的特性。Raft能为在计算机集群之间部署有限状态机提供一种通用方法,并确保集群内的任意节点在某种状态转换上保持一致。Raft算法的开源实现众多,在Go、C++、Java以及 Scala中都有完整的代码实现。Raft这一名字来源于"Reliable, Replicated, Redundant, And Fault-Tolerant"(“可靠、可复制、可冗余、可容错”)的首字母缩写。

Raft并不是一套孤立的算法,而是由许多不同的部分叠加而成,包括集群选主(Lab 2A)、日志同步(Lab 2B)、安全性三个大的部分,以及一些附加功能的实现,譬如状态信息持久化(Lab 2C)以及快照机制(Lab 2D)。MIT的6.824课程对其有完整的实验步骤阐述,并提供了对应的待补全代码以供我们进行实现。

在进行Raft算法的实现之前,需要将其中的重要概念进行详述。一是Raft算法中相关的服务器角色,Raft算法一共包括3类的角色,每台服务器均属于其中的一种。

  • Leader(领袖):领袖由群众投票选举得出,每次选举,只能选出一名领袖;
  • Candidate(候选人):当没有领袖时,某些跟随者可以成为候选人,然后去竞争领袖的位置;
  • Follower(跟随者):与领袖通过心跳连接进行通信,具有领袖选举的投票权。

相较于其他多主的算法而言,Raft的Leader是唯一的,在同一集群中仅有一个的,是需要通过选举机制进行新主的选取的。Leader会负责集群算法的核心实现,对于众多的跟随者进行信息通信,同时Leader上的日志记录是集群日志的主副本。

Raft作为多集群的计算机设备之间的一种信息交互算法,一个设计正确的Raft应该具备如下的几个特点,这也是6.824中的Lab 2所要求我们实现的。

  • 集群选主:在Leader宕机或网络超时时,剩余服务器能够自动选取新的Leader,以继续提供服务;
  • 日志同步:当外部应用程序将日志操作发送至计算机集群时,Raft算法将会把这个日志信息同步到集群中所有的服务器上去;
  • 安全性:对于集群选主和日志同步中的操作,必须保证操作的安全性,不能够向上层应用程序返回错误的信息。

据此,我们可以进行相关的实现了。

集群选主(Lab 2A)

任务分析

在上面的简介中曾经提到了Raft算法中的3中状态,分别是领导者(Leader)、候选人(Candidate)以及跟随者(Follower)。集群选主,就是所有计算机共同进行民主选举,在Candidate中选举一名为新的Leader,负责该集群的整体任务。

上述所说的3类状态,是存在转换关系的,其状态转换流程如图1所示。

图1 Raft状态转换流程

在起始状态时候,所有节点都是处于Follower的状态,他们将主动开始进行Leader选举,并在Leader发生宕机的时候自动的重新进行选主。

为了实现Raft的集群选主功能,原论文图2中的这张表格是非常重要的,如图2所示。它规定了一系列需要进行传递的操作,这将方便后续的诸多实现。

图2 Raft结构体组成

对于一般的,没有例外情况的集群选主而言,其经历如下的过程。

  1. 集群初始化阶段,所有节点的初始任期编号均为0,角色均为Follower。并且,每个节点产生一个在一定限度范围内随机的心跳超时时间,此时的集群没有Leader。
  2. 时间推移,心跳超时时间最短的节点发生了心跳超时,它将身份转变为Candidate,并向其他节点发送请求投票信息,同时自己为自己投一票,任期编号+1。
  3. Follower在收到投票信息后,如果请求投票的Candidate符合投票要求,Follower则会将这一票投给Candidate,向其发送投票信息,并将自己的任期+1。
  4. 当发起投票的Candidate节点收到大多数的选票后,它将成为本次任期的领导者。并向其他Follower节点时刻发送心跳信息,防止其出现心跳超时。

上述过程可以用图3进行表示。

图3 一般情况下的投票选主流程

但是很明显,这只是最为一般的情况,在现实的分布式系统中,如此顺滑的实现基本是不可能的,会发生诸多的意外情况。

其一是若一个发生超时的Follower转变为Candidate后接受到一个Leader发来的AppendEntries RPC(Remote Procedure Call,远程过程调用),如果领导者的任期(包含在其RPC中)至少与Candidate的当前时间周期一样大,那么Candidate将该Leader认为是合法的,并恢复到Follower状态。这说明本次发起的选举是无效的,将恢复到新Leader任期下的Follower状态。

其二是Leader失联后,有多个合法的Candidate发起选举,这将很可能导致没有一个Candidate获得超过半数的投票,这将导致脑裂,保持僵局。Raft采用了随机选举超时时间的机制预防这一点,当未能在一定时间选出Leader的情况下,首先超过选举超时时间的节点将会发起下一轮投票,由于各个Candidate的选举超时时间是随机的,故较高的概率不会在此出现多个Candidate进行选举的过程,只要获得过半投票,便可以作为新的Leader开启新任期。

其三是旧有节点发生宕机后,重新接入集群后,仍会认为自己是Leader,向外界发送AppendEntries RPC,而其他节点在接收到该信息后,由于旧有Leader节点任期小于本地节点任期,将会拒绝执行。当旧有Leader节点接收到新的Leader发出的任期更大的AppendEntries RPC后,将会自动转为Follower状态。

上面的三种情况是在选主中最主要会出现的几种情况,他们共同保证的选主系统的完整性。仅有一种可能使得其无法选出Leader,即有超过一半的节点宕机,此时Raft算法将无法运行。但在此,这种可能性微乎其微,不是我们需要考虑的问题。

功能设计和实现

基本结构体实现

根据图2,我们可以在raft上进行对应结构体的实现,其中根据raft原论文中的示例,认为日志条目下标从 1 开始,任期号也从 1 开始,0 则表示服务器上暂时没有日志条目,这样更加方便进行操作,另外越界的下标一律将任期表示为 - 1。根据图2表格中的State部分,可以得到如下的Raft基本结构体。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()

// 所有服务器上的持久状态
currentTerm int // 服务器任务的最新任期,单调递增
votedFor int // 为哪个节点投票,如果没有投票则为null(-1)
log LogEntries // 日志条目

// 所有服务器上的易失状态
commitIndex int // 表示日志条目中已提交条目的最高下标
lastApplied int // 表示应用于状态机的最高日志条目下标

// 领导者上的易失状态
nextIndex []int // 对于每一个服务器,需要发送给他的下一个日志条目的索引值
matchIndex []int // 对于每一个服务器,已经复制给他的日志的最高索引值

// 本地状态
leaderId int // 领导者的ID
state RaftState // 节点状态
heartbeatTime time.Time // 心跳超时时间
electionTime time.Time // 选举超时时间
}

其中前5个状态是源代码自带的状态,我们仅需要进行使用,无需进行维护。而所有的服务器上都具有的持久状态即是判断进行选主判断的几个重要状态,Raft协议通过他们来保证选主的安全性。易失状态在Lab 2A的选主实验中暂时用不到,而最下的本地状态并非Raft论文中所带有的状态,是为了方便后续的实现而赋予的状态。

对于请求投票信息而言,他是一个特殊的RPC,用于向其他节点发起投票请求。Raft论文的图2也说明了RequestVote RPC的基本数据结构。同时,我们也为请求投票信息的回复(是或否)设计了一个数据结构,这两个结构如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
// RequestVote RPC 请求投票信息,论文图2所示
type RequestVoteArgs struct {
Term int // 候选人的任期
CandidateId int // 请求选票的候选人的ID
LastLogIndex int // 候选人的最后日志条目的索引值
LastLogTerm int // 候选人最后日志条目的任期号
}

// RequestVote RPC reply 请求投票信息的回复
type RequestVoteReply struct {
Term int // 当前任期号,以便于候选人去更新自己的任期号
VoteGranted bool // 候选人赢得了此张选票时为真
}

上述的基本数据结构以及AppendEntries RPC(可以充当心跳连接,将在Lab 2B中进行介绍)以及其Reply,5种数据结构将足以是我们进行集群选主。

发起选举函数实现

当Follower发生心跳超时或选举超时后,都将发起一轮新的选举,借助RequestVote RPC,我设计了raiseElection函数发起选举。发起选举过程中,Follower节点将自己的状态变为Candidate,并增加自身任期,给自己投票,重置自己的超时选举时间,准备RequestVote RPC信息并向其他节点进行发送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 发起选举
func (rf *Raft) raiseElection() {
rf.state = Candidate
rf.currentTerm++

// 为自己投票
rf.votedFor = rf.me

// 重置选举超时时间
rf.setElectionTimeout(randElectionTimeout())
lastLogIndex, lastLogTerm := rf.log.lastLogInfo()

// 记录请求投票信息
args := &RequestVoteArgs{
Term: rf.currentTerm,
CandidateId: rf.me,
LastLogIndex: lastLogIndex,
LastLogTerm: lastLogTerm,
}

// 记录目前投票结果
voteCount := 1
var once sync.Once

// 并发向其他节点发送请求投票信息
for peer := range rf.peers {
if peer == rf.me {
continue
}
go rf.candidateRequestVote(&voteCount, args, &once, peer)
}
}

Follower投票函数实现

Follower节点在收到RequestVote RPC后,将会进入一个投票合法性的判断逻辑,选择投票或不投票,我将其写为了RequestVote函数。在该函数中,若节点接收到Candidate发来的RPC小于自己的任期,则说明投票发起是无效的,将不会投票。同样的,如果在本轮投票中已经投过票,也将不会对新的Candidate进行投票。只有在本轮尚未投票,且Candidate任期大于等于本节点任期时才会发起投票。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 投票实现核心函数
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
reply.VoteGranted = false // 默认拒绝投票

// 如果candidate的任期小于当前任期,拒绝投票
if args.Term < rf.currentTerm {
reply.Term = rf.currentTerm
return
}

// 如果candidate的任期大于当前任期,更新任期
rf.checkTerm(args.Term)
reply.Term = rf.currentTerm

// 如果没有投票或者已经投票给了candidate,且candidate的日志至少和自己一样新,投票给candidate
lastLogIndex, lastLogTerm := rf.log.lastLogInfo()
if rf.votedFor == -1 || rf.votedFor == args.CandidateId {
if lastLogTerm < args.LastLogTerm ||
(lastLogTerm == args.LastLogTerm && lastLogIndex <= args.LastLogIndex) {
reply.VoteGranted = true
rf.votedFor = args.CandidateId
rf.setElectionTimeout(randElectionTimeout())
}
}
}

Candidate收到投票后的处理

在Candidate发起投票后,如何处理汇总而来的投票信息也是重要的,在等待投票信息的过程中,Candidate会时刻监视RequestVoteReply,若发生合法投票则将自身所获得的票数+1,直至获得选票过半,直接发布当选信息AppendEntries RPC。但若出现收到的选票任期大于自身任期时(无论是正常投票还是拒绝投票),则说明自身发起的该轮投票无效,在取消投票的同时要更新自身的日期。若收到Follower发来的拒绝投票回复,则直接说明认为本次投票无效,将会将自身转变为Follower。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// 处理请求投票信息
func (rf *Raft) candidateRequestVote(voteCount *int, args *RequestVoteArgs, once *sync.Once, server int) {
reply := &RequestVoteReply{}
ok := rf.sendRequestVote(server, args, reply)

// 如果请求投票信息发送成功
if ok {
rf.mu.Lock()
defer rf.mu.Unlock()
if reply.Term < rf.currentTerm {
return
}

// 检查收到的投票任期是否大于当前任期,如果是,转变为跟随者
rf.checkTerm(reply.Term)

// 如果收到同意投票
if reply.VoteGranted {
*voteCount++
Debug(dVote, "S%d <- S%d Get a yes vote at T%d.", rf.me, server, rf.currentTerm)

// 如果收到多数节点的投票:成为领导者
if *voteCount > len(rf.peers)/2 {
once.Do(func() {
rf.state = Leader
lastLogIndex, _ := rf.log.lastLogInfo()
for peer := range rf.peers {
rf.nextIndex[peer] = lastLogIndex + 1
rf.matchIndex[peer] = 0
}

// 成为领导者,向其他节点发送AppendEntries RPC,宣布胜选
rf.sendEntries(true)
})
}
} else { // 如果收到拒绝投票:转变为跟随者
rf.state = Follower
}
}
}

// 检查是否需要更新任期以及转变为Follower状态,辅助函数
func (rf *Raft) checkTerm(term int) bool {
if rf.currentTerm < term {
rf.state = Follower // 转变为跟随者
rf.currentTerm = term // 更新任期
rf.votedFor = -1 // 重置投票
return true
}
return false
}

接收AppendEntries RPC后的处理

接收到Leader发来的AppendEntries RPC后,将会涉及到日志同步的相关问题,我们按下不表。但是在选举方面,如果Candidate收到一个AppendEntries RPC,且任期号相同或大于,则直接说明本次发起的选举是无效的,仍然存在一个主导的Leader,此时Candidate需要转变为普通的Follower。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// AppendEntries Follower接收Leader的追加/心跳包
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()

// 先默认拒绝请求
reply.Success = false

// 如果任期小于当前任期,拒绝请求
if args.Term < rf.currentTerm {
reply.Term = rf.currentTerm
return
}

// 如果是候选人,且任期相同,转变为Follower
if rf.state == Candidate && rf.currentTerm == args.Term {
rf.state = Follower
}

// 检查任期,如果自身任期小于RPC的任期,仍需要转变为Follower
rf.checkTerm(args.Term)
reply.Term = rf.currentTerm
rf.setElectionTimeout(randHeartbeatTimeout())

// 下面是日志同步相关部分,暂不涉及
// ....................

reply.Success = true
}

此外,还有很多函数在其中发挥了重要的作用,辅助完成任务,具体实现见详细的Lab 2A代码。该代码完成并仅仅完成集群选主任务,尚未完成后续的日志复制、持久化任务。

结果展示

上述展示的函数仅是主要函数,结合大量的辅助函数,以及尚未说明的AppendEntries RPC结构,将共同完成2A的相关任务,其运行结果如图4所示。

图4 Lab 2A运行结果

总结

Lab 2A实际上是所有内容中最为困难的一个内容,主要是由于为了实现该内容,基本需要从头搭建框架,并写好最初的辅助函数。6.824原本提供的框架并不能满足相关功能的方便实现。同时,分布式代码的调试是十分困难的,日志繁琐等困难始终困扰着初期的调试。

此外,一个显著的难点是,对代码运行流程的不清楚,最主要的问题是不知道哪个节点会在什么时候调用什么函数,什么函数是一直保持运行什么函数又是会间断运行的,我花费了较长的时间研读6.824课程提供的源代码,大致弄清楚了各个函数的作用以及调用的时机。

在明白交互逻辑过后,代码的编写显得就不是过于的难了,在我上述的代码中,我可以简要概述其运行逻辑。

  1. 发生心跳超时后,节点运行raiseElection函数,将自身状态转变为Candidate;
  2. Candidate通过raiseElection函数向其他节点发送RequestVoteArgs类型的信息;
  3. 其他节点收到该信息后,运行RequestVote函数,处理投票,并返回RequestVoteReply类型的回应给Candidate;
  4. Candidate处理选票,运行candidateRequestVote函数;
  5. 若Candiadte胜选,晋升Leader,则会运行发送心跳连接,Follower通过AppendEntries函数处理这些心跳连接。

有了上述的基本逻辑后,相关函数的实现也不会很难。

日志同步(Lab 2B)

任务分析

对于Raft协议的日志同步而言,其一般遵循以下的步骤:

  1. 节点向Leader发送修改命令,Leader将其写入自己的日志中。
  2. Leader向以并行的方式所有Follower发送心跳连接或日志写入请求(两者并无区别)AppendEntries RPC,并等待他们的回复。
  3. 一旦Leader收到足够多的响应,可以它认为该条命令已经在多数服务器上处于已提交状态时,那么该条命令就可以被执行。
  4. Leader这时会将命令发送给状态机,当执行结束后,它会将结果返回给客户端。

原论文的图1可以很好的反应在日志同步的最终效果,如图5所示。

图9 日志同步效果图

如此而言,Raft协议并不是一个严格的2PC提交协议,而是始终采用过半的方法进行信息的确认。这样的好处是能够得到较大程度的性能提升,暂时忽略未能够提交AppendEntries RPC Reply的节点,提高运行效率。

上述的步骤只是一般情况,然而实际情况以及Lab 2B的相关测试中,却给予了许多苛刻的条件,上述的一般情况是不足以应对测试的。可能存在如下的几个问题,这些问题在图5中都可以展示出来。

  • Follower短暂宕机,恢复时候添加新数据会导致该节点日志出现一定程度的空缺。
  • 旧Leader在处理完毕信息后发生宕机,新Leader面临复杂而混乱的回复和调用。
  • Follower可能会出现不同的日志记录。

图5 日志同步可能会遇到的问题

面对以上的几个问题,Raft协议在AppendEntries RPC结构中设计了PrevLogIndexPrevLogTerm两个结构用于进行判断它们分别代表准备添加的最新日志条目的最后一条,理应而言,这条日志是全部成员都应该持有的。但是由于宕机的发生,部分失联成员重新连接后可能会出现日志不全的情况。Follower将检查Leader发送来的日志前序索引和日志前序任期是否与自身相匹配,如不匹配产生冲突,则将拒绝日志修改请求。这时候,如果并非Leader造成的问题,Leader会耐心地重新发送前序日志索引以及日期编号与之相匹配的日志(该日志包括了后续的所有日志)。这是日志补充操作的实施方法。

图6 日志修复

而日志修改操作却略显复杂。对于可能出现的Follower日志超前或旧有日志不匹配问题(这主要是由于旧Leader发生宕机所导致的),领导者会为每个跟随者维护一个状态变量,这个变量称为 nextIndex ,这个变量存储日志的下一条记录的下标位置索引,服务器会把这个位置发送给跟随者(如图6所示,nextIndex = 11)。当一台服务器成为领导者后,它会将 nextIndex 值设置成当前日志记录的下一位置。所以在上面的例子中,任期 7 的领导者的最后一条记录的索引位置是 10 ,那么它会将 nextIndex 设置成 11 。领导者会根据 AppendEntries 调用发现一致性问题,因为当跟随者接收到 AppendEntries 调用时,都会进行检查。这个检查就可以发现所有的问题。所以当下一次领导者想要与跟随者进行通信时,它都会包括下标位置索引(10)以及任期号(6)作为请求的参数。当选为领导者后,下一次请求也有可能是以心跳检测的方式发送的,心跳检测与 AppendEntries 调用的方式一样,只是没有新值创建,但还是包括一致性检查的。所以当消息到达跟随者(a)后,它会将接收到的下标位置索引与任期与自己的日志信息进行比较,并没有匹配的记录,所以它会拒绝 AppendEntries 请求,当领导者收到拒绝的响应之后,它的响应很简单,它要做的只是将 nextIndex 减 1 ,所以这个值就变成了 10 。如此逐一减少,直到最终 nextIndex 为 5 的时候,领导者再次发送请求的信息会包括下标位置索引(4)以及任期号(4),这时它与跟随者(a)当前的日志记录信息是相匹配的,所以这时跟随者会接受 AppendEntries 请求,并追加记录 5-4 。直到领导者将跟随者的日志记录填充完整。相似的过程也会在跟随者(b)上出现。当 nextIndex 减少到 4 时,领导者会包括下标位置索引(3)以及任期号(1)作为请求的参数,并修正跟随者(b)上的日志记录。

上述的操作是Raft论文中的标准操作,但是在我的实现中,由于上述操作面对图6的情况需要多次的RPC连接方可以将信息更新到最新的程度,故我采用了一次性更新的方法。即,从Follower错误的条目开始,直接一次性替换更新多条日志,这样会省去大量RPC交互,但是会增加一次性信息的交互大小。对于Lab 2A而言,实际上并未传递有效的信息,仅是心跳信息,故基本不会增大RPC信息的大小。

总而言之,通过PrevLogIndex、PrevLogTerm以及自身所期望的nextIndex和matchIndex,Leader节点可以通过AppendEntries RPC完成对Follower节点的快速增补操作以及错误修改操作,保证所有在线节点的数据一致性。

功能设计与实现

AppendEntries相关数据结构实现

关于AppendEntries方面,有两个重要的数据结构,一是负责日志传递的AppendEntries RPC,二是Follower对于RPC信息的回复reply。其中AppendEntries RPC的数据结构实现在原论文的图2已经给出相关的内容了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// AppendEntries RPC 论文图2所示
type AppendEntriesArgs struct {
Term int // 领导者的任期
LeaderId int // 领导者的ID,以便于跟随者重定向请求
PrevLogIndex int // 新的日志条目紧随之前的索引值
PrevLogTerm int // PrevLogIndex 条目的任期号
Entries []LogEntry // 准备存储的日志条目(表示心跳时为空)
LeaderCommit int // 领导者的已知已提交的最高日志条目的索引值
}

// AppendEntries RPC 回复的数据结构
type AppendEntriesReply struct {
Term int // 当前任期,用于领导者更新自己
Success bool // 如果跟随者包含与PrevLogIndex和PrevLogTerm匹配的日志条目,则为true
}

Follower接收AppendEntries RPC后的处理

AppendEntries RPC是由Leader发送过来的数据,故在此,需要在Lab 2A的基础上进行补充,完成其日志同步部分。正常的数据直接追加就好,但是若出现日志不匹配的情况,也就是PrevLogTerm不等于日志前序条目的任期的情况下,Follower将会拒绝请求并通知Leader重试,Leader进行重试至当前任期后,将会一次性发送所有后续已提交的日志,Follower对其进行追加或替换更新。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// AppendEntries Follower接收Leader的追加/心跳包
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()

// 先默认拒绝请求
reply.Success = false

// 如果任期小于当前任期,拒绝请求
if args.Term < rf.currentTerm {
reply.Term = rf.currentTerm
return
}

// 如果是候选人,且任期相同,转变为Follower
if rf.state == Candidate && rf.currentTerm == args.Term {
rf.state = Follower
}

// 检查任期,如果自身任期小于RPC的任期,仍需要转变为Follower
rf.checkTerm(args.Term)
reply.Term = rf.currentTerm
rf.setElectionTimeout(randHeartbeatTimeout())

// 对自身的LeaderId进行更新
rf.leaderId = args.LeaderId

// 如果日志不包含在prevLogIndex处的任期或prevLogTerm不匹配,拒绝请求
// 拒绝后,Leader会递减nextIndex并在自身进行重试直至匹配
if args.PrevLogTerm == -1 || args.PrevLogTerm != rf.log.getEntry(args.PrevLogIndex).Term {
return
}

// Leader匹配成功后直接发来后续所有已提交的日志
// Follower使用Leader发来的Entries完全替换自己的错误的
for i, entry := range args.Entries {
if rf.log.getEntry(i+1+args.PrevLogIndex).Term != entry.Term {
rf.log = append(rf.log.getSlice(1, i+1+args.PrevLogIndex), args.Entries[i:]...)
break
}
}

// 如果leaderCommit > commitIndex,令commitIndex等于leaderCommit和新日志条目索引值中较小的一个
if args.LeaderCommit > rf.commitIndex {entTerm, rf.commitIndex, args.LeaderCommit)
rf.commitIndex = min(args.LeaderCommit, args.PrevLogIndex+len(args.Entries))
}

reply.Success = true
}

Leader发送AppendEntries RPC

在日志同步中最为重要的内容,便是Leader向所有Follower发送AppendEntries RPC的内容并等待回复,我写了leaderSendEntries函数以完成上述相关功能。该函数实现主要功能并不难,较难的实现是Leader在进行日志发送的过程中需要同步维护各个其他节点的nextIndex和matchIndex。在此之前,Leader通过sendAppendEntries发送AppendEntries RPC后,首要的任务是确定自己的“合法性”,如果回复的任期大于自身当前的任期,说明该节点Leader的地位已经丧失,需要主动转换为Follower。

若来自Follower的回复成功,则Leader将会更新该Follower节点的nextIndex和matchIndex的值,以确定下一条日志需要给他们发送什么。此外,Leader将遍历自身的日志,从末到头逐个查找满足大多数Follower节点已匹配的日志条目索引,并将最后一条与大多数节点相匹配的日志传入作为提交索引commitIndex。这将产生对状态机的更新。

如果Follower节点回复失败,则说明该节点的日志目前与大部分节点的日志是不匹配的,Leader将会逐步减小该节点的nextIndex直至与Follower传来的lastLogIndex相匹配,此时Leader将会一次性发送其缺失的所有日志。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// leaderSendEntries函数实现了Raft算法中Leader节点向Follower节点发送日志条目的功能。
func (rf *Raft) leaderSendEntries(args *AppendEntriesArgs, server int) {
// 发送AppendEntries RPC请求并等待回复
reply := &AppendEntriesReply{}
ok := rf.sendAppendEntries(server, args, reply)
if ok {
rf.mu.Lock()
defer rf.mu.Unlock()

// 如果回复的任期小于当前任期,回复无效
if reply.Term < rf.currentTerm {
return
}

// 如果回复的任期大于当前任期,转变为跟随者
if rf.currentTerm != args.Term {
return
}

// 如果回复的任期大于当前任期,转变为跟随者
if rf.checkTerm(reply.Term) {
return
}

// 如果成功,更新nextIndex和matchIndex
if reply.Success {
newNext := args.PrevLogIndex + 1 + len(args.Entries)
newMatch := args.PrevLogIndex + len(args.Entries)
rf.nextIndex[server] = max(newNext, rf.nextIndex[server])
rf.matchIndex[server] = max(newMatch, rf.matchIndex[server])

// 遍历日志,查找满足大多数Follower节点已匹配的日志条目索引
for N := len(rf.log); N > rf.commitIndex && rf.log.getEntry(N).Term == rf.currentTerm; N-- {
count := 1

// 计算已匹配的Follower节点数量
for peer, matchIndex := range rf.matchIndex {
if peer == rf.me {
continue
}
if matchIndex >= N {
count++
}
}

// 如果已匹配的Follower节点数量超过半数,更新commitIndex,可以进行提交
if count > len(rf.peers)/2 {
rf.commitIndex = N
break
}
}

} else { // 如果失败,减小nextIndex并重试
if rf.nextIndex[server] > 1 {
rf.nextIndex[server]--
}
lastLogIndex, _ := rf.log.lastLogInfo()
nextIndex := rf.nextIndex[server]

// 如果Leader节点与Follower节点的日志不一致,重试
if lastLogIndex >= nextIndex {
entries := make([]LogEntry, lastLogIndex-nextIndex+1)

// 重试时,只发送nextIndex之后的日志条目
copy(entries, rf.log.getSlice(nextIndex, lastLogIndex+1))
}
}
}
}

Leader将结果提交状态机

在完成基本的同步之后,我设计了applyLogLoop函数,定期将commitIndex之前的日志提交状态机,也就是对状态机进行更新。该函数通过打包发送的方式,对不再会出现错误的日志提交状态机(数据库)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 定期将已提交日志应用到状态机
func (rf *Raft) applyLogsLoop(applyCh chan ApplyMsg) {
for !rf.killed() {
rf.mu.Lock()
appliedMsgs := []ApplyMsg{}

// 如果有新的日志被提交,将其应用到状态机
if rf.commitIndex > rf.lastApplied {
rf.lastApplied++

// 对已提交的日志打包
appliedMsgs = append(appliedMsgs, ApplyMsg{
CommandValid: true,
Command: rf.log.getEntry(rf.lastApplied).Command,
CommandIndex: rf.lastApplied,
})
}
rf.mu.Unlock()

// 将已提交的日志发送到applyCh
for _, msg := range appliedMsgs {
applyCh <- msg
}
time.Sleep(time.Duration(TickInterval) * time.Millisecond)
}
}

结果展示

通过上述的2个数据结构以及2个函数,结合部分起到辅助性质和源代码中已经提供的函数,便可以通过6.824所提供的Lab 2B测试,如图7所示,这也说明了上述代码的正确性。源代码被存放在了Lab 2B文件夹中,该份代码仅能够通过Lab 2A以及2B的测试,暂无法通过2C以及2D的相关测试。

图7 Lab 2B运行结果

总结

日志同步的内容相较于Lab 2A的选举而言,其难度也并没有下降许多,不过在Lab2A中实际上已经涉及心跳连接,也就是AppendEntries RPC的相关发送和接收,实际上降低了部分难度,在逻辑上也稍微更好理解,就是如下的情况。

  • Leader通过leaderSendEntries函数将心跳连接或日志内容(以AppendEntriesArgs结构体保存)发布给所有的节点;
  • 其余节点通过AppendEntries函数处理所收到的AppendEntriesArgs,并发送AppendEntriesReply给Leader回馈正确与否;
  • Leader通过applyLogsLoop函数将目前已经提交的状态提交状态机。

上面的三个步骤帮助我们捋清了Lab 2B的主要内容,可以通过图10进行展示。据此,具体的实现根据论文一步步来就变得简单了。

图10 日志同步实现内容

持久化(Lab 2C)

任务分析

如果一台 Raft 服务器重启了,那么他就应该在刚才停下的地方继续运行服务。这就要求 Raft 协议要将一部份状态进行持久化,使得在重启后相关状态可以保留。在论文的图 2 种描述了哪些状态需要进行持久化保存。

一种正确的实现方式是,每当Raft的需持久化状态发生改变时,将其写入硬盘,并在重启之后从硬盘中读取出相关的状态信息。在本次Lab的实现中,我们不会使用到硬盘,而是从一个叫做Persister的对象中保存和恢复需持久化的状态。Raft.Make()方法的调用者会提供一个Persister对象,如果有的话该对象中会存储 Raft 最近持久化的相关状态信息。Raft 需要从这个Persister对象中初始化自己的状态,并且应当在每次相关持久化状态改变时使用该对象进行保存,我们使用Persister对象中ReadRaftState()和Save()方法进行持久化状态的恢复和保存。

Persister所接收的内容是byte类型的数组,而非go语言中一般的数据存储方式,源代码给我们提供了labgob编码器,并通过persist.Save()函数对当前状态进行保存。

相较于前文所提到的相关方法,持久化的实现并不是很难,最重要的是它仅需多写两个函数即可,分别是persist()函数以及readPersist()函数,分别代表了写入持久化数据和读取持久化数据。在6.824的提示中,其给出了我们一些处理方法,即原论文中对于大量拒绝的方法是进行一次次的回退直至同步,但是我们完全可以一次性回退到位,在 AppendEntries 的时候完整的带过去全量的更新信息,这样当有大量拒绝的时候就可以避免一次一条目的慢慢回退重新发送,好处是大量拒绝发生时可以有效减少回退成本。原课程的网站中给予了我们实现的一种可能手法,即在AppendEntries RPC的Reply中加入如下字段:

1
2
3
XTerm:  term in the conflicting entry (if any)
XIndex: index of first entry with that term (if any)
XLen: log length

简单而言,也就是如果发生冲突,Leader 节点可以选择回退到一个合适的位置重新开始同步。

如果被同步节点冲突位置的 Term 任期 Leader 节点日志中恰好有,那 Leader 就回退到该任期中的最后一个日志开始同步;反之如果没有那就可以考虑从被同步节点在该任期的第一个位置开始同步;而如果是因为同步 Index 位置在被同步节点上还不存在,也就是太短了导致的,那就回退到被同步节点尾部开始同步。

在持久化的实现上,还需要注意的一个点是,需要在何时进行持久化操作,也就是要在什么地方调用persist()函数,以下我列出了持久化操作应该进行的位置。

  • Leader节点接收到来自客户端的命令之后;
  • Follower节点完成日志同步之后;
  • 节点参与投票,改变votedFor之后;
  • 节点当前任期currentTerm发生改变之后;

在以上操作之后,需要立即进行持久化操作,这样的改变需要在Lab 2A和Lab 2B中进行改变,故在后续的功能实现中不对相关函数进行详细的展示。

功能设计与实现

持久化和其读取函数实现

持久化函数中,在原论文的设计中,其要求我们对3个参数进行持久化保存,分别是currentTerm、voteFor以及log三个内容。该函数的编写也是比较简单的,但要注意的是,需要判断编码是否正确,如果编码正确,那么编码的输出将是空值nil,反之将为一个非空的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// 放入持久化数据
func (rf *Raft) persist() {
Debug(dPersist, "S%d Saving persistent state to stable storage at T%d.", rf.me, rf.currentTerm)

// 编码并保存当前信息
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
if err := e.Encode(rf.currentTerm); err != nil {
Debug(dError, "Raft.readPersist: failed to decode \"rf.currentTerm\". err: %v, data: %v", err, rf.currentTerm)
}
if err := e.Encode(rf.votedFor); err != nil {
Debug(dError, "Raft.readPersist: failed to decode \"rf.votedFor\". err: %v, data: %v", err, rf.votedFor)
}
if err := e.Encode(rf.log); err != nil {
Debug(dError, "Raft.readPersist: failed to decode \"rf.log\". err: %v, data: %v", err, rf.log)
}

// 将信息送入Persistor中
data := w.Bytes()
rf.persister.SaveRaftState(data)
}

// 从持久化数据中读取信息
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 {
return
}
Debug(dPersist, "S%d Restoring previously persisted state at T%d.", rf.me, rf.currentTerm)

// 恢复持久化之前的状态
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
rf.mu.Lock()
defer rf.mu.Unlock()

// 调试信息
if err := d.Decode(&rf.currentTerm); err != nil {
Debug(dError, "Raft.readPersist: failed to decode \"rf.currentTerm\". err: %v, data: %s", err, data)
}
if err := d.Decode(&rf.votedFor); err != nil {
Debug(dError, "Raft.readPersist: failed to decode \"rf.votedFor\". err: %v, data: %s", err, data)
}
if err := d.Decode(&rf.log); err != nil {
Debug(dError, "Raft.readPersist: failed to decode \"rf.log\". err: %v, data: %s", err, data)
}
}

冲突信息的留取

如何保存在任务分析中所添加的冲突信息XTerm、XIndex以及XLen,是我们所需要考虑的重要问题之一,在Lab 2B中曾找出了多种的冲突情况,现在我们可以将其运用在冲突信息的读取中,对于Leader和Follower节点而言,即是在AppendEntries和leaderSendEntries函数中进行相关的修改,使其能够找到并应用冲突日志,减少重复的日志传输和读取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// AppendEntries Follower接收Leader的追加/心跳包
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {

// ......与Lab 2B相同的前序内容......

// 如果日志不包含在prevLogIndex处的任期或prevLogTerm不匹配,拒绝请求
// 拒绝后,Leader会递减nextIndex并在自身进行重试直至匹配
if args.PrevLogTerm == -1 || args.PrevLogTerm != rf.log.getEntry(args.PrevLogIndex).Term {
reply.XLen = len(rf.log)
reply.XTerm = rf.log.getEntry(args.PrevLogIndex).Term
// getBoundsWithTerm返回满足给定任期的第一个和最后一个日志条目的索引
reply.XIndex, _ = rf.log.getBoundsWithTerm(reply.XTerm)
return
}

// ......与Lab 2B相同......

// 如果日志长度大于0,将其持久化
if len(args.Entries) > 0 {
rf.persist()
}

// .......后续与Lab 2B相同......
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (rf *Raft) leaderSendEntries(args *AppendEntriesArgs, server int) {
reply := &AppendEntriesReply{}
ok := rf.sendAppendEntries(server, args, reply)
if ok {

// ......与Lab 2B相同......

if reply.Success {

// ......与Lab 2B相同......

} else { // 如果追加日志项失败,则根据回复的信息进行相应的处理。

// Follower日志过短
if reply.XTerm == -1 {
rf.nextIndex[server] = reply.XLen + 1
} else {
_, maxIndex := rf.log.getBoundsWithTerm(reply.XTerm)

if maxIndex != -1 { // Leader存在XTerm
rf.nextIndex[server] = maxIndex
} else { // Leader不存在XTerm
rf.nextIndex[server] = reply.XIndex
}
}
lastLogIndex, _ := rf.log.lastLogInfo()
nextIndex := rf.nextIndex[server]

// ......与Lab 2B相同......
}
}
}

结果展示

上述代码仅展示了持久化过程中的部分重要函数,此外,还需要在合适的实际插入持久化的过程,该操作较为简单,故不作代码展示。结合上述代码以及Lab 2A和Lab 2B部分的内容,可以得到如下所示的结果。

图8 Lab 2C运行结果

总结

Lab 2C相对于Lab 2B和lab2A而言,难度有了明显的下降,主要内容变得简单起来。实际上如果不是为了避免原论文中一步步的日志恢复同步过程,我们只需要完成persist和readPersist函数,并在Lab 2B的函数中找到合适的调用时机便可以完成。但是如果不加入冲突发生位置的定位的话,运行消耗的时间将会大幅增加,还可能出现找不到一致性的可能,在2C的Figure 8 (unreliable)、churn以及unreliable churn测试中容易出现无法达成一致的可能。这是我们需要避免的。

为此,根据6.824课程的指引,我将其中添加的XTerm、XIndex以及Xlen参数,以快速找到冲突的发生点,并进行合适的日志持久化,以防止其日志存储和恢复时间过程,导致无法通过测试。综上而言,持久化部分的实现相较于集群选举和日志同步而言并不难。