package parquet

import (
	
	
)

// SortingWriter is a type similar to GenericWriter but it ensures that rows
// are sorted according to the sorting columns configured on the writer.
//
// The writer accumulates rows in an in-memory buffer which is sorted when it
// reaches the target number of rows, then written to a temporary row group.
// When the writer is flushed or closed, the temporary row groups are merged
// into a row group in the output file, ensuring that rows remain sorted in the
// final row group.
//
// Because row groups get encoded and compressed, they hold a lot less memory
// than if all rows were retained in memory. Sorting then merging row chunks
// also tends to be a lot more efficient than sorting all rows in memory: it
// results in better CPU cache utilization, whereas sorting multi-megabyte
// arrays causes a lot of cache misses because the data set cannot be held in
// CPU caches.
//
// NOTE(review): the type parameter name is missing from the declaration below
// and from the type arguments of the rowbuf, writer, and output fields. The
// code does not parse as written; the identifier must be restored.
type SortingWriter[ any] struct {
	// rowbuf is the in-memory buffer that incoming rows are written to and
	// that gets sorted before being spilled to the temporary writer.
	rowbuf  *RowBuffer[]
	// writer encodes sorted chunks into the temporary buffer. It targets
	// io.Discard whenever no temporary buffer is held.
	writer  *GenericWriter[]
	// output is the writer that receives the merged, sorted rows.
	output  *GenericWriter[]
	// buffer is the temporary storage obtained from sorting.SortingBuffers;
	// it is nil until the first chunk is spilled and after each reset.
	buffer  io.ReadWriteSeeker
	// maxRows is the number of buffered rows at which rowbuf is sorted and
	// spilled (presumably the constructor's sortRowCount argument — confirm).
	maxRows int64
	// numRows counts the rows copied to the temporary writer since the last
	// reset of the sorting buffer.
	numRows int64
	// sorting holds the sorting configuration taken from the writer config.
	sorting SortingConfig
	// dedupe is the state used to drop duplicated rows from a sorted chunk
	// when sorting.DropDuplicatedRows is set.
	dedupe  dedupe
}

// NewSortingWriter constructs a new sorting writer which writes a parquet file
// where rows of each row group are ordered according to the sorting columns
// configured on the writer.
//
// The sortRowCount argument defines the target number of rows that will be
// sorted in memory before being written to temporary row groups. The greater
// this value the more memory is needed to buffer rows in memory. Choosing a
// value that is too small limits the maximum number of rows that can exist in
// the output file since the writer cannot create more than 32K temporary row
// groups to hold the sorted row chunks.
//
// The function panics when NewWriterConfig reports an error for the given
// options.
//
// NOTE(review): the function name, type parameter, parameter names, and local
// identifiers are missing from this block; the code does not parse as written.
// The names used in this comment come from the original doc comment above.
func [ any]( io.Writer,  int64,  ...WriterOption) *SortingWriter[] {
	,  := NewWriterConfig(...)
	if  != nil {
		panic()
	}
	return &SortingWriter[]{
		// The row buffer only needs the schema and the sorting configuration.
		rowbuf: NewRowBuffer[](&RowGroupConfig{
			Schema:  .Schema,
			Sorting: .Sorting,
		}),
		// The temporary writer starts out on io.Discard; it is pointed at a
		// real buffer the first time a sorted chunk is spilled. Only the
		// configuration fields listed here are carried over.
		writer: NewGenericWriter[](io.Discard, &WriterConfig{
			CreatedBy:            .CreatedBy,
			ColumnPageBuffers:    .ColumnPageBuffers,
			ColumnIndexSizeLimit: .ColumnIndexSizeLimit,
			PageBufferSize:       .PageBufferSize,
			WriteBufferSize:      .WriteBufferSize,
			DataPageVersion:      .DataPageVersion,
			Schema:               .Schema,
			Compression:          .Compression,
			Sorting:              .Sorting,
		}),
		// Presumably built from the destination io.Writer and the full writer
		// configuration (both arguments are missing here — confirm).
		output:  NewGenericWriter[](, ),
		maxRows: ,
		sorting: .Sorting,
	}
}

// Flushes the sorting writer, then closes the output writer. If the flush
// fails, the error is returned and the output writer is not closed.
//
// NOTE(review): the receiver, type argument, and method name are missing from
// this block (presumably this is Close — confirm and restore).
func ( *SortingWriter[]) () error {
	if  := .Flush();  != nil {
		return 
	}
	return .output.Close()
}

// Spills any rows still held in memory, then merges all temporary row groups
// into the output writer and flushes it. When no rows have been written to the
// temporary writer, the method returns nil without touching the output.
//
// NOTE(review): the receiver, type argument, method name, and local
// identifiers are missing from this block (presumably this is Flush, which is
// what the method at L70 calls — confirm and restore).
func ( *SortingWriter[]) () error {
	// Whatever the outcome, put the temporary writer back on io.Discard, zero
	// the row count, and release the temporary buffer on return.
	defer .resetSortingBuffer()

	// Sort and spill the rows that are still buffered in memory.
	if  := .sortAndWriteBufferedRows();  != nil {
		return 
	}

	// Nothing was copied to the temporary writer, so there is nothing to merge.
	if .numRows == 0 {
		return nil
	}

	// Close the temporary writer before the buffer is read back as a file.
	if  := .writer.Close();  != nil {
		return 
	}

	// Seeking zero bytes from the current position reports the current offset
	// in the temporary buffer; presumably this is passed to OpenFile as the
	// file size (the argument is missing below — confirm).
	,  := .buffer.Seek(0, io.SeekCurrent)
	if  != nil {
		return 
	}

	// Open the temporary buffer as a file, skipping the page index and the
	// bloom filters.
	,  := OpenFile(newReaderAt(.buffer), ,
		&FileConfig{
			SkipPageIndex:    true,
			SkipBloomFilters: true,
			ReadBufferSize:   defaultReadBufferSize,
		},
	)
	if  != nil {
		return 
	}

	// Merge the temporary row groups using the writer's schema and sorting
	// configuration.
	,  := MergeRowGroups(.RowGroups(),
		&RowGroupConfig{
			Schema:  .Schema(),
			Sorting: .sorting,
		},
	)
	if  != nil {
		return 
	}

	 := .Rows()
	defer .Close()

	// Each chunk was deduplicated on its own before being spilled; wrapping
	// the merged reader presumably removes duplicates that span chunks.
	 := RowReader()
	if .sorting.DropDuplicatedRows {
		 = DedupeRowReader(, .rowbuf.compare)
	}

	if ,  := CopyRows(.output, );  != nil {
		return 
	}

	return .output.Flush()
}

// Resets the output writer, clears the in-memory row buffer, and releases the
// temporary sorting buffer.
//
// NOTE(review): the receiver, type argument, method name, parameter name, and
// the argument of .output.Reset are missing from this block (presumably this
// is Reset and the io.Writer parameter is forwarded to the output writer —
// confirm and restore).
func ( *SortingWriter[]) ( io.Writer) {
	.output.Reset()
	.rowbuf.Reset()
	.resetSortingBuffer()
}

// Points the temporary writer back at io.Discard, zeroes the count of spilled
// rows, and, if a temporary buffer is held, hands it back to
// sorting.SortingBuffers and clears the field.
//
// NOTE(review): the receiver, type argument, and method name are missing from
// this block; callers in this file refer to it as resetSortingBuffer.
func ( *SortingWriter[]) () {
	.writer.Reset(io.Discard)
	.numRows = 0

	if .buffer != nil {
		.sorting.SortingBuffers.PutBuffer(.buffer)
		.buffer = nil
	}
}

// Writes a slice of typed values by delegating to writeRows, which calls back
// with index bounds; the callback passes the corresponding sub-slice to
// rowbuf.Write. Returns the count and error reported by writeRows.
//
// NOTE(review): the receiver, type argument, method name, parameter name, and
// slice bounds are missing from this block (presumably this is Write —
// confirm and restore).
func ( *SortingWriter[]) ( []) (int, error) {
	return .writeRows(len(), func(,  int) (int, error) { return .rowbuf.Write([:]) })
}

// Writes a slice of Row values by delegating to writeRows, which calls back
// with index bounds; the callback passes the corresponding sub-slice to
// rowbuf.WriteRows. Returns the count and error reported by writeRows.
//
// NOTE(review): the receiver, type argument, method name, parameter name, and
// slice bounds are missing from this block (presumably this is WriteRows —
// confirm and restore).
func ( *SortingWriter[]) ( []Row) (int, error) {
	return .writeRows(len(), func(,  int) (int, error) { return .rowbuf.WriteRows([:]) })
}

// Shared write loop: takes a total row count and a callback that writes a
// range of rows identified by two int bounds, and returns the number of rows
// written along with the first error encountered.
//
// Rows are written in batches so that the in-memory buffer does not grow past
// maxRows; whenever the buffer has reached maxRows it is sorted and spilled
// before the next batch is written.
//
// NOTE(review): the receiver, type argument, method name, parameter names, and
// local identifiers are missing from this block; callers in this file refer to
// it as writeRows. The comments below describe the apparent intent — confirm
// once the identifiers are restored.
func ( *SortingWriter[]) ( int,  func(,  int) (int, error)) (int, error) {
	// Running count of rows written so far.
	 := 0

	for  <  {
		// The in-memory buffer is full: sort it and spill it to the temporary
		// writer to make room.
		if .rowbuf.NumRows() >= .maxRows {
			if  := .sortAndWriteBufferedRows();  != nil {
				return , 
			}
		}

		// Remaining capacity of the in-memory buffer, presumably turned into
		// an end index by adding the current offset and then clamped to the
		// total row count.
		 := int(.maxRows - .rowbuf.NumRows())
		 += 
		if  >  {
			 = 
		}

		// The count returned by the callback is added before the error check,
		// so the rows of a partial write are still accounted for.
		,  := (, )
		 += 

		if  != nil {
			return , 
		}
	}

	return , nil
}

// Forwards a pair of strings to SetKeyValueMetadata on the output writer.
//
// NOTE(review): the receiver, type argument, method name, and parameter names
// are missing from this block (presumably this is SetKeyValueMetadata taking a
// key and a value — confirm and restore).
func ( *SortingWriter[]) (,  string) {
	.output.SetKeyValueMetadata(, )
}

// Returns the schema reported by the output writer.
//
// NOTE(review): the receiver, type argument, and method name are missing from
// this block (presumably this is Schema — confirm and restore).
func ( *SortingWriter[]) () *Schema {
	return .output.Schema()
}

// Sorts the rows held in the in-memory buffer, optionally drops duplicates,
// and copies the result to the temporary writer. It is a no-op when the buffer
// is empty. On success the number of rows copied is added to numRows.
//
// NOTE(review): the receiver, type argument, method name, and local
// identifiers are missing from this block; callers in this file refer to it as
// sortAndWriteBufferedRows.
func ( *SortingWriter[]) () error {
	if .rowbuf.Len() == 0 {
		return nil
	}

	// The in-memory buffer is cleared on return, including on error paths.
	defer .rowbuf.Reset()
	sort.Sort(.rowbuf)

	if .sorting.DropDuplicatedRows {
		// Truncate the sorted rows to the length returned by deduplicate, and
		// reset the dedupe state on return.
		.rowbuf.rows = .rowbuf.rows[:.dedupe.deduplicate(.rowbuf.rows, .rowbuf.compare)]
		defer .dedupe.reset()
	}

	 := .rowbuf.Rows()
	defer .Close()

	// The temporary buffer is acquired lazily, on the first spill after a
	// reset, and the temporary writer is pointed at it.
	if .buffer == nil {
		.buffer = .sorting.SortingBuffers.GetBuffer()
		.writer.Reset(.buffer)
	}

	,  := CopyRows(.writer, )
	if  != nil {
		return 
	}

	// Presumably this ends the current row group so that each sorted chunk
	// becomes its own temporary row group — confirm against GenericWriter.
	if  := .writer.Flush();  != nil {
		return 
	}

	.numRows += 
	return nil
}