package physicalplan

import (
	
	
	
	
	

	
	
	
	

	
)

// ReservoirSampler is a physical plan operator that keeps a random sample of up to size
// rows out of all rows passed to Callback (reservoir sampling) and forwards that sample to
// the next operator in Finish.
type ReservoirSampler struct {
	next      PhysicalPlan
	allocator memory.Allocator

	// size is the max number of rows in the reservoir
	size int64
	// sizeInBytes is the total number of bytes that are held by records in the reservoir. This includes
	// rows that are not sampled but are being held onto because of a reference to the record that is still in the reservoir.
	sizeInBytes int64
	// sizeLimit is the number of bytes that sizeInBytes is allowed to get to before the reservoir is materialized. This is to prevent the reservoir from growing too large.
	sizeLimit int64

	// reservoir is the set of records that have been sampled. They may vary in schema due to dynamic columns.
	reservoir []sample

	// w is the running weight of the sampling algorithm: the per-row probability from which
	// the number of rows to skip before the next replacement is drawn. It shrinks as more
	// rows are seen.
	w float64
	// n is the number of rows seen so far, including the rows that initially filled the
	// reservoir.
	n int64
	// i is the index, counted over all rows seen, of the next row to place into the
	// reservoir. 0 means sampling has not started yet.
	i float64
}

type (
	// sample is one entry of the reservoir: a single row of a record, or an entire record
	// when i is -1.
	sample struct {
		// i is the row index within ref that this sample refers to, or -1 when the sample
		// covers every row of the record.
		i int64
		// ref is the shared, reference-counted record the sample points into.
		ref *referencedRecord
	}

	// referencedRecord wraps an arrow.Record and counts how many samples point at it, so the
	// sampler can tell when the record's bytes stop being held by the reservoir.
	referencedRecord struct {
		// Record is the wrapped record; embedding it exposes its methods directly.
		arrow.Record
		// size is the total size of the record in bytes.
		size int64
		// ref is the number of samples referencing the record. At 0 nothing in the
		// reservoir uses the record any more.
		ref int64
	}
)

func ( *referencedRecord) () int64 {
	defer .Record.Release()
	.ref--
	if .ref == 0 {
		return .size
	}

	return 0
}

func ( *referencedRecord) () int64 {
	defer .Record.Retain()
	.ref++
	if .ref == 1 {
		return .size
	}
	return 0
}

// NewReservoirSampler will create a new ReservoirSampler operator that will sample up to size rows of all records seen by Callback.
func (,  int64,  memory.Allocator) *ReservoirSampler {
	return &ReservoirSampler{
		size:      ,
		sizeLimit: ,
		w:         math.Exp(math.Log(rand.Float64()) / float64()),
		allocator: ,
	}
}

func ( *ReservoirSampler) ( PhysicalPlan) {
	.next = 
}

func ( *ReservoirSampler) () *Diagram {
	var  *Diagram
	if .next != nil {
		 = .next.Draw()
	}
	 := fmt.Sprintf("Reservoir Sampler (%v)", .size)
	return &Diagram{Details: , Child: }
}

func ( *ReservoirSampler) () {
	for ,  := range .reservoir {
		.sizeInBytes -= .ref.Release()
	}
	.next.Close()
}

// Callback collects all the records to sample.
func ( *ReservoirSampler) ( context.Context,  arrow.Record) error {
	var  *referencedRecord
	,  = .fill()
	if  == nil { // The record fit in the reservoir
		return nil
	}
	if .n == .size { // The reservoir just filled up. Slice the reservoir to the correct size so we can easily perform row replacement
		.sliceReservoir()
	}

	// Sample the record
	.sample(, )
	if .sizeInBytes >= .sizeLimit {
		if  := .materialize(.allocator);  != nil {
			return 
		}
	}
	return nil
}

func refPtr( arrow.Record) *referencedRecord {
	return &referencedRecord{Record: , size: util.TotalRecordSize()}
}

// fill will fill the reservoir with the first size records.
func ( *ReservoirSampler) ( arrow.Record) (arrow.Record, *referencedRecord) {
	if .n >= .size {
		return , refPtr()
	}

	if .n+.NumRows() <= .size { // The record fits in the reservoir
		 := sample{
			i:   -1,
			ref: refPtr(),
		}
		.reservoir = append(.reservoir, )
		.sizeInBytes += .ref.Retain()
		.n += .NumRows()
		return nil, nil
	}

	// The record partially fits in the reservoir
	 := refPtr()
	 := sample{
		i:   -1,
		ref: refPtr(.NewSlice(0, .size-.n)),
	}
	.reservoir = append(.reservoir, )
	.sizeInBytes += .ref.Retain()
	 = .NewSlice(.size-.n, .NumRows())
	.n = .size
	return , 
}

func ( *ReservoirSampler) () {
	 := make([]sample, 0, .size)
	for ,  := range .reservoir {
		 := refPtr(.ref.Record)
		for  := int64(0);  < .ref.NumRows(); ++ {
			 := sample{
				i:   ,
				ref: ,
			}
			 = append(, )
			.sizeInBytes += .ref.Retain()
		}
		.sizeInBytes -= .ref.Release()
	}
	.reservoir = 
}

// sample implements the reservoir sampling algorithm found https://en.wikipedia.org/wiki/Reservoir_sampling.
func ( *ReservoirSampler) ( arrow.Record,  *referencedRecord) {
	// The size can be 0 and in that case we don't want to sample.
	if .size == 0 {
		return
	}
	 := .n + .NumRows()
	if .i == 0 {
		.i = float64(.n) - 1
	} else if .i < float64() {
		.replace(rand.Intn(int(.size)), sample{i: int64(.i) - .n, ref: })
		.w = .w * math.Exp(math.Log(rand.Float64())/float64(.size))
	}

	for .i < float64() {
		.i += math.Floor(math.Log(rand.Float64())/math.Log(1-.w)) + 1
		if .i < float64() {
			// replace a random item of the reservoir with row i
			.replace(rand.Intn(int(.size)), sample{i: int64(.i) - .n, ref: })
			.w = .w * math.Exp(math.Log(rand.Float64())/float64(.size))
		}
	}
	.n = 
}

// Finish sends all the records in the reservoir to the next operator.
func ( *ReservoirSampler) ( context.Context) error {
	// Send all the records in the reservoir to the next operator
	for ,  := range .reservoir {
		if .i == -1 {
			if  := .next.Callback(, .ref.Record);  != nil {
				return 
			}
			continue
		}

		 := .ref.NewSlice(.i, .i+1)
		defer .Release()
		if  := .next.Callback(, );  != nil {
			return 
		}
	}

	return .next.Finish()
}

// replace will replace the row at index i with the row in the record r at index j.
func ( *ReservoirSampler) ( int,  sample) {
	.sizeInBytes -= .reservoir[].ref.Release()
	.reservoir[] = 
	.sizeInBytes += .ref.Retain()
}

// materialize will build a new record from the reservoir to release the underlying records.
func ( *ReservoirSampler) ( memory.Allocator) error {
	// Build the unified schema for the records
	 := .reservoir[0].ref.Schema()
	 := .Fields()
	 := map[string]struct{}{}
	for  := 1;  < len(.reservoir); ++ {
		for  := 0;  < .reservoir[].ref.Schema().NumFields(); ++ {
			 := .reservoir[].ref.Schema().Field().Name
			if ,  := []; ! && !.HasField(.reservoir[].ref.Schema().Field().Name) {
				 = append(, .reservoir[].ref.Schema().Field())
				[] = struct{}{}
			}
		}
	}

	// Sort the fields alphabetically
	slices.SortFunc(, func(,  arrow.Field) int {
		switch {
		case .Name < .Name:
			return -1
		case .Name > .Name:
			return 1
		default:
			return 0
		}
	})

	// Merge all the records slices
	 = arrow.NewSchema(, nil)
	 := array.NewRecordBuilder(, )
	defer .Release()

	for ,  := range .reservoir {
		for ,  := range .Fields() { // TODO handle disparate schemas
			// Check if this record has this field
			if !.ref.Schema().HasField(.Field().Name) {
				if  := builder.AppendValue(, nil, -1);  != nil {
					return 
				}
			} else {
				if  := builder.AppendValue(, .ref.Column(), int(.i));  != nil {
					return 
				}
			}
		}
	}

	// Clear the reservoir
	for ,  := range .reservoir {
		.sizeInBytes -= .ref.Release()
	}
	// Set the record to be the new reservoir
	 := sample{i: -1, ref: refPtr(.NewRecord())}
	.sizeInBytes += .ref.Retain()
	.ref.Record.Release() // Release this here because of the retain in the previous line.
	.reservoir = []sample{}

	// reslice the reservoir for easy row replacement
	.sliceReservoir()
	return nil
}