package parquet

import (
	
	
	
	
)

const (
	// defaultRowBufferSize is the number of rows in the []Row buffer that
	// copyRows allocates when the buffer it was given is empty.
	defaultRowBufferSize = 42
)

// Row represents a parquet row as a slice of values.
//
// Each value is expected to carry a column index, repetition level, and
// definition level, allowing the program to determine how to reconstruct the
// original object from the row.
type Row []Value

// MakeRow constructs a Row from a list of column values.
//
// It is equivalent to calling AppendRow with a nil row.
//
// The function panics if the column indexes of values in each column do not
// match their position in the argument list.
func ( ...[]Value) Row { return AppendRow(nil, ...) }

// AppendRow appends to row the given list of column values.
//
// AppendRow can be used to construct a Row value from columns while retaining
// the underlying memory buffer to avoid reallocation.
//
// The function panics if the column indexes of values in each column do not
// match their position in the argument list.
func ( Row,  ...[]Value) Row {
	 := 0

	// First pass: accumulate the total number of values and validate that
	// each value's stored column index matches the position of its column.
	for ,  := range  {
		 += len()

		for ,  := range  {
			// The columnIndex field is compared against the bitwise
			// complement of the expected index.
			if .columnIndex != ^int16() {
				panic(fmt.Sprintf("value of column %d has column index %d", , .Column()))
			}
		}
	}

	// Grow the row at most once, up front, when its spare capacity cannot
	// hold all the values about to be appended.
	if  := cap() - len();  <  {
		 = append(make(Row, 0, len()+), ...)
	}

	return appendRow(, )
}

// appendRow appends the values of each column, in order, to the row and
// returns the resulting row. Unlike AppendRow, it performs no column index
// validation and no up-front capacity sizing.
func appendRow( Row,  [][]Value) Row {
	for ,  := range  {
		 = append(, ...)
	}
	return 
}

// Clone creates a copy of the row which shares no pointers.
//
// This method is useful to capture rows after a call to RowReader.ReadRows when
// values need to be retained before the next call to ReadRows or after the lifespan
// of the reader.
//
// The returned row has the same length as the original, and each of its
// values is the result of calling Clone on the corresponding value.
func ( Row) () Row {
	 := make(Row, len())
	for  := range  {
		[] = [].Clone()
	}
	return 
}

// Equal returns true if row and other contain the same sequence of values.
//
// Two rows are equal when they have the same length and, at every position,
// the values compare equal according to the package-level Equal function and
// also have identical repetition levels, definition levels, and column
// indexes.
func ( Row) ( Row) bool {
	if len() != len() {
		return false
	}
	for  := range  {
		if !Equal([], []) {
			return false
		}
		// The level and column index fields are compared explicitly, in
		// addition to the Equal call above.
		if [].repetitionLevel != [].repetitionLevel {
			return false
		}
		if [].definitionLevel != [].definitionLevel {
			return false
		}
		if [].columnIndex != [].columnIndex {
			return false
		}
	}
	return true
}

// Range calls f for each column of row.
//
// A column is a run of consecutive values in the row that share the same
// column index. f receives an integer counter that starts at zero and is
// incremented after each call, along with the sub-slice of the row holding
// that run of values. Iteration stops early if f returns false.
func ( Row) ( func( int,  []Value) bool) {
	 := 0

	for  := 0;  < len(); {
		 :=  + 1

		// Extend the run while the following values still carry the column
		// index being scanned (compared against its bitwise complement).
		for  < len() && [].columnIndex == ^int16() {
			++
		}

		// The three-index slice caps the capacity of the slice handed to f.
		if !(, [::]) {
			break
		}

		++
		 = 
	}
}

// RowSeeker is an interface implemented by readers of parquet rows which can be
// positioned at a specific row index.
type RowSeeker interface {
	// SeekToRow positions the stream on the given row index.
	//
	// Some implementations of the interface may only allow seeking forward.
	//
	// The method returns io.ErrClosedPipe if the stream had already been closed.
	SeekToRow(int64) error
}

// RowReader reads a sequence of parquet rows.
type RowReader interface {
	// ReadRows reads rows from the reader, returning the number of rows read
	// into the buffer, and any error that occurred. Note that the rows read
	// into the buffer are not safe for reuse after a subsequent call to
	// ReadRows. Callers that want to retain rows must copy them using Clone.
	//
	// When all rows have been read, the reader returns io.EOF to indicate the
	// end of the sequence. It is valid for the reader to return both a non-zero
	// number of rows and a non-nil error (including io.EOF).
	//
	// The buffer of rows passed as argument will be used to store values of
	// each row read from the reader. If the rows are not nil, the backing array
	// of the slices will be used as an optimization to avoid re-allocating new
	// arrays.
	//
	// The application is expected to handle the case where ReadRows returns
	// fewer rows than requested and no error, by looking at the first returned
	// value from ReadRows, which is the number of rows that were read.
	ReadRows([]Row) (int, error)
}

// RowReaderFrom reads parquet rows from reader.
//
// copyRows uses this interface as a fast path: when the destination writer
// implements it, the copy is delegated to ReadRowsFrom and its results are
// returned as-is.
type RowReaderFrom interface {
	// ReadRowsFrom reads rows from the given RowReader. Its results are
	// returned by copyRows as the number of rows copied and the error.
	ReadRowsFrom(RowReader) (int64, error)
}

// RowReaderWithSchema is an extension of the RowReader interface which
// advertises the schema of rows returned by ReadRows calls.
type RowReaderWithSchema interface {
	RowReader
	// Schema returns the schema of the rows produced by the reader.
	Schema() *Schema
}

// RowReadSeeker is an interface implemented by row readers which support
// seeking to arbitrary row positions. It combines the RowReader and RowSeeker
// interfaces.
type RowReadSeeker interface {
	RowReader
	RowSeeker
}

// RowWriter writes parquet rows to an underlying medium.
type RowWriter interface {
	// WriteRows writes rows to the writer, returning the number of rows
	// written and any error that occurred.
	//
	// Because columnar operations operate on independent columns of values,
	// writes of rows may not be atomic operations, and could result in some
	// rows being partially written. The method returns the number of rows that
	// were successfully written, but if an error occurs, values of the row(s)
	// that failed to be written may have been partially committed to their
	// columns. For that reason, applications should consider a write error as
	// fatal and assume that they need to discard the state; they cannot retry
	// the write nor recover the underlying file.
	WriteRows([]Row) (int, error)
}

// RowWriterTo writes parquet rows to a writer.
//
// copyRows uses this interface as a fast path: when the source reader
// implements it, the copy is delegated to WriteRowsTo and its results are
// returned as-is.
type RowWriterTo interface {
	// WriteRowsTo writes rows to the given RowWriter. Its results are
	// returned by copyRows as the number of rows copied and the error.
	WriteRowsTo(RowWriter) (int64, error)
}

// RowWriterWithSchema is an extension of the RowWriter interface which
// advertises the schema of rows expected to be passed to WriteRows calls.
type RowWriterWithSchema interface {
	RowWriter
	// Schema returns the schema of the rows expected by the writer.
	Schema() *Schema
}

// RowReaderFunc is a function type implementing the RowReader interface.
type RowReaderFunc func([]Row) (int, error)

// The method below forwards the call to the function value itself and returns
// its results unchanged.
func ( RowReaderFunc) ( []Row) (int, error) { return () }

// RowWriterFunc is a function type implementing the RowWriter interface.
type RowWriterFunc func([]Row) (int, error)

// The method below forwards the call to the function value itself and returns
// its results unchanged.
func ( RowWriterFunc) ( []Row) (int, error) { return () }

// MultiRowWriter constructs a RowWriter which dispatches writes to all the
// writers passed as arguments.
//
// When writing rows, if any of the writers returns an error, the operation is
// aborted and the error returned. If one of the writers did not error, but did
// not write all the rows, the operation is aborted and io.ErrShortWrite is
// returned.
//
// Rows are written sequentially to each writer in the order they are given to
// this function.
func ( ...RowWriter) RowWriter {
	// The writers are copied into a freshly allocated slice rather than
	// retaining the variadic argument slice directly.
	 := &multiRowWriter{writers: make([]RowWriter, len())}
	copy(.writers, )
	return 
}

// multiRowWriter is the RowWriter implementation returned by MultiRowWriter;
// it holds the list of writers that each batch of rows is dispatched to.
type multiRowWriter struct{ writers []RowWriter }

// The write method passes the rows to each writer in turn. It stops at the
// first writer that returns an error, returning that writer's count and
// error, or at the first writer that reports a count different from the
// number of rows, returning that count with io.ErrShortWrite. When every
// writer accepts all the rows, it returns the number of rows and a nil error.
func ( *multiRowWriter) ( []Row) (int, error) {
	for ,  := range .writers {
		,  := .WriteRows()
		if  != nil {
			return , 
		}
		if  != len() {
			return , io.ErrShortWrite
		}
	}
	return len(), nil
}

// forwardRowSeeker wraps a RowReader to provide forward-only seeking; its
// seek method rejects targets that lie before the current index.
type forwardRowSeeker struct {
	// rows is the underlying reader that rows are read from.
	rows RowReader
	// seek is the target row index recorded by the seek method.
	seek int64
	// index is compared against seek and advanced by the read method.
	// NOTE(review): presumably the count of rows consumed from the
	// underlying reader so far — confirm against the original source.
	index int64
}

// The read method reads from the underlying reader and hides the rows located
// before the seek position.
//
// NOTE(review): several operands are not visible in this copy of the file;
// the comments below describe the apparent intent and should be confirmed.
func ( *forwardRowSeeker) ( []Row) (int, error) {
	for {
		,  := .rows.ReadRows()

		// Skipping only applies when rows were read and the current index
		// is still before the seek position.
		if  > 0 && .index < .seek {
			 := .seek - .index
			.index += int64()
			// Presumably the whole batch precedes the seek position: drop
			// it and read again.
			if  >= int64() {
				continue
			}

			// Copy rows over earlier slots of the buffer, reusing each
			// destination row's backing array (append onto a zero-length
			// slice of it).
			for ,  := 0, int();  < ; ++ {
				[] = append([][:0], []...)
			}

			 -= int()
		}

		return , 
	}
}

// The seek method records the requested row index as the new seek position
// when it is greater than or equal to the current index. Otherwise it returns
// an error explaining that the underlying reader (reported with its dynamic
// type) does not implement parquet.RowSeeker and that seeking backward is not
// possible.
func ( *forwardRowSeeker) ( int64) error {
	if  >= .index {
		.seek = 
		return nil
	}
	return fmt.Errorf(
		"SeekToRow: %T does not implement parquet.RowSeeker: cannot seek backward from row %d to %d",
		.rows,
		.index,
		,
	)
}

// CopyRows copies rows from src to dst.
//
// The underlying types of src and dst are tested to determine if they expose
// information about the schema of rows that are read and expected to be
// written. If the schema information is available but does not match, the
// function will attempt to automatically convert the rows from the source
// schema to the destination.
//
// As an optimization, the src argument may implement RowWriterTo to bypass
// the default row copy logic and provide its own. The dst argument may also
// implement RowReaderFrom for the same purpose.
//
// The function returns the number of rows written, or any error encountered
// other than io.EOF.
//
// It delegates to copyRows with a nil buffer, so a buffer of
// defaultRowBufferSize rows is allocated when the default copy loop is used.
func ( RowWriter,  RowReader) (int64, error) {
	return copyRows(, , nil)
}

// copyRows implements CopyRows, with an additional []Row argument used as the
// intermediary buffer of the default copy loop.
//
// The steps are, in order:
//   - look up the target and source schemas and, when both are known and
//     differ according to nodesAreEqual, wrap the reader with a conversion;
//   - delegate to WriteRowsTo if the reader implements RowWriterTo;
//   - delegate to ReadRowsFrom if the writer implements RowReaderFrom;
//   - otherwise loop reading rows into the buffer and writing them out.
//
// The buffer is cleared with clearRows when the function returns from the
// default copy loop. An error matching io.EOF terminates the loop and is
// reported as a nil error; a read that yields zero rows and no error yields
// io.ErrNoProgress.
func copyRows( RowWriter,  RowReader,  []Row) ( int64,  error) {
	 := targetSchemaOf()
	 := sourceSchemaOf()

	if  != nil &&  != nil {
		if !nodesAreEqual(, ) {
			,  := Convert(, )
			if  != nil {
				return 0, 
			}
			// The conversion effectively disables a potential optimization
			// if the source reader implemented RowWriterTo. It is a trade-off
			// we are making to optimize for safety rather than performance.
			//
			// Entering this code path should not be the common case though;
			// it is most often used when parquet schemas are evolving, but
			// we expect the majority of files of an application to share a
			// common schema.
			 = ConvertRowReader(, )
		}
	}

	// Fast path provided by the reader.
	if ,  := .(RowWriterTo);  {
		return .WriteRowsTo()
	}

	// Fast path provided by the writer.
	if ,  := .(RowReaderFrom);  {
		return .ReadRowsFrom()
	}

	// Allocate a default buffer when none (or an empty one) was supplied.
	if len() == 0 {
		 = make([]Row, defaultRowBufferSize)
	}

	defer clearRows()

	for {
		,  := .ReadRows()

		// Rows are written before the read error is inspected, since a
		// RowReader may return both rows and an error from the same call.
		if  > 0 {
			,  := .WriteRows([:])
			if  != nil {
				return , 
			}

			 += int64()
		}

		if  != nil {
			// io.EOF marks the normal end of the sequence and is not
			// reported to the caller.
			if errors.Is(, io.EOF) {
				 = nil
			}
			return , 
		}

		// No rows and no error: abort rather than risk looping forever.
		if  == 0 {
			return , io.ErrNoProgress
		}
	}
}

// makeRows allocates a slice of rows along with a single []Value buffer, and
// initializes every row as a three-index slice of that shared buffer whose
// capacity is capped by the slice expression.
//
// NOTE(review): the low and high bounds of the slice expression are not
// visible in this copy of the file — confirm the initial length of each row.
func makeRows( int) []Row {
	 := make([]Value, )
	 := make([]Row, )
	for  := range  {
		[] = [ :  : +1]
	}
	return 
}

// clearRows passes each row to clearValues and then truncates the row to
// zero length in place, so the slots of the given slice keep their backing
// arrays for reuse.
func clearRows( []Row) {
	for ,  := range  {
		clearValues()
		[] = [:0]
	}
}

// sourceSchemaOf returns the schema advertised by the reader when it
// implements RowReaderWithSchema, and nil otherwise.
func sourceSchemaOf( RowReader) *Schema {
	if ,  := .(RowReaderWithSchema);  {
		return .Schema()
	}
	return nil
}

// targetSchemaOf returns the schema advertised by the writer when it
// implements RowWriterWithSchema, and nil otherwise.
func targetSchemaOf( RowWriter) *Schema {
	if ,  := .(RowWriterWithSchema);  {
		return .Schema()
	}
	return nil
}

// =============================================================================
// Functions returning closures are marked with "go:noinline" below to prevent
// losing naming information of the closure in stack traces.
//
// Because some of the functions are very short (they simply return a closure),
// the compiler inlines them at their call site, which results in the closure
// being named something like parquet.deconstructFuncOf.func2 instead of the
// original parquet.deconstructFuncOfLeaf.func1; the latter being much more
// meaningful when reading CPU or memory profiles.
// =============================================================================

// levels groups the repetition depth, repetition level, and definition level
// that are passed by value through the deconstruct and reconstruct closures.
type levels struct {
	// repetitionDepth is incremented by the repeated and map closures and is
	// the value they assign to repetitionLevel after handling an element.
	repetitionDepth byte
	// repetitionLevel is the value copied onto leaf values when a row is
	// deconstructed.
	repetitionLevel byte
	// definitionLevel is incremented by the optional, repeated, and map
	// closures and is copied onto leaf values when a row is deconstructed.
	definitionLevel byte
}

// deconstructFunc is the type of functions that deconstruct a Go value into
// parquet values. It accepts the per-column value buffers, the current
// levels, and the reflect.Value to deconstruct; it returns nothing.
//
// Functions built for composite nodes call the functions of their children
// until a leaf node is reached; the leaf function appends a single Value to
// the buffer of its column.
type deconstructFunc func([][]Value, levels, reflect.Value)

// deconstructFuncOf returns the deconstructFunc for the given node, along
// with an int16 column index.
//
// The cases are tested in order: optional, repeated, list, map, and finally
// required (leaf or group), so an optional or repeated node is handled as
// such before its logical type is considered.
//
// NOTE(review): the returned index is presumably the first column index
// following the node's columns (the leaf constructor returns its index plus
// one) — confirm.
func deconstructFuncOf( int16,  Node) (int16, deconstructFunc) {
	switch {
	case .Optional():
		return deconstructFuncOfOptional(, )
	case .Repeated():
		return deconstructFuncOfRepeated(, )
	case isList():
		return deconstructFuncOfList(, )
	case isMap():
		return deconstructFuncOfMap(, )
	default:
		return deconstructFuncOfRequired(, )
	}
}

// deconstructFuncOfOptional builds the deconstructFunc of an optional node
// by wrapping the function obtained for the Required form of the node.
//
// In the returned closure, a valid value that is the zero value of its type
// is replaced with the invalid reflect.Value{} before being forwarded.
// A valid non-zero value is dereferenced if it is a pointer, and the
// definition level is incremented. An invalid value is forwarded unchanged.
//
//go:noinline
func deconstructFuncOfOptional( int16,  Node) (int16, deconstructFunc) {
	,  := deconstructFuncOf(, Required())
	return , func( [][]Value,  levels,  reflect.Value) {
		if .IsValid() {
			if .IsZero() {
				 = reflect.Value{}
			} else {
				if .Kind() == reflect.Ptr {
					 = .Elem()
				}
				.definitionLevel++
			}
		}
		(, , )
	}
}

// deconstructFuncOfRepeated builds the deconstructFunc of a repeated node by
// wrapping the function obtained for the Required form of the node.
//
// In the returned closure, a value of interface kind is unwrapped first. An
// invalid or empty value results in a single call to the wrapped function
// with the invalid reflect.Value{} and unchanged levels. Otherwise the
// repetition depth and definition level are incremented and the wrapped
// function is called once per element; after each element the repetition
// level is set to the repetition depth, so it applies to all elements after
// the first.
//
//go:noinline
func deconstructFuncOfRepeated( int16,  Node) (int16, deconstructFunc) {
	,  := deconstructFuncOf(, Required())
	return , func( [][]Value,  levels,  reflect.Value) {
		if .Kind() == reflect.Interface {
			 = .Elem()
		}

		if !.IsValid() || .Len() == 0 {
			(, , reflect.Value{})
			return
		}

		.repetitionDepth++
		.definitionLevel++

		for ,  := 0, .Len();  < ; ++ {
			(, , .Index())
			.repetitionLevel = .repetitionDepth
		}
	}
}

// deconstructFuncOfRequired dispatches on whether the node is a leaf: leaf
// nodes are handled by deconstructFuncOfLeaf, all other nodes by
// deconstructFuncOfGroup.
func deconstructFuncOfRequired( int16,  Node) (int16, deconstructFunc) {
	switch {
	case .Leaf():
		return deconstructFuncOfLeaf(, )
	default:
		return deconstructFuncOfGroup(, )
	}
}

// deconstructFuncOfList handles list nodes by delegating to deconstructFuncOf
// with the list's element node (as returned by listElementOf) wrapped in
// Repeated.
func deconstructFuncOfList( int16,  Node) (int16, deconstructFunc) {
	return deconstructFuncOf(, Repeated(listElementOf()))
}

// deconstructFuncOfMap builds the deconstructFunc of a map node.
//
// The key-value node is obtained from mapKeyValueOf. Its Go type is expected
// to have an element type with at least two fields; the types of fields 0 and
// 1 are used as conversion targets for map keys and map values respectively.
// The wrapped function is built from schemaOf applied to a type derived from
// it. NOTE(review): the exact argument of schemaOf is not visible in this
// copy — confirm.
//
// In the returned closure, an invalid or empty map results in a single call
// to the wrapped function with the invalid reflect.Value{}. Otherwise the
// repetition depth and definition level are incremented and one scratch
// element is allocated; for each key of the map, the converted key and value
// are stored in its fields 0 and 1 and the wrapped function is called, after
// which the repetition level is set to the repetition depth. Keys come from
// reflect's MapKeys, whose order is unspecified.
//
//go:noinline
func deconstructFuncOfMap( int16,  Node) (int16, deconstructFunc) {
	 := mapKeyValueOf()
	 := .GoType()
	 := .Elem()
	 := .Field(0).Type
	 := .Field(1).Type
	,  := deconstructFuncOf(, schemaOf())
	return , func( [][]Value,  levels,  reflect.Value) {
		if !.IsValid() || .Len() == 0 {
			(, , reflect.Value{})
			return
		}

		.repetitionDepth++
		.definitionLevel++

		// A single scratch element is reused for every entry of the map.
		 := reflect.New().Elem()
		 := .Field(0)
		 := .Field(1)

		for ,  := range .MapKeys() {
			.Set(.Convert())
			.Set(.MapIndex().Convert())
			(, , )
			.repetitionLevel = .repetitionDepth
		}
	}
}

// deconstructFuncOfGroup builds the deconstructFunc of a group node from the
// deconstruct functions of its fields, built in field order while threading
// the column index through each call to deconstructFuncOf.
//
// In the returned closure, a valid value causes each field function to be
// called with the value obtained from the corresponding field's Value method.
// An invalid value causes each field function to still be called (rather than
// skipped), so every leaf beneath the group still appends a value.
// NOTE(review): the argument passed in the invalid case is presumably the
// invalid value itself — confirm.
//
//go:noinline
func deconstructFuncOfGroup( int16,  Node) (int16, deconstructFunc) {
	 := .Fields()
	 := make([]deconstructFunc, len())
	for ,  := range  {
		, [] = deconstructFuncOf(, )
	}
	return , func( [][]Value,  levels,  reflect.Value) {
		if .IsValid() {
			for ,  := range  {
				(, , [].Value())
			}
		} else {
			for ,  := range  {
				(, , )
			}
		}
	}
}

// deconstructFuncOfLeaf builds the deconstructFunc of a leaf node and returns
// it with the column index incremented by one.
//
// The function panics when the column index is greater than MaxColumnIndex.
// NOTE(review): the panic message hard-codes "127 columns" while the check
// uses MaxColumnIndex, which is defined outside this view — confirm the two
// agree.
//
// The returned closure builds a Value with makeValue when the input is valid,
// or uses the zero Value otherwise. It then stamps the value with the current
// repetition and definition levels and a column index computed with the
// bitwise complement operator, and appends it to the buffer of its column.
//
//go:noinline
func deconstructFuncOfLeaf( int16,  Node) (int16, deconstructFunc) {
	if  > MaxColumnIndex {
		panic("row cannot be deconstructed because it has more than 127 columns")
	}
	 := .Type()
	 := .Kind()
	 := .LogicalType()
	 := ^
	return  + 1, func( [][]Value,  levels,  reflect.Value) {
		 := Value{}

		if .IsValid() {
			 = makeValue(, , )
		}

		.repetitionLevel = .repetitionLevel
		.definitionLevel = .definitionLevel
		.columnIndex = 

		[] = append([], )
	}
}

// The "reconstructX" functions below build closures that populate a Go value
// from the parquet values of a row.

// reconstructFunc is the type of functions that reconstruct a Go value. It
// accepts the destination reflect.Value, the current levels, and the
// per-column values to read from; it returns an error if the value could not
// be reconstructed.
type reconstructFunc func(reflect.Value, levels, [][]Value) error

// reconstructFuncOf returns the reconstructFunc for the given node, along
// with an int16 column index.
//
// The cases are tested in order: optional, repeated, list, map, and finally
// required (leaf or group), mirroring deconstructFuncOf.
//
// NOTE(review): the returned index is presumably the first column index
// following the node's columns (the leaf constructor returns its index plus
// one) — confirm.
func reconstructFuncOf( int16,  Node) (int16, reconstructFunc) {
	switch {
	case .Optional():
		return reconstructFuncOfOptional(, )
	case .Repeated():
		return reconstructFuncOfRepeated(, )
	case isList():
		return reconstructFuncOfList(, )
	case isMap():
		return reconstructFuncOfMap(, )
	default:
		return reconstructFuncOfRequired(, )
	}
}

// reconstructFuncOfOptional builds the reconstructFunc of an optional node by
// wrapping the function obtained for the Required form of the node.
//
// The returned closure increments the definition level and compares it with
// the definition level of the first value of the first column. When the
// value's level is lower, the destination is set to the zero value of its
// type and nil is returned. Otherwise a pointer destination is allocated if
// nil and dereferenced, and the wrapped function is called.
//
// The closure indexes the first value of the first column unconditionally, so
// it requires at least one column containing at least one value.
//
//go:noinline
func reconstructFuncOfOptional( int16,  Node) (int16, reconstructFunc) {
	// We convert the optional func to required so that we eventually reach the
	// leaf base-case.  We're still using the heuristics of optional in the
	// returned closure (see levels.definitionLevel++), but we don't actually do
	// deserialization here, that happens in the leaf function, hence this line.
	,  := reconstructFuncOf(, Required())

	return , func( reflect.Value,  levels,  [][]Value) error {
		.definitionLevel++

		if [0][0].definitionLevel < .definitionLevel {
			.Set(reflect.Zero(.Type()))
			return nil
		}

		if .Kind() == reflect.Ptr {
			if .IsNil() {
				.Set(reflect.New(.Type().Elem()))
			}
			 = .Elem()
		}

		return (, , )
	}
}

// setMakeSlice creates a new slice whose length and capacity are both the
// given integer, stores it in the given reflect.Value, and returns the new
// slice value.
//
// The slice type is the type of the destination value, except when that type
// is of interface kind, in which case []interface{} is used.
func setMakeSlice( reflect.Value,  int) reflect.Value {
	 := .Type()
	if .Kind() == reflect.Interface {
		 = reflect.TypeOf(([]interface{})(nil))
	}
	 := reflect.MakeSlice(, , )
	.Set()
	return 
}

// reconstructFuncOfRepeated builds the reconstructFunc of a repeated node by
// wrapping the function obtained for the Required form of the node.
//
// The returned closure increments the repetition depth and definition level.
// If the first value of the first column has a lower definition level, the
// destination is set to an empty slice via setMakeSlice and nil is returned.
// Otherwise it counts the elements, allocates the destination slice with
// setMakeSlice, and reconstructs each element from per-column windows of
// values.
//
// NOTE(review): many operands are not visible in this copy of the file; the
// inline comments describe the apparent intent and should be confirmed
// against the original source.
//
//go:noinline
func reconstructFuncOfRepeated( int16,  Node) (int16, reconstructFunc) {
	,  := reconstructFuncOf(, Required())
	return , func( reflect.Value,  levels,  [][]Value) error {
		.repetitionDepth++
		.definitionLevel++

		if [0][0].definitionLevel < .definitionLevel {
			setMakeSlice(, 0)
			return nil
		}

		 := make([][]Value, len())
		 := [0]
		 := 0

		// Start each per-column window empty, with capacity limited by the
		// three-index slice expression.
		for ,  := range  {
			[] = [0:0:len()]
		}

		// Count the elements: each iteration advances past one value and
		// then past the following values whose repetition level is greater
		// than the current repetition depth.
		for  := 0;  < len(); {
			++
			++

			for  < len() && [].repetitionLevel > .repetitionDepth {
				++
			}
		}

		 = setMakeSlice(, )

		for  := 0;  < ; ++ {
			// For every column, select the run of values belonging to the
			// current element; columns with no remaining values are skipped.
			for ,  := range  {
				 = [:cap()]
				if len() == 0 {
					continue
				}

				 := 1
				for  < len() && [].repetitionLevel > .repetitionDepth {
					++
				}

				[] = [:]
			}

			if  := (.Index(), , );  != nil {
				return 
			}

			// Move every window past the values consumed by this element.
			for ,  := range  {
				[] = [len():len():cap()]
			}

			.repetitionLevel = .repetitionDepth
		}

		return nil
	}
}

// reconstructFuncOfRequired dispatches on whether the node is a leaf: leaf
// nodes are handled by reconstructFuncOfLeaf, all other nodes by
// reconstructFuncOfGroup.
func reconstructFuncOfRequired( int16,  Node) (int16, reconstructFunc) {
	switch {
	case .Leaf():
		return reconstructFuncOfLeaf(, )
	default:
		return reconstructFuncOfGroup(, )
	}
}

// reconstructFuncOfList handles list nodes by delegating to reconstructFuncOf
// with the list's element node (as returned by listElementOf) wrapped in
// Repeated.
func reconstructFuncOfList( int16,  Node) (int16, reconstructFunc) {
	return reconstructFuncOf(, Repeated(listElementOf()))
}

// reconstructFuncOfMap builds the reconstructFunc of a map node. The
// key-value node is obtained from mapKeyValueOf, and the wrapped function is
// built with reconstructFuncOf and schemaOf.
//
// The returned closure increments the repetition depth and definition level.
// If the first value of the first column has a lower definition level, the
// destination is set to a new empty map and nil is returned. Otherwise it
// counts the entries, creates the destination map when it is nil (using
// map[string]any when the destination type is of interface kind), and
// reconstructs each entry into a scratch element whose fields 0 and 1 are
// then converted and inserted as key and value.
//
// NOTE(review): many operands are not visible in this copy of the file; the
// inline comments describe the apparent intent and should be confirmed
// against the original source.
//
//go:noinline
func reconstructFuncOfMap( int16,  Node) (int16, reconstructFunc) {
	 := mapKeyValueOf()
	 := .GoType()
	 := .Elem()
	 := reflect.Zero()
	,  := reconstructFuncOf(, schemaOf())
	return , func( reflect.Value,  levels,  [][]Value) error {
		.repetitionDepth++
		.definitionLevel++

		if [0][0].definitionLevel < .definitionLevel {
			.Set(reflect.MakeMap(.Type()))
			return nil
		}

		 := make([][]Value, len())
		 := [0]
		 := .Type()
		if .Kind() == reflect.Interface {
			 = reflect.TypeOf((map[string]any)(nil))
		}
		 := .Key()
		 := .Elem()
		 := 0

		// Start each per-column window empty, with capacity limited by the
		// three-index slice expression.
		for ,  := range  {
			[] = [0:0:len()]
		}

		// Count the entries: each iteration advances past one value and then
		// past the following values whose repetition level is greater than
		// the current repetition depth.
		for  := 0;  < len(); {
			++
			++

			for  < len() && [].repetitionLevel > .repetitionDepth {
				++
			}
		}

		if .IsNil() {
			 := reflect.MakeMapWithSize(, )
			.Set()
			 =  // track map instead of interface{} for read[any]()
		}

		// A single scratch element is reused for every entry.
		 := reflect.New().Elem()
		for  := 0;  < ; ++ {
			// For every column, select the run of values belonging to the
			// current entry.
			for ,  := range  {
				 = [:cap()]
				 := 1

				for  < len() && [].repetitionLevel > .repetitionDepth {
					++
				}

				[] = [:]
			}

			if  := (, , );  != nil {
				return 
			}

			// Move every window past the values consumed by this entry.
			for ,  := range  {
				[] = [len():len():cap()]
			}

			// Insert the entry, then overwrite the scratch element before
			// the next iteration (presumably with the zero value computed
			// when the closure was built).
			.SetMapIndex(.Field(0).Convert(), .Field(1).Convert())
			.Set()
			.repetitionLevel = .repetitionDepth
		}

		return nil
	}
}

// reconstructFuncOfGroup builds the reconstructFunc of a group node from the
// reconstruct functions of its fields. While building them it also records,
// per field, a difference between two column indexes, which the closure uses
// to compute the bounds of the column sub-slice handed to each field.
//
// In the returned closure, a destination of interface kind is first set to a
// new map[string]interface{} and unwrapped. Then:
//   - for a map destination, each field is reconstructed into a scratch
//     element and stored in the map under the field's name; a non-empty map
//     is replaced with a new empty one beforehand;
//   - otherwise each field is reconstructed directly into the value obtained
//     from the field's Value method.
//
// Errors returned by a field function are wrapped with the field name using
// the "%s → %w" format.
//
// NOTE(review): several operands are not visible in this copy of the file —
// confirm the details against the original source.
//
//go:noinline
func reconstructFuncOfGroup( int16,  Node) (int16, reconstructFunc) {
	 := .Fields()
	 := make([]reconstructFunc, len())
	 := make([]int16, len())
	 := 

	for ,  := range  {
		, [] = reconstructFuncOf(, )
		[] =  - 
	}

	return , func( reflect.Value,  levels,  [][]Value) error {
		if .Kind() == reflect.Interface {
			.Set(reflect.MakeMap(reflect.TypeOf((map[string]interface{})(nil))))
			 = .Elem()
		}

		if .Kind() == reflect.Map {
			 := .Type().Elem()
			 := reflect.New(reflect.TypeOf("")).Elem()
			 := reflect.New().Elem()
			 := reflect.Zero()

			if .Len() > 0 {
				.Set(reflect.MakeMap(.Type()))
			}

			 := int16(0)

			for ,  := range  {
				.SetString([].Name())
				 := []
				// Each field function receives a capacity-limited sub-slice
				// of the columns.
				 := (, , [::])
				if  != nil {
					return fmt.Errorf("%s → %w", , )
				}
				 = 
				.SetMapIndex(, )
				.Set()
			}
		} else {
			 := int16(0)

			for ,  := range  {
				 := []
				// Each field function receives a capacity-limited sub-slice
				// of the columns.
				 := ([].Value(), , [::])
				if  != nil {
					return fmt.Errorf("%s → %w", [].Name(), )
				}
				 = 
			}
		}

		return nil
	}
}

// reconstructFuncOfLeaf builds the reconstructFunc of a leaf node and returns
// it with the column index incremented by one.
//
// The returned closure reads the first column it is given. It returns an
// error mentioning a column number when that column holds no values;
// otherwise it assigns the column's first value to the destination with the
// AssignValue method of the node's type and returns that method's result.
//
// The closure indexes the first column unconditionally, so it requires at
// least one column.
//
//go:noinline
func reconstructFuncOfLeaf( int16,  Node) (int16, reconstructFunc) {
	 := .Type()
	return  + 1, func( reflect.Value,  levels,  [][]Value) error {
		 := [0]
		if len() == 0 {
			return fmt.Errorf("no values found in parquet row for column %d", )
		}
		return .AssignValue(, [0])
	}
}