package y
import (
"bytes"
"encoding/binary"
stderrors "errors"
"fmt"
"hash/crc32"
"io"
"math"
"os"
"reflect"
"strconv"
"sync"
"time"
"unsafe"
"github.com/dgraph-io/badger/v4/pb"
"github.com/dgraph-io/ristretto/v2/z"
)
var (
ErrEOF = stderrors .New ("ErrEOF: End of file" )
ErrCommitAfterFinish = stderrors .New ("Batch commit not permitted after finish" )
)
type Flags int
const (
Sync Flags = 1 << iota
ReadOnly
)
var (
datasyncFileFlag = 0x0
CastagnoliCrcTable = crc32 .MakeTable (crc32 .Castagnoli )
)
func OpenExistingFile (filename string , flags Flags ) (*os .File , error ) {
openFlags := os .O_RDWR
if flags &ReadOnly != 0 {
openFlags = os .O_RDONLY
}
if flags &Sync != 0 {
openFlags |= datasyncFileFlag
}
return os .OpenFile (filename , openFlags , 0 )
}
func CreateSyncedFile (filename string , sync bool ) (*os .File , error ) {
flags := os .O_RDWR | os .O_CREATE | os .O_EXCL
if sync {
flags |= datasyncFileFlag
}
return os .OpenFile (filename , flags , 0600 )
}
func OpenSyncedFile (filename string , sync bool ) (*os .File , error ) {
flags := os .O_RDWR | os .O_CREATE
if sync {
flags |= datasyncFileFlag
}
return os .OpenFile (filename , flags , 0600 )
}
func OpenTruncFile (filename string , sync bool ) (*os .File , error ) {
flags := os .O_RDWR | os .O_CREATE | os .O_TRUNC
if sync {
flags |= datasyncFileFlag
}
return os .OpenFile (filename , flags , 0600 )
}
func SafeCopy (a , src []byte ) []byte {
b := append (a [:0 ], src ...)
if b == nil {
return []byte {}
}
return b
}
func Copy (a []byte ) []byte {
b := make ([]byte , len (a ))
copy (b , a )
return b
}
func KeyWithTs (key []byte , ts uint64 ) []byte {
out := make ([]byte , len (key )+8 )
copy (out , key )
binary .BigEndian .PutUint64 (out [len (key ):], math .MaxUint64 -ts )
return out
}
func ParseTs (key []byte ) uint64 {
if len (key ) <= 8 {
return 0
}
return math .MaxUint64 - binary .BigEndian .Uint64 (key [len (key )-8 :])
}
func CompareKeys (key1 , key2 []byte ) int {
if cmp := bytes .Compare (key1 [:len (key1 )-8 ], key2 [:len (key2 )-8 ]); cmp != 0 {
return cmp
}
return bytes .Compare (key1 [len (key1 )-8 :], key2 [len (key2 )-8 :])
}
func ParseKey (key []byte ) []byte {
if len (key ) < 8 {
return nil
}
return key [:len (key )-8 ]
}
func SameKey (src , dst []byte ) bool {
if len (src ) != len (dst ) {
return false
}
return bytes .Equal (ParseKey (src ), ParseKey (dst ))
}
type Slice struct {
buf []byte
}
func (s *Slice ) Resize (sz int ) []byte {
if cap (s .buf ) < sz {
s .buf = make ([]byte , sz )
}
return s .buf [0 :sz ]
}
func FixedDuration (d time .Duration ) string {
str := fmt .Sprintf ("%02ds" , int (d .Seconds ())%60 )
if d >= time .Minute {
str = fmt .Sprintf ("%02dm" , int (d .Minutes ())%60 ) + str
}
if d >= time .Hour {
str = fmt .Sprintf ("%02dh" , int (d .Hours ())) + str
}
return str
}
type Throttle struct {
once sync .Once
wg sync .WaitGroup
ch chan struct {}
errCh chan error
finishErr error
}
func NewThrottle (max int ) *Throttle {
return &Throttle {
ch : make (chan struct {}, max ),
errCh : make (chan error , max ),
}
}
func (t *Throttle ) Do () error {
for {
select {
case t .ch <- struct {}{}:
t .wg .Add (1 )
return nil
case err := <- t .errCh :
if err != nil {
return err
}
}
}
}
func (t *Throttle ) Done (err error ) {
if err != nil {
t .errCh <- err
}
select {
case <- t .ch :
default :
panic ("Throttle Do Done mismatch" )
}
t .wg .Done ()
}
func (t *Throttle ) Finish () error {
t .once .Do (func () {
t .wg .Wait ()
close (t .ch )
close (t .errCh )
for err := range t .errCh {
if err != nil {
t .finishErr = err
return
}
}
})
return t .finishErr
}
func U16ToBytes (v uint16 ) []byte {
var uBuf [2 ]byte
binary .BigEndian .PutUint16 (uBuf [:], v )
return uBuf [:]
}
func BytesToU16 (b []byte ) uint16 {
return binary .BigEndian .Uint16 (b )
}
func U32ToBytes (v uint32 ) []byte {
var uBuf [4 ]byte
binary .BigEndian .PutUint32 (uBuf [:], v )
return uBuf [:]
}
func BytesToU32 (b []byte ) uint32 {
return binary .BigEndian .Uint32 (b )
}
func U32SliceToBytes (u32s []uint32 ) []byte {
if len (u32s ) == 0 {
return nil
}
var b []byte
hdr := (*reflect .SliceHeader )(unsafe .Pointer (&b ))
hdr .Len = len (u32s ) * 4
hdr .Cap = hdr .Len
hdr .Data = uintptr (unsafe .Pointer (&u32s [0 ]))
return b
}
func BytesToU32Slice (b []byte ) []uint32 {
if len (b ) == 0 {
return nil
}
var u32s []uint32
hdr := (*reflect .SliceHeader )(unsafe .Pointer (&u32s ))
hdr .Len = len (b ) / 4
hdr .Cap = hdr .Len
hdr .Data = uintptr (unsafe .Pointer (&b [0 ]))
return u32s
}
func U64ToBytes (v uint64 ) []byte {
var uBuf [8 ]byte
binary .BigEndian .PutUint64 (uBuf [:], v )
return uBuf [:]
}
func BytesToU64 (b []byte ) uint64 {
return binary .BigEndian .Uint64 (b )
}
func U64SliceToBytes (u64s []uint64 ) []byte {
if len (u64s ) == 0 {
return nil
}
var b []byte
hdr := (*reflect .SliceHeader )(unsafe .Pointer (&b ))
hdr .Len = len (u64s ) * 8
hdr .Cap = hdr .Len
hdr .Data = uintptr (unsafe .Pointer (&u64s [0 ]))
return b
}
func BytesToU64Slice (b []byte ) []uint64 {
if len (b ) == 0 {
return nil
}
var u64s []uint64
hdr := (*reflect .SliceHeader )(unsafe .Pointer (&u64s ))
hdr .Len = len (b ) / 8
hdr .Cap = hdr .Len
hdr .Data = uintptr (unsafe .Pointer (&b [0 ]))
return u64s
}
type page struct {
buf []byte
}
type PageBuffer struct {
pages []*page
length int
nextPageSize int
}
func NewPageBuffer (pageSize int ) *PageBuffer {
b := &PageBuffer {}
b .pages = append (b .pages , &page {buf : make ([]byte , 0 , pageSize )})
b .nextPageSize = pageSize * 2
return b
}
func (b *PageBuffer ) Write (data []byte ) (int , error ) {
dataLen := len (data )
for {
cp := b .pages [len (b .pages )-1 ]
n := copy (cp .buf [len (cp .buf ):cap (cp .buf )], data )
cp .buf = cp .buf [:len (cp .buf )+n ]
b .length += n
if len (data ) == n {
break
}
data = data [n :]
b .pages = append (b .pages , &page {buf : make ([]byte , 0 , b .nextPageSize )})
b .nextPageSize *= 2
}
return dataLen , nil
}
func (b *PageBuffer ) WriteByte (data byte ) error {
_ , err := b .Write ([]byte {data })
return err
}
func (b *PageBuffer ) Len () int {
return b .length
}
func (b *PageBuffer ) pageForOffset (offset int ) (int , int ) {
AssertTrue (offset < b .length )
var pageIdx , startIdx , sizeNow int
for i := 0 ; i < len (b .pages ); i ++ {
cp := b .pages [i ]
if sizeNow +len (cp .buf )-1 < offset {
sizeNow += len (cp .buf )
} else {
pageIdx = i
startIdx = offset - sizeNow
break
}
}
return pageIdx , startIdx
}
func (b *PageBuffer ) Truncate (n int ) {
pageIdx , startIdx := b .pageForOffset (n )
b .pages = b .pages [:pageIdx +1 ]
cp := b .pages [len (b .pages )-1 ]
cp .buf = cp .buf [:startIdx ]
b .length = n
}
func (b *PageBuffer ) Bytes () []byte {
buf := make ([]byte , b .length )
written := 0
for i := 0 ; i < len (b .pages ); i ++ {
written += copy (buf [written :], b .pages [i ].buf )
}
return buf
}
func (b *PageBuffer ) WriteTo (w io .Writer ) (int64 , error ) {
written := int64 (0 )
for i := 0 ; i < len (b .pages ); i ++ {
n , err := w .Write (b .pages [i ].buf )
written += int64 (n )
if err != nil {
return written , err
}
}
return written , nil
}
func (b *PageBuffer ) NewReaderAt (offset int ) *PageBufferReader {
pageIdx , startIdx := b .pageForOffset (offset )
return &PageBufferReader {
buf : b ,
pageIdx : pageIdx ,
startIdx : startIdx ,
}
}
type PageBufferReader struct {
buf *PageBuffer
pageIdx int
startIdx int
}
func (r *PageBufferReader ) Read (p []byte ) (int , error ) {
pc := len (r .buf .pages )
read := 0
for r .pageIdx < pc && read < len (p ) {
cp := r .buf .pages [r .pageIdx ]
endIdx := len (cp .buf )
n := copy (p [read :], cp .buf [r .startIdx :endIdx ])
read += n
r .startIdx += n
if r .startIdx >= cap (cp .buf ) {
r .pageIdx ++
r .startIdx = 0
continue
}
if r .pageIdx == pc -1 {
break
}
}
if read == 0 && len (p ) > 0 {
return read , io .EOF
}
return read , nil
}
const kvsz = int (unsafe .Sizeof (pb .KV {}))
func NewKV (alloc *z .Allocator ) *pb .KV {
if alloc == nil {
return &pb .KV {}
}
b := alloc .AllocateAligned (kvsz )
return (*pb .KV )(unsafe .Pointer (&b [0 ]))
}
func IBytesToString (size uint64 , precision int ) string {
sizes := []string {"B" , "KiB" , "MiB" , "GiB" , "TiB" , "PiB" , "EiB" }
base := float64 (1024 )
if size < 10 {
return fmt .Sprintf ("%d B" , size )
}
e := math .Floor (math .Log (float64 (size )) / math .Log (base ))
suffix := sizes [int (e )]
val := float64 (size ) / math .Pow (base , e )
f := "%." + strconv .Itoa (precision ) + "f %s"
return fmt .Sprintf (f , val , suffix )
}
type RateMonitor struct {
start time .Time
lastSent uint64
lastCapture time .Time
rates []float64
idx int
}
func NewRateMonitor (numSamples int ) *RateMonitor {
return &RateMonitor {
start : time .Now (),
rates : make ([]float64 , numSamples ),
}
}
const minRate = 0.0001
func (rm *RateMonitor ) Capture (sent uint64 ) {
diff := sent - rm .lastSent
dur := time .Since (rm .lastCapture )
rm .lastCapture , rm .lastSent = time .Now (), sent
rate := float64 (diff ) / dur .Seconds ()
if rate < minRate {
rate = minRate
}
rm .rates [rm .idx ] = rate
rm .idx = (rm .idx + 1 ) % len (rm .rates )
}
func (rm *RateMonitor ) Rate () uint64 {
var total float64
var den float64
for _ , r := range rm .rates {
if r < minRate {
continue
}
total += r
den += 1.0
}
if den < minRate {
return 0
}
return uint64 (total / den )
}
The pages are generated with Golds v0.8.4 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .