package parquet

import (
	
	
	
	
)

// GenericReader is similar to a Reader but uses a type parameter to define the
// Go type representing the schema of rows being read.
//
// See GenericWriter for details about the benefits over the classic Reader API.
type GenericReader[ any] struct {
	// base is the underlying Reader that the methods of this type delegate to.
	base Reader
	// read is the typed read function selected by readFuncOf when the reader
	// is constructed; the typed Read method forwards to it.
	read readFunc[]
}

// NewGenericReader is like NewReader but returns a GenericReader[T] suited to
// read rows of Go type T.
//
// The type parameter T should be a map, struct, or any. Any other types will
// cause a panic at runtime. Type checking is a lot more effective when the
// generic parameter is a struct type, using map and interface types is somewhat
// similar to using a Reader.
//
// The option list may explicitly declare a schema; if it does, that schema
// must be compatible with the schema generated from T.
//
// The function panics if the reader configuration is invalid or if the input
// cannot be opened as a parquet file.
func [ any]( io.ReaderAt,  ...ReaderOption) *GenericReader[] {
	,  := NewReaderConfig(...)
	if  != nil {
		panic()
	}

	,  := openFile()
	if  != nil {
		panic()
	}

	 := fileRowGroupOf()

	// When the options did not provide a schema, pick a default one: either
	// the schema obtained from a Schema() call or the one derived by schemaOf
	// from the dereferenced Go type.
	// NOTE(review): the nil check presumably tests the typeOf result (nil for
	// interface types such as any) — confirm against the unmangled source.
	 := typeOf[]()
	if .Schema == nil {
		if  == nil {
			.Schema = .Schema()
		} else {
			.Schema = schemaOf(dereference())
		}
	}

	 := &GenericReader[]{
		base: Reader{
			file: reader{
				schema:   .Schema,
				rowGroup: ,
			},
		},
	}

	// Only wrap the row group in a conversion when the two schemas differ.
	if !nodesAreEqual(.Schema, .schema) {
		.base.file.rowGroup = convertRowGroupTo(.base.file.rowGroup, .Schema)
	}

	// Both internal readers of the base Reader start from the same schema and
	// row group.
	.base.read.init(.base.file.schema, .base.file.rowGroup)
	.read = readFuncOf[](, .base.file.schema)
	return 
}

// This constructor builds a GenericReader that reads rows from the RowGroup
// passed as argument instead of opening an io.ReaderAt.
//
// The function panics if the reader configuration is invalid.
func [ any]( RowGroup,  ...ReaderOption) *GenericReader[] {
	,  := NewReaderConfig(...)
	if  != nil {
		panic()
	}

	// When the options did not provide a schema, pick a default one: either
	// the schema obtained from a Schema() call or the one derived by schemaOf
	// from the dereferenced Go type.
	// NOTE(review): the nil check presumably tests the typeOf result (nil for
	// interface types such as any) — confirm against the unmangled source.
	 := typeOf[]()
	if .Schema == nil {
		if  == nil {
			.Schema = .Schema()
		} else {
			.Schema = schemaOf(dereference())
		}
	}

	 := &GenericReader[]{
		base: Reader{
			file: reader{
				schema:   .Schema,
				rowGroup: ,
			},
		},
	}

	// Only wrap the row group in a conversion when the two schemas differ.
	if !nodesAreEqual(.Schema, .Schema()) {
		.base.file.rowGroup = convertRowGroupTo(.base.file.rowGroup, .Schema)
	}

	// Both internal readers of the base Reader start from the same schema and
	// row group.
	.base.read.init(.base.file.schema, .base.file.rowGroup)
	.read = readFuncOf[](, .base.file.schema)
	return 
}

// Reset resets the reader by delegating to the Reset method of the underlying
// Reader.
func ( *GenericReader[]) () {
	.base.Reset()
}

// Read reads the next rows from the reader into the given rows slice up to len(rows).
//
// The returned values are safe to reuse across Read calls and do not share
// memory with the reader's underlying page buffers.
//
// The method returns the number of rows read and io.EOF when no more rows
// can be read from the reader.
func ( *GenericReader[]) ( []) (int, error) {
	// Forward to the read function chosen by readFuncOf at construction time.
	return .read(, )
}

// ReadRows reads rows into the given Row buffer by delegating to the ReadRows
// method of the underlying Reader, returning the number of rows read.
func ( *GenericReader[]) ( []Row) (int, error) {
	return .base.ReadRows()
}

// Schema returns the schema reported by the underlying Reader.
func ( *GenericReader[]) () *Schema {
	return .base.Schema()
}

// NumRows returns the number of rows reported by the underlying Reader.
func ( *GenericReader[]) () int64 {
	return .base.NumRows()
}

// SeekToRow positions the reader at the given row index by delegating to the
// SeekToRow method of the underlying Reader.
func ( *GenericReader[]) ( int64) error {
	return .base.SeekToRow()
}

// Close closes the underlying Reader and returns its error, if any.
func ( *GenericReader[]) () error {
	return .base.Close()
}

// readRows reads the next rows from the reader into the given rows slice up to len(rows).
//
// Rows are first read into the scratch buffer of the underlying Reader and
// then reconstructed into the elements of the destination slice.
//
// The returned values are safe to reuse across readRows calls and do not share
// memory with the reader's underlying page buffers.
//
// The method returns the number of rows read and io.EOF when no more rows
// can be read from the reader.
func ( *GenericReader[]) ( []) (int, error) {
	// Size the scratch row buffer to the requested length, reusing the
	// existing capacity when it is large enough.
	 := len()
	if cap(.base.rowbuf) <  {
		.base.rowbuf = make([]Row, )
	} else {
		.base.rowbuf = .base.rowbuf[:]
	}

	var ,  int
	var  error
	for {
		// ReadRows reads the minimum remaining rows in a column page across all columns
		// of the underlying reader, unless the length of the slice passed to it is smaller.
		// In that case, ReadRows will read the number of rows equal to the length of the
		// given slice argument. We limit that length to never be more than requested
		// because sequential reads can cross page boundaries.
		,  = .base.ReadRows(.base.rowbuf[:-])
		if  > 0 {
			 := .base.Schema()

			// Reconstruct each row of this batch into its destination slot.
			// NOTE(review): on failure the returned count presumably includes
			// the rows of this batch reconstructed so far — confirm.
			for ,  := range .base.rowbuf[:] {
				if  := .Reconstruct(&[+], );  != nil {
					return  + , 
				}
			}
		}
		 += 
		// Stop when a comparison against zero holds, the two counters are
		// equal, or an error was returned.
		// NOTE(review): presumably "nothing read" and "request satisfied".
		if  == 0 ||  ==  ||  != nil {
			break
		}
	}

	return , 
}

// Compile-time assertions that GenericReader satisfies the Rows and
// RowReaderWithSchema interfaces for each kind of type argument it documents
// as supported (any, struct, and map).
var (
	_ Rows                = (*GenericReader[any])(nil)
	_ RowReaderWithSchema = (*GenericReader[any])(nil)

	_ Rows                = (*GenericReader[struct{}])(nil)
	_ RowReaderWithSchema = (*GenericReader[struct{}])(nil)

	_ Rows                = (*GenericReader[map[struct{}]struct{}])(nil)
	_ RowReaderWithSchema = (*GenericReader[map[struct{}]struct{}])(nil)
)

// readFunc is the signature of the typed read functions stored in the read
// field of a GenericReader; it takes the reader and the destination slice and
// returns the number of rows read and an error.
type readFunc[ any] func(*GenericReader[], []) (int, error)

// readFuncOf selects the read function used by a GenericReader for the given
// Go type.
//
// A nil type, interfaces, maps, structs, and pointers to structs all map to
// the readRows method. Any other type, including a pointer to a non-struct
// type, causes a panic.
func readFuncOf[ any]( reflect.Type,  *Schema) readFunc[] {
	if  == nil {
		return (*GenericReader[]).readRows
	}
	switch .Kind() {
	case reflect.Interface, reflect.Map:
		return (*GenericReader[]).readRows

	case reflect.Struct:
		return (*GenericReader[]).readRows

	case reflect.Pointer:
		// Only pointers to structs are accepted; other pointer types fall
		// through to the panic below.
		if  := .Elem(); .Kind() == reflect.Struct {
			return (*GenericReader[]).readRows
		}
	}
	panic("cannot create reader for values of type " + .String())
}

// A Reader reads Go values from parquet files.
//
// This example showcases a typical use of parquet readers:
//
//	reader := parquet.NewReader(file)
//	rows := []RowType{}
//	for {
//		row := RowType{}
//		err := reader.Read(&row)
//		if err != nil {
//			if err == io.EOF {
//				break
//			}
//			...
//		}
//		rows = append(rows, row)
//	}
//	if err := reader.Close(); err != nil {
//		...
//	}
//
// Deprecated: For programs building with Go 1.18 or later, the GenericReader[T]
// type supersedes this one.
type Reader struct {
	// seen is compared by Read against the dereferenced struct type of its
	// argument to decide whether updateReadSchema must be called.
	seen reflect.Type
	// file is the reader used by ReadRows, Schema, NumRows and SeekToRow.
	file reader
	// read is the reader used by Read to reconstruct Go values.
	read reader
	// rowIndex is the position that Read and ReadRows seek their respective
	// readers to before reading, and advance afterwards.
	rowIndex int64
	// rowbuf is a scratch buffer of rows reused across read calls.
	rowbuf []Row
}

// NewReader constructs a parquet reader reading rows from the given
// io.ReaderAt.
//
// In order to read parquet rows, the io.ReaderAt must be converted to a
// parquet.File. If r is already a parquet.File it is used directly; otherwise,
// the io.ReaderAt value is expected to either have a `Size() int64` method or
// implement io.Seeker in order to determine its size.
//
// The function panics if the reader configuration is invalid. Programs that
// cannot guarantee the validity of the options passed to NewReader should
// construct the reader configuration independently prior to calling this
// function:
//
//	config, err := parquet.NewReaderConfig(options...)
//	if err != nil {
//		// handle the configuration error
//		...
//	} else {
//		// this call to create a reader is guaranteed not to panic because
//		// of an invalid configuration
//		reader := parquet.NewReader(input, config)
//		...
//	}
//
// The function also panics if the input cannot be opened as a parquet file,
// or if a schema was configured and the row group cannot be converted to it.
func ( io.ReaderAt,  ...ReaderOption) *Reader {
	,  := NewReaderConfig(...)
	if  != nil {
		panic()
	}

	,  := openFile()
	if  != nil {
		panic()
	}

	 := &Reader{
		file: reader{
			schema:   .schema,
			rowGroup: fileRowGroupOf(),
		},
	}

	// A schema set in the configuration replaces the initial one, and the row
	// group is converted to match it.
	if .Schema != nil {
		.file.schema = .Schema
		.file.rowGroup = convertRowGroupTo(.file.rowGroup, .Schema)
	}

	.read.init(.file.schema, .file.rowGroup)
	return 
}

// openFile returns a *File for the given io.ReaderAt.
//
// A value that already is a non-nil *File is returned as-is. Otherwise the
// size of the input is determined with sizeOf and the file is opened with
// OpenFile; an error from either step is returned to the caller.
func openFile( io.ReaderAt) (*File, error) {
	,  := .(*File)
	if  != nil {
		return , nil
	}
	,  := sizeOf()
	if  != nil {
		return nil, 
	}
	return OpenFile(, )
}

// fileRowGroupOf returns a single RowGroup representing all the row groups of
// the given file: an empty row group built from the file schema when there
// are none, the row group itself when there is exactly one, and a multi row
// group created by newMultiRowGroup otherwise.
func fileRowGroupOf( *File) RowGroup {
	switch  := .RowGroups(); len() {
	case 0:
		return newEmptyRowGroup(.Schema())
	case 1:
		return [0]
	default:
		// TODO: should we attempt to merge the row groups via MergeRowGroups
		// to preserve the global order of sorting columns within the file?
		return newMultiRowGroup(.config.ReadMode, ...)
	}
}

// NewRowGroupReader constructs a new Reader which reads rows from the RowGroup
// passed as argument.
//
// When the options declare a schema, the row group is converted to it first.
// The function panics if the reader configuration is invalid or if that
// conversion fails.
func ( RowGroup,  ...ReaderOption) *Reader {
	,  := NewReaderConfig(...)
	if  != nil {
		panic()
	}

	if .Schema != nil {
		 = convertRowGroupTo(, .Schema)
	}

	// The reader schema is taken from a Schema() call made after the optional
	// conversion above.
	 := &Reader{
		file: reader{
			schema:   .Schema(),
			rowGroup: ,
		},
	}

	.read.init(.file.schema, .file.rowGroup)
	return 
}

// convertRowGroupTo returns a RowGroup for the given schema.
//
// When nodesAreEqual reports that the two schemas are equal, the row group is
// returned unchanged. Otherwise a conversion is built with Convert and applied
// with ConvertRowGroup. The function panics if Convert returns an error.
func convertRowGroupTo( RowGroup,  *Schema) RowGroup {
	if  := .Schema(); !nodesAreEqual(, ) {
		,  := Convert(, )
		if  != nil {
			// TODO: this looks like something we should not be panicking on,
			// but the current NewReader API does not offer a mechanism to
			// report errors.
			panic()
		}
		 = ConvertRowGroup(, )
	}
	return 
}

// sizeOf determines the size in bytes of the given io.ReaderAt.
//
// Values exposing a method returning an int64 (NewReader documents it as
// `Size() int64`) are asked directly. Values implementing io.Seeker are
// measured by seeking to the end. Any other value results in an error.
func sizeOf( io.ReaderAt) (int64, error) {
	switch f := .(type) {
	case interface{ () int64 }:
		return .(), nil
	case io.Seeker:
		// Remember the current offset so it can be restored afterwards.
		,  := .Seek(0, io.SeekCurrent)
		if  != nil {
			return 0, 
		}
		// The offset returned when seeking to the end is the size.
		,  := .Seek(0, io.SeekEnd)
		if  != nil {
			return 0, 
		}
		// Seek back relative to the start; an error from this last seek is
		// returned along with the value.
		_,  = .Seek(, io.SeekStart)
		return , 
	default:
		return 0, fmt.Errorf("cannot determine length of %T", )
	}
}

// Reset repositions the reader at the beginning of the underlying parquet file.
//
// Both internal readers are reset, the row index goes back to zero, and the
// scratch row buffer is passed to clearRows.
func ( *Reader) () {
	.file.Reset()
	.read.Reset()
	.rowIndex = 0
	clearRows(.rowbuf)
}

// Read reads the next row from r. The type of the row must match the schema
// of the underlying parquet file or an error will be returned.
//
// The method returns io.EOF when no more rows can be read from r.
func ( *Reader) ( interface{}) error {
	// For struct destinations, refresh the read schema whenever the type
	// differs from the one recorded in r.seen.
	if  := dereference(reflect.TypeOf()); .Kind() == reflect.Struct {
		if .seen !=  {
			if  := .updateReadSchema();  != nil {
				return fmt.Errorf("cannot read parquet row into go value of type %T: %w", , )
			}
		}
	}

	// Read and ReadRows use separate internal readers but share rowIndex, so
	// the read side is repositioned before every call.
	if  := .read.SeekToRow(.rowIndex);  != nil {
		// The internal reader reports io.ErrClosedPipe once it has been
		// closed; that case is surfaced as io.EOF.
		if errors.Is(, io.ErrClosedPipe) {
			return io.EOF
		}
		return fmt.Errorf("seeking reader to row %d: %w", .rowIndex, )
	}

	// Reuse the scratch buffer, sized to hold exactly one row.
	if cap(.rowbuf) == 0 {
		.rowbuf = make([]Row, 1)
	} else {
		.rowbuf = .rowbuf[:1]
	}

	,  := .read.ReadRows(.rowbuf[:])
	if  == 0 {
		return 
	}

	.rowIndex++
	return .read.schema.Reconstruct(, .rowbuf[0])
}

// updateReadSchema re-initializes the read side of the Reader for the given
// Go type.
//
// The schema is derived with schemaOf. When it equals the file schema the file
// row group is used directly; otherwise the row group is wrapped with a
// conversion, and the Convert error is returned if one cannot be built. On
// success the seen field is updated.
func ( *Reader) ( reflect.Type) error {
	 := schemaOf()

	if nodesAreEqual(, .file.schema) {
		.read.init(, .file.rowGroup)
	} else {
		,  := Convert(, .file.schema)
		if  != nil {
			return 
		}
		.read.init(, ConvertRowGroup(.file.rowGroup, ))
	}

	.seen = 
	return nil
}

// ReadRows reads the next rows from r into the given Row buffer.
//
// The returned values are laid out in the order expected by the
// parquet.(*Schema).Reconstruct method.
//
// The method returns io.EOF when no more rows can be read from r.
func ( *Reader) ( []Row) (int, error) {
	// Read and ReadRows use separate internal readers but share rowIndex, so
	// the file side is repositioned before every call.
	if  := .file.SeekToRow(.rowIndex);  != nil {
		return 0, 
	}
	,  := .file.ReadRows()
	.rowIndex += int64()
	return , 
}

// Schema returns the schema of rows read by r, which is the schema held by
// the file side of the reader.
func ( *Reader) () *Schema { return .file.schema }

// NumRows returns the number of rows that can be read from r, as reported by
// the row group held by the file side of the reader.
func ( *Reader) () int64 { return .file.rowGroup.NumRows() }

// SeekToRow positions r at the given row index.
//
// The row index of r is only updated when seeking the file side of the reader
// succeeds; otherwise the error is returned and the position is unchanged.
func ( *Reader) ( int64) error {
	if  := .file.SeekToRow();  != nil {
		return 
	}
	.rowIndex = 
	return nil
}

// Close closes the reader, preventing more rows from being read.
//
// The read side is closed first, then the file side. The first error
// encountered is returned; if closing the read side fails, the file side is
// not closed.
func ( *Reader) () error {
	if  := .read.Close();  != nil {
		return 
	}
	if  := .file.Close();  != nil {
		return 
	}
	return nil
}

// reader is a subtype used in the implementation of Reader to support the two
// use cases of either reading rows calling the ReadRows method (where full rows
// are read from the underlying parquet file), or calling the Read method to
// read rows into Go values, potentially doing partial reads on a subset of the
// columns due to using a converted row group view.
type reader struct {
	// schema is the schema assigned by init.
	schema *Schema
	// rowGroup is the source of rows; it is set to nil by Close.
	rowGroup RowGroup
	// rows is the row iterator, created lazily from rowGroup by ReadRows.
	rows Rows
	// rowIndex is the current position, advanced by ReadRows and set by
	// SeekToRow.
	rowIndex int64
}

// init assigns the schema and row group of the reader, then resets it so that
// reading starts again from row zero.
func ( *reader) ( *Schema,  RowGroup) {
	.schema = 
	.rowGroup = 
	.Reset()
}

// Reset moves the reader back to row zero.
//
// When the current Rows value exposes a matching no-argument method, that
// method is called and the Rows value is kept; otherwise the Rows value is
// closed and dropped, so that ReadRows creates a fresh one from the row group.
func ( *reader) () {
	.rowIndex = 0

	if ,  := .rows.(interface{ () });  {
		// This optimization works for the common case where the underlying type
		// of the Rows instance is rowGroupRows, which should be true in most
		// cases since even external implementations of the RowGroup interface
		// can construct values of this type via the NewRowGroupRowReader
		// function.
		//
		// Foreign implementations of the Rows interface may also define a Reset
		// method in order to participate in this optimization.
		.()
		return
	}

	// The error returned by Close is ignored here.
	if .rows != nil {
		.rows.Close()
		.rows = nil
	}
}

// ReadRows reads rows into the given buffer and advances the row index by the
// number of rows read.
//
// It returns io.EOF when the reader has no row group, which is the case after
// Close. The Rows value is created lazily on first use and positioned at the
// current row index when that index is greater than zero.
func ( *reader) ( []Row) (int, error) {
	if .rowGroup == nil {
		return 0, io.EOF
	}
	if .rows == nil {
		.rows = .rowGroup.Rows()
		// Apply a position recorded by SeekToRow before rows existed.
		if .rowIndex > 0 {
			if  := .rows.SeekToRow(.rowIndex);  != nil {
				return 0, 
			}
		}
	}
	,  := .rows.ReadRows()
	.rowIndex += int64()
	return , 
}

// SeekToRow positions the reader at the given row index.
//
// It returns io.ErrClosedPipe when the reader has no row group, which is the
// case after Close. Seeking to the current index does nothing. When no Rows
// value exists yet, only the index is recorded and the seek is applied by the
// next ReadRows call.
func ( *reader) ( int64) error {
	if .rowGroup == nil {
		return io.ErrClosedPipe
	}
	if  != .rowIndex {
		if .rows != nil {
			if  := .rows.SeekToRow();  != nil {
				return 
			}
		}
		.rowIndex = 
	}
	return nil
}

// Close drops the row group, after which ReadRows returns io.EOF and SeekToRow
// returns io.ErrClosedPipe, and closes the Rows value if one was created. The
// error from closing the Rows value is returned.
func ( *reader) () ( error) {
	.rowGroup = nil
	if .rows != nil {
		 = .rows.Close()
	}
	return 
}

// Compile-time assertions that Reader satisfies the Rows and
// RowReaderWithSchema interfaces, and that the internal reader type satisfies
// RowReader and RowSeeker.
var (
	_ Rows                = (*Reader)(nil)
	_ RowReaderWithSchema = (*Reader)(nil)

	_ RowReader = (*reader)(nil)
	_ RowSeeker = (*reader)(nil)
)