package badger
import (
"bytes"
"fmt"
"hash/crc32"
"math"
"sort"
"sync"
"time"
"github.com/dgraph-io/badger/v4/table"
"github.com/dgraph-io/badger/v4/y"
"github.com/dgraph-io/ristretto/v2/z"
)
// prefetchStatus records whether an Item's value has been loaded by
// prefetchValue. The zero value means "not prefetched yet".
type prefetchStatus uint8

// prefetched marks an Item whose value (or read error) has been stored by
// prefetchValue.
const prefetched prefetchStatus = 1
// Item is returned by Iterator.Item and holds one version of one key.
// The Iterator recycles Items: once it advances, an Item may be refilled with
// another entry. Copy anything you need to keep with KeyCopy or ValueCopy.
type Item struct {
	key       []byte         // key with its version removed by y.ParseKey (see Iterator.fill)
	vptr      []byte         // inline value bytes, or an encoded valuePointer when meta has bitValuePointer
	val       []byte         // value copy stored by prefetchValue; reset to nil by Iterator.fill
	version   uint64         // version parsed from the stored key by y.ParseTs
	expiresAt uint64         // expiry in Unix seconds; 0 means it never expires (see isDeletedOrExpired)
	slice     *y.Slice       // reusable buffer that value bytes are resized and copied into
	next      *Item          // link used by the intrusive list type (Iterator.data / Iterator.waste)
	txn       *Txn           // transaction the item was read in; used to reach txn.db.vlog
	err       error          // error recorded by prefetchValue; returned by Value/ValueCopy
	wg        sync.WaitGroup // tracks the prefetch goroutine started by Iterator.fill
	status    prefetchStatus // set to prefetched once prefetchValue has run
	meta      byte           // entry flag bits (bitValuePointer, bitDelete, bitDiscardEarlierVersions are tested in this file)
	userMeta  byte           // copied from the entry's UserMeta by Iterator.fill; returned by UserMeta
}
// String formats the item's key, version and meta byte for debugging output.
func (item *Item) String() string {
	key, ver := item.Key(), item.Version()
	return fmt.Sprintf("key=%q, version=%d, meta=%x", key, ver, item.meta)
}
// Key returns the item's key. The slice belongs to the Item and may be
// overwritten when the Iterator reuses the Item, so use KeyCopy to keep it.
func (item *Item) Key() []byte {
	k := item.key
	return k
}
// KeyCopy returns a copy of the item's key that is safe to retain. The copy is
// made with y.SafeCopy, which may reuse dst as the destination buffer.
func (item *Item) KeyCopy(dst []byte) []byte {
	src := item.key
	return y.SafeCopy(dst, src)
}
// Version returns the version of this entry, as parsed from its stored key
// with y.ParseTs.
func (item *Item) Version() uint64 {
	v := item.version
	return v
}
// Value calls fn with the item's value and returns fn's error, if any.
//
// It first waits for any background prefetch of this item. If the value was
// prefetched, fn receives the prefetched copy and the prefetch error (if any)
// is returned without calling fn. Otherwise the value is read on demand and
// its release callback runs after fn returns. The slice passed to fn must
// therefore not be retained; use ValueCopy for that. fn may be nil.
func (item *Item) Value(fn func(val []byte) error) error {
	item.wg.Wait()
	if item.status != prefetched {
		buf, cb, err := item.yieldItemValue()
		defer runCallback(cb)
		if err != nil || fn == nil {
			return err
		}
		return fn(buf)
	}
	if item.err != nil || fn == nil {
		return item.err
	}
	return fn(item.val)
}
// ValueCopy returns a copy of the item's value that is safe to retain. The copy
// is made with y.SafeCopy, which may reuse dst. Like Value, it waits for any
// background prefetch first. It returns the prefetch or read error alongside
// whatever was copied.
func (item *Item) ValueCopy(dst []byte) ([]byte, error) {
	item.wg.Wait()
	if item.status != prefetched {
		buf, cb, err := item.yieldItemValue()
		defer runCallback(cb)
		return y.SafeCopy(dst, buf), err
	}
	return y.SafeCopy(dst, item.val), item.err
}
// hasValue reports whether the entry carries a value: either its meta byte is
// non-zero or its vptr slice is non-nil.
func (item *Item) hasValue() bool {
	return item.meta != 0 || item.vptr != nil
}
// IsDeletedOrExpired reports whether the item has the delete bit set in its
// meta byte, or has a non-zero expiresAt (Unix seconds) that has already
// passed.
func (item *Item) IsDeletedOrExpired() bool {
	meta, exp := item.meta, item.expiresAt
	return isDeletedOrExpired(meta, exp)
}
// DiscardEarlierVersions reports whether the bitDiscardEarlierVersions flag is
// set in the item's meta byte.
func (item *Item) DiscardEarlierVersions() bool {
	return item.meta&bitDiscardEarlierVersions != 0
}
// yieldItemValue returns the item's raw value bytes and a release callback.
// The caller must run the callback (via runCallback) once it is done with the
// returned slice. The callback may be non-nil even when an error is returned.
//
// Items without a value yield (nil, nil, nil). Inline values (meta lacks
// bitValuePointer) are copied from vptr into item.slice. Otherwise vptr is
// decoded as a valuePointer and the value is read from the value log.
//
// If the value-log read fails, the failure is logged, followed by every
// version of the key (for diagnostics), and the error is returned. Returning
// it keeps Value/ValueCopy callers from mistaking a failed read for a
// successfully read (possibly empty) value.
func (item *Item) yieldItemValue() ([]byte, func(), error) {
	if !item.hasValue() {
		return nil, nil, nil
	}
	if item.slice == nil {
		item.slice = new(y.Slice)
	}
	if (item.meta & bitValuePointer) == 0 {
		val := item.slice.Resize(len(item.vptr))
		copy(val, item.vptr)
		return val, nil, nil
	}

	var vp valuePointer
	vp.Decode(item.vptr)
	db := item.txn.db
	result, cb, err := db.vlog.Read(vp, item.slice)
	if err != nil {
		db.opt.Errorf("Unable to read: Key: %v, Version : %v, meta: %v, userMeta: %v"+
			" Error: %v", item.Key(), item.version, item.meta, item.userMeta, err)
		item.logVersionsAfterReadFailure()
		// Hand cb back anyway: callers defer runCallback(cb) before checking
		// err, so whatever Read acquired is still released.
		return nil, cb, err
	}
	return result, cb, nil
}

// logVersionsAfterReadFailure logs every version of item's key, as a
// diagnostic after a failed value-log read. It reads through a new
// transaction (at math.MaxUint64 in managed mode) with AllVersions and
// InternalAccess set and value prefetching disabled.
func (item *Item) logVersionsAfterReadFailure() {
	db := item.txn.db
	var txn *Txn
	if db.opt.managedTxns {
		txn = db.NewTransactionAt(math.MaxUint64, false)
	} else {
		txn = db.NewTransaction(false)
	}
	defer txn.Discard()

	iopt := DefaultIteratorOptions
	iopt.AllVersions = true
	iopt.InternalAccess = true
	iopt.PrefetchValues = false

	it := txn.NewKeyIterator(item.Key(), iopt)
	defer it.Close()
	for it.Rewind(); it.Valid(); it.Next() {
		ver := it.Item()
		var vp valuePointer
		if ver.meta&bitValuePointer > 0 {
			vp.Decode(ver.vptr)
		}
		db.opt.Errorf("Key: %v, Version : %v, meta: %v, userMeta: %v valuePointer: %+v",
			ver.Key(), ver.version, ver.meta, ver.userMeta, vp)
	}
}
// runCallback invokes cb when it is non-nil. This lets callers write
// `defer runCallback(cb)` for optional release callbacks without a nil check.
func runCallback(cb func()) {
	if cb == nil {
		return
	}
	cb()
}
// prefetchValue loads the item's value and records the outcome on the item:
// any error goes to item.err, and a non-nil value is copied into item.slice
// and stored as item.val. It always marks the item as prefetched.
// Iterator.fill runs this in a goroutine and calls item.wg.Done afterwards.
func (item *Item) prefetchValue() {
	val, cb, err := item.yieldItemValue()
	defer runCallback(cb)

	item.status = prefetched
	item.err = err
	if val != nil {
		dst := item.slice.Resize(len(val))
		copy(dst, val)
		item.val = dst
	}
}
// EstimatedSize returns an approximate size of the entry:
//   - 0 when it has no value;
//   - len(key)+len(value) for inline values;
//   - the value pointer's Len for values stored in the value log.
func (item *Item) EstimatedSize() int64 {
	switch {
	case !item.hasValue():
		return 0
	case item.meta&bitValuePointer == 0:
		return int64(len(item.key) + len(item.vptr))
	}
	var vp valuePointer
	vp.Decode(item.vptr)
	return int64(vp.Len)
}
// KeySize returns the length of the item's key in bytes.
func (item *Item) KeySize() int64 {
	n := len(item.key)
	return int64(n)
}
// ValueSize returns the size of the item's value in bytes: 0 when there is no
// value, the exact length for inline values, and an estimate derived from the
// value pointer's Len for values stored in the value log.
func (item *Item) ValueSize() int64 {
	if !item.hasValue() {
		return 0
	}
	if item.meta&bitValuePointer == 0 {
		return int64(len(item.vptr))
	}
	var vp valuePointer
	vp.Decode(item.vptr)
	// vp.Len is assumed to span the whole value-log record. Subtract the key
	// plus 8 bytes (presumably its version suffix), a 6-byte header allowance
	// and the CRC32 checksum. NOTE(review): confirm against the entry encoding.
	overhead := int64(len(item.key)+8) + 6 + crc32.Size
	return int64(vp.Len) - overhead
}
// UserMeta returns the user meta byte that was stored with the entry.
func (item *Item) UserMeta() byte {
	um := item.userMeta
	return um
}
// ExpiresAt returns the entry's expiry time as a Unix timestamp in seconds;
// 0 means the entry never expires.
func (item *Item) ExpiresAt() uint64 {
	exp := item.expiresAt
	return exp
}
// list is an intrusive singly linked FIFO queue of Items, threaded through
// Item.next. Its zero value is an empty queue.
type list struct {
	head *Item
	tail *Item
}

// push appends i to the back of the queue, clearing i's next link first.
func (l *list) push(i *Item) {
	i.next = nil
	if l.tail != nil {
		l.tail.next = i
		l.tail = i
		return
	}
	l.head, l.tail = i, i
}

// pop removes and returns the front Item, or nil when the queue is empty.
// The returned Item's next link is cleared.
func (l *list) pop() *Item {
	front := l.head
	if front == nil {
		return nil
	}
	if front != l.tail {
		l.head = front.next
	} else {
		l.head, l.tail = nil, nil
	}
	front.next = nil
	return front
}
// IteratorOptions configures an Iterator created by Txn.NewIterator or
// Txn.NewKeyIterator.
type IteratorOptions struct {
	// PrefetchSize is how many items prefetch queues up front. It only applies
	// when PrefetchValues is set and PrefetchSize > 1; otherwise 2 are queued.
	PrefetchSize int
	// PrefetchValues makes Iterator.fill load each item's value in a
	// background goroutine.
	PrefetchValues bool
	// Reverse runs the underlying iterators in reverse. It is passed to
	// NewMergeIterator and NewUniIterator.
	Reverse bool
	// AllVersions emits every visible version of each key, including deleted
	// and expired ones, instead of one version per key.
	AllVersions bool
	// InternalAccess allows keys starting with badgerPrefix to be returned.
	InternalAccess bool
	// prefixIsKey means Prefix is an exact key (set by NewKeyIterator). Valid
	// then requires key equality, and tables can be skipped via DoesNotHave.
	prefixIsKey bool
	// Prefix restricts iteration to keys that start with it. It is also used
	// to skip tables whose key range cannot contain it.
	Prefix []byte
	// SinceTs, when non-zero, skips versions <= SinceTs and tables whose
	// MaxVersion is below SinceTs.
	SinceTs uint64
}
// compareToPrefix compares key against opt.Prefix, like bytes.Compare.
// The key is first passed through y.ParseKey (presumably stripping its
// version suffix) and truncated to len(opt.Prefix). A result of 0 therefore
// means the key starts with the prefix.
func (opt *IteratorOptions) compareToPrefix(key []byte) int {
	userKey := y.ParseKey(key)
	if n := len(opt.Prefix); len(userKey) > n {
		userKey = userKey[:n]
	}
	return bytes.Compare(userKey, opt.Prefix)
}
// pickTable reports whether table t may hold entries this iterator needs.
// A table is rejected when:
//   - its MaxVersion is below SinceTs;
//   - its [Smallest, Biggest] key range cannot overlap opt.Prefix;
//   - for exact-key iteration, t.DoesNotHave reports the key's hash as absent.
func (opt *IteratorOptions) pickTable(t table.TableInterface) bool {
	if t.MaxVersion() < opt.SinceTs {
		return false
	}
	if len(opt.Prefix) == 0 {
		return true
	}
	if opt.compareToPrefix(t.Smallest()) > 0 || opt.compareToPrefix(t.Biggest()) < 0 {
		return false
	}
	if opt.prefixIsKey && t.DoesNotHave(y.Hash(opt.Prefix)) {
		return false
	}
	return true
}
// pickTables returns the tables from all that may hold keys matching
// opt.Prefix, minus tables whose MaxVersion is below a non-zero SinceTs.
// all must be sorted by key range, since it is binary-searched with
// sort.Search. The result never aliases all.
func (opt *IteratorOptions) pickTables(all []*table.Table) []*table.Table {
	// dropOld filters, in place, tables whose MaxVersion is below SinceTs.
	dropOld := func(tables []*table.Table) []*table.Table {
		if opt.SinceTs == 0 {
			return tables
		}
		kept := tables[:0]
		for _, t := range tables {
			if t.MaxVersion() >= opt.SinceTs {
				kept = append(kept, t)
			}
		}
		return kept
	}

	if len(opt.Prefix) == 0 {
		everything := make([]*table.Table, len(all))
		copy(everything, all)
		return dropOld(everything)
	}

	// First table whose biggest key is not below the prefix.
	start := sort.Search(len(all), func(i int) bool {
		return opt.compareToPrefix(all[i].Biggest()) >= 0
	})
	if start == len(all) {
		return []*table.Table{}
	}
	candidates := all[start:]

	if opt.prefixIsKey {
		hash := y.Hash(opt.Prefix)
		var out []*table.Table
		for _, t := range candidates {
			if opt.compareToPrefix(t.Smallest()) > 0 {
				break
			}
			if !t.DoesNotHave(hash) {
				out = append(out, t)
			}
		}
		return dropOld(out)
	}

	// First table whose smallest key is already past the prefix.
	end := sort.Search(len(candidates), func(i int) bool {
		return opt.compareToPrefix(candidates[i].Smallest()) > 0
	})
	out := make([]*table.Table, end)
	copy(out, candidates[:end])
	return dropOld(out)
}
// DefaultIteratorOptions are the default iterator options:
//   - values are prefetched, up to PrefetchSize (100) items ahead;
//   - iteration runs forward;
//   - one version per key is returned.
// Reverse and AllVersions keep their zero value (false).
var DefaultIteratorOptions = IteratorOptions{
	PrefetchSize:   100,
	PrefetchValues: true,
}
// Iterator walks the entries visible to a transaction at its read timestamp.
// Create one with Txn.NewIterator or Txn.NewKeyIterator and release it with
// Close.
type Iterator struct {
	iitr     y.Iterator      // merged iterator over pending writes, memtables and levels (built in NewIterator)
	txn      *Txn            // owning transaction
	readTs   uint64          // copied from txn.readTs; versions above it are skipped by parseItem
	opt      IteratorOptions // options the iterator was created with
	item     *Item           // current item; nil once the iterator is exhausted
	data     list            // already-parsed items waiting to become current
	waste    list            // items available for reuse by newItem
	lastKey  []byte          // last key seen in forward mode; later entries for it are skipped
	closed   bool            // set by Close so repeated calls are no-ops
	scanned  int             // running tally updated by Next: key+val+vptr lengths plus 2 per item
	ThreadId int             // not referenced in this part of the file; presumably set by callers — TODO confirm
	Alloc    *z.Allocator    // not referenced in this part of the file; presumably set by callers — TODO confirm
}
// NewIterator returns an iterator over the transaction's view of the DB. It
// merges the transaction's pending writes (if any), the memtables and the LSM
// levels, honoring opt.Reverse.
//
// It panics with ErrDiscardedTxn if the transaction was discarded, or with
// ErrDBClosed if the DB is closed. The caller must call Close on the iterator,
// which undoes the counter increments made here.
func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
	switch {
	case txn.discarded:
		panic(ErrDiscardedTxn)
	case txn.db.IsClosed():
		panic(ErrDBClosed)
	}
	y.NumIteratorsCreatedAdd(txn.db.opt.MetricsEnabled, 1)
	txn.numIterators.Add(1)

	memTables, release := txn.db.getMemTables()
	defer release()
	txn.db.vlog.incrIteratorCount()

	var iters []y.Iterator
	if pending := txn.newPendingWritesIterator(opt.Reverse); pending != nil {
		iters = append(iters, pending)
	}
	for _, mt := range memTables {
		iters = append(iters, mt.sl.NewUniIterator(opt.Reverse))
	}
	iters = txn.db.lc.appendIterators(iters, &opt)

	return &Iterator{
		txn:    txn,
		iitr:   table.NewMergeIterator(iters, opt.Reverse),
		opt:    opt,
		readTs: txn.readTs,
	}
}
// NewKeyIterator returns an iterator over every version of exactly one key.
// It sets opt.Prefix to key and enables prefixIsKey and AllVersions.
// opt.Prefix must be empty on entry, otherwise it panics.
func (txn *Txn) NewKeyIterator(key []byte, opt IteratorOptions) *Iterator {
	if len(opt.Prefix) != 0 {
		panic("opt.Prefix should be nil for NewKeyIterator.")
	}
	opt.Prefix, opt.prefixIsKey, opt.AllVersions = key, true, true
	return txn.NewIterator(opt)
}
// newItem returns an Item to fill. It reuses one from the waste list when
// available, otherwise it allocates a fresh Item bound to the iterator's
// transaction.
func (it *Iterator) newItem() *Item {
	if recycled := it.waste.pop(); recycled != nil {
		return recycled
	}
	return &Item{slice: new(y.Slice), txn: it.txn}
}
// Item returns the current item and records its key via txn.addReadKey.
// It must not be called once the iterator is exhausted (the current item is
// nil). The returned Item may be recycled after Next.
func (it *Iterator) Item() *Item {
	cur := it.item
	it.txn.addReadKey(cur.Key())
	return cur
}
// Valid reports whether there is a current item within the iterator's scope.
// For a key iterator the item's key must equal the key exactly; otherwise it
// must start with opt.Prefix.
func (it *Iterator) Valid() bool {
	cur := it.item
	switch {
	case cur == nil:
		return false
	case it.opt.prefixIsKey:
		return bytes.Equal(cur.key, it.opt.Prefix)
	default:
		return bytes.HasPrefix(cur.key, it.opt.Prefix)
	}
}
// ValidForPrefix reports whether Valid is true and the current item's key
// additionally starts with prefix.
func (it *Iterator) ValidForPrefix(prefix []byte) bool {
	if !it.Valid() {
		return false
	}
	return bytes.HasPrefix(it.item.key, prefix)
}
// Close releases the iterator; calling it more than once is a no-op.
//
// It closes the underlying merged iterator and then waits for every prefetch
// goroutine started for the current item and for the items on the data and
// waste lists. Finally it decrements the value-log iterator count and the
// transaction's iterator count that NewIterator incremented.
func (it *Iterator) Close() {
	if it.closed {
		return
	}
	it.closed = true
	if it.iitr == nil {
		it.txn.numIterators.Add(-1)
		return
	}
	it.iitr.Close()

	// The current item lives in it.item, not on either list, so its prefetch
	// goroutine has to be waited for separately. Otherwise it could still be
	// reading from the value log after decrIteratorCount below.
	if it.item != nil {
		it.item.wg.Wait()
	}
	waitFor := func(l list) {
		for item := l.pop(); item != nil; item = l.pop() {
			item.wg.Wait()
		}
	}
	waitFor(it.waste)
	waitFor(it.data)

	// NOTE(review): Close has no error return, so the error from
	// decrIteratorCount is dropped here; consider logging it.
	_ = it.txn.db.vlog.decrIteratorCount()
	it.txn.numIterators.Add(-1)
}
// Next advances the iterator by one item. It:
//  1. waits for the current item's prefetch, if any;
//  2. recycles the current item onto the waste list;
//  3. promotes the next queued item to current;
//  4. parses further entries until one more item is produced or the input
//     (or prefix) runs out.
//
// It dereferences the current item, so it panics if called once the current
// item is nil.
func (it *Iterator) Next() {
	if it.iitr == nil {
		return
	}
	done := it.item
	done.wg.Wait()
	it.scanned += len(done.key) + len(done.val) + len(done.vptr) + 2
	it.waste.push(done)

	it.item = it.data.pop()
	for it.iitr.Valid() && hasPrefix(it) {
		if it.parseItem() {
			return
		}
	}
}
// isDeletedOrExpired reports whether an entry is deleted or expired. That is
// the case when meta has bitDelete set, or when expiresAt is non-zero and not
// later than the current Unix time in seconds.
func isDeletedOrExpired(meta byte, expiresAt uint64) bool {
	switch {
	case meta&bitDelete > 0:
		return true
	case expiresAt == 0:
		return false
	default:
		return expiresAt <= uint64(time.Now().Unix())
	}
}
// parseItem inspects the entry at the current position of it.iitr and
// advances past it. In reverse mode it may also advance past further entries
// for the same key.
//
// It returns true when it produced an Item, and false when the entry was
// skipped. A produced Item becomes it.item when that is nil; otherwise it is
// appended to it.data.
func (it *Iterator) parseItem() bool {
	mi := it.iitr
	key := mi.Key()
	// setItem makes item the current item if there is none yet, otherwise
	// queues it on it.data.
	setItem := func(item *Item) {
		if it.item == nil {
			it.item = item
		} else {
			it.data.push(item)
		}
	}
	// Keys starting with badgerPrefix are hidden unless InternalAccess is set.
	isInternalKey := bytes.HasPrefix(key, badgerPrefix)
	if !it.opt.InternalAccess && isInternalKey {
		mi.Next()
		return false
	}
	// Skip versions newer than the read timestamp and, when SinceTs is set,
	// versions at or below SinceTs.
	version := y.ParseTs(key)
	if version > it.readTs || (it.opt.SinceTs > 0 && version <= it.opt.SinceTs) {
		mi.Next()
		return false
	}
	// Skip keys the DB reports as banned; internal keys are not checked.
	if !isInternalKey && it.txn.db.isBanned(key) != nil {
		mi.Next()
		return false
	}
	// AllVersions: emit every remaining version as-is, without the
	// deleted/expired check done below.
	if it.opt.AllVersions {
		item := it.newItem()
		it.fill(item)
		setItem(item)
		mi.Next()
		return true
	}
	// Forward iteration: only the first visible entry per key is considered,
	// and later entries with the same key (per y.SameKey) are skipped.
	// lastKey is recorded before the deleted/expired check below, so a
	// deleted or expired entry also hides the entries for its key that
	// follow it.
	if !it.opt.Reverse {
		if y.SameKey(it.lastKey, key) {
			mi.Next()
			return false
		}
		it.lastKey = y.SafeCopy(it.lastKey, mi.Key())
	}
	// FILL is re-entered (reverse mode only) while consecutive visible
	// entries share the same key.
FILL:
	vs := mi.Value()
	if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
		mi.Next()
		return false
	}
	item := it.newItem()
	it.fill(item)
	mi.Next()
	if !it.opt.Reverse || !mi.Valid() {
		setItem(item)
		return true
	}
	// Reverse iteration: if the next entry is a visible (ts <= readTs) entry
	// for the same key, it replaces this one by re-running FILL. The last such
	// entry seen is the one emitted.
	// NOTE(review): the Item filled above is then dropped without being pushed
	// to it.waste.
	nextTs := y.ParseTs(mi.Key())
	mik := y.ParseKey(mi.Key())
	if nextTs <= it.readTs && bytes.Equal(mik, item.key) {
		goto FILL
	}
	setItem(item)
	return true
}
// fill copies the entry at the current position of it.iitr into item:
//   - meta, user meta and expiry from the value struct;
//   - version and key parsed from the stored key;
//   - the raw value bytes into vptr.
// It clears any previous prefetched value. When PrefetchValues is set, it
// starts a goroutine that loads the value, tracked by item.wg.
func (it *Iterator) fill(item *Item) {
	vs := it.iitr.Value()
	rawKey := it.iitr.Key()

	item.meta, item.userMeta, item.expiresAt = vs.Meta, vs.UserMeta, vs.ExpiresAt
	item.version = y.ParseTs(rawKey)
	item.key = y.SafeCopy(item.key, y.ParseKey(rawKey))
	item.vptr = y.SafeCopy(item.vptr, vs.Value)
	item.val = nil

	if !it.opt.PrefetchValues {
		return
	}
	item.wg.Add(1)
	go func() {
		defer item.wg.Done()
		item.prefetchValue()
	}()
}
// hasPrefix reports whether the underlying iterator's current key still
// starts with opt.Prefix. The check is only made for forward iteration with a
// non-empty prefix; otherwise it returns true and leaves filtering to Valid.
func hasPrefix(it *Iterator) bool {
	if it.opt.Reverse || len(it.opt.Prefix) == 0 {
		return true
	}
	return bytes.HasPrefix(y.ParseKey(it.iitr.Key()), it.opt.Prefix)
}
// prefetch resets the current item and parses entries until it has produced
// the prefetch limit, or the input (or prefix) runs out. The limit is
// opt.PrefetchSize when PrefetchValues is set and PrefetchSize > 1, otherwise
// 2. The first item produced becomes it.item; the rest are queued on it.data.
func (it *Iterator) prefetch() {
	limit := 2
	if it.opt.PrefetchValues && it.opt.PrefetchSize > 1 {
		limit = it.opt.PrefetchSize
	}
	it.item = nil
	for produced := 0; it.iitr.Valid() && hasPrefix(it); {
		if it.parseItem() {
			produced++
			if produced == limit {
				return
			}
		}
	}
}
// Seek repositions the iterator at key. Forward iterators seek to key at the
// transaction's read timestamp; reverse iterators seek to key with timestamp
// 0. An empty key falls back to opt.Prefix. If both are empty, the iterator
// rewinds to its start. A non-empty key is recorded via txn.addReadKey.
// Queued items are recycled (after their prefetches finish) before
// prefetching from the new position.
func (it *Iterator) Seek(key []byte) {
	if it.iitr == nil {
		return
	}
	if len(key) > 0 {
		it.txn.addReadKey(key)
	}
	for pending := it.data.pop(); pending != nil; pending = it.data.pop() {
		pending.wg.Wait()
		it.waste.push(pending)
	}
	it.lastKey = it.lastKey[:0]

	if len(key) == 0 {
		key = it.opt.Prefix
	}
	if len(key) == 0 {
		it.iitr.Rewind()
		it.prefetch()
		return
	}
	ts := uint64(0)
	if !it.opt.Reverse {
		ts = it.txn.readTs
	}
	it.iitr.Seek(y.KeyWithTs(key, ts))
	it.prefetch()
}
// Rewind repositions the iterator at its start; it is equivalent to Seek(nil).
// With a prefix set, that is the prefix's position; otherwise it is the very
// first key.
func (it *Iterator) Rewind() {
	var start []byte
	it.Seek(start)
}
The pages are generated with Golds v0.8.4 (GOOS=linux GOARCH=amd64).
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.