package table
import (
"crypto/aes"
"errors"
"math"
"runtime"
"sync"
"sync/atomic"
"unsafe"
fbs "github.com/google/flatbuffers/go"
"github.com/klauspost/compress/s2"
"google.golang.org/protobuf/proto"
"github.com/dgraph-io/badger/v4/fb"
"github.com/dgraph-io/badger/v4/options"
"github.com/dgraph-io/badger/v4/pb"
"github.com/dgraph-io/badger/v4/y"
"github.com/dgraph-io/ristretto/v2/z"
)
const (
	// KB and MB are binary size units: 1024 and 1024*1024 bytes.
	KB = 1 << 10
	MB = KB << 10

	// padding is the extra room added on top of opts.BlockSize whenever a
	// block buffer is allocated, and is part of the size bound checked
	// after a block is compressed/encrypted in handleBlock.
	padding = 256
)
// header is the fixed-size prefix written before every entry in a block
// (see addHelper). overlap is the number of leading bytes the entry's key
// shares with the block's base key; diff is the length of the key suffix
// that is stored right after the header.
//
// Encode and Decode copy this struct's raw memory through unsafe, so the
// field order and field types ARE the serialized layout (native byte order).
// Do not reorder or retype the fields.
type header struct {
overlap uint16
diff uint16
}
// headerSize is the encoded size of a header in bytes (two uint16 fields,
// i.e. 4). Encode uses a hard-coded 4-byte buffer, so the two must agree.
const headerSize = uint16 (unsafe .Sizeof (header {}))
// Encode returns a new 4-byte slice containing the raw memory of h
// (overlap followed by diff, in the machine's native byte order).
func (h header) Encode() []byte {
	out := make([]byte, 4)
	*(*header)(unsafe.Pointer(&out[0])) = h
	return out
}
// Decode overwrites h with the first headerSize bytes of buf, which must
// have been produced by Encode. It panics if buf is shorter than headerSize.
func (h *header) Decode(buf []byte) {
	raw := (*[headerSize]byte)(unsafe.Pointer(h))
	copy(raw[:], buf[:headerSize])
}
// bblock is a single data block, either under construction or finished.
type bblock struct {
	// data is the block buffer (only data[:end] is meaningful); baseKey is
	// the first key added to the block, against which later keys are
	// prefix-compressed.
	data, baseKey []byte
	// entryOffsets holds, for each entry, its start offset within data.
	entryOffsets []uint32
	// end is the number of bytes of data currently in use.
	end int
}
// Builder incrementally assembles a table. Keys are appended into blocks,
// finished blocks are optionally compressed/encrypted by background
// handleBlock workers, and Done/Finish produce the block data together with
// a flatbuffer index.
type Builder struct {
	// alloc backs block buffers and compression/encryption output.
	alloc *z.Allocator
	// curBlock is the block currently receiving entries.
	curBlock *bblock

	// compressedSize and uncompressedSize are byte totals of finished blocks
	// after post-processing (updated by the handleBlock workers) and before
	// it (updated in finishBlock), respectively.
	compressedSize, uncompressedSize atomic.Uint32

	// lenOffsets is a running estimate of the index space needed for the
	// block offsets.
	lenOffsets uint32
	// keyHashes holds one hash per added key; it feeds the bloom filter and
	// the key count.
	keyHashes []uint32
	opts      *Options
	// maxVersion is the largest version (timestamp) seen among added keys.
	maxVersion uint64
	// onDiskSize accumulates the valueLen arguments passed to Add/AddStaleKey
	// plus the data and index sizes added in buildIndex.
	onDiskSize uint32
	// staleDataSize counts bytes attributed to stale keys.
	staleDataSize int

	// wg tracks the handleBlock worker goroutines.
	wg sync.WaitGroup
	// blockChan feeds finished blocks to the workers. It is nil when neither
	// compression nor encryption is configured.
	blockChan chan *bblock
	// blockList holds finished blocks in the order they were completed.
	blockList []*bblock
}
// allocate reserves need bytes at the tail of the current block and returns
// that region. If the block buffer is too small, a new buffer is taken from
// the allocator and the old contents are copied into it. The new size is
// double the old one, capped at 1 GiB, but never less than end+need.
func (b *Builder) allocate(need int) []byte {
	blk := b.curBlock
	if avail := len(blk.data[blk.end:]); avail < need {
		const maxGrow = 1 << 30
		newSize := 2 * len(blk.data)
		if newSize > maxGrow {
			newSize = maxGrow
		}
		if want := blk.end + need; want > newSize {
			newSize = want
		}
		grown := b.alloc.Allocate(newSize)
		copy(grown, blk.data)
		blk.data = grown
	}
	start := blk.end
	blk.end = start + need
	return blk.data[start:blk.end]
}
// append copies data onto the end of the current block, growing the block
// buffer if needed.
func (b *Builder) append(data []byte) {
	n := len(data)
	y.AssertTrue(copy(b.allocate(n), data) == n)
}
// maxAllocatorInitialSz caps the initial allocator size requested by
// NewTableBuilder (256 MiB).
const maxAllocatorInitialSz = 1 << 28
// NewTableBuilder returns a Builder configured by opts.
//
// The allocator is requested from opts.AllocPool with an initial size of
// twice opts.TableSize, capped at maxAllocatorInitialSz. The builder's
// capacity is set to 95% of opts.TableSize (checked by ReachedCapacity).
//
// When compression or encryption is enabled, 2*runtime.NumCPU() handleBlock
// workers are started. They exit after Done closes blockChan. Otherwise no
// channel or workers are created.
func NewTableBuilder(opts Options) *Builder {
	initialSz := 2 * int(opts.TableSize)
	if initialSz > maxAllocatorInitialSz {
		initialSz = maxAllocatorInitialSz
	}

	b := &Builder{opts: &opts}
	b.alloc = opts.AllocPool.Get(initialSz, "TableBuilder")
	b.alloc.Tag = "Builder"
	b.curBlock = &bblock{data: b.alloc.Allocate(opts.BlockSize + padding)}
	b.opts.tableCapacity = uint64(float64(b.opts.TableSize) * 0.95)

	// Finished blocks only need post-processing when they are compressed or
	// encrypted.
	if b.opts.Compression != options.None || b.opts.DataKey != nil {
		workers := 2 * runtime.NumCPU()
		b.blockChan = make(chan *bblock, 2*workers)
		b.wg.Add(workers)
		for w := 0; w < workers; w++ {
			go b.handleBlock()
		}
	}
	return b
}
// maxEncodedLen returns the worst-case output size for compressing sz bytes
// with ctype. Any other compression type is treated as a no-op, so sz itself
// is returned.
func maxEncodedLen(ctype options.CompressionType, sz int) int {
	if ctype == options.Snappy {
		return s2.MaxEncodedLen(sz)
	}
	if ctype == options.ZSTD {
		return y.ZSTDCompressBound(sz)
	}
	return sz
}
// handleBlock is the worker loop started by NewTableBuilder. For each block
// received on blockChan, it compresses the block (if compression is
// configured) and then encrypts it (if a data key is set). It then replaces
// the block's data/end with the result in place and adds the final length to
// compressedSize. It returns once blockChan is closed. Errors panic via
// y.Check.
func (b *Builder) handleBlock() {
	defer b.wg.Done()

	compress := b.opts.Compression != options.None
	for blk := range b.blockChan {
		out := blk.data[:blk.end]
		if compress {
			compressed, err := b.compressData(out)
			y.Check(err)
			out = compressed
		}
		if b.shouldEncrypt() {
			encrypted, err := b.encrypt(out)
			y.Check(y.Wrapf(err, "Error while encrypting block in table builder."))
			out = encrypted
		}

		// Sanity bound: the processed block must not exceed the worst-case
		// compressed size of the raw block plus padding+1 bytes.
		limit := maxEncodedLen(b.opts.Compression, blk.end) + padding + 1
		y.AssertTrue(len(out) <= limit)

		// out already lives in allocator memory, so it is stored without copying.
		blk.data, blk.end = out, len(out)
		b.compressedSize.Add(uint32(len(out)))
	}
}
// Close returns the builder's allocator to opts.AllocPool.
// NOTE(review): block data (including anything referenced by a buildData
// from Done) was allocated from this allocator, so it presumably must not be
// used after Close. Confirm against z.AllocatorPool semantics.
func (b *Builder) Close() {
	alloc := b.alloc
	b.opts.AllocPool.Return(alloc)
}
// Empty reports whether no keys have been added to the builder yet.
func (b *Builder) Empty() bool {
	return len(b.keyHashes) < 1
}
// keyDiff returns the suffix of newKey that remains after removing its
// longest common prefix with the current block's base key. The result
// aliases newKey.
func (b *Builder) keyDiff(newKey []byte) []byte {
	base := b.curBlock.baseKey
	limit := len(newKey)
	if len(base) < limit {
		limit = len(base)
	}
	i := 0
	for i < limit && newKey[i] == base[i] {
		i++
	}
	return newKey[i:]
}
// addHelper appends one entry to the current block. It also:
//   - records the key's hash and updates maxVersion;
//   - makes key the block's base key if the block has none yet;
//   - writes the entry as header, then key suffix (relative to the base
//     key), then encoded value, recording the entry's start offset;
//   - adds vpLen to onDiskSize.
//
// It panics (via y.AssertTrue) if the shared prefix or the suffix length
// does not fit in a uint16.
func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint32) {
	b.keyHashes = append(b.keyHashes, y.Hash(y.ParseKey(key)))
	if ts := y.ParseTs(key); ts > b.maxVersion {
		b.maxVersion = ts
	}

	blk := b.curBlock
	diffKey := key
	if len(blk.baseKey) == 0 {
		// First entry of the block: it becomes the base key and is stored in full.
		blk.baseKey = append(blk.baseKey[:0], key...)
	} else {
		diffKey = b.keyDiff(key)
	}

	overlap := len(key) - len(diffKey)
	y.AssertTrue(overlap <= math.MaxUint16)
	y.AssertTrue(len(diffKey) <= math.MaxUint16)

	blk.entryOffsets = append(blk.entryOffsets, uint32(blk.end))
	hdr := header{overlap: uint16(overlap), diff: uint16(len(diffKey))}
	b.append(hdr.Encode())
	b.append(diffKey)
	v.Encode(b.allocate(int(v.EncodedSize())))

	b.onDiskSize += vpLen
}
// finishBlock seals the current block, if it has any entries. It appends the
// trailer: the entry offsets, their count, the block checksum and the
// checksum's length. It then moves the block to blockList, updates the size
// accounting, and hands the block to the workers when blockChan exists.
func (b *Builder) finishBlock() {
	blk := b.curBlock
	if len(blk.entryOffsets) == 0 {
		return
	}

	b.append(y.U32SliceToBytes(blk.entryOffsets))
	b.append(y.U32ToBytes(uint32(len(blk.entryOffsets))))

	// The checksum covers everything written so far, including the offsets.
	sum := b.calculateChecksum(blk.data[:blk.end])
	b.append(sum)
	b.append(y.U32ToBytes(uint32(len(sum))))

	b.blockList = append(b.blockList, blk)
	b.uncompressedSize.Add(uint32(blk.end))

	// Index estimate: base key length rounded up to a multiple of 4, plus a
	// fixed 40 bytes (presumably per-BlockOffset flatbuffer overhead; confirm).
	b.lenOffsets += uint32((len(blk.baseKey)+3)/4*4) + 40

	if b.blockChan != nil {
		b.blockChan <- blk
	}
}
// shouldFinishBlock reports whether the current block should be sealed
// before key/value is added. That is the case when the block's estimated
// size after adding the entry would exceed opts.BlockSize.
//
// The estimate is the sum of:
//   - the bytes already in the block;
//   - a 6-byte allowance for the entry header (the encoded header is
//     headerSize = 4 bytes, so this is slightly conservative);
//   - the full key and the encoded value;
//   - the trailer finishBlock will write: one uint32 offset per entry
//     (including the new one), a 4-byte entry count, 8 bytes allowed for
//     the checksum and its 4-byte length;
//   - aes.BlockSize extra bytes when encryption is enabled.
//
// All arithmetic is done in uint64 so the sums cannot wrap before the
// overflow assertions below check them.
func (b *Builder) shouldFinishBlock(key []byte, value y.ValueStruct) bool {
	// An empty block always accepts its first entry, regardless of size.
	if len(b.curBlock.entryOffsets) <= 0 {
		return false
	}

	entriesOffsetsSize := uint64(len(b.curBlock.entryOffsets)+1)*4 +
		4 + // entry count
		8 + // checksum allowance
		4 // checksum length
	y.AssertTrue(entriesOffsetsSize < math.MaxUint32)

	estimatedSize := uint64(b.curBlock.end) + 6 +
		uint64(len(key)) + uint64(value.EncodedSize()) + entriesOffsetsSize
	if b.shouldEncrypt() {
		estimatedSize += aes.BlockSize
	}

	// Entry offsets and block lengths are stored as uint32, so the block
	// must stay well below that range.
	y.AssertTrue(uint64(b.curBlock.end)+estimatedSize < math.MaxUint32)
	return estimatedSize > uint64(b.opts.BlockSize)
}
// AddStaleKey adds a key/value like Add, but also counts it toward
// staleDataSize. It counts the key length, the value length, and 8 bytes of
// fixed per-entry overhead (4 + 4).
func (b *Builder) AddStaleKey(key []byte, v y.ValueStruct, valueLen uint32) {
	staleBytes := len(key) + len(v.Value) + 4 + 4
	b.staleDataSize += staleBytes
	b.addInternal(key, v, valueLen, true)
}
// Add appends a (non-stale) key/value entry to the table being built.
// valueLen is added to the builder's on-disk size accounting.
func (b *Builder) Add(key []byte, value y.ValueStruct, valueLen uint32) {
	const stale = false
	b.addInternal(key, value, valueLen, stale)
}
// addInternal adds key/value to the current block. First, if the entry would
// overflow the block, it seals the block and starts a new one. In that case
// a stale key additionally counts len(key)+8 bytes toward staleDataSize,
// since the key becomes the new block's base key.
func (b *Builder) addInternal(key []byte, value y.ValueStruct, valueLen uint32, isStale bool) {
	if !b.shouldFinishBlock(key, value) {
		b.addHelper(key, value, valueLen)
		return
	}

	if isStale {
		b.staleDataSize += len(key) + 4 + 4
	}
	b.finishBlock()
	b.curBlock = &bblock{data: b.alloc.Allocate(b.opts.BlockSize + padding)}
	b.addHelper(key, value, valueLen)
}
// ReachedCapacity reports whether the estimated table size exceeds the
// builder's capacity (opts.tableCapacity). The estimate is the sum of:
//   - finished block bytes (uncompressedSize when no compression/encryption
//     is configured, otherwise compressedSize);
//   - the trailer of the block under construction (its entry offsets, a
//     4-byte count, 8 checksum bytes and a 4-byte length);
//   - 4 more bytes;
//   - the index estimate in lenOffsets.
//
// compressedSize is updated asynchronously by the workers, so blocks still
// queued on blockChan are not yet reflected. The data bytes of the current
// block are also not counted.
func (b *Builder) ReachedCapacity() bool {
	var blockBytes uint32
	if b.opts.Compression == options.None && b.opts.DataKey == nil {
		blockBytes = b.uncompressedSize.Load()
	} else {
		blockBytes = b.compressedSize.Load()
	}

	pendingTrailer := uint32(len(b.curBlock.entryOffsets)*4) + 4 + 8 + 4
	estimate := blockBytes + pendingTrailer + 4 + b.lenOffsets
	return uint64(estimate) > b.opts.tableCapacity
}
// Finish completes the table (see Done) and returns it as a single newly
// allocated byte slice: the blocks, then the index, the index length, the
// checksum and the checksum length. An empty builder yields an empty slice.
func (b *Builder) Finish() []byte {
	data := b.Done()
	out := make([]byte, data.Size)
	n := data.Copy(out)
	y.AssertTrue(n == len(out))
	return out
}
// buildData is the result of Done: everything needed to write out the
// finished table with Copy.
type buildData struct {
	// blockList holds the finished blocks; each contributes data[:end].
	blockList []*bblock
	// index is the serialized (and possibly encrypted) index; checksum is
	// the marshaled checksum of index.
	index, checksum []byte
	// Size is the total number of bytes Copy writes.
	Size int
	// alloc is the allocator that owns the block buffers.
	alloc *z.Allocator
}
// Copy writes the table into dst in this order:
//  1. every block's data[:end];
//  2. the index;
//  3. the 4-byte index length;
//  4. the checksum;
//  5. the 4-byte checksum length.
//
// It returns the number of bytes written. dst should be at least bd.Size
// bytes long; a shorter dst receives a truncated copy.
func (bd *buildData) Copy(dst []byte) int {
	n := 0
	for _, blk := range bd.blockList {
		n += copy(dst[n:], blk.data[:blk.end])
	}
	tail := [][]byte{
		bd.index,
		y.U32ToBytes(uint32(len(bd.index))),
		bd.checksum,
		y.U32ToBytes(uint32(len(bd.checksum))),
	}
	for _, part := range tail {
		n += copy(dst[n:], part)
	}
	return n
}
// Done seals the current block, stops and waits for the worker goroutines,
// and builds the index. The index includes a bloom filter over the key
// hashes when opts.BloomFalsePositive > 0, and is encrypted when a data key
// is configured. The returned buildData describes the complete table.
// If no blocks were produced, a zero buildData is returned.
//
// Done closes blockChan when it exists, so calling it twice panics.
func (b *Builder) Done() buildData {
	b.finishBlock()
	if b.blockChan != nil {
		close(b.blockChan)
	}
	// All blocks must be post-processed before their sizes go into the index.
	b.wg.Wait()

	if len(b.blockList) == 0 {
		return buildData{}
	}

	var bloom y.Filter
	if b.opts.BloomFalsePositive > 0 {
		bitsPerKey := y.BloomBitsPerKey(len(b.keyHashes), b.opts.BloomFalsePositive)
		bloom = y.NewFilter(b.keyHashes, bitsPerKey)
	}

	index, dataSize := b.buildIndex(bloom)
	if b.shouldEncrypt() {
		encrypted, err := b.encrypt(index)
		y.Check(err)
		index = encrypted
	}
	checksum := b.calculateChecksum(index)

	return buildData{
		blockList: b.blockList,
		index:     index,
		checksum:  checksum,
		// Blocks + index + checksum + the two 4-byte length fields.
		Size:  int(dataSize) + len(index) + len(checksum) + 4 + 4,
		alloc: b.alloc,
	}
}
// calculateChecksum computes a CRC32C checksum of data and returns it
// marshaled as a pb.Checksum protobuf message. Marshal errors panic via
// y.Check.
func (b *Builder) calculateChecksum(data []byte) []byte {
	algo := pb.Checksum_CRC32C
	out, err := proto.Marshal(&pb.Checksum{
		Sum:  y.CalculateChecksum(data, algo),
		Algo: algo,
	})
	y.Check(err)
	return out
}
// DataKey returns the encryption key from the builder's options. It is nil
// when encryption is disabled.
func (b *Builder) DataKey() *pb.DataKey {
	opts := b.opts
	return opts.DataKey
}
// Opts returns the builder's options. The pointer refers to the builder's
// own copy, so mutations through it affect the builder.
func (b *Builder) Opts() *Options {
	opts := b.opts
	return opts
}
// encrypt XORs data with the builder's data key using a freshly generated IV.
// It returns a buffer allocated from b.alloc that holds the ciphertext
// followed by the IV. On error, the original data is returned with a
// wrapped error.
func (b *Builder) encrypt(data []byte) ([]byte, error) {
	iv, err := y.GenerateIV()
	if err != nil {
		return data, y.Wrapf(err, "Error while generating IV in Builder.encrypt")
	}

	n := len(data)
	out := b.alloc.Allocate(n + len(iv))
	if err := y.XORBlock(out[:n], data, b.DataKey().Data, iv); err != nil {
		return data, y.Wrapf(err, "Error while encrypting in Builder.encrypt")
	}
	// The IV is stored after the ciphertext.
	y.AssertTrue(copy(out[n:], iv) == len(iv))
	return out, nil
}
// shouldEncrypt reports whether a data key is configured, i.e. whether blocks
// and the index must be encrypted.
func (b *Builder) shouldEncrypt() bool {
	return b.DataKey() != nil
}
// compressData compresses data with the configured algorithm into a buffer
// allocated from b.alloc and sized for the algorithm's worst case. With
// options.None, data is returned unchanged. Unknown algorithms yield an
// error.
func (b *Builder) compressData(data []byte) ([]byte, error) {
	switch b.opts.Compression {
	case options.None:
		return data, nil
	case options.Snappy:
		buf := b.alloc.Allocate(s2.MaxEncodedLen(len(data)))
		return s2.EncodeSnappy(buf, data), nil
	case options.ZSTD:
		buf := b.alloc.Allocate(y.ZSTDCompressBound(len(data)))
		return y.ZSTDCompress(buf, data, b.opts.ZSTDCompressionLevel)
	default:
		return nil, errors.New("Unsupported compression type")
	}
}
// buildIndex serializes the table index as a fb.TableIndex flatbuffer. The
// index holds the block offsets, the optional bloom filter, maxVersion,
// uncompressed size, key count, on-disk size and stale data size. It returns
// the serialized bytes and the total size of the block data.
//
// Side effect: onDiskSize grows by the block data size. The stored OnDiskSize
// is then patched in place to also include the index's own length.
func (b *Builder) buildIndex(bloom []byte) ([]byte, uint32) {
	fbb := fbs.NewBuilder(3 << 20)

	// Flatbuffers are built back-to-front: the offsets vector and the bloom
	// bytes must be created before the TableIndex table is started.
	offsets, dataSize := b.writeBlockOffsets(fbb)
	fb.TableIndexStartOffsetsVector(fbb, len(offsets))
	for i := range offsets {
		fbb.PrependUOffsetT(offsets[len(offsets)-1-i])
	}
	offsetsVec := fbb.EndVector(len(offsets))

	var bloomVec fbs.UOffsetT
	if len(bloom) > 0 {
		bloomVec = fbb.CreateByteVector(bloom)
	}

	b.onDiskSize += dataSize

	fb.TableIndexStart(fbb)
	fb.TableIndexAddOffsets(fbb, offsetsVec)
	fb.TableIndexAddBloomFilter(fbb, bloomVec)
	fb.TableIndexAddMaxVersion(fbb, b.maxVersion)
	fb.TableIndexAddUncompressedSize(fbb, b.uncompressedSize.Load())
	fb.TableIndexAddKeyCount(fbb, uint32(len(b.keyHashes)))
	fb.TableIndexAddOnDiskSize(fbb, b.onDiskSize)
	fb.TableIndexAddStaleDataSize(fbb, uint32(b.staleDataSize))
	fbb.Finish(fb.TableIndexEnd(fbb))

	out := fbb.FinishedBytes()
	// The index's own size is only known after serialization, so patch it in.
	idx := fb.GetRootAsTableIndex(out, 0)
	y.AssertTrue(idx.MutateOnDiskSize(idx.OnDiskSize() + uint32(len(out))))
	return out, dataSize
}
// writeBlockOffsets writes one BlockOffset entry into builder for every
// finished block. It returns their flatbuffer offsets in block order,
// together with the total size of all block data. Each block's recorded
// start offset is the sum of the lengths (bl.end) of the blocks before it.
// Done calls this after the workers finish, so those lengths already
// reflect compression/encryption.
func (b *Builder) writeBlockOffsets(builder *fbs.Builder) ([]fbs.UOffsetT, uint32) {
	// Exactly one offset is produced per block, so size the slice up front
	// instead of growing it through repeated appends.
	uoffs := make([]fbs.UOffsetT, 0, len(b.blockList))
	var startOffset uint32
	for _, bl := range b.blockList {
		uoffs = append(uoffs, b.writeBlockOffset(builder, bl, startOffset))
		startOffset += uint32(bl.end)
	}
	return uoffs, startOffset
}
// writeBlockOffset serializes a single fb.BlockOffset for bl: its base key,
// its start offset within the table data, and its length. It returns the
// flatbuffer offset of the new table.
func (b *Builder) writeBlockOffset(builder *fbs.Builder, bl *bblock, startOffset uint32) fbs.UOffsetT {
	// The key vector must exist before the BlockOffset table is started.
	keyVec := builder.CreateByteVector(bl.baseKey)

	fb.BlockOffsetStart(builder)
	fb.BlockOffsetAddKey(builder, keyVec)
	fb.BlockOffsetAddOffset(builder, startOffset)
	fb.BlockOffsetAddLen(builder, uint32(bl.end))
	return fb.BlockOffsetEnd(builder)
}
The pages are generated with Golds v0.8.4 (GOOS=linux GOARCH=amd64).
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.