package bbolt

import (
	"fmt"
	"io"
	"os"
	"sort"
	"strings"
	"time"
	"unsafe"
)

// txid represents the internal transaction identifier.
type txid uint64

// Tx represents a read-only or read/write transaction on the database.
// Read-only transactions can be used for retrieving values for keys and creating cursors.
// Read/write transactions can create and remove buckets and create and remove keys.
//
// IMPORTANT: You must commit or rollback transactions when you are done with
// them. Pages can not be reclaimed by the writer until no more transactions
// are using them. A long running read transaction can cause the database to
// quickly grow.
type Tx struct {
	writable       bool
	managed        bool
	db             *DB
	meta           *meta
	root           Bucket
	pages          map[pgid]*page
	stats          TxStats
	commitHandlers []func()

	// WriteFlag specifies the flag for write-related methods like WriteTo().
	// Tx opens the database file with the specified flag to copy the data.
	//
	// By default, the flag is unset, which works well for mostly in-memory
	// workloads. For databases that are much larger than available RAM,
	// set the flag to syscall.O_DIRECT to avoid trashing the page cache.
	WriteFlag int
}

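// The sketch below is illustrative only and is not part of the upstream bbolt
// sources: it shows how callers normally obtain a Tx. DB.Update and DB.View
// manage the transaction lifecycle (commit on nil return, rollback on error),
// while DB.Begin hands back an unmanaged Tx. The function and bucket names
// are placeholders chosen for this example.
func exampleTxLifecycle(db *DB) error {
	// Managed read/write transaction.
	if err := db.Update(func(tx *Tx) error {
		_, err := tx.CreateBucketIfNotExists([]byte("widgets"))
		return err
	}); err != nil {
		return err
	}

	// Managed read-only transaction.
	return db.View(func(tx *Tx) error {
		fmt.Printf("db size seen by tx: %d bytes\n", tx.Size())
		return nil
	})
}
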
// init initializes the transaction.
func (tx *Tx) init(db *DB) {
	tx.db = db
	tx.pages = nil

	// Copy the meta page since it can be changed by the writer.
	tx.meta = &meta{}
	db.meta().copy(tx.meta)

	// Copy over the root bucket.
	tx.root = newBucket(tx)
	tx.root.bucket = &bucket{}
	*tx.root.bucket = tx.meta.root

	// Increment the transaction id and add a page cache for writable transactions.
	if tx.writable {
		tx.pages = make(map[pgid]*page)
		tx.meta.txid += txid(1)
	}
}

// ID returns the transaction id.
func (tx *Tx) ID() int {
	return int(tx.meta.txid)
}

// DB returns a reference to the database that created the transaction.
func (tx *Tx) DB() *DB {
	return tx.db
}

// Size returns current database size in bytes as seen by this transaction.
func (tx *Tx) Size() int64 {
	return int64(tx.meta.pgid) * int64(tx.db.pageSize)
}

// Writable returns whether the transaction can perform write operations.
func (tx *Tx) Writable() bool {
	return tx.writable
}

// Cursor creates a cursor associated with the root bucket.
// All items in the cursor will return a nil value because all root bucket keys point to buckets.
// The cursor is only valid as long as the transaction is open.
// Do not use a cursor after the transaction is closed.
func (tx *Tx) Cursor() *Cursor {
	return tx.root.Cursor()
}

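// Illustrative sketch, not upstream code: iterating the root bucket with the
// transaction's cursor. Root-bucket values are always nil because every root
// key names a sub-bucket; Cursor.First and Cursor.Next are the standard bbolt
// cursor calls. The function name is a placeholder.
func exampleRootCursor(tx *Tx) {
	c := tx.Cursor()
	for k, v := c.First(); k != nil; k, v = c.Next() {
		// v is nil for every root key; k is a bucket name.
		fmt.Printf("bucket %q (value nil: %v)\n", k, v == nil)
	}
}
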
// Stats retrieves a copy of the current transaction statistics.
func (tx *Tx) Stats() TxStats {
	return tx.stats
}

// Bucket retrieves a bucket by name.
// Returns nil if the bucket does not exist.
// The bucket instance is only valid for the lifetime of the transaction.
func (tx *Tx) Bucket(name []byte) *Bucket {
	return tx.root.Bucket(name)
}

// CreateBucket creates a new bucket.
// Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (tx *Tx) CreateBucket(name []byte) (*Bucket, error) {
	return tx.root.CreateBucket(name)
}

// CreateBucketIfNotExists creates a new bucket if it doesn't already exist.
// Returns an error if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (tx *Tx) CreateBucketIfNotExists(name []byte) (*Bucket, error) {
	return tx.root.CreateBucketIfNotExists(name)
}

// DeleteBucket deletes a bucket.
// Returns an error if the bucket cannot be found or if the key represents a non-bucket value.
func (tx *Tx) DeleteBucket(name []byte) error {
	return tx.root.DeleteBucket(name)
}

// ForEach executes a function for each bucket in the root.
// If the provided function returns an error then the iteration is stopped and
// the error is returned to the caller.
func (tx *Tx) ForEach(fn func(name []byte, b *Bucket) error) error {
	return tx.root.ForEach(func(k, v []byte) error {
		return fn(k, tx.root.Bucket(k))
	})
}

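// Illustrative sketch, not upstream code: typical bucket usage inside a
// writable transaction obtained from DB.Update. Bucket.Put and Bucket.Get are
// the standard bbolt accessors; the function and bucket names are placeholders.
func exampleBucketOps(db *DB) error {
	return db.Update(func(tx *Tx) error {
		b, err := tx.CreateBucketIfNotExists([]byte("widgets"))
		if err != nil {
			return err
		}
		if err := b.Put([]byte("foo"), []byte("bar")); err != nil {
			return err
		}
		fmt.Printf("foo = %s\n", b.Get([]byte("foo")))

		// Enumerate every top-level bucket visible to this transaction.
		return tx.ForEach(func(name []byte, _ *Bucket) error {
			fmt.Printf("bucket: %s\n", name)
			return nil
		})
	})
}
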
// OnCommit adds a handler function to be executed after the transaction successfully commits.
func (tx *Tx) OnCommit(fn func()) {
	tx.commitHandlers = append(tx.commitHandlers, fn)
}

// Commit writes all changes to disk and updates the meta page.
// Returns an error if a disk write error occurs, or if Commit is
// called on a read-only transaction.
func (tx *Tx) Commit() error {
	_assert(!tx.managed, "managed tx commit not allowed")
	if tx.db == nil {
		return ErrTxClosed
	} else if !tx.writable {
		return ErrTxNotWritable
	}

	// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.

	// Rebalance nodes which have had deletions.
	var startTime = time.Now()
	tx.root.rebalance()
	if tx.stats.Rebalance > 0 {
		tx.stats.RebalanceTime += time.Since(startTime)
	}

	// Spill data onto dirty pages.
	startTime = time.Now()
	if err := tx.root.spill(); err != nil {
		tx.rollback()
		return err
	}
	tx.stats.SpillTime += time.Since(startTime)

	// Free the old root bucket.
	tx.meta.root.root = tx.root.root

	// Free the old freelist because commit writes out a fresh freelist.
	if tx.meta.freelist != pgidNoFreelist {
		tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
	}

	if !tx.db.NoFreelistSync {
		err := tx.commitFreelist()
		if err != nil {
			return err
		}
	} else {
		tx.meta.freelist = pgidNoFreelist
	}

	// Write dirty pages to disk.
	startTime = time.Now()
	if err := tx.write(); err != nil {
		tx.rollback()
		return err
	}

	// If strict mode is enabled then perform a consistency check.
	// Every consistency error found is collected and reported in the panic.
	if tx.db.StrictMode {
		ch := tx.Check()
		var errs []string
		for {
			err, ok := <-ch
			if !ok {
				break
			}
			errs = append(errs, err.Error())
		}
		if len(errs) > 0 {
			panic("check fail: " + strings.Join(errs, "\n"))
		}
	}

	// Write meta to disk.
	if err := tx.writeMeta(); err != nil {
		tx.rollback()
		return err
	}
	tx.stats.WriteTime += time.Since(startTime)

	// Finalize the transaction.
	tx.close()

	// Execute commit handlers now that the locks have been removed.
	for _, fn := range tx.commitHandlers {
		fn()
	}

	return nil
}

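// Illustrative sketch, not upstream code: an unmanaged read/write transaction
// driven by hand. The deferred Rollback is harmless after a successful Commit
// because the Tx is already closed (it returns ErrTxClosed, ignored here), and
// the OnCommit hook fires only after Commit succeeds. Names are placeholders.
func exampleManualCommit(db *DB) error {
	tx, err := db.Begin(true)
	if err != nil {
		return err
	}
	defer func() { _ = tx.Rollback() }()

	tx.OnCommit(func() { fmt.Println("committed") })

	if _, err := tx.CreateBucketIfNotExists([]byte("widgets")); err != nil {
		return err
	}
	return tx.Commit()
}
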
func (tx *Tx) commitFreelist() error {
	// Allocate new pages for the new free list. This will overestimate
	// the size of the freelist but not underestimate the size (which would be bad).
	opgid := tx.meta.pgid
	p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
	if err != nil {
		tx.rollback()
		return err
	}
	if err := tx.db.freelist.write(p); err != nil {
		tx.rollback()
		return err
	}
	tx.meta.freelist = p.id
	// If the high water mark has moved up then attempt to grow the database.
	if tx.meta.pgid > opgid {
		if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {
			tx.rollback()
			return err
		}
	}

	return nil
}

// Rollback closes the transaction and ignores all previous updates. Read-only
// transactions must be rolled back and not committed.
func (tx *Tx) Rollback() error {
	_assert(!tx.managed, "managed tx rollback not allowed")
	if tx.db == nil {
		return ErrTxClosed
	}
	tx.nonPhysicalRollback()
	return nil
}

// nonPhysicalRollback is called when the user calls Rollback directly; in this case we do not need to reload the free pages from disk.
func (tx *Tx) nonPhysicalRollback() {
	if tx.db == nil {
		return
	}
	if tx.writable {
		tx.db.freelist.rollback(tx.meta.txid)
	}
	tx.close()
}

// rollback needs to reload the free pages from disk in case some system error happens, like an fsync error.
func (tx *Tx) rollback() {
	if tx.db == nil {
		return
	}
	if tx.writable {
		tx.db.freelist.rollback(tx.meta.txid)
		if !tx.db.hasSyncedFreelist() {
			// Reconstruct free page list by scanning the DB to get the whole free page list.
			// Note: scanning the whole db is heavy if your db size is large in NoSyncFreeList mode.
			tx.db.freelist.noSyncReload(tx.db.freepages())
		} else {
			// Read free page list from freelist page.
			tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist))
		}
	}
	tx.close()
}

func (tx *Tx) close() {
	if tx.db == nil {
		return
	}
	if tx.writable {
		// Grab freelist stats.
		var freelistFreeN = tx.db.freelist.free_count()
		var freelistPendingN = tx.db.freelist.pending_count()
		var freelistAlloc = tx.db.freelist.size()

		// Remove transaction ref & writer lock.
		tx.db.rwtx = nil
		tx.db.rwlock.Unlock()

		// Merge statistics.
		tx.db.statlock.Lock()
		tx.db.stats.FreePageN = freelistFreeN
		tx.db.stats.PendingPageN = freelistPendingN
		tx.db.stats.FreeAlloc = (freelistFreeN + freelistPendingN) * tx.db.pageSize
		tx.db.stats.FreelistInuse = freelistAlloc
		tx.db.stats.TxStats.add(&tx.stats)
		tx.db.statlock.Unlock()
	} else {
		tx.db.removeTx(tx)
	}

	// Clear all references.
	tx.db = nil
	tx.meta = nil
	tx.root = Bucket{tx: tx}
	tx.pages = nil
}

// Copy writes the entire database to a writer.
// This function exists for backwards compatibility.
//
// Deprecated: Use WriteTo() instead.
func (tx *Tx) Copy(w io.Writer) error {
	_, err := tx.WriteTo(w)
	return err
}

// WriteTo writes the entire database to a writer.
// If err == nil then exactly tx.Size() bytes will be written into the writer.
func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) {
	// Attempt to open reader with WriteFlag
	f, err := tx.db.openFile(tx.db.path, os.O_RDONLY|tx.WriteFlag, 0)
	if err != nil {
		return 0, err
	}
	defer func() {
		if cerr := f.Close(); err == nil {
			err = cerr
		}
	}()

	// Generate a meta page. We use the same page data for both meta pages.
	buf := make([]byte, tx.db.pageSize)
	page := (*page)(unsafe.Pointer(&buf[0]))
	page.flags = metaPageFlag
	*page.meta() = *tx.meta

	// Write meta 0.
	page.id = 0
	page.meta().checksum = page.meta().sum64()
	nn, err := w.Write(buf)
	n += int64(nn)
	if err != nil {
		return n, fmt.Errorf("meta 0 copy: %s", err)
	}

	// Write meta 1 with a lower transaction id.
	page.id = 1
	page.meta().txid -= 1
	page.meta().checksum = page.meta().sum64()
	nn, err = w.Write(buf)
	n += int64(nn)
	if err != nil {
		return n, fmt.Errorf("meta 1 copy: %s", err)
	}

	// Move past the meta pages in the file.
	if _, err := f.Seek(int64(tx.db.pageSize*2), io.SeekStart); err != nil {
		return n, fmt.Errorf("seek: %s", err)
	}

	// Copy data pages.
	wn, err := io.CopyN(w, f, tx.Size()-int64(tx.db.pageSize*2))
	n += wn
	if err != nil {
		return n, err
	}

	return n, nil
}

// CopyFile copies the entire database to file at the given path.
// A reader transaction is maintained during the copy so it is safe to continue
// using the database while a copy is in progress.
func (tx *Tx) CopyFile(path string, mode os.FileMode) error {
	f, err := tx.db.openFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, mode)
	if err != nil {
		return err
	}

	err = tx.Copy(f)
	if err != nil {
		_ = f.Close()
		return err
	}
	return f.Close()
}

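// Illustrative sketch, not upstream code: an online backup taken inside a
// read-only transaction. CopyFile (WriteTo underneath) streams a consistent
// snapshot while other readers and a writer keep working. The destination
// path and file mode are placeholders.
func exampleBackup(db *DB) error {
	return db.View(func(tx *Tx) error {
		return tx.CopyFile("backup.db", 0600)
	})
}
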
// Check performs several consistency checks on the database for this transaction.
// An error is returned if any inconsistency is found.
//
// It can be safely run concurrently on a writable transaction. However, this
// incurs a high cost for large databases and databases with a lot of subbuckets
// because of caching. This overhead can be removed if running on a read-only
// transaction; however, it is not safe to execute other writer transactions at
// the same time.
func (tx *Tx) Check() <-chan error {
	ch := make(chan error)
	go tx.check(ch)
	return ch
}

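// Illustrative sketch, not upstream code: draining the Check channel. The
// channel is closed once the walk finishes, so ranging over it collects every
// inconsistency found; an empty result means the checks passed. The function
// name is a placeholder.
func exampleCheck(tx *Tx) []error {
	var errs []error
	for err := range tx.Check() {
		errs = append(errs, err)
	}
	return errs
}
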
func (tx *Tx) check(ch chan error) {
	// Force loading free list if opened in ReadOnly mode.
	tx.db.loadFreelist()

	// Check if any pages are double freed.
	freed := make(map[pgid]bool)
	all := make([]pgid, tx.db.freelist.count())
	tx.db.freelist.copyall(all)
	for _, id := range all {
		if freed[id] {
			ch <- fmt.Errorf("page %d: already freed", id)
		}
		freed[id] = true
	}

	// Track every reachable page.
	reachable := make(map[pgid]*page)
	reachable[0] = tx.page(0) // meta0
	reachable[1] = tx.page(1) // meta1
	if tx.meta.freelist != pgidNoFreelist {
		for i := uint32(0); i <= tx.page(tx.meta.freelist).overflow; i++ {
			reachable[tx.meta.freelist+pgid(i)] = tx.page(tx.meta.freelist)
		}
	}

	// Recursively check buckets.
	tx.checkBucket(&tx.root, reachable, freed, ch)

	// Ensure all pages below high water mark are either reachable or freed.
	for i := pgid(0); i < tx.meta.pgid; i++ {
		_, isReachable := reachable[i]
		if !isReachable && !freed[i] {
			ch <- fmt.Errorf("page %d: unreachable unfreed", int(i))
		}
	}

	// Close the channel to signal completion.
	close(ch)
}

func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bool, ch chan error) {
	// Ignore inline buckets.
	if b.root == 0 {
		return
	}

	// Check every page used by this bucket.
	b.tx.forEachPage(b.root, 0, func(p *page, _ int) {
		if p.id > tx.meta.pgid {
			ch <- fmt.Errorf("page %d: out of bounds: %d", int(p.id), int(b.tx.meta.pgid))
		}

		// Ensure each page is only referenced once.
		for i := pgid(0); i <= pgid(p.overflow); i++ {
			var id = p.id + i
			if _, ok := reachable[id]; ok {
				ch <- fmt.Errorf("page %d: multiple references", int(id))
			}
			reachable[id] = p
		}

		// We should only encounter un-freed leaf and branch pages.
		if freed[p.id] {
			ch <- fmt.Errorf("page %d: reachable freed", int(p.id))
		} else if (p.flags&branchPageFlag) == 0 && (p.flags&leafPageFlag) == 0 {
			ch <- fmt.Errorf("page %d: invalid type: %s", int(p.id), p.typ())
		}
	})

	// Check each bucket within this bucket.
	_ = b.ForEach(func(k, v []byte) error {
		if child := b.Bucket(k); child != nil {
			tx.checkBucket(child, reachable, freed, ch)
		}
		return nil
	})
}

// allocate returns a contiguous block of memory starting at a given page.
func (tx *Tx) allocate(count int) (*page, error) {
	p, err := tx.db.allocate(tx.meta.txid, count)
	if err != nil {
		return nil, err
	}

	// Save to our page cache.
	tx.pages[p.id] = p

	// Update statistics.
	tx.stats.PageCount += count
	tx.stats.PageAlloc += count * tx.db.pageSize

	return p, nil
}

// write writes any dirty pages to disk.
func (tx *Tx) write() error {
	// Sort pages by id.
	pages := make(pages, 0, len(tx.pages))
	for _, p := range tx.pages {
		pages = append(pages, p)
	}
	// Clear out page cache early.
	tx.pages = make(map[pgid]*page)
	sort.Sort(pages)

	// Write pages to disk in order.
	for _, p := range pages {
		rem := (uint64(p.overflow) + 1) * uint64(tx.db.pageSize)
		offset := int64(p.id) * int64(tx.db.pageSize)
		var written uintptr

		// Write out page in "max allocation" sized chunks.
		for {
			sz := rem
			if sz > maxAllocSize-1 {
				sz = maxAllocSize - 1
			}
			buf := unsafeByteSlice(unsafe.Pointer(p), written, 0, int(sz))

			if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
				return err
			}

			// Update statistics.
			tx.stats.Write++

			// Exit inner for loop if we've written all the chunks.
			rem -= sz
			if rem == 0 {
				break
			}

			// Otherwise move offset forward and move pointer to next chunk.
			offset += int64(sz)
			written += uintptr(sz)
		}
	}

	// Ignore file sync if flag is set on DB.
	if !tx.db.NoSync || IgnoreNoSync {
		if err := fdatasync(tx.db); err != nil {
			return err
		}
	}

	// Put small pages back to page pool.
	for _, p := range pages {
		// Ignore page sizes over 1 page.
		// These are allocated using make() instead of the page pool.
		if int(p.overflow) != 0 {
			continue
		}

		buf := unsafeByteSlice(unsafe.Pointer(p), 0, 0, tx.db.pageSize)

		// See https://go.googlesource.com/go/+/f03c9202c43e0abb130669852082117ca50aa9b1
		for i := range buf {
			buf[i] = 0
		}
		tx.db.pagePool.Put(buf)
	}

	return nil
}

// writeMeta writes the meta to the disk.
func (tx *Tx) writeMeta() error {
	// Create a temporary buffer for the meta page.
	buf := make([]byte, tx.db.pageSize)
	p := tx.db.pageInBuffer(buf, 0)
	tx.meta.write(p)

	// Write the meta page to file.
	if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil {
		return err
	}
	if !tx.db.NoSync || IgnoreNoSync {
		if err := fdatasync(tx.db); err != nil {
			return err
		}
	}

	// Update statistics.
	tx.stats.Write++

	return nil
}

// page returns a reference to the page with a given id.
// If page has been written to then a temporary buffered page is returned.
func (tx *Tx) page(id pgid) *page {
	// Check the dirty pages first.
	if tx.pages != nil {
		if p, ok := tx.pages[id]; ok {
			return p
		}
	}

	// Otherwise return directly from the mmap.
	return tx.db.page(id)
}

// forEachPage iterates over every page within a given page and executes a function.
func (tx *Tx) forEachPage(pgid pgid, depth int, fn func(*page, int)) {
	p := tx.page(pgid)

	// Execute function.
	fn(p, depth)

	// Recursively loop over children.
	if (p.flags & branchPageFlag) != 0 {
		for i := 0; i < int(p.count); i++ {
			elem := p.branchPageElement(uint16(i))
			tx.forEachPage(elem.pgid, depth+1, fn)
		}
	}
}

// Page returns page information for a given page number.
// This is only safe for concurrent use when used by a writable transaction.
func (tx *Tx) Page(id int) (*PageInfo, error) {
	if tx.db == nil {
		return nil, ErrTxClosed
	} else if pgid(id) >= tx.meta.pgid {
		return nil, nil
	}

	// Build the page info.
	p := tx.db.page(pgid(id))
	info := &PageInfo{
		ID:            id,
		Count:         int(p.count),
		OverflowCount: int(p.overflow),
	}

	// Determine the type (or if it's free).
	if tx.db.freelist.freed(pgid(id)) {
		info.Type = "free"
	} else {
		info.Type = p.typ()
	}

	return info, nil
}

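// Illustrative sketch, not upstream code: dumping page metadata up to the
// high water mark. Page returns (nil, nil) once the id passes the last page,
// which is used here as the loop's stop condition. The function name is a
// placeholder.
func examplePageDump(tx *Tx) error {
	for id := 0; ; id++ {
		p, err := tx.Page(id)
		if err != nil {
			return err
		}
		if p == nil {
			return nil
		}
		fmt.Printf("page %d: %s (count=%d, overflow=%d)\n", p.ID, p.Type, p.Count, p.OverflowCount)
	}
}
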
// TxStats represents statistics about the actions performed by the transaction.
type TxStats struct {
	// Page statistics.
	PageCount int // number of page allocations
	PageAlloc int // total bytes allocated

	// Cursor statistics.
	CursorCount int // number of cursors created

	// Node statistics.
	NodeCount int // number of node allocations
	NodeDeref int // number of node dereferences

	// Rebalance statistics.
	Rebalance     int           // number of node rebalances
	RebalanceTime time.Duration // total time spent rebalancing

	// Split/Spill statistics.
	Split     int           // number of nodes split
	Spill     int           // number of nodes spilled
	SpillTime time.Duration // total time spent spilling

	// Write statistics.
	Write     int           // number of writes performed
	WriteTime time.Duration // total time spent writing to disk
}

func (s *TxStats) add(other *TxStats) {
	s.PageCount += other.PageCount
	s.PageAlloc += other.PageAlloc
	s.CursorCount += other.CursorCount
	s.NodeCount += other.NodeCount
	s.NodeDeref += other.NodeDeref
	s.Rebalance += other.Rebalance
	s.RebalanceTime += other.RebalanceTime
	s.Split += other.Split
	s.Spill += other.Spill
	s.SpillTime += other.SpillTime
	s.Write += other.Write
	s.WriteTime += other.WriteTime
}

// Sub calculates and returns the difference between two sets of transaction stats.
// This is useful when obtaining stats at two different points in time and
// you need the performance counters that occurred within that time span.
func (s *TxStats) Sub(other *TxStats) TxStats {
	var diff TxStats
	diff.PageCount = s.PageCount - other.PageCount
	diff.PageAlloc = s.PageAlloc - other.PageAlloc
	diff.CursorCount = s.CursorCount - other.CursorCount
	diff.NodeCount = s.NodeCount - other.NodeCount
	diff.NodeDeref = s.NodeDeref - other.NodeDeref
	diff.Rebalance = s.Rebalance - other.Rebalance
	diff.RebalanceTime = s.RebalanceTime - other.RebalanceTime
	diff.Split = s.Split - other.Split
	diff.Spill = s.Spill - other.Spill
	diff.SpillTime = s.SpillTime - other.SpillTime
	diff.Write = s.Write - other.Write
	diff.WriteTime = s.WriteTime - other.WriteTime
	return diff
}

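// Illustrative sketch, not upstream code: using Sub to turn two cumulative
// TxStats snapshots into a delta for the interval between them. DB.Stats
// returns cumulative counters, so the subtraction yields per-interval figures
// suitable for periodic monitoring. The sampling interval and function name
// are placeholders.
func exampleStatsDelta(db *DB) {
	prev := db.Stats().TxStats
	time.Sleep(10 * time.Second)
	cur := db.Stats().TxStats
	diff := cur.Sub(&prev)
	fmt.Printf("writes in the last 10s: %d (%v spent writing)\n", diff.Write, diff.WriteTime)
}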