经过上一篇文章关于一些基础“接口”的定义,我们本篇开始操作核心,节点。

一个节点,有三个状态,Follower,Candidate和Leader。他们肯定具有一些共性,比如他们都具有自己的ID,自己的交互通信工具,执行状态机,超时时间,任期,投票人等。
这些信息我们肯定是无法以下就能够确定全的,我们不妨先知道有这些属性,用到了就做详细解释,没有用到就先忽略。

type Node struct {
    sync.RWMutex

    ID              string
    Server          GRPCServer
    Client          GRPCClient
    Status          int
    Log             Logger
    StateMachine    Applyer
    ElectionTimeout time.Duration
    Uncommitted     map[int64]*CommandRequest

    Term         int64
    ElectionTerm int64
    VoteFor      string
    Ballots      int
    Cluster      []*Peer

    endElectionChan      chan int
    finishedElectionChan chan int

    VoteResponseChan          chan pb.VoteRespond
    RequestVoteChan           chan pb.VoteRequest
    RequestVoteResponseChan   chan pb.VoteRespond
    AppendEntriesChan         chan pb.EntryRequest
    AppendEntriesResponseChan chan pb.EntryResponse
    CommandChan               chan CommandRequest
    ExitChan                  chan int
}

为了方便调用,我们可以创建一个new函数,直接把需要传递的参数传递进来,然后需要初始化的进行初始化即可。

func NewNode(id string, client GRPCClient, server GRPCServer, logger Logger, applyer Applyer) *Node {
    node := &Node{
        ID:              id,
        Server:          server,
        Client:          client,
        Log:             logger,
        Status:          FOLLOWER,
        StateMachine:    applyer,
        ElectionTimeout: 500 * time.Millisecond,
        Uncommitted:     make(map[int64]*CommandRequest),

        VoteResponseChan:          make(chan pb.VoteRespond),
        RequestVoteChan:           make(chan pb.VoteRequest),
        RequestVoteResponseChan:   make(chan pb.VoteRespond),
        AppendEntriesChan:         make(chan pb.EntryRequest),
        AppendEntriesResponseChan: make(chan pb.EntryResponse),
        CommandChan:               make(chan CommandRequest),
        ExitChan:                  make(chan int),
    }
    return node
}

我们的功能按置顶向下的方式来分类。这样容易理解,也容易完善流程。

顶层功能

  1. 启动功能
func (n *Node) Serve() error {
    err := n.Server.Start(n)
    log.Println("serve start ")
    go n.loop()
    return err
}

我们需要第一步把GRPCServer启动起来,然后启动一个loop函数,这个loop函数是什么作用呢?
这个node节点需要处理超时选举,处理请求信息,这个内容需要一个循环函数一致进行监听处理。

  1. 结束功能
func (n *Node) Stop() {
    n.Server.Stop()
}

我们直接结束服务器即可。

  1. 添加服务器
func (n *Node) AddToCluster(member string) {
    p := &Peer{
        ID: member,
    }
    n.Cluster = append(n.Cluster, p)
}

很明显,进来一个服务器,我们要添加到定义的Node里面的cluster变量里面。

底层功能

底层提供的功能非常多,我们需要一点一点解释。
为了方便理解,我们不妨按照启动流程进行解释。

                                 分 离 或 超 时
                                   +-------+
                                   |       |
                                   |       |
                                   |       |
                                   |       |
                      超  时       |       v               成  功
Follower +------------------------->Candidate+--------------------------->Leader
 ^ ^                                    +                                    +
 | |                                    |                                    |
 | |                                    |                                    |
 | |                                    |                                    |
 | |                  失  败            |                                    |
 | +------------------------------------+          更 高 任 期               |
 |                                                                           |
 +---------------------------------------------------------------------------+

这里多说一句,这个工具是asciiflow很方便
第一步就是选举超时,我们需要Follower将自己提升为Cnadidate
在循环里面应该是这样

func (n *Node) loop() {
    electionTimer := time.NewTimer(n.randomElectionTimeout())
    for {
        select {
        case <-electionTimer.C:
            n.electionTimeout()
        case <-n.ExitChan:
            n.stepDown()
            goto exit
        }
        randElectionTimeout := n.randomElectionTimeout()
        if !electionTimer.Reset(randElectionTimeout) {
            electionTimer = time.NewTimer(randElectionTimeout)
        }
    }
exit:
    log.Printf("[%s] exiting ioLoop()", n.ID)
}

这个n.randomElectionTimeout()含义看单词意思就可以明白,就是生成随机的超时时间。在论文里面不同的节点具有不同的超时时间,这样才可以选举出leader。
那么就定义下这个函数吧

func (n *Node) randomElectionTimeout() time.Duration {
    return n.ElectionTimeout + time.Duration(rand.Int63n(int64(n.ElectionTimeout)))
}

就是在基础超时时间的基础上添加一个随机数,生成不同的时间。
那么n.electionTimeout()是怎么工作的呢?

func (n *Node) electionTimeout() {
    if n.Status == LEADER {
        return
    }
    n.Status = CANDIDATE
    n.endElection()
    n.nextTerm()
    n.runForLeader()
}

毋庸置疑,如果节点已经是Leader了,就不需要再进行选举了。否则的话,把自己提升为Candidate,结束当前的选举,进入下一个任期,并且竞争Leader。

func (n *Node) endElection() {
    if n.Status != CANDIDATE || n.endElectionChan == nil {
        return
    }
    close(n.endElectionChan)
    <-n.finishedElectionChan

    n.endElectionChan = nil
    n.finishedElectionChan = nil
}

结束选举也不难理解,如果当前状态不是候选人,那就没必要进行了,否则关闭channel

func (n *Node) nextTerm() {
    n.Status = FOLLOWER
    n.Term++
    n.Ballots = 0
}

为什么只会设置状态为Follower呢?因为,进入下一个任期的出发条件不一定是选举过程,也有可能是已经选举出来了。当然了,在此处我们要把自己的投票初始化为0.
最后是要竞争选Leader了。

func (n *Node) runForLeader() {
    n.Status = CANDIDATE
    n.Ballots++
    n.ElectionTerm = n.Term
    n.endElectionChan = make(chan int)
    n.finishedElectionChan = make(chan int)
    vr := &pb.VoteRequest{
        Term:         n.Term,
        CandidateID:  n.ID,
        LastLogIndex: n.Log.Index(),
        LastLogTerm:  n.Log.Term()}
    go n.gatherVotes(vr)
}

提升自己的状态,并且构造请求,发送给其他节点,收集投票结果。

func (n *Node) gatherVotes(request *pb.VoteRequest) {
    for _, peer := range n.Cluster {
        go func(p string) {
            vresp, err := n.Client.RequestVote(p, request)
            log.Println(vresp)
            if err != nil {
                log.Printf("[%s] error in RequestVoteRPC() to %s - %s", n.ID, p, err)
                return
            }
            n.VoteResponseChan <- *vresp
        }(peer.ID)
    }

    <-n.endElectionChan
    close(n.finishedElectionChan)
}

怎么发送的呢?就是按集群顺序全部发送,等到结果,把结果放到n.VoteResponseChan里面
这里就体现出来n.endElectionChan的意义了。如果不使用的话,这个go的协程就停止了。
然后,把收到的结果放到n.VoteResponseChan里面,我们进行投票处理就好了!
在我们前面的循环里面要有个对VoteResponseChan的处理

case vresp := <-n.VoteResponseChan:
        n.doRespondVote(vresp)

具体的处理流程应当如下

func (n *Node) doRespondVote(vr pb.VoteRespond) {
    if n.Status != CANDIDATE {
        return
    }
    if vr.Term != n.ElectionTerm {
        if vr.Term > n.ElectionTerm {
            // we discovered a higher term
            n.endElection()
            n.setTerm(vr.Term)
        }
        return
    }

    if vr.VoteGranted {
        n.voteGranted()
    }

}

如果当前的状态不是Candidate的话,肯定不需要处理的(实际上,不是发出去让别人投票请求,谁会主动给他发相应呢?)
除此之外,如果回复的任期不一致,那一定不用进行选举了(有别人发了或者有人同意了其他人的Leader请求)然后,需要将任期更新到最新的。

func (n *Node) setTerm(term int64) {
    // check freshness
    if term <= n.Term {
        return
    }

    n.Term = term
    n.Status = FOLLOWER
    n.VoteFor = ""
    n.Ballots = 0
}

很容易理解,就是将任期更新,一切初始化。
我们继续上面的投票内容。经过层层筛选,回复的消息是我同意你当选,也就是vr.VoteGranted==True的时候,我就认为有人投票同意了,开始进行处理。

func (n *Node) voteGranted() {
    n.Ballots++
    majority := (len(n.Cluster)+1)/2 + 1
    if n.Ballots >= majority {
        n.endElection()
        n.promoteToLeader()
    }
}

只要票数大于大多数,就认为通过了,就把自己提权提到Leader就好了!

func (n *Node) promoteToLeader() {
    n.Status = LEADER
    for _, peer := range n.Cluster {
        peer.NextIndex = n.Log.Index()
    }
}

我们关注的核心是目前将状态提升至Leader,并且维护peer的内容也要更新Index。
可能会有人问,难道不需要广播吗?给所有的人说声,我当选了!
这个就是在心跳包里面实现的了!
我们下篇继续。

算法 go 分布式 raft

发表新评论