我们现在来看一下Tendermint的工作流程,接上我们第一篇文章了。

为了方便说,我们再重新把NewNode拿过来,看一遍

func NewNode(config *cfg.Config,
    privValidator types.PrivValidator,
    nodeKey *p2p.NodeKey,
    clientCreator proxy.ClientCreator,
    genesisDocProvider GenesisDocProvider,
    dbProvider DBProvider,
    metricsProvider MetricsProvider,
    logger log.Logger) (*Node, error) {

    // 获取我们一个区块存储的对象,我们在blockchain模块提到了这个功能
    blockStoreDB, err := dbProvider(&DBContext{"blockstore", config})
    if err != nil {
        return nil, err
    }
    blockStore := bc.NewBlockStore(blockStoreDB)

    // 系统状态的数据库
    stateDB, err := dbProvider(&DBContext{"state", config})
    if err != nil {
        return nil, err
    }

    // 默认的创始区块信息
    genDoc, err := loadGenesisDoc(stateDB)
    if err != nil {
        genDoc, err = genesisDocProvider()
        if err != nil {
            return nil, err
        }
        // save genesis doc to prevent a certain class of user errors (e.g. when it
        // was changed, accidentally or not). Also good for audit trail.
        saveGenesisDoc(stateDB, genDoc)
    }

    // 读取系统状态,这个我们在state模块提到了本功能
    state, err := sm.LoadStateFromDBOrGenesisDoc(stateDB, genDoc)
    if err != nil {
        return nil, err
    }

    // 和abci通信的程序
    proxyApp := proxy.NewAppConns(clientCreator)
    proxyApp.SetLogger(logger.With("module", "proxy"))
    if err := proxyApp.Start(); err != nil {
        return nil, fmt.Errorf("Error starting proxy app connections: %v", err)
    }

    // 会在proxyApp.start中调用handshake,进行区块重放进行APPHASH的校验
    consensusLogger := logger.With("module", "consensus")
    handshaker := cs.NewHandshaker(stateDB, state, blockStore, genDoc)
    handshaker.SetLogger(consensusLogger)
    if err := handshaker.Handshake(proxyApp); err != nil {
        return nil, fmt.Errorf("Error during handshake: %v", err)
    }

    // 重新加载一下区块的状态
    state = sm.LoadState(stateDB)

    // Log the version info.
    logger.Info("Version info",
        "software", version.TMCoreSemVer,
        "block", version.BlockProtocol,
        "p2p", version.P2PProtocol,
    )

    // If the state and software differ in block version, at least log it.
    if state.Version.Consensus.Block != version.BlockProtocol {
        logger.Info("Software and state have different block protocols",
            "software", version.BlockProtocol,
            "state", state.Version.Consensus.Block,
        )
    }

    // 创建一个验证角色,并且和其他的validator能够通信
    if config.PrivValidatorListenAddr != "" {
        // If an address is provided, listen on the socket for a connection from an
        // external signing process.
        // FIXME: we should start services inside OnStart
        privValidator, err = createAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, logger)
        if err != nil {
            return nil, errors.Wrap(err, "Error with private validator socket client")
        }
    }

    // Decide whether to fast-sync or not
    // We don't fast-sync when the only validator is us.
    fastSync := config.FastSync
    if state.Validators.Size() == 1 {
        addr, _ := state.Validators.GetByIndex(0)
        if bytes.Equal(privValidator.GetAddress(), addr) {
            fastSync = false
        }
    }

    // Log whether this node is a validator or an observer
    if state.Validators.HasAddress(privValidator.GetAddress()) {
        consensusLogger.Info("This node is a validator", "addr", privValidator.GetAddress(), "pubKey", privValidator.GetPubKey())
    } else {
        consensusLogger.Info("This node is not a validator", "addr", privValidator.GetAddress(), "pubKey", privValidator.GetPubKey())
    }

    csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider()

    // 创建内存池
    mempool := mempl.NewMempool(
        config.Mempool,
        proxyApp.Mempool(),
        state.LastBlockHeight,
        mempl.WithMetrics(memplMetrics),
        mempl.WithPreCheck(sm.TxPreCheck(state)),
        mempl.WithPostCheck(sm.TxPostCheck(state)),
    )
    mempoolLogger := logger.With("module", "mempool")
    mempool.SetLogger(mempoolLogger)
    if config.Mempool.WalEnabled() {
        mempool.InitWAL() // no need to have the mempool wal during tests
    }
    mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool)
    mempoolReactor.SetLogger(mempoolLogger)

    if config.Consensus.WaitForTxs() {
        mempool.EnableTxsAvailable()
    }

    // 创建证据Reactor
    evidenceDB, err := dbProvider(&DBContext{"evidence", config})
    if err != nil {
        return nil, err
    }
    evidenceLogger := logger.With("module", "evidence")
    evidenceStore := evidence.NewEvidenceStore(evidenceDB)
    evidencePool := evidence.NewEvidencePool(stateDB, evidenceStore)
    evidencePool.SetLogger(evidenceLogger)
    evidenceReactor := evidence.NewEvidenceReactor(evidencePool)
    evidenceReactor.SetLogger(evidenceLogger)

    blockExecLogger := logger.With("module", "state")
    // make block executor for consensus and blockchain reactors to execute blocks
    blockExec := sm.NewBlockExecutor(
        stateDB,
        blockExecLogger,
        proxyApp.Consensus(),
        mempool,
        evidencePool,
        sm.BlockExecutorWithMetrics(smMetrics),
    )

    // 方便后面我们的ApplyBlock
    bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
    bcReactor.SetLogger(logger.With("module", "blockchain"))

    // 创建共识的reactor
    consensusState := cs.NewConsensusState(
        config.Consensus,
        state.Copy(),
        blockExec,
        blockStore,
        mempool,
        evidencePool,
        cs.StateMetrics(csMetrics),
    )
    consensusState.SetLogger(consensusLogger)
    if privValidator != nil {
        consensusState.SetPrivValidator(privValidator)
    }
    consensusReactor := cs.NewConsensusReactor(consensusState, fastSync, cs.ReactorMetrics(csMetrics))
    consensusReactor.SetLogger(consensusLogger)

    eventBus := types.NewEventBus()
    eventBus.SetLogger(logger.With("module", "events"))

    // services which will be publishing and/or subscribing for messages (events)
    // consensusReactor will set it on consensusState and blockExecutor
    consensusReactor.SetEventBus(eventBus)

    // 交易索引的服务
    var txIndexer txindex.TxIndexer
    switch config.TxIndex.Indexer {
    case "kv":
        store, err := dbProvider(&DBContext{"tx_index", config})
        if err != nil {
            return nil, err
        }
        if config.TxIndex.IndexTags != "" {
            txIndexer = kv.NewTxIndex(store, kv.IndexTags(splitAndTrimEmpty(config.TxIndex.IndexTags, ",", " ")))
        } else if config.TxIndex.IndexAllTags {
            txIndexer = kv.NewTxIndex(store, kv.IndexAllTags())
        } else {
            txIndexer = kv.NewTxIndex(store)
        }
    default:
        txIndexer = &null.TxIndex{}
    }

    indexerService := txindex.NewIndexerService(txIndexer, eventBus)
    indexerService.SetLogger(logger.With("module", "txindex"))

    p2pLogger := logger.With("module", "p2p")
    nodeInfo, err := makeNodeInfo(
        config,
        nodeKey.ID(),
        txIndexer,
        genDoc.ChainID,
        p2p.NewProtocolVersion(
            version.P2PProtocol, // global
            state.Version.Consensus.Block,
            state.Version.Consensus.App,
        ),
    )
    if err != nil {
        return nil, err
    }

    // Setup Transport.
    var (
        mConnConfig = p2p.MConnConfig(config.P2P)
        transport   = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, mConnConfig)
        connFilters = []p2p.ConnFilterFunc{}
        peerFilters = []p2p.PeerFilterFunc{}
    )

    if !config.P2P.AllowDuplicateIP {
        connFilters = append(connFilters, p2p.ConnDuplicateIPFilter())
    }

    // Filter peers by addr or pubkey with an ABCI query.
    // If the query return code is OK, add peer.
    if config.FilterPeers {
        connFilters = append(
            connFilters,
            // ABCI query for address filtering.
            func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error {
                res, err := proxyApp.Query().QuerySync(abci.RequestQuery{
                    Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()),
                })
                if err != nil {
                    return err
                }
                if res.IsErr() {
                    return fmt.Errorf("Error querying abci app: %v", res)
                }

                return nil
            },
        )

        peerFilters = append(
            peerFilters,
            // ABCI query for ID filtering.
            func(_ p2p.IPeerSet, p p2p.Peer) error {
                res, err := proxyApp.Query().QuerySync(abci.RequestQuery{
                    Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()),
                })
                if err != nil {
                    return err
                }
                if res.IsErr() {
                    return fmt.Errorf("Error querying abci app: %v", res)
                }

                return nil
            },
        )
    }

    p2p.MultiplexTransportConnFilters(connFilters...)(transport)

    // Setup Switch.
    sw := p2p.NewSwitch(
        config.P2P,
        transport,
        p2p.WithMetrics(p2pMetrics),
        p2p.SwitchPeerFilters(peerFilters...),
    )
    sw.SetLogger(p2pLogger)
    // 把各种组件都添加进去,之前的模块都在这集合起来了
    sw.AddReactor("MEMPOOL", mempoolReactor)
    sw.AddReactor("BLOCKCHAIN", bcReactor)
    sw.AddReactor("CONSENSUS", consensusReactor)
    sw.AddReactor("EVIDENCE", evidenceReactor)
    sw.SetNodeInfo(nodeInfo)
    sw.SetNodeKey(nodeKey)

    p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID(), "file", config.NodeKeyFile())

    // 地址信息,方便节点连接其他节点,发现其他节点
    addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)

    // Add ourselves to addrbook to prevent dialing ourselves
    addrBook.AddOurAddress(nodeInfo.NetAddress())

    addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
    if config.P2P.PexReactor {
        // TODO persistent peers ? so we can have their DNS addrs saved
        pexReactor := pex.NewPEXReactor(addrBook,
            &pex.PEXReactorConfig{
                Seeds:    splitAndTrimEmpty(config.P2P.Seeds, ",", " "),
                SeedMode: config.P2P.SeedMode,
            })
        pexReactor.SetLogger(logger.With("module", "pex"))
        sw.AddReactor("PEX", pexReactor)
    }

    sw.SetAddrBook(addrBook)

    // run the profile server
    profileHost := config.ProfListenAddress
    if profileHost != "" {
        go func() {
            logger.Error("Profile server", "err", http.ListenAndServe(profileHost, nil))
        }()
    }

    node := &Node{
        config:        config,
        genesisDoc:    genDoc,
        privValidator: privValidator,

        transport: transport,
        sw:        sw,
        addrBook:  addrBook,
        nodeInfo:  nodeInfo,
        nodeKey:   nodeKey,

        stateDB:          stateDB,
        blockStore:       blockStore,
        bcReactor:        bcReactor,
        mempoolReactor:   mempoolReactor,
        consensusState:   consensusState,
        consensusReactor: consensusReactor,
        evidencePool:     evidencePool,
        proxyApp:         proxyApp,
        txIndexer:        txIndexer,
        indexerService:   indexerService,
        eventBus:         eventBus,
    }
    node.BaseService = *cmn.NewBaseService(logger, "Node", node)
    return node, nil
}

结合我们之前的分析,我们可以知道在新建一个peer的时候的主要的流程

  1. 创建一个blockstore的实例,并且有个Reactor,方便注册到switch里面
  2. 创建或者打开了state实例,方便更新维护本节点的状态信息
  3. 创建mempool,方便处理各种交易,往ABCI以及共识传递信息,并且写入区块链
  4. 创建ABCI,并且重放所有的区块
  5. 创建证据池,证据Reactor
  6. 创建共识
  7. 创建P2P还有switch,开启地址簿功能
  8. 成功

接下来就得开始运行了

func (n *Node) OnStart() error {
    now := tmtime.Now()
    genTime := n.genesisDoc.GenesisTime
    if genTime.After(now) {
        n.Logger.Info("Genesis time is in the future. Sleeping until then...", "genTime", genTime)
        time.Sleep(genTime.Sub(now))
    }

    err := n.eventBus.Start()
    if err != nil {
        return err
    }

    // Add private IDs to addrbook to block those peers being added
    n.addrBook.AddPrivateIDs(splitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " "))

    // Start the RPC server before the P2P server
    // so we can eg. receive txs for the first block
    if n.config.RPC.ListenAddress != "" {
        listeners, err := n.startRPC()
        if err != nil {
            return err
        }
        n.rpcListeners = listeners
    }

    if n.config.Instrumentation.Prometheus &&
        n.config.Instrumentation.PrometheusListenAddr != "" {
        n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr)
    }

    // Start the transport.
    addr, err := p2p.NewNetAddressStringWithOptionalID(n.config.P2P.ListenAddress)
    if err != nil {
        return err
    }
    if err := n.transport.Listen(*addr); err != nil {
        return err
    }

    n.isListening = true

    // Start the switch (the P2P server).
    err = n.sw.Start()
    if err != nil {
        return err
    }

    // Always connect to persistent peers
    if n.config.P2P.PersistentPeers != "" {
        err = n.sw.DialPeersAsync(n.addrBook, splitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " "), true)
        if err != nil {
            return err
        }
    }

    // start tx indexer
    return n.indexerService.Start()
}

首先是开启RPC服务,是为了和我们这些客户端进行通信的,然后开启P2P的地址监听,最后打开所有的服务。
程序运行起来了。
结束的函数也差不多,就不粘贴了。
我们现在可以大体说一下Tendermint的工作流程了,创建一系列服务,然后注册到switch上,由switch统一管理。
看完后想着去画一个Tendermint的结构图,找时间试一试看看能不能画出来。

区块链 golang 源码 tendermint

发表新评论