我们看完了peer,看点稍微简单点的mempool实现吧。
这个和我们写的abci程序有直接的关联,因此,我们需要仔细阅读下。

废话不多说,我们先看下Mempool的定义

type Mempool struct {
    config *cfg.MempoolConfig

    proxyMtx             sync.Mutex
    proxyAppConn         proxy.AppConnMempool
    txs                  *clist.CList    // concurrent linked-list of good txs
    height               int64           // the last block Update()'d to
    rechecking           int32           // for re-checking filtered txs on Update()
    recheckCursor        *clist.CElement // next expected response
    recheckEnd           *clist.CElement // re-checking stops here
    notifiedTxsAvailable bool
    txsAvailable         chan struct{} // fires once for each height, when the mempool is not empty
    preCheck             PreCheckFunc
    postCheck            PostCheckFunc

    // Keep a cache of already-seen txs.
    // This reduces the pressure on the proxyApp.
    cache txCache

    // A log of mempool txs
    wal *auto.AutoFile

    logger log.Logger

    metrics *Metrics
}

就是一堆和和交易相关的内容,还有一些和上层通信所需要的接口。
我们看下怎么定义的

func NewMempool(
    config *cfg.MempoolConfig,
    proxyAppConn proxy.AppConnMempool,
    height int64,
    options ...MempoolOption,
) *Mempool {
    mempool := &Mempool{
        config:        config,
        proxyAppConn:  proxyAppConn,
        txs:           clist.New(),
        height:        height,
        rechecking:    0,
        recheckCursor: nil,
        recheckEnd:    nil,
        logger:        log.NewNopLogger(),
        metrics:       NopMetrics(),
    }
    if config.CacheSize > 0 {
        mempool.cache = newMapTxCache(config.CacheSize)
    } else {
        mempool.cache = nopTxCache{}
    }
    proxyAppConn.SetResponseCallback(mempool.resCb)
    for _, option := range options {
        option(mempool)
    }
    return mempool
}

里面有一个需要注意的,就是为proxyAppConn设置了一个回调。
因为我们知道,内存池把一个交易提交给我们的abci程序之后,根据返回的结果来决定下一步如何处理,所以这个地方定义了mempool.resCb
我们看一下时怎么从内存池中取出来一个交易的。为了方便起见,不妨假设是按照交易笔数取出来的

func (mem *Mempool) ReapMaxTxs(max int) types.Txs {
    mem.proxyMtx.Lock()
    defer mem.proxyMtx.Unlock()

    if max < 0 {
        max = mem.txs.Len()
    }

    for atomic.LoadInt32(&mem.rechecking) > 0 {
        // TODO: Something better?
        time.Sleep(time.Millisecond * 10)
    }

    txs := make([]types.Tx, 0, cmn.MinInt(mem.txs.Len(), max))
    for e := mem.txs.Front(); e != nil && len(txs) <= max; e = e.Next() {
        memTx := e.Value.(*mempoolTx)
        txs = append(txs, memTx.tx)
    }
    return txs
}

主要流程不复杂,就是从tx列表中按照要求的数目来读取交易,按照用户要求的数目读取出来,然后返回回去。
那怎么更新的呢?

func (mem *Mempool) Update(
    height int64,
    txs types.Txs,
    preCheck PreCheckFunc,
    postCheck PostCheckFunc,
) error {
    // Set height
    mem.height = height
    mem.notifiedTxsAvailable = false

    if preCheck != nil {
        mem.preCheck = preCheck
    }
    if postCheck != nil {
        mem.postCheck = postCheck
    }

    // Add committed transactions to cache (if missing).
    for _, tx := range txs {
        _ = mem.cache.Push(tx)
    }

    // Remove committed transactions.
    txsLeft := mem.removeTxs(txs)

    // Either recheck non-committed txs to see if they became invalid
    // or just notify there're some txs left.
    if len(txsLeft) > 0 {
        if mem.config.Recheck {
            mem.logger.Info("Recheck txs", "numtxs", len(txsLeft), "height", height)
            mem.recheckTxs(txsLeft)
            // At this point, mem.txs are being rechecked.
            // mem.recheckCursor re-scans mem.txs and possibly removes some txs.
            // Before mem.Reap(), we should wait for mem.recheckCursor to be nil.
        } else {
            mem.notifyTxsAvailable()
        }
    }

    // Update metrics
    mem.metrics.Size.Set(float64(mem.Size()))

    return nil
}

重点就是txsLeft := mem.removeTxs(txs)会把已经提交的tx给移除出去,然后其他的就都无所谓了。
接下来再看看如何检查交易的,换句话说,如何把底层的交易传递给abci程序的

func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
    mem.proxyMtx.Lock()
    // use defer to unlock mutex because application (*local client*) might panic
    defer mem.proxyMtx.Unlock()

    if mem.Size() >= mem.config.Size {
        return ErrMempoolIsFull
    }

    if mem.preCheck != nil {
        if err := mem.preCheck(tx); err != nil {
            return ErrPreCheck{err}
        }
    }

    // CACHE
    if !mem.cache.Push(tx) {
        return ErrTxInCache
    }
    // END CACHE

    // WAL
    if mem.wal != nil {
        // TODO: Notify administrators when WAL fails
        _, err := mem.wal.Write([]byte(tx))
        if err != nil {
            mem.logger.Error("Error writing to WAL", "err", err)
        }
        _, err = mem.wal.Write([]byte("\n"))
        if err != nil {
            mem.logger.Error("Error writing to WAL", "err", err)
        }
    }
    // END WAL

    // NOTE: proxyAppConn may error if tx buffer is full
    if err = mem.proxyAppConn.Error(); err != nil {
        return err
    }
    reqRes := mem.proxyAppConn.CheckTxAsync(tx)
    if cb != nil {
        reqRes.SetCallback(cb)
    }

    return nil
}

可以看到,最后的方法就是reqRes := mem.proxyAppConn.CheckTxAsync(tx)来调用上层的内容,获取处理结果。
那么checkTxAsync具体怎么工作的呢?点进去我们发现proxyAppConn是一个接口?再同一个文件中查找,我们会发现只有一个实现了这个接口,就是appConnMempool,具体的定义如下

type appConnMempool struct {
    appConn abcicli.Client
}

然后往下看程序继承实现

func (app *appConnMempool) CheckTxAsync(tx []byte) *abcicli.ReqRes {
    return app.appConn.CheckTxAsync(tx)
}

再进去一看,又是接口实现。根据我们对abci的了解,有三种实现,socket,rpc以及直接golang交互,我们为了简单,看直接和golang交互的。
localClient的实现如下

type localClient struct {
    cmn.BaseService

    mtx *sync.Mutex
    types.Application
    Callback
}

里面的types.Application就是我们自己的abci程序,看下checkTxAsync,可以看到

func (app *localClient) CheckTxAsync(tx []byte) *ReqRes {
    app.mtx.Lock()
    defer app.mtx.Unlock()

    res := app.Application.CheckTx(tx)
    return app.callback(
        types.ToRequestCheckTx(tx),
        types.ToResponseCheckTx(res),
    )
}

最后面return了一下回调,因为我们之前看到代码是

proxyAppConn.SetResponseCallback(mempool.resCb)

这个return就是使得回调产生了作用,那我们再看看resCb。

func (mem *Mempool) resCb(req *abci.Request, res *abci.Response) {
    if mem.recheckCursor == nil {
        mem.resCbNormal(req, res)
    } else {
        mem.metrics.RecheckTimes.Add(1)
        mem.resCbRecheck(req, res)
    }
    mem.metrics.Size.Set(float64(mem.Size()))
}

正常情况下,我们应该去追踪Normal的情况

func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) {
    switch r := res.Value.(type) {
    case *abci.Response_CheckTx:
        tx := req.GetCheckTx().Tx
        var postCheckErr error
        if mem.postCheck != nil {
            postCheckErr = mem.postCheck(tx, r.CheckTx)
        }
        if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
            memTx := &mempoolTx{
                height:    mem.height,
                gasWanted: r.CheckTx.GasWanted,
                tx:        tx,
            }
            mem.txs.PushBack(memTx)
            mem.logger.Info("Added good transaction",
                "tx", TxID(tx),
                "res", r,
                "height", memTx.height,
                "total", mem.Size(),
            )
            mem.metrics.TxSizeBytes.Observe(float64(len(tx)))
            mem.notifyTxsAvailable()
        } else {
            // ignore bad transaction
            mem.logger.Info("Rejected bad transaction", "tx", TxID(tx), "res", r, "err", postCheckErr)
            mem.metrics.FailedTxs.Add(1)
            // remove from cache (it might be good later)
            mem.cache.Remove(tx)
        }
    default:
        // ignore other messages
    }
}

看这段代码,我们就可以看到如果abci程序返回的结果正确,就可以添加到内存池中了。
可是问题来了,我们刚刚都是在考虑checkTx是如何执行的,那么是谁调用的呢?Reactor!

type MempoolReactor struct {
    p2p.BaseReactor
    config  *cfg.MempoolConfig
    Mempool *Mempool
}

类型定义很简单。
正常情况下得有OnStart函数,我们点进去一看,很简单,就不看了。
再看一下Receive函数

func (memR *MempoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
    msg, err := decodeMsg(msgBytes)
    if err != nil {
        memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
        memR.Switch.StopPeerForError(src, err)
        return
    }
    memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)

    switch msg := msg.(type) {
    case *TxMessage:
        err := memR.Mempool.CheckTx(msg.Tx, nil)
        if err != nil {
            memR.Logger.Info("Could not check tx", "tx", TxID(msg.Tx), "err", err)
        }
        // broadcasting happens from go routines per peer
    default:
        memR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
    }
}

也不难理解,就是把接收到的消息添加到内存池当中,然后进行checkTx。
那么如何给别人发呢?就是在AddPeer中。

func (memR *MempoolReactor) AddPeer(peer p2p.Peer) {
    go memR.broadcastTxRoutine(peer)
}

看下启动的协程是如何工作的。

func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
    if !memR.config.Broadcast {
        return
    }

    var next *clist.CElement
    for {
        // This happens because the CElement we were looking at got garbage
        // collected (removed). That is, .NextWait() returned nil. Go ahead and
        // start from the beginning.
        if next == nil {
            select {
            case <-memR.Mempool.TxsWaitChan(): // Wait until a tx is available
                if next = memR.Mempool.TxsFront(); next == nil {
                    continue
                }
            case <-peer.Quit():
                return
            case <-memR.Quit():
                return
            }
        }

        memTx := next.Value.(*mempoolTx)

        // make sure the peer is up to date
        peerState, ok := peer.Get(types.PeerStateKey).(PeerState)
        if !ok {
            // Peer does not have a state yet. We set it in the consensus reactor, but
            // when we add peer in Switch, the order we call reactors#AddPeer is
            // different every time due to us using a map. Sometimes other reactors
            // will be initialized before the consensus reactor. We should wait a few
            // milliseconds and retry.
            time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
            continue
        }
        if peerState.GetHeight() < memTx.Height()-1 { // Allow for a lag of 1 block
            time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
            continue
        }

        // send memTx
        msg := &TxMessage{Tx: memTx.tx}
        success := peer.Send(MempoolChannel, cdc.MustMarshalBinaryBare(msg))
        if !success {
            time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
            continue
        }

        select {
        case <-next.NextWaitChan():
            // see the start of the for loop for nil check
            next = next.Next()
        case <-peer.Quit():
            return
        case <-memR.Quit():
            return
        }
    }
}

还行,还可以看懂,就是看下peer的状态,然后封装mempool的交易,发送给peer。
到这块,内存池的内容就结束了,不难理解,这个内存池只是一个存储区,对交易的内容不做任何处理,来了新的交易就取出来发给上层abci程序,如果通过就放到内存池,然后等待共识层接受,给出去。所以是一笔一笔的交易的转移。

区块链 golang 源码 tendermint

发表新评论