package index

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"sync/atomic"
	"time"

	"github.com/apache/arrow/go/v14/arrow"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"

	"github.com/polarsignals/frostdb/dynparquet"
	"github.com/polarsignals/frostdb/parts"
	"github.com/polarsignals/frostdb/query/expr"
	"github.com/polarsignals/frostdb/query/logicalplan"
	"github.com/polarsignals/frostdb/query/util"
)

type CompactionType int

const (
	CompactionTypeUnknown CompactionType = iota

	// CompactionTypeParquetDisk is a compaction type that will compact the parts into a Parquet file on disk.
	CompactionTypeParquetDisk

	// CompactionTypeParquetMemory is a compaction type that will compact the parts into a Parquet file in memory.
	CompactionTypeParquetMemory
)

// LSM is a log-structured merge-tree-like index. It is implemented as a singly linked list of parts.
//
// Arrow records are always added to the L0 list. When a level reaches its configured max size it is compacted
// by calling the level's Compact function, and the result is added as a new part to the next level.
//
// [L0]->[record]->[record]->[L1]->[record/parquet]->[record/parquet] etc.
type LSM struct {
	sync.RWMutex
	compacting   sync.Mutex
	compactionWg sync.WaitGroup

	schema *dynparquet.Schema

	dir           string
	maxTXRecoverd []uint64
	levels        []Level
	partList      *Node
	sizes         []atomic.Int64

	// Options
	logger    log.Logger
	metrics   *LSMMetrics
	watermark func() uint64
}

// LSMMetrics are the metrics for an LSM index.
type LSMMetrics struct {
	Compactions        *prometheus.CounterVec
	LevelSize          *prometheus.GaugeVec
	CompactionDuration prometheus.Observer
}

// LevelConfig is the configuration for a level in the LSM.
// The MaxSize is the maximum size of the level in bytes before it triggers a compaction into the next level.
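//
// A plausible two-level configuration might look roughly like the following. The sizes, the
// compactFn helper, and the L1 constant are illustrative placeholders; per validateLevels, only
// the last level omits the Compact function:
//
//	levels := []*LevelConfig{
//		{Level: L0, MaxSize: 1 << 20, Type: CompactionTypeParquetMemory, Compact: compactFn},
//		{Level: L1, MaxSize: 1 << 30}, // last level: no Compact function
//	}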
type LevelConfig struct {
	Level   SentinelType
	MaxSize int64
	Type    CompactionType
	Compact Compaction
}

type Level interface {
	Compact(parts []parts.Part, options ...parts.Option) ([]parts.Part, int64, int64, error)
	Snapshot(parts []parts.Part, writer func(parts.Part) error, dir string) error
	MaxSize() int64
	Reset()
}

type LSMOption func(*LSM)

// LSMWithLogger sets the logger used by the LSM.
func LSMWithLogger(logger log.Logger) LSMOption {
	return func(l *LSM) {
		l.logger = logger
	}
}

// LSMWithMetrics sets the metrics used by the LSM.
func LSMWithMetrics(metrics *LSMMetrics) LSMOption {
	return func(l *LSM) {
		l.metrics = metrics
	}
}

// NewLSMMetrics registers and returns the LSM metrics with the given registerer.
func NewLSMMetrics(reg prometheus.Registerer) *LSMMetrics {
	return &LSMMetrics{
		Compactions: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name: "frostdb_lsm_compactions_total",
			Help: "The total number of compactions that have occurred.",
		}, []string{"level"}),

		LevelSize: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
			Name: "frostdb_lsm_level_size_bytes",
			Help: "The size of the level in bytes.",
		}, []string{"level"}),

		CompactionDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
			Name:                        "frostdb_lsm_compaction_total_duration_seconds",
			Help:                        "Total compaction duration",
			NativeHistogramBucketFactor: 1.1,
		}),
	}
}

// NewLSM returns an LSM-like index with one level per entry in levels.
// watermark is a function that returns the current high-watermark transaction; it is used during compaction to ensure
// that only parts whose transactions have been committed (i.e. are at or below the watermark) are compacted.
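//
// A rough usage sketch (the directory, schema, level configs, and watermark source are
// placeholders the caller is assumed to already have):
//
//	lsm, err := NewLSM("index-dir", schema, levels, txWatermark, LSMWithLogger(logger))
//	if err != nil {
//		return err
//	}
//	lsm.Add(tx, record)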
func NewLSM(dir string, schema *dynparquet.Schema, levels []*LevelConfig, watermark func() uint64, options ...LSMOption) (*LSM, error) {
	if err := validateLevels(levels); err != nil {
		return nil, err
	}

	lsm := &LSM{
		schema:        schema,
		dir:           dir,
		maxTXRecoverd: make([]uint64, len(levels)),
		partList:      NewList(L0),
		sizes:         make([]atomic.Int64, len(levels)),
		compacting:    sync.Mutex{},
		logger:        log.NewNopLogger(),
		watermark:     watermark,
	}

	for _, opt := range options {
		opt(lsm)
	}

	// Configure the LSM levels.
	lsmLevels, recovered, err := configureLSMLevels(dir, levels, lsm.logger)
	if err != nil {
		return nil, err
	}
	defer func() {
		for _, part := range recovered {
			part.Release()
		}
	}()
	lsm.levels = lsmLevels

	// Reverse iterate (due to prepend) to create the chain of sentinel nodes.
	for i := len(levels) - 1; i > 0; i-- {
		lsm.partList.Sentinel(levels[i].Level)
	}

	if lsm.metrics == nil {
		lsm.metrics = NewLSMMetrics(prometheus.NewRegistry())
	} else {
		for _, cfg := range levels {
			lsm.metrics.LevelSize.WithLabelValues(cfg.Level.String()).Set(0)
		}
	}

	// Replay the recovered parts.
	for _, part := range recovered {
		lsm.InsertPart(part)
	}

	return lsm, nil
}

// Close releases all parts in the index, resets the levels, and removes the index directory from disk.
func (l *LSM) Close() error {
	l.compacting.Lock()
	defer l.compacting.Unlock()
	l.Lock()
	defer l.Unlock()

	// Release all the parts to free up any underlying resources.
	l.partList.Iterate(func(node *Node) bool {
		if node.part != nil {
			node.part.Release()
		}
		return true
	})

	// Reset the levels to ensure that none of the parts are still being referenced.
	for i := range l.levels {
		if l.levels[i] != nil {
			l.levels[i].Reset()
		}
	}

	// Remove the index directory.
	if err := os.RemoveAll(l.dir); err != nil {
		return fmt.Errorf("remove lsm dir: %w", err)
	}

	return nil
}

// configureLSMLevels configures the LSM levels. It recovers any on-disk levels and returns the recovered parts.
func configureLSMLevels(dir string, levels []*LevelConfig, logger log.Logger) ([]Level, []parts.Part, error) {
	lsmLevels := make([]Level, len(levels))
	recovered := []parts.Part{}

	// Recover in reverse order so that the highest level is recovered first.
	// This allows us to throw away parts that were compacted into a higher level but for some reason weren't successfully removed.
	for i := len(levels) - 1; i >= 0; i-- {
		cfg := levels[i]
		switch cfg.Type {
		case CompactionTypeParquetMemory:
			lsmLevels[i] = &inMemoryLevel{
				maxSize: cfg.MaxSize,
				compact: cfg.Compact,
			}
		case CompactionTypeParquetDisk:
			onDisk, err := NewFileCompaction(filepath.Join(dir, fmt.Sprintf("L%v", i+1)), cfg.MaxSize, cfg.Compact, logger) // TODO: it would be nice to not need to inject the compact function here.
			if err != nil {
				return nil, nil, err
			}
			recoveredParts, err := onDisk.recover(parts.WithCompactionLevel(i + 1))
			if err != nil {
				return nil, nil, fmt.Errorf("failed to recover level %v parts: %w", i+1, err)
			}
			recovered = append(recovered, recoveredParts...)
			lsmLevels[i] = onDisk
		default:
			if i != len(levels)-1 { // Compaction type should not be set for the last level.
				panic(fmt.Sprintf("unknown compaction type: %v", cfg.Type))
			}
		}
	}

	return lsmLevels, recovered, nil
}

// Size returns the total size of the index in bytes.
func (l *LSM) Size() int64 {
	var size int64
	for i := range l.sizes {
		size += l.sizes[i].Load()
	}
	return size
}

// LevelSize returns the size of the given level in bytes.
func (l *LSM) LevelSize(t SentinelType) int64 {
	return l.sizes[t].Load()
}

// Snapshot creates a snapshot of the index at the given transaction. It will call the writer function with the parts in the index that are in-memory.
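//
// A rough sketch of how a caller might use it (writePart and snapshotDir are hypothetical; how
// each in-memory part is persisted is up to the caller):
//
//	err := lsm.Snapshot(tx, func(p parts.Part) error {
//		return writePart(p) // serialize the in-memory part into snapshotDir
//	}, snapshotDir)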
func (l *LSM) Snapshot(tx uint64, writer func(parts.Part) error, dir string) error {
	l.compacting.Lock()
	defer l.compacting.Unlock()

	var (
		snapshotParts []parts.Part
		iterErr       error
	)
	var currentLevel SentinelType
	l.partList.Iterate(func(node *Node) bool {
		if node.part == nil {
			if node.sentinel == L0 { // First node in the list will be L0.
				currentLevel = L0
				return true
			}

			switch currentLevel {
			case L0: // L0 is always in-memory.
				for _, part := range snapshotParts {
					if err := writer(part); err != nil {
						iterErr = err
						return false
					}
				}
			default:
				lvl := l.levels[currentLevel-1]
				levelDir := filepath.Join(dir, fmt.Sprintf("%v", currentLevel))
				if err := lvl.Snapshot(snapshotParts, writer, levelDir); err != nil {
					iterErr = err
					return false
				}
			}

			currentLevel = node.sentinel
			snapshotParts = nil
			return true
		}

		if node.part.TX() <= tx {
			snapshotParts = append(snapshotParts, node.part)
		}
		return true
	})
	if iterErr != nil {
		return iterErr
	}

	lvl := l.levels[currentLevel-1]
	levelDir := filepath.Join(dir, fmt.Sprintf("%v", currentLevel))
	return lvl.Snapshot(snapshotParts, writer, levelDir)
}

func validateLevels(levels []*LevelConfig) error {
	for i, cfg := range levels {
		if int(cfg.Level) != i {
			return fmt.Errorf("level %d is not in order", cfg.Level)
		}

		switch i {
		case len(levels) - 1:
			if cfg.Compact != nil {
				return fmt.Errorf("level %d is the last level and should not have a compact function", cfg.Level)
			}
		default:
			if cfg.Compact == nil {
				return fmt.Errorf("level %d is not the last level and should have a compact function", cfg.Level)
			}
		}
	}

	return nil
}

// MaxLevel returns the highest level of the LSM.
func (l *LSM) MaxLevel() SentinelType {
	return SentinelType(len(l.levels) - 1)
}

// Add inserts the given Arrow record into L0 at the given transaction and triggers an asynchronous
// compaction if L0 exceeds its configured max size.
func (l *LSM) Add(tx uint64, record arrow.Record) {
	record.Retain()
	size := util.TotalRecordSize(record)
	l.partList.Insert(parts.NewArrowPart(tx, record, uint64(size), l.schema, parts.WithCompactionLevel(int(L0))))
	l0Size := l.sizes[L0].Add(int64(size))
	l.metrics.LevelSize.WithLabelValues(L0.String()).Set(float64(l0Size))
	if l0Size >= l.levels[L0].MaxSize() {
		if l.compacting.TryLock() {
			l.compactionWg.Add(1)
			go func() {
				defer l.compacting.Unlock()
				defer l.compactionWg.Done()
				_ = l.compact(false)
			}()
		}
	}
}

// WaitForPendingCompactions blocks until any in-flight asynchronous compactions have finished.
func (l *LSM) WaitForPendingCompactions() {
	l.compactionWg.Wait()
}

// InsertPart inserts a part into the LSM tree. It will be inserted into the correct level. It does not check if the insert should cause a compaction.
// This should only be used during snapshot recovery. It will drop the insert on the floor if the part is older than a part in the next level of the LSM. This indicates
// that this part is already accounted for in the next level via compaction.
func (l *LSM) InsertPart(part parts.Part) {
	lvl := SentinelType(part.CompactionLevel())
	// Check the next levels, if any, to see if this part should be inserted.
	if lvl != l.MaxLevel() {
		for next := lvl + 1; next < l.MaxLevel()+1; next++ {
			if part.TX() <= l.maxTXRecoverd[next] {
				return
			}
		}
	}

	// Retain the part.
	part.Retain()

	if tx := part.TX(); tx > l.maxTXRecoverd[lvl] {
		l.maxTXRecoverd[lvl] = tx
	}

	// Insert the part into the correct level, but do not do this if parts with newer TXs have already been inserted.
	l.findLevel(lvl).Insert(part)
	size := l.sizes[lvl].Add(int64(part.Size()))
	l.metrics.LevelSize.WithLabelValues(lvl.String()).Set(float64(size))
}

// String returns a human-readable view of the level sizes and the part list.
func (l *LSM) String() string {
	s := ""
	for i := range l.sizes {
		s += fmt.Sprintf("L%v: %d ", i, l.sizes[i].Load())
	}
	s += "\n"
	s += l.partList.String()
	return s
}

// Prefixes always returns an empty list; the LSM index does not track prefixes.
func (l *LSM) Prefixes(_ context.Context, _ string) ([]string, error) {
	return []string{}, nil
}

// Iterate calls the given function for each node in the part list while holding a read lock.
func (l *LSM) Iterate(iter func(node *Node) bool) {
	l.RLock()
	defer l.RUnlock()
	l.partList.Iterate(iter)
}

// Scan iterates over the parts in the index that are visible at the given transaction, invoking the callback with an
// Arrow record or a dynparquet row group for every part that may contain rows matching the filter expression.
func (l *LSM) Scan(ctx context.Context, _ string, _ *dynparquet.Schema, filter logicalplan.Expr, tx uint64, callback func(context.Context, any) error) error {
	l.RLock()
	defer l.RUnlock()

	booleanFilter, err := expr.BooleanExpr(filter)
	if err != nil {
		return fmt.Errorf("boolean expr: %w", err)
	}

	var iterErr error
	l.partList.Iterate(func(node *Node) bool {
		if node.part == nil { // encountered a sentinel node; continue on
			return true
		}

		if node.part.TX() > tx { // skip parts that are newer than this transaction
			return true
		}

		if r := node.part.Record(); r != nil {
			r.Retain()
			if err := callback(ctx, r); err != nil {
				iterErr = err
				return false
			}
			return true
		}

		buf, err := node.part.AsSerializedBuffer(nil)
		if err != nil {
			iterErr = err
			return false
		}

		for i := 0; i < buf.NumRowGroups(); i++ {
			rg := buf.DynamicRowGroup(i)
			mayContainUsefulData, err := booleanFilter.Eval(rg, false)
			if err != nil {
				iterErr = err
				return false
			}

			if mayContainUsefulData {
				node.part.Retain() // Create another reference to this part.
				if err := callback(ctx, &releaseableRowGroup{DynamicRowGroup: rg, release: node.part.Release}); err != nil {
					iterErr = err
					return false
				}
			}
		}
		return true
	})
	return iterErr
}

type releaseableRowGroup struct {
	dynparquet.DynamicRowGroup
	release func()
}

// Release releases the part reference held by this row group.
func (r *releaseableRowGroup) Release() {
	r.release()
}

type ReleaseableRowGroup interface {
	dynparquet.DynamicRowGroup
	Release()
}

// TODO: this should be changed to just retain the sentinel nodes in the lsm struct to do an O(1) lookup.
// findLevel returns the sentinel node for the given level.
func (l *LSM) findLevel(lvl SentinelType) *Node {
	var sentinel *Node
	l.partList.Iterate(func(node *Node) bool {
		if node.part == nil && node.sentinel == lvl {
			sentinel = node
			return false
		}
		return true
	})

	return sentinel
}

// findNode returns the node that points to node.
func (l *LSM) findNode(node *Node) *Node {
	var prev *Node
	l.partList.Iterate(func(candidate *Node) bool {
		if candidate.next.Load() == node {
			prev = candidate
			return false
		}
		return true
	})

	return prev
}

// EnsureCompaction forces a compaction of all levels, regardless of whether the
// levels are below the target size.
func (l *LSM) EnsureCompaction() error {
	l.compacting.Lock()
	defer l.compacting.Unlock()
	return l.compact(true /* ignoreSizes */)
}

// Rotate writes all parts in the LSM into the external writer. No changes are made to the LSM.
func (l *LSM) Rotate(externalWriter func([]parts.Part) (parts.Part, int64, int64, error)) error {
	l.compacting.Lock()
	defer l.compacting.Unlock()
	start := time.Now()
	defer func() {
		l.metrics.CompactionDuration.Observe(time.Since(start).Seconds())
	}()

	// Write all the parts to the external writer.
	rotateParts := []parts.Part{}
	l.partList.Iterate(func(node *Node) bool {
		if node.part == nil {
			return true
		}

		rotateParts = append(rotateParts, node.part)
		return true
	})

	_, _, _, err := externalWriter(rotateParts)
	return err
}

// merge merges the given level into the next level using the configured Compact function for that level.
// The last level of the LSM cannot be merged.
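//
// For example, merging L0 in a list that looks like
//
//	[L0]->p3->p2->p1->[L1]->q1
//
// (with p1..p3 at or below the watermark) compacts p1..p3 into a new part c1, splices in a fresh
// L1 sentinel, and skips the old one, yielding
//
//	[L0]->[L1]->c1->q1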
func (l *LSM) merge(lvl SentinelType) error {
	if int(lvl) >= len(l.levels) {
		return fmt.Errorf("level %d does not exist", lvl)
	}
	if int(lvl) == len(l.levels)-1 {
		return fmt.Errorf("cannot merge the last level")
	}
	l.metrics.Compactions.WithLabelValues(lvl.String()).Inc()

	compact := l.findLevel(lvl)

	// Find a transaction that is <= the current watermark.
	// This ensures a contiguous sorted list of transactions.
	if lvl == L0 {
		compact = compact.next.Load()
		if compact == nil || compact.part == nil {
			return nil // nothing to compact
		}

		// Find the first part that is <= the watermark and reset the compact list to that part.
		watermark := l.watermark()
		compact.Iterate(func(node *Node) bool {
			if node.part == nil && node.sentinel != L0 {
				return false
			}
			if node.part.TX() <= watermark {
				compact = node
				return false
			}
			return true
		})
	}

	nodeList := []*Node{}
	var next *Node
	var iterErr error
	compact.Iterate(func(node *Node) bool {
		if node.part == nil { // sentinel encountered
			switch node.sentinel {
			case lvl: // the sentinel for the beginning of the list
				return true
			case lvl + 1:
				next = node.next.Load() // skip the sentinel to combine the lists
			default:
				next = node
			}
			return false
		}

		nodeList = append(nodeList, node)
		return true
	})
	if iterErr != nil {
		return iterErr
	}

	if len(nodeList) == 0 {
		return nil
	}

	var size int64
	var compactedSize int64
	var compacted []parts.Part
	var err error
	mergeList := make([]parts.Part, 0, len(nodeList))
	for _, node := range nodeList {
		mergeList = append(mergeList, node.part)
	}
	sentinel := &Node{
		sentinel: lvl + 1,
	}
	compacted, size, compactedSize, err = l.levels[lvl].Compact(mergeList, parts.WithCompactionLevel(int(lvl)+1))
	if err != nil {
		return err
	}

	// Create a new list for the compacted parts.
	newList := &Node{
		part: compacted[0],
	}
	node := newList
	for _, part := range compacted[1:] {
		node.next.Store(&Node{
			part: part,
		})
		node = node.next.Load()
	}
	sentinel.next.Store(newList)
	if next != nil {
		node.next.Store(next)
	}
	l.sizes[lvl+1].Add(int64(compactedSize))
	l.metrics.LevelSize.WithLabelValues(SentinelType(lvl + 1).String()).Set(float64(l.sizes[lvl+1].Load()))

	// Replace the compacted list with the new list:
	// find the node that points to the first node in our compacted list.
	prev := l.findNode(nodeList[0])
	for !prev.next.CompareAndSwap(nodeList[0], sentinel) {
		// This can happen at most once in the scenario where a new part is added to the L0 list while we are trying to replace it.
		prev = l.findNode(nodeList[0])
	}
	l.sizes[lvl].Add(-int64(size))
	l.metrics.LevelSize.WithLabelValues(lvl.String()).Set(float64(l.sizes[lvl].Load()))

	// Release the old parts.
	l.Lock()
	for _, part := range mergeList {
		part.Release()
	}
	l.Unlock()

	// Reset the level that was just compacted.
	if lvl != L0 {
		l.levels[lvl-1].Reset()
	}

	return nil
}

// compact is a cascading compaction routine. It walks the levels from lowest to highest and merges every level that
// exceeds its configured max size (or every level, when ignoreSizes is true) into the next level.
// compact must not be run concurrently.
func (l *LSM) compact(ignoreSizes bool) error {
	start := time.Now()
	defer func() {
		l.metrics.CompactionDuration.Observe(time.Since(start).Seconds())
	}()

	for i := 0; i < len(l.levels)-1; i++ {
		if ignoreSizes || l.sizes[i].Load() >= l.levels[i].MaxSize() {
			if err := l.merge(SentinelType(i)); err != nil {
				level.Error(l.logger).Log("msg", "failed to merge level", "level", i, "err", err)
				return err
			}
		}
	}

	return nil
}