package frostdb

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"os"
	"path/filepath"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/apache/arrow/go/v14/arrow/ipc"
	"github.com/apache/arrow/go/v14/arrow/util"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/oklog/ulid"
	"github.com/prometheus/client_golang/prometheus"
	"go.opentelemetry.io/otel/trace"
	"go.opentelemetry.io/otel/trace/noop"
	"golang.org/x/exp/maps"
	"golang.org/x/sync/errgroup"
	"google.golang.org/protobuf/proto"

	"github.com/polarsignals/frostdb/dynparquet"
	schemapb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/schema/v1alpha1"
	schemav2pb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/schema/v1alpha2"
	tablepb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/table/v1alpha1"
	walpb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/wal/v1alpha1"
	"github.com/polarsignals/frostdb/index"
	"github.com/polarsignals/frostdb/parts"
	"github.com/polarsignals/frostdb/query/logicalplan"
	"github.com/polarsignals/frostdb/wal"
)

const (
	B   = 1
	KiB = 1024 * B
	MiB = 1024 * KiB
	GiB = 1024 * MiB
	TiB = 1024 * GiB
)
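
// These unit constants are convenient for sizing options; for example
// (illustrative usage, not a default):
//
//	frostdb.New(frostdb.WithActiveMemorySize(1 * frostdb.GiB))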

type ColumnStore struct {
	mtx                 sync.RWMutex
	dbs                 map[string]*DB
	dbReplaysInProgress map[string]chan struct{}
	reg                 prometheus.Registerer
	logger              log.Logger
	tracer              trace.Tracer
	activeMemorySize    int64
	storagePath         string
	enableWAL           bool
	manualBlockRotation bool
	snapshotTriggerSize int64
	metrics             globalMetrics
	recoveryConcurrency int

	// indexDegree is the degree of the btree index (default = 2)
	indexDegree int
	// splitSize is the number of new granules that are created when granules are split (default = 2)
	splitSize int
	// indexConfig is the configuration settings for the LSM index
	indexConfig []*index.LevelConfig

	sources []DataSource
	sinks   []DataSink

	compactAfterRecovery           bool
	compactAfterRecoveryTableNames []string

	// testingOptions are options only used for testing purposes.
	testingOptions struct {
		disableReclaimDiskSpaceOnSnapshot bool
		walTestingOptions                 []wal.Option
	}
}

type Option func(*ColumnStore) error

// New creates a new ColumnStore, applies the given options, and recovers any
// databases previously persisted under the configured storage path if the WAL
// is enabled.
func New(
	options ...Option,
) (*ColumnStore, error) {
	s := &ColumnStore{
		dbs:                 make(map[string]*DB),
		dbReplaysInProgress: make(map[string]chan struct{}),
		reg:                 prometheus.NewRegistry(),
		logger:              log.NewNopLogger(),
		tracer:              noop.NewTracerProvider().Tracer(""),
		indexConfig:         DefaultIndexConfig(),
		indexDegree:         2,
		splitSize:           2,
		activeMemorySize:    512 * MiB,
	}

	for _, option := range options {
		if err := option(s); err != nil {
			return nil, err
		}
	}

	// Register metrics that are updated by the collector.
	s.reg.MustRegister(&collector{s: s})
	s.metrics = makeAndRegisterGlobalMetrics(s.reg)

	if s.enableWAL && s.storagePath == "" {
		return nil, fmt.Errorf("storage path must be configured if WAL is enabled")
	}

	for _, lvl := range s.indexConfig {
		if lvl.Type == index.CompactionTypeParquetDisk {
			if !s.enableWAL || s.storagePath == "" {
				return nil, fmt.Errorf("persistent disk compaction requires WAL and storage path to be enabled")
			}
		}
	}

	if err := s.recoverDBsFromStorage(context.Background()); err != nil {
		return nil, err
	}

	return s, nil
}
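
// A minimal usage sketch (illustrative; the storage path is an arbitrary
// example value):
//
//	store, err := frostdb.New(
//		frostdb.WithWAL(),
//		frostdb.WithStoragePath("/var/lib/frostdb"),
//	)
//	if err != nil {
//		// handle error
//	}
//	defer store.Close()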

// WithLogger sets the logger used by the column store.
func WithLogger(logger log.Logger) Option {
	return func(s *ColumnStore) error {
		s.logger = logger
		return nil
	}
}

// WithTracer sets the tracer used by the column store.
func WithTracer(tracer trace.Tracer) Option {
	return func(s *ColumnStore) error {
		s.tracer = tracer
		return nil
	}
}

// WithRegistry sets the prometheus registerer that metrics are registered with.
func WithRegistry(reg prometheus.Registerer) Option {
	return func(s *ColumnStore) error {
		s.reg = reg
		return nil
	}
}

// WithActiveMemorySize sets the active memory size in bytes.
func WithActiveMemorySize(size int64) Option {
	return func(s *ColumnStore) error {
		s.activeMemorySize = size
		return nil
	}
}

// WithIndexDegree sets the degree of the btree index.
func WithIndexDegree(indexDegree int) Option {
	return func(s *ColumnStore) error {
		s.indexDegree = indexDegree
		return nil
	}
}

// WithSplitSize sets the number of new granules created when a granule is split.
func WithSplitSize(size int) Option {
	return func(s *ColumnStore) error {
		s.splitSize = size
		return nil
	}
}

// WithReadWriteStorage registers the given DataSinkSource as both a data
// source and a data sink.
func WithReadWriteStorage(ds DataSinkSource) Option {
	return func(s *ColumnStore) error {
		s.sources = append(s.sources, ds)
		s.sinks = append(s.sinks, ds)
		return nil
	}
}

// WithReadOnlyStorage registers the given DataSource as a data source only.
func WithReadOnlyStorage(ds DataSource) Option {
	return func(s *ColumnStore) error {
		s.sources = append(s.sources, ds)
		return nil
	}
}

// WithWriteOnlyStorage registers the given DataSink as a data sink only.
func WithWriteOnlyStorage(ds DataSink) Option {
	return func(s *ColumnStore) error {
		s.sinks = append(s.sinks, ds)
		return nil
	}
}

// WithManualBlockRotation disables automatic block rotation; blocks are only
// rotated when explicitly requested.
func WithManualBlockRotation() Option {
	return func(s *ColumnStore) error {
		s.manualBlockRotation = true
		return nil
	}
}

// WithWAL enables the write-ahead log.
func WithWAL() Option {
	return func(s *ColumnStore) error {
		s.enableWAL = true
		return nil
	}
}

// WithStoragePath sets the path under which WAL, snapshot, and index data are
// stored.
func WithStoragePath(path string) Option {
	return func(s *ColumnStore) error {
		s.storagePath = path
		return nil
	}
}

// WithIndexConfig sets the level configuration of the LSM index.
func WithIndexConfig(indexConfig []*index.LevelConfig) Option {
	return func(s *ColumnStore) error {
		s.indexConfig = indexConfig
		return nil
	}
}

// WithCompactionAfterRecovery triggers a compaction of the given tables after
// WAL/snapshot recovery.
func WithCompactionAfterRecovery(tableNames []string) Option {
	return func(s *ColumnStore) error {
		s.compactAfterRecovery = true
		s.compactAfterRecoveryTableNames = tableNames
		return nil
	}
}

// WithSnapshotTriggerSize specifies a size in bytes of uncompressed inserts
// that will trigger a snapshot of the whole database. This can be larger than
// the active memory size given that the active memory size tracks the size of
// *compressed* data, while snapshots are triggered based on the *uncompressed*
// data inserted into the database. The reason this choice was made is that
// if a database instance crashes, it is forced to reread all uncompressed
// inserts since the last snapshot from the WAL, which could potentially lead
// to unrecoverable OOMs on startup. Defining the snapshot trigger in terms of
// uncompressed bytes limits the memory usage on recovery to at most the
// snapshot trigger size (as long as snapshots were successful).
// If 0, snapshots are disabled. Note that snapshots (if enabled) are also
// triggered on block rotation of any database table.
// Snapshots are complementary to the WAL and will also be disabled if the WAL
// is disabled.
func WithSnapshotTriggerSize(size int64) Option {
	return func(s *ColumnStore) error {
		s.snapshotTriggerSize = size
		return nil
	}
}
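
// For example (sketch; the trigger size shown is arbitrary), note that
// snapshots require the WAL to be enabled:
//
//	store, err := frostdb.New(
//		frostdb.WithWAL(),
//		frostdb.WithStoragePath("/var/lib/frostdb"),
//		frostdb.WithSnapshotTriggerSize(128*frostdb.MiB),
//	)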

// WithRecoveryConcurrency limits the number of databases that are recovered
// simultaneously when calling frostdb.New. This helps limit memory usage on
// recovery.
func WithRecoveryConcurrency(concurrency int) Option {
	return func(s *ColumnStore) error {
		s.recoveryConcurrency = concurrency
		return nil
	}
}

// Close persists all data from the column store to storage.
// It is no longer valid to use the column store for reads or writes afterwards,
// and the object should no longer be used.
func (s *ColumnStore) Close() error {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	s.metrics.shutdownStarted.Inc()
	defer s.metrics.shutdownCompleted.Inc()
	defer func(start time.Time) {
		s.metrics.shutdownDuration.Observe(float64(time.Since(start)))
	}(time.Now())

	errg := &errgroup.Group{}
	errg.SetLimit(runtime.GOMAXPROCS(0))
	for _, db := range s.dbs {
		db := db
		errg.Go(func() error {
			err := db.Close()
			if err != nil {
				level.Error(s.logger).Log("msg", "error closing DB", "db", db.name, "err", err)
			}
			return err
		})
	}

	return errg.Wait()
}

// DatabasesDir returns the directory under which all database data is stored.
func (s *ColumnStore) DatabasesDir() string {
	return filepath.Join(s.storagePath, "databases")
}

// recoverDBsFromStorage replays the snapshots and write-ahead logs for each database.
func (s *ColumnStore) recoverDBsFromStorage(ctx context.Context) error {
	if !s.enableWAL {
		return nil
	}

	dir := s.DatabasesDir()
	if _, err := os.Stat(dir); err != nil {
		if os.IsNotExist(err) {
			level.Debug(s.logger).Log("msg", "WAL directory does not exist, no WAL to replay")
			return nil
		}
		return err
	}

	files, err := os.ReadDir(dir)
	if err != nil {
		return err
	}

	g, ctx := errgroup.WithContext(ctx)
	// Limit this operation since WAL recovery could be very memory intensive.
	if s.recoveryConcurrency == 0 {
		s.recoveryConcurrency = runtime.GOMAXPROCS(0)
	}
	g.SetLimit(s.recoveryConcurrency)
	for _, f := range files {
		databaseName := f.Name()
		g.Go(func() error {
			// Open the DB for the side effect of the snapshot and WALs being loaded as part of the open operation.
			_, err := s.DB(ctx,
				databaseName,
				WithCompactionAfterOpen(
					s.compactAfterRecovery, s.compactAfterRecoveryTableNames,
				),
			)
			return err
		})
	}

	return g.Wait()
}

type DB struct {
	columnStore *ColumnStore
	logger      log.Logger
	tracer      trace.Tracer
	name        string

	mtx      *sync.RWMutex
	roTables map[string]*Table
	tables   map[string]*Table

	storagePath string
	wal         WAL

	// The database supports multiple data sources and sinks.
	sources []DataSource
	sinks   []DataSink

	// The database's monotonically increasing transaction id.
	tx atomic.Uint64
	// highWatermark maintains the highest consecutively completed txn.
	highWatermark atomic.Uint64

	// TxPool is a waiting area for finished transactions that haven't been added to the watermark
	txPool *TxPool

	compactAfterRecovery           bool
	compactAfterRecoveryTableNames []string

	snapshotInProgress atomic.Bool

	metrics         snapshotMetrics
	metricsProvider tableMetricsProvider
}

// DataSinkSource is a convenience interface for a data source and sink.
type DataSinkSource interface {
	DataSink
	DataSource
}

// DataSource is a remote source of data that can be queried.
type DataSource interface {
	fmt.Stringer
	Scan(ctx context.Context, prefix string, schema *dynparquet.Schema, filter logicalplan.Expr, lastBlockTimestamp uint64, callback func(context.Context, any) error) error
	Prefixes(ctx context.Context, prefix string) ([]string, error)
}

// DataSink is a remote destination for data.
type DataSink interface {
	fmt.Stringer
	Upload(ctx context.Context, name string, r io.Reader) error
	Delete(ctx context.Context, name string) error
}
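
// As a sketch, an implementation backed by an object store would satisfy
// DataSinkSource with methods shaped like the following (the bucketStorage
// type and its bodies are hypothetical, not part of this package):
//
//	type bucketStorage struct{ /* object store client */ }
//
//	func (b *bucketStorage) String() string { return "bucket" }
//	func (b *bucketStorage) Upload(ctx context.Context, name string, r io.Reader) error { /* put object */ return nil }
//	func (b *bucketStorage) Delete(ctx context.Context, name string) error { /* delete object */ return nil }
//	func (b *bucketStorage) Prefixes(ctx context.Context, prefix string) ([]string, error) { /* list table prefixes */ return nil, nil }
//	func (b *bucketStorage) Scan(ctx context.Context, prefix string, schema *dynparquet.Schema, filter logicalplan.Expr, lastBlockTimestamp uint64, callback func(context.Context, any) error) error {
//		/* iterate stored blocks, invoking callback for each */
//		return nil
//	}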

type DBOption func(*DB) error

// WithCompactionAfterOpen triggers a compaction of the given tables after the
// database is opened.
func WithCompactionAfterOpen(compact bool, tableNames []string) DBOption {
	return func(db *DB) error {
		db.compactAfterRecovery = compact
		db.compactAfterRecoveryTableNames = tableNames
		return nil
	}
}

// DB gets or creates a database on the given ColumnStore with the given
// options. Note that if the database already exists, the options will be
// applied cumulatively to the database.
func (s *ColumnStore) DB(ctx context.Context, name string, opts ...DBOption) (*DB, error) {
	if !validateName(name) {
		return nil, errors.New("invalid database name")
	}
	applyOptions := func(db *DB) error {
		db.mtx.Lock()
		defer db.mtx.Unlock()
		for _, opt := range opts {
			if err := opt(db); err != nil {
				return err
			}
		}
		return nil
	}
	s.mtx.RLock()
	db, ok := s.dbs[name]
	s.mtx.RUnlock()
	if ok {
		if err := applyOptions(db); err != nil {
			return nil, err
		}
		return db, nil
	}

	s.mtx.Lock()
	defer s.mtx.Unlock()

	// Need to double-check that in the meantime a database with the same name
	// wasn't concurrently created.
	for {
		db, ok = s.dbs[name]
		if ok {
			if err := applyOptions(db); err != nil {
				return nil, err
			}
			return db, nil
		}

		// DB has not yet been created. However, another goroutine might be
		// replaying the WAL in the background (the store mutex is released
		// during replay).
		replayDone, replaying := s.dbReplaysInProgress[name]
		if !replaying {
			// No replay in progress, it is safe to create the DB.
			break
		}
		s.mtx.Unlock()
		<-replayDone
		s.mtx.Lock()
	}

	logger := log.WithPrefix(s.logger, "db", name)
	db = &DB{
		columnStore:     s,
		name:            name,
		mtx:             &sync.RWMutex{},
		tables:          map[string]*Table{},
		roTables:        map[string]*Table{},
		logger:          logger,
		tracer:          s.tracer,
		wal:             &wal.NopWAL{},
		sources:         s.sources,
		sinks:           s.sinks,
		metrics:         s.metrics.snapshotMetricsForDB(name),
		metricsProvider: tableMetricsProvider{dbName: name, m: s.metrics},
	}

	if s.storagePath != "" {
		db.storagePath = filepath.Join(s.DatabasesDir(), name)
	}

	if err := applyOptions(db); err != nil {
		return nil, err
	}

	if err := func() error {
		if db.storagePath != "" {
			if err := os.RemoveAll(db.trashDir()); err != nil {
				return err
			}
			if err := os.RemoveAll(db.indexDir()); err != nil { // Remove the index directory. These are either restored from snapshots or rebuilt from the WAL.
				return err
			}
		}
		db.txPool = NewTxPool(&db.highWatermark)
		// Wait to start the compactor pool since benchmarks show that WAL
		// replay is a lot more efficient if it is not competing against
		// compaction. Additionally, if the CompactAfterRecovery option is
		// specified, we don't want the user-specified compaction to race with
		// our compactor pool.
		if len(db.sources) != 0 {
			for _, source := range db.sources {
				prefixes, err := source.Prefixes(ctx, name)
				if err != nil {
					return err
				}

				for _, prefix := range prefixes {
					_, err := db.readOnlyTable(prefix)
					if err != nil {
						return err
					}
				}
			}
		}

		if s.enableWAL {
			if err := func() error {
				// Unlock the store mutex while the WAL is replayed, otherwise
				// if multiple DBs are opened in parallel, WAL replays will not
				// happen in parallel. However, create a channel for any
				// goroutines that might concurrently try to open the same DB
				// to listen on.
				s.dbReplaysInProgress[name] = make(chan struct{})
				s.mtx.Unlock()
				defer func() {
					s.mtx.Lock()
					close(s.dbReplaysInProgress[name])
					delete(s.dbReplaysInProgress, name)
				}()
				var err error
				db.wal, err = db.openWAL(
					ctx,
					append(
						[]wal.Option{
							wal.WithMetrics(s.metrics.metricsForFileWAL(name)),
							wal.WithStoreMetrics(s.metrics.metricsForWAL(name)),
						}, s.testingOptions.walTestingOptions...,
					)...,
				)
				return err
			}(); err != nil {
				return err
			}
			// WAL pointers of tables need to be updated to the DB WAL since
			// they are loaded from object storage and snapshots with a no-op
			// WAL by default.
			for _, table := range db.tables {
				if !table.config.Load().DisableWal {
					table.wal = db.wal
				}
			}
			for _, table := range db.roTables {
				if !table.config.Load().DisableWal {
					table.wal = db.wal
				}
			}
		}
		return nil
	}(); err != nil {
		level.Warn(s.logger).Log(
			"msg", "error setting up db",
			"name", name,
			"err", err,
		)
		// closeInternal handles closing partially set fields in the db without
		// rotating blocks etc... that the public Close method does.
		_ = db.closeInternal()
		return nil, err
	}

	// Compact tables after recovery if requested.
	if db.compactAfterRecovery {
		tableNames := db.compactAfterRecoveryTableNames
		if len(tableNames) == 0 {
			// Run compaction on all tables.
			tableNames = maps.Keys(db.tables)
		}
		for _, tableName := range tableNames {
			table, err := db.GetTable(tableName)
			if err != nil {
				level.Warn(db.logger).Log("msg", "get table during db setup", "err", err)
				continue
			}

			start := time.Now()
			if err := table.EnsureCompaction(); err != nil {
				level.Warn(db.logger).Log("msg", "compaction during setup", "err", err)
			}
			level.Info(db.logger).Log(
				"msg", "compacted table after recovery", "table", tableName, "took", time.Since(start),
			)
		}
	}

	s.dbs[name] = db
	return db, nil
}
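
// Typical use (sketch; "metrics" is an arbitrary example database name):
//
//	db, err := store.DB(context.Background(), "metrics")
//	if err != nil {
//		// handle error
//	}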

// DBs returns all the DB names of this column store.
func (s *ColumnStore) DBs() []string {
	s.mtx.RLock()
	defer s.mtx.RUnlock()
	return maps.Keys(s.dbs)
}

// GetDB returns the DB with the given name if it exists.
func (s *ColumnStore) GetDB(name string) (*DB, error) {
	s.mtx.RLock()
	defer s.mtx.RUnlock()
	db, ok := s.dbs[name]
	if !ok {
		return nil, fmt.Errorf("db %s not found", name)
	}
	return db, nil
}

// DropDB closes the DB with the given name and removes all its data from
// storage.
func (s *ColumnStore) DropDB(name string) error {
	db, err := s.GetDB(name)
	if err != nil {
		return err
	}
	if err := db.Close(WithClearStorage()); err != nil {
		return err
	}
	s.mtx.Lock()
	defer s.mtx.Unlock()
	delete(s.dbs, name)
	return os.RemoveAll(filepath.Join(s.DatabasesDir(), name))
}

func (db *DB) openWAL(ctx context.Context, options ...wal.Option) (WAL, error) {
	w, err := wal.Open(
		db.logger,
		db.walDir(),
		options...,
	)
	if err != nil {
		return nil, err
	}

	if err := db.recover(ctx, w); err != nil {
		return nil, err
	}

	w.RunAsync()
	return w, nil
}

const (
	walPath       = "wal"
	snapshotsPath = "snapshots"
	indexPath     = "index"
	trashPath     = "trash"
)

func (db *DB) walDir() string {
	return filepath.Join(db.storagePath, walPath)
}

func (db *DB) snapshotsDir() string {
	return filepath.Join(db.storagePath, snapshotsPath)
}

func (db *DB) trashDir() string {
	return filepath.Join(db.storagePath, trashPath)
}

func (db *DB) indexDir() string {
	return filepath.Join(db.storagePath, indexPath)
}
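
// Taken together with DatabasesDir, these helpers define the on-disk layout of
// a database rooted at the store's storage path:
//
//	<storagePath>/databases/<db-name>/wal
//	<storagePath>/databases/<db-name>/snapshots
//	<storagePath>/databases/<db-name>/index
//	<storagePath>/databases/<db-name>/trash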

// recover attempts to recover database state from a combination of snapshots and the WAL.
//
// The recovery process is as follows:
// 1. Load the latest snapshot (if one exists).
// 1.a. If on-disk LSM index files exist, they are recovered during table
// creation as part of snapshot loading, inserting their parts into the index.
// 2. Replay the WAL starting from the latest snapshot transaction.
// 2.a. If on-disk LSM index files were loaded, a replayed WAL insert may be
// dropped when a part with a higher transaction id already exists in the index.
func (db *DB) recover(ctx context.Context, w WAL) error {
	level.Info(db.logger).Log(
		"msg", "recovering db",
		"name", db.name,
	)
	start := time.Now()
	snapshotTx, err := db.loadLatestSnapshot(ctx)
	if err != nil {
		level.Info(db.logger).Log(
			"msg", "failed to load latest snapshot", "db", db.name, "err", err,
		)
		snapshotTx = 0
	}
	snapshotLogArgs := make([]any, 0)
	if snapshotTx != 0 {
		snapshotLogArgs = append(
			snapshotLogArgs,
			"snapshot_tx", snapshotTx,
			"snapshot_load_duration", time.Since(start),
		)
		if err := db.cleanupSnapshotDir(ctx, snapshotTx); err != nil {
			// Truncation is best-effort. If it fails, move on.
			level.Info(db.logger).Log(
				"msg", "failed to truncate snapshots not equal to loaded snapshot",
				"err", err,
				"snapshot_tx", snapshotTx,
			)
		}
		// snapshotTx can correspond to a write at that txn that is contained in
		// the snapshot. We want the first entry of the WAL to be the subsequent
		// txn to not replay duplicate writes.
		if err := w.Truncate(snapshotTx + 1); err != nil {
			level.Info(db.logger).Log(
				"msg", "failed to truncate WAL after loading snapshot",
				"err", err,
				"snapshot_tx", snapshotTx,
			)
		}
	}

	// persistedTables is a map from a table name to the last transaction
	// persisted.
	persistedTables := make(map[string]uint64)
	var lastTx uint64

	replayStart := time.Now()
	if err := w.Replay(snapshotTx+1, func(tx uint64, record *walpb.Record) error {
		if err := ctx.Err(); err != nil {
			return err
		}
		switch e := record.Entry.EntryType.(type) {
		case *walpb.Entry_TableBlockPersisted_:
			persistedTables[e.TableBlockPersisted.TableName] = e.TableBlockPersisted.NextTx
			// The loaded snapshot might have persisted data, this is handled in
			// the replay loop below.
			return nil
		default:
			return nil
		}
	}); err != nil {
		return err
	}

	// performSnapshot is set to true if a snapshot should be performed after
	// replay. This is set in cases where there could be "dead bytes" in the
	// WAL (i.e. entries that occupy space on disk but are useless).
	performSnapshot := false

	if err := w.Replay(snapshotTx+1, func(tx uint64, record *walpb.Record) error {
		if err := ctx.Err(); err != nil {
			return err
		}
		lastTx = tx
		switch e := record.Entry.EntryType.(type) {
		case *walpb.Entry_NewTableBlock_:
			entry := e.NewTableBlock
			var schemaMsg proto.Message
			switch v := entry.Config.Schema.(type) {
			case *tablepb.TableConfig_DeprecatedSchema:
				schemaMsg = v.DeprecatedSchema
			case *tablepb.TableConfig_SchemaV2:
				schemaMsg = v.SchemaV2
			default:
				return fmt.Errorf("unhandled schema type: %T", v)
			}

			var id ulid.ULID
			if err := id.UnmarshalBinary(entry.BlockId); err != nil {
				return err
			}

			nextNonPersistedTxn, ok := persistedTables[entry.TableName]
			if ok && tx < nextNonPersistedTxn {
				// This block has already been successfully persisted, so we can
				// skip it. Note that if this new table block is the active
				// block after persistence tx == nextNonPersistedTxn.
				return nil
			}

			tableName := entry.TableName
			table, err := db.GetTable(tableName)
			var tableNotFound ErrTableNotFound
			if errors.As(err, &tableNotFound) {
				return func() error {
					db.mtx.Lock()
					defer db.mtx.Unlock()
					config := NewTableConfig(schemaMsg, FromConfig(entry.Config))
					if _, ok := db.roTables[tableName]; ok {
						table, err = db.promoteReadOnlyTableLocked(tableName, config)
						if err != nil {
							return fmt.Errorf("promoting read only table: %w", err)
						}
					} else {
						table, err = newTable(
							db,
							tableName,
							config,
							db.metricsProvider.metricsForTable(tableName),
							db.logger,
							db.tracer,
							w,
						)
						if err != nil {
							return fmt.Errorf("instantiate table: %w", err)
						}
					}

					table.active, err = newTableBlock(table, 0, tx, id)
					if err != nil {
						return err
					}
					db.tables[tableName] = table
					return nil
				}()
			}
			if err != nil {
				return fmt.Errorf("get table: %w", err)
			}

			level.Info(db.logger).Log(
				"msg", "writing unfinished block in recovery",
				"table", tableName,
				"tx", tx,
			)
			if nextNonPersistedTxn == 0 || tx != nextNonPersistedTxn {
				// If we get to this point it means a block was finished but did
				// not get persisted. If a snapshot was loaded, then the table
				// already exists but the active block is outdated. If
				// tx == nextNonPersistedTxn, we should not persist the active
				// block, but just create a new block.
				table.pendingBlocks[table.active] = struct{}{}
				go table.writeBlock(table.active, tx, db.columnStore.manualBlockRotation)
			}

			schemasEqual := false
			switch schemaMsg.(type) {
			case *schemav2pb.Schema:
				schemasEqual = proto.Equal(schemaMsg, table.config.Load().GetSchemaV2())
			case *schemapb.Schema:
				schemasEqual = proto.Equal(schemaMsg, table.config.Load().GetDeprecatedSchema())
			}
			if !schemasEqual {
				// If schemas are identical from block to block we reuse the
				// previous schema in order to retain pooled memory for it.
				schema, err := dynparquet.SchemaFromDefinition(schemaMsg)
				if err != nil {
					return fmt.Errorf("initialize schema: %w", err)
				}

				table.schema = schema
			}

			table.active, err = newTableBlock(table, table.active.minTx, tx, id)
			if err != nil {
				return err
			}
		case *walpb.Entry_Write_:
			entry := e.Write
			tableName := entry.TableName
			if nextNonPersistedTxn, ok := persistedTables[tableName]; ok && tx < nextNonPersistedTxn {
				// This write has already been successfully persisted, so we can
				// skip it.
				return nil
			}

			table, err := db.GetTable(tableName)
			var tableNotFound ErrTableNotFound
			if errors.As(err, &tableNotFound) {
				// This means the WAL was truncated at a point where this write
				// was already successfully persisted to disk in more optimized
				// form than the WAL.
				return nil
			}
			if err != nil {
				return fmt.Errorf("get table: %w", err)
			}

			switch entry.Arrow {
			case true:
				reader, err := ipc.NewReader(bytes.NewReader(entry.Data))
				if err != nil {
					return fmt.Errorf("create ipc reader: %w", err)
				}
				record, err := reader.Read()
				if err != nil {
					return fmt.Errorf("read record: %w", err)
				}
				defer record.Release()
				size := util.TotalRecordSize(record)
				table.active.index.InsertPart(parts.NewArrowPart(tx, record, uint64(size), table.schema, parts.WithCompactionLevel(int(index.L0))))
			default:
				panic("parquet writes are deprecated")
			}
			return nil
		case *walpb.Entry_TableBlockPersisted_:
			// If a block was persisted but the entry still exists in the WAL,
			// a snapshot was not performed after persisting the block. Perform
			// one now to clean up the WAL.
			performSnapshot = true
			return nil
		case *walpb.Entry_Snapshot_:
			return nil
		default:
			return fmt.Errorf("unexpected WAL entry type: %T", e)
		}
		return nil
	}); err != nil {
		return err
	}

	watermark := snapshotTx
	if lastTx > watermark {
		watermark = lastTx
	}

	db.mtx.Lock()
	for _, table := range db.tables {
		block := table.ActiveBlock()
		block.uncompressedInsertsSize.Store(block.Index().LevelSize(index.L0))
	}
	db.mtx.Unlock()

	db.resetToTxn(watermark, nil)
	if performSnapshot && db.columnStore.snapshotTriggerSize != 0 {
		level.Info(db.logger).Log(
			"msg", "performing snapshot after recovery",
		)
		db.snapshot(ctx, false, func() {
			if err := db.reclaimDiskSpace(ctx, w); err != nil {
				level.Error(db.logger).Log(
					"msg", "failed to reclaim disk space after snapshot during recovery",
					"err", err,
				)
			}
		})
	}
	level.Info(db.logger).Log(
		append(
			[]any{
				"msg", "db recovered",
				"wal_replay_duration", time.Since(replayStart),
				"watermark", watermark,
			},
			snapshotLogArgs...,
		)...,
	)
	return nil
}

type CloseOption func(*closeOptions)

type closeOptions struct {
	clearStorage bool
}

// WithClearStorage instructs Close to remove all stored data from the
// database's storage path.
func WithClearStorage() CloseOption {
	return func(o *closeOptions) {
		o.clearStorage = true
	}
}

// Close closes the database. Depending on the configured sinks and options,
// active blocks are persisted and a final snapshot may be taken before
// releasing resources.
func (db *DB) Close(options ...CloseOption) error {
	opts := &closeOptions{}
	for _, opt := range options {
		opt(opts)
	}

	shouldPersist := len(db.sinks) > 0 && !db.columnStore.manualBlockRotation
	if !shouldPersist && db.columnStore.snapshotTriggerSize != 0 && !opts.clearStorage {
		start := time.Now()
		db.snapshot(context.Background(), false, func() {
			level.Info(db.logger).Log("msg", "snapshot on close completed", "duration", time.Since(start))
			if err := db.reclaimDiskSpace(context.Background(), nil); err != nil {
				level.Error(db.logger).Log(
					"msg", "failed to reclaim disk space after snapshot",
					"err", err,
				)
			}
		})
	}

	level.Info(db.logger).Log("msg", "closing DB")
	for _, table := range db.tables {
		table.close()
		if shouldPersist {
			// Write the blocks but no snapshots since they are long-running
			// jobs. Use db.tx.Load as the block's max txn since the table was
			// closed above, so no writes are in flight at this stage.
			// TODO(asubiotto): Maybe we should snapshot in any case since it
			// should be faster to write to local disk than upload to object
			// storage. This would avoid a slow WAL replay on startup if we
			// don't manage to persist in time.
			table.writeBlock(table.ActiveBlock(), db.tx.Load(), false)
		}
	}
	level.Info(db.logger).Log("msg", "closed all tables")

	if err := db.closeInternal(); err != nil {
		return err
	}

	if (shouldPersist || opts.clearStorage) && db.storagePath != "" {
		if err := db.dropStorage(); err != nil {
			return err
		}
		level.Info(db.logger).Log("msg", "cleaned up wal & snapshots")
	}
	return nil
}

func (db *DB) closeInternal() error {
	defer func() {
		// Clean up the txPool even on error.
		if db.txPool != nil {
			db.txPool.Stop()
		}
	}()

	if !db.columnStore.enableWAL || db.wal == nil {
		return nil
	}
	return db.wal.Close()
}

// maintainWAL truncates the WAL up to the minimum transaction that has been
// persisted across all tables.
func (db *DB) maintainWAL() {
	if minTXPersisted := db.getMinTXPersisted(); minTXPersisted > 0 {
		if err := db.wal.Truncate(minTXPersisted); err != nil {
			return
		}
	}
}

// reclaimDiskSpace attempts to read the latest valid snapshot txn and removes
// any snapshots/wal entries that are older than the snapshot tx. Since this can
// be called before db.wal is set, the caller may optionally pass in a WAL to
// truncate.
func (db *DB) reclaimDiskSpace(ctx context.Context, w WAL) error {
	if db.columnStore.testingOptions.disableReclaimDiskSpaceOnSnapshot {
		return nil
	}
	validSnapshotTxn, err := db.getLatestValidSnapshotTxn(ctx)
	if err != nil {
		return err
	}
	if validSnapshotTxn == 0 {
		return nil
	}
	if err := db.cleanupSnapshotDir(ctx, validSnapshotTxn); err != nil {
		return err
	}
	if w == nil {
		w = db.wal
	}
	// Snapshots are taken with a read txn and are inclusive, so therefore
	// include a potential write at validSnapshotTxn. We don't want this to be
	// the first entry in the WAL after truncation, given it is already
	// contained in the snapshot, so Truncate at validSnapshotTxn + 1.
	return w.Truncate(validSnapshotTxn + 1)
}

func (db *DB) getMinTXPersisted() uint64 {
	db.mtx.RLock()
	defer db.mtx.RUnlock()
	minTx := uint64(math.MaxUint64)
	for _, table := range db.tables {
		table.mtx.RLock()
		lastCompleted := table.lastCompleted
		table.mtx.RUnlock()
		if lastCompleted < minTx {
			minTx = lastCompleted
		}
	}
	return minTx
}

func (db *DB) readOnlyTable(name string) (*Table, error) {
	table, ok := db.tables[name]
	if ok {
		return table, nil
	}

	table, err := newTable(
		db,
		name,
		nil,
		db.metricsProvider.metricsForTable(name),
		db.logger,
		db.tracer,
		db.wal,
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create table: %w", err)
	}

	db.roTables[name] = table
	return table, nil
}

// promoteReadOnlyTableLocked promotes a read-only table to a read-write table.
// The read-write table is returned but not added to the database. Callers must
// do so.
// db.mtx must be held while calling this method.
func (db *DB) promoteReadOnlyTableLocked(name string, config *tablepb.TableConfig) (*Table, error) {
	table, ok := db.roTables[name]
	if !ok {
		return nil, fmt.Errorf("read only table %s not found", name)
	}
	schema, err := schemaFromTableConfig(config)
	if err != nil {
		return nil, err
	}
	table.config.Store(config)
	table.schema = schema
	delete(db.roTables, name)
	return table, nil
}

// Table will get or create a table with the given name and config. If a table
// already exists with the given name, its configuration will be updated.
func (db *DB) Table(name string, config *tablepb.TableConfig) (*Table, error) {
	return db.table(name, config, generateULID())
}
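
// Example (sketch; the table name and schema definition are illustrative):
//
//	config := frostdb.NewTableConfig(schemaDefinition)
//	table, err := db.Table("samples", config)
//	if err != nil {
//		// handle error
//	}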

func (db *DB) table(name string, config *tablepb.TableConfig, id ulid.ULID) (*Table, error) {
	if config == nil {
		return nil, fmt.Errorf("table config cannot be nil")
	}
	if !validateName(name) {
		return nil, errors.New("invalid table name")
	}
	db.mtx.RLock()
	table, ok := db.tables[name]
	db.mtx.RUnlock()
	if ok {
		table.config.Store(config)
		return table, nil
	}

	db.mtx.Lock()
	defer db.mtx.Unlock()

	// Need to double-check that in the meantime another table with the same
	// name wasn't concurrently created.
	table, ok = db.tables[name]
	if ok {
		return table, nil
	}

	// Check if this table exists as a read only table
	if _, ok := db.roTables[name]; ok {
		var err error
		table, err = db.promoteReadOnlyTableLocked(name, config)
		if err != nil {
			return nil, err
		}
	} else {
		var err error
		table, err = newTable(
			db,
			name,
			config,
			db.metricsProvider.metricsForTable(name),
			db.logger,
			db.tracer,
			db.wal,
		)
		if err != nil {
			return nil, fmt.Errorf("failed to create table: %w", err)
		}
	}

	tx, _, commit := db.begin()
	defer commit()

	if err := table.newTableBlock(0, tx, id); err != nil {
		return nil, err
	}

	db.tables[name] = table
	return table, nil
}

type ErrTableNotFound struct {
	TableName string
}

func (e ErrTableNotFound) Error() string {
	return fmt.Sprintf("table %s not found", e.TableName)
}

// GetTable returns the table with the given name if it exists.
func (db *DB) GetTable(name string) (*Table, error) {
	db.mtx.RLock()
	table, ok := db.tables[name]
	db.mtx.RUnlock()
	if !ok {
		return nil, ErrTableNotFound{TableName: name}
	}
	return table, nil
}

// TableProvider returns a table provider backed by this DB.
func (db *DB) TableProvider() *DBTableProvider {
	return NewDBTableProvider(db)
}

// TableNames returns the names of all the db's tables.
func (db *DB) TableNames() []string {
	db.mtx.RLock()
	names := maps.Keys(db.tables)
	db.mtx.RUnlock()
	return names
}

type DBTableProvider struct {
	db *DB
}

func NewDBTableProvider(db *DB) *DBTableProvider {
	return &DBTableProvider{
		db: db,
	}
}

func (p *DBTableProvider) GetTable(name string) (logicalplan.TableReader, error) {
	p.db.mtx.RLock()
	defer p.db.mtx.RUnlock()
	table, ok := p.db.tables[name]
	if ok {
		return table, nil
	}

	table, ok = p.db.roTables[name]
	if ok {
		return table, nil
	}

	return nil, fmt.Errorf("table %v not found", name)
}

// beginRead returns the high watermark. Reads can safely access any write that has a lower or equal tx id than the returned number.
func (db *DB) beginRead() uint64 {
	return db.highWatermark.Load()
}

// begin is an internal function that Tables call to start a transaction for writes.
// It returns:
//
//	the write tx id
//	The current high watermark
//	A function to complete the transaction
func (db *DB) begin() (uint64, uint64, func()) {
	tx := db.tx.Add(1)
	watermark := db.highWatermark.Load()
	return tx, watermark, func() {
		if mark := db.highWatermark.Load(); mark+1 == tx {
			// This is the next consecutive transaction; increase the watermark.
			db.highWatermark.Store(tx)
			db.txPool.notifyWatermark()
			return
		}

		// place completed transaction in the waiting pool
		db.txPool.Insert(tx)
	}
}
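
// To illustrate the watermark protocol (sketch): transactions may complete in
// any order, but the watermark only advances over consecutive txn ids.
//
//	tx, _, commit := db.begin() // e.g. tx == 5 while watermark == 4
//	// ... perform writes tagged with tx ...
//	commit() // watermark advances to 5; a completed txn 6 waiting in the
//	         // TxPool would then be promoted as well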

// Wait is a blocking function that returns once the high watermark has equaled or exceeded the transaction id.
// Wait makes no differentiation between completed and aborted transactions.
func (db *DB) Wait(tx uint64) {
	for {
		if db.highWatermark.Load() >= tx {
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
}

// HighWatermark returns the current high watermark.
func (db *DB) HighWatermark() uint64 {
	return db.highWatermark.Load()
}

// resetToTxn resets the DB's internal state to resume from the given
// transaction. If the given wal is non-nil, it is also reset so that the next
// expected transaction will log correctly to the WAL. Note that db.wal is not
// used since callers might be calling resetToTxn before db.wal has been
// initialized or might not want the WAL to be reset.
func (db *DB) resetToTxn(txn uint64, w WAL) {
	db.tx.Store(txn)
	db.highWatermark.Store(txn)
	if w != nil {
		// This call resets the WAL to a zero state so that new records can be
		// logged.
		if err := w.Reset(txn + 1); err != nil {
			level.Warn(db.logger).Log(
				"msg", "failed to reset WAL when resetting DB to txn",
				"txnID", txn,
				"err", err,
			)
		}
	}
}

// validateName ensures that the passed in name doesn't violate any constraints.
func validateName(name string) bool {
	return !strings.Contains(name, "/")
}
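
// For example, validateName("metrics") passes while validateName("a/b") fails,
// since database and table names are used as path components on disk.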

// dropStorage removes all data from the storage directory, but leaves the empty
// storage directory.
func (db *DB) dropStorage() error {
	trashDir := db.trashDir()

	entries, err := os.ReadDir(db.storagePath)
	if err != nil {
		if os.IsNotExist(err) {
			// Nothing to drop.
			return nil
		}
		return err
	}
	// Try to rename all entries as this is O(1) per entry. We want to preserve
	// the storagePath for future opens of this database. Callers that want to
	// drop the DB remove storagePath themselves.
	if err := func() error {
		if err := os.MkdirAll(trashDir, os.FileMode(0o755)); err != nil {
			return fmt.Errorf("making trash dir: %w", err)
		}
		// Create a temporary directory in the trash dir to avoid clashing
		// with other wal/snapshot dirs that might not have been removed
		// previously.
		tmpDir, err := os.MkdirTemp(trashDir, "")
		if err != nil {
			return err
		}
		errs := make([]error, 0, len(entries))
		for _, entry := range entries {
			if err := os.Rename(filepath.Join(db.storagePath, entry.Name()), filepath.Join(tmpDir, entry.Name())); err != nil && !os.IsNotExist(err) {
				errs = append(errs, err)
			}
		}
		return errors.Join(errs...)
	}(); err != nil {
		// If we failed to move storage path entries to the trash dir, fall back
		// to attempting to remove them with RemoveAll.
		errs := make([]error, 0, len(entries))
		for _, entry := range entries {
			if err := os.RemoveAll(filepath.Join(db.storagePath, entry.Name())); err != nil {
				errs = append(errs, err)
			}
		}
		return errors.Join(errs...)
	}
	return os.RemoveAll(trashDir)
}