package bbolt

import (
	"fmt"
	"io"
	"os"
	"sort"
	"strings"
	"time"
	"unsafe"
)

// txid represents the internal transaction identifier.
type txid uint64

// Tx represents a read-only or read/write transaction on the database.
// Read-only transactions can be used for retrieving values for keys and creating cursors.
// Read/write transactions can create and remove buckets and create and remove keys.
//
// IMPORTANT: You must commit or rollback transactions when you are done with
// them. Pages can not be reclaimed by the writer until no more transactions
// are using them. A long running read transaction can cause the database to
// quickly grow.
type Tx struct {
	writable       bool
	managed        bool
	db             *DB
	meta           *meta
	root           Bucket
	pages          map[pgid]*page
	stats          TxStats
	commitHandlers []func()

	// WriteFlag specifies the flag for write-related methods like WriteTo().
	// Tx opens the database file with the specified flag to copy the data.
	//
	// By default, the flag is unset, which works well for mostly in-memory
	// workloads. For databases that are much larger than available RAM,
	// set the flag to syscall.O_DIRECT to avoid trashing the page cache.
	WriteFlag int
}

// init initializes the transaction.
func (tx *Tx) init(db *DB) {
	tx.db = db
	tx.pages = nil

	// Copy the meta page since it can be changed by the writer.
	tx.meta = &meta{}
	db.meta().copy(tx.meta)

	// Copy over the root bucket.
	tx.root = newBucket(tx)
	tx.root.bucket = &bucket{}
	*tx.root.bucket = tx.meta.root

	// Increment the transaction id and add a page cache for writable transactions.
	if tx.writable {
		tx.pages = make(map[pgid]*page)
		tx.meta.txid += txid(1)
	}
}

// ID returns the transaction id.
func (tx *Tx) ID() int {
	return int(tx.meta.txid)
}

// DB returns a reference to the database that created the transaction.
func (tx *Tx) DB() *DB {
	return tx.db
}

// Size returns current database size in bytes as seen by this transaction.
func (tx *Tx) Size() int64 {
	return int64(tx.meta.pgid) * int64(tx.db.pageSize)
}

// Writable returns whether the transaction can perform write operations.
func (tx *Tx) Writable() bool {
	return tx.writable
}

// Cursor creates a cursor associated with the root bucket.
// All items in the cursor will return a nil value because all root bucket keys point to buckets.
// The cursor is only valid as long as the transaction is open.
// Do not use a cursor after the transaction is closed.
func (tx *Tx) Cursor() *Cursor {
	return tx.root.Cursor()
}

// Stats retrieves a copy of the current transaction statistics.
func (tx *Tx) Stats() TxStats {
	return tx.stats
}

// Bucket retrieves a bucket by name.
// Returns nil if the bucket does not exist.
// The bucket instance is only valid for the lifetime of the transaction.
func (tx *Tx) Bucket(name []byte) *Bucket {
	return tx.root.Bucket(name)
}

// CreateBucket creates a new bucket.
// Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (tx *Tx) CreateBucket(name []byte) (*Bucket, error) {
	return tx.root.CreateBucket(name)
}

// CreateBucketIfNotExists creates a new bucket if it doesn't already exist.
// Returns an error if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (tx *Tx) CreateBucketIfNotExists(name []byte) (*Bucket, error) {
	return tx.root.CreateBucketIfNotExists(name)
}

// DeleteBucket deletes a bucket.
// Returns an error if the bucket cannot be found or if the key represents a non-bucket value.
func (tx *Tx) DeleteBucket(name []byte) error {
	return tx.root.DeleteBucket(name)
}

// ForEach executes a function for each bucket in the root.
// If the provided function returns an error then the iteration is stopped and
// the error is returned to the caller.
func (tx *Tx) ForEach(fn func(name []byte, b *Bucket) error) error {
	return tx.root.ForEach(func(k, v []byte) error {
		return fn(k, tx.root.Bucket(k))
	})
}

// OnCommit adds a handler function to be executed after the transaction successfully commits.
func (tx *Tx) OnCommit(fn func()) {
	tx.commitHandlers = append(tx.commitHandlers, fn)
}

// Commit writes all changes to disk and updates the meta page.
// Returns an error if a disk write error occurs, or if Commit is
// called on a read-only transaction.
func (tx *Tx) Commit() error {
	_assert(!tx.managed, "managed tx commit not allowed")
	if tx.db == nil {
		return ErrTxClosed
	} else if !tx.writable {
		return ErrTxNotWritable
	}

	// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.

	// Rebalance nodes which have had deletions.
	var startTime = time.Now()
	tx.root.rebalance()
	if tx.stats.Rebalance > 0 {
		tx.stats.RebalanceTime += time.Since(startTime)
	}

	// spill data onto dirty pages.
	startTime = time.Now()
	if err := tx.root.spill(); err != nil {
		tx.rollback()
		return err
	}
	tx.stats.SpillTime += time.Since(startTime)

	// Free the old root bucket.
	tx.meta.root.root = tx.root.root

	// Free the old freelist because commit writes out a fresh freelist.
	if tx.meta.freelist != pgidNoFreelist {
		tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
	}

	if !tx.db.NoFreelistSync {
		// commitFreelist rolls the transaction back itself on failure.
		err := tx.commitFreelist()
		if err != nil {
			return err
		}
	} else {
		tx.meta.freelist = pgidNoFreelist
	}

	// Write dirty pages to disk.
	startTime = time.Now()
	if err := tx.write(); err != nil {
		tx.rollback()
		return err
	}

	// If strict mode is enabled then perform a consistency check.
	if tx.db.StrictMode {
		// Drain the check channel until it is closed, collecting every
		// reported inconsistency before deciding whether to panic.
		var errs []string
		for err := range tx.Check() {
			errs = append(errs, err.Error())
		}
		if len(errs) > 0 {
			panic("check fail: " + strings.Join(errs, "\n"))
		}
	}

	// Write meta to disk.
	if err := tx.writeMeta(); err != nil {
		tx.rollback()
		return err
	}
	tx.stats.WriteTime += time.Since(startTime)

	// Finalize the transaction.
	tx.close()

	// Execute commit handlers now that the locks have been removed.
	for _, fn := range tx.commitHandlers {
		fn()
	}

	return nil
}

// commitFreelist allocates pages for the freelist, writes the freelist into
// them, records the new freelist page id in the transaction meta, and grows
// the database if the high water mark moved. On any failure the transaction
// is rolled back before the error is returned.
func (tx *Tx) commitFreelist() error {
	// Allocate new pages for the new free list. This will overestimate
	// the size of the freelist but not underestimate the size (which would be bad).
	opgid := tx.meta.pgid
	p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
	if err != nil {
		tx.rollback()
		return err
	}
	if err := tx.db.freelist.write(p); err != nil {
		tx.rollback()
		return err
	}
	tx.meta.freelist = p.id

	// If the high water mark has moved up then attempt to grow the database.
	if tx.meta.pgid > opgid {
		if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {
			tx.rollback()
			return err
		}
	}

	return nil
}

// Rollback closes the transaction and ignores all previous updates. Read-only
// transactions must be rolled back and not committed.
func (tx *Tx) Rollback() error {
	_assert(!tx.managed, "managed tx rollback not allowed")
	if tx.db == nil {
		return ErrTxClosed
	}
	tx.nonPhysicalRollback()
	return nil
}

// nonPhysicalRollback is called when user calls Rollback directly, in this case we do not need to reload the free pages from disk.
func (tx *Tx) nonPhysicalRollback() {
	if tx.db == nil {
		return
	}
	if tx.writable {
		tx.db.freelist.rollback(tx.meta.txid)
	}
	tx.close()
}

// rollback needs to reload the free pages from disk in case some system error happens like fsync error.
func (tx *Tx) rollback() {
	if tx.db == nil {
		return
	}
	if tx.writable {
		tx.db.freelist.rollback(tx.meta.txid)
		if !tx.db.hasSyncedFreelist() {
			// Reconstruct free page list by scanning the DB to get the whole free page list.
			// Note: scanning the whole db is heavy if your db size is large in NoSyncFreeList mode.
			tx.db.freelist.noSyncReload(tx.db.freepages())
		} else {
			// Read free page list from freelist page.
			tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist))
		}
	}
	tx.close()
}

// close releases the transaction. For a writable transaction it clears the
// database's rwtx reference, unlocks rwlock and merges the freelist and
// transaction statistics into the database stats while holding statlock; for
// a read-only transaction it calls db.removeTx. Afterwards every reference
// held by the transaction is cleared. It does nothing if tx.db is already nil.
func (tx *Tx) close() {
	if tx.db == nil {
		return
	}
	if tx.writable {
		// Grab freelist stats.
		var freelistFreeN = tx.db.freelist.free_count()
		var freelistPendingN = tx.db.freelist.pending_count()
		var freelistAlloc = tx.db.freelist.size()

		// Remove transaction ref & writer lock.
		tx.db.rwtx = nil
		tx.db.rwlock.Unlock()

		// Merge statistics.
		tx.db.statlock.Lock()
		tx.db.stats.FreePageN = freelistFreeN
		tx.db.stats.PendingPageN = freelistPendingN
		tx.db.stats.FreeAlloc = (freelistFreeN + freelistPendingN) * tx.db.pageSize
		tx.db.stats.FreelistInuse = freelistAlloc
		tx.db.stats.TxStats.add(&tx.stats)
		tx.db.statlock.Unlock()
	} else {
		tx.db.removeTx(tx)
	}

	// Clear all references.
	tx.db = nil
	tx.meta = nil
	tx.root = Bucket{tx: tx}
	tx.pages = nil
}

// Copy writes the entire database to a writer.
// This function exists for backwards compatibility.
//
// Deprecated: Use WriteTo() instead.
func (tx *Tx) Copy(w io.Writer) error {
	_, err := tx.WriteTo(w)
	return err
}

// WriteTo writes the entire database to a writer.
// If err == nil then exactly tx.Size() bytes will be written into the writer.
func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) {
	// Attempt to open reader with WriteFlag
	f, err := tx.db.openFile(tx.db.path, os.O_RDONLY|tx.WriteFlag, 0)
	if err != nil {
		return 0, err
	}
	// Surface a Close error through the named result, but never let it
	// mask an earlier failure.
	defer func() {
		if cerr := f.Close(); err == nil {
			err = cerr
		}
	}()

	// Generate a meta page. We use the same page data for both meta pages.
	buf := make([]byte, tx.db.pageSize)
	metaPage := (*page)(unsafe.Pointer(&buf[0]))
	metaPage.flags = metaPageFlag
	*metaPage.meta() = *tx.meta

	// Write meta 0.
	metaPage.id = 0
	metaPage.meta().checksum = metaPage.meta().sum64()
	nn, err := w.Write(buf)
	n += int64(nn)
	if err != nil {
		return n, fmt.Errorf("meta 0 copy: %s", err)
	}

	// Write meta 1 with a lower transaction id.
	metaPage.id = 1
	metaPage.meta().txid -= 1
	metaPage.meta().checksum = metaPage.meta().sum64()
	nn, err = w.Write(buf)
	n += int64(nn)
	if err != nil {
		return n, fmt.Errorf("meta 1 copy: %s", err)
	}

	// Move past the meta pages in the file.
	if _, err := f.Seek(int64(tx.db.pageSize*2), io.SeekStart); err != nil {
		return n, fmt.Errorf("seek: %s", err)
	}

	// Copy data pages.
	wn, err := io.CopyN(w, f, tx.Size()-int64(tx.db.pageSize*2))
	n += wn
	if err != nil {
		return n, err
	}

	return n, nil
}

// CopyFile copies the entire database to file at the given path.
// A reader transaction is maintained during the copy so it is safe to continue
// using the database while a copy is in progress.
func (tx *Tx) CopyFile(path string, mode os.FileMode) error {
	f, err := tx.db.openFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, mode)
	if err != nil {
		return err
	}

	_, err = tx.WriteTo(f)
	if err != nil {
		// The copy error is the one worth reporting; the Close error is
		// deliberately dropped here.
		_ = f.Close()
		return err
	}
	return f.Close()
}

// Check performs several consistency checks on the database for this transaction.
// An error is returned if any inconsistency is found.
//
// It can be safely run concurrently on a writable transaction. However, this
// incurs a high cost for large databases and databases with a lot of subbuckets
// because of caching. This overhead can be removed if running on a read-only
// transaction, however, it is not safe to execute other writer transactions at
// the same time.
func (tx *Tx) Check() <-chan error {
	ch := make(chan error)
	go tx.check(ch)
	return ch
}

// check runs the consistency checks and sends every problem it finds on ch.
// The channel is unbuffered in Check, so the caller must keep receiving until
// ch is closed, which happens once all checks have run.
func (tx *Tx) check(ch chan error) {
	// Force loading free list if opened in ReadOnly mode.
	tx.db.loadFreelist()

	// Check if any pages are double freed.
	freed := make(map[pgid]bool)
	all := make([]pgid, tx.db.freelist.count())
	tx.db.freelist.copyall(all)
	for _, id := range all {
		if freed[id] {
			ch <- fmt.Errorf("page %d: already freed", id)
		}
		freed[id] = true
	}

	// Track every reachable page.
	reachable := make(map[pgid]*page)
	reachable[0] = tx.page(0) // meta0
	reachable[1] = tx.page(1) // meta1
	if tx.meta.freelist != pgidNoFreelist {
		for i := uint32(0); i <= tx.page(tx.meta.freelist).overflow; i++ {
			reachable[tx.meta.freelist+pgid(i)] = tx.page(tx.meta.freelist)
		}
	}

	// Recursively check buckets.
	tx.checkBucket(&tx.root, reachable, freed, ch)

	// Ensure all pages below high water mark are either reachable or freed.
	for i := pgid(0); i < tx.meta.pgid; i++ {
		_, isReachable := reachable[i]
		if !isReachable && !freed[i] {
			ch <- fmt.Errorf("page %d: unreachable unfreed", int(i))
		}
	}

	// Close the channel to signal completion.
	close(ch)
}

// checkBucket walks every page used by bucket b, recording each page id in
// reachable and reporting on ch any page that is out of bounds, referenced
// more than once, already freed, or neither a branch nor a leaf page. It then
// recurses into each nested bucket.
func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bool, ch chan error) {
	// Ignore inline buckets.
	if b.root == 0 {
		return
	}

	// Check every page used by this bucket.
	b.tx.forEachPage(b.root, 0, func(p *page, _ int) {
		if p.id > tx.meta.pgid {
			ch <- fmt.Errorf("page %d: out of bounds: %d", int(p.id), int(b.tx.meta.pgid))
		}

		// Ensure each page is only referenced once.
		for i := pgid(0); i <= pgid(p.overflow); i++ {
			var id = p.id + i
			if _, ok := reachable[id]; ok {
				ch <- fmt.Errorf("page %d: multiple references", int(id))
			}
			reachable[id] = p
		}

		// We should only encounter un-freed leaf and branch pages.
		if freed[p.id] {
			ch <- fmt.Errorf("page %d: reachable freed", int(p.id))
		} else if (p.flags&branchPageFlag) == 0 && (p.flags&leafPageFlag) == 0 {
			ch <- fmt.Errorf("page %d: invalid type: %s", int(p.id), p.typ())
		}
	})

	// Check each bucket within this bucket.
	_ = b.ForEach(func(k, v []byte) error {
		if child := b.Bucket(k); child != nil {
			tx.checkBucket(child, reachable, freed, ch)
		}
		return nil
	})
}

// allocate returns a contiguous block of memory starting at a given page.
func (tx *Tx) allocate(count int) (*page, error) {
	p, err := tx.db.allocate(tx.meta.txid, count)
	if err != nil {
		return nil, err
	}

	// Save to our page cache.
	tx.pages[p.id] = p

	// Update statistics.
	tx.stats.PageCount += count
	tx.stats.PageAlloc += count * tx.db.pageSize

	return p, nil
}

// write writes any dirty pages to disk.
func (tx *Tx) write() error {
	// Sort pages by id.
	dirty := make(pages, 0, len(tx.pages))
	for _, p := range tx.pages {
		dirty = append(dirty, p)
	}
	// Clear out page cache early.
	tx.pages = make(map[pgid]*page)
	sort.Sort(dirty)

	// Write pages to disk in order.
	for _, p := range dirty {
		rem := (uint64(p.overflow) + 1) * uint64(tx.db.pageSize)
		offset := int64(p.id) * int64(tx.db.pageSize)
		var written uintptr

		// Write out page in "max allocation" sized chunks.
		for {
			sz := rem
			if sz > maxAllocSize-1 {
				sz = maxAllocSize - 1
			}
			buf := unsafeByteSlice(unsafe.Pointer(p), written, 0, int(sz))

			if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
				return err
			}

			// Update statistics.
			tx.stats.Write++

			// Exit inner for loop if we've written all the chunks.
			rem -= sz
			if rem == 0 {
				break
			}

			// Otherwise move offset forward and move pointer to next chunk.
			offset += int64(sz)
			written += uintptr(sz)
		}
	}

	// Ignore file sync if flag is set on DB.
	if !tx.db.NoSync || IgnoreNoSync {
		if err := fdatasync(tx.db); err != nil {
			return err
		}
	}

	// Put small pages back to page pool.
	for _, p := range dirty {
		// Ignore page sizes over 1 page.
		// These are allocated using make() instead of the page pool.
		if int(p.overflow) != 0 {
			continue
		}

		buf := unsafeByteSlice(unsafe.Pointer(p), 0, 0, tx.db.pageSize)

		// See https://go.googlesource.com/go/+/f03c9202c43e0abb130669852082117ca50aa9b1
		for i := range buf {
			buf[i] = 0
		}
		tx.db.pagePool.Put(buf)
	}

	return nil
}

// writeMeta writes the meta to the disk.
func (tx *Tx) writeMeta() error {
	// Create a temporary buffer for the meta page.
	buf := make([]byte, tx.db.pageSize)
	p := tx.db.pageInBuffer(buf, 0)
	tx.meta.write(p)

	// Write the meta page to file.
	if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil {
		return err
	}
	if !tx.db.NoSync || IgnoreNoSync {
		if err := fdatasync(tx.db); err != nil {
			return err
		}
	}

	// Update statistics.
	tx.stats.Write++

	return nil
}

// page returns a reference to the page with a given id.
// If page has been written to then a temporary buffered page is returned.
func (tx *Tx) page(id pgid) *page {
	// Check the dirty pages first.
	if tx.pages != nil {
		if p, ok := tx.pages[id]; ok {
			return p
		}
	}

	// Otherwise return directly from the mmap.
	return tx.db.page(id)
}

// forEachPage iterates over every page within a given page and executes a function.
func (tx *Tx) forEachPage(id pgid, depth int, fn func(*page, int)) {
	p := tx.page(id)

	// Execute function.
	fn(p, depth)

	// Recursively loop over children.
	if (p.flags & branchPageFlag) != 0 {
		for i := 0; i < int(p.count); i++ {
			elem := p.branchPageElement(uint16(i))
			tx.forEachPage(elem.pgid, depth+1, fn)
		}
	}
}

// Page returns page information for a given page number.
// This is only safe for concurrent use when used by a writable transaction.
func (tx *Tx) Page(id int) (*PageInfo, error) {
	if tx.db == nil {
		return nil, ErrTxClosed
	} else if pgid(id) >= tx.meta.pgid {
		// At or past the high water mark: there is no such page, which is
		// reported as (nil, nil) rather than as an error.
		return nil, nil
	}

	// Build the page info.
	p := tx.db.page(pgid(id))
	info := &PageInfo{
		ID:            id,
		Count:         int(p.count),
		OverflowCount: int(p.overflow),
	}

	// Determine the type (or if it's free).
	if tx.db.freelist.freed(pgid(id)) {
		info.Type = "free"
	} else {
		info.Type = p.typ()
	}

	return info, nil
}

// TxStats represents statistics about the actions performed by the transaction.
type TxStats struct {
	// Page statistics.
	PageCount int // number of page allocations
	PageAlloc int // total bytes allocated

	// Cursor statistics.
	CursorCount int // number of cursors created

	// Node statistics
	NodeCount int // number of node allocations
	NodeDeref int // number of node dereferences

	// Rebalance statistics.
	Rebalance     int           // number of node rebalances
	RebalanceTime time.Duration // total time spent rebalancing

	// Split/Spill statistics.
	Split     int           // number of nodes split
	Spill     int           // number of nodes spilled
	SpillTime time.Duration // total time spent spilling

	// Write statistics.
	Write     int           // number of writes performed
	WriteTime time.Duration // total time spent writing to disk
}

// add accumulates every counter and duration of other into s.
func (s *TxStats) add(other *TxStats) {
	s.PageCount += other.PageCount
	s.PageAlloc += other.PageAlloc
	s.CursorCount += other.CursorCount
	s.NodeCount += other.NodeCount
	s.NodeDeref += other.NodeDeref
	s.Rebalance += other.Rebalance
	s.RebalanceTime += other.RebalanceTime
	s.Split += other.Split
	s.Spill += other.Spill
	s.SpillTime += other.SpillTime
	s.Write += other.Write
	s.WriteTime += other.WriteTime
}

// Sub calculates and returns the difference between two sets of transaction stats.
// This is useful when obtaining stats at two different points and time and
// you need the performance counters that occurred within that time span.
func (s *TxStats) Sub(other *TxStats) TxStats {
	var diff TxStats
	diff.PageCount = s.PageCount - other.PageCount
	diff.PageAlloc = s.PageAlloc - other.PageAlloc
	diff.CursorCount = s.CursorCount - other.CursorCount
	diff.NodeCount = s.NodeCount - other.NodeCount
	diff.NodeDeref = s.NodeDeref - other.NodeDeref
	diff.Rebalance = s.Rebalance - other.Rebalance
	diff.RebalanceTime = s.RebalanceTime - other.RebalanceTime
	diff.Split = s.Split - other.Split
	diff.Spill = s.Spill - other.Spill
	diff.SpillTime = s.SpillTime - other.SpillTime
	diff.Write = s.Write - other.Write
	diff.WriteTime = s.WriteTime - other.WriteTime
	return diff
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.