package badger
import (
"bufio"
"bytes"
"crypto/aes"
cryptorand "crypto/rand"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"github.com/dgraph-io/badger/v4/pb"
"github.com/dgraph-io/badger/v4/skl"
"github.com/dgraph-io/badger/v4/y"
"github.com/dgraph-io/ristretto/v2/z"
)
// memTable is a skiplist of recent writes, optionally backed by a write-ahead
// log (WAL) file that is replayed into the skiplist when the table is opened.
type memTable struct {
	// sl holds the table's key/value data.
	sl *skl.Skiplist
	// wal is the backing log file; nil for in-memory databases (see
	// openMemTable), which is why Put checks it before writing.
	wal *logFile
	// maxVersion is the highest timestamp parsed (y.ParseTs) from any key
	// inserted into sl, whether via Put or WAL replay.
	maxVersion uint64
	opt        Options
	// buf is scratch space reused for encoding each WAL entry.
	buf *bytes.Buffer
}
// openMemTables reopens every memtable WAL file (names ending in memFileExt)
// found in db.opt.Dir, in ascending numeric file-ID order, and replays each
// into a skiplist. Tables that end up non-empty are appended to db.imm; empty
// ones are released right away with DecrRef. Afterwards db.nextMemFid is set
// to one past the highest file ID found, or simply incremented when no WAL
// files exist.
//
// Files are opened read-only when db.opt.ReadOnly is set. This is a no-op
// for in-memory databases. The opt parameter is unused; db.opt is read instead.
func (db *DB) openMemTables(opt Options) error {
	if db.opt.InMemory {
		return nil
	}
	entries, err := os.ReadDir(db.opt.Dir)
	if err != nil {
		return errFile(err, db.opt.Dir, "Unable to open mem dir.")
	}

	var fids []int
	for _, entry := range entries {
		name := entry.Name()
		if !strings.HasSuffix(name, memFileExt) {
			continue
		}
		id, err := strconv.ParseInt(strings.TrimSuffix(name, memFileExt), 10, 64)
		if err != nil {
			return errFile(err, name, "Unable to parse log id.")
		}
		fids = append(fids, int(id))
	}
	sort.Ints(fids)

	openFlags := os.O_RDWR
	if db.opt.ReadOnly {
		openFlags = os.O_RDONLY
	}
	for _, fid := range fids {
		mt, err := db.openMemTable(fid, openFlags)
		if err != nil {
			return y.Wrapf(err, "while opening fid: %d", fid)
		}
		if mt.sl.Empty() {
			// Nothing was replayed. Dropping the reference presumably fires the
			// skiplist's OnClose hook, which openMemTable wires to delete the WAL.
			mt.DecrRef()
			continue
		}
		db.imm = append(db.imm, mt)
	}

	if n := len(fids); n > 0 {
		db.nextMemFid = fids[n-1]
	}
	db.nextMemFid++
	return nil
}
// memFileExt is the filename suffix of memtable WAL files, e.g. "00001.mem".
const memFileExt = ".mem"
// openMemTable builds the memtable for file ID fid.
//
// For in-memory databases it returns a table without a WAL, paired with
// z.NewFile. Otherwise it opens the WAL at mtFilePath(fid) with the given
// flags, sized to 2*db.opt.MemTableSize, and installs an OnClose hook that
// deletes the WAL file. A freshly created WAL yields (table, z.NewFile). An
// existing WAL is replayed into the skiplist via UpdateSkipList, and the
// table is returned together with any replay error.
func (db *DB) openMemTable(fid, flags int) (*memTable, error) {
	walPath := db.mtFilePath(fid)
	list := skl.NewSkiplist(arenaSize(db.opt))
	mt := &memTable{
		sl:  list,
		opt: db.opt,
		buf: &bytes.Buffer{},
	}
	if db.opt.InMemory {
		// No WAL backs an in-memory table; callers treat it like a new file.
		return mt, z.NewFile
	}

	mt.wal = &logFile{
		fid:      uint32(fid),
		path:     walPath,
		registry: db.registry,
		writeAt:  vlogHeaderSize,
		opt:      db.opt,
	}
	openErr := mt.wal.open(walPath, flags, 2*db.opt.MemTableSize)
	if openErr != nil && openErr != z.NewFile {
		return nil, y.Wrapf(openErr, "While opening memtable: %s", walPath)
	}

	list.OnClose = func() {
		if err := mt.wal.Delete(); err != nil {
			db.opt.Errorf("while deleting file: %s, err: %v", walPath, err)
		}
	}

	if openErr == z.NewFile {
		return mt, openErr
	}
	// Existing WAL: rebuild the skiplist from its contents.
	return mt, y.Wrapf(mt.UpdateSkipList(), "while updating skiplist")
}
// newMemTable creates a brand-new WAL-backed memtable for db.nextMemFid and
// advances nextMemFid when it succeeds. It is an error if a WAL file for
// that ID already exists.
func (db *DB) newMemTable() (*memTable, error) {
	mt, err := db.openMemTable(db.nextMemFid, os.O_CREATE|os.O_RDWR)
	switch {
	case err == z.NewFile:
		db.nextMemFid++
		return mt, nil
	case err != nil:
		db.opt.Errorf("Got error: %v for id: %d\n", err, db.nextMemFid)
		return nil, y.Wrapf(err, "newMemTable")
	default:
		// NOTE(review): mt (and its mmap'd WAL) is not released on this path.
		return nil, fmt.Errorf("File %s already exists", mt.wal.Fd.Name())
	}
}
// mtFilePath returns the WAL path for memtable fid: the ID zero-padded to at
// least five digits plus memFileExt, inside db.opt.Dir (e.g. "<dir>/00007.mem").
func (db *DB) mtFilePath(fid int) string {
	base := fmt.Sprintf("%05d", fid) + memFileExt
	return filepath.Join(db.opt.Dir, base)
}
// SyncWAL syncs the memtable's WAL file by calling Sync on it. It dereferences
// mt.wal unconditionally, so it must not be called on in-memory tables, whose
// wal is nil.
func (mt *memTable) SyncWAL() error {
	return mt.wal.Sync()
}
// isFull reports whether the memtable has reached its size budget: either the
// skiplist's memory use or, for WAL-backed tables, the WAL write offset is at
// least opt.MemTableSize.
func (mt *memTable) isFull() bool {
	switch {
	case mt.sl.MemSize() >= mt.opt.MemTableSize:
		return true
	case mt.opt.InMemory:
		// No WAL to measure.
		return false
	default:
		return int64(mt.wal.writeAt) >= mt.opt.MemTableSize
	}
}
// Put records key/value in the memtable.
//
// The entry is first appended to the WAL when one exists. Transaction-finish
// markers (bitFinTxn) stop there. Every other entry is also inserted into the
// skiplist, raises maxVersion if its key timestamp is newer, and adds its
// estimated size to the L0 bytes-written metric.
func (mt *memTable) Put(key []byte, value y.ValueStruct) error {
	entry := &Entry{
		Key:       key,
		Value:     value.Value,
		UserMeta:  value.UserMeta,
		meta:      value.Meta,
		ExpiresAt: value.ExpiresAt,
	}

	if wal := mt.wal; wal != nil {
		if err := wal.writeEntry(mt.buf, entry, mt.opt); err != nil {
			return y.Wrapf(err, "cannot write entry to WAL file")
		}
	}

	if entry.meta&bitFinTxn != 0 {
		return nil
	}

	mt.sl.Put(key, value)
	if ts := y.ParseTs(entry.Key); ts > mt.maxVersion {
		mt.maxVersion = ts
	}
	written := entry.estimateSizeAndSetThreshold(mt.opt.ValueThreshold)
	y.NumBytesWrittenToL0Add(mt.opt.MetricsEnabled, written)
	return nil
}
// UpdateSkipList replays the WAL into the skiplist and then truncates the WAL
// to the end of the last valid record. In read-only mode a WAL with trailing
// bytes past that point yields ErrTruncateNeeded instead. It does nothing
// when the table has no WAL or no skiplist.
func (mt *memTable) UpdateSkipList() error {
	if mt.wal == nil || mt.sl == nil {
		return nil
	}
	wal := mt.wal

	endOff, err := wal.iterate(true, 0, mt.replayFunction(mt.opt))
	if err != nil {
		return y.Wrapf(err, "while iterating wal: %s", wal.Fd.Name())
	}
	if mt.opt.ReadOnly && endOff < wal.size.Load() {
		return y.Wrapf(ErrTruncateNeeded, "end offset: %d < size: %d", endOff, wal.size.Load())
	}
	return wal.Truncate(int64(endOff))
}
// IncrRef takes an additional reference on the memtable by forwarding to the
// underlying skiplist's IncrRef.
func (mt *memTable) IncrRef() {
	mt.sl.IncrRef()
}
// DecrRef drops a reference on the memtable by forwarding to the skiplist's
// DecrRef. Presumably the skiplist runs its OnClose hook when the last
// reference goes away; for WAL-backed tables openMemTable sets that hook to
// delete the WAL file.
func (mt *memTable) DecrRef() {
	mt.sl.DecrRef()
}
// replayFunction returns a callback for logFile.iterate that inserts each
// replayed entry into the skiplist and raises mt.maxVersion as needed. The
// first key seen is logged at debug level.
func (mt *memTable) replayFunction(opt Options) func(Entry, valuePointer) error {
	loggedFirst := false
	return func(e Entry, _ valuePointer) error {
		if !loggedFirst {
			opt.Debugf("First key=%q\n", e.Key)
			loggedFirst = true
		}

		if ts := y.ParseTs(e.Key); ts > mt.maxVersion {
			mt.maxVersion = ts
		}

		mt.sl.Put(e.Key, y.ValueStruct{
			Value:     e.Value,
			Meta:      e.meta,
			UserMeta:  e.UserMeta,
			ExpiresAt: e.ExpiresAt,
		})
		return nil
	}
}
// logFile is a memory-mapped append-only log file. In this file it backs a
// memtable's WAL. It begins with a vlogHeaderSize-byte header (8-byte key ID
// followed by a 12-byte base IV; see bootstrap/open), followed by encoded
// entries (see encodeEntry).
type logFile struct {
	// The mmap'd file: supplies Data, Fd, Sync, Truncate, Delete and
	// NewReader as used in this file.
	*z.MmapFile
	path string
	// lock is held exclusively by doneWriting while truncating.
	// NOTE(review): presumably readers elsewhere take the read side — confirm.
	lock sync.RWMutex
	// fid is the file ID; iterate stamps it into every valuePointer.
	fid uint32
	// size is the logical file size, accessed atomically. It is set by open and
	// Truncate, and read bounds-checks against it.
	size atomic.Uint32
	// dataKey is the encryption key; nil means encryption is disabled
	// (see encryptionEnabled).
	dataKey *pb.DataKey
	// baseIV is the 12-byte IV prefix from the header, combined with the
	// entry offset in generateIV.
	baseIV   []byte
	registry *KeyRegistry
	// writeAt is the offset at which writeEntry places the next entry.
	writeAt uint32
	opt     Options
}
// Truncate resizes the log file to exactly end bytes.
//
// It is a no-op when the file on disk already has that size. Otherwise it
// asserts the database is not read-only, records end as the logical size
// (lf.size), and resizes the underlying mmap'd file via MmapFile.Truncate.
// A Stat failure is returned wrapped with %w so callers can inspect the
// cause with errors.Is / errors.As.
func (lf *logFile) Truncate(end int64) error {
	fi, err := lf.Fd.Stat()
	if err != nil {
		return fmt.Errorf("while file.stat on file: %s, error: %w", lf.Fd.Name(), err)
	}
	if fi.Size() == end {
		return nil
	}
	y.AssertTrue(!lf.opt.ReadOnly)
	lf.size.Store(uint32(end))
	return lf.MmapFile.Truncate(end)
}
// encodeEntry appends e to buf in log format:
//
//	header | key | value | crc32
//
// The checksum is Castagnoli CRC32, written big-endian. It covers the header
// and the key/value bytes exactly as written, which is ciphertext when
// encryption is enabled. Key and value are then encrypted together using an
// IV derived from offset. The returned length is header + key + value + crc
// bytes.
func (lf *logFile) encodeEntry(buf *bytes.Buffer, e *Entry, offset uint32) (int, error) {
	h := header{
		klen:      uint32(len(e.Key)),
		vlen:      uint32(len(e.Value)),
		expiresAt: e.ExpiresAt,
		meta:      e.meta,
		userMeta:  e.UserMeta,
	}
	var hdrBytes [maxHeaderSize]byte
	hdrLen := h.Encode(hdrBytes[:])

	// Everything written through out is both buffered and checksummed.
	crc := crc32.New(y.CastagnoliCrcTable)
	out := io.MultiWriter(buf, crc)
	y.Check2(out.Write(hdrBytes[:hdrLen]))

	if !lf.encryptionEnabled() {
		y.Check2(out.Write(e.Key))
		y.Check2(out.Write(e.Value))
	} else {
		plain := make([]byte, 0, len(e.Key)+len(e.Value))
		plain = append(append(plain, e.Key...), e.Value...)
		iv := lf.generateIV(offset)
		if err := y.XORBlockStream(out, plain, lf.dataKey.Data, iv); err != nil {
			return 0, y.Wrapf(err, "Error while encoding entry for vlog.")
		}
	}

	// The checksum itself goes only to buf, not into the hash.
	var sum [crc32.Size]byte
	binary.BigEndian.PutUint32(sum[:], crc.Sum32())
	y.Check2(buf.Write(sum[:]))
	return hdrLen + len(e.Key) + len(e.Value) + crc32.Size, nil
}
// writeEntry encodes e at lf.writeAt directly into the mapped Data, advances
// writeAt past it, and zeroes the next header-sized region (presumably so
// iterate stops there on an isZero entry). buf is reused scratch space. opt is
// unused.
func (lf *logFile) writeEntry(buf *bytes.Buffer, e *Entry, opt Options) error {
	buf.Reset()
	n, err := lf.encodeEntry(buf, e, lf.writeAt)
	if err != nil {
		return err
	}
	encoded := buf.Bytes()
	// The whole record must fit in the mapping; a short copy is fatal.
	y.AssertTrue(copy(lf.Data[lf.writeAt:], encoded) == n)
	lf.writeAt += uint32(n)
	lf.zeroNextEntry()
	return nil
}
// decodeEntry parses the entry whose header starts at buf[0]. offset is the
// entry's position in the file: it is stored on the returned Entry and, when
// encryption is enabled, used to derive the IV for decrypting key+value.
//
// Key and Value are sub-slices of buf (or of the decrypted buffer); nothing is
// copied. The trailing CRC is not checked here.
func (lf *logFile) decodeEntry(buf []byte, offset uint32) (*Entry, error) {
	var h header
	kv := buf[h.Decode(buf):]
	if lf.encryptionEnabled() {
		plain, err := lf.decryptKV(kv, offset)
		if err != nil {
			return nil, err
		}
		kv = plain
	}

	keyEnd := h.klen
	valEnd := h.klen + h.vlen
	return &Entry{
		Key:       kv[:keyEnd],
		Value:     kv[keyEnd:valEnd],
		meta:      h.meta,
		UserMeta:  h.userMeta,
		ExpiresAt: h.expiresAt,
		offset:    offset,
	}, nil
}
// decryptKV decrypts buf with the file's data key and the IV for offset,
// returning the result from y.XORBlockAllocate. It requires a non-nil dataKey,
// so only call it when encryptionEnabled reports true.
func (lf *logFile) decryptKV(buf []byte, offset uint32) ([]byte, error) {
	key := lf.dataKey.Data
	iv := lf.generateIV(offset)
	return y.XORBlockAllocate(buf, key, iv)
}
// keyID returns the ID of the file's data key, or 0 when the file has no data
// key (encryption disabled).
func (lf *logFile) keyID() uint64 {
	if dk := lf.dataKey; dk != nil {
		return dk.KeyId
	}
	return 0
}
// encryptionEnabled reports whether entries in this file are encrypted, which
// is the case exactly when a data key has been attached.
func (lf *logFile) encryptionEnabled() bool {
	return lf.dataKey != nil
}
// read returns the p.Len bytes at p.Offset as a sub-slice of the mapped Data
// (no copy). It returns y.ErrEOF if the range starts at or beyond the mapped
// length, or ends beyond either the mapped length or the logical size
// (lf.size).
//
// Offset and length are widened to int64 before they are added. Adding the
// two uint32 values directly could wrap around for a large or corrupt
// pointer, slip past the bounds checks, and then panic when slicing.
func (lf *logFile) read(p valuePointer) (buf []byte, err error) {
	start := int64(p.Offset)
	end := start + int64(p.Len)
	mapped := int64(len(lf.Data))
	logical := int64(lf.size.Load())
	if start >= mapped || end > mapped || end > logical {
		return nil, y.ErrEOF
	}
	return lf.Data[start:end], nil
}
// generateIV builds the aes.BlockSize-byte IV for the entry at offset. The
// first 12 bytes are the file's baseIV and the last 4 are offset in big-endian,
// so each entry offset within a file gets a distinct IV.
func (lf *logFile) generateIV(offset uint32) []byte {
	var iv [aes.BlockSize]byte
	prefix, suffix := iv[:12], iv[12:]
	y.AssertTrue(copy(prefix, lf.baseIV) == 12)
	binary.BigEndian.PutUint32(suffix, offset)
	return iv[:]
}
// doneWriting first syncs the file when opt.SyncWrites is set. It then
// truncates the file to offset while holding lf.lock exclusively; Truncate is
// itself a no-op when the size already matches. Errors are wrapped with the
// file path.
func (lf *logFile) doneWriting(offset uint32) error {
	if lf.opt.SyncWrites {
		if syncErr := lf.Sync(); syncErr != nil {
			return y.Wrapf(syncErr, "Unable to sync value log: %q", lf.path)
		}
	}

	lf.lock.Lock()
	defer lf.lock.Unlock()

	truncErr := lf.Truncate(int64(offset))
	if truncErr == nil {
		return nil
	}
	return y.Wrapf(truncErr, "Unable to truncate file: %q", lf.path)
}
// iterate scans entries from offset onward and hands each one to fn. An offset
// of 0 means "start just past the vlogHeaderSize-byte header".
//
// Transactional entries (bitTxn) are buffered until the matching bitFinTxn
// entry arrives; that entry's value is the commit timestamp. Only then are the
// buffered entries passed to fn, so an incomplete transaction at the tail is
// never applied. Non-transactional entries go to fn immediately, but only
// when no transaction is open.
//
// Scanning stops at io.EOF, io.ErrUnexpectedEOF, errTruncate, a zero entry,
// or any transaction inconsistency. If fn returns errStop, scanning stops
// cleanly; any other error from fn is returned wrapped with the file path.
//
// The returned offset is the end of the last fully applied record or
// transaction, i.e. how far the log is known to be valid. readOnly is unused.
func (lf *logFile) iterate(readOnly bool, offset uint32, fn logEntry) (uint32, error) {
	if offset == 0 {
		offset = vlogHeaderSize
	}
	reader := bufio.NewReader(lf.NewReader(int(offset)))
	read := &safeRead{
		k:            make([]byte, 10),
		v:            make([]byte, 10),
		recordOffset: offset,
		lf:           lf,
	}

	var lastCommit uint64
	var validEndOffset uint32 = offset
	var entries []*Entry
	var vptrs []valuePointer

loop:
	for {
		e, err := read.Entry(reader)
		switch {
		case err == io.EOF:
			break loop
		case err == io.ErrUnexpectedEOF || err == errTruncate:
			// Partial record at the tail; everything before it stays valid.
			break loop
		case err != nil:
			return 0, err
		case e == nil:
			continue
		case e.isZero():
			// Presumably the zeroed header left by zeroNextEntry: end of data.
			break loop
		}

		var vp valuePointer
		vp.Len = uint32(e.hlen + len(e.Key) + len(e.Value) + crc32.Size)
		read.recordOffset += vp.Len
		vp.Offset = e.offset
		vp.Fid = lf.fid

		switch {
		case e.meta&bitTxn > 0:
			txnTs := y.ParseTs(e.Key)
			if lastCommit == 0 {
				lastCommit = txnTs
			}
			if lastCommit != txnTs {
				// Entries from two transactions interleaved: stop here.
				break loop
			}
			entries = append(entries, e)
			vptrs = append(vptrs, vp)

		case e.meta&bitFinTxn > 0:
			txnTs, err := strconv.ParseUint(string(e.Value), 10, 64)
			if err != nil || lastCommit != txnTs {
				break loop
			}
			// The transaction is complete; apply its buffered entries.
			lastCommit = 0
			validEndOffset = read.recordOffset
			for i, e := range entries {
				vp := vptrs[i]
				if err := fn(*e, vp); err != nil {
					if err == errStop {
						// A bare break would only leave this range loop and keep scanning.
						break loop
					}
					return 0, errFile(err, lf.path, "Iteration function")
				}
			}
			entries = entries[:0]
			vptrs = vptrs[:0]

		default:
			if lastCommit != 0 {
				// A non-transactional entry inside an open transaction is invalid.
				break loop
			}
			validEndOffset = read.recordOffset
			if err := fn(*e, vp); err != nil {
				if err == errStop {
					// A bare break would only exit the switch, not the scan.
					break loop
				}
				return 0, errFile(err, lf.path, "Iteration function")
			}
		}
	}
	return validEndOffset, nil
}
// zeroNextEntry clears the maxHeaderSize bytes at lf.writeAt in the mapped
// Data, so whatever follows the last written entry reads as an all-zero
// header.
func (lf *logFile) zeroNextEntry() {
	from := int(lf.writeAt)
	to := int(lf.writeAt + maxHeaderSize)
	z.ZeroOut(lf.Data, from, to)
}
// open memory-maps path with the given flags, sized to fsize.
//
// For a newly created file (z.NewFile) it writes a fresh header via bootstrap
// and removes the file if that fails. It then sets lf.size to the mapped
// length. If at least vlogHeaderSize bytes are present, it reads the header
// back: the first 8 bytes are a big-endian key ID resolved through the
// registry into lf.dataKey, and the remaining 12 bytes become lf.baseIV.
// Files shorter than the header are accepted as-is and nil is returned.
//
// On success it returns z.NewFile for freshly created files and nil otherwise.
func (lf *logFile) open(path string, flags int, fsize int64) error {
	mf, openErr := z.OpenMmapFile(path, flags, int(fsize))
	lf.MmapFile = mf

	switch {
	case openErr == z.NewFile:
		if err := lf.bootstrap(); err != nil {
			os.Remove(path)
			return err
		}
		lf.size.Store(vlogHeaderSize)
	case openErr != nil:
		return y.Wrapf(openErr, "while opening file: %s", path)
	}

	lf.size.Store(uint32(len(lf.Data)))
	if lf.size.Load() < vlogHeaderSize {
		return nil
	}

	hdr := make([]byte, vlogHeaderSize)
	y.AssertTruef(vlogHeaderSize == copy(hdr, lf.Data),
		"Unable to copy from %s, size %d", path, lf.size.Load())

	dk, err := lf.registry.DataKey(binary.BigEndian.Uint64(hdr[:8]))
	if err != nil {
		return y.Wrapf(err, "While opening vlog file %d", lf.fid)
	}
	lf.dataKey = dk
	lf.baseIV = hdr[8:]
	y.AssertTrue(len(lf.baseIV) == 12)
	return openErr
}
// bootstrap initializes the header of a freshly created log file.
//
// It attaches the registry's latest data key to lf; a nil key leaves
// encryption disabled (see encryptionEnabled). It then writes a
// vlogHeaderSize-byte header at the start of lf.Data: the 8-byte big-endian
// key ID followed by a random 12-byte base IV, which is also kept in
// lf.baseIV. Finally it zeroes the header-sized region at lf.writeAt
// (zeroNextEntry).
func (lf *logFile) bootstrap() error {
	dk, err := lf.registry.LatestDataKey()
	if err != nil {
		return y.Wrapf(err, "Error while retrieving datakey in logFile.bootstrap")
	}
	lf.dataKey = dk

	hdr := make([]byte, vlogHeaderSize)
	binary.BigEndian.PutUint64(hdr[:8], lf.keyID())
	if _, err := cryptorand.Read(hdr[8:]); err != nil {
		return y.Wrapf(err, "Error while creating base IV, while creating logfile")
	}
	lf.baseIV = hdr[8:]
	y.AssertTrue(len(lf.baseIV) == 12)
	y.AssertTrue(vlogHeaderSize == copy(lf.Data[0:], hdr))
	lf.zeroNextEntry()
	return nil
}
The pages are generated with Golds v0.8.4. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.