package table
import (
"bytes"
"crypto/aes"
"encoding/binary"
"errors"
"fmt"
"math"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/klauspost/compress/snappy"
"github.com/klauspost/compress/zstd"
"google.golang.org/protobuf/proto"
"github.com/dgraph-io/badger/v4/fb"
"github.com/dgraph-io/badger/v4/options"
"github.com/dgraph-io/badger/v4/pb"
"github.com/dgraph-io/badger/v4/y"
"github.com/dgraph-io/ristretto/v2"
"github.com/dgraph-io/ristretto/v2/z"
)
// fileSuffix is the extension carried by every table file name.
// intSize is the size in bytes of a Go int on the current platform; Block.size
// uses it when estimating a block's memory footprint.
const (
	fileSuffix = ".sst"
	intSize    = int(unsafe.Sizeof(0))
)
// Options configures a Table. OpenTable and OpenInMemoryTable take it
// directly, and CreateTable passes along the builder's copy (builder.opts).
// The field comments describe how this file uses each field. Fields not read
// here are presumably consumed by the table builder or other packages; verify
// there.
type Options struct {
// ReadOnly is not read in this file. NOTE(review): presumably marks tables opened read-only; confirm at call sites.
ReadOnly bool
// MetricsEnabled is passed to y.NumLSMBloomHitsAdd by DoesNotHave to gate bloom-filter metrics.
MetricsEnabled bool
// TableSize is not read in this file; presumably the target table size used when building. TODO confirm.
TableSize uint64
// tableCapacity is not read in this file. NOTE(review): likely derived from TableSize by the builder; confirm.
tableCapacity uint64
// ChkMode selects when checksums are verified.
// OnTableRead and OnTableAndBlockRead make OpenTable call VerifyChecksum.
// OnBlockRead and OnTableAndBlockRead make Table.block verify each block as it is loaded.
ChkMode options .ChecksumVerificationMode
// BloomFalsePositive is not read in this file; presumably the target false-positive rate of the builder's bloom filter. TODO confirm.
BloomFalsePositive float64
// BlockSize must be non-zero when Compression is not None (checked in OpenTable).
// decompress uses it to size ZSTD output buffers.
BlockSize int
// DataKey, when non-nil, marks the table as encrypted: blocks and the index are decrypted with it (see shouldDecrypt and decrypt).
DataKey *pb .DataKey
// Compression selects the codec decompress uses for blocks (None, Snappy or ZSTD).
Compression options .CompressionType
// BlockCache, when non-nil, caches decoded blocks keyed by blockCacheKey. DecrRef removes a table's blocks from it.
BlockCache *ristretto .Cache [[]byte , *Block ]
// IndexCache caches decoded indexes keyed by table ID. fetchIndex requires it (and panics otherwise) for encrypted tables.
IndexCache *ristretto .Cache [uint64 , *fb .TableIndex ]
// AllocPool is not read in this file. NOTE(review): presumably supplies allocators to the builder; confirm.
AllocPool *z .AllocatorPool
// ZSTDCompressionLevel is not read in this file; presumably the level used when compressing with ZSTD. TODO confirm.
ZSTDCompressionLevel int
}
// TableInterface is the minimal read-only view of a table: its key bounds,
// its highest version, and a hash-based membership probe. *Table satisfies it.
type TableInterface interface {
	// Smallest returns the smallest key in the table.
	Smallest() []byte
	// Biggest returns the biggest key in the table.
	Biggest() []byte
	// MaxVersion returns the highest key version stored in the table.
	MaxVersion() uint64
	// DoesNotHave reports whether a key with the given hash is definitely
	// absent (Table implements this with its bloom filter).
	DoesNotHave(hash uint32) bool
}
// Table is an opened SSTable. It is backed either by a memory-mapped file
// (OpenTable / CreateTable) or by an in-memory byte slice (OpenInMemoryTable),
// and is shared through reference counting (IncrRef / DecrRef).
type Table struct {
// Mutex is embedded but not used in this file. NOTE(review): presumably guards state mutated elsewhere; confirm.
sync .Mutex
// MmapFile provides the table bytes (Data) and the file handle (Fd, nil for in-memory tables).
// It also provides Bytes (used by read) and Delete (used by DecrRef).
*z .MmapFile
// tableSize is the table size in bytes: the file size for file-backed tables, len(data) for in-memory ones.
tableSize int
// _index is the decoded index, kept only for unencrypted tables (see initIndex).
// Encrypted tables go through fetchIndex and the IndexCache instead.
_index *fb .TableIndex
// _cheap holds scalar index fields copied by initIndex so they can be read without fetchIndex.
_cheap *cheapIndex
// ref is the reference count. It starts at 1; DecrRef evicts cached blocks and deletes the file when it reaches 0.
ref atomic .Int32
// smallest and biggest are copies of the table's key bounds, set by initBiggestAndSmallest.
smallest, biggest []byte
// id is the table ID: parsed from the file name for file-backed tables, supplied by the caller for in-memory ones.
id uint64
// Checksum is neither set nor read in this file. NOTE(review): purpose unclear from here; confirm.
Checksum []byte
// CreatedAt is the backing file's modification time; left zero for in-memory tables.
CreatedAt time .Time
// indexStart and indexLen locate the serialized index within the table; both are set by initIndex.
indexStart int
indexLen int
// hasBloomFilter records whether the index carries bloom filter bytes; DoesNotHave returns false when it does not.
hasBloomFilter bool
// IsInmemory is true for tables created by OpenInMemoryTable.
IsInmemory bool
// opt holds the options the table was opened with.
opt *Options
}
// cheapIndex caches the scalar summary fields of a table's flatbuffer index.
// They can then be read without going through fetchIndex, which for encrypted
// tables may have to re-read and decrypt the index on a cache miss.
type cheapIndex struct {
	MaxVersion                             uint64
	KeyCount, UncompressedSize, OnDiskSize uint32
	BloomFilterLength, OffsetsLength       int
}
// cheapIndex returns the summary of index fields captured by initIndex.
func (t *Table) cheapIndex() *cheapIndex {
	return t._cheap
}

// offsetsLength returns the number of block offset entries (blocks) in the table.
func (t *Table) offsetsLength() int { return t._cheap.OffsetsLength }

// MaxVersion returns the maximum version recorded in the table index.
func (t *Table) MaxVersion() uint64 { return t._cheap.MaxVersion }

// BloomFilterSize returns the bloom filter length recorded in the table index.
func (t *Table) BloomFilterSize() int { return t._cheap.BloomFilterLength }

// UncompressedSize returns the uncompressed size recorded in the table index.
func (t *Table) UncompressedSize() uint32 { return t._cheap.UncompressedSize }

// KeyCount returns the key count recorded in the table index.
func (t *Table) KeyCount() uint32 { return t._cheap.KeyCount }

// OnDiskSize returns the on-disk size recorded in the table index.
func (t *Table) OnDiskSize() uint32 { return t._cheap.OnDiskSize }

// CompressionType returns the compression type from the options the table
// was opened with.
func (t *Table) CompressionType() options.CompressionType {
	return t.opt.Compression
}
// IncrRef takes an additional reference on the table.
func (t *Table) IncrRef() {
	t.ref.Add(1)
}

// DecrRef releases one reference on the table. When the last reference is
// dropped:
//   - every block of this table is removed from the block cache;
//   - Delete is called on the embedded MmapFile (presumably unmapping and
//     removing the file; confirm in z).
//
// The error from Delete is returned.
func (t *Table) DecrRef() error {
	if t.ref.Add(-1) != 0 {
		return nil
	}
	for idx, n := 0, t.offsetsLength(); idx < n; idx++ {
		t.opt.BlockCache.Del(t.blockCacheKey(idx))
	}
	return t.Delete()
}
// BlockEvictHandler releases the reference a cache holds on b.
// NOTE(review): presumably installed as the BlockCache eviction callback; confirm where the cache is configured.
func BlockEvictHandler (b *Block ) {
b .decrRef ()
}
// Block is one decoded data block of a table. Table.block produces it, and
// it is shared through reference counting (incrRef / decrRef).
type Block struct {
// offset is the byte offset of the block within the table.
offset int
// data holds the decoded block contents: the entries, the entry-offset array and the 4-byte entry count.
// Table.block slices off the trailing checksum and its length.
data []byte
// checksum is the serialized pb.Checksum covering data.
checksum []byte
// entriesIndexStart is the position in data where the entry-offset array begins.
entriesIndexStart int
// entryOffsets are the entry offsets decoded from data.
entryOffsets []uint32
// chkLen is the length of checksum in bytes.
chkLen int
// freeMe reports that data was allocated with z.Calloc and must be released with z.Free when the last reference is dropped.
freeMe bool
// ref is the reference count; once it reaches zero, incrRef refuses to revive the block.
ref atomic .Int32
}
// NumBlocks counts blocks created by Table.block whose reference count has not yet dropped to zero.
var NumBlocks atomic .Int32
// incrRef tries to take a reference on b and reports whether it succeeded.
// It fails once the count has dropped to zero, so a block that is being
// released cannot be revived by a concurrent cache hit in Table.block.
func (b *Block) incrRef() bool {
	for cur := b.ref.Load(); cur != 0; cur = b.ref.Load() {
		if b.ref.CompareAndSwap(cur, cur+1) {
			return true
		}
	}
	return false
}
// decrRef releases one reference on b; calling it on a nil block is a no-op.
// When the count reaches zero:
//   - the block's buffer is released with z.Free if the block owns it (freeMe);
//   - the NumBlocks gauge is decremented.
//
// It asserts that the count never goes negative.
func (b *Block) decrRef() {
	if b == nil {
		return
	}
	newRef := b.ref.Add(-1)
	if newRef == 0 {
		if b.freeMe {
			z.Free(b.data)
		}
		NumBlocks.Add(-1)
	}
	// Check the value produced by this decrement rather than re-loading the
	// counter: a concurrent incrRef could move a negative count back to zero
	// between the two reads and hide an underflow.
	y.AssertTrue(newRef >= 0)
}
// size estimates b's memory footprint in bytes; Table.block passes it as the
// block-cache cost. The estimate is three int-sized fields plus the
// capacities of the data and checksum buffers and of the entry-offset slice
// (4 bytes per uint32).
func (b *Block) size() int64 {
	const fixed = 3 * intSize
	dynamic := cap(b.data) + cap(b.checksum) + 4*cap(b.entryOffsets)
	return int64(fixed + dynamic)
}
// verifyCheckSum decodes the protobuf checksum stored with the block and
// verifies it against the block's data.
func (b *Block) verifyCheckSum() error {
	var expected pb.Checksum
	if err := proto.Unmarshal(b.checksum, &expected); err != nil {
		return y.Wrapf(err, "unable to unmarshal checksum for block")
	}
	return y.VerifyChecksum(b.data, &expected)
}
// CreateTable writes builder's finished contents to a new file fname. The
// file must not already exist (it is opened with O_EXCL). The data is
// flushed with msync and then opened with OpenTable using the builder's
// options.
//
// If a later step fails after the file has been mapped, the mapping is
// closed so the mmap and file descriptor are not leaked.
func CreateTable(fname string, builder *Builder) (*Table, error) {
	bd := builder.Done()
	mf, err := z.OpenMmapFile(fname, os.O_CREATE|os.O_RDWR|os.O_EXCL, bd.Size)
	switch {
	case err == z.NewFile:
		// Expected: a fresh file of the requested size was created.
	case err != nil:
		return nil, y.Wrapf(err, "while creating table: %s", fname)
	default:
		// O_EXCL should make this unreachable; don't leak the mapping if it happens.
		mf.Close(-1)
		return nil, fmt.Errorf("file already exists: %s", fname)
	}

	written := bd.Copy(mf.Data)
	y.AssertTrue(written == len(mf.Data))
	if err := z.Msync(mf.Data); err != nil {
		mf.Close(-1)
		return nil, y.Wrapf(err, "while calling msync on %s", fname)
	}
	return OpenTable(mf, *builder.opts)
}
// OpenTable opens a table backed by the memory-mapped file mf, using opts.
// It parses the table ID from the file name (see ParseFileID) and loads the
// index and key bounds. When opts.ChkMode requests it, every block's checksum
// is verified. The returned table holds one reference.
//
// OpenTable takes ownership of mf: on every error path mf is closed with
// mf.Close(-1) and must not be used again.
func OpenTable(mf *z.MmapFile, opts Options) (*Table, error) {
	// decompress needs BlockSize to size its output buffers.
	if opts.BlockSize == 0 && opts.Compression != options.None {
		mf.Close(-1)
		return nil, errors.New("Block size cannot be zero")
	}
	fileInfo, err := mf.Fd.Stat()
	if err != nil {
		mf.Close(-1)
		return nil, y.Wrap(err, "")
	}

	filename := fileInfo.Name()
	id, ok := ParseFileID(filename)
	if !ok {
		mf.Close(-1)
		return nil, fmt.Errorf("Invalid filename: %s", filename)
	}
	t := &Table{
		MmapFile:   mf,
		id:         id,
		opt:        &opts,
		IsInmemory: false,
		tableSize:  int(fileInfo.Size()),
		CreatedAt:  fileInfo.ModTime(),
	}
	t.ref.Store(1)

	if err := t.initBiggestAndSmallest(); err != nil {
		mf.Close(-1)
		return nil, y.Wrapf(err, "failed to initialize table")
	}

	if opts.ChkMode == options.OnTableRead || opts.ChkMode == options.OnTableAndBlockRead {
		if err := t.VerifyChecksum(); err != nil {
			// VerifyChecksum loads blocks through the block cache, and those
			// blocks may alias the mapping closed below. Evict them so a later
			// lookup under the same table ID cannot reach unmapped memory.
			if t.opt.BlockCache != nil {
				for i := 0; i < t.offsetsLength(); i++ {
					t.opt.BlockCache.Del(t.blockCacheKey(i))
				}
			}
			mf.Close(-1)
			return nil, y.Wrapf(err, "failed to verify checksum")
		}
	}
	return t, nil
}
// OpenInMemoryTable wraps a serialized table held in data as a Table with the
// given id and options. There is no backing file: Fd is nil and IsInmemory is
// set. The table starts with one reference.
func OpenInMemoryTable(data []byte, id uint64, opt *Options) (*Table, error) {
	t := &Table{
		MmapFile:   &z.MmapFile{Data: data},
		id:         id,
		opt:        opt,
		tableSize:  len(data),
		IsInmemory: true,
	}
	t.ref.Store(1)

	if err := t.initBiggestAndSmallest(); err != nil {
		return nil, err
	}
	return t, nil
}
// initBiggestAndSmallest loads the table index via initIndex and records
// copies of two keys:
//   - the smallest key: the key of the first block offset entry;
//   - the biggest key: the first key returned by a reversed, non-caching
//     iterator.
//
// If anything here panics (e.g. on a corrupt file), the panic is re-raised
// with a best-effort dump of the table footer and the original panic value.
func (t *Table) initBiggestAndSmallest() error {
	defer func() {
		if r := recover(); r != nil {
			var debugBuf bytes.Buffer
			t.writeIndexCrashDebug(&debugBuf, r)
			panic(fmt.Sprintf("%s\n== Recovered ==\n", debugBuf.String()))
		}
	}()

	ko, err := t.initIndex()
	if err != nil {
		return y.Wrapf(err, "failed to read index.")
	}
	t.smallest = y.Copy(ko.KeyBytes())

	it2 := t.NewIterator(REVERSED | NOCACHE)
	defer it2.Close()
	it2.Rewind()
	if !it2.Valid() {
		return y.Wrapf(it2.err, "failed to initialize biggest for table %s", t.Filename())
	}
	t.biggest = y.Copy(it2.Key())
	return nil
}

// writeIndexCrashDebug appends diagnostics about the table footer to buf
// after a panic (cause) during index initialization. It walks the footer the
// same way initIndex does but never modifies the table. Reads that fail stop
// the dump, and a panic while collecting is noted in buf rather than
// propagated.
func (t *Table) writeIndexCrashDebug(buf *bytes.Buffer, cause any) {
	defer func() {
		if r := recover(); r != nil {
			fmt.Fprintf(buf, "\n(panic while collecting debug info: %v)", r)
		}
	}()

	zeros := 0
	for i := len(t.Data) - 1; i >= 0 && t.Data[i] == 0; i-- {
		zeros++
	}
	fmt.Fprintf(buf, "\n== Recovering from initIndex crash ==\n")
	fmt.Fprintf(buf, "Cause: %v\n", cause)
	fmt.Fprintf(buf, "File Info: [ID: %d, Size: %d, Zeros: %d]\n", t.id, t.tableSize, zeros)
	fmt.Fprintf(buf, "isEncrypted: %v ", t.shouldDecrypt())

	// Use t.read rather than readNoFail: a failed read should end the dump,
	// not abort the process from inside a diagnostic path.
	read := func(off, sz int) ([]byte, bool) {
		data, err := t.read(off, sz)
		if err != nil {
			fmt.Fprintf(buf, "\n(read of %d bytes at offset %d failed: %v)", sz, off, err)
			return nil, false
		}
		return data, true
	}

	readPos := t.tableSize - 4
	raw, ok := read(readPos, 4)
	if !ok {
		return
	}
	checksumLen := int(y.BytesToU32(raw))
	fmt.Fprintf(buf, "checksumLen: %d ", checksumLen)

	readPos -= checksumLen
	if raw, ok = read(readPos, checksumLen); !ok {
		return
	}
	checksum := &pb.Checksum{}
	// Best effort: print whatever was decoded even if unmarshalling fails.
	_ = proto.Unmarshal(raw, checksum)
	fmt.Fprintf(buf, "checksum: %+v ", checksum)

	readPos -= 4
	if raw, ok = read(readPos, 4); !ok {
		return
	}
	indexLen := int(y.BytesToU32(raw))
	fmt.Fprintf(buf, "indexLen: %d ", indexLen)

	// Use the length just read from the footer: t.indexLen may not have been
	// set before the original panic.
	readPos -= indexLen
	if raw, ok = read(readPos, indexLen); !ok {
		return
	}
	fmt.Fprintf(buf, "index: %v ", raw)
}
// read returns the sz bytes of the table starting at offset off.
func (t *Table) read(off, sz int) ([]byte, error) { return t.Bytes(off, sz) }

// readNoFail is read for callers that cannot handle an error. Any error is
// handed to y.Check, which is expected to abort; otherwise the bytes are
// returned.
func (t *Table) readNoFail(off, sz int) []byte {
	out, err := t.read(off, sz)
	y.Check(err)
	return out
}
// initIndex parses the table footer, verifies the index checksum, loads the
// flatbuffer index and caches its summary in t._cheap. For unencrypted tables
// it also keeps the decoded index in t._index. It returns the first block
// offset entry, whose key initBiggestAndSmallest uses as the smallest key.
//
// Footer layout, read backwards from the end of the table:
//
//	| blocks ... | index | indexLen (4B) | checksum | checksumLen (4B) |
//
// Each length read from the footer is checked against the bytes remaining,
// so a truncated or corrupt table yields an error instead of an
// out-of-range panic or a fatal read failure.
func (t *Table) initIndex() (*fb.BlockOffset, error) {
	readPos := t.tableSize

	// Checksum length.
	if readPos < 4 {
		return nil, fmt.Errorf("table of size %d is too small to hold a footer. Data corrupted", t.tableSize)
	}
	readPos -= 4
	buf := t.readNoFail(readPos, 4)
	checksumLen := int(y.BytesToU32(buf))
	if checksumLen < 0 {
		return nil, errors.New("checksum length less than zero. Data corrupted")
	}
	// The checksum must leave room for the 4-byte index length in front of it.
	if checksumLen > readPos-4 {
		return nil, fmt.Errorf("checksum length %d does not fit in table of size %d. Data corrupted",
			checksumLen, t.tableSize)
	}

	// Checksum.
	expectedChk := &pb.Checksum{}
	readPos -= checksumLen
	buf = t.readNoFail(readPos, checksumLen)
	if err := proto.Unmarshal(buf, expectedChk); err != nil {
		return nil, err
	}

	// Index length and index.
	readPos -= 4
	buf = t.readNoFail(readPos, 4)
	indexLen := int(y.BytesToU32(buf))
	if indexLen < 0 || indexLen > readPos {
		return nil, fmt.Errorf("index length %d does not fit in table of size %d. Data corrupted",
			indexLen, t.tableSize)
	}
	t.indexLen = indexLen
	readPos -= t.indexLen
	t.indexStart = readPos
	data := t.readNoFail(readPos, t.indexLen)
	if err := y.VerifyChecksum(data, expectedChk); err != nil {
		return nil, y.Wrapf(err, "failed to verify checksum for table: %s", t.Filename())
	}

	index, err := t.readTableIndex()
	if err != nil {
		return nil, err
	}
	// Encrypted tables must not keep the decrypted index resident; fetchIndex
	// serves it from the IndexCache instead.
	if !t.shouldDecrypt() {
		t._index = index
	}
	t._cheap = &cheapIndex{
		MaxVersion:        index.MaxVersion(),
		KeyCount:          index.KeyCount(),
		UncompressedSize:  index.UncompressedSize(),
		OnDiskSize:        index.OnDiskSize(),
		OffsetsLength:     index.OffsetsLength(),
		BloomFilterLength: index.BloomFilterLength(),
	}
	t.hasBloomFilter = len(index.BloomFilterBytes()) > 0

	var bo fb.BlockOffset
	if index.OffsetsLength() == 0 || !index.Offsets(&bo, 0) {
		return nil, errors.New("table index has no block offsets. Data corrupted")
	}
	return &bo, nil
}
// KeySplits samples block keys that begin with prefix, for use as split
// points.
//
// Starting at block 0, it visits every jump-th block, where
// jump = max(1, offsetsLength/n). The key of each visited offset entry (the
// block's first key, as initBiggestAndSmallest relies on for block 0) is
// collected when it has the prefix. It returns nil when n <= 0 or the table
// has no blocks.
func (t *Table) KeySplits(n int, prefix []byte) []string {
	// A non-positive n would yield a non-positive step and walk negative indices.
	if n <= 0 {
		return nil
	}
	oLen := t.offsetsLength()
	if oLen == 0 {
		return nil
	}
	jump := oLen / n
	if jump == 0 {
		jump = 1
	}

	// Fetch the index once: for encrypted tables each fetchIndex call is a
	// cache lookup and possibly a decrypt.
	index := t.fetchIndex()
	var bo fb.BlockOffset
	var res []string
	for i := 0; i < oLen; i += jump {
		y.AssertTrue(index.Offsets(&bo, i))
		if key := bo.KeyBytes(); bytes.HasPrefix(key, prefix) {
			res = append(res, string(key))
		}
	}
	return res
}
// fetchIndex returns the table's flatbuffer index.
//
// Unencrypted tables keep it resident in t._index. Encrypted tables require
// an IndexCache (and panic without one). They look the index up there by
// indexKey; on a miss they re-read and decrypt it and insert it with cost
// indexLen.
func (t *Table) fetchIndex() *fb.TableIndex {
	switch {
	case !t.shouldDecrypt():
		return t._index
	case t.opt.IndexCache == nil:
		panic("Index Cache must be set for encrypted workloads")
	}

	key := t.indexKey()
	if cached, hit := t.opt.IndexCache.Get(key); hit && cached != nil {
		return cached
	}
	index, err := t.readTableIndex()
	y.Check(err)
	t.opt.IndexCache.Set(key, index, int64(t.indexLen))
	return index
}

// offsets loads the i-th block offset entry of the index into ko and returns
// the flatbuffer accessor's ok result.
func (t *Table) offsets(ko *fb.BlockOffset, i int) bool {
	index := t.fetchIndex()
	return index.Offsets(ko, i)
}
// block returns the idx-th block of the table with a reference held for the
// caller, who must release it with decrRef.
//
// A live copy from the block cache is returned when available. Otherwise the
// block is read from the table, decrypted and decompressed as configured, and
// its trailer is parsed. Depending on ChkMode, its checksum is also verified.
// When useCache is true the freshly loaded block is offered to the block
// cache, which then holds its own reference.
//
// Block trailer layout, read backwards from the end of the decoded data:
//
//	| entries ... | entryOffsets (numEntries*4B) | numEntries (4B) | checksum | chkLen (4B) |
func (t *Table) block(idx int, useCache bool) (*Block, error) {
	y.AssertTruef(idx >= 0, "idx=%d", idx)
	if idx >= t.offsetsLength() {
		return nil, errors.New("block out of index")
	}
	if t.opt.BlockCache != nil {
		key := t.blockCacheKey(idx)
		blk, ok := t.opt.BlockCache.Get(key)
		// A cached block whose count already reached zero is being released
		// and must not be reused; fall through and load a fresh copy.
		if ok && blk != nil && blk.incrRef() {
			return blk, nil
		}
	}

	var ko fb.BlockOffset
	y.AssertTrue(t.offsets(&ko, idx))
	blk := &Block{offset: int(ko.Offset())}
	blk.ref.Store(1)
	// Release the construction reference on every return path. On error it is
	// the only reference, so decrRef frees any buffer the block owns. On
	// success, the reference taken just before returning keeps the block alive.
	defer blk.decrRef()
	NumBlocks.Add(1)

	var err error
	if blk.data, err = t.read(blk.offset, int(ko.Len())); err != nil {
		return nil, y.Wrapf(err,
			"failed to read from file: %s at offset: %d, len: %d",
			t.Filename(), blk.offset, ko.Len())
	}
	if t.shouldDecrypt() {
		// Decrypted data lives in a z.Calloc buffer that the block owns.
		if blk.data, err = t.decrypt(blk.data, true); err != nil {
			return nil, err
		}
		blk.freeMe = true
	}
	if err = t.decompress(blk); err != nil {
		return nil, y.Wrapf(err,
			"failed to decode compressed data in file: %s at offset: %d, len: %d",
			t.Filename(), blk.offset, ko.Len())
	}

	// Parse the trailer from the end, validating every length against the
	// bytes that remain so corrupt input yields an error rather than a panic.
	readPos := len(blk.data) - 4
	if readPos < 0 {
		return nil, errors.New("invalid block size. Either the data is " +
			"corrupted or the table options are incorrectly set")
	}
	blk.chkLen = int(y.BytesToU32(blk.data[readPos : readPos+4]))
	// The checksum must leave room for the 4-byte entry count in front of it.
	if blk.chkLen < 0 || blk.chkLen > readPos-4 {
		return nil, errors.New("invalid checksum length. Either the data is " +
			"corrupted or the table options are incorrectly set")
	}
	readPos -= blk.chkLen
	blk.checksum = blk.data[readPos : readPos+blk.chkLen]

	readPos -= 4
	numEntries := int(y.BytesToU32(blk.data[readPos : readPos+4]))
	if numEntries < 0 || numEntries > readPos/4 {
		return nil, errors.New("invalid number of block entries. Either the data is " +
			"corrupted or the table options are incorrectly set")
	}
	entriesIndexStart := readPos - numEntries*4
	blk.entryOffsets = y.BytesToU32Slice(blk.data[entriesIndexStart:readPos])
	blk.entriesIndexStart = entriesIndexStart

	// Keep the entries, their offsets and the entry count (the span the
	// checksum is verified against); drop the checksum and its length.
	blk.data = blk.data[:readPos+4]

	if t.opt.ChkMode == options.OnBlockRead || t.opt.ChkMode == options.OnTableAndBlockRead {
		if err = blk.verifyCheckSum(); err != nil {
			return nil, err
		}
	}

	// The caller's reference; cannot fail while the construction reference is held.
	blk.incrRef()
	if useCache && t.opt.BlockCache != nil {
		key := t.blockCacheKey(idx)
		// The cache's reference, dropped again if the cache rejects the block.
		y.AssertTrue(blk.incrRef())
		if !t.opt.BlockCache.Set(key, blk, blk.size()) {
			blk.decrRef()
		}
	}
	return blk, nil
}
// blockCacheKey builds the 8-byte block-cache key for block idx of this table:
// the table ID followed by the block index, each as a big-endian uint32.
// Both values are asserted to fit in 32 bits.
func (t *Table) blockCacheKey(idx int) []byte {
	y.AssertTrue(t.id < math.MaxUint32)
	// Check idx before narrowing it: an assertion on uint32(idx) would let
	// negative or oversized indices wrap silently into a colliding key.
	y.AssertTrue(idx >= 0 && uint64(idx) < math.MaxUint32)

	buf := make([]byte, 8)
	binary.BigEndian.PutUint32(buf[:4], uint32(t.id))
	binary.BigEndian.PutUint32(buf[4:], uint32(idx))
	return buf
}
// indexKey returns the key under which this table's decoded index is stored
// in the IndexCache: the table ID.
func (t *Table) indexKey() uint64 {
	return t.id
}

// IndexSize returns the size of the serialized table index in bytes.
func (t *Table) IndexSize() int {
	return t.indexLen
}

// Size returns the size of the table in bytes.
func (t *Table) Size() int64 { return int64(t.tableSize) }

// StaleDataSize returns the stale data size recorded in the table index.
func (t *Table) StaleDataSize() uint32 { return t.fetchIndex().StaleDataSize() }

// Smallest returns the smallest key in the table.
func (t *Table) Smallest() []byte { return t.smallest }

// Biggest returns the biggest key in the table.
func (t *Table) Biggest() []byte { return t.biggest }

// Filename returns the name of the file backing the table. Tables opened with
// OpenInMemoryTable have no file (Fd is nil) and get an empty string instead
// of a nil-pointer panic.
func (t *Table) Filename() string {
	if t.MmapFile == nil || t.Fd == nil {
		return ""
	}
	return t.Fd.Name()
}

// ID returns the table ID.
func (t *Table) ID() uint64 { return t.id }
// DoesNotHave reports whether the table's bloom filter rules out a key with
// the given hash. A table without a bloom filter never rules anything out.
//
// Every probe is counted under "DoesNotHave_ALL", and every probe that rules
// the key out is also counted under "DoesNotHave_HIT". Both counts go through
// y.NumLSMBloomHitsAdd, gated by Options.MetricsEnabled.
func (t *Table) DoesNotHave(hash uint32) bool {
	if !t.hasBloomFilter {
		return false
	}
	metricsOn := t.opt.MetricsEnabled
	y.NumLSMBloomHitsAdd(metricsOn, "DoesNotHave_ALL", 1)

	filter := y.Filter(t.fetchIndex().BloomFilterBytes())
	if filter.MayContain(hash) {
		return false
	}
	y.NumLSMBloomHitsAdd(metricsOn, "DoesNotHave_HIT", 1)
	return true
}
// readTableIndex reads the serialized index at [indexStart, indexStart+indexLen).
// It decrypts the index first when the table is encrypted (into a regular Go
// slice) and returns a flatbuffer TableIndex view over the resulting bytes.
func (t *Table) readTableIndex() (*fb.TableIndex, error) {
	raw := t.readNoFail(t.indexStart, t.indexLen)
	if !t.shouldDecrypt() {
		return fb.GetRootAsTableIndex(raw, 0), nil
	}

	plain, err := t.decrypt(raw, false)
	if err != nil {
		return nil, y.Wrapf(err,
			"Error while decrypting table index for the table %d in readTableIndex", t.id)
	}
	return fb.GetRootAsTableIndex(plain, 0), nil
}
// VerifyChecksum verifies the checksum of every block in the table, loading
// the blocks through the block cache. Under ChkMode OnBlockRead or
// OnTableAndBlockRead, Table.block already verifies each block as it loads
// it. Otherwise the checksum is verified explicitly here. The first failure
// is returned, annotated with the file name, block index and block offset.
func (t *Table) VerifyChecksum() error {
	ti := t.fetchIndex()
	for i := 0; i < ti.OffsetsLength(); i++ {
		if err := t.verifyBlockChecksum(ti, i); err != nil {
			return err
		}
	}
	return nil
}

// verifyBlockChecksum loads block i and verifies its checksum. The block is
// released before returning, so a large table does not pin all its blocks
// until the whole scan finishes.
func (t *Table) verifyBlockChecksum(ti *fb.TableIndex, i int) error {
	b, err := t.block(i, true)
	if err != nil {
		// b is nil here, so take the offset from the index instead.
		offset := -1
		var ko fb.BlockOffset
		if ti.Offsets(&ko, i) {
			offset = int(ko.Offset())
		}
		return y.Wrapf(err, "checksum validation failed for table: %s, block: %d, offset:%d",
			t.Filename(), i, offset)
	}
	defer b.decrRef()

	if t.opt.ChkMode == options.OnBlockRead || t.opt.ChkMode == options.OnTableAndBlockRead {
		// Table.block already verified this block while loading it.
		return nil
	}
	if err := b.verifyCheckSum(); err != nil {
		return y.Wrapf(err,
			"checksum validation failed for table: %s, block: %d, offset:%d",
			t.Filename(), i, b.offset)
	}
	return nil
}
// shouldDecrypt reports whether the table was opened with a data key, i.e.
// whether its blocks and index must be decrypted before use.
func (t *Table) shouldDecrypt() bool { return t.opt.DataKey != nil }

// KeyID returns the ID of the data key the table is encrypted with, or 0 when
// the table is not encrypted.
func (t *Table) KeyID() uint64 {
	if !t.shouldDecrypt() {
		return 0
	}
	return t.opt.DataKey.KeyId
}
// decrypt treats the trailing aes.BlockSize bytes of data as the IV and the
// rest as ciphertext. It decrypts with y.XORBlock, using the table's data key,
// into a new buffer.
//
// When viaCalloc is true the buffer comes from z.Calloc, and the caller
// becomes responsible for releasing it with z.Free. Otherwise it is an
// ordinary Go slice.
//
// Input too short to contain an IV is rejected with an error, and a Calloc'd
// buffer is freed if decryption fails, so nothing leaks on error paths.
func (t *Table) decrypt(data []byte, viaCalloc bool) ([]byte, error) {
	if len(data) < aes.BlockSize {
		return nil, fmt.Errorf("while decrypt: data length %d is shorter than the %d-byte IV",
			len(data), aes.BlockSize)
	}
	ivStart := len(data) - aes.BlockSize
	iv := data[ivStart:]
	data = data[:ivStart]

	var dst []byte
	if viaCalloc {
		dst = z.Calloc(len(data), "Table.Decrypt")
	} else {
		dst = make([]byte, len(data))
	}
	if err := y.XORBlock(dst, data, t.opt.DataKey.Data, iv); err != nil {
		if viaCalloc {
			z.Free(dst)
		}
		return nil, y.Wrapf(err, "while decrypt")
	}
	return dst, nil
}
// ParseFileID extracts the table ID from a table file name such as
// "000012.sst"; any directory prefix is ignored. It reports false when the
// name lacks the table suffix or the remaining stem is not an unsigned
// base-10 integer that fits in a uint64. Signed stems such as "-5.sst" are
// rejected instead of tripping an assertion.
func ParseFileID(name string) (uint64, bool) {
	name = filepath.Base(name)
	if !strings.HasSuffix(name, fileSuffix) {
		return 0, false
	}
	id, err := strconv.ParseUint(strings.TrimSuffix(name, fileSuffix), 10, 64)
	if err != nil {
		return 0, false
	}
	return id, true
}
// IDToFilename returns the file name for table id: the ID zero-padded to at
// least six digits followed by the table suffix, e.g. 12 -> "000012.sst".
func IDToFilename(id uint64) string {
	return fmt.Sprintf("%06d%s", id, fileSuffix)
}

// NewFilename returns the path of the file for table id inside dir.
func NewFilename(id uint64, dir string) string {
	name := IDToFilename(id)
	return filepath.Join(dir, name)
}
// decompress replaces b.data with its decompressed form according to the
// table's Compression option; None leaves the block untouched. The output
// buffer is allocated with z.Calloc.
//
// On success:
//   - the compressed input is freed if the block owned it (freeMe);
//   - freeMe is updated to say whether b.data now lives in the Calloc'd buffer.
//
// On failure, b.data and freeMe are left unchanged, so the caller's normal
// release path (decrRef) still frees an owned input buffer.
func (t *Table) decompress(b *Block) error {
	src := b.data
	var dst, out []byte
	var err error
	switch t.opt.Compression {
	case options.None:
		return nil
	case options.Snappy:
		if sz, lenErr := snappy.DecodedLen(src); lenErr == nil {
			dst = z.Calloc(sz, "Table.Decompress")
		} else {
			dst = z.Calloc(len(src)*4, "Table.Decompress")
		}
		out, err = snappy.Decode(dst, src)
	case options.ZSTD:
		// Start from 1.2x the block size; prefer the frame's declared content
		// size when it is present and plausible (< 2x the block size).
		sz := int(float64(t.opt.BlockSize) * 1.2)
		var hdr zstd.Header
		if hdrErr := hdr.Decode(src); hdrErr == nil && hdr.HasFCS && hdr.FrameContentSize < uint64(t.opt.BlockSize*2) {
			sz = int(hdr.FrameContentSize)
		}
		dst = z.Calloc(sz, "Table.Decompress")
		out, err = y.ZSTDDecompress(dst, src)
	default:
		return errors.New("Unsupported compression type")
	}
	if err != nil {
		// Leave b.data pointing at src so its owner can still release it.
		z.Free(dst)
		return y.Wrap(err, "failed to decompress")
	}
	b.data = out

	// The compressed input is no longer referenced by the block.
	if b.freeMe {
		z.Free(src)
		b.freeMe = false
	}
	// If the decoder did not write into dst (presumably because it needed a
	// larger buffer and allocated its own), dst is unused and is released.
	// Otherwise the block now owns dst.
	if len(b.data) > 0 && len(dst) > 0 && &dst[0] != &b.data[0] {
		z.Free(dst)
	} else {
		b.freeMe = true
	}
	return nil
}
These pages were generated with Golds v0.8.4 (GOOS=linux GOARCH=amd64).
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the QR code on the left) to get the latest news about Golds.