我们上一篇文章分析了一个节点作为主动发起方,向别的节点请求投票的信息,本篇就分析下接收方的工作流程。

在前面的几篇文章里,我们定义了几个通信的接口,只是没有定义具体的实现,我们现在就先实现以下。

server

首先,server我们用的是GRPC,所以很多底层的功能都是已经封装好的,我们只需要外部调用即可。

type grpcServerImpl struct {
    address string
    server *grpc.Server
    lock *sync.Mutex
    node *Node
}

这个结构不复杂,加Node的目标是为了和node进行关联。我们稍后就明白为什么要这么做。
继续看构造函数。

func NewGRPCServer(address string) (GRPCServer, error) {

    if address == "" {
        return nil, errors.New("Missing address parameter")
    }
    //create our listener
    lis, err := net.Listen("tcp", address)

    if err != nil {
        return nil, err
    }
    log.Println("listen to address "+address)
    return NewGRPCServerFromListener(lis)

}

func NewGRPCServerFromListener(listener net.Listener) (GRPCServer, error) {

    grpcServer := &grpcServerImpl{
        address:  listener.Addr().String(),
        listener: listener,
        lock:     &sync.Mutex{},
    }

    var serverOpts []grpc.ServerOption

    serverOpts = append(serverOpts, grpc.MaxSendMsgSize(1024*1024))
    serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(1024*1024))

    grpcServer.server = grpc.NewServer(serverOpts...)
    pb.RegisterMgrServer(grpcServer.server,grpcServer)

    return grpcServer, nil
}

这段内容是我直接从fabric里面复制出来的,可能看着内容比较多,主要目标就是监听一个指定地址,然后配置grpc连接参数,开启server,与我们proto文件生成的服务进行绑定。
在我们启动这个server的时候,需要把Node传递进来

func (gServer *grpcServerImpl) Start(n *Node) error {
    gServer.node = n
    go gServer.server.Serve(gServer.listener)
    return nil
}

为什么会是协程呢?因为如果不这样的话,程序会阻塞在serve这,我们之前的循环内容无法进行操作。
结束功能也很简单

func (gServer *grpcServerImpl) Stop() {
    gServer.server.Stop()
}

问题不大!
接下来要做的就是想用客户端的RPC请求了!我们需要实现这请求处理接口!

func (gServer *grpcServerImpl) RequestVote(ctx context.Context, vr *pb.VoteRequest) (*pb.VoteRespond, error) {
    vresp, err := gServer.node.RequestVote(*vr)
    return &vresp, err
}

我们调用了node的RequestVote()函数进行处理,进行具体定义吧!

节点

在节点这端,我们知道大部分的消息处理都是在loop函数里面进行操作的,而消息已经获取了,我们就需要怎么把消息通过channel传递过去,这个就是我们RequestVote函数需要做的内容

func (n *Node) RequestVote(vr pb.VoteRequest) (pb.VoteRespond, error) {
    n.RequestVoteChan <- vr
    return <-n.RequestVoteResponseChan, nil
}

在for的select循环里面,就需要有n.RequestVoteChan的处理了

case vreq := <-n.RequestVoteChan:
        vresp, _ := n.doRequestVote(vreq)
        n.RequestVoteResponseChan <- vresp

OK!来具体的解析其他用户发过来的投票请求吧!

func (n *Node) doRequestVote(vr pb.VoteRequest) (pb.VoteRespond, error) {
    if vr.Term < n.Term {
        return pb.VoteRespond{Term: n.Term, VoteGranted: false}, nil
    }
    if vr.Term > n.Term {
        n.endElection()
        n.setTerm(vr.Term)
    }
    if n.VoteFor != "" && n.VoteFor != vr.CandidateID {
        return pb.VoteRespond{Term: n.Term, VoteGranted: false}, nil
    }
    if n.Log.FresherThan(vr.LastLogIndex, vr.LastLogTerm) {
        return pb.VoteRespond{Term: n.Term, VoteGranted: false}, nil
    }
    n.VoteFor = vr.CandidateID
    return pb.VoteRespond{Term: n.Term, VoteGranted: true}, nil
}

这个处理逻辑在论文里面已经描述的很请处理,有这么几点情况需要返回false

  1. vote的任期小于自身存储的任期(其他人的太旧了,或者自己这边已经有更新的leader了)
  2. 如果自己已经给别人投票了,那么久不能在给这个人投票了
  3. 如果自己记录的日志内容比vote人的日志内容新,也不能给他投同意了!

只有以上三点都不满足,才记录下来,我给你投票,同意你的选举!
当然啦,如果vote人的任期比我的新,肯定是我比较落伍,就得更新下自己的任期,然后停止选举。
如此一来!投票选举流程就结束了!
可能有的朋友意识到了一个问题!
n.Log.FresherThan(vr.LastLogIndex, vr.LastLogTerm)是在哪实现的?
我们先留个问号!下一篇再解决!

算法 go 分布式 raft

发表新评论