package frostdb

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"math/rand"
	"path/filepath"
	"runtime"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/arrow/util"
	"github.com/dustin/go-humanize"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/oklog/ulid/v2"
	"github.com/parquet-go/parquet-go"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"golang.org/x/sync/errgroup"
	"google.golang.org/protobuf/proto"

	"github.com/polarsignals/frostdb/dynparquet"
	schemapb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/schema/v1alpha1"
	schemav2pb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/schema/v1alpha2"
	tablepb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/table/v1alpha1"
	walpb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/wal/v1alpha1"
	"github.com/polarsignals/frostdb/index"
	"github.com/polarsignals/frostdb/internal/records"
	"github.com/polarsignals/frostdb/parts"
	"github.com/polarsignals/frostdb/pqarrow"
	"github.com/polarsignals/frostdb/query/logicalplan"
	"github.com/polarsignals/frostdb/query/physicalplan"
	"github.com/polarsignals/frostdb/recovery"
	walpkg "github.com/polarsignals/frostdb/wal"
)

var (
	ErrNoSchema     = fmt.Errorf("no schema")
	ErrTableClosing = fmt.Errorf("table closing")
)

// DefaultIndexConfig returns the default level configuration for a table's
// LSM index: two in-memory parquet levels (15 MiB and 128 MiB) and a final
// 512 MiB level.
func DefaultIndexConfig() []*index.LevelConfig {
	return []*index.LevelConfig{
		{Level: index.L0, MaxSize: 1024 * 1024 * 15, Type: index.CompactionTypeParquetMemory},
		{Level: index.L1, MaxSize: 1024 * 1024 * 128, Type: index.CompactionTypeParquetMemory},
		{Level: index.L2, MaxSize: 1024 * 1024 * 512},
	}
}

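// Illustrative sketch (not part of the original source): callers can supply
// their own level configuration instead of the defaults above. Only the
// index.LevelConfig shape is taken from this file; the option used to install
// the configuration on the column store is an assumption.
//
//	custom := []*index.LevelConfig{
//		{Level: index.L0, MaxSize: 1024 * 1024 * 32, Type: index.CompactionTypeParquetMemory},
//		{Level: index.L1, MaxSize: 1024 * 1024 * 256},
//	}
//	// e.g. passed via a column-store index-config option (name assumed).
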
type ErrWriteRow struct{ err error }

func (e ErrWriteRow) Error() string { return "failed to write row: " + e.err.Error() }

type ErrReadRow struct{ err error }

func (e ErrReadRow) Error() string { return "failed to read row: " + e.err.Error() }

type ErrCreateSchemaWriter struct{ err error }

func (e ErrCreateSchemaWriter) Error() string {
	return "failed to create schema writer: " + e.err.Error()
}

// TableOption is a function that mutates a table configuration.
type TableOption func(*tablepb.TableConfig) error

// WithRowGroupSize sets the maximum number of rows per row group when
// persisting the table to disk. A value of 0 (the default) writes all rows
// as a single row group.
func WithRowGroupSize(numRows int) TableOption {
	return func(config *tablepb.TableConfig) error {
		config.RowGroupSize = uint64(numRows)
		return nil
	}
}

// WithBlockReaderLimit sets the maximum number of block readers that may run
// concurrently.
func WithBlockReaderLimit(n int) TableOption {
	return func(config *tablepb.TableConfig) error {
		config.BlockReaderLimit = uint64(n)
		return nil
	}
}

// WithoutWAL disables the write-ahead log for this table.
func WithoutWAL() TableOption {
	return func(config *tablepb.TableConfig) error {
		config.DisableWal = true
		return nil
	}
}

// WithUniquePrimaryIndex configures whether the table's primary index is
// unique.
func WithUniquePrimaryIndex(unique bool) TableOption {
	return func(config *tablepb.TableConfig) error {
		switch e := config.Schema.(type) {
		case *tablepb.TableConfig_DeprecatedSchema:
			e.DeprecatedSchema.UniquePrimaryIndex = unique
		case *tablepb.TableConfig_SchemaV2:
			e.SchemaV2.UniquePrimaryIndex = unique
		}
		return nil
	}
}

// FromConfig copies the settings of an existing table configuration.
func FromConfig(config *tablepb.TableConfig) TableOption {
	return func(cfg *tablepb.TableConfig) error {
		if config.BlockReaderLimit != 0 { // the zero value means unset
			cfg.BlockReaderLimit = config.BlockReaderLimit
		}
		cfg.DisableWal = config.DisableWal
		cfg.RowGroupSize = config.RowGroupSize
		return nil
	}
}

func defaultTableConfig() *tablepb.TableConfig {
	return &tablepb.TableConfig{
		BlockReaderLimit: uint64(runtime.GOMAXPROCS(0)),
	}
}

// NewTableConfig returns a new table configuration for the given schema
// definition. The schema must be a *schemapb.Schema or a *schemav2pb.Schema;
// any other type panics.
func NewTableConfig(
	schema proto.Message,
	options ...TableOption,
) *tablepb.TableConfig {
	t := defaultTableConfig()
	switch v := schema.(type) {
	case *schemapb.Schema:
		t.Schema = &tablepb.TableConfig_DeprecatedSchema{
			DeprecatedSchema: v,
		}
	case *schemav2pb.Schema:
		t.Schema = &tablepb.TableConfig_SchemaV2{
			SchemaV2: v,
		}
	default:
		panic(fmt.Sprintf("unsupported schema type: %T", v))
	}
	for _, opt := range options {
		_ = opt(t)
	}
	return t
}

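// Usage sketch for NewTableConfig (schemaDef is assumed to be a
// *schemapb.Schema defined elsewhere; note that a failing option is silently
// ignored, per the loop above):
//
//	config := NewTableConfig(
//		schemaDef,
//		WithRowGroupSize(4096),
//		WithoutWAL(),
//	)
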
type completedBlock struct {
	prevTx uint64
	tx     uint64
}

// GenericTable wraps a Table with a type-safe, generics-based write API.
type GenericTable[T any] struct {
	*Table
	mu    sync.Mutex
	build *records.Build[T]
}

func (t *GenericTable[T]) Release() {
	t.build.Release()
}

// Write builds an arrow record from the given values and inserts it into the
// table, returning the transaction id of the write.
func (t *GenericTable[T]) Write(ctx context.Context, values ...T) (uint64, error) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if err := t.build.Append(values...); err != nil {
		return 0, err
	}
	return t.InsertRecord(ctx, t.build.NewRecord())
}

func NewGenericTable[T any](db *DB, name string, mem memory.Allocator, options ...TableOption) (*GenericTable[T], error) {
	build := records.NewBuild[T](mem)
	table, err := db.Table(name, NewTableConfig(build.Schema(name), options...))
	if err != nil {
		return nil, err
	}
	return &GenericTable[T]{build: build, Table: table}, nil
}

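// Usage sketch for the generics-based API (the struct tag syntax understood
// by records.NewBuild is an assumption here; db, ctx, and the allocator come
// from elsewhere):
//
//	type Sample struct {
//		Timestamp int64 `frostdb:",asc"`
//		Value     int64
//	}
//
//	table, err := NewGenericTable[Sample](db, "samples", memory.NewGoAllocator())
//	if err != nil {
//		// handle error
//	}
//	defer table.Release()
//	tx, err := table.Write(ctx, Sample{Timestamp: 10, Value: 3})
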
type Table struct {
	db      *DB
	name    string
	metrics tableMetrics
	logger  log.Logger
	tracer  trace.Tracer

	config atomic.Pointer[tablepb.TableConfig]
	schema *dynparquet.Schema

	pendingBlocks   map[*TableBlock]struct{}
	completedBlocks []completedBlock
	lastCompleted   uint64

	mtx    *sync.RWMutex
	active *TableBlock

	wal     WAL
	closing bool
}

type Sync interface {
	Sync() error
}

type WAL interface {
	Close() error
	Log(tx uint64, record *walpb.Record) error
	LogRecord(tx uint64, table string, record arrow.Record) error
	Replay(tx uint64, handler walpkg.ReplayHandlerFunc) error
	Truncate(tx uint64) error
	Reset(nextTx uint64) error
	FirstIndex() (uint64, error)
	LastIndex() (uint64, error)
}

type TableBlock struct {
	table  *Table
	logger log.Logger
	tracer trace.Tracer

	ulid   ulid.ULID
	minTx  uint64
	prevTx uint64

	uncompressedInsertsSize atomic.Int64
	lastSnapshotSize        atomic.Int64

	index *index.LSM

	pendingWritersWg sync.WaitGroup
	pendingReadersWg sync.WaitGroup

	mtx *sync.RWMutex
}

type Closer interface {
	Close(cleanup bool) error
}

func schemaFromTableConfig(tableConfig *tablepb.TableConfig) (*dynparquet.Schema, error) {
	switch schema := tableConfig.Schema.(type) {
	case *tablepb.TableConfig_DeprecatedSchema:
		return dynparquet.SchemaFromDefinition(schema.DeprecatedSchema)
	case *tablepb.TableConfig_SchemaV2:
		return dynparquet.SchemaFromDefinition(schema.SchemaV2)
	default:
		// No schema defined; leave the table schema nil.
		return nil, nil
	}
}

func newTable(
	db *DB,
	name string,
	tableConfig *tablepb.TableConfig,
	metrics tableMetrics,
	logger log.Logger,
	tracer trace.Tracer,
	wal WAL,
) (*Table, error) {
	if db.columnStore.indexDegree <= 0 {
		return nil, fmt.Errorf("table's columnStore index degree must be a positive integer (received %d)", db.columnStore.indexDegree)
	}
	if db.columnStore.splitSize < 2 {
		return nil, fmt.Errorf("table's columnStore splitSize must be a positive integer > 1 (received %d)", db.columnStore.splitSize)
	}

	if tableConfig == nil {
		tableConfig = defaultTableConfig()
	}

	s, err := schemaFromTableConfig(tableConfig)
	if err != nil {
		return nil, err
	}

	t := &Table{
		db:      db,
		name:    name,
		logger:  logger,
		tracer:  tracer,
		mtx:     &sync.RWMutex{},
		wal:     wal,
		schema:  s,
		metrics: metrics,
	}
	t.config.Store(tableConfig)
	if tableConfig.DisableWal {
		t.wal = &walpkg.NopWAL{}
	}

	t.pendingBlocks = make(map[*TableBlock]struct{})
	return t, nil
}

func (t *Table) newTableBlock(prevTx, tx uint64, id ulid.ULID) error {
	b, err := id.MarshalBinary()
	if err != nil {
		return err
	}

	if err := t.wal.Log(tx, &walpb.Record{
		Entry: &walpb.Entry{
			EntryType: &walpb.Entry_NewTableBlock_{
				NewTableBlock: &walpb.Entry_NewTableBlock{
					TableName: t.name,
					BlockId:   b,
					Config:    t.config.Load(),
				},
			},
		},
	}); err != nil {
		return err
	}

	t.active, err = newTableBlock(t, prevTx, tx, id)
	if err != nil {
		return err
	}
	return nil
}

func (t *Table) dropPendingBlock(block *TableBlock) {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	delete(t.pendingBlocks, block)

	// Wait for any outstanding readers and writers before closing the
	// block's index.
	block.pendingReadersWg.Wait()
	block.pendingWritersWg.Wait()

	if err := block.index.Close(); err != nil {
		level.Error(t.logger).Log("msg", "failed to close index", "err", err)
	}
}

func (t *Table) writeBlock(
	block *TableBlock, nextTxn uint64, snapshotDB bool, opts ...RotateBlockOption,
) {
	rbo := &rotateBlockOptions{}
	for _, o := range opts {
		o(rbo)
	}
	if rbo.wg != nil {
		defer rbo.wg.Done()
	}
	level.Debug(t.logger).Log("msg", "syncing block", "next_txn", nextTxn, "ulid", block.ulid, "size", block.index.Size())
	block.pendingWritersWg.Wait()
	level.Debug(t.logger).Log("msg", "done syncing block", "next_txn", nextTxn, "ulid", block.ulid, "size", block.index.Size())

	var err error
	if !rbo.skipPersist && block.index.Size() != 0 {
		err = block.Persist()
	}
	t.dropPendingBlock(block)
	if err != nil {
		level.Error(t.logger).Log("msg", "failed to persist block", "err", err)
		return
	}

	if err := func() error {
		tx, _, commit := t.db.begin()
		defer commit()

		buf, err := block.ulid.MarshalBinary()
		if err != nil {
			level.Error(t.logger).Log("msg", "failed to record block persistence in WAL: marshal ulid", "err", err)
			return err
		}

		level.Debug(t.logger).Log("msg", "recording block persistence in WAL", "ulid", block.ulid, "txn", tx)
		if err := t.wal.Log(tx, &walpb.Record{
			Entry: &walpb.Entry{
				EntryType: &walpb.Entry_TableBlockPersisted_{
					TableBlockPersisted: &walpb.Entry_TableBlockPersisted{
						TableName: t.name,
						BlockId:   buf,
						NextTx:    nextTxn,
					},
				},
			},
		}); err != nil {
			level.Error(t.logger).Log("msg", "failed to record block persistence in WAL", "err", err)
			return err
		}
		return nil
	}(); err != nil {
		return
	}

	t.mtx.Lock()
	// Record the completed block, then advance lastCompleted over any
	// contiguous run of completed blocks, ordered by previous transaction.
	t.completedBlocks = append(t.completedBlocks, completedBlock{prevTx: block.prevTx, tx: block.minTx})
	sort.Slice(t.completedBlocks, func(i, j int) bool {
		return t.completedBlocks[i].prevTx < t.completedBlocks[j].prevTx
	})
	for len(t.completedBlocks) > 0 && t.completedBlocks[0].prevTx == t.lastCompleted {
		t.lastCompleted = t.completedBlocks[0].tx
		t.metrics.lastCompletedBlockTx.Set(float64(t.lastCompleted))
		t.completedBlocks = t.completedBlocks[1:]
	}
	t.mtx.Unlock()
	t.db.maintainWAL()

	if snapshotDB && t.db.columnStore.snapshotTriggerSize != 0 && t.db.columnStore.enableWAL {
		func() {
			if !t.db.snapshotInProgress.CompareAndSwap(false, true) {
				return
			}
			defer t.db.snapshotInProgress.Store(false)
			ctx := context.Background()
			tx := t.db.beginRead()
			if err := t.db.snapshotAtTX(ctx, tx, t.db.snapshotWriter(tx)); err != nil {
				level.Error(t.logger).Log(
					"msg", "failed to write snapshot on block rotation",
					"err", err,
				)
			}
			if err := t.db.reclaimDiskSpace(ctx, nil); err != nil {
				level.Error(t.logger).Log(
					"msg", "failed to reclaim disk space after snapshot on block rotation",
					"err", err,
				)
				return
			}
		}()
	}
}

type rotateBlockOptions struct {
	skipPersist bool
	wg          *sync.WaitGroup
}

type RotateBlockOption func(*rotateBlockOptions)

// WithRotateBlockSkipPersist instructs the rotation to skip persisting the
// rotated block.
func WithRotateBlockSkipPersist() RotateBlockOption {
	return func(o *rotateBlockOptions) {
		o.skipPersist = true
	}
}

// WithRotateBlockWaitGroup provides a WaitGroup that is marked done once the
// rotated block has been written.
func WithRotateBlockWaitGroup(wg *sync.WaitGroup) RotateBlockOption {
	return func(o *rotateBlockOptions) {
		o.wg = wg
	}
}

// RotateBlock replaces the given active block with a new one and writes the
// old block out in the background (unless skip-persist is requested).
func (t *Table) RotateBlock(_ context.Context, block *TableBlock, opts ...RotateBlockOption) error {
	rbo := &rotateBlockOptions{}
	for _, o := range opts {
		o(rbo)
	}
	t.mtx.Lock()
	defer t.mtx.Unlock()

	// If the active block is already different from the given block, a
	// concurrent call already rotated it.
	if t.active != block {
		if rbo.wg != nil {
			rbo.wg.Done()
		}
		return nil
	}

	level.Debug(t.logger).Log(
		"msg", "rotating block",
		"ulid", block.ulid,
		"size", block.Size(),
		"skip_persist", rbo.skipPersist,
	)
	defer level.Debug(t.logger).Log("msg", "done rotating block", "ulid", block.ulid)

	tx, _, commit := t.db.begin()
	defer commit()

	id := generateULID()
	// Ensure the new block's ULID timestamp differs from the old one so
	// ordering blocks by time stays unambiguous.
	for id.Time() == block.ulid.Time() {
		time.Sleep(time.Millisecond)
		id = generateULID()
	}
	if err := t.newTableBlock(t.active.minTx, tx, id); err != nil {
		return err
	}
	t.metrics.blockRotated.Inc()
	t.metrics.numParts.Set(float64(0))

	if !rbo.skipPersist {
		t.pendingBlocks[block] = struct{}{}
	}
	go t.writeBlock(block, tx, true, opts...)
	return nil
}

func (t *Table) ActiveBlock() *TableBlock {
	t.mtx.RLock()
	defer t.mtx.RUnlock()
	return t.active
}

// ActiveWriteBlock returns the active block and a function that must be
// called once the caller's write is done, or ErrTableClosing if the table is
// shutting down.
func (t *Table) ActiveWriteBlock() (*TableBlock, func(), error) {
	t.mtx.RLock()
	defer t.mtx.RUnlock()
	if t.closing {
		return nil, nil, ErrTableClosing
	}
	t.active.pendingWritersWg.Add(1)
	return t.active, t.active.pendingWritersWg.Done, nil
}

func (t *Table) Schema() *dynparquet.Schema {
	if t.config.Load() == nil {
		return nil
	}
	return t.schema
}

func (t *Table) EnsureCompaction() error {
	return t.ActiveBlock().EnsureCompaction()
}

// InsertRecord inserts an arrow record into the table, logging it to the WAL
// first. It returns the transaction id of the write.
func (t *Table) InsertRecord(ctx context.Context, record arrow.Record) (uint64, error) {
	block, finish, err := t.appender(ctx)
	if err != nil {
		return 0, fmt.Errorf("get appender: %w", err)
	}
	defer finish()

	tx, _, commit := t.db.begin()
	defer commit()

	preHashedRecord := dynparquet.PrehashColumns(t.schema, record)
	defer preHashedRecord.Release()
	if err := t.wal.LogRecord(tx, t.name, preHashedRecord); err != nil {
		return tx, fmt.Errorf("append to log: %w", err)
	}
	if err := block.InsertRecord(ctx, tx, preHashedRecord); err != nil {
		return tx, fmt.Errorf("insert buffer into block: %w", err)
	}
	return tx, nil
}

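// Write-path sketch: a single InsertRecord call logs the pre-hashed record to
// the WAL and adds it to the active block's LSM index under one transaction
// (rec is assumed to be an arrow.Record matching the table's schema):
//
//	tx, err := table.InsertRecord(ctx, rec)
//	if err != nil {
//		// handle error
//	}
//	// Readers observe the write once the database watermark reaches tx.
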
// appender returns the block to append to, rotating the active block first if
// it has outgrown the configured active memory size. It also triggers an
// asynchronous snapshot once enough data has been inserted since the last
// snapshot.
func (t *Table) appender(ctx context.Context) (*TableBlock, func(), error) {
	for {
		block, finish, err := t.ActiveWriteBlock()
		if err != nil {
			return nil, nil, err
		}

		uncompressedInsertsSize := block.uncompressedInsertsSize.Load()
		if t.db.columnStore.snapshotTriggerSize != 0 &&
			uncompressedInsertsSize-block.lastSnapshotSize.Load() > t.db.columnStore.snapshotTriggerSize {
			t.db.asyncSnapshot(context.Background(), func() {
				level.Debug(t.logger).Log(
					"msg", "successful snapshot on block size trigger",
					"block_size", humanize.IBytes(uint64(uncompressedInsertsSize)),
					"last_snapshot_size", humanize.IBytes(uint64(block.lastSnapshotSize.Load())),
				)
				block.lastSnapshotSize.Store(uncompressedInsertsSize)
				if err := t.db.reclaimDiskSpace(context.Background(), nil); err != nil {
					level.Error(t.logger).Log(
						"msg", "failed to reclaim disk space after snapshot",
						"err", err,
					)
					return
				}
			})
		}

		blockSize := block.Size()
		if blockSize < t.db.columnStore.activeMemorySize || t.db.columnStore.manualBlockRotation {
			return block, finish, nil
		}

		// The block is full; rotate it and retry with the new active block.
		finish()
		if err := t.RotateBlock(ctx, block); err != nil {
			return nil, nil, fmt.Errorf("rotate block: %w", err)
		}
	}
}

func (t *Table) View(ctx context.Context, fn func(ctx context.Context, tx uint64) error) error {
	ctx, span := t.tracer.Start(ctx, "Table/View")
	tx := t.db.beginRead()
	span.SetAttributes(attribute.Int64("tx", int64(tx)))
	defer span.End()
	return fn(ctx, tx)
}

// Iterator streams the table's data at the given transaction as arrow
// records, fanning row groups out to the given callbacks, each of which runs
// on its own goroutine.
func (t *Table) Iterator(
	ctx context.Context,
	tx uint64,
	pool memory.Allocator,
	callbacks []logicalplan.Callback,
	options ...logicalplan.Option,
) error {
	iterOpts := &logicalplan.IterOptions{}
	for _, opt := range options {
		opt(iterOpts)
	}
	ctx, span := t.tracer.Start(ctx, "Table/Iterator")
	span.SetAttributes(attribute.Int("physicalProjections", len(iterOpts.PhysicalProjection)))
	span.SetAttributes(attribute.Int("projections", len(iterOpts.Projection)))
	span.SetAttributes(attribute.Int("distinct", len(iterOpts.DistinctColumns)))
	defer span.End()

	if len(callbacks) == 0 {
		return errors.New("no callbacks provided")
	}

	rowGroups := make(chan any, len(callbacks)*4)
	// Drain and release any row groups that were not consumed because of an
	// early exit.
	defer func() {
		for rg := range rowGroups {
			switch t := rg.(type) {
			case index.ReleaseableRowGroup:
				t.Release()
			case arrow.Record:
				t.Release()
			}
		}
	}()

	// bufferSize is the number of rows buffered in the converter before a
	// record is flushed to the callback.
	const bufferSize = 1024

	errg, ctx := errgroup.WithContext(ctx)
	for _, callback := range callbacks {
		callback := callback
		errg.Go(recovery.Do(func() error {
			converter := pqarrow.NewParquetConverter(pool, *iterOpts)
			defer converter.Close()

			for {
				select {
				case <-ctx.Done():
					return ctx.Err()
				case rg, ok := <-rowGroups:
					if !ok {
						// Channel closed; flush any remaining buffered rows.
						r := converter.NewRecord()
						if r == nil {
							return nil
						}
						defer r.Release()
						if r.NumRows() == 0 {
							return nil
						}
						return callback(ctx, r)
					}
					switch rg := rg.(type) {
					case arrow.Record:
						defer rg.Release()
						r := pqarrow.Project(rg, iterOpts.PhysicalProjection)
						defer r.Release()
						if err := callback(ctx, r); err != nil {
							return err
						}
					case index.ReleaseableRowGroup:
						defer rg.Release()
						if err := converter.Convert(ctx, rg, t.schema); err != nil {
							return fmt.Errorf("failed to convert row group to arrow record: %v", err)
						}
						if len(converter.Fields()) == 0 {
							continue
						}
						if converter.NumRows() >= bufferSize {
							if err := func() error {
								r := converter.NewRecord()
								defer r.Release()
								converter.Reset()
								return callback(ctx, r)
							}(); err != nil {
								return err
							}
						}
					case dynparquet.DynamicRowGroup:
						if err := converter.Convert(ctx, rg, t.schema); err != nil {
							return fmt.Errorf("failed to convert row group to arrow record: %v", err)
						}
						if len(converter.Fields()) == 0 {
							continue
						}
						if converter.NumRows() >= bufferSize {
							if err := func() error {
								r := converter.NewRecord()
								defer r.Release()
								converter.Reset()
								return callback(ctx, r)
							}(); err != nil {
								return err
							}
						}
					default:
						return fmt.Errorf("unknown row group type: %T", rg)
					}
				}
			}
		}, t.logger))
	}

	errg.Go(func() error {
		defer close(rowGroups)
		return t.collectRowGroups(ctx, tx, iterOpts.Filter, iterOpts.ReadMode, rowGroups)
	})

	return errg.Wait()
}

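// Read-path sketch combining View and Iterator (signatures as defined in this
// file; filter and projection options are omitted):
//
//	err := table.View(ctx, func(ctx context.Context, tx uint64) error {
//		return table.Iterator(ctx, tx, memory.NewGoAllocator(),
//			[]logicalplan.Callback{func(ctx context.Context, r arrow.Record) error {
//				fmt.Println(r) // callbacks run on separate goroutines
//				return nil
//			}},
//		)
//	})
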
// SchemaIterator iterates over the table's row groups and, for each one,
// delivers a record with a single "name" column containing the row group's
// field names.
func (t *Table) SchemaIterator(
	ctx context.Context,
	tx uint64,
	pool memory.Allocator,
	callbacks []logicalplan.Callback,
	options ...logicalplan.Option,
) error {
	iterOpts := &logicalplan.IterOptions{}
	for _, opt := range options {
		opt(iterOpts)
	}
	ctx, span := t.tracer.Start(ctx, "Table/SchemaIterator")
	span.SetAttributes(attribute.Int("physicalProjections", len(iterOpts.PhysicalProjection)))
	span.SetAttributes(attribute.Int("projections", len(iterOpts.Projection)))
	span.SetAttributes(attribute.Int("distinct", len(iterOpts.DistinctColumns)))
	defer span.End()

	if len(callbacks) == 0 {
		return errors.New("no callbacks provided")
	}

	rowGroups := make(chan any, len(callbacks)*4)
	// Drain and release any row groups that were not consumed because of an
	// early exit.
	defer func() {
		for rg := range rowGroups {
			switch t := rg.(type) {
			case index.ReleaseableRowGroup:
				t.Release()
			case arrow.Record:
				t.Release()
			}
		}
	}()

	schema := arrow.NewSchema(
		[]arrow.Field{
			{Name: "name", Type: arrow.BinaryTypes.String},
		},
		nil,
	)

	errg, ctx := errgroup.WithContext(ctx)
	for _, callback := range callbacks {
		callback := callback
		errg.Go(recovery.Do(func() error {
			for {
				select {
				case <-ctx.Done():
					return ctx.Err()
				case rg, ok := <-rowGroups:
					if !ok {
						return nil
					}
					b := array.NewRecordBuilder(pool, schema)
					switch t := rg.(type) {
					case arrow.Record:
						for i := 0; i < t.Schema().NumFields(); i++ {
							b.Field(0).(*array.StringBuilder).Append(t.Schema().Field(i).Name)
						}
						record := b.NewRecord()
						err := callback(ctx, record)
						record.Release()
						t.Release()
						b.Release()
						if err != nil {
							return err
						}
					case index.ReleaseableRowGroup:
						if rg == nil {
							return errors.New("received nil rowGroup")
						}
						defer t.Release()
						parquetFields := t.Schema().Fields()
						fieldNames := make([]string, 0, len(parquetFields))
						for _, f := range parquetFields {
							fieldNames = append(fieldNames, f.Name())
						}
						b.Field(0).(*array.StringBuilder).AppendValues(fieldNames, nil)
						record := b.NewRecord()
						if err := callback(ctx, record); err != nil {
							return err
						}
						record.Release()
						b.Release()
					case dynparquet.DynamicRowGroup:
						if rg == nil {
							return errors.New("received nil rowGroup")
						}
						parquetFields := t.Schema().Fields()
						fieldNames := make([]string, 0, len(parquetFields))
						for _, f := range parquetFields {
							fieldNames = append(fieldNames, f.Name())
						}
						b.Field(0).(*array.StringBuilder).AppendValues(fieldNames, nil)
						record := b.NewRecord()
						if err := callback(ctx, record); err != nil {
							return err
						}
						record.Release()
						b.Release()
					default:
						return fmt.Errorf("unknown row group type: %T", t)
					}
				}
			}
		}, t.logger))
	}

	errg.Go(func() error {
		// Close the channel even on error so the drain deferred above can
		// never block forever.
		defer close(rowGroups)
		return t.collectRowGroups(ctx, tx, iterOpts.Filter, iterOpts.ReadMode, rowGroups)
	})

	return errg.Wait()
}

func generateULID() ulid.ULID {
	t := time.Now()
	entropy := ulid.Monotonic(rand.New(rand.NewSource(t.UnixNano())), 0)
	return ulid.MustNew(ulid.Timestamp(t), entropy)
}

func newTableBlock(table *Table, prevTx, tx uint64, id ulid.ULID) (*TableBlock, error) {
	tb := &TableBlock{
		table:  table,
		mtx:    &sync.RWMutex{},
		ulid:   id,
		logger: table.logger,
		tracer: table.tracer,
		minTx:  tx,
		prevTx: prevTx,
	}

	var err error
	tb.index, err = index.NewLSM(
		filepath.Join(table.db.indexDir(), table.name, id.String()),
		table.schema,
		table.IndexConfig(),
		table.db.HighWatermark,
		index.LSMWithMetrics(&table.metrics.indexMetrics),
		index.LSMWithLogger(table.logger),
	)
	if err != nil {
		return nil, fmt.Errorf("new LSM: %w", err)
	}
	return tb, nil
}

func (t *TableBlock) EnsureCompaction() error {
	return t.index.EnsureCompaction()
}

// InsertRecord adds the record to the block's LSM index at the given
// transaction and updates the insert metrics.
func (t *TableBlock) InsertRecord(_ context.Context, tx uint64, record arrow.Record) error {
	recordSize := util.TotalRecordSize(record)
	defer func() {
		t.table.metrics.rowsInserted.Add(float64(record.NumRows()))
		t.table.metrics.rowInsertSize.Observe(float64(record.NumRows()))
		t.table.metrics.rowBytesInserted.Add(float64(recordSize))
	}()

	if record.NumRows() == 0 {
		t.table.metrics.zeroRowsInserted.Add(1)
		return nil
	}

	t.index.Add(tx, record)
	t.table.metrics.numParts.Inc()
	t.uncompressedInsertsSize.Add(recordSize)
	return nil
}

// Size returns the current size of the block's index.
func (t *TableBlock) Size() int64 {
	return t.index.Size()
}

// Index returns the block's LSM index.
func (t *TableBlock) Index() *index.LSM {
	return t.index
}

// Serialize writes the block's contents to the given writer by rotating the
// index through an external parquet compaction.
func (t *TableBlock) Serialize(writer io.Writer) error {
	return t.index.Rotate(t.table.externalParquetCompaction(writer))
}

// ParquetWriter abstracts over the parquet writers used to persist blocks.
type ParquetWriter interface {
	Flush() error
	WriteRows([]parquet.Row) (int, error)
	io.Closer
}

// parquetRowWriter writes rows to a ParquetWriter in buffered batches,
// flushing a row group whenever the configured row group size is reached.
type parquetRowWriter struct {
	schema *dynparquet.Schema
	w      ParquetWriter

	rowGroupSize int
	maxNumRows   int

	rowGroupRowsWritten int
	totalRowsWritten    int
	rowsBuf             []parquet.Row
}

type parquetRowWriterOption func(p *parquetRowWriter)

func (t *TableBlock) rowWriter(w ParquetWriter, options ...parquetRowWriterOption) (*parquetRowWriter, error) {
	buffSize := 256
	config := t.table.config.Load()
	if config.RowGroupSize > 0 {
		buffSize = int(config.RowGroupSize)
	}

	p := &parquetRowWriter{
		w:            w,
		schema:       t.table.schema,
		rowsBuf:      make([]parquet.Row, buffSize),
		rowGroupSize: int(config.RowGroupSize),
	}
	for _, option := range options {
		option(p)
	}
	return p, nil
}

// writeRows reads rows from the reader in buffered batches and writes them to
// the underlying writer, flushing a row group whenever rowGroupSize rows have
// been written. It returns the number of rows written.
func (p *parquetRowWriter) writeRows(rows parquet.RowReader) (int, error) {
	written := 0
	for p.maxNumRows == 0 || p.totalRowsWritten < p.maxNumRows {
		// Truncate the buffer so a batch never crosses a row group boundary
		// or exceeds the maximum number of rows.
		if p.rowGroupSize > 0 && p.rowGroupRowsWritten+len(p.rowsBuf) > p.rowGroupSize {
			p.rowsBuf = p.rowsBuf[:p.rowGroupSize-p.rowGroupRowsWritten]
		}
		if p.maxNumRows != 0 && p.totalRowsWritten+len(p.rowsBuf) > p.maxNumRows {
			p.rowsBuf = p.rowsBuf[:p.maxNumRows-p.totalRowsWritten]
		}
		n, err := rows.ReadRows(p.rowsBuf)
		if err != nil && err != io.EOF {
			return 0, err
		}
		if n == 0 {
			break
		}
		if _, err = p.w.WriteRows(p.rowsBuf[:n]); err != nil {
			return 0, err
		}
		written += n
		p.rowGroupRowsWritten += n
		p.totalRowsWritten += n
		if p.rowGroupSize > 0 && p.rowGroupRowsWritten >= p.rowGroupSize {
			if err := p.w.Flush(); err != nil {
				return 0, err
			}
			p.rowGroupRowsWritten = 0
		}
	}
	return written, nil
}

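// Worked example of the truncation logic in writeRows: with rowGroupSize =
// 100, a 256-row buffer, and 40 rows already written to the current row
// group, the buffer is truncated to 60 rows so that the subsequent Flush
// lands exactly on the row-group boundary.
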
func (p *parquetRowWriter) close() error {
	return p.w.Close()
}

// memoryBlocks returns the table's in-memory blocks (the active block plus
// any pending blocks) with their reader wait groups incremented, along with
// the earliest ULID timestamp among them.
func (t *Table) memoryBlocks() ([]*TableBlock, uint64) {
	t.mtx.RLock()
	defer t.mtx.RUnlock()

	if t.active == nil {
		return nil, 0
	}

	lastReadBlockTimestamp := t.active.ulid.Time()
	t.active.pendingReadersWg.Add(1)
	memoryBlocks := []*TableBlock{t.active}
	for block := range t.pendingBlocks {
		block.pendingReadersWg.Add(1)
		memoryBlocks = append(memoryBlocks, block)

		if block.ulid.Time() < lastReadBlockTimestamp {
			lastReadBlockTimestamp = block.ulid.Time()
		}
	}

	return memoryBlocks, lastReadBlockTimestamp
}

// collectRowGroups collects the table's row groups for the given transaction
// and sends them on the rowGroups channel. In-memory blocks are scanned first
// (unless the read mode restricts reads to data sources), then any attached
// data sources are scanned for blocks older than the earliest in-memory
// block.
func (t *Table) collectRowGroups(
	ctx context.Context,
	tx uint64,
	filterExpr logicalplan.Expr,
	readMode logicalplan.ReadMode,
	rowGroups chan<- any,
) error {
	ctx, span := t.tracer.Start(ctx, "Table/collectRowGroups")
	defer span.End()

	var lastBlockTimestamp uint64
	if readMode != logicalplan.ReadModeDataSourcesOnly {
		memoryBlocks, lbt := t.memoryBlocks()
		lastBlockTimestamp = lbt
		defer func() {
			for _, block := range memoryBlocks {
				block.pendingReadersWg.Done()
			}
		}()
		for _, block := range memoryBlocks {
			if err := block.index.Scan(ctx, "", t.schema, filterExpr, tx, func(ctx context.Context, v any) error {
				select {
				case <-ctx.Done():
					if rg, ok := v.(index.ReleaseableRowGroup); ok {
						rg.Release()
					}
					return ctx.Err()
				case rowGroups <- v:
					return nil
				}
			}); err != nil {
				return err
			}
		}
	}

	if readMode == logicalplan.ReadModeInMemoryOnly {
		return nil
	}

	for _, source := range t.db.sources {
		span.AddEvent(fmt.Sprintf("source/%s", source.String()))
		if err := source.Scan(ctx, filepath.Join(t.db.name, t.name), t.schema, filterExpr, lastBlockTimestamp, func(ctx context.Context, v any) error {
			select {
			case <-ctx.Done():
				if rg, ok := v.(index.ReleaseableRowGroup); ok {
					rg.Release()
				}
				return ctx.Err()
			case rowGroups <- v:
				return nil
			}
		}); err != nil {
			return err
		}
	}

	return nil
}

func (t *Table) close() {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	t.active.pendingWritersWg.Wait()
	t.closing = true
	t.active.index.WaitForPendingCompactions()
}

// externalParquetCompaction returns a compaction function that writes the
// compacted parts as parquet to the given writer instead of keeping them in
// the index.
func (t *Table) externalParquetCompaction(writer io.Writer) func(compact []parts.Part) (parts.Part, int64, int64, error) {
	return func(compact []parts.Part) (parts.Part, int64, int64, error) {
		size, err := t.compactParts(writer, compact)
		if err != nil {
			return nil, 0, 0, err
		}
		// No part is returned since the data was written out externally.
		return nil, size, 0, nil
	}
}

// compactParts merges the given parts and writes the result as parquet to w,
// returning the pre-compaction size of the parts.
func (t *Table) compactParts(w io.Writer, compact []parts.Part, options ...parquet.WriterOption) (int64, error) {
	if len(compact) == 0 {
		return 0, nil
	}
	preCompactionSize := int64(0)
	for _, p := range compact {
		preCompactionSize += p.Size()
	}

	if t.schema.UniquePrimaryIndex {
		// Try to deduplicate via a distinct query over the records.
		distinctRecords, err := t.distinctRecordsForCompaction(compact)
		if err != nil {
			return 0, err
		}
		if distinctRecords != nil {
			defer func() {
				for _, r := range distinctRecords {
					r.Release()
				}
			}()
			return preCompactionSize, t.writeRecordsToParquet(w, distinctRecords, true, options...)
		}
	}

	bufs, err := t.buffersForCompaction(w, compact, options...)
	if err != nil {
		return 0, err
	}
	if bufs == nil {
		// The parts were already written directly to w.
		return preCompactionSize, nil
	}

	merged, err := t.schema.MergeDynamicRowGroups(bufs)
	if err != nil {
		return 0, err
	}

	err = func() error {
		var writer dynparquet.ParquetWriter
		if len(options) > 0 {
			writer, err = t.schema.NewWriter(w, merged.DynamicColumns(), false, options...)
			if err != nil {
				return err
			}
		} else {
			pw, err := t.schema.GetWriter(w, merged.DynamicColumns(), false)
			if err != nil {
				return err
			}
			defer t.schema.PutWriter(pw)
			writer = pw.ParquetWriter
		}

		p, err := t.active.rowWriter(writer)
		if err != nil {
			return err
		}
		defer p.close()

		rows := merged.Rows()
		defer rows.Close()
		var rowReader parquet.RowReader = rows
		if t.schema.UniquePrimaryIndex {
			rowReader = parquet.DedupeRowReader(rows, merged.Schema().Comparator(merged.SortingColumns()...))
		}
		if _, err := p.writeRows(rowReader); err != nil {
			return err
		}
		return nil
	}()
	if err != nil {
		return 0, err
	}

	return preCompactionSize, nil
}

// buffersForCompaction returns the dynamic row groups to merge for
// compaction. As an optimization, if the maximal non-overlapping set of parts
// is all arrow-backed, those parts are written to parquet directly: straight
// to w when nothing overlaps them, otherwise to an intermediate buffer that
// is returned as an additional row group. A nil result with no error means
// all data was already written to w.
func (t *Table) buffersForCompaction(w io.Writer, inputParts []parts.Part, options ...parquet.WriterOption) ([]dynparquet.DynamicRowGroup, error) {
	nonOverlappingParts, overlappingParts, err := parts.FindMaximumNonOverlappingSet(t.schema, inputParts)
	if err != nil {
		return nil, err
	}
	result := make([]dynparquet.DynamicRowGroup, 0, len(inputParts))
	for _, p := range overlappingParts {
		buf, err := p.AsSerializedBuffer(t.schema)
		if err != nil {
			return nil, err
		}
		result = append(result, buf.MultiDynamicRowGroup())
	}
	if len(nonOverlappingParts) == 0 {
		return result, nil
	}

	allArrow := true
	for _, p := range nonOverlappingParts {
		if p.Record() == nil {
			allArrow = false
			break
		}
	}

	if len(nonOverlappingParts) == 1 || !allArrow {
		// Merge the non-overlapping parts as row groups; since they don't
		// overlap, they are already sorted relative to each other.
		nonOverlappingRowGroups := make([]dynparquet.DynamicRowGroup, 0, len(nonOverlappingParts))
		for _, p := range nonOverlappingParts {
			buf, err := p.AsSerializedBuffer(t.schema)
			if err != nil {
				return nil, err
			}
			nonOverlappingRowGroups = append(nonOverlappingRowGroups, buf.MultiDynamicRowGroup())
		}
		merged := nonOverlappingRowGroups[0]
		if len(nonOverlappingRowGroups) > 1 {
			merged, err = t.schema.MergeDynamicRowGroups(nonOverlappingRowGroups, dynparquet.WithAlreadySorted())
			if err != nil {
				return nil, err
			}
		}
		result = append(result, merged)
		return result, nil
	}

	// All non-overlapping parts are arrow records. Write them to an
	// intermediate buffer if they still need to be merged with overlapping
	// parts, otherwise straight to w.
	var b bytes.Buffer
	if len(overlappingParts) > 0 {
		w = &b
	}
	records := make([]arrow.Record, 0, len(nonOverlappingParts))
	for _, p := range nonOverlappingParts {
		records = append(records, p.Record())
	}
	if err := t.writeRecordsToParquet(w, records, false, options...); err != nil {
		return nil, err
	}
	if len(overlappingParts) == 0 {
		// Everything was written to w; signal that no merge is needed.
		return nil, nil
	}

	buf, err := dynparquet.ReaderFromBytes(b.Bytes())
	if err != nil {
		return nil, err
	}
	result = append(result, buf.MultiDynamicRowGroup())
	return result, nil
}

// writeRecordsToParquet writes the given arrow records to w as parquet,
// merging the records' dynamic column sets into a single writer schema.
func (t *Table) writeRecordsToParquet(w io.Writer, records []arrow.Record, sortInput bool, options ...parquet.WriterOption) error {
	dynColSets := make([]map[string][]string, 0, len(records))
	for _, r := range records {
		dynColSets = append(dynColSets, pqarrow.RecordDynamicCols(r))
	}
	dynCols := dynparquet.MergeDynamicColumnSets(dynColSets)

	var writer dynparquet.ParquetWriter
	if len(options) > 0 {
		var err error
		writer, err = t.schema.NewWriter(w, dynCols, false, options...)
		if err != nil {
			return err
		}
	} else {
		pw, err := t.schema.GetWriter(w, dynCols, sortInput)
		if err != nil {
			return err
		}
		defer t.schema.PutWriter(pw)
		writer = pw
	}
	return pqarrow.RecordsToFile(t.schema, writer, records)
}

// distinctRecordsForCompaction runs a distinct query over the sorting columns
// of the given parts to deduplicate them. It returns nil records and no error
// if any part is not arrow-backed, in which case the caller must fall back to
// a parquet merge.
func (t *Table) distinctRecordsForCompaction(compact []parts.Part) ([]arrow.Record, error) {
	sortingCols := t.schema.ColumnDefinitionsForSortingColumns()
	columnExprs := make([]logicalplan.Expr, 0, len(sortingCols))
	for _, col := range sortingCols {
		var expr logicalplan.Expr
		if col.Dynamic {
			expr = logicalplan.DynCol(col.Name)
		} else {
			expr = logicalplan.Col(col.Name)
		}
		columnExprs = append(columnExprs, expr)
	}

	d := physicalplan.Distinct(memory.NewGoAllocator(), t.tracer, columnExprs)
	output := physicalplan.OutputPlan{}
	newRecords := make([]arrow.Record, 0)
	output.SetNextCallback(func(_ context.Context, r arrow.Record) error {
		r.Retain()
		newRecords = append(newRecords, r)
		return nil
	})
	d.SetNext(&output)

	if ok, err := func() (bool, error) {
		ctx := context.TODO()
		for _, p := range compact {
			if p.Record() == nil {
				// Not an arrow-backed part; bail out.
				return false, nil
			}
			if err := d.Callback(ctx, p.Record()); err != nil {
				return false, err
			}
		}
		if err := d.Finish(ctx); err != nil {
			return false, err
		}
		return true, nil
	}(); !ok || err != nil {
		for _, r := range newRecords {
			r.Release()
		}
		return nil, err
	}

	return newRecords, nil
}

// IndexConfig returns this table's level configuration, derived from the
// column store's configuration.
func (t *Table) IndexConfig() []*index.LevelConfig {
	config := make([]*index.LevelConfig, 0, len(t.db.columnStore.indexConfig))
	for i, c := range t.db.columnStore.indexConfig {
		compactFunc := t.compactParts
		if i == len(t.db.columnStore.indexConfig)-1 {
			// The last level is never compacted further.
			compactFunc = nil
		}
		config = append(config, &index.LevelConfig{
			Level:   c.Level,
			MaxSize: c.MaxSize,
			Type:    c.Type,
			Compact: compactFunc,
		})
	}
	return config
}