package frostdb

import (
	
	
	
	
	
	
	
	
	
	
	
	

	
	
	
	
	
	
	
	
	
	
	
	
	

	
	schemapb 
	schemav2pb 
	tablepb 
	walpb 
	
	
	
	
	
	
	
	walpkg 
)

// Sentinel errors exposed by this package. Compare against them with
// errors.Is rather than by message.
var (
	// ErrNoSchema signals that no schema is available.
	ErrNoSchema = errors.New("no schema")
	// ErrTableClosing is returned when a write block is requested from a
	// table that has already been marked as closing.
	ErrTableClosing = errors.New("table closing")
)

// DefaultIndexConfig returns the default level configs used. This is a
// function so that any modifications to the result will not affect the default
// config: every call builds and returns a fresh slice of three levels
// (L0, L1, L2).
func () []*index.LevelConfig {
	return []*index.LevelConfig{
		{Level: index.L0, MaxSize: 1024 * 1024 * 15, Type: index.CompactionTypeParquetMemory},  // Compact to in-memory Parquet buffer after 15MiB of data
		{Level: index.L1, MaxSize: 1024 * 1024 * 128, Type: index.CompactionTypeParquetMemory}, // Compact to a single in-memory Parquet buffer after 128MiB of Parquet files
		{Level: index.L2, MaxSize: 1024 * 1024 * 512},                                          // Final level. Rotate after 512MiB of Parquet files
	}
}

// ErrWriteRow wraps an underlying error and reports it as a row write failure.
type ErrWriteRow struct{ err error }

// Returns the wrapped error's message prefixed with "failed to write row: "
// (presumably the Error method of the error interface).
func ( ErrWriteRow) () string { return "failed to write row: " + .err.Error() }

// ErrReadRow wraps an underlying error and reports it as a row read failure.
type ErrReadRow struct{ err error }

// Returns the wrapped error's message prefixed with "failed to read row: "
// (presumably the Error method of the error interface).
func ( ErrReadRow) () string { return "failed to read row: " + .err.Error() }

// ErrCreateSchemaWriter wraps an underlying error and reports it as a failure
// to create a schema writer.
type ErrCreateSchemaWriter struct{ err error }

// Returns the wrapped error's message prefixed with
// "failed to create schema write: " (presumably the Error method of the error
// interface).
func ( ErrCreateSchemaWriter) () string {
	return "failed to create schema write: " + .err.Error()
}

// TableOption is a functional option that mutates a table configuration. It
// may return an error.
type TableOption func(*tablepb.TableConfig) error

// WithRowGroupSize sets the size in number of rows for each row group for parquet files. A <= 0 value indicates no limit.
//
// NOTE(review): the value is stored via a plain uint64 conversion, so a
// negative input becomes a very large unsigned value rather than 0 — confirm
// that negative sizes are handled as "no limit" by the consumers of
// RowGroupSize.
func ( int) TableOption {
	return func( *tablepb.TableConfig) error {
		.RowGroupSize = uint64()
		return nil
	}
}

// WithBlockReaderLimit sets the limit of go routines that will be used to read persisted block files. A negative number indicates no limit.
//
// NOTE(review): the value is stored via a plain uint64 conversion, so a
// negative input becomes a very large unsigned value — presumably that is what
// makes it behave as "no limit"; confirm against the code that reads
// BlockReaderLimit.
func ( int) TableOption {
	return func( *tablepb.TableConfig) error {
		.BlockReaderLimit = uint64()
		return nil
	}
}

// WithoutWAL disables the WAL for this table by setting DisableWal on the
// table configuration.
func () TableOption {
	return func( *tablepb.TableConfig) error {
		.DisableWal = true
		return nil
	}
}

// Returns a TableOption that sets UniquePrimaryIndex to the given boolean on
// whichever schema variant the config carries (the deprecated schema or schema
// v2). A config holding neither variant is left unchanged and no error is
// reported.
func ( bool) TableOption {
	return func( *tablepb.TableConfig) error {
		switch e := .Schema.(type) {
		case *tablepb.TableConfig_DeprecatedSchema:
			.DeprecatedSchema.UniquePrimaryIndex = 
		case *tablepb.TableConfig_SchemaV2:
			.SchemaV2.UniquePrimaryIndex = 
		}
		return nil
	}
}

// FromConfig sets the table configuration from the given config.
// NOTE: this does not override the schema even though it is included in the
// passed in config. Only BlockReaderLimit (when non-zero), DisableWal and
// RowGroupSize are copied.
func ( *tablepb.TableConfig) TableOption {
	return func( *tablepb.TableConfig) error {
		if .BlockReaderLimit != 0 { // the zero value is not a valid block reader limit
			.BlockReaderLimit = .BlockReaderLimit
		}
		.DisableWal = .DisableWal
		.RowGroupSize = .RowGroupSize
		return nil
	}
}

// defaultTableConfig builds a fresh table configuration whose only populated
// field is BlockReaderLimit, set to the current GOMAXPROCS value (calling
// runtime.GOMAXPROCS with 0 queries the setting without changing it).
func defaultTableConfig() *tablepb.TableConfig {
	cfg := new(tablepb.TableConfig)
	cfg.BlockReaderLimit = uint64(runtime.GOMAXPROCS(0))
	return cfg
}

// Builds a table configuration for the given schema message. It starts from
// defaultTableConfig, stores the schema under the matching variant
// (*schemapb.Schema as the deprecated schema, *schemav2pb.Schema as schema v2)
// and then applies each option in order.
//
// It panics if the schema is of any other type.
//
// NOTE(review): errors returned by the options are deliberately discarded
// here, so a failing option goes unnoticed by the caller.
func (
	 proto.Message,
	 ...TableOption,
) *tablepb.TableConfig {
	 := defaultTableConfig()

	switch v := .(type) {
	case *schemapb.Schema:
		.Schema = &tablepb.TableConfig_DeprecatedSchema{
			DeprecatedSchema: ,
		}
	case *schemav2pb.Schema:
		.Schema = &tablepb.TableConfig_SchemaV2{
			SchemaV2: ,
		}
	default:
		panic(fmt.Sprintf("unsupported schema type: %T", ))
	}

	for ,  := range  {
		_ = ()
	}

	return 
}

// completedBlock is the bookkeeping entry appended once a rotated block has
// been written out. It links the transaction of the preceding block (prevTx)
// to the block's own transaction (tx) so completed blocks can be chained in
// order.
type completedBlock struct {
	prevTx, tx uint64
}

// GenericTable is a wrapper around *Table that writes structs of type T. It
// consists of a generic arrow.Record builder that ingests structs of type T.
// The generated record is then passed to (*Table).InsertRecord.
//
// Struct tag `frostdb` is used to pass options for the schema for T.
//
// This api is opinionated.
//
//   - Nested Columns are not supported
//
// # Tags
//
// Use `frostdb` to define tags that customize field values. You can express
// everything needed to construct schema v1alpha1.
//
// Tags are defined as a comma separated list. The first item is the column
// name. Column name is optional, when omitted it is derived from the field name
// (snake_cased)
//
// Supported Tags
//
//	    delta_binary_packed | Delta binary packed encoding.
//	                 brotli | Brotli compression.
//	                    asc | Sorts in ascending order. Use asc(n) where n is an integer for sorting order.
//	                   gzip | GZIP compression.
//	                 snappy | Snappy compression.
//	delta_length_byte_array | Delta Length Byte Array encoding.
//	       delta_byte_array | Delta Byte Array encoding.
//	                   desc | Sorts in descending order. Use desc(n) where n is an integer for sorting order.
//	                lz4_raw | LZ4_RAW compression.
//	               pre_hash | Prehash the column before storing it.
//	             null_first | When used with asc nulls are smallest and with desc nulls are largest.
//	                   zstd | ZSTD compression.
//	               rle_dict | Dictionary run-length encoding.
//	                  plain | Plain encoding.
//
// Example tagged Sample struct
//
//	type Sample struct {
//		ExampleType string      `frostdb:"example_type,rle_dict,asc(0)"`
//		Labels      []Label     `frostdb:"labels,rle_dict,null,dyn,asc(1),null_first"`
//		Stacktrace  []uuid.UUID `frostdb:"stacktrace,rle_dict,asc(3),null_first"`
//		Timestamp   int64       `frostdb:"timestamp,asc(2)"`
//		Value       int64       `frostdb:"value"`
//	}
//
// # Dynamic columns
//
// A field of type map<string, T> is a dynamic column by default.
//
//	type Example struct {
//		// Use supported tags to customize the column value
//		Labels map[string]string `frostdb:"labels"`
//	}
//
// # Repeated columns
//
// Fields of type []int64, []float64, []bool, and []string are supported. These
// are represented as arrow.LIST.
//
// Generated schema for the repeated columns applies all supported tags. By
// default repeated fields are nullable. You can safely pass nil slices for
// repeated columns.
type GenericTable[ any] struct {
	// Table is the wrapped table that built records are inserted into.
	*Table
	// mu serializes writes so that the shared builder is used by one caller
	// at a time.
	mu    sync.Mutex
	// build accumulates values and produces the arrow records.
	build *records.Build[]
}

// Releases the resources held by the underlying record builder by calling its
// Release method. It does not take the write mutex.
func ( *GenericTable[]) () {
	.build.Release()
}

// Write builds arrow.Record directly from values and calls (*Table).InsertRecord.
//
// The call holds the table's mutex for its whole duration, so concurrent
// writes are serialized. If appending the values to the builder fails, 0 and
// that error are returned and nothing is inserted. Otherwise the result of
// InsertRecord is returned unchanged.
//
// NOTE(review): the record produced by NewRecord is not explicitly released
// here — confirm that ownership is handled by InsertRecord.
func ( *GenericTable[]) ( context.Context,  ...) (uint64, error) {
	.mu.Lock()
	defer .mu.Unlock()
	 := .build.Append(...)
	if  != nil {
		return 0, 
	}
	return .InsertRecord(, .build.NewRecord())
}

// Creates a GenericTable for the type parameter. It constructs a record
// builder, derives the schema from that builder, and opens (or creates) the
// table on the given DB with a table config built from that schema and the
// supplied options.
//
// Returns nil and the error if the table cannot be obtained.
//
// NOTE(review): on that error path the builder created above is not released —
// confirm whether it holds resources that need an explicit Release.
func [ any]( *DB,  string,  memory.Allocator,  ...TableOption) (*GenericTable[], error) {
	 := records.NewBuild[]()
	,  := .Table(, NewTableConfig(.Schema(), ...))
	if  != nil {
		return nil, 
	}
	return &GenericTable[]{build: , Table: }, nil
}

// Table is a named table belonging to a DB. It owns the currently active
// block that receives writes, tracks blocks that have been rotated but not
// yet fully written out, and logs its writes to a WAL.
type Table struct {
	// db is the database this table belongs to.
	db      *DB
	name    string
	metrics tableMetrics
	logger  log.Logger
	tracer  trace.Tracer

	// config holds the table configuration; it is read and stored atomically.
	config atomic.Pointer[tablepb.TableConfig]
	// schema is derived from the table config at construction time and may be
	// nil when the config defines no schema.
	schema *dynparquet.Schema

	// pendingBlocks is the set of rotated blocks that are still being written
	// out; they remain readable until dropped.
	pendingBlocks   map[*TableBlock]struct{}
	// completedBlocks holds written-out blocks that cannot yet be chained onto
	// lastCompleted.
	completedBlocks []completedBlock
	// lastCompleted is the transaction of the most recent block in the
	// contiguous chain of completed blocks.
	lastCompleted   uint64

	// mtx is taken around accesses to active, pendingBlocks, completedBlocks,
	// lastCompleted and closing in the methods of this type.
	mtx    *sync.RWMutex
	// active is the block currently receiving writes.
	active *TableBlock

	// wal is the write-ahead log; a no-op implementation is used when the
	// config disables the WAL.
	wal     WAL
	// closing is set once the table stops accepting new writers.
	closing bool
}

// Sync is implemented by types that expose a Sync method returning an error.
type Sync interface {
	Sync() error
}

// WAL is the write-ahead log contract used by tables. Entries are addressed
// by transaction id (tx).
type WAL interface {
	Close() error
	// Log appends a protobuf record under the given tx.
	Log(tx uint64, record *walpb.Record) error
	// LogRecord appends an arrow record written to the named table under the
	// given tx.
	LogRecord(tx uint64, table string, record arrow.Record) error
	// Replay replays WAL records from the given first index. If firstIndex is
	// 0, the first index read from the WAL is used (i.e. given a truncation,
	// using 0 is still valid). If the given firstIndex is less than the WAL's
	// first index on disk, the replay happens from the first index on disk.
	// If the handler panics, the WAL implementation will truncate the WAL up to
	// the last valid index.
	Replay(tx uint64, handler walpkg.ReplayHandlerFunc) error
	Truncate(tx uint64) error
	Reset(nextTx uint64) error
	FirstIndex() (uint64, error)
	LastIndex() (uint64, error)
}

// TableBlock is a unit of table data identified by a ULID. It is backed by an
// LSM index and tracks outstanding readers and writers through wait groups.
type TableBlock struct {
	// table is the owning table.
	table  *Table
	logger log.Logger
	tracer trace.Tracer

	// ulid identifies the block.
	ulid   ulid.ULID
	// minTx and prevTx are the transaction ids supplied when the block was
	// created.
	minTx  uint64
	prevTx uint64

	// uncompressedInsertsSize keeps track of the cumulative L0 size. This is
	// not the size of the block, since these inserts are eventually compressed.
	// However, it serves to determine when to perform a snapshot, since these
	// uncompressed inserts are stored in the WAL, and if the node crashes, it
	// is obliged to re-read all of these uncompressed inserts into memory,
	// potentially causing OOMs.
	uncompressedInsertsSize atomic.Int64
	// lastSnapshotSize keeps track of the uncompressedInsertsSize when a
	// snapshot was last triggered.
	lastSnapshotSize atomic.Int64

	// index stores the block's data.
	index *index.LSM

	// pendingWritersWg and pendingReadersWg count in-flight writers and
	// readers; they are waited on before the block is written out or closed.
	pendingWritersWg sync.WaitGroup
	pendingReadersWg sync.WaitGroup

	mtx *sync.RWMutex
}

// Closer is implemented by types that can be closed, with a flag indicating
// whether cleanup should be performed as part of closing.
type Closer interface {
	Close(cleanup bool) error
}

// schemaFromTableConfig builds a dynparquet schema from the schema definition
// stored in the table config, handling both the deprecated and the v2
// variants. When the config carries no schema it returns (nil, nil), which
// denotes a read-only table.
func schemaFromTableConfig( *tablepb.TableConfig) (*dynparquet.Schema, error) {
	switch schema := .Schema.(type) {
	case *tablepb.TableConfig_DeprecatedSchema:
		return dynparquet.SchemaFromDefinition(.DeprecatedSchema)
	case *tablepb.TableConfig_SchemaV2:
		return dynparquet.SchemaFromDefinition(.SchemaV2)
	default:
		// No schema defined for table; read/only table
		return nil, nil
	}
}

// newTable constructs a Table for the given DB, name and configuration.
//
// It rejects a column store whose indexDegree is not positive or whose
// splitSize is below 2. A nil config is replaced by defaultTableConfig. The
// schema is derived from the config; a schema construction error is returned
// as is. When the config disables the WAL, the supplied WAL is replaced by a
// no-op implementation.
//
// The returned table has no active block yet.
func newTable(
	 *DB,
	 string,
	 *tablepb.TableConfig,
	 tableMetrics,
	 log.Logger,
	 trace.Tracer,
	 WAL,
) (*Table, error) {
	if .columnStore.indexDegree <= 0 {
		 := fmt.Sprintf("Table's columnStore index degree must be a positive integer (received %d)", .columnStore.indexDegree)
		return nil, errors.New()
	}

	if .columnStore.splitSize < 2 {
		 := fmt.Sprintf("Table's columnStore splitSize must be a positive integer > 1 (received %d)", .columnStore.splitSize)
		return nil, errors.New()
	}

	if  == nil {
		 = defaultTableConfig()
	}

	,  := schemaFromTableConfig()
	if  != nil {
		return nil, 
	}

	 := &Table{
		db:      ,
		name:    ,
		logger:  ,
		tracer:  ,
		mtx:     &sync.RWMutex{},
		wal:     ,
		schema:  ,
		metrics: ,
	}

	// Store the table config
	.config.Store()

	// Disable the WAL for this table by replacing any given WAL with a nop wal
	if .DisableWal {
		.wal = &walpkg.NopWAL{}
	}

	.pendingBlocks = make(map[*TableBlock]struct{})

	return , nil
}

// Creates a new block with the given ULID and installs it as the table's
// active block.
//
// The creation is first recorded in the WAL as a NewTableBlock entry carrying
// the table name, the binary ULID and the current table config; only after
// the WAL write succeeds is the block itself constructed. Any error from
// marshaling the ULID, writing to the WAL or constructing the block is
// returned.
//
// NOTE(review): this assigns to the active block without taking the table
// mutex itself — presumably callers hold it; confirm.
func ( *Table) (,  uint64,  ulid.ULID) error {
	,  := .MarshalBinary()
	if  != nil {
		return 
	}

	if  := .wal.Log(, &walpb.Record{
		Entry: &walpb.Entry{
			EntryType: &walpb.Entry_NewTableBlock_{
				NewTableBlock: &walpb.Entry_NewTableBlock{
					TableName: .name,
					BlockId:   ,
					Config:    .config.Load(),
				},
			},
		},
	});  != nil {
		return 
	}

	.active,  = newTableBlock(, , , )
	if  != nil {
		return 
	}

	return nil
}

// Removes the given block from the set of pending blocks and closes its
// index. A failure to close the index is logged, not returned.
//
// The table's write lock is held for the whole call, including while waiting
// for the block's outstanding readers and writers.
func ( *Table) ( *TableBlock) {
	.mtx.Lock()
	defer .mtx.Unlock()
	delete(.pendingBlocks, )

	// Wait for outstanding readers/writers to finish with the block before releasing underlying resources.
	.pendingReadersWg.Wait()
	.pendingWritersWg.Wait()

	if  := .index.Close();  != nil {
		level.Error(.logger).Log("msg", "failed to close index", "err", )
	}
}

// Finalizes a rotated block. In order, it:
//
//  1. waits for the block's pending writers,
//  2. persists the block unless persistence is skipped or the index is empty,
//  3. drops the block from the pending set (regardless of the persist result),
//  4. records the persistence in the WAL inside a new transaction,
//  5. appends the block to the completed list and advances lastCompleted over
//     every contiguous completed block,
//  6. triggers WAL maintenance and, when requested and enabled, a snapshot
//     followed by disk space reclamation.
//
// Errors are logged rather than returned; a persist or WAL failure aborts the
// remaining steps. If the options carry a WaitGroup, Done is called on it when
// this function returns.
func ( *Table) (
	 *TableBlock,  uint64,  bool,  ...RotateBlockOption,
) {
	 := &rotateBlockOptions{}
	for ,  := range  {
		()
	}
	if .wg != nil {
		defer .wg.Done()
	}
	level.Debug(.logger).Log("msg", "syncing block", "next_txn", , "ulid", .ulid, "size", .index.Size())
	.pendingWritersWg.Wait()

	// from now on, the block will no longer be modified, we can persist it to disk

	level.Debug(.logger).Log("msg", "done syncing block", "next_txn", , "ulid", .ulid, "size", .index.Size())

	// Persist the block
	var  error
	if !.skipPersist && .index.Size() != 0 {
		 = .Persist()
	}
	// The block is dropped from the pending set even when persisting failed.
	.dropPendingBlock()
	if  != nil {
		level.Error(.logger).Log("msg", "failed to persist block")
		level.Error(.logger).Log("msg", .Error())
		return
	}

	if  := func() error {
		, ,  := .db.begin()
		defer ()

		,  := .ulid.MarshalBinary()
		if  != nil {
			level.Error(.logger).Log("msg", "failed to record block persistence in WAL: marshal ulid", "err", )
			return 
		}

		level.Debug(.logger).Log("msg", "recording block persistence in WAL", "ulid", .ulid, "txn", )
		if  := .wal.Log(, &walpb.Record{
			Entry: &walpb.Entry{
				EntryType: &walpb.Entry_TableBlockPersisted_{
					TableBlockPersisted: &walpb.Entry_TableBlockPersisted{
						TableName: .name,
						BlockId:   ,
						// NOTE: nextTxn is used here instead of tx, since some
						// writes could have happened between block rotation
						// and the txn beginning above.
						NextTx: ,
					},
				},
			},
		});  != nil {
			level.Error(.logger).Log("msg", "failed to record block persistence in WAL", "err", )
			return 
		}

		return nil
	}();  != nil {
		return
	}

	// Blocks may finish out of order: keep the completed list sorted by prevTx
	// and only advance lastCompleted while the head links to it.
	.mtx.Lock()
	.completedBlocks = append(.completedBlocks, completedBlock{prevTx: .prevTx, tx: .minTx})
	sort.Slice(.completedBlocks, func(,  int) bool {
		return .completedBlocks[].prevTx < .completedBlocks[].prevTx
	})
	for len(.completedBlocks) > 0 && .completedBlocks[0].prevTx == .lastCompleted {
		.lastCompleted = .completedBlocks[0].tx
		.metrics.lastCompletedBlockTx.Set(float64(.lastCompleted))
		.completedBlocks = .completedBlocks[1:]
	}
	.mtx.Unlock()
	.db.maintainWAL()
	if  && .db.columnStore.snapshotTriggerSize != 0 && .db.columnStore.enableWAL {
		func() {
			if !.db.snapshotInProgress.CompareAndSwap(false, true) {
				// Snapshot already in progress. This could lead to duplicate
				// data when replaying (refer to the snapshot design document),
				// but discarding this data on recovery is better than a
				// potential additional CPU spike caused by another snapshot.
				return
			}
			defer .db.snapshotInProgress.Store(false)
			// This snapshot snapshots the new, active, table block. Refer to
			// the snapshot design document for more details as to why this
			// snapshot is necessary.
			// context.Background is used here for the snapshot since callers
			// might cancel the context when the write is finished but the
			// snapshot is not. Note that block.Persist does the same.
			// TODO(asubiotto): Eventually we should register a cancel function
			// that is called with a grace period on db.Close.
			 := context.Background()
			 := .db.beginRead()
			if  := .db.snapshotAtTX(, , .db.snapshotWriter());  != nil {
				level.Error(.logger).Log(
					"msg", "failed to write snapshot on block rotation",
					"err", ,
				)
			}
			// Disk space is reclaimed even if the snapshot above failed.
			if  := .db.reclaimDiskSpace(, nil);  != nil {
				level.Error(.logger).Log(
					"msg", "failed to reclaim disk space after snapshot on block rotation",
					"err", ,
				)
				return
			}
		}()
	}
}

type (
	// rotateBlockOptions collects the optional settings of a block rotation.
	rotateBlockOptions struct {
		// skipPersist, when set, makes the rotation discard the block
		// instead of persisting it.
		skipPersist bool
		// wg, when non-nil, is signalled with Done once the rotation has
		// finished handling the block.
		wg *sync.WaitGroup
	}

	// RotateBlockOption is a functional option that adjusts the settings of
	// a block rotation.
	RotateBlockOption func(*rotateBlockOptions)
)

// WithRotateBlockSkipPersist instructs the block rotation operation to not
// persist the block to object storage. It does so by setting skipPersist on
// the rotation options.
func () RotateBlockOption {
	return func( *rotateBlockOptions) {
		.skipPersist = true
	}
}

// WithRotateBlockWaitGroup provides a WaitGroup. The rotate block operation
// will call wg.Done once the block has been persisted. Otherwise, RotateBlock
// asynchronously persists the block.
//
// The option only stores the WaitGroup; the caller is presumably expected to
// have called Add on it beforehand.
func ( *sync.WaitGroup) RotateBlockOption {
	return func( *rotateBlockOptions) {
		.wg = 
	}
}

// Rotates the given block out of the active position: a new active block is
// created under a fresh ULID and the old block is handed to a background
// goroutine that finishes writing it out.
//
// The call is a no-op (returning nil) when the given block is no longer the
// active one, which guards against rotating the same block twice; in that
// case Done is called on the optional WaitGroup immediately.
//
// Unless persistence is skipped, the old block is added to the pending blocks
// so it remains readable until it has been written out. The table's write
// lock is held for the duration of the call.
//
// NOTE(review): when creating the new block fails, the error is returned
// without calling Done on the optional WaitGroup — confirm that callers do not
// wait on it after an error.
func ( *Table) ( context.Context,  *TableBlock,  ...RotateBlockOption) error {
	 := &rotateBlockOptions{}
	for ,  := range  {
		()
	}
	.mtx.Lock()
	defer .mtx.Unlock()

	// Need to check that we haven't already rotated this block.
	if .active !=  {
		if .wg != nil {
			.wg.Done()
		}
		return nil
	}

	level.Debug(.logger).Log(
		"msg", "rotating block",
		"ulid", .ulid,
		"size", .Size(),
		"skip_persist", .skipPersist,
	)
	defer level.Debug(.logger).Log("msg", "done rotating block", "ulid", .ulid)

	, ,  := .db.begin()
	defer ()

	 := generateULID()
	for .Time() == .ulid.Time() { // Ensure the new block has a different timestamp.
		// Sleep a millisecond to ensure the ULID has a different timestamp.
		time.Sleep(time.Millisecond)
		 = generateULID()
	}
	if  := .newTableBlock(.active.minTx, , );  != nil {
		return 
	}
	.metrics.blockRotated.Inc()
	.metrics.numParts.Set(float64(0))

	if !.skipPersist {
		// If skipping persist, this block rotation is simply a block discard,
		// so no need to add this block to pending blocks. Some callers rely
		// on the fact that blocks are not available for reads as soon as
		// RotateBlock returns with skipPersist=true.
		.pendingBlocks[] = struct{}{}
	}
	// We don't check t.db.columnStore.manualBlockRotation here because this is
	// the entry point for users to trigger a manual block rotation and they
	// will specify through skipPersist if they want the block to be persisted.
	go .writeBlock(, , true, ...)

	return nil
}

// Returns the table's currently active block, read under the table's read
// lock. No reader or writer count is taken on the returned block.
func ( *Table) () *TableBlock {
	.mtx.RLock()
	defer .mtx.RUnlock()

	return .active
}

// Returns the active block registered for writing. The block's pending-writers
// wait group is incremented before returning, and the returned function is the
// matching Done: the caller must invoke it once the write has finished.
//
// Returns ErrTableClosing if the table has been marked as closing.
//
// NOTE(review): the active block is dereferenced without a nil check —
// confirm this is never reached on a table that has no active block.
func ( *Table) () (*TableBlock, func(), error) {
	.mtx.RLock()
	defer .mtx.RUnlock()

	if .closing {
		return nil, nil, ErrTableClosing
	}

	.active.pendingWritersWg.Add(1)
	return .active, .active.pendingWritersWg.Done, nil
}

// Returns the table's schema, or nil when no table config has been stored.
// The schema itself may also be nil when the config defines none.
func ( *Table) () *dynparquet.Schema {
	if .config.Load() == nil {
		return nil
	}
	return .schema
}

// Forces a compaction of the table's currently active block and returns the
// resulting error, if any.
func ( *Table) () error {
	return .ActiveBlock().EnsureCompaction()
}

// Inserts an arrow record into the table and returns the transaction id under
// which it was written.
//
// The record is pre-hashed according to the schema, logged to the WAL, and
// then inserted into the block obtained from the appender. The pre-hashed
// record is released when the call returns.
//
// If acquiring the appender fails, 0 is returned. If the WAL write or the
// block insert fails, the transaction id is returned together with the
// wrapped error.
func ( *Table) ( context.Context,  arrow.Record) (uint64, error) {
	, ,  := .appender()
	if  != nil {
		return 0, fmt.Errorf("get appender: %w", )
	}
	defer ()

	, ,  := .db.begin()
	defer ()

	 := dynparquet.PrehashColumns(.schema, )
	defer .Release()

	if  := .wal.LogRecord(, .name, );  != nil {
		return , fmt.Errorf("append to log: %w", )
	}

	if  := .InsertRecord(, , );  != nil {
		return , fmt.Errorf("insert buffer into block: %w", )
	}

	return , nil
}

// Returns a block that can accept a write, together with the function the
// caller must invoke when the write is done.
//
// On every iteration it takes the active write block and:
//   - triggers an asynchronous snapshot when snapshots are enabled and the
//     uncompressed data added since the last snapshot exceeds the configured
//     trigger size;
//   - returns the block if its size is below the active memory size, or if
//     manual block rotation is enabled;
//   - otherwise releases the writer, rotates the block and retries with the
//     new active block.
//
// Errors from acquiring the write block are returned as is; rotation errors
// are wrapped.
func ( *Table) ( context.Context) (*TableBlock, func(), error) {
	for {
		// Using active write block is important because it ensures that we don't
		// miss pending writers when synchronizing the block.
		, ,  := .ActiveWriteBlock()
		if  != nil {
			return nil, nil, 
		}

		 := .uncompressedInsertsSize.Load()
		if .db.columnStore.snapshotTriggerSize != 0 &&
			// If size-lastSnapshotSize > snapshotTriggerSize (a column store
			// option), a new snapshot is triggered. This is basically the size
			// of the new data in this block since the last snapshot.
			-.lastSnapshotSize.Load() > .db.columnStore.snapshotTriggerSize {
			// context.Background is used here for the snapshot since callers
			// might cancel the context when the write is finished but the
			// snapshot is not.
			// TODO(asubiotto): Eventually we should register a cancel function
			// that is called with a grace period on db.Close.
			.db.asyncSnapshot(context.Background(), func() {
				level.Debug(.logger).Log(
					"msg", "successful snapshot on block size trigger",
					"block_size", humanize.IBytes(uint64()),
					"last_snapshot_size", humanize.IBytes(uint64(.lastSnapshotSize.Load())),
				)
				.lastSnapshotSize.Store()
				if  := .db.reclaimDiskSpace(context.Background(), nil);  != nil {
					level.Error(.logger).Log(
						"msg", "failed to reclaim disk space after snapshot",
						"err", ,
					)
					return
				}
			})
		}
		 := .Size()
		if  < .db.columnStore.activeMemorySize || .db.columnStore.manualBlockRotation {
			return , , nil
		}

		// We need to rotate the block and the writer won't actually be used.
		()

		 = .RotateBlock(, )
		if  != nil {
			return nil, nil, fmt.Errorf("rotate block: %w", )
		}
	}
}

// Runs the given function inside a "Table/View" tracing span, passing it the
// span's context and a read transaction id obtained from the DB. The
// transaction id is recorded on the span as the "tx" attribute, and the
// function's error is returned unchanged.
func ( *Table) ( context.Context,  func( context.Context,  uint64) error) error {
	,  := .tracer.Start(, "Table/View")
	 := .db.beginRead()
	.SetAttributes(attribute.Int64("tx", int64())) // Attributes don't support uint64...
	defer .End()
	return (, )
}

// Iterator iterates over all row groups in the table and feeds them, converted
// to arrow records, to the given callbacks.
//
// One goroutine is started per callback; all of them consume from a shared
// channel that a further goroutine fills via collectRowGroups. Incoming arrow
// records are projected and passed on directly. Row groups are converted
// through a Parquet converter and flushed to the callback once the buffered
// record reaches bufferSize rows; whatever remains buffered is flushed when
// the channel is closed.
//
// It returns an error if no callbacks are provided, or the first error
// produced by any goroutine. Items left in the channel after an error or
// cancellation are released by the deferred drain.
//
// NOTE(review): the Release calls for received records and row groups are
// deferred inside the receive loop, so they only run when the worker goroutine
// returns — confirm that holding them until then is intended.
func ( *Table) (
	 context.Context,
	 uint64,
	 memory.Allocator,
	 []logicalplan.Callback,
	 ...logicalplan.Option,
) error {
	 := &logicalplan.IterOptions{}
	for ,  := range  {
		()
	}
	,  := .tracer.Start(, "Table/Iterator")
	.SetAttributes(attribute.Int("physicalProjections", len(.PhysicalProjection)))
	.SetAttributes(attribute.Int("projections", len(.Projection)))
	.SetAttributes(attribute.Int("distinct", len(.DistinctColumns)))
	defer .End()

	if len() == 0 {
		return errors.New("no callbacks provided")
	}
	 := make(chan any, len()*4) // buffer up to 4 row groups per callback
	defer func() {                                // Drain the channel of any leftover parts due to cancellation or error
		for  := range  {
			switch t := .(type) {
			case index.ReleaseableRowGroup:
				.Release()
			case arrow.Record:
				.Release()
			}
		}
	}()

	// Previously we sorted all row groups into a single row group here,
	// but it turns out that none of the downstream uses actually rely on
	// the sorting so it's not worth it in the general case. Physical plans
	// can decide to sort if they need to in order to exploit the
	// characteristics of sorted data.

	// bufferSize specifies a threshold of records past which the
	// buffered results are flushed to the next operator.
	const  = 1024

	,  := errgroup.WithContext()
	for ,  := range  {
		 := 
		.Go(recovery.Do(func() error {
			 := pqarrow.NewParquetConverter(, *)
			defer .Close()

			for {
				select {
				case <-.Done():
					return .Err()
				case ,  := <-:
					if ! {
						// Channel closed: flush whatever is still buffered.
						 := .NewRecord()
						if  == nil {
							return nil
						}
						defer .Release()
						if .NumRows() == 0 {
							return nil
						}
						return (, )
					}

					switch rg := .(type) {
					case arrow.Record:
						defer .Release()
						 := pqarrow.Project(, .PhysicalProjection)
						defer .Release()
						 := (, )
						if  != nil {
							return 
						}
					case index.ReleaseableRowGroup:
						defer .Release()
						if  := .Convert(, , .schema);  != nil {
							return fmt.Errorf("failed to convert row group to arrow record: %v", )
						}
						// This RowGroup had no relevant data. Ignore it.
						if len(.Fields()) == 0 {
							continue
						}
						if .NumRows() >=  {
							 := func() error {
								 := .NewRecord()
								defer .Release()
								.Reset() // Reset the converter to drop any dictionaries that were built.
								return (, )
							}()
							if  != nil {
								return 
							}
						}
					case dynparquet.DynamicRowGroup:
						if  := .Convert(, , .schema);  != nil {
							return fmt.Errorf("failed to convert row group to arrow record: %v", )
						}
						// This RowGroup had no relevant data. Ignore it.
						if len(.Fields()) == 0 {
							continue
						}
						if .NumRows() >=  {
							 := func() error {
								 := .NewRecord()
								defer .Release()
								.Reset() // Reset the converter to drop any dictionaries that were built.
								return (, )
							}()
							if  != nil {
								return 
							}
						}
					default:
						return fmt.Errorf("unknown row group type: %T", )
					}
				}
			}
		}, .logger))
	}

	// Producer: the channel is always closed when collection ends, which lets
	// the workers and the deferred drain above terminate.
	.Go(func() error {
		defer close()
		return .collectRowGroups(, , .Filter, .ReadMode, )
	})

	return .Wait()
}

// SchemaIterator iterates over all row groups in the table and reports the
// column names seen across the table.
//
// For every received record or row group it builds a single-column arrow
// record (column "name", string typed) listing the field names of that item's
// schema, and passes it to the callback. One goroutine is started per
// callback; all of them consume from a shared channel filled via
// collectRowGroups.
//
// It returns an error if no callbacks are provided, or the first error
// produced by any goroutine.
//
// NOTE(review): the producer goroutine closes the channel only when
// collectRowGroups succeeds. On a collection error the channel stays open, so
// the deferred drain loop below — which ranges until the channel is closed —
// looks like it would block forever; confirm and consider closing via defer.
//
// NOTE(review): in the row group cases the built record and the builder are
// not released when the callback returns an error, and a received
// arrow.Record is never released after its field names are read — confirm
// ownership.
func ( *Table) (
	 context.Context,
	 uint64,
	 memory.Allocator,
	 []logicalplan.Callback,
	 ...logicalplan.Option,
) error {
	 := &logicalplan.IterOptions{}
	for ,  := range  {
		()
	}
	,  := .tracer.Start(, "Table/SchemaIterator")
	.SetAttributes(attribute.Int("physicalProjections", len(.PhysicalProjection)))
	.SetAttributes(attribute.Int("projections", len(.Projection)))
	.SetAttributes(attribute.Int("distinct", len(.DistinctColumns)))
	defer .End()

	if len() == 0 {
		return errors.New("no callbacks provided")
	}

	 := make(chan any, len()*4) // buffer up to 4 row groups per callback
	defer func() {                                // Drain the channel of any leftover parts due to cancellation or error
		for  := range  {
			switch t := .(type) {
			case index.ReleaseableRowGroup:
				.Release()
			case arrow.Record:
				.Release()
			}
		}
	}()

	// Schema of the records handed to the callbacks: a single string column.
	 := arrow.NewSchema(
		[]arrow.Field{
			{Name: "name", Type: arrow.BinaryTypes.String},
		},
		nil,
	)

	,  := errgroup.WithContext()
	for ,  := range  {
		 := 
		.Go(recovery.Do(func() error {
			for {
				select {
				case <-.Done():
					return .Err()
				case ,  := <-:
					if ! {
						return nil // we're done
					}

					 := array.NewRecordBuilder(, )

					switch t := .(type) {
					case arrow.Record:
						for  := 0;  < .Schema().NumFields(); ++ {
							.Field(0).(*array.StringBuilder).Append(.Schema().Field().Name)
						}
						 := .NewRecord()
						 := (, )
						.Release()
						.Release()
						if  != nil {
							return 
						}
					case index.ReleaseableRowGroup:
						if  == nil {
							return errors.New("received nil rowGroup") // shouldn't happen, but anyway
						}
						defer .Release()
						 := .Schema().Fields()
						 := make([]string, 0, len())
						for ,  := range  {
							 = append(, .Name())
						}

						.Field(0).(*array.StringBuilder).AppendValues(, nil)

						 := .NewRecord()
						if  := (, );  != nil {
							return 
						}
						.Release()
						.Release()
					case dynparquet.DynamicRowGroup:
						if  == nil {
							return errors.New("received nil rowGroup") // shouldn't happen, but anyway
						}
						 := .Schema().Fields()
						 := make([]string, 0, len())
						for ,  := range  {
							 = append(, .Name())
						}

						.Field(0).(*array.StringBuilder).AppendValues(, nil)

						 := .NewRecord()
						if  := (, );  != nil {
							return 
						}
						.Release()
						.Release()
					default:
						return fmt.Errorf("unknown row group type: %T", )
					}
				}
			}
		}, .logger))
	}

	.Go(func() error {
		if  := .collectRowGroups(, , .Filter, .ReadMode, );  != nil {
			return 
		}
		close()
		return nil
	})

	return .Wait()
}

// generateULID returns a new ULID stamped with the current time. Its entropy
// comes from a monotonic reader over a math/rand source that is seeded with
// the current Unix time in nanoseconds on every call. It panics if the ULID
// cannot be created (ulid.MustNew).
func generateULID() ulid.ULID {
	 := time.Now()
	 := ulid.Monotonic(rand.New(rand.NewSource(.UnixNano())), 0)
	return ulid.MustNew(ulid.Timestamp(), )
}

// newTableBlock constructs a TableBlock for the given table, transaction ids
// and ULID, inheriting the table's logger and tracer, and creates the LSM
// index backing it.
//
// Returns a wrapped error if the LSM index cannot be created.
func newTableBlock( *Table, ,  uint64,  ulid.ULID) (*TableBlock, error) {
	 := &TableBlock{
		table:  ,
		mtx:    &sync.RWMutex{},
		ulid:   ,
		logger: .logger,
		tracer: .tracer,
		minTx:  ,
		prevTx: ,
	}

	var  error
	.index,  = index.NewLSM(
		filepath.Join(.db.indexDir(), .name, .String()), // Any index files are found at <db.indexDir>/<table.name>/<block.id>
		.schema,
		.IndexConfig(),
		.db.HighWatermark,
		index.LSMWithMetrics(&.metrics.indexMetrics),
		index.LSMWithLogger(.logger),
	)
	if  != nil {
		return nil, fmt.Errorf("new LSM: %w", )
	}

	return , nil
}

// EnsureCompaction forces a TableBlock compaction by delegating to the block
// index's EnsureCompaction, and returns its error.
func ( *TableBlock) () error {
	return .index.EnsureCompaction()
}

// Adds the given arrow record to the block's index under the given
// transaction id and accounts for it in the table metrics.
//
// A record with zero rows is not added to the index; only the zero-rows
// counter is bumped. The rows-inserted, insert-size and bytes-inserted metrics
// are recorded on every call through the deferred function, including for
// empty records. The record's total size is also added to
// uncompressedInsertsSize for non-empty records.
//
// The function currently always returns nil.
func ( *TableBlock) ( context.Context,  uint64,  arrow.Record) error {
	 := util.TotalRecordSize()
	defer func() {
		.table.metrics.rowsInserted.Add(float64(.NumRows()))
		.table.metrics.rowInsertSize.Observe(float64(.NumRows()))
		.table.metrics.rowBytesInserted.Add(float64())
	}()

	if .NumRows() == 0 {
		.table.metrics.zeroRowsInserted.Add(1)
		return nil
	}

	.index.Add(, )
	.table.metrics.numParts.Inc()
	.uncompressedInsertsSize.Add()
	return nil
}

// Size returns the cumulative size of all buffers in the table. This is roughly the size of the table in bytes.
// The value is whatever the block's index reports as its size.
func ( *TableBlock) () int64 {
	return .index.Size()
}

// Index returns the block's LSM index.
//
// NOTE(review): the previous comment promised "atomic access", but the field
// is returned directly without any lock or atomic operation.
func ( *TableBlock) () *index.LSM {
	return .index
}

// Serialize the table block into a single Parquet file written to the given
// writer. This is done by rotating the block's index with the table's external
// Parquet compaction function; the error from that rotation is returned.
func ( *TableBlock) ( io.Writer) error {
	return .index.Rotate(.table.externalParquetCompaction())
}

// ParquetWriter is the subset of a Parquet writer needed by parquetRowWriter:
// writing rows, flushing, and closing.
type ParquetWriter interface {
	Flush() error
	// WriteRows writes the given rows and returns the number written.
	WriteRows([]parquet.Row) (int, error)
	io.Closer
}

// parquetRowWriter is a stateful parquet row group writer. It copies rows from
// a row reader into the underlying writer while tracking how many rows have
// been written overall and in the current row group.
type parquetRowWriter struct {
	schema *dynparquet.Schema
	// w is the underlying Parquet writer.
	w      ParquetWriter

	// rowGroupSize is the number of rows after which the writer is flushed;
	// 0 disables the limit.
	rowGroupSize int
	// maxNumRows caps the total number of rows written; 0 disables the cap.
	maxNumRows   int

	// rowGroupRowsWritten counts rows written since the last flush.
	rowGroupRowsWritten int
	// totalRowsWritten counts rows written over the writer's lifetime.
	totalRowsWritten    int
	// rowsBuf is the scratch buffer rows are read into.
	rowsBuf             []parquet.Row
}

// parquetRowWriterOption is a functional option that customizes a
// parquetRowWriter at construction time.
type parquetRowWriterOption func(p *parquetRowWriter)

// rowWriter returns a new Parquet row writer around the given writer, using
// the table's schema.
//
// The row buffer holds 256 rows by default, or RowGroupSize rows when the
// table config sets a positive row group size. The writer's row group size is
// taken from the config as is, so 0 means no row group limit. The supplied
// options are applied last. The returned error is currently always nil.
// TODO(asubiotto): Can we delete this parquetRowWriter?
func ( *TableBlock) ( ParquetWriter,  ...parquetRowWriterOption) (*parquetRowWriter, error) {
	 := 256
	 := .table.config.Load()
	if .RowGroupSize > 0 {
		 = int(.RowGroupSize)
	}

	 := &parquetRowWriter{
		w:            ,
		schema:       .table.schema,
		rowsBuf:      make([]parquet.Row, ),
		rowGroupSize: int(.RowGroupSize),
	}

	for ,  := range  {
		()
	}

	return , nil
}

// WriteRows will write the given rows to the underlying Parquet writer. It returns the number of rows written.
//
// Rows are read from the reader in batches until it is exhausted or the
// writer's maxNumRows cap (when non-zero) is reached. Each batch is trimmed so
// that it neither overshoots the current row group nor the overall cap, and
// the underlying writer is flushed every time a row group fills up.
//
// On any read, write or flush error the returned count is 0, even if rows were
// written before the failure. io.EOF from the reader is not treated as an
// error.
//
// NOTE(review): trimming re-slices rowsBuf in place and it is never grown
// back, so later batches keep using the shortened buffer — confirm this is
// acceptable.
func ( *parquetRowWriter) ( parquet.RowReader) (int, error) {
	 := 0
	for .maxNumRows == 0 || .totalRowsWritten < .maxNumRows {
		if .rowGroupSize > 0 && .rowGroupRowsWritten+len(.rowsBuf) > .rowGroupSize {
			// Read only as many rows as we need to complete the row group size limit.
			.rowsBuf = .rowsBuf[:.rowGroupSize-.rowGroupRowsWritten]
		}
		if .maxNumRows != 0 && .totalRowsWritten+len(.rowsBuf) > .maxNumRows {
			// Read only as many rows as we need to write if they would bring
			// us over the limit.
			.rowsBuf = .rowsBuf[:.maxNumRows-.totalRowsWritten]
		}
		,  := .ReadRows(.rowsBuf)
		if  != nil &&  != io.EOF {
			return 0, 
		}
		if  == 0 {
			break
		}

		if _,  = .w.WriteRows(.rowsBuf[:]);  != nil {
			return 0, 
		}
		 += 
		.rowGroupRowsWritten += 
		.totalRowsWritten += 
		if .rowGroupSize > 0 && .rowGroupRowsWritten >= .rowGroupSize {
			if  := .w.Flush();  != nil {
				return 0, 
			}
			.rowGroupRowsWritten = 0
		}
	}

	return , nil
}

// Closes the underlying Parquet writer and returns its error.
func ( *parquetRowWriter) () error {
	return .w.Close()
}

// memoryBlocks collects the active and pending blocks that are currently resident in memory.
// The pendingReadersWg.Done() function must be called on all blocks returned once processing is finished.
//
// The second return value is the smallest ULID timestamp among the returned
// blocks. For a table without an active block (a read-only table) it returns
// nil and 0. The table's read lock is held while the blocks are gathered.
func ( *Table) () ([]*TableBlock, uint64) {
	.mtx.RLock()
	defer .mtx.RUnlock()

	if .active == nil { // this is currently a read only table
		return nil, 0
	}

	 := .active.ulid.Time()
	.active.pendingReadersWg.Add(1)
	 := []*TableBlock{.active}
	for  := range .pendingBlocks {
		.pendingReadersWg.Add(1)
		 = append(, )

		if .ulid.Time() <  {
			 = .ulid.Time()
		}
	}

	return , 
}

// collectRowGroups collects all the row groups from the table for the given
// filter and sends them on the given channel. It does not close the channel.
//
// Unless the read mode is ReadModeDataSourcesOnly, the in-memory blocks
// (active and pending) are scanned first; their reader counts are released
// when the function returns. Unless the read mode is ReadModeInMemoryOnly,
// every data source registered on the DB is scanned afterwards.
//
// Each send races against context cancellation: if the context is done first,
// a releasable row group is released and the context error is returned. The
// first scan error aborts collection.
func ( *Table) (
	 context.Context,
	 uint64,
	 logicalplan.Expr,
	 logicalplan.ReadMode,
	 chan<- any,
) error {
	,  := .tracer.Start(, "Table/collectRowGroups")
	defer .End()

	// Pending blocks could be uploaded to the bucket while we iterate on them.
	// To avoid iterating on them again while reading the block file, we keep
	// the last block timestamp to be read from the bucket and pass it to the
	// scan of the data sources, so that every block with a
	// timestamp >= lastReadBlockTimestamp is discarded while being read.
	var  uint64
	if  != logicalplan.ReadModeDataSourcesOnly {
		,  := .memoryBlocks()
		 = 
		defer func() {
			for ,  := range  {
				.pendingReadersWg.Done()
			}
		}()
		for ,  := range  {
			if  := .index.Scan(, "", .schema, , , func( context.Context,  any) error {
				select {
				case <-.Done():
					if ,  := .(index.ReleaseableRowGroup);  {
						.Release()
					}
					return .Err()
				case  <- :
					return nil
				}
			});  != nil {
				return 
			}
		}
	}

	if  == logicalplan.ReadModeInMemoryOnly {
		return nil
	}

	// Collect from all other data sources.
	for ,  := range .db.sources {
		.AddEvent(fmt.Sprintf("source/%s", .String()))
		if  := .Scan(, filepath.Join(.db.name, .name), .schema, , , func( context.Context,  any) error {
			select {
			case <-.Done():
				if ,  := .(index.ReleaseableRowGroup);  {
					.Release()
				}
				return .Err()
			case  <- :
				return nil
			}
		});  != nil {
			return 
		}
	}

	return nil
}

// close notifies a table to stop accepting writes. While holding the table's
// write lock it waits for the active block's pending writers, marks the table
// as closing, and then waits for the active block's pending compactions.
//
// NOTE(review): the active block is dereferenced without a nil check —
// confirm close is never called on a table that has no active block.
func ( *Table) () {
	.mtx.Lock()
	defer .mtx.Unlock()

	.active.pendingWritersWg.Wait()
	.closing = true
	.active.index.WaitForPendingCompactions()
}

// Returns a compaction function that writes the given parts as Parquet to the
// supplied writer via compactParts.
//
// The returned function yields no part (nil): on success it reports the value
// returned by compactParts as its first size and 0 as the second; on failure
// it returns the compactParts error with zero sizes.
func ( *Table) ( io.Writer) func( []parts.Part) (parts.Part, int64, int64, error) {
	return func( []parts.Part) (parts.Part, int64, int64, error) {
		,  := .compactParts(, )
		if  != nil {
			return nil, 0, 0, 
		}

		return nil, , 0, nil
	}
}

// compactParts will compact the given parts into a Parquet file written to w.
// It returns the size in bytes of the compacted parts, i.e. the sum of the
// input parts' sizes, not the number of bytes written.
//
// With no parts it returns 0 and writes nothing. When the schema has a unique
// primary index, an arrow-based distinct pass is attempted first and, if it
// yields records, they are sorted and written directly. Otherwise the parts
// are reduced to row groups (a nil result means the output was already written
// as an optimization), merged, and streamed through a row writer; with a
// unique primary index, consecutive rows equal on the sorting columns are
// deduplicated while streaming.
//
// A writer with custom options is created fresh; without options a pooled
// writer is borrowed from the schema and returned afterwards.
func ( *Table) ( io.Writer,  []parts.Part,  ...parquet.WriterOption) (int64, error) {
	if len() == 0 {
		return 0, nil
	}

	 := int64(0)
	for ,  := range  {
		 += .Size()
	}

	if .schema.UniquePrimaryIndex {
		,  := .distinctRecordsForCompaction()
		if  != nil {
			return 0, 
		}
		if  != nil {
			// Arrow distinction was successful.
			defer func() {
				for ,  := range  {
					.Release()
				}
			}()
			// Note that the records must be sorted (sortInput=true) because
			// there is no guarantee that order is maintained.
			return , .writeRecordsToParquet(, , true, ...)
		}
	}

	,  := .buffersForCompaction(, , ...)
	if  != nil {
		return 0, 
	}
	if  == nil {
		// Optimization.
		return , nil
	}

	,  := .schema.MergeDynamicRowGroups()
	if  != nil {
		return 0, 
	}
	 = func() error {
		var  dynparquet.ParquetWriter
		if len() > 0 {
			,  = .schema.NewWriter(, .DynamicColumns(), false, ...)
			if  != nil {
				return 
			}
		} else {
			,  := .schema.GetWriter(, .DynamicColumns(), false)
			if  != nil {
				return 
			}
			defer .schema.PutWriter()
			 = .ParquetWriter
		}
		,  := .active.rowWriter()
		if  != nil {
			return 
		}
		defer .close()

		 := .Rows()
		defer .Close()

		var  parquet.RowReader = 
		if .schema.UniquePrimaryIndex {
			// Given all inputs are sorted, we can deduplicate the rows using
			// DedupeRowReader, which deduplicates consecutive rows that are
			// equal on the sorting columns.
			 = parquet.DedupeRowReader(, .Schema().Comparator(.SortingColumns()...))
		}

		if ,  := .writeRows();  != nil {
			return 
		}

		return nil
	}()
	if  != nil {
		return 0, 
	}

	return , nil
}

// buffersForCompaction, given a slice of possibly overlapping parts, returns
// the minimum slice of dynamic row groups to be merged together for compaction.
// If nil, nil is returned, the resulting serialized buffer has already been
// written directly to w as an optimization and the caller has nothing left to
// merge.
//
// The parts are first split with parts.FindMaximumNonOverlappingSet. The
// non-overlapping parts are then either combined into a single row group
// (cheaply, using dynparquet.WithAlreadySorted) or, when all of them carry
// arrow records, written to parquet in one go via writeRecordsToParquet.
func ( *Table) ( io.Writer,  []parts.Part,  ...parquet.WriterOption) ([]dynparquet.DynamicRowGroup, error) {
	, ,  := parts.FindMaximumNonOverlappingSet(.schema, )
	if  != nil {
		return nil, 
	}
	// Each part in this first loop contributes its own row group to the
	// result.
	// NOTE(review): presumably these are the overlapping parts, given the
	// comments further down — confirm against FindMaximumNonOverlappingSet.
	 := make([]dynparquet.DynamicRowGroup, 0, len())
	for ,  := range  {
		,  := .AsSerializedBuffer(.schema)
		if  != nil {
			return nil, 
		}
		 = append(, .MultiDynamicRowGroup())
	}
	if len() == 0 {
		return , nil
	}

	// Check whether every remaining part is backed by an arrow record; a
	// single part whose Record() is nil disables the arrow fast path below.
	 := true
	for ,  := range  {
		if .Record() == nil {
			 = false
			break
		}
	}
	if len() == 1 || ! {
		// Not worth doing anything if only one part does not overlap. If there
		// is at least one non-arrow part then optimizations cannot be made.
		 := make([]dynparquet.DynamicRowGroup, 0, len())
		for ,  := range  {
			,  := .AsSerializedBuffer(.schema)
			if  != nil {
				return nil, 
			}
			 = append(, .MultiDynamicRowGroup())
		}
		 := [0]
		if len() > 1 {
			// WithAlreadySorted ensures that a parquet.MultiRowGroup is created
			// here, which is much cheaper than actually merging all these row
			// groups.
			,  = .schema.MergeDynamicRowGroups(, dynparquet.WithAlreadySorted())
			if  != nil {
				return nil, 
			}
		}
		 = append(, )
		return , nil
	}

	// All the non-overlapping parts are arrow records, and can therefore be
	// directly written to a parquet file. If there are no overlapping parts,
	// write directly to w.
	// Otherwise the output is redirected into this in-memory buffer so it can
	// be re-read as a row group and merged with the rest.
	var  bytes.Buffer
	if len() > 0 {
		 = &
	}

	 := make([]arrow.Record, 0, len())
	for ,  := range  {
		 = append(, .Record())
	}

	// sortInput is false for this write.
	if  := .writeRecordsToParquet(, , false, ...);  != nil {
		return nil, 
	}

	if len() == 0 {
		// Result was written directly to w.
		return nil, nil
	}

	// Re-open the freshly written in-memory parquet bytes and add them as one
	// more row group to merge.
	,  := dynparquet.ReaderFromBytes(.Bytes())
	if  != nil {
		return nil, 
	}
	 = append(, .MultiDynamicRowGroup())
	return , nil
}

// Writes the given arrow records as parquet to the given io.Writer (called as
// writeRecordsToParquet elsewhere in this file).
//
// The dynamic columns of every record are collected with
// pqarrow.RecordDynamicCols and merged with dynparquet.MergeDynamicColumnSets
// so that a single writer covers all records. When parquet.WriterOption values
// are supplied, a dedicated writer is built with NewWriter; otherwise a writer
// is obtained with GetWriter and handed back with PutWriter on return. The
// records are then written with pqarrow.RecordsToFile, whose error is returned.
func ( *Table) ( io.Writer,  []arrow.Record,  bool,  ...parquet.WriterOption) error {
	 := make([]map[string][]string, 0, len())
	for ,  := range  {
		 = append(, pqarrow.RecordDynamicCols())
	}
	 := dynparquet.MergeDynamicColumnSets()
	var  dynparquet.ParquetWriter
	if len() > 0 {
		var  error
		// NOTE(review): this branch passes a literal false as the third
		// argument, whereas the GetWriter branch below forwards a variable in
		// that position (presumably the bool parameter) — confirm that
		// ignoring it when options are set is intentional.
		,  = .schema.NewWriter(, , false, ...)
		if  != nil {
			return 
		}
	} else {
		,  := .schema.GetWriter(, , )
		if  != nil {
			return 
		}
		// Hand the writer back once the records have been written.
		defer .schema.PutWriter()
		 = 
	}

	return pqarrow.RecordsToFile(.schema, , )
}

// distinctRecordsForCompaction performs a distinct on the given parts. If at
// least one non-arrow part is found, nil, nil is returned in which case, the
// caller should fall back to normal compaction. On success, the caller is
// responsible for releasing the returned records.
//
// The distinct is keyed on the schema's sorting columns and is evaluated with
// a physicalplan.Distinct operator feeding a physicalplan.OutputPlan. On
// failure or fallback, every record collected so far is released before
// returning.
func ( *Table) ( []parts.Part) ([]arrow.Record, error) {
	// Build one column expression per sorting column, using DynCol for dynamic
	// columns and Col for the rest.
	 := .schema.ColumnDefinitionsForSortingColumns()
	 := make([]logicalplan.Expr, 0, len())
	for ,  := range  {
		var  logicalplan.Expr
		if .Dynamic {
			 = logicalplan.DynCol(.Name)
		} else {
			 = logicalplan.Col(.Name)
		}
		 = append(, )
	}

	 := physicalplan.Distinct(memory.NewGoAllocator(), .tracer, )
	 := physicalplan.OutputPlan{}
	 := make([]arrow.Record, 0)
	// Each record reaching the output plan is retained and collected, so it
	// must be released later, either here on failure or by the caller.
	.SetNextCallback(func( context.Context,  arrow.Record) error {
		.Retain()
		 = append(, )
		return nil
	})
	.SetNext(&)

	// The closure reports (true, nil) only if every part had a record and the
	// operator finished without error.
	if ,  := func() (bool, error) {
		 := context.TODO()
		for ,  := range  {
			if .Record() == nil {
				// Caller should fall back to parquet distinction.
				return false, nil
			}
			if  := .Callback(, .Record());  != nil {
				return false, 
			}
		}
		if  := .Finish();  != nil {
			return false, 
		}
		return true, nil
	}(); ! ||  != nil {
		// Fallback or failure: drop the references taken in the callback. The
		// returned error is nil in the fallback case.
		for ,  := range  {
			.Release()
		}
		return nil, 
	}
	return , nil
}

// IndexConfig returns the index configuration for the table. It makes a copy
// of the column store index config and injects its compactParts method.
//
// Every returned entry is a newly allocated index.LevelConfig carrying the
// Level, MaxSize and Type of the corresponding column store entry. Compact is
// set to the table's compactParts for every level except the last, where it is
// left nil.
func ( *Table) () []*index.LevelConfig {
	 := make([]*index.LevelConfig, 0, len(.db.columnStore.indexConfig))
	for ,  := range .db.columnStore.indexConfig {
		 := .compactParts
		if  == len(.db.columnStore.indexConfig)-1 {
			// The last level is the in-memory level, which is never compacted.
			 = nil
		}
		 = append(, &index.LevelConfig{
			Level:   .Level,
			MaxSize: .MaxSize,
			Type:    .Type,
			Compact: , // TODO: this is bad and it should feel bad. We shouldn't need the table object to define how parts are compacted. Refactor needed.
		})
	}

	return 
}