我们上一篇说了领导的选举,本篇讨论日志和心跳包的处理。

由于有了领导选举的铺垫,我们这篇文章就要容易理解一些。当然,功能还没完善,以后会有后续文章来更新功能。
(os:怕再拖文章就写不完了)

日志

在日志和心跳包处理环节,使用的最多的就是日志管理了,需要对比,需要写入写出,所以,我们需要把日志处理功能给完善下。
在之前的文章中,我们定义了日志需要实现的接口

type Logger interface {
    Check(prevLogIndex int64, prevLogTerm int64, index int64, term int64) error
    Append(e *Entry) error
    FresherThan(index int64, term int64) bool
    Get(index int64) *Entry
    GetEntryForRequest(index int64) (*Entry, int64, int64)
    Index() int64
    LastIndex() int64
    Term() int64
}

我们就开始实现具体的功能吧!

日志结构

首先定义日志结构体

type Log struct {
    sync.RWMutex

    index int64
    term  int64

    Entries []*Entry
}

所有的操作都有可能会同步操作,所以我们需要加锁进行维护。日志里面肯定有具体的控制历史,就是Entry内容

type Entry struct {
    CmdID int64
    Index int64
    Term  int64
    Data  []byte
}

结构内容很容易理解。

日志操作

日志的操作有些事比较容易理解的,比如

func (l *Log) Index() int64 {
    return l.index
}

func (l *Log) LastIndex() int64 {
    return l.index - 1
}

func (l *Log) Term() int64 {
    return l.term
}

func (l *Log) Get(index int64) *Entry {
    if index < 0 {
        return nil
    }
    return l.Entries[index]
}

这几个功能就是返回特定的内容,不需要解释太多。接下来我们要讲几个比较复杂的功能。

func (l *Log) GetEntryForRequest(index int64) (*Entry, int64, int64) {
    if index < 0 {
        return nil, -1, -1
    }
    if index < 1 {
        return l.Entries[index], -1, -1
    }
    return l.Entries[index], l.Entries[index-1].Index, l.Entries[index-1].Term
}

这个是客户端请求一个Entry的时候,我们不单单要发送这条entry,还需要prevLogIndex和prevLogTerm,所以需要返回多个变量。

func (l *Log) FresherThan(index int64, term int64) bool {
    if l.term > term {
        return true
    }

    if l.term < term {
        return false
    }

    return l.index > index
}

这个实现了判断新旧的功能,首先判断Term的新旧,再判断日志的新旧。

func (l *Log) Check(prevLogIndex int64, prevLogTerm int64, index int64, term int64) error {
    if len(l.Entries) > 0 && index > 0 {
        if index > int64(len(l.Entries)) {
            return errors.New("behind")
        }
        lastGoodEntry := l.Entries[index-1]
        if lastGoodEntry.Term != prevLogTerm && lastGoodEntry.Index != prevLogIndex {
            return errors.New("inconsistent")
        }
    } else if index != 0 {
        return errors.New("missing")
    }
    return nil
}

这个功能稍微复杂点,主要是用来校验本地日志和远端日志是否匹配的问题。
如果远端日志序号比本地大,那么一定是本地落后了,或者假设序号一致,但是这条日志的任期和prevLogIndex和远端不一致,那就一定不连续。当然了,本地日志也有可能直接没有,那就直接缺少内容,需要进行复制操作了。

func (l *Log) Append(e *Entry) error {
    if e.Term != l.term {
        l.term = e.Term
    }

    if e.Index < l.index {
        l.Entries = l.Entries[:e.Index]
    }

    l.Entries = append(l.Entries, e)
    l.index = e.Index + 1
    return nil
}

有可能本地存储的日志,并不一定是全部节点公认的日志,需要将错的删除。否则,直接添加上就好了。

服务端操作

我们在上篇文章已经定义了服务端的一些功能,在这需要实现AppendEntries的功能

func (gServer *grpcServerImpl) AppendEntries(ctx context.Context, er *pb.EntryRequest) (*pb.EntryResponse, error) {
    eresp, err := gServer.node.AppendEntries(*er)
    return &eresp, err
}

节点功能

节点也像选举过程,一方面是接受别人的请求,一方面是向别人发送请求。

接收

节点这一端首先需要实现的功能是AppendEntries的功能

func (n *Node) AppendEntries(er pb.EntryRequest) (pb.EntryResponse, error) {
    n.AppendEntriesChan <- er
    return <-n.AppendEntriesResponseChan, nil
}

与选举过程非常相似,也是传递到channel,在循环里面处理。

case areq := <-n.AppendEntriesChan:
        aresp, _ := n.doAppendEntries(areq)
        n.AppendEntriesResponseChan <- aresp

具体的处理功能如下

func (n *Node) doAppendEntries(er pb.EntryRequest) (pb.EntryResponse, error) {
    if er.Term < n.Term {
        return pb.EntryResponse{n.Term, false}, nil
    }
    if n.Status != FOLLOWER {
        n.endElection()
        n.stepDown()
    }

    if er.Term > n.Term {
        n.setTerm(er.Term)
    }
    err := n.Log.Check(er.PrevLogIndex, er.PrevLogTerm, er.PrevLogIndex+1, er.Term)
    if err != nil {
        return pb.EntryResponse{Term: n.Term, Success: false}, nil
    }
    if bytes.Compare(er.Data, []byte("")) == 0 {
        return pb.EntryResponse{Term: n.Term, Success: true}, nil
    }
    e := &Entry{
        CmdID: er.LeaderCommit,
        Index: er.PrevLogIndex + 1,
        Term:  er.Term,
        Data:  er.Data,
    }
    err = n.Log.Append(e)
    return pb.EntryResponse{Term: n.Term, Success: err == nil}, nil
}

日志处理流程论文里面也详细规定了。

  1. 任期小于本地任期的话,就可以直接返回错误了
  2. 如果远端日志内容跟本地完全不匹配,也可以直接返回错误了。
  3. 如果是心跳包的话,以上检查通过就通过即可。
  4. 如果不是心跳包,添加到日志体,如果成功添加,返回通过。

还有几点需要注意的,节点收到这个消息的时候,有可能是在选举阶段,既然收到消息就证明有节点已经成功当选,本节点就不必再进行选举了,停止选举,退回Follower状态即可。

发送

什么时候会发送呢?心跳包时间到了的时候,或者,收到客户端发来执行命令请求的时候。

followerTimer := time.NewTicker(200 * time.Millisecond)
...
case <-followerTimer.C:
    n.updateFollowers()
    continue
case creq := <-n.CommandChan:
    n.doCommand(creq)
    n.updateFollowers()

这两个过程都会触发向别人发送的功能。
先看客户端请求执行一定指令的时候触发的功能吧

func (n *Node) doCommand(cr CommandRequest) {
    if n.Status != LEADER {
        cr.ResponseChan <- CommandResponse{LeaderID: n.VoteFor, Success: false}
    }

    e := &Entry{
        CmdID: cr.ID,
        Index: n.Log.Index(),
        Term:  n.Term,
        Data:  cr.Body,
    }

    err := n.Log.Append(e)
    if err != nil {
        cr.ResponseChan <- CommandResponse{LeaderID: n.VoteFor, Success: false}
    }

    cr.State = Logged
    cr.ReplicationCount++
    n.Uncommitted[cr.ID] = &cr
}

收到别热的请求,首先得保证自己是Leader,然后构建日志体,写入到日志List中。为了保证日志能够最终统计是否可以被全部执行,需要记录已经执行了多少次。
然后Leader执行完后,就更新他的Follower

func (n *Node) updateFollowers() {
    var er *pb.EntryRequest
    if n.Status != LEADER {
        return
    }
    for _, peer := range n.Cluster {
        if n.Log.LastIndex() < peer.NextIndex {
            // heartbeat
            _, prevLogIndex, prevLogTerm := n.Log.GetEntryForRequest(n.Log.LastIndex())

            er = &pb.EntryRequest{
                LeaderCommit: -1,
                Term:         n.Term,
                LeaderID:     n.ID,
                PrevLogIndex: prevLogIndex,
                PrevLogTerm:  prevLogTerm,
                Data:         []byte("")}
        } else {
            entry, prevLogIndex, prevLogTerm := n.Log.GetEntryForRequest(peer.NextIndex)
            er = &pb.EntryRequest{
                LeaderCommit: entry.CmdID,
                Term:         n.Term,
                LeaderID:     n.ID,
                PrevLogIndex: prevLogIndex,
                PrevLogTerm:  prevLogTerm,
                Data:         entry.Data}
        }
        _, err := n.Client.AppendEntries(peer.ID, er)
        if err != nil {
            peer.NextIndex--
            if peer.NextIndex < 0 {
                peer.NextIndex = 0
            }
            continue
        }

        if er.LeaderCommit == -1 {
            // skip commit checks for heartbeats
            continue
        }
        majority := int32((len(n.Cluster)+1)/2 + 1)
        cr := n.Uncommitted[er.LeaderCommit]
        cr.ReplicationCount++
        if cr.ReplicationCount >= majority && cr.State != Committed {
            cr.State = Committed
            log.Printf("[%s] !!! apply %+v", n.ID, cr)
            err := n.StateMachine.Apply(cr)
            if err != nil {
                // TODO: what do we do here?
            }
            cr.ResponseChan <- CommandResponse{LeaderID: n.VoteFor, Success: true}
        }
        peer.NextIndex++
    }
}

更新Follower的前提是自己是Leader,否则就没必要进行处理。
这块代码长度较长,但是不难理解,主要目标就是如果本地日志内容和peer日志内容一样新,就只发送心跳包,否则发送日志。然后统计日志被大多数执行了的话,就执行日志。然后更新peer的日志位置。
还有一个判断,如果远端的日志太旧了,就减少Index,直到找到远端的位置,再发送对应的日志。
我们的snapshot还没有,而且commitIndex还没处理,后续的文章会继续!

算法 go 分布式 raft

已有 2 条评论
  1. 曦曦夏日

    虽然看不懂你写的东西,但是,从第一个到这第六个日志,看下来,证明你第一个日志里面写的自己会坚持是已经在执行了的。不管结局如何,但是,重要的是在追寻结局的过程中学到了很多也明白了很多。相信你会成功的,代码写的真心不错,尽管看不懂,但是也看到了一些自己现在才学到的一些东西,也能从你的日志中学到很多,谢谢哟

    曦曦夏日 回复
    1. hundred

      很感谢能够得到你的认可。我也会继续努力的

      hundred 回复
发表新评论