我们要开始看最难的部分了:共识模块。这块是区块链的核心,流程也多到让人痛苦,我们就尽力看一看吧。

首先看一下,ConsensusState的定义

// ConsensusState holds everything a single node needs to take part in
// consensus: its configuration, the block-handling components it uses, the
// current round state, and the message queues and timeout ticker that feed it.
type ConsensusState struct {
    cmn.BaseService

    // Configuration for this node.
    config        *cfg.ConsensusConfig
    privValidator types.PrivValidator // for signing votes

    // Block handling: block execution, block storage, the mempool and the
    // evidence pool.
    blockExec  *sm.BlockExecutor
    blockStore sm.BlockStore
    mempool    sm.Mempool
    evpool     sm.EvidencePool

    // Internal consensus state.
    mtx sync.RWMutex
    cstypes.RoundState
    triggeredTimeoutPrecommit bool
    state                     sm.State // State until height-1.

    // The consensus state can be changed by the following three kinds of
    // input: messages from peers, messages from ourselves, and timeouts.
    peerMsgQueue     chan msgInfo
    internalMsgQueue chan msgInfo
    timeoutTicker    TimeoutTicker

    // Queue of messages kept for statistics.
    statsMsgQueue chan msgInfo

    // Event notifier.
    eventBus *types.EventBus

    // Write-ahead log, used for things like crash recovery.
    wal          WAL
    replayMode   bool // so we don't log signing errors during replay
    doWALCatchup bool // determines if we even try to do the catchup

    // for tests where we want to limit the number of transitions the state makes
    nSteps int

    // some functions can be overwritten for testing
    decideProposal func(height int64, round int)
    doPrevote      func(height int64, round int)
    setProposal    func(proposal *types.Proposal) error

    // closed when we finish shutting down
    done chan struct{}

    // synchronous pubsub between consensus state and reactor.
    // state only emits EventNewRoundStep and EventVote
    evsw tmevents.EventSwitch

    // for reporting metrics
    metrics *Metrics
}

我们对部分变量进行了注释。共识模块主要负责处理投票、提案,对信息达成一致,提交区块,以及和 ABCI 程序进行通信等功能。
实现的功能挺多的,我们再仔细看一下怎么新建的共识。

// NewConsensusState builds a ConsensusState from the given configuration,
// state and block-handling components, applies every supplied option to it,
// and returns it. Round 0 is not scheduled here; that is left to Start().
func NewConsensusState(
    config *cfg.ConsensusConfig,
    state sm.State,
    blockExec *sm.BlockExecutor,
    blockStore sm.BlockStore,
    mempool sm.Mempool,
    evpool sm.EvidencePool,
    options ...StateOption,
) *ConsensusState {
    cs := &ConsensusState{
        // Collaborators handed in by the caller.
        config:     config,
        blockExec:  blockExec,
        blockStore: blockStore,
        mempool:    mempool,
        evpool:     evpool,

        // Input sources: message queues buffered to msgQueueSize, plus the
        // timeout ticker.
        peerMsgQueue:     make(chan msgInfo, msgQueueSize),
        internalMsgQueue: make(chan msgInfo, msgQueueSize),
        statsMsgQueue:    make(chan msgInfo, msgQueueSize),
        timeoutTicker:    NewTimeoutTicker(),

        // Lifecycle and WAL defaults: a no-op WAL with catch-up enabled.
        done:         make(chan struct{}),
        wal:          nilWAL{},
        doWALCatchup: true,

        // Event switch and no-op metrics.
        evsw:    tmevents.NewEventSwitch(),
        metrics: NopMetrics(),
    }

    // Install the default hooks; they may be overwritten before calling Start.
    cs.decideProposal = cs.defaultDecideProposal
    cs.doPrevote = cs.defaultDoPrevote
    cs.setProposal = cs.defaultSetProposal

    cs.updateToState(state)

    // scheduleRound0 is deliberately not called yet;
    // that happens upon Start().
    cs.reconstructLastCommit(state)
    cs.BaseService = *cmn.NewBaseService(nil, "ConsensusState", cs)

    // Options run last, on the otherwise fully built value.
    for i := range options {
        options[i](cs)
    }
    return cs
}

里面有两个挺重要的函数,一个是把共识更新到当前状态,另一个是重建最后一次的提交。
先看一下是如何更新到当前状态的

// updateToState advances the consensus state to the height that follows
// state.LastBlockHeight. It sanity-checks the heights, ignores a state that is
// not ahead of the one already held (while still signalling a new step), and
// otherwise resets the per-height round fields, carries the last precommits
// over as LastCommit, stores the new state and broadcasts the round state.
func (cs *ConsensusState) updateToState(state sm.State) {
    // Once a commit round is set, the height we were working on must be the
    // height the incoming state has just finished.
    if cs.CommitRound >= 0 && cs.Height > 0 && cs.Height != state.LastBlockHeight {
        msg := fmt.Sprintf("updateToState() expected state height of %v but found %v",
            cs.Height, state.LastBlockHeight)
        cmn.PanicSanity(msg)
    }

    // Our own stored state must sit exactly one block behind cs.Height.
    // A mismatch might mean someone else is mutating cs.state, e.g. a caller
    // that forgot to pass in state.Copy().
    if !cs.state.IsEmpty() && cs.Height != cs.state.LastBlockHeight+1 {
        msg := fmt.Sprintf("Inconsistent cs.state.LastBlockHeight+1 %v vs cs.Height %v",
            cs.state.LastBlockHeight+1, cs.Height)
        cmn.PanicSanity(msg)
    }

    // A state that is not ahead of cs.state is ignored. This happens when
    // SwitchToConsensus() is called in the reactor. Fields such as the Votes
    // are left alone, but the new round step is still signalled because other
    // services (eg. mempool) depend on having an up-to-date peer state.
    if !cs.state.IsEmpty() && cs.state.LastBlockHeight >= state.LastBlockHeight {
        cs.Logger.Info(
            "Ignoring updateToState()",
            "newHeight", state.LastBlockHeight+1,
            "oldHeight", cs.state.LastBlockHeight+1,
        )
        cs.newStep()
        return
    }

    // Capture the precommits of the commit round before cs.Votes and
    // cs.CommitRound are overwritten below.
    var lastCommit *types.VoteSet
    if cs.CommitRound >= 0 && cs.Votes != nil {
        if !cs.Votes.Precommits(cs.CommitRound).HasTwoThirdsMajority() {
            cmn.PanicSanity("updateToState(state) called but last Precommit round didn't have +2/3")
        }
        lastCommit = cs.Votes.Precommits(cs.CommitRound)
    }

    vals := state.Validators
    nextHeight := state.LastBlockHeight + 1 // next desired block height

    // Height and round/step.
    cs.updateHeight(nextHeight)
    cs.updateRoundStep(0, cstypes.RoundStepNewHeight)

    // The start time is derived from the last commit time. With no commit
    // time yet, "now" is used instead, which makes it easier to sync up dev
    // nodes and allows transactions to be gathered for the first block.
    // An alternative that relies on clocks would be to start from
    // state.LastBlockTime.
    startFrom := cs.CommitTime
    if startFrom.IsZero() {
        startFrom = tmtime.Now()
    }
    cs.StartTime = cs.config.Commit(startFrom)

    // Validator sets for this height and the previous one.
    cs.Validators = vals
    cs.LastValidators = state.LastValidators

    // Clear proposal, lock and valid-block information.
    cs.Proposal, cs.ProposalBlock, cs.ProposalBlockParts = nil, nil, nil
    cs.LockedRound, cs.LockedBlock, cs.LockedBlockParts = -1, nil, nil
    cs.ValidRound, cs.ValidBlock, cs.ValidBlockParts = -1, nil, nil

    // Fresh vote bookkeeping for the new height; the previous height's
    // precommits become the last commit.
    cs.Votes = cstypes.NewHeightVoteSet(state.ChainID, nextHeight, vals)
    cs.CommitRound = -1
    cs.LastCommit = lastCommit

    cs.state = state

    // Finally, broadcast the round state.
    cs.newStep()
}

太难了,先放着

区块链 golang 源码 tendermint

发表新评论