package badger
import (
"bufio"
"bytes"
"encoding/binary"
"errors"
"fmt"
"hash/crc32"
"io"
"math"
"os"
"path/filepath"
"sync"
"google.golang.org/protobuf/proto"
"github.com/dgraph-io/badger/v4/options"
"github.com/dgraph-io/badger/v4/pb"
"github.com/dgraph-io/badger/v4/y"
)
type Manifest struct {
Levels []levelManifest
Tables map [uint64 ]TableManifest
Creations int
Deletions int
}
func createManifest() Manifest {
levels := make ([]levelManifest , 0 )
return Manifest {
Levels : levels ,
Tables : make (map [uint64 ]TableManifest ),
}
}
type levelManifest struct {
Tables map [uint64 ]struct {}
}
type TableManifest struct {
Level uint8
KeyID uint64
Compression options .CompressionType
}
type manifestFile struct {
fp *os .File
directory string
externalMagic uint16
deletionsRewriteThreshold int
appendLock sync .Mutex
manifest Manifest
inMemory bool
}
const (
ManifestFilename = "MANIFEST"
manifestRewriteFilename = "MANIFEST-REWRITE"
manifestDeletionsRewriteThreshold = 10000
manifestDeletionsRatio = 10
)
func (m *Manifest ) asChanges () []*pb .ManifestChange {
changes := make ([]*pb .ManifestChange , 0 , len (m .Tables ))
for id , tm := range m .Tables {
changes = append (changes , newCreateChange (id , int (tm .Level ), tm .KeyID , tm .Compression ))
}
return changes
}
func (m *Manifest ) clone (opt Options ) Manifest {
changeSet := pb .ManifestChangeSet {Changes : m .asChanges ()}
ret := createManifest ()
y .Check (applyChangeSet (&ret , &changeSet , opt ))
return ret
}
func openOrCreateManifestFile(opt Options ) (
ret *manifestFile , result Manifest , err error ) {
if opt .InMemory {
return &manifestFile {inMemory : true }, Manifest {}, nil
}
return helpOpenOrCreateManifestFile (opt .Dir , opt .ReadOnly , opt .ExternalMagicVersion ,
manifestDeletionsRewriteThreshold , opt )
}
func helpOpenOrCreateManifestFile(dir string , readOnly bool , extMagic uint16 ,
deletionsThreshold int , opt Options ) (*manifestFile , Manifest , error ) {
path := filepath .Join (dir , ManifestFilename )
var flags y .Flags
if readOnly {
flags |= y .ReadOnly
}
fp , err := y .OpenExistingFile (path , flags )
if err != nil {
if !os .IsNotExist (err ) {
return nil , Manifest {}, err
}
if readOnly {
return nil , Manifest {}, fmt .Errorf ("no manifest found, required for read-only db" )
}
m := createManifest ()
fp , netCreations , err := helpRewrite (dir , &m , extMagic )
if err != nil {
return nil , Manifest {}, err
}
y .AssertTrue (netCreations == 0 )
mf := &manifestFile {
fp : fp ,
directory : dir ,
externalMagic : extMagic ,
manifest : m .clone (opt ),
deletionsRewriteThreshold : deletionsThreshold ,
}
return mf , m , nil
}
manifest , truncOffset , err := ReplayManifestFile (fp , extMagic , opt )
if err != nil {
_ = fp .Close ()
return nil , Manifest {}, err
}
if !readOnly {
if err := fp .Truncate (truncOffset ); err != nil {
_ = fp .Close ()
return nil , Manifest {}, err
}
}
if _, err = fp .Seek (0 , io .SeekEnd ); err != nil {
_ = fp .Close ()
return nil , Manifest {}, err
}
mf := &manifestFile {
fp : fp ,
directory : dir ,
externalMagic : extMagic ,
manifest : manifest .clone (opt ),
deletionsRewriteThreshold : deletionsThreshold ,
}
return mf , manifest , nil
}
func (mf *manifestFile ) close () error {
if mf .inMemory {
return nil
}
return mf .fp .Close ()
}
func (mf *manifestFile ) addChanges (changesParam []*pb .ManifestChange , opt Options ) error {
if mf .inMemory {
return nil
}
changes := pb .ManifestChangeSet {Changes : changesParam }
buf , err := proto .Marshal (&changes )
if err != nil {
return err
}
mf .appendLock .Lock ()
defer mf .appendLock .Unlock ()
if err := applyChangeSet (&mf .manifest , &changes , opt ); err != nil {
return err
}
if mf .manifest .Deletions > mf .deletionsRewriteThreshold &&
mf .manifest .Deletions > manifestDeletionsRatio *(mf .manifest .Creations -mf .manifest .Deletions ) {
if err := mf .rewrite (); err != nil {
return err
}
} else {
var lenCrcBuf [8 ]byte
binary .BigEndian .PutUint32 (lenCrcBuf [0 :4 ], uint32 (len (buf )))
binary .BigEndian .PutUint32 (lenCrcBuf [4 :8 ], crc32 .Checksum (buf , y .CastagnoliCrcTable ))
buf = append (lenCrcBuf [:], buf ...)
if _ , err := mf .fp .Write (buf ); err != nil {
return err
}
}
return syncFunc (mf .fp )
}
var syncFunc = func (f *os .File ) error { return f .Sync () }
var magicText = [4 ]byte {'B' , 'd' , 'g' , 'r' }
const badgerMagicVersion = 8
func helpRewrite(dir string , m *Manifest , extMagic uint16 ) (*os .File , int , error ) {
rewritePath := filepath .Join (dir , manifestRewriteFilename )
fp , err := y .OpenTruncFile (rewritePath , false )
if err != nil {
return nil , 0 , err
}
y .AssertTrue (badgerMagicVersion <= math .MaxUint16 )
buf := make ([]byte , 8 )
copy (buf [0 :4 ], magicText [:])
binary .BigEndian .PutUint16 (buf [4 :6 ], extMagic )
binary .BigEndian .PutUint16 (buf [6 :8 ], badgerMagicVersion )
netCreations := len (m .Tables )
changes := m .asChanges ()
set := pb .ManifestChangeSet {Changes : changes }
changeBuf , err := proto .Marshal (&set )
if err != nil {
fp .Close ()
return nil , 0 , err
}
var lenCrcBuf [8 ]byte
binary .BigEndian .PutUint32 (lenCrcBuf [0 :4 ], uint32 (len (changeBuf )))
binary .BigEndian .PutUint32 (lenCrcBuf [4 :8 ], crc32 .Checksum (changeBuf , y .CastagnoliCrcTable ))
buf = append (buf , lenCrcBuf [:]...)
buf = append (buf , changeBuf ...)
if _ , err := fp .Write (buf ); err != nil {
fp .Close ()
return nil , 0 , err
}
if err := fp .Sync (); err != nil {
fp .Close ()
return nil , 0 , err
}
if err = fp .Close (); err != nil {
return nil , 0 , err
}
manifestPath := filepath .Join (dir , ManifestFilename )
if err := os .Rename (rewritePath , manifestPath ); err != nil {
return nil , 0 , err
}
fp , err = y .OpenExistingFile (manifestPath , 0 )
if err != nil {
return nil , 0 , err
}
if _ , err := fp .Seek (0 , io .SeekEnd ); err != nil {
fp .Close ()
return nil , 0 , err
}
if err := syncDir (dir ); err != nil {
fp .Close ()
return nil , 0 , err
}
return fp , netCreations , nil
}
func (mf *manifestFile ) rewrite () error {
if err := mf .fp .Close (); err != nil {
return err
}
fp , netCreations , err := helpRewrite (mf .directory , &mf .manifest , mf .externalMagic )
if err != nil {
return err
}
mf .fp = fp
mf .manifest .Creations = netCreations
mf .manifest .Deletions = 0
return nil
}
type countingReader struct {
wrapped *bufio .Reader
count int64
}
func (r *countingReader ) Read (p []byte ) (n int , err error ) {
n , err = r .wrapped .Read (p )
r .count += int64 (n )
return
}
func (r *countingReader ) ReadByte () (b byte , err error ) {
b , err = r .wrapped .ReadByte ()
if err == nil {
r .count ++
}
return
}
var (
errBadMagic = errors .New ("manifest has bad magic" )
errBadChecksum = errors .New ("manifest has checksum mismatch" )
)
func ReplayManifestFile (fp *os .File , extMagic uint16 , opt Options ) (Manifest , int64 , error ) {
r := countingReader {wrapped : bufio .NewReader (fp )}
var magicBuf [8 ]byte
if _ , err := io .ReadFull (&r , magicBuf [:]); err != nil {
return Manifest {}, 0 , errBadMagic
}
if !bytes .Equal (magicBuf [0 :4 ], magicText [:]) {
return Manifest {}, 0 , errBadMagic
}
extVersion := y .BytesToU16 (magicBuf [4 :6 ])
version := y .BytesToU16 (magicBuf [6 :8 ])
if version != badgerMagicVersion {
return Manifest {}, 0 ,
fmt .Errorf ("manifest has unsupported version: %d (we support %d).\n" +
"Please see https://github.com/dgraph-io/badger/blob/main/docs/troubleshooting.md#i-see-manifest-has-unsupported-version-x-we-support-y-error" +
" on how to fix this" ,
version , badgerMagicVersion )
}
if extVersion != extMagic {
return Manifest {}, 0 ,
fmt .Errorf ("cannot open DB because the external magic number doesn't match, " +
"expected: %d, version present in manifest: %d" , extMagic , extVersion )
}
stat , err := fp .Stat ()
if err != nil {
return Manifest {}, 0 , err
}
build := createManifest ()
var offset int64
for {
offset = r .count
var lenCrcBuf [8 ]byte
_ , err := io .ReadFull (&r , lenCrcBuf [:])
if err != nil {
if err == io .EOF || err == io .ErrUnexpectedEOF {
break
}
return Manifest {}, 0 , err
}
length := y .BytesToU32 (lenCrcBuf [0 :4 ])
if length > uint32 (stat .Size ()) {
return Manifest {}, 0 , fmt .Errorf (
"Buffer length: %d greater than file size: %d. Manifest file might be corrupted" ,
length , stat .Size ())
}
var buf = make ([]byte , length )
if _ , err := io .ReadFull (&r , buf ); err != nil {
if err == io .EOF || err == io .ErrUnexpectedEOF {
break
}
return Manifest {}, 0 , err
}
if crc32 .Checksum (buf , y .CastagnoliCrcTable ) != y .BytesToU32 (lenCrcBuf [4 :8 ]) {
return Manifest {}, 0 , errBadChecksum
}
var changeSet pb .ManifestChangeSet
if err := proto .Unmarshal (buf , &changeSet ); err != nil {
return Manifest {}, 0 , err
}
if err := applyChangeSet (&build , &changeSet , opt ); err != nil {
return Manifest {}, 0 , err
}
}
return build , offset , nil
}
func applyManifestChange(build *Manifest , tc *pb .ManifestChange , opt Options ) error {
switch tc .Op {
case pb .ManifestChange_CREATE :
if _ , ok := build .Tables [tc .Id ]; ok {
return fmt .Errorf ("MANIFEST invalid, table %d exists" , tc .Id )
}
build .Tables [tc .Id ] = TableManifest {
Level : uint8 (tc .Level ),
KeyID : tc .KeyId ,
Compression : options .CompressionType (tc .Compression ),
}
for len (build .Levels ) <= int (tc .Level ) {
build .Levels = append (build .Levels , levelManifest {make (map [uint64 ]struct {})})
}
build .Levels [tc .Level ].Tables [tc .Id ] = struct {}{}
build .Creations ++
case pb .ManifestChange_DELETE :
tm , ok := build .Tables [tc .Id ]
if !ok {
opt .Warningf ("MANIFEST delete: table %d has already been removed" , tc .Id )
for _ , level := range build .Levels {
delete (level .Tables , tc .Id )
}
} else {
delete (build .Levels [tm .Level ].Tables , tc .Id )
delete (build .Tables , tc .Id )
}
build .Deletions ++
default :
return fmt .Errorf ("MANIFEST file has invalid manifestChange op" )
}
return nil
}
func applyChangeSet(build *Manifest , changeSet *pb .ManifestChangeSet , opt Options ) error {
for _ , change := range changeSet .Changes {
if err := applyManifestChange (build , change , opt ); err != nil {
return err
}
}
return nil
}
func newCreateChange(
id uint64 , level int , keyID uint64 , c options .CompressionType ) *pb .ManifestChange {
return &pb .ManifestChange {
Id : id ,
Op : pb .ManifestChange_CREATE ,
Level : uint32 (level ),
KeyId : keyID ,
EncryptionAlgo : pb .EncryptionAlgo_aes ,
Compression : uint32 (c ),
}
}
func newDeleteChange(id uint64 ) *pb .ManifestChange {
return &pb .ManifestChange {
Id : id ,
Op : pb .ManifestChange_DELETE ,
}
}
The pages are generated with Golds v0.8.4 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .