package badger
import (
"bytes"
"context"
"errors"
"fmt"
"hash"
"hash/crc32"
"io"
"math"
"os"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"github.com/dgraph-io/badger/v4/y"
"github.com/dgraph-io/ristretto/v2/z"
)
var maxVlogFileSize uint32 = math .MaxUint32
const (
bitDelete byte = 1 << 0
bitValuePointer byte = 1 << 1
bitDiscardEarlierVersions byte = 1 << 2
bitMergeEntry byte = 1 << 3
bitTxn byte = 1 << 6
bitFinTxn byte = 1 << 7
mi int64 = 1 << 20
vlogHeaderSize = 20
)
var errStop = errors .New ("Stop iteration" )
var errTruncate = errors .New ("Do truncate" )
type logEntry func (e Entry , vp valuePointer ) error
type safeRead struct {
k []byte
v []byte
recordOffset uint32
lf *logFile
}
type hashReader struct {
r io .Reader
h hash .Hash32
bytesRead int
}
func newHashReader(r io .Reader ) *hashReader {
hash := crc32 .New (y .CastagnoliCrcTable )
return &hashReader {
r : r ,
h : hash ,
}
}
func (t *hashReader ) Read (p []byte ) (int , error ) {
n , err := t .r .Read (p )
if err != nil {
return n , err
}
t .bytesRead += n
return t .h .Write (p [:n ])
}
func (t *hashReader ) ReadByte () (byte , error ) {
b := make ([]byte , 1 )
_ , err := t .Read (b )
return b [0 ], err
}
func (t *hashReader ) Sum32 () uint32 {
return t .h .Sum32 ()
}
func (r *safeRead ) Entry (reader io .Reader ) (*Entry , error ) {
tee := newHashReader (reader )
var h header
hlen , err := h .DecodeFrom (tee )
if err != nil {
return nil , err
}
if h .klen > uint32 (1 <<16 ) {
return nil , errTruncate
}
kl := int (h .klen )
if cap (r .k ) < kl {
r .k = make ([]byte , 2 *kl )
}
vl := int (h .vlen )
if cap (r .v ) < vl {
r .v = make ([]byte , 2 *vl )
}
e := &Entry {}
e .offset = r .recordOffset
e .hlen = hlen
buf := make ([]byte , h .klen +h .vlen )
if _ , err := io .ReadFull (tee , buf [:]); err != nil {
if err == io .EOF {
err = errTruncate
}
return nil , err
}
if r .lf .encryptionEnabled () {
if buf , err = r .lf .decryptKV (buf [:], r .recordOffset ); err != nil {
return nil , err
}
}
e .Key = buf [:h .klen ]
e .Value = buf [h .klen :]
var crcBuf [crc32 .Size ]byte
if _ , err := io .ReadFull (reader , crcBuf [:]); err != nil {
if err == io .EOF {
err = errTruncate
}
return nil , err
}
crc := y .BytesToU32 (crcBuf [:])
if crc != tee .Sum32 () {
return nil , errTruncate
}
e .meta = h .meta
e .UserMeta = h .userMeta
e .ExpiresAt = h .expiresAt
return e , nil
}
func (vlog *valueLog ) rewrite (f *logFile ) error {
vlog .filesLock .RLock ()
for _ , fid := range vlog .filesToBeDeleted {
if fid == f .fid {
vlog .filesLock .RUnlock ()
return fmt .Errorf ("value log file already marked for deletion fid: %d" , fid )
}
}
maxFid := vlog .maxFid
y .AssertTruef (f .fid < maxFid , "fid to move: %d. Current max fid: %d" , f .fid , maxFid )
vlog .filesLock .RUnlock ()
vlog .opt .Infof ("Rewriting fid: %d" , f .fid )
wb := make ([]*Entry , 0 , 1000 )
var size int64
y .AssertTrue (vlog .db != nil )
var count , moved int
fe := func (e Entry ) error {
count ++
if count %100000 == 0 {
vlog .opt .Debugf ("Processing entry %d" , count )
}
vs , err := vlog .db .get (e .Key )
if err != nil {
return err
}
if discardEntry (e , vs , vlog .db ) {
return nil
}
if len (vs .Value ) == 0 {
return fmt .Errorf ("Empty value: %+v" , vs )
}
var vp valuePointer
vp .Decode (vs .Value )
if vp .Fid > f .fid {
return nil
}
if vp .Offset > e .offset {
return nil
}
if vp .Fid == f .fid && vp .Offset == e .offset {
moved ++
ne := new (Entry )
ne .meta = e .meta &^ (bitValuePointer | bitTxn | bitFinTxn )
ne .UserMeta = e .UserMeta
ne .ExpiresAt = e .ExpiresAt
ne .Key = append ([]byte {}, e .Key ...)
ne .Value = append ([]byte {}, e .Value ...)
es := ne .estimateSizeAndSetThreshold (vlog .db .valueThreshold ())
es += int64 (len (e .Value ))
if int64 (len (wb )+1 ) >= vlog .opt .maxBatchCount ||
size +es >= vlog .opt .maxBatchSize {
if err := vlog .db .batchSet (wb ); err != nil {
return err
}
size = 0
wb = wb [:0 ]
}
wb = append (wb , ne )
size += es
} else {
}
return nil
}
_ , err := f .iterate (vlog .opt .ReadOnly , 0 , func (e Entry , vp valuePointer ) error {
return fe (e )
})
if err != nil {
return err
}
batchSize := 1024
var loops int
for i := 0 ; i < len (wb ); {
loops ++
if batchSize == 0 {
vlog .db .opt .Warningf ("We shouldn't reach batch size of zero." )
return ErrNoRewrite
}
end := i + batchSize
if end > len (wb ) {
end = len (wb )
}
if err := vlog .db .batchSet (wb [i :end ]); err != nil {
if err == ErrTxnTooBig {
batchSize = batchSize / 2
continue
}
return err
}
i += batchSize
}
vlog .opt .Infof ("Processed %d entries in %d loops" , len (wb ), loops )
vlog .opt .Infof ("Total entries: %d. Moved: %d" , count , moved )
vlog .opt .Infof ("Removing fid: %d" , f .fid )
var deleteFileNow bool
{
vlog .filesLock .Lock ()
if _ , ok := vlog .filesMap [f .fid ]; !ok {
vlog .filesLock .Unlock ()
return fmt .Errorf ("Unable to find fid: %d" , f .fid )
}
if vlog .iteratorCount () == 0 {
delete (vlog .filesMap , f .fid )
deleteFileNow = true
} else {
vlog .filesToBeDeleted = append (vlog .filesToBeDeleted , f .fid )
}
vlog .filesLock .Unlock ()
}
if deleteFileNow {
if err := vlog .deleteLogFile (f ); err != nil {
return err
}
}
return nil
}
func (vlog *valueLog ) incrIteratorCount () {
vlog .numActiveIterators .Add (1 )
}
func (vlog *valueLog ) iteratorCount () int {
return int (vlog .numActiveIterators .Load ())
}
func (vlog *valueLog ) decrIteratorCount () error {
num := vlog .numActiveIterators .Add (-1 )
if num != 0 {
return nil
}
vlog .filesLock .Lock ()
lfs := make ([]*logFile , 0 , len (vlog .filesToBeDeleted ))
for _ , id := range vlog .filesToBeDeleted {
lfs = append (lfs , vlog .filesMap [id ])
delete (vlog .filesMap , id )
}
vlog .filesToBeDeleted = nil
vlog .filesLock .Unlock ()
for _ , lf := range lfs {
if err := vlog .deleteLogFile (lf ); err != nil {
return err
}
}
return nil
}
func (vlog *valueLog ) deleteLogFile (lf *logFile ) error {
if lf == nil {
return nil
}
lf .lock .Lock ()
defer lf .lock .Unlock ()
vlog .discardStats .Update (lf .fid , -1 )
return lf .Delete ()
}
func (vlog *valueLog ) dropAll () (int , error ) {
if vlog .db .opt .InMemory {
return 0 , nil
}
var count int
deleteAll := func () error {
vlog .filesLock .Lock ()
defer vlog .filesLock .Unlock ()
for _ , lf := range vlog .filesMap {
if err := vlog .deleteLogFile (lf ); err != nil {
return err
}
count ++
}
vlog .filesMap = make (map [uint32 ]*logFile )
vlog .maxFid = 0
return nil
}
if err := deleteAll (); err != nil {
return count , err
}
vlog .db .opt .Infof ("Value logs deleted. Creating value log file: 1" )
if _ , err := vlog .createVlogFile (); err != nil {
return count , err
}
return count , nil
}
func (db *DB ) valueThreshold () int64 {
return db .threshold .valueThreshold .Load ()
}
type valueLog struct {
dirPath string
filesLock sync .RWMutex
filesMap map [uint32 ]*logFile
maxFid uint32
filesToBeDeleted []uint32
numActiveIterators atomic .Int32
db *DB
writableLogOffset atomic .Uint32
numEntriesWritten uint32
opt Options
garbageCh chan struct {}
discardStats *discardStats
}
func vlogFilePath(dirPath string , fid uint32 ) string {
return fmt .Sprintf ("%s%s%06d.vlog" , dirPath , string (os .PathSeparator ), fid )
}
func (vlog *valueLog ) fpath (fid uint32 ) string {
return vlogFilePath (vlog .dirPath , fid )
}
func (vlog *valueLog ) populateFilesMap () error {
vlog .filesMap = make (map [uint32 ]*logFile )
files , err := os .ReadDir (vlog .dirPath )
if err != nil {
return errFile (err , vlog .dirPath , "Unable to open log dir." )
}
found := make (map [uint64 ]struct {})
for _ , file := range files {
if !strings .HasSuffix (file .Name (), ".vlog" ) {
continue
}
fsz := len (file .Name ())
fid , err := strconv .ParseUint (file .Name ()[:fsz -5 ], 10 , 32 )
if err != nil {
return errFile (err , file .Name (), "Unable to parse log id." )
}
if _ , ok := found [fid ]; ok {
return errFile (err , file .Name (), "Duplicate file found. Please delete one." )
}
found [fid ] = struct {}{}
lf := &logFile {
fid : uint32 (fid ),
path : vlog .fpath (uint32 (fid )),
registry : vlog .db .registry ,
}
vlog .filesMap [uint32 (fid )] = lf
if vlog .maxFid < uint32 (fid ) {
vlog .maxFid = uint32 (fid )
}
}
return nil
}
func (vlog *valueLog ) createVlogFile () (*logFile , error ) {
fid := vlog .maxFid + 1
path := vlog .fpath (fid )
lf := &logFile {
fid : fid ,
path : path ,
registry : vlog .db .registry ,
writeAt : vlogHeaderSize ,
opt : vlog .opt ,
}
err := lf .open (path , os .O_RDWR |os .O_CREATE |os .O_EXCL , 2 *vlog .opt .ValueLogFileSize )
if err != z .NewFile && err != nil {
return nil , err
}
vlog .filesLock .Lock ()
vlog .filesMap [fid ] = lf
y .AssertTrue (vlog .maxFid < fid )
vlog .maxFid = fid
vlog .writableLogOffset .Store (vlogHeaderSize )
vlog .numEntriesWritten = 0
vlog .filesLock .Unlock ()
return lf , nil
}
func errFile(err error , path string , msg string ) error {
return fmt .Errorf ("%s. Path=%s. Error=%v" , msg , path , err )
}
func (vlog *valueLog ) init (db *DB ) {
vlog .opt = db .opt
vlog .db = db
if vlog .opt .InMemory {
return
}
vlog .dirPath = vlog .opt .ValueDir
vlog .garbageCh = make (chan struct {}, 1 )
lf , err := InitDiscardStats (vlog .opt )
y .Check (err )
vlog .discardStats = lf
db .logToSyncChan (endVLogInitMsg )
}
func (vlog *valueLog ) open (db *DB ) error {
if db .opt .InMemory {
return nil
}
if err := vlog .populateFilesMap (); err != nil {
return err
}
if len (vlog .filesMap ) == 0 {
if vlog .opt .ReadOnly {
return nil
}
_ , err := vlog .createVlogFile ()
return y .Wrapf (err , "Error while creating log file in valueLog.open" )
}
fids := vlog .sortedFids ()
for _ , fid := range fids {
lf , ok := vlog .filesMap [fid ]
y .AssertTrue (ok )
lf .opt = vlog .opt
if err := lf .open (vlog .fpath (fid ), os .O_RDWR ,
2 *vlog .opt .ValueLogFileSize ); err != nil {
return y .Wrapf (err , "Open existing file: %q" , lf .path )
}
if lf .size .Load () == vlogHeaderSize && fid != vlog .maxFid {
vlog .opt .Infof ("Deleting empty file: %s" , lf .path )
if err := lf .Delete (); err != nil {
return y .Wrapf (err , "while trying to delete empty file: %s" , lf .path )
}
delete (vlog .filesMap , fid )
}
}
if vlog .opt .ReadOnly {
return nil
}
last , ok := vlog .filesMap [vlog .maxFid ]
y .AssertTrue (ok )
lastOff , err := last .iterate (vlog .opt .ReadOnly , vlogHeaderSize ,
func (_ Entry , vp valuePointer ) error {
return nil
})
if err != nil {
return y .Wrapf (err , "while iterating over: %s" , last .path )
}
if err := last .Truncate (int64 (lastOff )); err != nil {
return y .Wrapf (err , "while truncating last value log file: %s" , last .path )
}
if _ , err := vlog .createVlogFile (); err != nil {
return y .Wrapf (err , "Error while creating log file in valueLog.open" )
}
return nil
}
func (vlog *valueLog ) Close () error {
if vlog == nil || vlog .db == nil || vlog .db .opt .InMemory {
return nil
}
vlog .opt .Debugf ("Stopping garbage collection of values." )
var err error
for id , lf := range vlog .filesMap {
lf .lock .Lock ()
offset := int64 (-1 )
if !vlog .opt .ReadOnly && id == vlog .maxFid {
offset = int64 (vlog .woffset ())
}
if terr := lf .Close (offset ); terr != nil && err == nil {
err = terr
}
}
if vlog .discardStats != nil {
vlog .db .captureDiscardStats ()
if terr := vlog .discardStats .Close (-1 ); terr != nil && err == nil {
err = terr
}
}
return err
}
func (vlog *valueLog ) sortedFids () []uint32 {
toBeDeleted := make (map [uint32 ]struct {})
for _ , fid := range vlog .filesToBeDeleted {
toBeDeleted [fid ] = struct {}{}
}
ret := make ([]uint32 , 0 , len (vlog .filesMap ))
for fid := range vlog .filesMap {
if _ , ok := toBeDeleted [fid ]; !ok {
ret = append (ret , fid )
}
}
sort .Slice (ret , func (i , j int ) bool {
return ret [i ] < ret [j ]
})
return ret
}
type request struct {
Entries []*Entry
Ptrs []valuePointer
Wg sync .WaitGroup
Err error
ref atomic .Int32
}
func (req *request ) reset () {
req .Entries = req .Entries [:0 ]
req .Ptrs = req .Ptrs [:0 ]
req .Wg = sync .WaitGroup {}
req .Err = nil
req .ref .Store (0 )
}
func (req *request ) IncrRef () {
req .ref .Add (1 )
}
func (req *request ) DecrRef () {
nRef := req .ref .Add (-1 )
if nRef > 0 {
return
}
req .Entries = nil
requestPool .Put (req )
}
func (req *request ) Wait () error {
req .Wg .Wait ()
err := req .Err
req .DecrRef ()
return err
}
type requests []*request
func (reqs requests ) DecrRef () {
for _ , req := range reqs {
req .DecrRef ()
}
}
func (reqs requests ) IncrRef () {
for _ , req := range reqs {
req .IncrRef ()
}
}
func (vlog *valueLog ) sync () error {
if vlog .opt .SyncWrites || vlog .opt .InMemory {
return nil
}
vlog .filesLock .RLock ()
maxFid := vlog .maxFid
curlf := vlog .filesMap [maxFid ]
if curlf == nil {
vlog .filesLock .RUnlock ()
return nil
}
curlf .lock .RLock ()
vlog .filesLock .RUnlock ()
err := curlf .Sync ()
curlf .lock .RUnlock ()
return err
}
func (vlog *valueLog ) woffset () uint32 {
return vlog .writableLogOffset .Load ()
}
func (vlog *valueLog ) validateWrites (reqs []*request ) error {
vlogOffset := uint64 (vlog .woffset ())
for _ , req := range reqs {
size := estimateRequestSize (req )
estimatedVlogOffset := vlogOffset + size
if estimatedVlogOffset > uint64 (maxVlogFileSize ) {
return fmt .Errorf ("Request size offset %d is bigger than maximum offset %d" ,
estimatedVlogOffset , maxVlogFileSize )
}
if estimatedVlogOffset >= uint64 (vlog .opt .ValueLogFileSize ) {
vlogOffset = 0
continue
}
vlogOffset = estimatedVlogOffset
}
return nil
}
func estimateRequestSize(req *request ) uint64 {
size := uint64 (0 )
for _ , e := range req .Entries {
size += uint64 (maxHeaderSize + len (e .Key ) + len (e .Value ) + crc32 .Size )
}
return size
}
func (vlog *valueLog ) write (reqs []*request ) error {
if vlog .db .opt .InMemory {
return nil
}
if err := vlog .validateWrites (reqs ); err != nil {
return y .Wrapf (err , "while validating writes" )
}
vlog .filesLock .RLock ()
maxFid := vlog .maxFid
curlf := vlog .filesMap [maxFid ]
vlog .filesLock .RUnlock ()
defer func () {
if vlog .opt .SyncWrites {
if err := curlf .Sync (); err != nil {
vlog .opt .Errorf ("Error while curlf sync: %v\n" , err )
}
}
}()
write := func (buf *bytes .Buffer ) error {
if buf .Len () == 0 {
return nil
}
n := uint32 (buf .Len ())
endOffset := vlog .writableLogOffset .Add (n )
if int (endOffset ) >= len (curlf .Data ) {
if err := curlf .Truncate (int64 (endOffset )); err != nil {
return err
}
}
start := int (endOffset - n )
y .AssertTrue (copy (curlf .Data [start :], buf .Bytes ()) == int (n ))
curlf .size .Store (endOffset )
return nil
}
toDisk := func () error {
if vlog .woffset () > uint32 (vlog .opt .ValueLogFileSize ) ||
vlog .numEntriesWritten > vlog .opt .ValueLogMaxEntries {
if err := curlf .doneWriting (vlog .woffset ()); err != nil {
return err
}
newlf , err := vlog .createVlogFile ()
if err != nil {
return err
}
curlf = newlf
}
return nil
}
buf := new (bytes .Buffer )
for i := range reqs {
b := reqs [i ]
b .Ptrs = b .Ptrs [:0 ]
var written , bytesWritten int
valueSizes := make ([]int64 , 0 , len (b .Entries ))
for j := range b .Entries {
buf .Reset ()
e := b .Entries [j ]
valueSizes = append (valueSizes , int64 (len (e .Value )))
if e .skipVlogAndSetThreshold (vlog .db .valueThreshold ()) {
b .Ptrs = append (b .Ptrs , valuePointer {})
continue
}
var p valuePointer
p .Fid = curlf .fid
p .Offset = vlog .woffset ()
tmpMeta := e .meta
e .meta = e .meta &^ (bitTxn | bitFinTxn )
plen , err := curlf .encodeEntry (buf , e , p .Offset )
if err != nil {
return err
}
e .meta = tmpMeta
p .Len = uint32 (plen )
b .Ptrs = append (b .Ptrs , p )
if err := write (buf ); err != nil {
return err
}
written ++
bytesWritten += buf .Len ()
}
y .NumWritesVlogAdd (vlog .opt .MetricsEnabled , int64 (written ))
y .NumBytesWrittenVlogAdd (vlog .opt .MetricsEnabled , int64 (bytesWritten ))
vlog .numEntriesWritten += uint32 (written )
vlog .db .threshold .update (valueSizes )
if err := toDisk (); err != nil {
return err
}
}
return toDisk ()
}
func (vlog *valueLog ) getFileRLocked (vp valuePointer ) (*logFile , error ) {
vlog .filesLock .RLock ()
defer vlog .filesLock .RUnlock ()
ret , ok := vlog .filesMap [vp .Fid ]
if !ok {
return nil , fmt .Errorf ("file with ID: %d not found" , vp .Fid )
}
maxFid := vlog .maxFid
if !vlog .opt .ReadOnly && vp .Fid == maxFid {
currentOffset := vlog .woffset ()
if vp .Offset >= currentOffset {
return nil , fmt .Errorf (
"Invalid value pointer offset: %d greater than current offset: %d" ,
vp .Offset , currentOffset )
}
}
ret .lock .RLock ()
return ret , nil
}
func (vlog *valueLog ) Read (vp valuePointer , _ *y .Slice ) ([]byte , func (), error ) {
buf , lf , err := vlog .readValueBytes (vp )
cb := vlog .getUnlockCallback (lf )
if err != nil {
return nil , cb , err
}
if vlog .opt .VerifyValueChecksum {
hash := crc32 .New (y .CastagnoliCrcTable )
if _ , err := hash .Write (buf [:len (buf )-crc32 .Size ]); err != nil {
runCallback (cb )
return nil , nil , y .Wrapf (err , "failed to write hash for vp %+v" , vp )
}
checksum := buf [len (buf )-crc32 .Size :]
if hash .Sum32 () != y .BytesToU32 (checksum ) {
runCallback (cb )
return nil , nil , y .Wrapf (y .ErrChecksumMismatch , "value corrupted for vp: %+v" , vp )
}
}
var h header
headerLen := h .Decode (buf )
kv := buf [headerLen :]
if lf .encryptionEnabled () {
kv , err = lf .decryptKV (kv , vp .Offset )
if err != nil {
return nil , cb , err
}
}
if uint32 (len (kv )) < h .klen +h .vlen {
vlog .db .opt .Errorf ("Invalid read: vp: %+v" , vp )
return nil , nil , fmt .Errorf ("Invalid read: Len: %d read at:[%d:%d]" ,
len (kv ), h .klen , h .klen +h .vlen )
}
return kv [h .klen : h .klen +h .vlen ], cb , nil
}
func (vlog *valueLog ) getUnlockCallback (lf *logFile ) func () {
if lf == nil {
return nil
}
return lf .lock .RUnlock
}
func (vlog *valueLog ) readValueBytes (vp valuePointer ) ([]byte , *logFile , error ) {
lf , err := vlog .getFileRLocked (vp )
if err != nil {
return nil , nil , err
}
buf , err := lf .read (vp )
y .NumReadsVlogAdd (vlog .db .opt .MetricsEnabled , 1 )
y .NumBytesReadsVlogAdd (vlog .db .opt .MetricsEnabled , int64 (len (buf )))
return buf , lf , err
}
func (vlog *valueLog ) pickLog (discardRatio float64 ) *logFile {
vlog .filesLock .RLock ()
defer vlog .filesLock .RUnlock ()
LOOP :
fid , discard := vlog .discardStats .MaxDiscard ()
if fid == 0 {
vlog .opt .Debugf ("No file with discard stats" )
return nil
}
lf , ok := vlog .filesMap [fid ]
if !ok {
vlog .discardStats .Update (fid , -1 )
goto LOOP
}
fi , err := lf .Fd .Stat ()
if err != nil {
vlog .opt .Errorf ("Unable to get stats for value log fid: %d err: %+v" , fi , err )
return nil
}
if thr := discardRatio * float64 (fi .Size ()); float64 (discard ) < thr {
vlog .opt .Debugf ("Discard: %d less than threshold: %.0f for file: %s" ,
discard , thr , fi .Name ())
return nil
}
if fid < vlog .maxFid {
vlog .opt .Infof ("Found value log max discard fid: %d discard: %d\n" , fid , discard )
lf , ok := vlog .filesMap [fid ]
y .AssertTrue (ok )
return lf
}
return nil
}
func discardEntry(e Entry , vs y .ValueStruct , db *DB ) bool {
if vs .Version != y .ParseTs (e .Key ) {
return true
}
if isDeletedOrExpired (vs .Meta , vs .ExpiresAt ) {
return true
}
if (vs .Meta & bitValuePointer ) == 0 {
return true
}
if (vs .Meta & bitFinTxn ) > 0 {
return true
}
return false
}
func (vlog *valueLog ) doRunGC (lf *logFile ) error {
_ , span := otel .Tracer ("" ).Start (context .TODO (), "Badger.GC" )
span .SetAttributes (attribute .String ("GC rewrite for" , lf .path ))
defer span .End ()
if err := vlog .rewrite (lf ); err != nil {
return err
}
vlog .discardStats .Update (lf .fid , -1 )
return nil
}
func (vlog *valueLog ) waitOnGC (lc *z .Closer ) {
defer lc .Done ()
<-lc .HasBeenClosed ()
vlog .garbageCh <- struct {}{}
}
func (vlog *valueLog ) runGC (discardRatio float64 ) error {
select {
case vlog .garbageCh <- struct {}{}:
defer func () {
<-vlog .garbageCh
}()
lf := vlog .pickLog (discardRatio )
if lf == nil {
return ErrNoRewrite
}
return vlog .doRunGC (lf )
default :
return ErrRejected
}
}
func (vlog *valueLog ) updateDiscardStats (stats map [uint32 ]int64 ) {
if vlog .opt .InMemory {
return
}
for fid , discard := range stats {
vlog .discardStats .Update (fid , discard )
}
vlog .db .logToSyncChan (updateDiscardStatsMsg )
}
type vlogThreshold struct {
logger Logger
percentile float64
valueThreshold atomic .Int64
valueCh chan []int64
clearCh chan bool
closer *z .Closer
vlMetrics *z .HistogramData
}
func initVlogThreshold(opt *Options ) *vlogThreshold {
getBounds := func () []float64 {
mxbd := opt .maxValueThreshold
mnbd := float64 (opt .ValueThreshold )
y .AssertTruef (mxbd >= mnbd , "maximum threshold bound is less than the min threshold" )
size := math .Min (mxbd -mnbd +1 , 1024.0 )
bdstp := (mxbd - mnbd ) / size
bounds := make ([]float64 , int64 (size ))
for i := range bounds {
if i == 0 {
bounds [0 ] = mnbd
continue
}
if i == int (size -1 ) {
bounds [i ] = mxbd
continue
}
bounds [i ] = bounds [i -1 ] + bdstp
}
return bounds
}
lt := &vlogThreshold {
logger : opt .Logger ,
percentile : opt .VLogPercentile ,
valueCh : make (chan []int64 , 1000 ),
clearCh : make (chan bool , 1 ),
closer : z .NewCloser (1 ),
vlMetrics : z .NewHistogramData (getBounds ()),
}
lt .valueThreshold .Store (opt .ValueThreshold )
return lt
}
func (v *vlogThreshold ) Clear (opt Options ) {
v .valueThreshold .Store (opt .ValueThreshold )
v .clearCh <- true
}
func (v *vlogThreshold ) update (sizes []int64 ) {
v .valueCh <- sizes
}
func (v *vlogThreshold ) close () {
v .closer .SignalAndWait ()
}
func (v *vlogThreshold ) listenForValueThresholdUpdate () {
defer v .closer .Done ()
for {
select {
case <- v .closer .HasBeenClosed ():
return
case val := <- v .valueCh :
for _ , e := range val {
v .vlMetrics .Update (e )
}
p := int64 (v .vlMetrics .Percentile (v .percentile ))
if v .valueThreshold .Load () != p {
if v .logger != nil {
v .logger .Infof ("updating value of threshold to: %d" , p )
}
v .valueThreshold .Store (p )
}
case <- v .clearCh :
v .vlMetrics .Clear ()
}
}
}
The pages are generated with Golds v0.8.4 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .