我们在上一篇文章中提到,block处理有处理state,以及相应的执行过程。我们这一节就来看一看。

我们先看一下State的定义

type State struct {
    Version Version

    // immutable
    ChainID string

    // LastBlockHeight=0 at genesis (ie. block(H=0) does not exist)
    LastBlockHeight  int64
    LastBlockTotalTx int64
    LastBlockID      types.BlockID
    LastBlockTime    time.Time

    // LastValidators is used to validate block.LastCommit.
    // Validators are persisted to the database separately every time they change,
    // so we can query for historical validator sets.
    // Note that if s.LastBlockHeight causes a valset change,
    // we set s.LastHeightValidatorsChanged = s.LastBlockHeight + 1 + 1
    // Extra +1 due to nextValSet delay.
    NextValidators              *types.ValidatorSet
    Validators                  *types.ValidatorSet
    LastValidators              *types.ValidatorSet
    LastHeightValidatorsChanged int64

    // Consensus parameters used for validating blocks.
    // Changes returned by EndBlock and updated after Commit.
    ConsensusParams                  types.ConsensusParams
    LastHeightConsensusParamsChanged int64

    // Merkle root of the results from executing prev block
    LastResultsHash []byte

    // the latest AppHash we've received from calling abci.Commit()
    AppHash []byte
}

看这个结构,也可以发现State是一个记录上一个区块链的状态集合。包括上一个已经提交区块的高度,交易信息,上一个区块的时间和ID。然后是一系列的验证节点集合,包括下一次的验证节点集合,所有的验证节点以及上一个区块的验证节点。
根据要求,State的状态是不能直接修改的,要使用得自己Copy出去,或者用NextState生成一个新的状态。
这些都是一些基本的,重要的是生成区块,我们看下是怎么封装成区块的

func (state State) MakeBlock(
    height int64,
    txs []types.Tx,
    commit *types.Commit,
    evidence []types.Evidence,
    proposerAddress []byte,
) (*types.Block, *types.PartSet) {

    // Build base block with block data.
    block := types.MakeBlock(height, txs, commit, evidence)

    // Set time.
    var timestamp time.Time
    if height == 1 {
        timestamp = state.LastBlockTime // genesis time
    } else {
        timestamp = MedianTime(commit, state.LastValidators)
    }

    // Fill rest of header with state data.
    block.Header.Populate(
        state.Version.Consensus, state.ChainID,
        timestamp, state.LastBlockID, state.LastBlockTotalTx+block.NumTxs,
        state.Validators.Hash(), state.NextValidators.Hash(),
        state.ConsensusParams.Hash(), state.AppHash, state.LastResultsHash,
        proposerAddress,
    )

    return block, block.MakePartSet(types.BlockPartSizeBytes)
}

这个看着还好理解,就是把区块的内容封装起来,生成一个完整的区块信息。这个地方使用了一个中值时间,主要的目标就是防止有恶意节点估计篡改区块生成时间,因为是所有的中间值,在出现之前很难修改成功。
我们再来看下block是怎么被执行的。

type BlockExecutor struct {
    // save state, validators, consensus params, abci responses here
    db dbm.DB

    // execute the app against this
    proxyApp proxy.AppConnConsensus

    // events
    eventBus types.BlockEventPublisher

    // update these with block results after commit
    mempool Mempool
    evpool  EvidencePool

    logger log.Logger

    metrics *Metrics
}

没什么大问题,存储状态,更新内存池,和ABCI程序通信。
怎么执行的呢?

func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, block *types.Block) (State, error) {
    // 验证区块是否正确
    if err := blockExec.ValidateBlock(state, block); err != nil {
        return state, ErrInvalidBlock(err)
    }
    // 让ABCI程序验证
    startTime := time.Now().UnixNano()
    abciResponses, err := execBlockOnProxyApp(blockExec.logger, blockExec.proxyApp, block, state.LastValidators, blockExec.db)
    endTime := time.Now().UnixNano()
    blockExec.metrics.BlockProcessingTime.Observe(float64(endTime-startTime) / 1000000)
    if err != nil {
        return state, ErrProxyAppConn(err)
    }

    fail.Fail() // XXX

    // 把返回的结果记录到数据库
    saveABCIResponses(blockExec.db, block.Height, abciResponses)

    fail.Fail() // XXX

    // 更新验证节点集合
    abciValUpdates := abciResponses.EndBlock.ValidatorUpdates
    err = validateValidatorUpdates(abciValUpdates, state.ConsensusParams.Validator)
    if err != nil {
        return state, fmt.Errorf("Error in validator updates: %v", err)
    }
    validatorUpdates, err := types.PB2TM.ValidatorUpdates(abciValUpdates)
    if err != nil {
        return state, err
    }
    if len(validatorUpdates) > 0 {
        blockExec.logger.Info("Updates to validators", "updates", makeValidatorUpdatesLogString(validatorUpdates))
    }

    // 根据验证的结果,获取写一个区块链状态
    state, err = updateState(state, blockID, &block.Header, abciResponses, validatorUpdates)
    if err != nil {
        return state, fmt.Errorf("Commit failed for application: %v", err)
    }

    // 调用ABCI程序,获取APPHASH,并且更新下内存池
    appHash, err := blockExec.Commit(state, block)
    if err != nil {
        return state, fmt.Errorf("Commit failed for application: %v", err)
    }

    // 更新证据池
    blockExec.evpool.Update(block, state)

    fail.Fail() // XXX

    // 保存状态
    state.AppHash = appHash
    SaveState(blockExec.db, state)

    fail.Fail() // XXX

    // Events are fired after everything else.
    // NOTE: if we crash between Commit and Save, events wont be fired during replay
    fireEvents(blockExec.logger, blockExec.eventBus, block, abciResponses, validatorUpdates)

    return state, nil
}

看着我们代码里面的注释,还能够理解出来,主要的流程就是

  1. 验证当前区块是否合法
  2. 把数据提交给ABCI,获取对应的响应结果
  3. 根绝当前区块信息还有ABCI的结果,生成新的区块
  4. 调用下ABCI的commit返回APPHASH,这时候才算要保存了
  5. 保存状态,生成下一个状态
    怎么验证的呢?
func validateBlock(stateDB dbm.DB, state State, block *types.Block) error {
    // Validate internal consistency.
    if err := block.ValidateBasic(); err != nil {
        return err
    }

    // Validate basic info.
    if block.Version != state.Version.Consensus {
        return fmt.Errorf("Wrong Block.Header.Version. Expected %v, got %v",
            state.Version.Consensus,
            block.Version,
        )
    }
    if block.ChainID != state.ChainID {
        return fmt.Errorf("Wrong Block.Header.ChainID. Expected %v, got %v",
            state.ChainID,
            block.ChainID,
        )
    }
    if block.Height != state.LastBlockHeight+1 {
        return fmt.Errorf("Wrong Block.Header.Height. Expected %v, got %v",
            state.LastBlockHeight+1,
            block.Height,
        )
    }

    // Validate prev block info.
    if !block.LastBlockID.Equals(state.LastBlockID) {
        return fmt.Errorf("Wrong Block.Header.LastBlockID.  Expected %v, got %v",
            state.LastBlockID,
            block.LastBlockID,
        )
    }

    newTxs := int64(len(block.Data.Txs))
    if block.TotalTxs != state.LastBlockTotalTx+newTxs {
        return fmt.Errorf("Wrong Block.Header.TotalTxs. Expected %v, got %v",
            state.LastBlockTotalTx+newTxs,
            block.TotalTxs,
        )
    }

    // Validate app info
    if !bytes.Equal(block.AppHash, state.AppHash) {
        return fmt.Errorf("Wrong Block.Header.AppHash.  Expected %X, got %v",
            state.AppHash,
            block.AppHash,
        )
    }
    if !bytes.Equal(block.ConsensusHash, state.ConsensusParams.Hash()) {
        return fmt.Errorf("Wrong Block.Header.ConsensusHash.  Expected %X, got %v",
            state.ConsensusParams.Hash(),
            block.ConsensusHash,
        )
    }
    if !bytes.Equal(block.LastResultsHash, state.LastResultsHash) {
        return fmt.Errorf("Wrong Block.Header.LastResultsHash.  Expected %X, got %v",
            state.LastResultsHash,
            block.LastResultsHash,
        )
    }
    if !bytes.Equal(block.ValidatorsHash, state.Validators.Hash()) {
        return fmt.Errorf("Wrong Block.Header.ValidatorsHash.  Expected %X, got %v",
            state.Validators.Hash(),
            block.ValidatorsHash,
        )
    }
    if !bytes.Equal(block.NextValidatorsHash, state.NextValidators.Hash()) {
        return fmt.Errorf("Wrong Block.Header.NextValidatorsHash.  Expected %X, got %v",
            state.NextValidators.Hash(),
            block.NextValidatorsHash,
        )
    }

    // Validate block LastCommit.
    if block.Height == 1 {
        if len(block.LastCommit.Precommits) != 0 {
            return errors.New("Block at height 1 can't have LastCommit precommits")
        }
    } else {
        if len(block.LastCommit.Precommits) != state.LastValidators.Size() {
            return fmt.Errorf("Invalid block commit size. Expected %v, got %v",
                state.LastValidators.Size(),
                len(block.LastCommit.Precommits),
            )
        }
        // 此处和其他人理解有点不一致。我认为是用状态记录的区块验证集合,跟区块中的
        // 验证集合进行对比。而不是用这个的信息去验证上一个区块。
        err := state.LastValidators.VerifyCommit(
            state.ChainID, state.LastBlockID, block.Height-1, block.LastCommit)
        if err != nil {
            return err
        }
    }

    // Validate block Time
    if block.Height > 1 {
        if !block.Time.After(state.LastBlockTime) {
            return fmt.Errorf("Block time %v not greater than last block time %v",
                block.Time,
                state.LastBlockTime,
            )
        }

        medianTime := MedianTime(block.LastCommit, state.LastValidators)
        if !block.Time.Equal(medianTime) {
            return fmt.Errorf("Invalid block time. Expected %v, got %v",
                medianTime,
                block.Time,
            )
        }
    } else if block.Height == 1 {
        genesisTime := state.LastBlockTime
        if !block.Time.Equal(genesisTime) {
            return fmt.Errorf("Block time %v is not equal to genesis time %v",
                block.Time,
                genesisTime,
            )
        }
    }

    // Limit the amount of evidence
    maxEvidenceBytes := types.MaxEvidenceBytesPerBlock(state.ConsensusParams.BlockSize.MaxBytes)
    evidenceBytes := int64(len(block.Evidence.Evidence)) * types.MaxEvidenceBytes
    if evidenceBytes > maxEvidenceBytes {
        return types.NewErrEvidenceOverflow(maxEvidenceBytes, evidenceBytes)
    }

    // Validate all evidence.
    for _, ev := range block.Evidence.Evidence {
        if err := VerifyEvidence(stateDB, state, ev); err != nil {
            return types.NewErrEvidenceInvalid(ev, err)
        }
    }

    // NOTE: We can't actually verify it's the right proposer because we dont
    // know what round the block was first proposed. So just check that it's
    // a legit address and a known validator.
    if len(block.ProposerAddress) != crypto.AddressSize ||
        !state.Validators.HasAddress(block.ProposerAddress) {
        return fmt.Errorf("Block.Header.ProposerAddress, %X, is not a validator",
            block.ProposerAddress,
        )
    }

    return nil
}

主要的就是验证区块的各块信息是否正确,能否达成一致。不一致就返回错误信息。
接下来我们要看一下是怎么传给ABCI程序执行的

func execBlockOnProxyApp(
    logger log.Logger,
    proxyAppConn proxy.AppConnConsensus,
    block *types.Block,
    lastValSet *types.ValidatorSet,
    stateDB dbm.DB,
) (*ABCIResponses, error) {
    var validTxs, invalidTxs = 0, 0

    txIndex := 0
    abciResponses := NewABCIResponses(block)

    // Execute transactions and get hash.
    proxyCb := func(req *abci.Request, res *abci.Response) {
        switch r := res.Value.(type) {
        case *abci.Response_DeliverTx:
            // TODO: make use of res.Log
            // TODO: make use of this info
            // Blocks may include invalid txs.
            txRes := r.DeliverTx
            if txRes.Code == abci.CodeTypeOK {
                validTxs++
            } else {
                logger.Debug("Invalid tx", "code", txRes.Code, "log", txRes.Log)
                invalidTxs++
            }
            abciResponses.DeliverTx[txIndex] = txRes
            txIndex++
        }
    }
    proxyAppConn.SetResponseCallback(proxyCb)

    commitInfo, byzVals := getBeginBlockValidatorInfo(block, lastValSet, stateDB)

    // Begin block
    var err error
    abciResponses.BeginBlock, err = proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{
        Hash:                block.Hash(),
        Header:              types.TM2PB.Header(&block.Header),
        LastCommitInfo:      commitInfo,
        ByzantineValidators: byzVals,
    })
    if err != nil {
        logger.Error("Error in proxyAppConn.BeginBlock", "err", err)
        return nil, err
    }

    // Run txs of block.
    for _, tx := range block.Txs {
        proxyAppConn.DeliverTxAsync(tx)
        if err := proxyAppConn.Error(); err != nil {
            return nil, err
        }
    }

    // End block.
    abciResponses.EndBlock, err = proxyAppConn.EndBlockSync(abci.RequestEndBlock{Height: block.Height})
    if err != nil {
        logger.Error("Error in proxyAppConn.EndBlock", "err", err)
        return nil, err
    }

    logger.Info("Executed block", "height", block.Height, "validTxs", validTxs, "invalidTxs", invalidTxs)

    return abciResponses, nil
}

整体的流程是,首先通知abci程序,需要进行生成区块了,等待ABCI程序的结果。所以,我们如果了解ABCI的话,我们会发现,我们可以自己实现BeginBlock的操作,返回结果,不过,官方例都没有做。
接下来是把TX一个一个的发给ABCI,这个官方历程都做了,最后是EndBlock,嗯,没做示例。
到这个地方,和abci的通信已经做完了,接下来就是更新自己的状态内容了。跟ABCI没关系了。
接下来是updateState

func updateState(
    state State,
    blockID types.BlockID,
    header *types.Header,
    abciResponses *ABCIResponses,
    validatorUpdates []*types.Validator,
) (State, error) {

    // Copy the valset so we can apply changes from EndBlock
    // and update s.LastValidators and s.Validators.
    nValSet := state.NextValidators.Copy()

    // Update the validator set with the latest abciResponses.
    lastHeightValsChanged := state.LastHeightValidatorsChanged
    if len(validatorUpdates) > 0 {
        err := updateValidators(nValSet, validatorUpdates)
        if err != nil {
            return state, fmt.Errorf("Error changing validator set: %v", err)
        }
        // Change results from this height but only applies to the next next height.
        lastHeightValsChanged = header.Height + 1 + 1
    }

    // Update validator proposer priority and set state variables.
    nValSet.IncrementProposerPriority(1)

    // Update the params with the latest abciResponses.
    nextParams := state.ConsensusParams
    lastHeightParamsChanged := state.LastHeightConsensusParamsChanged
    if abciResponses.EndBlock.ConsensusParamUpdates != nil {
        // NOTE: must not mutate s.ConsensusParams
        nextParams = state.ConsensusParams.Update(abciResponses.EndBlock.ConsensusParamUpdates)
        err := nextParams.Validate()
        if err != nil {
            return state, fmt.Errorf("Error updating consensus params: %v", err)
        }
        // Change results from this height but only applies to the next height.
        lastHeightParamsChanged = header.Height + 1
    }

    // TODO: allow app to upgrade version
    nextVersion := state.Version

    // NOTE: the AppHash has not been populated.
    // It will be filled on state.Save.
    return State{
        Version:                          nextVersion,
        ChainID:                          state.ChainID,
        LastBlockHeight:                  header.Height,
        LastBlockTotalTx:                 state.LastBlockTotalTx + header.NumTxs,
        LastBlockID:                      blockID,
        LastBlockTime:                    header.Time,
        NextValidators:                   nValSet,
        Validators:                       state.NextValidators.Copy(),
        LastValidators:                   state.Validators.Copy(),
        LastHeightValidatorsChanged:      lastHeightValsChanged,
        ConsensusParams:                  nextParams,
        LastHeightConsensusParamsChanged: lastHeightParamsChanged,
        LastResultsHash:                  abciResponses.ResultsHash(),
        AppHash:                          nil,
    }, nil
}

首先是更新验证节点集合,更新策略就是这样的
如果当前不存在则直接加入一个验证者
如果当前存在并且投票权为0则删除
如果当前存在其投票权不为0则更新
然后更新共识参数,生成state,但是,这个时候还没有APPHASH呢,因为还没有调用完。
然后执行commit

func (blockExec *BlockExecutor) Commit(
    state State,
    block *types.Block,
) ([]byte, error) {
    blockExec.mempool.Lock()
    defer blockExec.mempool.Unlock()

    // while mempool is Locked, flush to ensure all async requests have completed
    // in the ABCI app before Commit.
    err := blockExec.mempool.FlushAppConn()
    if err != nil {
        blockExec.logger.Error("Client error during mempool.FlushAppConn", "err", err)
        return nil, err
    }

    // Commit block, get hash back
    res, err := blockExec.proxyApp.CommitSync()
    if err != nil {
        blockExec.logger.Error(
            "Client error during proxyAppConn.CommitSync",
            "err", err,
        )
        return nil, err
    }
    // ResponseCommit has no error code - just data

    blockExec.logger.Info(
        "Committed state",
        "height", block.Height,
        "txs", block.NumTxs,
        "appHash", fmt.Sprintf("%X", res.Data),
    )

    // Update mempool.
    err = blockExec.mempool.Update(
        block.Height,
        block.Txs,
        TxPreCheck(state),
        TxPostCheck(state),
    )

    return res.Data, err
}

不复杂,就是通知ABCI程序要commit到数据库了,调用了ABCI程序的COMMIT函数。然后从mempool中移除本次提交的交易。
最后一步,把状态写入区块。

func saveState(db dbm.DB, state State, key []byte) {
    nextHeight := state.LastBlockHeight + 1
    // If first block, save validators for block 1.
    if nextHeight == 1 {
        // This extra logic due to Tendermint validator set changes being delayed 1 block.
        // It may get overwritten due to InitChain validator updates.
        lastHeightVoteChanged := int64(1)
        saveValidatorsInfo(db, nextHeight, lastHeightVoteChanged, state.Validators)
    }
    // Save next validators.
    saveValidatorsInfo(db, nextHeight+1, state.LastHeightValidatorsChanged, state.NextValidators)
    // Save next consensus params.
    saveConsensusParamsInfo(db, nextHeight, state.LastHeightConsensusParamsChanged, state.ConsensusParams)
    db.SetSync(key, state.Bytes())
}

我们可以看明白了,通过这个流程,最最最最主要的就是ApplyBlock程序了。
这几个环节我们中间也进行了分析,再写ABCI程序也更清晰了。

区块链 golang 源码 tendermint

发表新评论