package parquet

import (
	
	
	
	
	
	
	
	
	

	
	
)

// Default buffer sizes.
//
// NOTE(review): neither constant is referenced in the visible part of this
// file; the unit is presumably bytes — confirm at the use sites.
const (
	defaultDictBufferSize = 8192
	defaultReadBufferSize = 4096
)

// File represents a parquet file. The layout of a Parquet file can be found
// here: https://github.com/apache/parquet-format#file-format
type File struct {
	// metadata is the file metadata decoded from the footer.
	metadata      format.FileMetaData
	// protocol is the thrift compact protocol used to decode the footer and
	// the page indexes.
	protocol      thrift.CompactProtocol
	// reader is the underlying source of bytes; size is the number of bytes
	// of reader that belong to the file.
	reader        io.ReaderAt
	size          int64
	// schema is either the schema supplied through the configuration or one
	// built from the root column when the file is opened.
	schema        *Schema
	// root is the root of the column tree built from the file metadata.
	root          *Column
	// columnIndexes and offsetIndexes hold the page index when it was read
	// while opening the file; both are nil otherwise.
	columnIndexes []format.ColumnIndex
	offsetIndexes []format.OffsetIndex
	// rowGroups exposes one entry per row group listed in the metadata.
	rowGroups     []RowGroup
	// config is the configuration the file was opened with.
	config        *FileConfig
}

// OpenFile opens a parquet file and reads the content between offset 0 and the given
// size in r.
//
// Only the parquet magic bytes and footer are read, column chunks and other
// parts of the file are left untouched; this means that successfully opening
// a file does not validate that the pages have valid checksums.
//
// Unless disabled through the SkipPageIndex and SkipBloomFilters configuration
// flags, the page index and the bloom filter headers are read as well.
//
// An error is returned when the configuration is invalid, when the magic
// header or footer is not "PAR1", when the footer cannot be read or decoded,
// or when the decoded metadata has an empty schema (ErrMissingRootColumn).
func ( io.ReaderAt,  int64,  ...FileOption) (*File, error) {
	// Scratch space for the 4-byte header magic and the 8-byte file trailer.
	 := make([]byte, 8)
	,  := NewFileConfig(...)
	if  != nil {
		return nil, 
	}
	 := &File{reader: , size: , config: }

	// The file must start with the 4-byte magic "PAR1".
	if ,  := readAt(, [:4], 0);  != nil {
		return nil, fmt.Errorf("reading magic header of parquet file: %w", )
	}
	if string([:4]) != "PAR1" {
		return nil, fmt.Errorf("invalid magic header of parquet file: %q", [:4])
	}

	// Optional hook: when the reader implements a method taking two int64
	// values, it is called with the position and length of the trailer.
	// NOTE(review): presumably a hint about the section about to be read —
	// confirm against the reader implementations.
	if ,  := .reader.(interface{ (,  int64) });  {
		.(-8, 8)
	}
	// The last 8 bytes hold the little-endian footer length (4 bytes)
	// followed by the "PAR1" magic (4 bytes).
	if ,  := .ReadAt([:8], -8);  != 8 {
		return nil, fmt.Errorf("reading magic footer of parquet file: %w", )
	}
	if string([4:8]) != "PAR1" {
		return nil, fmt.Errorf("invalid magic footer of parquet file: %q", [4:8])
	}

	// Footer length as recorded in the trailer, and a buffer to hold it.
	 := int64(binary.LittleEndian.Uint32([:4]))
	 := make([]byte, )

	// Same optional hook as above, this time for the footer section, which
	// sits immediately before the 8-byte trailer.
	if ,  := .reader.(interface{ (,  int64) });  {
		.(-(+8), )
	}
	if ,  := .readAt(, -(+8));  != nil {
		return nil, fmt.Errorf("reading footer of parquet file: %w", )
	}
	// The footer is the thrift-encoded file metadata.
	if  := thrift.Unmarshal(&.protocol, , &.metadata);  != nil {
		return nil, fmt.Errorf("reading parquet file metadata: %w", )
	}
	if len(.metadata.Schema) == 0 {
		return nil, ErrMissingRootColumn
	}

	// Eagerly load the page index unless the configuration opts out.
	if !.SkipPageIndex {
		if .columnIndexes, .offsetIndexes,  = .ReadPageIndex();  != nil {
			return nil, fmt.Errorf("reading page index of parquet file: %w", )
		}
	}

	if .root,  = openColumns();  != nil {
		return nil, fmt.Errorf("opening columns of parquet file: %w", )
	}

	// Prefer the schema given in the configuration; otherwise derive one
	// from the root column.
	var  *Schema
	if .Schema != nil {
		 = .Schema
	} else {
		 = NewSchema(.root.Name(), .root)
	}
	// Collect the leaf columns of the column tree.
	 := make([]*Column, 0, numLeafColumnsOf(.root))
	.schema = 
	.root.forEachLeaf(func( *Column) {  = append(, ) })

	// Build one fileRowGroup per row group in the metadata, then expose
	// pointers to them through the RowGroup slice of the file.
	 := make([]fileRowGroup, len(.metadata.RowGroups))
	for  := range  {
		[].init(, , , &.metadata.RowGroups[])
	}
	.rowGroups = make([]RowGroup, len())
	for  := range  {
		.rowGroups[] = &[]
	}

	// Decode the bloom filter header of every column chunk that declares a
	// bloom filter offset, unless the configuration opts out.
	if !.SkipBloomFilters {
		 := io.NewSectionReader(, 0, )
		,  := getBufioReader(, .ReadBufferSize)
		defer putBufioReader(, )

		 := format.BloomFilterHeader{}
		 := thrift.CompactProtocol{}
		 := thrift.NewDecoder(.NewReader())

		for  := range  {
			 := &[]

			for  := range .columns {
				 := .columns[].(*fileColumnChunk)

				if  := .chunk.MetaData.BloomFilterOffset;  > 0 {
					// Position the section reader on the header and drop
					// whatever the buffered reader was holding.
					.Seek(, io.SeekStart)
					.Reset()

					 = format.BloomFilterHeader{}
					if  := .Decode(&);  != nil {
						return nil, fmt.Errorf("decoding bloom filter header: %w", )
					}

					// The current position of the section reader minus the
					// bytes still sitting in the buffered reader gives the
					// position just after the decoded header.
					, _ = .Seek(0, io.SeekCurrent)
					 -= int64(.Buffered())

					// Optional hook, called with the bloom filter offset
					// and the header length plus NumBytes.
					if ,  := .(interface{ (,  int64) });  {
						 := .chunk.MetaData.BloomFilterOffset
						 := ( - ) + int64(.NumBytes)
						.(, )
					}

					.bloomFilter = newBloomFilter(, , &)
				}
			}
		}
	}

	// Sorting the key/value metadata lets lookupKeyValueMetadata use a
	// binary search.
	sortKeyValueMetadata(.metadata.KeyValueMetadata)
	return , nil
}

// ReadPageIndex reads the page index section of the parquet file f.
//
// If the file did not contain a page index, the method returns two empty slices
// and a nil error.
//
// Only leaf columns have indexes, the returned indexes are arranged using the
// following layout:
//
//	------------------
//	| col 0: chunk 0 |
//	------------------
//	| col 1: chunk 0 |
//	------------------
//	| ...            |
//	------------------
//	| col 0: chunk 1 |
//	------------------
//	| col 1: chunk 1 |
//	------------------
//	| ...            |
//	------------------
//
// This method is useful in combination with the SkipPageIndex option to delay
// reading the page index section until after the file was opened. Note that in
// this case the page index is not cached within the file; programs are expected
// to make use of it independently from the parquet package.
//
// NOTE(review): the first column chunk of the first row group is accessed
// without checking that the row group has any columns; a row group with an
// empty Columns slice would cause an index-out-of-range panic.
func ( *File) () ([]format.ColumnIndex, []format.OffsetIndex, error) {
	if len(.metadata.RowGroups) == 0 {
		return nil, nil, nil
	}

	// The offsets of the first column chunk are used as the start of the
	// column index and offset index sections; the two counters accumulate
	// the total length of each section.
	 := .metadata.RowGroups[0].Columns[0].ColumnIndexOffset
	 := .metadata.RowGroups[0].Columns[0].OffsetIndexOffset
	 := int64(0)
	 := int64(0)

	// Helper that visits every column chunk of every row group, passing the
	// row group position, the column position, and the chunk; it stops at
	// the first error.
	 := func( func(int, int, *format.ColumnChunk) error) error {
		for  := range .metadata.RowGroups {
			for  := range .metadata.RowGroups[].Columns {
				 := &.metadata.RowGroups[].Columns[]
				if  := (, , );  != nil {
					return 
				}
			}
		}
		return nil
	}

	// Sum the index lengths over all column chunks. The callback never
	// fails, so the returned error is not inspected.
	(func(,  int,  *format.ColumnChunk) error {
		 += int64(.ColumnIndexLength)
		 += int64(.OffsetIndexLength)
		return nil
	})

	// No index bytes at all: the file has no page index.
	if  == 0 &&  == 0 {
		return nil, nil, nil
	}

	// One slot per (row group, column) pair; the column count is taken from
	// the first row group.
	 := len(.metadata.RowGroups)
	 := len(.metadata.RowGroups[0].Columns)
	 :=  * 

	 := make([]format.ColumnIndex, )
	 := make([]format.OffsetIndex, )
	// A single buffer, large enough for the bigger of the two sections, is
	// shared by both reads.
	 := make([]byte, max(int(), int()))

	if  > 0 {
		 := [:]

		// Optional hook on the reader, called with the offset and length
		// of the column index section.
		if ,  := .reader.(interface{ (,  int64) });  {
			.(, )
		}
		// The whole section is read in one call; each chunk's index is
		// then located relative to the start of the section.
		if ,  := .readAt(, );  != nil {
			return nil, nil, fmt.Errorf("reading %d bytes column index at offset %d: %w", , , )
		}

		 := (func(,  int,  *format.ColumnChunk) error {
			// Some parquet files are missing the column index on some columns.
			//
			// An example of this file is testdata/alltypes_tiny_pages_plain.parquet
			// which was added in https://github.com/apache/parquet-testing/pull/24.
			if .ColumnIndexOffset > 0 {
				 := .ColumnIndexOffset - 
				 := int64(.ColumnIndexLength)
				 := [ : +]
				if  := thrift.Unmarshal(&.protocol, , &[(*)+]);  != nil {
					return fmt.Errorf("decoding column index: rowGroup=%d columnChunk=%d/%d: %w", , , , )
				}
			}
			return nil
		})
		if  != nil {
			return nil, nil, 
		}
	}

	if  > 0 {
		 := [:]

		// Optional hook on the reader, called with the offset and length
		// of the offset index section.
		if ,  := .reader.(interface{ (,  int64) });  {
			.(, )
		}
		if ,  := .readAt(, );  != nil {
			return nil, nil, fmt.Errorf("reading %d bytes offset index at offset %d: %w", , , )
		}

		 := (func(,  int,  *format.ColumnChunk) error {
			if .OffsetIndexOffset > 0 {
				 := .OffsetIndexOffset - 
				 := int64(.OffsetIndexLength)
				 := [ : +]
				// NOTE(review): the message below says "column index"
				// although this branch decodes an offset index; it looks
				// like a copy of the message used in the branch above.
				if  := thrift.Unmarshal(&.protocol, , &[(*)+]);  != nil {
					return fmt.Errorf("decoding column index: rowGroup=%d columnChunk=%d/%d: %w", , , , )
				}
			}
			return nil
		})
		if  != nil {
			return nil, nil, 
		}
	}

	return , , nil
}

// NumRows returns the number of rows in the file, as recorded in the file
// metadata.
func ( *File) () int64 { return .metadata.NumRows }

// RowGroups returns the list of row groups in the file.
//
// The returned slice is the one held by the file, not a copy.
func ( *File) () []RowGroup { return .rowGroups }

// Root returns the root column of f.
func ( *File) () *Column { return .root }

// Schema returns the schema of f.
func ( *File) () *Schema { return .schema }

// Metadata returns the metadata of f.
//
// The returned pointer refers to the metadata held by the file, not a copy.
func ( *File) () *format.FileMetaData { return &.metadata }

// Size returns the size of f (in bytes).
func ( *File) () int64 { return .size }

// ReadAt reads bytes into b from f at the given offset.
//
// Offsets that are negative or at/after the size of the file yield io.EOF
// without reading. When fewer bytes remain in the file than the length of b,
// only the remaining bytes are read and io.EOF is returned if the read itself
// did not fail.
//
// The method satisfies the io.ReaderAt interface.
func ( *File) ( []byte,  int64) (int, error) {
	if  < 0 ||  >= .size {
		return 0, io.EOF
	}

	// Clamp the read to the bytes remaining before the end of the file.
	if  := .size - ;  < int64(len()) {
		,  := .readAt([:], )
		if  == nil {
			// A short read must report an error per the io.ReaderAt
			// contract.
			 = io.EOF
		}
		return , 
	}

	return .readAt(, )
}

// ColumnIndexes returns the column indexes of the parquet file f.
//
// The returned slice is empty if the file did not contain a column index, or
// if the page index was not read when the file was opened (SkipPageIndex).
func ( *File) () []format.ColumnIndex { return .columnIndexes }

// OffsetIndexes returns the offset indexes of the parquet file f.
//
// The returned slice is empty if the file did not contain an offset index, or
// if the page index was not read when the file was opened (SkipPageIndex).
func ( *File) () []format.OffsetIndex { return .offsetIndexes }

// Lookup returns the value associated with the given key in the file key/value
// metadata.
//
// The ok boolean will be true if the key was found, false otherwise.
//
// The search is delegated to lookupKeyValueMetadata, which performs a binary
// search and therefore relies on the metadata being sorted by key.
func ( *File) ( string) ( string,  bool) {
	return lookupKeyValueMetadata(.metadata.KeyValueMetadata, )
}

// Reports whether both the column indexes and the offset indexes of the file
// are non-nil.
func ( *File) () bool {
	return .columnIndexes != nil && .offsetIndexes != nil
}

// Compile-time check that *File implements io.ReaderAt.
var _ io.ReaderAt = (*File)(nil)

// sortKeyValueMetadata sorts the given key/value pairs in place, ordering
// them by key and breaking ties between equal keys by value.
func sortKeyValueMetadata( []format.KeyValue) {
	sort.Slice(, func(,  int) bool {
		switch {
		case [].Key < [].Key:
			return true
		case [].Key > [].Key:
			return false
		default:
			// Equal keys: fall back to comparing the values.
			return [].Value < [].Value
		}
	})
}

// lookupKeyValueMetadata performs a binary search for a key in the given
// key/value pairs, which must be sorted by key (see sortKeyValueMetadata).
//
// It returns the value of the first entry with a matching key and true, or an
// empty string and false when no entry has that key.
func lookupKeyValueMetadata( []format.KeyValue,  string) ( string,  bool) {
	// Position of the first entry whose key is greater than or equal to the
	// searched key.
	 := sort.Search(len(), func( int) bool {
		return [].Key >= 
	})
	if  == len() || [].Key !=  {
		return "", false
	}
	return [].Value, true
}

// fileRowGroup holds the state of one row group of an opened file.
type fileRowGroup struct {
	// schema is the schema the row group was initialized with.
	schema   *Schema
	// rowGroup points at the row group's entry in the file metadata.
	rowGroup *format.RowGroup
	// columns has one entry per column chunk of the row group.
	columns  []ColumnChunk
	// sorting has one entry per sorting column declared in the metadata.
	sorting  []SortingColumn
	// config is the configuration of the file the row group belongs to.
	config   *FileConfig
}

// Initializes the row group from a file, a schema, a list of columns, and the
// row group's metadata entry: it builds one fileColumnChunk per column chunk
// and one fileSortingColumn per sorting column.
func ( *fileRowGroup) ( *File,  *Schema,  []*Column,  *format.RowGroup) {
	.schema = 
	.rowGroup = 
	.config = .config
	.columns = make([]ColumnChunk, len(.Columns))
	.sorting = make([]SortingColumn, len(.SortingColumns))
	// All column chunks are allocated in a single backing slice; the
	// ColumnChunk entries below point into it.
	 := make([]fileColumnChunk, len(.Columns))

	for  := range .columns {
		[] = fileColumnChunk{
			file:     ,
			column:   [],
			rowGroup: ,
			chunk:    &.Columns[],
		}

		// When the page index was loaded with the file, pre-populate the
		// chunk's index pointers. The position in the file-level slices is
		// the row group ordinal times a slice length plus the chunk
		// position, matching the row-group-major layout documented on
		// ReadPageIndex.
		if .hasIndexes() {
			 := (int(.Ordinal) * len()) + 
			[].columnIndex.Store(&.columnIndexes[])
			[].offsetIndex.Store(&.offsetIndexes[])
		}

		.columns[] = &[]
	}

	// Resolve each sorting column's ColumnIdx to an entry of the column
	// list and copy its ordering flags.
	for  := range .sorting {
		.sorting[] = &fileSortingColumn{
			column:     [.SortingColumns[].ColumnIdx],
			descending: .SortingColumns[].Descending,
			nullsFirst: .SortingColumns[].NullsFirst,
		}
	}
}

// Returns the schema the row group was initialized with.
func ( *fileRowGroup) () *Schema                 { return .schema }
// Returns the number of rows recorded in the row group metadata.
func ( *fileRowGroup) () int64                  { return .rowGroup.NumRows }
// Returns the column chunks of the row group (the internal slice, not a copy).
func ( *fileRowGroup) () []ColumnChunk     { return .columns }
// Returns the sorting columns of the row group (the internal slice, not a copy).
func ( *fileRowGroup) () []SortingColumn { return .sorting }
// Returns a Rows value built by newRowGroupRows using the configured ReadMode.
func ( *fileRowGroup) () Rows                      { return newRowGroupRows(, .config.ReadMode) }

// fileSortingColumn describes one sorting column of a row group: the column
// it refers to and the ordering flags copied from the row group metadata.
type fileSortingColumn struct {
	column     *Column
	descending bool
	nullsFirst bool
}

// Returns the path of the column being sorted on.
func ( *fileSortingColumn) () []string   { return .column.Path() }
// Reports whether the descending flag is set.
func ( *fileSortingColumn) () bool { return .descending }
// Reports whether the nullsFirst flag is set.
func ( *fileSortingColumn) () bool { return .nullsFirst }
// Returns a textual description of the sorting column, of the form
// "ascending(<path>)" or "descending(<path>)", prefixed with "nulls_first+"
// when the nullsFirst flag is set.
func ( *fileSortingColumn) () string {
	 := new(strings.Builder)
	if .nullsFirst {
		.WriteString("nulls_first+")
	}
	if .descending {
		.WriteString("descending(")
	} else {
		.WriteString("ascending(")
	}
	.WriteString(columnPath(.Path()).String())
	.WriteString(")")
	return .String()
}

// fileColumnChunk holds the state of one column chunk of a row group.
type fileColumnChunk struct {
	// file is the file the chunk belongs to.
	file        *File
	// column is the column the chunk holds values for.
	column      *Column
	// bloomFilter is set when the file is opened, for chunks that declare a
	// bloom filter offset; nil otherwise.
	bloomFilter *bloomFilter
	// rowGroup points at the metadata of the enclosing row group.
	rowGroup    *format.RowGroup
	// columnIndex and offsetIndex are either populated when the row group
	// is initialized (page index loaded with the file) or lazily on first
	// use; they are accessed atomically.
	columnIndex atomic.Pointer[format.ColumnIndex]
	offsetIndex atomic.Pointer[format.OffsetIndex]
	// chunk points at the chunk's entry in the row group metadata.
	chunk       *format.ColumnChunk
}

// Returns the type of the column this chunk belongs to.
func ( *fileColumnChunk) () Type {
	return .column.Type()
}

// Returns the index of the column this chunk belongs to, converted to int.
func ( *fileColumnChunk) () int {
	return int(.column.Index())
}

// Returns a newly allocated filePages value initialized through its init
// method, as a Pages value.
func ( *fileColumnChunk) () Pages {
	 := new(filePages)
	.init()
	return 
}

// Returns the column index of the chunk, loading it through readColumnIndex.
//
// Read or decode failures are returned as-is. ErrMissingColumnIndex is
// returned when no index was loaded or when the chunk metadata records a
// column index offset of zero.
func ( *fileColumnChunk) () (ColumnIndex, error) {
	,  := .readColumnIndex()
	if  != nil {
		return nil, 
	}
	if  == nil || .chunk.ColumnIndexOffset == 0 {
		return nil, ErrMissingColumnIndex
	}
	return fileColumnIndex{}, nil
}

// Returns the offset index of the chunk, loading it through readOffsetIndex.
//
// Read or decode failures are returned as-is. ErrMissingOffsetIndex is
// returned when no index was loaded or when the chunk metadata records an
// offset index offset of zero.
func ( *fileColumnChunk) () (OffsetIndex, error) {
	,  := .readOffsetIndex()
	if  != nil {
		return nil, 
	}
	if  == nil || .chunk.OffsetIndexOffset == 0 {
		return nil, ErrMissingOffsetIndex
	}
	return (*fileOffsetIndex)(), nil
}

// Returns the bloom filter of the chunk, or nil when the chunk has none.
//
// NOTE(review): the explicit nil check presumably exists because BloomFilter
// is an interface type, in which case returning the nil *bloomFilter directly
// would yield a non-nil interface value — confirm against the declaration.
func ( *fileColumnChunk) () BloomFilter {
	if .bloomFilter == nil {
		return nil
	}
	return .bloomFilter
}

// Returns the number of values recorded in the chunk metadata.
func ( *fileColumnChunk) () int64 {
	return .chunk.MetaData.NumValues
}

// Returns the column index of the chunk, reading and decoding it from the
// file on first use and caching the result in the chunk.
//
// A nil index with a nil error is returned when the chunk metadata records a
// zero column index offset.
func ( *fileColumnChunk) () (*format.ColumnIndex, error) {
	// Fast path: the index was already loaded.
	if  := .columnIndex.Load();  != nil {
		return , nil
	}
	// Look up the chunk's metadata entry to find where the index is stored.
	 := .file.metadata.RowGroups[.rowGroup.Ordinal].Columns[.Column()]
	,  := .ColumnIndexOffset, .ColumnIndexLength
	if  == 0 {
		return nil, nil
	}

	 := make([]byte, int())
	var  format.ColumnIndex
	if ,  := readAt(.file.reader, , );  != nil {
		return nil, fmt.Errorf("read %d bytes column index at offset %d: %w", , , )
	}
	if  := thrift.Unmarshal(&.file.protocol, , &);  != nil {
		return nil, fmt.Errorf("decode column index: rowGroup=%d columnChunk=%d/%d: %w", .rowGroup.Ordinal, .Column(), len(.rowGroup.Columns), )
	}
	 := &
	// We do a CAS (and Load on CAS failure) instead of a simple Store for
	// the nice property that concurrent calling goroutines will only ever
	// observe a single pointer value for the result.
	if !.columnIndex.CompareAndSwap(nil, ) {
		// another goroutine populated it since we last read the pointer
		return .columnIndex.Load(), nil
	}
	return , nil
}

// Returns the offset index of the chunk, reading and decoding it from the
// file on first use and caching the result in the chunk.
//
// A nil index with a nil error is returned when the chunk metadata records a
// zero offset index offset.
func ( *fileColumnChunk) () (*format.OffsetIndex, error) {
	// Fast path: the index was already loaded.
	if  := .offsetIndex.Load();  != nil {
		return , nil
	}
	// Look up the chunk's metadata entry to find where the index is stored.
	 := .file.metadata.RowGroups[.rowGroup.Ordinal].Columns[.Column()]
	,  := .OffsetIndexOffset, .OffsetIndexLength
	if  == 0 {
		return nil, nil
	}

	 := make([]byte, int())
	var  format.OffsetIndex
	if ,  := readAt(.file.reader, , );  != nil {
		return nil, fmt.Errorf("read %d bytes offset index at offset %d: %w", , , )
	}
	if  := thrift.Unmarshal(&.file.protocol, , &);  != nil {
		return nil, fmt.Errorf("decode offset index: rowGroup=%d columnChunk=%d/%d: %w", .rowGroup.Ordinal, .Column(), len(.rowGroup.Columns), )
	}
	 := &
	// Compare-and-swap rather than store, so that every caller ends up
	// with the same pointer even when several goroutines race to load it.
	if !.offsetIndex.CompareAndSwap(nil, ) {
		// another goroutine populated it since we last read the pointer
		return .offsetIndex.Load(), nil
	}
	return , nil
}

// filePages reads the pages of a column chunk sequentially.
type filePages struct {
	// chunk is the column chunk being read; it is set to nil once the pages
	// have been closed.
	chunk    *fileColumnChunk
	// rbuf is a pooled buffered reader over section; rbufpool is the pool
	// it is returned to on close.
	rbuf     *bufio.Reader
	rbufpool *sync.Pool
	// section spans the bytes of the column chunk within the file.
	section  io.SectionReader

	// protocol and decoder are used to decode page headers from rbuf.
	protocol thrift.CompactProtocol
	decoder  thrift.Decoder

	// baseOffset is the file offset where section starts; dataOffset is the
	// offset of the first data page; dictOffset is the offset of the
	// dictionary page, or zero when the metadata does not record one.
	baseOffset int64
	dataOffset int64
	dictOffset int64
	// index counts the pages returned or skipped so far and is used in
	// error messages.
	index      int
	// skip is the number of rows left to drop before returning a page; it
	// is set when seeking to a row.
	skip       int64
	// dictionary is the dictionary decoded for this chunk, if any.
	dictionary Dictionary

	// bufferSize is the read buffer size taken from the file configuration.
	bufferSize int
}

// Initializes the page reader for the given column chunk: it records the
// offsets found in the chunk metadata, then sets up a section reader, a
// pooled buffered reader, and a thrift decoder over the bytes of the chunk.
func ( *filePages) ( *fileColumnChunk) {
	.chunk = 
	.baseOffset = .chunk.MetaData.DataPageOffset
	.dataOffset = .baseOffset
	.bufferSize = .file.config.ReadBufferSize

	// When the metadata records a dictionary page offset, the section
	// starts there instead of at the first data page.
	if .chunk.MetaData.DictionaryPageOffset != 0 {
		.baseOffset = .chunk.MetaData.DictionaryPageOffset
		.dictOffset = .baseOffset
	}

	.section = *io.NewSectionReader(.file, .baseOffset, .chunk.MetaData.TotalCompressedSize)
	.rbuf, .rbufpool = getBufioReader(&.section, .bufferSize)
	.decoder.Reset(.protocol.NewReader(.rbuf))
}

// Reads and returns the next data page of the column chunk.
//
// Dictionary pages encountered along the way are decoded and retained but not
// returned. When rows remain to be skipped (after seeking to a row), whole
// pages are dropped and the first page that extends past the skipped rows is
// sliced so that it starts at the target row.
//
// io.EOF is returned once the pages have been closed; errors from decoding a
// page header or reading the page bytes (including the end of the section)
// are returned unchanged, while errors from decoding the page content are
// wrapped with the page index and column path.
func ( *filePages) () (Page, error) {
	if .chunk == nil {
		return nil, io.EOF
	}

	for {
		// Instantiate a new format.PageHeader for each page.
		//
		// A previous implementation reused page headers to save allocations.
		// https://github.com/segmentio/parquet-go/pull/484
		// The optimization turned out to be less effective than expected,
		// because all the values referenced by pointers in the page header
		// are lost when the header is reset and put back in the pool.
		// https://github.com/parquet-go/parquet-go/pull/11
		//
		// Even after being reset, reusing page headers still produced instability
		// issues.
		// https://github.com/parquet-go/parquet-go/issues/70
		 := new(format.PageHeader)
		if  := .decoder.Decode();  != nil {
			return nil, 
		}
		// Read the page bytes that follow the header.
		,  := .readPage(, .rbuf)
		if  != nil {
			return nil, 
		}

		var  Page
		switch .Type {
		case format.DataPageV2:
			,  = .readDataPageV2(, )
		case format.DataPage:
			,  = .readDataPageV1(, )
		case format.DictionaryPage:
			// Sometimes parquet files do not have the dictionary page offset
			// recorded in the column metadata. We account for this by lazily
			// reading dictionary pages when we encounter them.
			 = .readDictionaryPage(, )
		default:
			 = fmt.Errorf("cannot read values of type %s from page", .Type)
		}

		// The page buffer is released here regardless of the outcome.
		.unref()

		if  != nil {
			return nil, fmt.Errorf("decoding page %d of column %q: %w", .index, .columnPath(), )
		}

		// No page is produced for dictionary pages; move on to the next
		// page header.
		if  == nil {
			continue
		}

		.index++
		if .skip == 0 {
			return , nil
		}

		// TODO: what about pages that don't embed the number of rows?
		// (data page v1 with no offset index in the column chunk).
		 := .NumRows()

		if  <= .skip {
			// The whole page lies before the target row: drop it.
			Release()
		} else {
			// The target row is inside this page: return the tail of the
			// page starting at the target row.
			 := .Slice(.skip, )
			Release()
			.skip = 0
			return , nil
		}

		.skip -= 
	}
}

// Reads the dictionary page from the start of the column chunk, using a
// separate section reader and buffered reader so that the position of the
// main page reader is not affected.
//
// The first page of the chunk is decoded and passed to readDictionaryPage.
func ( *filePages) () error {
	 := io.NewSectionReader(.chunk.file, .baseOffset, .chunk.chunk.MetaData.TotalCompressedSize)
	,  := getBufioReader(, .bufferSize)
	defer putBufioReader(, )

	 := thrift.NewDecoder(.protocol.NewReader())

	 := new(format.PageHeader)

	if  := .Decode();  != nil {
		return 
	}

	// Buffer sized to the compressed page; released when the function
	// returns.
	 := buffers.get(int(.CompressedPageSize))
	defer .unref()

	if ,  := io.ReadFull(, .data);  != nil {
		return 
	}

	return .readDictionaryPage(, )
}

// Decodes a dictionary page from the given header and page buffer and stores
// the result as the dictionary of the page reader.
//
// ErrMissingPageHeader is returned when the header carries no dictionary page
// header.
func ( *filePages) ( *format.PageHeader,  *buffer) error {
	if .DictionaryPageHeader == nil {
		return ErrMissingPageHeader
	}
	,  := .chunk.column.decodeDictionary(DictionaryPageHeader{.DictionaryPageHeader}, , .UncompressedPageSize)
	if  != nil {
		return 
	}
	.dictionary = 
	return nil
}

// Decodes a version 1 data page from the given header and page buffer.
//
// ErrMissingPageHeader is returned when the header carries no data page
// header.
func ( *filePages) ( *format.PageHeader,  *buffer) (Page, error) {
	if .DataPageHeader == nil {
		return nil, ErrMissingPageHeader
	}
	// A dictionary-encoded page needs the dictionary; load it from the
	// start of the column chunk if it has not been seen yet.
	if isDictionaryFormat(.DataPageHeader.Encoding) && .dictionary == nil {
		if  := .readDictionary();  != nil {
			return nil, 
		}
	}
	return .chunk.column.decodeDataPageV1(DataPageHeaderV1{.DataPageHeader}, , .dictionary, .UncompressedPageSize)
}

// Decodes a version 2 data page from the given header and page buffer.
//
// ErrMissingPageHeader is returned when the header carries no version 2 data
// page header.
func ( *filePages) ( *format.PageHeader,  *buffer) (Page, error) {
	if .DataPageHeaderV2 == nil {
		return nil, ErrMissingPageHeader
	}
	if isDictionaryFormat(.DataPageHeaderV2.Encoding) && .dictionary == nil {
		// If the program seeked to a row past the first page, the dictionary
		// page may not have been seen, in which case we have to lazily load it
		// from the beginning of the column chunk.
		if  := .readDictionary();  != nil {
			return nil, 
		}
	}
	return .chunk.column.decodeDataPageV2(DataPageHeaderV2{.DataPageHeaderV2}, , .dictionary, .UncompressedPageSize)
}

// Reads the compressed bytes of a page into a buffer obtained from the
// buffers pool, and verifies the CRC32 (IEEE) checksum when the header
// carries a non-zero CRC.
//
// A checksum mismatch is reported as an error wrapping ErrCorrupted.
func ( *filePages) ( *format.PageHeader,  *bufio.Reader) (*buffer, error) {
	 := buffers.get(int(.CompressedPageSize))
	// The deferred unref is paired with the ref taken just before the
	// successful return below. NOTE(review): presumably reference counting,
	// so that error paths release the buffer while the success path hands a
	// live reference to the caller — confirm against the buffer type.
	defer .unref()

	if ,  := io.ReadFull(, .data);  != nil {
		return nil, 
	}

	// A CRC of zero is treated as "no checksum recorded".
	if .CRC != 0 {
		 := uint32(.CRC)
		 := crc32.ChecksumIEEE(.data)

		if  !=  {
			// The parquet specs indicate that corruption errors could be
			// handled gracefully by skipping pages, tho this may not always
			// be practical. Depending on how the pages are consumed,
			// missing rows may cause unpredictable behaviors in algorithms.
			//
			// For now, we assume these errors to be fatal, but we may
			// revisit later and improve error handling to be more resilient
			// to data corruption.
			return nil, fmt.Errorf("crc32 checksum mismatch in page of column %q: want=0x%08X got=0x%08X: %w",
				.columnPath(),
				,
				,
				ErrCorrupted,
			)
		}
	}

	.ref()
	return , nil
}

// Positions the page reader so that the next page returned starts at the
// given row.
//
// Without an offset index, the reader rewinds to the first data page and
// records the number of rows to skip while reading. With an offset index, the
// reader jumps to the last page whose first row index is not greater than the
// given row, and only the rows before the target within that page remain to
// be skipped.
//
// io.ErrClosedPipe is returned when the pages have been closed, and
// ErrSeekOutOfRange when no page location starts at or before the given row.
func ( *filePages) ( int64) ( error) {
	if .chunk == nil {
		return io.ErrClosedPipe
	}
	if  := .chunk.offsetIndex.Load();  == nil {
		_,  = .section.Seek(.dataOffset-.baseOffset, io.SeekStart)
		.skip = 
		.index = 0
		// When a dictionary page offset was recorded, page numbering
		// starts at 1 for the first data page.
		if .dictOffset > 0 {
			.index = 1
		}
	} else {
		 := .PageLocations
		// sort.Search finds the first page starting after the row; the
		// page before it is the one containing the row.
		 := sort.Search(len(), func( int) bool {
			return [].FirstRowIndex > 
		}) - 1
		if  < 0 {
			return ErrSeekOutOfRange
		}
		_,  = .section.Seek([].Offset-.baseOffset, io.SeekStart)
		.skip =  - [].FirstRowIndex
		.index = 
	}
	// Discard buffered bytes that belong to the previous position.
	.rbuf.Reset(&.section)
	return 
}

// Releases the resources held by the page reader: the buffered reader is
// returned to its pool and the reader's state is reset. The chunk is set to
// nil, which is how the other methods detect a closed reader.
//
// The returned error is always nil.
func ( *filePages) () error {
	putBufioReader(.rbuf, .rbufpool)
	.chunk = nil
	.section = io.SectionReader{}
	.rbuf = nil
	.rbufpool = nil
	.baseOffset = 0
	.dataOffset = 0
	.dictOffset = 0
	.index = 0
	.skip = 0
	.dictionary = nil
	return nil
}

// Returns the path of the column being read, as a columnPath; used to build
// error messages.
func ( *filePages) () columnPath {
	return columnPath(.chunk.column.Path())
}

// putBufioReaderFunc is a function type taking no arguments and returning
// nothing.
//
// NOTE(review): the type is not referenced in the visible part of this file;
// it may be a leftover — confirm before removing.
type putBufioReaderFunc func()

var (
	// bufioReaderPoolLock guards access to bufioReaderPool.
	bufioReaderPoolLock sync.Mutex
	// bufioReaderPool maps an int key to a pool of *bufio.Reader values.
	// NOTE(review): the key is presumably the reader's buffer size —
	// confirm in getBufioReader.
	bufioReaderPool     = map[int]*sync.Pool{}
)

// getBufioReader returns a buffered reader taken from a pool obtained via
// getBufioReaderPool, together with that pool so the reader can later be
// handed back with putBufioReader.
//
// When the pool is empty a new reader is allocated with bufio.NewReaderSize;
// otherwise the pooled reader is reset before being returned.
func getBufioReader( io.Reader,  int) (*bufio.Reader, *sync.Pool) {
	 := getBufioReaderPool()
	// The second result of the type assertion is discarded; an empty pool
	// yields a nil reader, which is handled below.
	,  := .Get().(*bufio.Reader)
	if  == nil {
		 = bufio.NewReaderSize(, )
	} else {
		.Reset()
	}
	return , 
}

// putBufioReader returns a buffered reader to the given pool. The reader is
// reset with a nil source first so that it no longer references the
// underlying reader.
//
// The call is a no-op when either argument is nil.
func putBufioReader( *bufio.Reader,  *sync.Pool) {
	if  != nil &&  != nil {
		.Reset(nil)
		.Put()
	}
}

// getBufioReaderPool returns the pool registered in bufioReaderPool for the
// given key, creating and registering an empty pool on first use.
//
// The lookup and the insertion happen while holding bufioReaderPoolLock.
func getBufioReaderPool( int) *sync.Pool {
	bufioReaderPoolLock.Lock()
	defer bufioReaderPoolLock.Unlock()

	if  := bufioReaderPool[];  != nil {
		return 
	}

	 := &sync.Pool{}
	bufioReaderPool[] = 
	return 
}

// Reads from the underlying reader of the file through the package-level
// readAt helper, which clears the error when the buffer was completely
// filled.
func ( *File) ( []byte,  int64) (int, error) {
	return readAt(.reader, , )
}

// readAt calls ReadAt on the given reader and returns its results, except
// that the error is cleared when the buffer was completely filled.
//
// io.ReaderAt implementations are allowed to return io.EOF together with a
// full read that ends exactly at the end of the input; this helper
// normalizes that case to a nil error.
func readAt( io.ReaderAt,  []byte,  int64) ( int,  error) {
	,  = .ReadAt(, )
	if  == len() {
		 = nil
		// p was fully read. There is no further need to check for errors. This
		// operation is a success in principle.
		return
	}
	return
}