package iceberg

import (
	"encoding/binary"
	"fmt"
	"io"
	"math"

	"github.com/parquet-go/parquet-go"
)

func ( string,  int64,  *Schema,  io.ReaderAt) (ManifestEntry, *Schema, error) {
	, ,  := DataFileFromParquet(, , , )
	if  != nil {
		return nil, nil, 
	}

	return NewManifestEntryV1(EntryStatusADDED, , ), , nil
}

func ( string,  int64,  *Schema,  io.ReaderAt) (DataFile, *Schema, error) {
	,  := parquet.OpenFile(, )
	if  != nil {
		return nil, nil, 
	}
	,  := .Merge(parquetSchemaToIcebergSchema(-1, .Schema()))
	if  != nil {
		return nil, nil, 
	}

	 := NewDataFileV1Builder(
		,
		ParquetFile,
		map[string]any{}, // TODO: At present Parquet writes are assumed to be unpartitioned.
		.NumRows(),
		,
	)

	// Create the upper and lower bounds for each column.
	 := .NumFields()
	,  := make(map[int][]byte, ), make(map[int][]byte, )
	for  := 0;  < ; ++ {
		// Check if this columns exists in the parquet file. If it does add the upper and lower bounds.
		// If it doesn't exist, then the bounds will be nil.
		,  := .Schema().Lookup(.Fields()[].Name)
		if  {
			[] = maxColValue(.ColumnIndex, )
			[] = minColValue(.ColumnIndex, )
		}
	}

	.WithLowerBounds()
	.WithUpperBounds()
	.WithColumnSizes(colSizes())

	// Create the schema.
	return .Build(), , nil
}

func colSizes( *parquet.File) map[int]int64 {
	 := make(map[int]int64, len(.Metadata().RowGroups[0].Columns))
	for ,  := range .Metadata().RowGroups {
		for ,  := range .Columns {
			[] += .MetaData.TotalUncompressedSize
		}
	}
	return 
}

// maxColValue returns the maximum value of a column in a parquet file.
func maxColValue( int,  *parquet.File) []byte {
	var  parquet.Value
	for ,  := range .RowGroups() {
		,  := .ColumnChunks()[].ColumnIndex()
		if  != nil {
			return nil
		}

		for  := 0;  < .NumPages(); ++ {
			 := .MaxValue()
			if .IsNull() {
				 = 
				continue
			}

			if compare(, ) == -1 {
				 = 
			}
		}
	}

	// Create the byte representation of the max value.
	return binarySingleValueSerialize()
}

// https://iceberg.apache.org/spec/#appendix-d-single-value-serialization
func binarySingleValueSerialize( parquet.Value) []byte {
	if .IsNull() {
		return nil
	}

	switch .Kind() {
	case parquet.ByteArray:
		return .Bytes()
	case parquet.FixedLenByteArray:
		return .Bytes()
	case parquet.Double:
		 := make([]byte, 8)
		binary.LittleEndian.PutUint64(, uint64(.Double()))
		return 
	case parquet.Int64:
		 := make([]byte, 8)
		binary.LittleEndian.PutUint64(, uint64(.Int64()))
		return 
	case parquet.Int32:
		 := make([]byte, 4)
		binary.LittleEndian.PutUint32(, uint32(.Int32()))
		return 
	case parquet.Float:
		 := make([]byte, 4)
		binary.LittleEndian.PutUint32(, uint32(.Float()))
		return 
	case parquet.Boolean:
		switch .Boolean() {
		case true:
			return []byte{1}
		default:
			return []byte{0}
		}
	default:
		panic(fmt.Sprintf("unsupported value comparison: %v", .Kind()))
	}
}

// minColValue returns the minimum value of a column in a parquet file.
func minColValue( int,  *parquet.File) []byte {
	var  parquet.Value
	for ,  := range .RowGroups() {
		,  := .ColumnChunks()[].ColumnIndex()
		if  != nil {
			return nil
		}

		for  := 0;  < .NumPages(); ++ {
			 := .MinValue()
			if .IsNull() {
				 = 
				continue
			}

			if compare(, ) == 1 {
				 = 
			}
		}
	}

	// Create the byte representation of the min value.
	return binarySingleValueSerialize()
}

// compares two parquet values. 0 if they are equal, -1 if v1 < v2, 1 if v1 > v2.
func compare(,  parquet.Value) int {
	switch .Kind() {
	case parquet.Int32:
		return parquet.Int32Type.Compare(, )
	case parquet.Int64:
		return parquet.Int64Type.Compare(, )
	case parquet.Float:
		return parquet.FloatType.Compare(, )
	case parquet.Double:
		return parquet.DoubleType.Compare(, )
	case parquet.ByteArray, parquet.FixedLenByteArray:
		return parquet.ByteArrayType.Compare(, )
	case parquet.Boolean:
		return parquet.BooleanType.Compare(, )
	default:
		panic(fmt.Sprintf("unsupported value comparison: %v", .Kind()))
	}
}

func parquetSchemaToIcebergSchema( int,  *parquet.Schema) *Schema {
	 := make([]NestedField, 0, len(.Fields()))
	for ,  := range .Fields() {
		 = append(, NestedField{
			Type:     parquetTypeToIcebergType(.Type()),
			ID:       ,
			Name:     .Name(),
			Required: .Required(),
		})
	}
	return NewSchema(, ...)
}

func parquetTypeToIcebergType( parquet.Type) Type {
	switch .Kind() {
	case parquet.Boolean:
		return BooleanType{}
	case parquet.Int32:
		return Int32Type{}
	case parquet.Int64:
		return Int64Type{}
	case parquet.Float:
		return Float32Type{}
	case parquet.Double:
		return Float64Type{}
	case parquet.ByteArray:
		return BinaryType{}
	default:
		panic(fmt.Sprintf("unsupported parquet type: %v", .Kind()))
	}
}