package frostdb
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"math"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/apache/arrow-go/v18/arrow/ipc"
"github.com/apache/arrow-go/v18/arrow/util"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid/v2"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
"github.com/polarsignals/frostdb/dynparquet"
schemapb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/schema/v1alpha1"
schemav2pb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/schema/v1alpha2"
tablepb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/table/v1alpha1"
walpb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/wal/v1alpha1"
"github.com/polarsignals/frostdb/index"
"github.com/polarsignals/frostdb/parts"
"github.com/polarsignals/frostdb/query/logicalplan"
"github.com/polarsignals/frostdb/wal"
)
// Binary size units used throughout the package for memory-size
// configuration (e.g. active memory limits).
const (
	B   = 1
	KiB = B << 10
	MiB = KiB << 10
	GiB = MiB << 10
	TiB = GiB << 10
)
// ColumnStore is the top-level entry point of the embedded database. It
// owns all open databases (DB) along with the configuration shared between
// them.
type ColumnStore struct {
	mtx                 sync.RWMutex             // guards dbs and dbReplaysInProgress
	dbs                 map[string]*DB           // open databases, keyed by name
	dbReplaysInProgress map[string]chan struct{} // per-DB channel, closed when that DB's WAL replay finishes
	reg                 prometheus.Registerer
	logger              log.Logger
	tracer              trace.Tracer
	activeMemorySize    int64  // in-memory size budget (default 512 MiB); see New
	storagePath         string // on-disk root; required when enableWAL is set
	enableWAL           bool
	manualBlockRotation bool
	snapshotTriggerSize int64 // 0 disables snapshotting entirely
	metrics             globalMetrics
	recoveryConcurrency int // DBs recovered in parallel on startup; 0 means GOMAXPROCS

	// Index/compaction tuning.
	indexDegree int
	splitSize   int
	indexConfig []*index.LevelConfig

	// Object-storage backends blocks are read from / persisted to.
	sources []DataSource
	sinks   []DataSink

	// When set, tables are compacted right after WAL recovery completes; an
	// empty name list means all tables.
	compactAfterRecovery           bool
	compactAfterRecoveryTableNames []string

	// testingOptions are hooks used only by tests.
	testingOptions struct {
		disableReclaimDiskSpaceOnSnapshot bool
		walTestingOptions                 []wal.Option
	}
}
// Option configures a ColumnStore at construction time.
type Option func(*ColumnStore) error

// New creates a ColumnStore with sane defaults, applies the given options,
// registers metrics, validates the resulting configuration, and — when the
// WAL is enabled — recovers any databases found on disk.
func New(
	options ...Option,
) (*ColumnStore, error) {
	s := &ColumnStore{
		dbs:                 make(map[string]*DB),
		dbReplaysInProgress: make(map[string]chan struct{}),
		reg:                 prometheus.NewRegistry(),
		logger:              log.NewNopLogger(),
		tracer:              noop.NewTracerProvider().Tracer(""),
		indexConfig:         DefaultIndexConfig(),
		indexDegree:         2,
		splitSize:           2,
		activeMemorySize:    512 * MiB,
	}
	for _, option := range options {
		if err := option(s); err != nil {
			return nil, err
		}
	}
	s.reg.MustRegister(&collector{s: s})
	s.metrics = makeAndRegisterGlobalMetrics(s.reg)
	// The WAL lives on disk, so a storage path is mandatory when it is
	// enabled. (errors.New rather than fmt.Errorf: no format verbs.)
	if s.enableWAL && s.storagePath == "" {
		return nil, errors.New("storage path must be configured if WAL is enabled")
	}
	// Parquet-on-disk compaction writes files under the storage path and
	// relies on WAL recovery, so both must be configured.
	for _, cfg := range s.indexConfig {
		if cfg.Type == index.CompactionTypeParquetDisk {
			if !s.enableWAL || s.storagePath == "" {
				return nil, errors.New("persistent disk compaction requires WAL and storage path to be enabled")
			}
		}
	}
	if err := s.recoverDBsFromStorage(context.Background()); err != nil {
		return nil, err
	}
	return s, nil
}
// WithLogger sets the logger used by the store and all of its databases.
func WithLogger(logger log.Logger) Option {
	return func(s *ColumnStore) error {
		s.logger = logger
		return nil
	}
}

// WithTracer sets the OpenTelemetry tracer used for query/write tracing.
func WithTracer(tracer trace.Tracer) Option {
	return func(s *ColumnStore) error {
		s.tracer = tracer
		return nil
	}
}

// WithRegistry sets the Prometheus registerer metrics are registered with.
func WithRegistry(reg prometheus.Registerer) Option {
	return func(s *ColumnStore) error {
		s.reg = reg
		return nil
	}
}

// WithActiveMemorySize sets the in-memory size budget in bytes (the default
// is 512 MiB).
func WithActiveMemorySize(size int64) Option {
	return func(s *ColumnStore) error {
		s.activeMemorySize = size
		return nil
	}
}

// WithIndexDegree sets the index degree tuning parameter.
func WithIndexDegree(indexDegree int) Option {
	return func(s *ColumnStore) error {
		s.indexDegree = indexDegree
		return nil
	}
}

// WithSplitSize sets the split size tuning parameter.
func WithSplitSize(size int) Option {
	return func(s *ColumnStore) error {
		s.splitSize = size
		return nil
	}
}

// WithReadWriteStorage registers ds as both a source to read persisted
// blocks from and a sink to persist blocks to.
func WithReadWriteStorage(ds DataSinkSource) Option {
	return func(s *ColumnStore) error {
		s.sources = append(s.sources, ds)
		s.sinks = append(s.sinks, ds)
		return nil
	}
}

// WithReadOnlyStorage registers ds as a source to read persisted blocks
// from.
func WithReadOnlyStorage(ds DataSource) Option {
	return func(s *ColumnStore) error {
		s.sources = append(s.sources, ds)
		return nil
	}
}

// WithWriteOnlyStorage registers ds as a sink to persist blocks to.
func WithWriteOnlyStorage(ds DataSink) Option {
	return func(s *ColumnStore) error {
		s.sinks = append(s.sinks, ds)
		return nil
	}
}
// WithManualBlockRotation disables automatic table block rotation; blocks
// are then only rotated when explicitly requested.
func WithManualBlockRotation() Option {
	return func(s *ColumnStore) error {
		s.manualBlockRotation = true
		return nil
	}
}

// WithWAL enables the write-ahead log. Requires WithStoragePath.
func WithWAL() Option {
	return func(s *ColumnStore) error {
		s.enableWAL = true
		return nil
	}
}

// WithStoragePath sets the on-disk root directory for WALs, snapshots, and
// index files.
func WithStoragePath(path string) Option {
	return func(s *ColumnStore) error {
		s.storagePath = path
		return nil
	}
}

// WithIndexConfig replaces the default index level configuration.
func WithIndexConfig(indexConfig []*index.LevelConfig) Option {
	return func(s *ColumnStore) error {
		s.indexConfig = indexConfig
		return nil
	}
}

// WithCompactionAfterRecovery requests that the given tables (or all tables,
// if the list is empty) be compacted once WAL recovery completes.
func WithCompactionAfterRecovery(tableNames []string) Option {
	return func(s *ColumnStore) error {
		s.compactAfterRecovery = true
		s.compactAfterRecoveryTableNames = tableNames
		return nil
	}
}

// WithSnapshotTriggerSize enables snapshots once a database accumulates the
// given number of bytes; 0 (the default) disables snapshotting.
func WithSnapshotTriggerSize(size int64) Option {
	return func(s *ColumnStore) error {
		s.snapshotTriggerSize = size
		return nil
	}
}

// WithRecoveryConcurrency bounds how many databases are recovered in
// parallel on startup; 0 defaults to GOMAXPROCS.
func WithRecoveryConcurrency(concurrency int) Option {
	return func(s *ColumnStore) error {
		s.recoveryConcurrency = concurrency
		return nil
	}
}
// Close shuts the store down, closing all databases in parallel (bounded by
// GOMAXPROCS) and recording shutdown metrics. It returns the first error
// encountered while closing any database.
func (s *ColumnStore) Close() error {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	s.metrics.shutdownStarted.Inc()
	defer s.metrics.shutdownCompleted.Inc()
	// Observe total shutdown duration; the argument pins the start time now.
	defer func(ts time.Time) {
		s.metrics.shutdownDuration.Observe(float64(time.Since(ts)))
	}(time.Now())
	errg := &errgroup.Group{}
	errg.SetLimit(runtime.GOMAXPROCS(0))
	for _, db := range s.dbs {
		toClose := db // capture loop variable for the goroutine (pre-Go 1.22 semantics)
		errg.Go(func() error {
			err := toClose.Close()
			if err != nil {
				level.Error(s.logger).Log("msg", "error closing DB", "db", toClose.name, "err", err)
			}
			return err
		})
	}
	// Wait returns the first non-nil error from the close goroutines.
	return errg.Wait()
}
// DatabasesDir returns the directory under the configured storage path in
// which per-database state (WAL, snapshots, index) is kept.
func (s *ColumnStore) DatabasesDir() string {
	const databasesSubdir = "databases"
	return filepath.Join(s.storagePath, databasesSubdir)
}
// recoverDBsFromStorage opens (and thereby WAL-replays) every database found
// under the databases directory. It is a no-op when the WAL is disabled or
// the directory does not exist yet.
func (s *ColumnStore) recoverDBsFromStorage(ctx context.Context) error {
	if !s.enableWAL {
		return nil
	}
	dir := s.DatabasesDir()
	if _, err := os.Stat(dir); err != nil {
		if os.IsNotExist(err) {
			// Nothing was ever written; a fresh store.
			level.Debug(s.logger).Log("msg", "WAL directory does not exist, no WAL to replay")
			return nil
		}
		return err
	}
	files, err := os.ReadDir(dir)
	if err != nil {
		return err
	}
	// Recover databases in parallel, bounded by recoveryConcurrency.
	g, ctx := errgroup.WithContext(ctx)
	if s.recoveryConcurrency == 0 {
		// Default to the number of usable CPUs. Note this persists the
		// resolved value on the store.
		s.recoveryConcurrency = runtime.GOMAXPROCS(0)
	}
	g.SetLimit(s.recoveryConcurrency)
	for _, f := range files {
		databaseName := f.Name() // capture for the goroutine
		g.Go(func() error {
			// Opening the DB performs the WAL replay as a side effect.
			_, err := s.DB(ctx,
				databaseName,
				WithCompactionAfterOpen(
					s.compactAfterRecovery, s.compactAfterRecoveryTableNames,
				),
			)
			return err
		})
	}
	return g.Wait()
}
// DB is a named database within a ColumnStore. It owns its tables, its
// write-ahead log, and transaction bookkeeping.
type DB struct {
	columnStore *ColumnStore // parent store carrying shared configuration
	logger      log.Logger
	tracer      trace.Tracer
	name        string

	mtx      *sync.RWMutex     // guards roTables and tables
	roTables map[string]*Table // tables discovered in object storage but not yet written locally
	tables   map[string]*Table // active (writable) tables

	storagePath string // on-disk root for this DB; empty when disk storage is disabled
	wal         WAL
	sources     []DataSource
	sinks       []DataSink

	// tx is the last issued transaction id; highWatermark is the highest id
	// up to which all transactions have committed.
	tx            atomic.Uint64
	highWatermark atomic.Uint64
	txPool        *TxPool

	// Set via WithCompactionAfterOpen; compacts tables after WAL recovery.
	compactAfterRecovery           bool
	compactAfterRecoveryTableNames []string

	snapshotInProgress atomic.Bool // guards against concurrent snapshots
	metrics            snapshotMetrics
	metricsProvider    tableMetricsProvider
}
// DataSinkSource is a data store that can be both written to and read from.
type DataSinkSource interface {
	DataSink
	DataSource
}

// DataSource is storage that persisted blocks can be read back from.
type DataSource interface {
	fmt.Stringer
	// Scan invokes callback for stored data under prefix that matches the
	// given schema/filter. NOTE(review): the exact callback payload type is
	// implementation-defined (passed as any) — see implementations.
	Scan(ctx context.Context, prefix string, schema *dynparquet.Schema, filter logicalplan.Expr, lastBlockTimestamp uint64, callback func(context.Context, any) error) error
	// Prefixes lists the sub-prefixes stored under prefix.
	Prefixes(ctx context.Context, prefix string) ([]string, error)
}

// DataSink is storage that persisted blocks are uploaded to.
type DataSink interface {
	fmt.Stringer
	// Upload stores the contents of r under name.
	Upload(ctx context.Context, name string, r io.Reader) error
	// Delete removes the object stored under name.
	Delete(ctx context.Context, name string) error
}
// DBOption configures a DB when it is opened via ColumnStore.DB.
type DBOption func(*DB) error

// WithCompactionAfterOpen requests that the given tables (all tables when
// the list is empty) be compacted after the database is opened/recovered.
func WithCompactionAfterOpen(compact bool, tableNames []string) DBOption {
	return func(db *DB) error {
		db.compactAfterRecovery = compact
		db.compactAfterRecoveryTableNames = tableNames
		return nil
	}
}
// DB returns the database with the given name, creating it — and, when the
// WAL is enabled, replaying its WAL — if it does not exist yet. The given
// options are applied whether the database is new or already open.
func (s *ColumnStore) DB(ctx context.Context, name string, opts ...DBOption) (*DB, error) {
	if !validateName(name) {
		return nil, errors.New("invalid database name")
	}
	applyOptsToDB := func(db *DB) error {
		db.mtx.Lock()
		defer db.mtx.Unlock()
		for _, opt := range opts {
			if err := opt(db); err != nil {
				return err
			}
		}
		return nil
	}
	// Fast path: the database is already open.
	s.mtx.RLock()
	db, ok := s.dbs[name]
	s.mtx.RUnlock()
	if ok {
		if err := applyOptsToDB(db); err != nil {
			return nil, err
		}
		return db, nil
	}
	s.mtx.Lock()
	defer s.mtx.Unlock()
	// Re-check under the write lock, and wait out any WAL replay of this
	// database started by a concurrent caller. The lock is dropped while
	// waiting, so loop until neither case applies.
	for {
		db, ok = s.dbs[name]
		if ok {
			if err := applyOptsToDB(db); err != nil {
				return nil, err
			}
			return db, nil
		}
		waitForReplay, ok := s.dbReplaysInProgress[name]
		if !ok {
			break
		}
		s.mtx.Unlock()
		<-waitForReplay
		s.mtx.Lock()
	}
	logger := log.WithPrefix(s.logger, "db", name)
	db = &DB{
		columnStore:     s,
		name:            name,
		mtx:             &sync.RWMutex{},
		tables:          map[string]*Table{},
		roTables:        map[string]*Table{},
		logger:          logger,
		tracer:          s.tracer,
		wal:             &wal.NopWAL{}, // replaced below when the WAL is enabled
		sources:         s.sources,
		sinks:           s.sinks,
		metrics:         s.metrics.snapshotMetricsForDB(name),
		metricsProvider: tableMetricsProvider{dbName: name, m: s.metrics},
	}
	if s.storagePath != "" {
		db.storagePath = filepath.Join(s.DatabasesDir(), name)
	}
	if err := applyOptsToDB(db); err != nil {
		return nil, err
	}
	if dbSetupErr := func() error {
		if db.storagePath != "" {
			// Remove leftovers from a previous run before recovery.
			if err := os.RemoveAll(db.trashDir()); err != nil {
				return err
			}
			if err := os.RemoveAll(db.indexDir()); err != nil {
				return err
			}
		}
		db.txPool = NewTxPool(&db.highWatermark)
		// Register read-only tables for blocks already persisted to the
		// configured sources.
		if len(db.sources) != 0 {
			for _, source := range db.sources {
				prefixes, err := source.Prefixes(ctx, name)
				if err != nil {
					return err
				}
				for _, prefix := range prefixes {
					_, err := db.readOnlyTable(prefix)
					if err != nil {
						return err
					}
				}
			}
		}
		if s.enableWAL {
			if err := func() error {
				// Announce the replay so concurrent DB calls wait for it, and
				// release the store lock for its (possibly long) duration.
				s.dbReplaysInProgress[name] = make(chan struct{})
				s.mtx.Unlock()
				defer func() {
					// Re-acquire before signalling; the outer deferred Unlock
					// pairs with this Lock.
					s.mtx.Lock()
					close(s.dbReplaysInProgress[name])
					delete(s.dbReplaysInProgress, name)
				}()
				var err error
				db.wal, err = db.openWAL(
					ctx,
					append(
						[]wal.Option{
							wal.WithMetrics(s.metrics.metricsForFileWAL(name)),
							wal.WithStoreMetrics(s.metrics.metricsForWAL(name)),
						}, s.testingOptions.walTestingOptions...,
					)...,
				)
				return err
			}(); err != nil {
				return err
			}
			// Tables created during replay were wired to the WAL passed to
			// recover; point them (and read-only tables) at the opened WAL.
			for _, table := range db.tables {
				if !table.config.Load().DisableWal {
					table.wal = db.wal
				}
			}
			for _, table := range db.roTables {
				if !table.config.Load().DisableWal {
					table.wal = db.wal
				}
			}
		}
		return nil
	}(); dbSetupErr != nil {
		level.Warn(s.logger).Log(
			"msg", "error setting up db",
			"name", name,
			"err", dbSetupErr,
		)
		// Best-effort teardown of whatever was initialized.
		_ = db.closeInternal()
		return nil, dbSetupErr
	}
	// Optionally compact tables right after recovery (see
	// WithCompactionAfterOpen).
	if db.compactAfterRecovery {
		tables := db.compactAfterRecoveryTableNames
		if len(tables) == 0 {
			tables = maps.Keys(db.tables)
		}
		for _, name := range tables {
			tbl, err := db.GetTable(name)
			if err != nil {
				level.Warn(db.logger).Log("msg", "get table during db setup", "err", err)
				continue
			}
			start := time.Now()
			if err := tbl.EnsureCompaction(); err != nil {
				level.Warn(db.logger).Log("msg", "compaction during setup", "err", err)
			}
			level.Info(db.logger).Log(
				"msg", "compacted table after recovery", "table", name, "took", time.Since(start),
			)
		}
	}
	s.dbs[name] = db
	return db, nil
}
// DBs returns the names of all databases currently open in the store, in
// unspecified order.
func (s *ColumnStore) DBs() []string {
	s.mtx.RLock()
	names := maps.Keys(s.dbs)
	s.mtx.RUnlock()
	return names
}
// GetDB returns the already-open database with the given name; unlike DB it
// never creates one, and errors when the name is unknown.
func (s *ColumnStore) GetDB(name string) (*DB, error) {
	s.mtx.RLock()
	defer s.mtx.RUnlock()
	if found, ok := s.dbs[name]; ok {
		return found, nil
	}
	return nil, fmt.Errorf("db %s not found", name)
}
// DropDB closes the named database with its storage cleared, removes it
// from the store, and deletes its on-disk directory.
func (s *ColumnStore) DropDB(name string) error {
	db, err := s.GetDB(name)
	if err != nil {
		return err
	}
	if err := db.Close(WithClearStorage()); err != nil {
		return err
	}
	s.mtx.Lock()
	defer s.mtx.Unlock()
	delete(s.dbs, name)
	// Remove whatever Close's cleanup left behind.
	return os.RemoveAll(filepath.Join(s.DatabasesDir(), name))
}
// openWAL opens the database's on-disk write-ahead log, replays it to
// recover in-memory state, and starts the WAL's background processing.
func (db *DB) openWAL(ctx context.Context, opts ...wal.Option) (WAL, error) {
	// Named w rather than wal so the wal package isn't shadowed.
	w, err := wal.Open(
		db.logger,
		db.walDir(),
		opts...,
	)
	if err != nil {
		return nil, err
	}
	if err := db.recover(ctx, w); err != nil {
		return nil, err
	}
	w.RunAsync()
	return w, nil
}
// Names of the subdirectories kept under a database's storage path.
const (
	walPath       = "wal"
	snapshotsPath = "snapshots"
	indexPath     = "index"
	trashPath     = "trash"
)
// walDir is the directory holding the database's write-ahead log.
func (db *DB) walDir() string {
	return filepath.Join(db.storagePath, walPath)
}

// snapshotsDir is the directory holding the database's snapshots.
func (db *DB) snapshotsDir() string {
	return filepath.Join(db.storagePath, snapshotsPath)
}

// trashDir is a scratch directory files are moved into before deletion; it
// is wiped on open and on dropStorage.
func (db *DB) trashDir() string {
	return filepath.Join(db.storagePath, trashPath)
}

// indexDir is the directory used by disk-based index compaction.
func (db *DB) indexDir() string {
	return filepath.Join(db.storagePath, indexPath)
}
// recover restores the database's in-memory state from the latest snapshot
// (if any) and then replays WAL entries recorded after that snapshot.
// Replay is done in two passes: the first collects, per table, the next
// transaction after the last persisted block so that the second pass can
// skip entries that are already durable in object storage.
func (db *DB) recover(ctx context.Context, wal WAL) error {
	level.Info(db.logger).Log(
		"msg", "recovering db",
		"name", db.name,
	)
	// Load the most recent valid snapshot; on failure fall back to a full
	// WAL replay from the beginning (snapshotTx == 0).
	snapshotLoadStart := time.Now()
	snapshotTx, err := db.loadLatestSnapshot(ctx)
	if err != nil {
		level.Info(db.logger).Log(
			"msg", "failed to load latest snapshot", "db", db.name, "err", err,
		)
		snapshotTx = 0
	}
	snapshotLogArgs := make([]any, 0)
	if snapshotTx != 0 {
		snapshotLogArgs = append(
			snapshotLogArgs,
			"snapshot_tx", snapshotTx,
			"snapshot_load_duration", time.Since(snapshotLoadStart),
		)
		// Drop other snapshots and the WAL entries the loaded snapshot
		// covers. Failures here are non-fatal; they only waste disk space.
		if err := db.cleanupSnapshotDir(ctx, snapshotTx); err != nil {
			level.Info(db.logger).Log(
				"msg", "failed to truncate snapshots not equal to loaded snapshot",
				"err", err,
				"snapshot_tx", snapshotTx,
			)
		}
		if err := wal.Truncate(snapshotTx + 1); err != nil {
			level.Info(db.logger).Log(
				"msg", "failed to truncate WAL after loading snapshot",
				"err", err,
				"snapshot_tx", snapshotTx,
			)
		}
	}
	// First pass: record, per table, the next tx after the last persisted
	// block.
	persistedTables := make(map[string]uint64)
	var lastTx uint64
	start := time.Now()
	if err := wal.Replay(snapshotTx+1, func(_ uint64, record *walpb.Record) error {
		if err := ctx.Err(); err != nil {
			return err
		}
		switch e := record.Entry.EntryType.(type) {
		case *walpb.Entry_TableBlockPersisted_:
			persistedTables[e.TableBlockPersisted.TableName] = e.TableBlockPersisted.NextTx
			return nil
		default:
			return nil
		}
	}); err != nil {
		return err
	}
	performSnapshot := false
	// Second pass: rebuild tables, blocks, and in-memory parts.
	if err := wal.Replay(snapshotTx+1, func(tx uint64, record *walpb.Record) error {
		if err := ctx.Err(); err != nil {
			return err
		}
		lastTx = tx
		switch e := record.Entry.EntryType.(type) {
		case *walpb.Entry_NewTableBlock_:
			entry := e.NewTableBlock
			var schema proto.Message
			switch v := entry.Config.Schema.(type) {
			case *tablepb.TableConfig_DeprecatedSchema:
				schema = v.DeprecatedSchema
			case *tablepb.TableConfig_SchemaV2:
				schema = v.SchemaV2
			default:
				return fmt.Errorf("unhandled schema type: %T", v)
			}
			var id ulid.ULID
			if err := id.UnmarshalBinary(entry.BlockId); err != nil {
				return err
			}
			// Skip blocks whose data was already persisted to object
			// storage.
			nextNonPersistedTxn, wasPersisted := persistedTables[entry.TableName]
			if wasPersisted && tx < nextNonPersistedTxn {
				return nil
			}
			tableName := entry.TableName
			table, err := db.GetTable(tableName)
			var tableErr ErrTableNotFound
			if errors.As(err, &tableErr) {
				// First sighting of this table: create it (or promote a
				// read-only table discovered in storage) with an active
				// block, and we're done with this entry.
				return func() error {
					db.mtx.Lock()
					defer db.mtx.Unlock()
					config := NewTableConfig(schema, FromConfig(entry.Config))
					if _, ok := db.roTables[tableName]; ok {
						table, err = db.promoteReadOnlyTableLocked(tableName, config)
						if err != nil {
							return fmt.Errorf("promoting read only table: %w", err)
						}
					} else {
						table, err = newTable(
							db,
							tableName,
							config,
							db.metricsProvider.metricsForTable(tableName),
							db.logger,
							db.tracer,
							wal,
						)
						if err != nil {
							return fmt.Errorf("instantiate table: %w", err)
						}
					}
					table.active, err = newTableBlock(table, 0, tx, id)
					if err != nil {
						return err
					}
					db.tables[tableName] = table
					return nil
				}()
			}
			if err != nil {
				return fmt.Errorf("get table: %w", err)
			}
			// The table already exists, so this entry is a block rotation:
			// the current active block was never persisted, so write it out
			// in the background before installing the new block.
			level.Info(db.logger).Log(
				"msg", "writing unfinished block in recovery",
				"table", tableName,
				"tx", tx,
			)
			if snapshotTx == 0 || tx != nextNonPersistedTxn {
				table.pendingBlocks[table.active] = struct{}{}
				go table.writeBlock(table.active, tx, db.columnStore.manualBlockRotation)
			}
			// Rebuild the table schema if this block's schema differs from
			// the one currently configured.
			protoEqual := false
			switch schema.(type) {
			case *schemav2pb.Schema:
				protoEqual = proto.Equal(schema, table.config.Load().GetSchemaV2())
			case *schemapb.Schema:
				protoEqual = proto.Equal(schema, table.config.Load().GetDeprecatedSchema())
			}
			if !protoEqual {
				schema, err := dynparquet.SchemaFromDefinition(schema)
				if err != nil {
					return fmt.Errorf("initialize schema: %w", err)
				}
				table.schema = schema
			}
			table.active, err = newTableBlock(table, table.active.minTx, tx, id)
			if err != nil {
				return err
			}
		case *walpb.Entry_Write_:
			entry := e.Write
			tableName := entry.TableName
			// Skip writes already covered by a persisted block.
			if lastPersistedTx, ok := persistedTables[tableName]; ok && tx < lastPersistedTx {
				return nil
			}
			table, err := db.GetTable(tableName)
			var tableErr ErrTableNotFound
			if errors.As(err, &tableErr) {
				// A write for a table we never saw a NewTableBlock for is
				// obsolete; ignore it.
				return nil
			}
			if err != nil {
				return fmt.Errorf("get table: %w", err)
			}
			switch e.Write.Arrow {
			case true:
				reader, err := ipc.NewReader(bytes.NewReader(entry.Data))
				if err != nil {
					return fmt.Errorf("create ipc reader: %w", err)
				}
				record, err := reader.Read()
				if err != nil {
					return fmt.Errorf("read record: %w", err)
				}
				defer reader.Release()
				size := util.TotalRecordSize(record)
				table.active.index.InsertPart(parts.NewArrowPart(tx, record, uint64(size), table.schema, parts.WithCompactionLevel(int(index.L0))))
			default:
				panic("parquet writes are deprecated")
			}
			return nil
		case *walpb.Entry_TableBlockPersisted_:
			// Data became durable after the last snapshot; take a fresh
			// snapshot after replay so the WAL can be truncated.
			performSnapshot = true
			return nil
		case *walpb.Entry_Snapshot_:
			return nil
		default:
			// BUGFIX: %T (prints the dynamic type) instead of %t (boolean
			// verb), which rendered the type as "%!t(...)".
			return fmt.Errorf("unexpected WAL entry type: %T", e)
		}
		return nil
	}); err != nil {
		return err
	}
	// The DB resumes at the later of the snapshot tx and the last replayed
	// tx.
	resetTxn := snapshotTx
	if lastTx > resetTxn {
		resetTxn = lastTx
	}
	// Seed the per-block uncompressed-size counters from the recovered L0
	// index contents.
	db.mtx.Lock()
	for _, table := range db.tables {
		block := table.ActiveBlock()
		block.uncompressedInsertsSize.Store(block.Index().LevelSize(index.L0))
	}
	db.mtx.Unlock()
	db.resetToTxn(resetTxn, nil)
	if performSnapshot && db.columnStore.snapshotTriggerSize != 0 {
		level.Info(db.logger).Log(
			"msg", "performing snapshot after recovery",
		)
		db.snapshot(ctx, false, func() {
			if err := db.reclaimDiskSpace(ctx, wal); err != nil {
				level.Error(db.logger).Log(
					"msg", "failed to reclaim disk space after snapshot during recovery",
					"err", err,
				)
			}
		})
	}
	level.Info(db.logger).Log(
		append(
			[]any{
				"msg", "db recovered",
				"wal_replay_duration", time.Since(start),
				"watermark", resetTxn,
			},
			snapshotLogArgs...,
		)...,
	)
	return nil
}
// CloseOption configures how a DB is closed.
type CloseOption func(*closeOptions)

// closeOptions collects the settings applied by CloseOptions.
type closeOptions struct {
	clearStorage bool // remove on-disk state instead of preserving it for recovery
}

// WithClearStorage requests that the database's on-disk state (WAL,
// snapshots) be removed on Close rather than kept for later recovery.
func WithClearStorage() CloseOption {
	return func(o *closeOptions) {
		o.clearStorage = true
	}
}
// Close shuts the database down. Unless storage is being cleared, it either
// persists every active block to the configured sinks or — when no sink
// applies and snapshots are enabled — takes a final snapshot so state can be
// recovered on restart.
func (db *DB) Close(options ...CloseOption) error {
	opts := &closeOptions{}
	for _, opt := range options {
		opt(opts)
	}
	// Blocks are persisted on close only if sinks are configured and
	// rotation isn't manual.
	shouldPersist := len(db.sinks) > 0 && !db.columnStore.manualBlockRotation
	if !shouldPersist && db.columnStore.snapshotTriggerSize != 0 && !opts.clearStorage {
		// No sink to persist to: snapshot instead so data survives restart.
		start := time.Now()
		db.snapshot(context.Background(), false, func() {
			level.Info(db.logger).Log("msg", "snapshot on close completed", "duration", time.Since(start))
			if err := db.reclaimDiskSpace(context.Background(), nil); err != nil {
				level.Error(db.logger).Log(
					"msg", "failed to reclaim disk space after snapshot",
					"err", err,
				)
			}
		})
	}
	level.Info(db.logger).Log("msg", "closing DB")
	for _, table := range db.tables {
		table.close()
		if shouldPersist {
			// Synchronously flush the active block to the sinks.
			table.writeBlock(table.ActiveBlock(), db.tx.Load(), false)
		}
	}
	level.Info(db.logger).Log("msg", "closed all tables")
	if err := db.closeInternal(); err != nil {
		return err
	}
	// Once data is persisted elsewhere (or the caller asked for a wipe),
	// the local WAL and snapshots are no longer needed.
	if (shouldPersist || opts.clearStorage) && db.storagePath != "" {
		if err := db.dropStorage(); err != nil {
			return err
		}
		level.Info(db.logger).Log("msg", "cleaned up wal & snapshots")
	}
	return nil
}
// closeInternal releases the database's internal resources: it stops the
// transaction pool (if one was created) and closes the WAL when one is in
// use.
func (db *DB) closeInternal() error {
	defer func() {
		if pool := db.txPool; pool != nil {
			pool.Stop()
		}
	}()
	if db.columnStore.enableWAL && db.wal != nil {
		return db.wal.Close()
	}
	return nil
}
// maintainWAL truncates the WAL up to the minimum transaction that every
// table has persisted, reclaiming space for entries no longer needed for
// recovery.
func (db *DB) maintainWAL() {
	minTx := db.getMinTXPersisted()
	if minTx == 0 {
		return
	}
	if err := db.wal.Truncate(minTx); err != nil {
		// Non-fatal — the WAL simply stays larger than necessary — but the
		// error was previously swallowed silently; surface it for operators.
		level.Warn(db.logger).Log(
			"msg", "failed to truncate WAL during maintenance",
			"min_tx", minTx,
			"err", err,
		)
	}
}
// reclaimDiskSpace removes snapshots other than the latest valid one and
// truncates WAL entries that snapshot already covers. A nil wal argument
// uses db.wal; an explicit one is passed during recovery, before db.wal is
// installed.
func (db *DB) reclaimDiskSpace(ctx context.Context, wal WAL) error {
	if db.columnStore.testingOptions.disableReclaimDiskSpaceOnSnapshot {
		return nil
	}
	validSnapshotTxn, err := db.getLatestValidSnapshotTxn(ctx)
	if err != nil {
		return err
	}
	if validSnapshotTxn == 0 {
		// No valid snapshot exists; nothing can be reclaimed safely.
		return nil
	}
	if err := db.cleanupSnapshotDir(ctx, validSnapshotTxn); err != nil {
		return err
	}
	if wal == nil {
		wal = db.wal
	}
	return wal.Truncate(validSnapshotTxn + 1)
}
// getMinTXPersisted returns the smallest lastCompleted transaction across
// all tables (math.MaxUint64 when the database has no tables).
func (db *DB) getMinTXPersisted() uint64 {
	db.mtx.RLock()
	defer db.mtx.RUnlock()
	lowest := uint64(math.MaxUint64)
	for _, table := range db.tables {
		table.mtx.RLock()
		completed := table.lastCompleted
		table.mtx.RUnlock()
		lowest = min(lowest, completed)
	}
	return lowest
}
// readOnlyTable returns the named table for read-only use. If an active
// table with that name exists it is returned; otherwise a config-less table
// is created and registered in roTables.
// NOTE(review): db.tables/db.roTables are accessed here without holding
// db.mtx — callers appear to run during DB setup before the DB is shared;
// confirm before calling from elsewhere.
func (db *DB) readOnlyTable(name string) (*Table, error) {
	table, ok := db.tables[name]
	if ok {
		return table, nil
	}
	table, err := newTable(
		db,
		name,
		nil, // no config yet; set if the table is later promoted
		db.metricsProvider.metricsForTable(name),
		db.logger,
		db.tracer,
		db.wal,
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create table: %w", err)
	}
	db.roTables[name] = table
	return table, nil
}
// promoteReadOnlyTableLocked moves the named table out of the read-only set,
// installing the given config and its derived schema. The caller must hold
// db.mtx (hence the Locked suffix) and is responsible for adding the
// returned table to db.tables.
func (db *DB) promoteReadOnlyTableLocked(name string, config *tablepb.TableConfig) (*Table, error) {
	table, ok := db.roTables[name]
	if !ok {
		return nil, fmt.Errorf("read only table %s not found", name)
	}
	schema, err := schemaFromTableConfig(config)
	if err != nil {
		return nil, err
	}
	table.config.Store(config)
	table.schema = schema
	delete(db.roTables, name)
	return table, nil
}
// Table returns the named table, creating it with the given config and a
// freshly generated ULID as its first block ID if it does not exist.
func (db *DB) Table(name string, config *tablepb.TableConfig) (*Table, error) {
	return db.table(name, config, generateULID())
}
// table returns the named table, creating it (or promoting a read-only
// table) with the provided config and initial block ID when it does not
// exist. When the table already exists, the provided config replaces its
// current one.
func (db *DB) table(name string, config *tablepb.TableConfig, id ulid.ULID) (*Table, error) {
	if config == nil {
		return nil, fmt.Errorf("table config cannot be nil")
	}
	if !validateName(name) {
		return nil, errors.New("invalid table name")
	}
	// Fast path: the table already exists.
	db.mtx.RLock()
	table, ok := db.tables[name]
	db.mtx.RUnlock()
	if ok {
		table.config.Store(config)
		return table, nil
	}
	db.mtx.Lock()
	defer db.mtx.Unlock()
	// Re-check under the write lock in case another goroutine created the
	// table in the meantime.
	table, ok = db.tables[name]
	if ok {
		// CONSISTENCY FIX: the fast path above installs the caller's config
		// on an existing table, but this racing path previously did not;
		// apply it here too so behavior doesn't depend on the race.
		table.config.Store(config)
		return table, nil
	}
	if _, ok := db.roTables[name]; ok {
		var err error
		table, err = db.promoteReadOnlyTableLocked(name, config)
		if err != nil {
			return nil, err
		}
	} else {
		var err error
		table, err = newTable(
			db,
			name,
			config,
			db.metricsProvider.metricsForTable(name),
			db.logger,
			db.tracer,
			db.wal,
		)
		if err != nil {
			return nil, fmt.Errorf("failed to create table: %w", err)
		}
	}
	// Create the table's first active block inside a write transaction.
	tx, _, commit := db.begin()
	defer commit()
	if err := table.newTableBlock(0, tx, id); err != nil {
		return nil, err
	}
	db.tables[name] = table
	return table, nil
}
// ErrTableNotFound is returned when a lookup references a table that does
// not exist in the database.
type ErrTableNotFound struct {
	TableName string
}

// Error implements the error interface.
func (e ErrTableNotFound) Error() string {
	return "table " + e.TableName + " not found"
}
// GetTable looks up an active (writable) table by name, returning
// ErrTableNotFound when no such table exists.
func (db *DB) GetTable(name string) (*Table, error) {
	db.mtx.RLock()
	defer db.mtx.RUnlock()
	if table, ok := db.tables[name]; ok {
		return table, nil
	}
	return nil, ErrTableNotFound{TableName: name}
}
// TableProvider returns a table provider for query planning, resolving
// names against this database.
func (db *DB) TableProvider() *DBTableProvider {
	return NewDBTableProvider(db)
}
// TableNames returns the names of all active tables, in unspecified order.
func (db *DB) TableNames() []string {
	db.mtx.RLock()
	defer db.mtx.RUnlock()
	return maps.Keys(db.tables)
}
// DBTableProvider resolves table names against a single DB for use by the
// query engine.
type DBTableProvider struct {
	db *DB
}

// NewDBTableProvider returns a DBTableProvider reading from db.
func NewDBTableProvider(db *DB) *DBTableProvider {
	return &DBTableProvider{
		db: db,
	}
}
// GetTable returns the named table as a logicalplan.TableReader, searching
// active tables first and read-only tables second.
func (p *DBTableProvider) GetTable(name string) (logicalplan.TableReader, error) {
	p.db.mtx.RLock()
	defer p.db.mtx.RUnlock()
	if tbl, ok := p.db.tables[name]; ok {
		return tbl, nil
	}
	if tbl, ok := p.db.roTables[name]; ok {
		return tbl, nil
	}
	return nil, fmt.Errorf("table %v not found", name)
}
// beginRead returns the transaction id a reader should observe: the current
// high watermark, below which all transactions are committed.
func (db *DB) beginRead() uint64 {
	return db.highWatermark.Load()
}
// begin starts a new write transaction. It returns the new txn id, the
// current high watermark (all txns at or below it are committed), and a
// commit function the writer must call when finished.
func (db *DB) begin() (uint64, uint64, func()) {
	txn := db.tx.Add(1)
	watermark := db.highWatermark.Load()
	return txn, watermark, func() {
		// If this txn directly follows the watermark, advance it and wake
		// waiters; otherwise park the txn in the pool until the gap of
		// earlier, still-uncommitted txns is filled.
		if mark := db.highWatermark.Load(); mark+1 == txn {
			db.highWatermark.Store(txn)
			db.txPool.notifyWatermark()
			return
		}
		db.txPool.Insert(txn)
	}
}
// Wait blocks until the high watermark reaches tx, i.e. until the given
// transaction (and all earlier ones) have committed. It polls every 10ms.
func (db *DB) Wait(tx uint64) {
	for db.highWatermark.Load() < tx {
		time.Sleep(10 * time.Millisecond)
	}
}
// HighWatermark returns the highest transaction id up to which all
// transactions are committed.
func (db *DB) HighWatermark() uint64 {
	return db.highWatermark.Load()
}
// resetToTxn fast-forwards the transaction counter and high watermark to
// txn (used after recovery) and, when a WAL is provided, resets it so the
// next entry is written at txn+1.
func (db *DB) resetToTxn(txn uint64, wal WAL) {
	db.tx.Store(txn)
	db.highWatermark.Store(txn)
	if wal != nil {
		if err := wal.Reset(txn + 1); err != nil {
			// NOTE(review): failure is only logged; presumably stale WAL
			// entries are tolerated on the next replay — confirm.
			level.Warn(db.logger).Log(
				"msg", "failed to reset WAL when resetting DB to txn",
				"txnID", txn,
				"err", err,
			)
		}
	}
}
// validateName reports whether name is acceptable as a database or table
// name. Path separators are forbidden because names are used as directory
// names on disk.
func validateName(name string) bool {
	return strings.IndexByte(name, '/') < 0
}
// dropStorage removes everything under the database's storage path. Entries
// are first renamed into a trash directory (cheap per-entry renames) which
// is then deleted in one pass; if the move phase fails, it falls back to
// removing the entries in place.
func (db *DB) dropStorage() error {
	trashDir := db.trashDir()
	entries, err := os.ReadDir(db.storagePath)
	if err != nil {
		if os.IsNotExist(err) {
			// Nothing was ever written; done.
			return nil
		}
		return err
	}
	if moveErr := func() error {
		if err := os.MkdirAll(trashDir, os.FileMode(0o755)); err != nil {
			return fmt.Errorf("making trash dir: %w", err)
		}
		// Use a unique temp dir inside trash so repeated/concurrent drops
		// don't collide.
		tmpPath, err := os.MkdirTemp(trashDir, "")
		if err != nil {
			return err
		}
		errs := make([]error, 0, len(entries))
		for _, e := range entries {
			// Already-missing entries are fine; collect the rest.
			if err := os.Rename(filepath.Join(db.storagePath, e.Name()), filepath.Join(tmpPath, e.Name())); err != nil && !os.IsNotExist(err) {
				errs = append(errs, err)
			}
		}
		return errors.Join(errs...)
	}(); moveErr != nil {
		// Fallback: delete in place, entry by entry.
		errs := make([]error, 0, len(entries))
		for _, e := range entries {
			if err := os.RemoveAll(filepath.Join(db.storagePath, e.Name())); err != nil {
				errs = append(errs, err)
			}
		}
		return errors.Join(errs...)
	}
	return os.RemoveAll(trashDir)
}
The pages are generated with Golds v0.8.2 (GOOS=linux, GOARCH=amd64).
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.