我们现在要啃一个硬骨头了,共识模块。在开始之前,我们先看一下EvidencePool模块,一个合法证据池模块。

先看一下证据池

type EvidencePool struct {
    logger log.Logger

    evidenceStore *EvidenceStore
    evidenceList  *clist.CList // concurrent linked-list of evidence

    // needed to load validators to verify evidence
    stateDB dbm.DB

    // latest state
    mtx   sync.Mutex
    state sm.State
}

跟我们自己想的结果差不多,需要验证节点集合验证证据,存到证据库里面。
新建函数很简单。这个证据池的函数也不是太多,主要就几个,我们来主要关注下添加证据和移除证据。

func (evpool *EvidencePool) AddEvidence(evidence types.Evidence) (err error) {

    // TODO: check if we already have evidence for this
    // validator at this height so we dont get spammed

    if err := sm.VerifyEvidence(evpool.stateDB, evpool.State(), evidence); err != nil {
        return err
    }

    // fetch the validator and return its voting power as its priority
    // TODO: something better ?
    valset, _ := sm.LoadValidators(evpool.stateDB, evidence.Height())
    _, val := valset.GetByAddress(evidence.Address())
    priority := val.VotingPower

    added := evpool.evidenceStore.AddNewEvidence(evidence, priority)
    if !added {
        // evidence already known, just ignore
        return
    }

    evpool.logger.Info("Verified new evidence of byzantine behaviour", "evidence", evidence)

    // add evidence to clist
    evpool.evidenceList.PushBack(evidence)

    return nil
}

首先是验证证据是否正确,回到了我们之前看到的State模块

func VerifyEvidence(stateDB dbm.DB, state State, evidence types.Evidence) error {
    height := state.LastBlockHeight

    evidenceAge := height - evidence.Height()
    maxAge := state.ConsensusParams.Evidence.MaxAge
    if evidenceAge > maxAge {
        return fmt.Errorf("Evidence from height %d is too old. Min height is %d",
            evidence.Height(), height-maxAge)
    }

    valset, err := LoadValidators(stateDB, evidence.Height())
    if err != nil {
        // TODO: if err is just that we cant find it cuz we pruned, ignore.
        // TODO: if its actually bad evidence, punish peer
        return err
    }

    // The address must have been an active validator at the height.
    // NOTE: we will ignore evidence from H if the key was not a validator
    // at H, even if it is a validator at some nearby H'
    ev := evidence
    height, addr := ev.Height(), ev.Address()
    _, val := valset.GetByAddress(addr)
    if val == nil {
        return fmt.Errorf("Address %X was not a validator at height %d", addr, height)
    }

    if err := evidence.Verify(state.ChainID, val.PubKey); err != nil {
        return err
    }

    return nil
}

首先保证证据足够的新,不是很旧的节点产生的同意提案,然后这个证据是针对本区块高度,一个合理的验证者产生的
验证通过之后,就需要存到数据库中,以及添加到证据库中。
怎么移除的呢?

func (evpool *EvidencePool) removeEvidence(height, maxAge int64, blockEvidenceMap map[string]struct{}) {
    for e := evpool.evidenceList.Front(); e != nil; e = e.Next() {
        ev := e.Value.(types.Evidence)

        // Remove the evidence if it's already in a block
        // or if it's now too old.
        if _, ok := blockEvidenceMap[evMapKey(ev)]; ok ||
            ev.Height() < height-maxAge {

            // remove from clist
            evpool.evidenceList.Remove(e)
            e.DetachPrev()
        }
    }
}

就是如果区块已经被提交,或者这个证据太旧了,就移除就可以了。但是,注意,只从证据库里面移除,不从数据库中移除。
evidencestore是和数据库打交道的,提供了各种所需要的功能,我们就不进去细看了。
最后,我们再来看下reactor。

type EvidenceReactor struct {
    p2p.BaseReactor
    evpool   *EvidencePool
    eventBus *types.EventBus
}

这个reacotr就是处理证据池的证据的,再peer之间广播,把证据推给证据池,来进行处理。
这个类没有实现start,我们就先看一看Receive函数吧。

func (evR *EvidenceReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
    msg, err := decodeMsg(msgBytes)
    if err != nil {
        evR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
        evR.Switch.StopPeerForError(src, err)
        return
    }

    if err = msg.ValidateBasic(); err != nil {
        evR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
        evR.Switch.StopPeerForError(src, err)
        return
    }

    evR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)

    switch msg := msg.(type) {
    case *EvidenceListMessage:
        for _, ev := range msg.Evidence {
            err := evR.evpool.AddEvidence(ev)
            if err != nil {
                evR.Logger.Info("Evidence is not valid", "evidence", msg.Evidence, "err", err)
                // punish peer
                evR.Switch.StopPeerForError(src, err)
            }
        }
    default:
        evR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
    }
}

首先验证消息是否正确,正确的话就添加到证据库中。否则有问题就停止发过来证据的peer。
我们再看一个函数,addPeer,因为这个会在其他的地方被调用到。

func (evR *EvidenceReactor) broadcastEvidenceRoutine(peer p2p.Peer) {
    var next *clist.CElement
    for {
        // This happens because the CElement we were looking at got garbage
        // collected (removed). That is, .NextWait() returned nil. Go ahead and
        // start from the beginning.
        if next == nil {
            select {
            case <-evR.evpool.EvidenceWaitChan(): // Wait until evidence is available
                if next = evR.evpool.EvidenceFront(); next == nil {
                    continue
                }
            case <-peer.Quit():
                return
            case <-evR.Quit():
                return
            }
        }

        ev := next.Value.(types.Evidence)
        msg, retry := evR.checkSendEvidenceMessage(peer, ev)
        if msg != nil {
            success := peer.Send(EvidenceChannel, cdc.MustMarshalBinaryBare(msg))
            retry = !success
        }

        if retry {
            time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
            continue
        }

        afterCh := time.After(time.Second * broadcastEvidenceIntervalS)
        select {
        case <-afterCh:
            // start from the beginning every tick.
            // TODO: only do this if we're at the end of the list!
            next = nil
        case <-next.NextWaitChan():
            // see the start of the for loop for nil check
            next = next.Next()
        case <-peer.Quit():
            return
        case <-evR.Quit():
            return
        }
    }
}

可以看到,添加一个peer后,只要自己有证据,就会发送给其他人。
这个evidence模块感觉还不是太复杂,因此,我们就等着看consensus模块吧。

区块链 golang 源码 tendermint

发表新评论