我们在前面的文章中创建了一个数据库,接下来就要对数据库进行操作了。

指令

为了方便后续的操作,我们沿着以下程序进行阅读

db.Update(func(tx *bolt.Tx) error {
    b, err := tx.CreateBucket([]byte("MyBucket"))
    if err != nil {
        return fmt.Errorf("create bucket: %s", err)
    }
    return nil
})

可以看到Update的参数是一个匿名函数,我们从Update开始。

Update

func (db *DB) Update(fn func(*Tx) error) error {
    t, err := db.Begin(true)
    if err != nil {
        return err
    }

    // Make sure the transaction rolls back in the event of a panic.
    defer func() {
        if t.db != nil {
            t.rollback()
        }
    }()

    // Mark as a managed tx so that the inner function cannot manually commit.
    t.managed = true

    // If an error is returned from the function then rollback and return error.
    err = fn(t)
    t.managed = false
    if err != nil {
        _ = t.Rollback()
        return err
    }

    return t.Commit()
}

第一步开始一个事务,我们看下事务是如何开始的。

Begin
func (db *DB) Begin(writable bool) (*Tx, error) {
    if writable {
        return db.beginRWTx()
    }
    return db.beginTx()
}

根据我们之前执行的命令,我们是需要开一个可读写的事务。

func (db *DB) beginRWTx() (*Tx, error) {
    // If the database was opened with Options.ReadOnly, return an error.
    if db.readOnly {
        return nil, ErrDatabaseReadOnly
    }

    // Obtain writer lock. This is released by the transaction when it closes.
    // This enforces only one writer transaction at a time.
    db.rwlock.Lock()

    // Once we have the writer lock then we can lock the meta pages so that
    // we can set up the transaction.
    db.metalock.Lock()
    defer db.metalock.Unlock()

    // Exit if the database is not open yet.
    if !db.opened {
        db.rwlock.Unlock()
        return nil, ErrDatabaseNotOpen
    }

    // Create a transaction associated with the database.
    t := &Tx{writable: true}
    t.init(db)
    db.rwtx = t

    // Free any pages associated with closed read-only transactions.
    var minid txid = 0xFFFFFFFFFFFFFFFF
    for _, t := range db.txs {
        if t.meta.txid < minid {
            minid = t.meta.txid
        }
    }
    if minid > 0 {
        db.freelist.release(minid - 1)
    }

    return t, nil
}

根据bolt的介绍,我们可以知道,bolt允许多个读操作,但在同一时刻,只能有一个写操作。
所以在程序开始之前,需要加上读写锁。
除此之外,我们还记得,在metapages中记录着交易序号txid,所以也需要对meta页面加锁,防止更新出错。
综合来说,这些都是为了保证在一个时间只有一个写交易。
(注:交易,事务,都是transaction的翻译,这里面我就混用了)
接下来就是创建交易,初始化交易,并且释放读交易所占用的页面。
我们需要看一下交易的定义了。

Tx

交易的定义比较简单

type Tx struct {
    writable       bool
    managed        bool
    db             *DB
    meta           *meta
    root           Bucket
    pages          map[pgid]*page
    stats          TxStats
    commitHandlers []func()

    WriteFlag int
}

writable是用来表明当前的tx是否可读,在我们这,tx是可读的。
managed表示当前这个tx是否通过db.Update()或者db.View()进行数据库操作。
db就是当前的db对象。
meta就是我们之前db中存储的meta信息。
root就是指的根bucket,所有的tx都要从根开始查起。
pages当前tx操作的page
stats统计tx的操作内容
commitHandler回调函数,在commit的时候执行的函数
WriteFlag复制或者剪切数据库的时候文件的打开方式
明白了交易的内容,我们需要看下交易的初始化操作。

func (tx *Tx) init(db *DB) {
    tx.db = db
    tx.pages = nil

    // Copy the meta page since it can be changed by the writer.
    tx.meta = &meta{}
    db.meta().copy(tx.meta)

    // Copy over the root bucket.
    tx.root = newBucket(tx)
    tx.root.bucket = &bucket{}
    *tx.root.bucket = tx.meta.root

    // Increment the transaction id and add a page cache for writable transactions.
    if tx.writable {
        tx.pages = make(map[pgid]*page)
        tx.meta.txid += txid(1)
    }
}

我们可以看到,tx首先初始化了db,并创建了空的pages。接下来把db中存储的meta复制给交易。
在最开始的时候,我们创建了两个meta,所以,复制的是哪一个呢?

func (db *DB) meta() *meta {
    // We have to return the meta with the highest txid which doesn't fail
    // validation. Otherwise, we can cause errors when in fact the database is
    // in a consistent state. metaA is the one with the higher txid.
    metaA := db.meta0
    metaB := db.meta1
    if db.meta1.txid > db.meta0.txid {
        metaA = db.meta1
        metaB = db.meta0
    }

    // Use higher meta page if valid. Otherwise fallback to previous, if valid.
    if err := metaA.validate(); err == nil {
        return metaA
    } else if err := metaB.validate(); err == nil {
        return metaB
    }

    // This should never be reached, because both meta1 and meta0 were validated
    // on mmap() and we do fsync() on every write.
    panic("bolt.DB.meta(): invalid meta pages")
}

可以看到,谁存储的txid大,就用哪个meta。我们还注意到,传递的是指针形式的meta,也就是meta信息会随着更新,这个tx中的meta也会更新。
接下来,为tx的root创建了一个包含本交易的bucket,并且是根bucket。此外,我们在meta中也记录了根bucket,这个地方我们使用meta中的bucket初始化了这个地方创建的bucket。
最后由于bolt是使用txid来标识数据库的状态信息,如果是可读写交易,就将txid加一,标识有了新的交易,随后会在commit中更新meta的txid。

执行

我们目前执行完了bolt的Update中的begin操作,后续还有一些容易理解的操作。
首先程序设置defer,如果执行出错,就进行rollback,否则执行我们手动传入的函数,并执行commit。
在这个地方有个有意思的内容

t.managed=true

正好对应了我们介绍tx的时候,对managed关键字的介绍。在使用update的时候,手动无法进行提交,因为已经将程序托管给update了。

golang boltdb 源码

发表新评论