This time let's look at a more complex module, the blockchain module. Personally I feel it might be better named the block module.

First, the basic definition of the block pool:

type BlockPool struct {
    cmn.BaseService
    startTime time.Time

    mtx sync.Mutex
    // requesters that ask peers to download blocks, one per height
    requesters map[int64]*bpRequester
    height     int64 // the lowest block height being requested
    // the other peers we are connected to
    peers         map[p2p.ID]*bpPeer
    maxPeerHeight int64

    // atomic
    numPending int32 // number of requesters waiting to be assigned a block request or waiting for a response

    requestsCh chan<- BlockRequest
    errorsCh   chan<- peerError
}

The member fields are understandable at a glance, but their exact roles only become clear once we see how they are used, so let's keep reading.
The constructor looks like this:

func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- peerError) *BlockPool {
    bp := &BlockPool{
        peers: make(map[p2p.ID]*bpPeer),

        requesters: make(map[int64]*bpRequester),
        height:     start,
        numPending: 0,

        requestsCh: requestsCh,
        errorsCh:   errorsCh,
    }
    bp.BaseService = *cmn.NewBaseService(nil, "BlockPool", bp)
    return bp
}

Very simple, nothing fancy. Now let's look at the start-up function.

func (pool *BlockPool) OnStart() error {
    go pool.makeRequestersRoutine()
    pool.startTime = time.Now()
    return nil
}

It mainly starts one goroutine, so presumably the block-download logic all lives in there.

func (pool *BlockPool) makeRequestersRoutine() {
    for {
        if !pool.IsRunning() {
            break
        }

        _, numPending, lenRequesters := pool.GetStatus()
        if numPending >= maxPendingRequests {
            // sleep for a bit.
            time.Sleep(requestIntervalMS * time.Millisecond)
            // check for timed out peers
            pool.removeTimedoutPeers()
        } else if lenRequesters >= maxTotalRequesters {
            // sleep for a bit.
            time.Sleep(requestIntervalMS * time.Millisecond)
            // check for timed out peers
            pool.removeTimedoutPeers()
        } else {
            // request for more blocks.
            pool.makeNextRequester()
        }
    }
}

As you can see, if enough requests are already pending, or there are already enough requesters waiting for responses, we don't create new ones; otherwise we create another requester and keep downloading.
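
GetStatus, used at the top of the loop, just snapshots the three counters under the lock. It isn't shown in this post; the following is a rough sketch reconstructed from memory of the same pool.go, so details may differ:

func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequesters int) {
    pool.mtx.Lock()
    defer pool.mtx.Unlock()

    // Snapshot the counters that makeRequestersRoutine compares against
    // maxPendingRequests and maxTotalRequesters.
    return pool.height, pool.numPending, len(pool.requesters)
}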

func (pool *BlockPool) makeNextRequester() {
    pool.mtx.Lock()
    defer pool.mtx.Unlock()

    nextHeight := pool.height + pool.requestersLen()
    if nextHeight > pool.maxPeerHeight {
        return
    }

    request := newBPRequester(pool, nextHeight)

    pool.requesters[nextHeight] = request
    atomic.AddInt32(&pool.numPending, 1)

    err := request.Start()
    if err != nil {
        request.Logger.Error("Error starting request", "err", err)
    }
}

As you can see, each call creates one requester to download the block at one height; if the next height to request exceeds the maximum height reported by our peers (maxPeerHeight), no new requester is created.
Since the pool assigns one requester per height, a map stores the height-to-requester mapping. Finally the requester is started so downloading can begin.
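
The nextHeight arithmetic works because the requesters always cover a contiguous range of heights starting at pool.height, so requestersLen is simply the size of that map. A minimal sketch of the helper (reconstructed from memory, may differ in detail):

// requestersLen: the number of outstanding requesters, as an int64 so it can
// be added directly to pool.height above.
func (pool *BlockPool) requestersLen() int64 {
    return int64(len(pool.requesters))
}
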
Let's see how the download actually happens.
It is driven by the bpRequester created above: calling Start on it triggers its OnStart hook.
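
Before looking at OnStart, here is a rough sketch of the bpRequester type and its constructor, reconstructed from memory of the same file (field order and details may differ):

type bpRequester struct {
    cmn.BaseService
    pool       *BlockPool
    height     int64         // the single height this requester is responsible for
    gotBlockCh chan struct{} // signalled when the block for this height arrives
    redoCh     chan p2p.ID   // signalled when the block must be re-downloaded

    mtx    sync.Mutex
    peerID p2p.ID
    block  *types.Block
}

func newBPRequester(pool *BlockPool, height int64) *bpRequester {
    bpr := &bpRequester{
        pool:       pool,
        height:     height,
        gotBlockCh: make(chan struct{}, 1),
        redoCh:     make(chan p2p.ID, 1),
    }
    bpr.BaseService = *cmn.NewBaseService(nil, "bpRequester", bpr)
    return bpr
}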

func (bpr *bpRequester) OnStart() error {
    go bpr.requestRoutine()
    return nil
}

Yet another goroutine:

func (bpr *bpRequester) requestRoutine() {
OUTER_LOOP:
    for {
        // Pick a peer to send request to.
        var peer *bpPeer
    PICK_PEER_LOOP:
        for {
            if !bpr.IsRunning() || !bpr.pool.IsRunning() {
                return
            }
            peer = bpr.pool.pickIncrAvailablePeer(bpr.height)
            if peer == nil {
                //log.Info("No peers available", "height", height)
                time.Sleep(requestIntervalMS * time.Millisecond)
                continue PICK_PEER_LOOP
            }
            break PICK_PEER_LOOP
        }
        bpr.mtx.Lock()
        bpr.peerID = peer.id
        bpr.mtx.Unlock()

        // Send request and wait.
        bpr.pool.sendRequest(bpr.height, peer.id)
    WAIT_LOOP:
        for {
            select {
            case <-bpr.pool.Quit():
                bpr.Stop()
                return
            case <-bpr.Quit():
                return
            case peerID := <-bpr.redoCh:
                if peerID == bpr.peerID {
                    bpr.reset()
                    continue OUTER_LOOP
                } else {
                    continue WAIT_LOOP
                }
            case <-bpr.gotBlockCh:
                // We got a block!
                // Continue the for-loop and wait til Quit.
                continue WAIT_LOOP
            }
        }
    }
}

First, for the requester's assigned height, it picks a peer to download from; if no suitable peer is available, it sleeps and keeps retrying until one shows up.
The first few cases are easy to understand: each event gets its corresponding handling. The last case does nothing but continue — why? Because by the time gotBlockCh fires, the received block has already been stored in the pool; the reactor will pick it up and hand it over to consensus processing, so the requester just keeps waiting until it is told to quit or redo.
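
Note that sendRequest does not touch the network itself: it only pushes a BlockRequest onto the pool's requestsCh, which the reactor below consumes. A rough sketch (reconstructed from memory, may differ in detail):

// sendRequest only hands the (height, peer) pair over to the reactor;
// the actual network send happens in the reactor's poolRoutine.
func (pool *BlockPool) sendRequest(height int64, peerID p2p.ID) {
    if !pool.IsRunning() {
        return
    }
    pool.requestsCh <- BlockRequest{Height: height, PeerID: peerID}
}
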
Next, let's look at the reactor.

type BlockchainReactor struct {
    p2p.BaseReactor

    // immutable
    initialState sm.State

    blockExec *sm.BlockExecutor
    store     *BlockStore
    pool      *BlockPool
    fastSync  bool

    requestsCh <-chan BlockRequest
    errorsCh   <-chan peerError
}

Several other modules are involved here; for now we'll ignore them and take the fields at face value. How is the reactor created?

func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *BlockStore,
    fastSync bool) *BlockchainReactor {

    if state.LastBlockHeight != store.Height() {
        panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight,
            store.Height()))
    }

    requestsCh := make(chan BlockRequest, maxTotalRequesters)

    const capacity = 1000                      // must be bigger than peers count
    errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock

    pool := NewBlockPool(
        store.Height()+1,
        requestsCh,
        errorsCh,
    )

    bcR := &BlockchainReactor{
        initialState: state,
        blockExec:    blockExec,
        store:        store,
        pool:         pool,
        fastSync:     fastSync,
        requestsCh:   requestsCh,
        errorsCh:     errorsCh,
    }
    bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
    return bcR
}

It first checks that the state's height and the store's height are consistent, then creates the channel used to communicate with the BlockPool, plus the channel used to report peer errors.
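
The element types carried by these two channels are small value structs, roughly like this (reconstructed from memory, may differ in detail):

// BlockRequest: the pool asks the reactor to fetch block Height from PeerID.
type BlockRequest struct {
    Height int64
    PeerID p2p.ID
}

// peerError: the pool reports a misbehaving or timed-out peer to the reactor.
type peerError struct {
    err    error
    peerID p2p.ID
}
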
So how does it all work?

func (bcR *BlockchainReactor) OnStart() error {
    if bcR.fastSync {
        err := bcR.pool.Start()
        if err != nil {
            return err
        }
        go bcR.poolRoutine()
    }
    return nil
}

Very simple: if fast sync is disabled it does nothing at all; otherwise it starts the block pool and spawns the goroutine that processes the pool's requests and errors.

func (bcR *BlockchainReactor) poolRoutine() {

    trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
    statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
    switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)

    blocksSynced := 0

    chainID := bcR.initialState.ChainID
    state := bcR.initialState

    lastHundred := time.Now()
    lastRate := 0.0

    didProcessCh := make(chan struct{}, 1)

FOR_LOOP:
    for {
        select {
        case request := <-bcR.requestsCh:
            peer := bcR.Switch.Peers().Get(request.PeerID)
            if peer == nil {
                continue FOR_LOOP // Peer has since been disconnected.
            }
            msgBytes := cdc.MustMarshalBinaryBare(&bcBlockRequestMessage{request.Height})
            queued := peer.TrySend(BlockchainChannel, msgBytes)
            if !queued {
                // We couldn't make the request, send-queue full.
                // The pool handles timeouts, just let it go.
                continue FOR_LOOP
            }

        case err := <-bcR.errorsCh:
            peer := bcR.Switch.Peers().Get(err.peerID)
            if peer != nil {
                bcR.Switch.StopPeerForError(peer, err)
            }

        case <-statusUpdateTicker.C:
            // ask for status updates
            go bcR.BroadcastStatusRequest() // nolint: errcheck

        case <-switchToConsensusTicker.C:
            height, numPending, lenRequesters := bcR.pool.GetStatus()
            outbound, inbound, _ := bcR.Switch.NumPeers()
            bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters,
                "outbound", outbound, "inbound", inbound)
            if bcR.pool.IsCaughtUp() {
                bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
                bcR.pool.Stop()

                conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
                if ok {
                    conR.SwitchToConsensus(state, blocksSynced)
                } else {
                    // should only happen during testing
                }

                break FOR_LOOP
            }

        case <-trySyncTicker.C: // chan time
            select {
            case didProcessCh <- struct{}{}:
            default:
            }

        case <-didProcessCh:
            // NOTE: It is a subtle mistake to process more than a single block
            // at a time (e.g. 10) here, because we only TrySend 1 request per
            // loop.  The ratio mismatch can result in starving of blocks, a
            // sudden burst of requests and responses, and repeat.
            // Consequently, it is better to split these routines rather than
            // coupling them as it's written here.  TODO uncouple from request
            // routine.

            // See if there are any blocks to sync.
            first, second := bcR.pool.PeekTwoBlocks()
            //bcR.Logger.Info("TrySync peeked", "first", first, "second", second)
            if first == nil || second == nil {
                // We need both to sync the first block.
                continue FOR_LOOP
            } else {
                // Try again quickly next loop.
                didProcessCh <- struct{}{}
            }

            firstParts := first.MakePartSet(types.BlockPartSizeBytes)
            firstPartsHeader := firstParts.Header()
            firstID := types.BlockID{first.Hash(), firstPartsHeader}
            // Finally, verify the first block using the second's commit
            // NOTE: we can probably make this more efficient, but note that calling
            // first.Hash() doesn't verify the tx contents, so MakePartSet() is
            // currently necessary.
            err := state.Validators.VerifyCommit(
                chainID, firstID, first.Height, second.LastCommit)
            if err != nil {
                bcR.Logger.Error("Error in validation", "err", err)
                peerID := bcR.pool.RedoRequest(first.Height)
                peer := bcR.Switch.Peers().Get(peerID)
                if peer != nil {
                    // NOTE: we've already removed the peer's request, but we
                    // still need to clean up the rest.
                    bcR.Switch.StopPeerForError(peer, fmt.Errorf("BlockchainReactor validation error: %v", err))
                }
                peerID2 := bcR.pool.RedoRequest(second.Height)
                peer2 := bcR.Switch.Peers().Get(peerID2)
                if peer2 != nil && peer2 != peer {
                    // NOTE: we've already removed the peer's request, but we
                    // still need to clean up the rest.
                    bcR.Switch.StopPeerForError(peer2, fmt.Errorf("BlockchainReactor validation error: %v", err))
                }
                continue FOR_LOOP
            } else {
                bcR.pool.PopRequest()

                // TODO: batch saves so we dont persist to disk every block
                bcR.store.SaveBlock(first, firstParts, second.LastCommit)

                // TODO: same thing for app - but we would need a way to
                // get the hash without persisting the state
                var err error
                state, err = bcR.blockExec.ApplyBlock(state, firstID, first)
                if err != nil {
                    // TODO This is bad, are we zombie?
                    cmn.PanicQ(fmt.Sprintf("Failed to process committed block (%d:%X): %v",
                        first.Height, first.Hash(), err))
                }
                blocksSynced++

                if blocksSynced%100 == 0 {
                    lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
                    bcR.Logger.Info("Fast Sync Rate", "height", bcR.pool.height,
                        "max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate)
                    lastHundred = time.Now()
                }
            }
            continue FOR_LOOP

        case <-bcR.Quit():
            break FOR_LOOP
        }
    }
}

A few tickers are created first, to drive the periodic checks that follow.
The first case handles BlockRequests coming from our own pool: we look up the target peer and try to send it a bcBlockRequestMessage for the desired height; if the peer's send queue is full we simply drop it and let the pool's timeout handling take care of it.
Next, if a peer error comes in, we stop that peer. These errors include download timeouts, which are counted from when we ask the peer for a block; under normal conditions a response arrives almost immediately.
Then come the tickers we created above: statusUpdateTicker periodically broadcasts status requests to learn other peers' latest block heights, and switchToConsensusTicker checks whether we have caught up to the highest known height so we can switch over to the consensus reactor.
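
IsCaughtUp is the key check in that ticker. Roughly, it requires at least one peer and that our height has reached maxPeerHeight-1 (block H needs block H+1's LastCommit to be verified). A sketch reconstructed from memory, so the exact conditions may differ by version:

func (pool *BlockPool) IsCaughtUp() bool {
    pool.mtx.Lock()
    defer pool.mtx.Unlock()

    // With no peers we cannot judge anything yet.
    if len(pool.peers) == 0 {
        return false
    }

    // Give peers a few seconds to report their heights right after startup.
    receivedBlockOrTimedOut := pool.height > 0 || time.Since(pool.startTime) > 5*time.Second
    // maxPeerHeight-1 because verifying block H needs block H+1's LastCommit.
    caughtUpToPeers := pool.height >= (pool.maxPeerHeight - 1)
    return receivedBlockOrTimedOut && caughtUpToPeers
}
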
The case <-trySyncTicker.C is written in an interesting way: it fires every trySyncIntervalMS (10ms), but it does not process blocks itself; it only drops a token into didProcessCh, and the didProcessCh case does the actual processing. Splitting them this way lets a successful processing step re-trigger itself immediately (the didProcessCh <- struct{}{} inside) instead of waiting for the next tick.
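
This ticker-plus-token arrangement is essentially a coalescing trigger: the ticker only arms a buffered channel of size 1 with a non-blocking send, and the worker case can re-arm itself right after a successful step. A minimal standalone sketch of the pattern (illustrative only, not Tendermint code):

package main

import (
    "fmt"
    "time"
)

func main() {
    tick := time.NewTicker(10 * time.Millisecond)
    defer tick.Stop()

    work := make(chan struct{}, 1) // capacity 1: extra triggers coalesce
    deadline := time.After(100 * time.Millisecond)

    for {
        select {
        case <-tick.C:
            // Non-blocking send: arm the worker if it is not armed yet,
            // otherwise drop this tick instead of blocking the loop.
            select {
            case work <- struct{}{}:
            default:
            }
        case <-work:
            fmt.Println("processing one unit of work")
            // A worker that still has input ready can re-arm itself here,
            // mirroring the didProcessCh <- struct{}{} in poolRoutine.
        case <-deadline:
            return
        }
    }
}
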
Block processing is the core of this loop. It first verifies the block fetched from the peer (using the second block's LastCommit); if verification fails, the offending peers are removed and those heights are re-requested; if it passes, the block is saved and applied.
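
On the pool side this relies on PeekTwoBlocks and PopRequest: peek returns the blocks held by the requesters at pool.height and pool.height+1, and pop advances pool.height once the first block has been committed. A rough sketch (reconstructed from memory, may differ in detail):

func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
    pool.mtx.Lock()
    defer pool.mtx.Unlock()

    if r := pool.requesters[pool.height]; r != nil {
        first = r.getBlock()
    }
    if r := pool.requesters[pool.height+1]; r != nil {
        second = r.getBlock()
    }
    return
}

func (pool *BlockPool) PopRequest() {
    pool.mtx.Lock()
    defer pool.mtx.Unlock()

    if r := pool.requesters[pool.height]; r != nil {
        r.Stop()
        delete(pool.requesters, pool.height)
        pool.height++
    }
    // (the real implementation treats a missing requester here as a fatal error)
}
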
All of the above is the higher-level logic; now let's look at how incoming messages from peers are actually handled.

func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
    msg, err := decodeMsg(msgBytes)
    if err != nil {
        bcR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
        bcR.Switch.StopPeerForError(src, err)
        return
    }

    if err = msg.ValidateBasic(); err != nil {
        bcR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
        bcR.Switch.StopPeerForError(src, err)
        return
    }

    bcR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg)

    switch msg := msg.(type) {
    case *bcBlockRequestMessage:
        if queued := bcR.respondToPeer(msg, src); !queued {
            // Unfortunately not queued since the queue is full.
        }
    case *bcBlockResponseMessage:
        bcR.pool.AddBlock(src.ID(), msg.Block, len(msgBytes))
    case *bcStatusRequestMessage:
        // Send peer our state.
        msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{bcR.store.Height()})
        queued := src.TrySend(BlockchainChannel, msgBytes)
        if !queued {
            // sorry
        }
    case *bcStatusResponseMessage:
        // Got a peer status. Unverified.
        bcR.pool.SetPeerHeight(src.ID(), msg.Height)
    default:
        bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
    }
}

As mentioned above, since we send messages to others, we will also receive messages from them; Receive is where those incoming messages are handled.
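
respondToPeer, called in the bcBlockRequestMessage case, loads the requested block from the local store and tries to send it back; if we don't have it, a "no block" response is sent instead. A rough sketch (reconstructed from memory, may differ in detail):

func (bcR *BlockchainReactor) respondToPeer(msg *bcBlockRequestMessage, src p2p.Peer) (queued bool) {
    block := bcR.store.LoadBlock(msg.Height)
    if block != nil {
        msgBytes := cdc.MustMarshalBinaryBare(&bcBlockResponseMessage{Block: block})
        return src.TrySend(BlockchainChannel, msgBytes)
    }
    // We don't have this block: tell the peer so it can try elsewhere.
    msgBytes := cdc.MustMarshalBinaryBare(&bcNoBlockResponseMessage{Height: msg.Height})
    return src.TrySend(BlockchainChannel, msgBytes)
}
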
The overall flow, then: first learn the latest block heights of the other peers, then assign a peer to each block height and ask that peer for the block at that height.
Each height gets its own requester; once the block is downloaded it is verified, stored, and applied, and eventually we switch over to consensus. Done.
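
One last piece: the bcStatusResponseMessage case feeds SetPeerHeight, which registers the peer and raises maxPeerHeight, and that in turn is what allows makeNextRequester to keep creating new requesters. A rough sketch (reconstructed from memory, may differ in detail):

func (pool *BlockPool) SetPeerHeight(peerID p2p.ID, height int64) {
    pool.mtx.Lock()
    defer pool.mtx.Unlock()

    peer := pool.peers[peerID]
    if peer != nil {
        peer.height = height
    } else {
        peer = newBPPeer(pool, peerID, height)
        pool.peers[peerID] = peer
    }

    if height > pool.maxPeerHeight {
        pool.maxPeerHeight = height
    }
}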
