// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package table

import (
	
	
	
	
	
	

	
	
	
	
	
	
)

const (
	manifestFileExt = ".avro"
	metadataFileExt = ".json"
	metadataDirName = "metadata"
	dataDirName     = "data"
)

type Identifier = []string

type Table interface {
	Identifier() Identifier
	Metadata() Metadata
	MetadataLocation() string
	Bucket() objstore.Bucket
	Schema() *iceberg.Schema
	Spec() iceberg.PartitionSpec
	SortOrder() SortOrder
	Properties() iceberg.Properties
	Location() string
	CurrentSnapshot() *Snapshot
	SnapshotByID(id int64) *Snapshot
	SnapshotByName(name string) *Snapshot
	Schemas() map[int]*iceberg.Schema
	Equals(other Table) bool

	SnapshotWriter(options ...WriterOption) (SnapshotWriter, error)
}

type ReadOnlyTable struct {
	*baseTable
}

func ( *ReadOnlyTable) ( ...WriterOption) (SnapshotWriter, error) {
	return nil, fmt.Errorf("table is read-only")
}

type baseTable struct {
	identifier       Identifier
	metadata         Metadata
	metadataLocation string
	bucket           objstore.Bucket
}

func ( *baseTable) ( Table) bool {
	return slices.Equal(.identifier, .Identifier()) &&
		.metadataLocation == .MetadataLocation() &&
		reflect.DeepEqual(.metadata, .Metadata())
}

func ( *baseTable) () Identifier   { return .identifier }
func ( *baseTable) () Metadata       { return .metadata }
func ( *baseTable) () string { return .metadataLocation }
func ( *baseTable) () objstore.Bucket  { return .bucket }

func ( *baseTable) () *iceberg.Schema              { return .metadata.CurrentSchema() }
func ( *baseTable) () iceberg.PartitionSpec          { return .metadata.PartitionSpec() }
func ( *baseTable) () SortOrder                 { return .metadata.SortOrder() }
func ( *baseTable) () iceberg.Properties       { return .metadata.Properties() }
func ( *baseTable) () string                     { return .metadata.Location() }
func ( *baseTable) () *Snapshot           { return .metadata.CurrentSnapshot() }
func ( *baseTable) ( int64) *Snapshot      { return .metadata.SnapshotByID() }
func ( *baseTable) ( string) *Snapshot { return .metadata.SnapshotByName() }
func ( *baseTable) () map[int]*iceberg.Schema {
	 := make(map[int]*iceberg.Schema)
	for ,  := range .metadata.Schemas() {
		[.ID] = 
	}
	return 
}

func ( Identifier,  Metadata,  string,  objstore.Bucket) Table {
	return &ReadOnlyTable{
		baseTable: &baseTable{
			identifier:       ,
			metadata:         ,
			metadataLocation: ,
			bucket:           ,
		},
	}
}

func ( Identifier,  string,  objstore.Bucket) (Table, error) {
	var  Metadata

	,  := .Get(context.Background(), )
	if  != nil {
		return nil, 
	}
	defer .Close()

	if ,  = ParseMetadata();  != nil {
		return nil, 
	}

	return New(, , , ), nil
}

// SnapshotWriter is an interface for writing a new snapshot to a table.
type SnapshotWriter interface {
	// Append accepts a ReaderAt object that should read the Parquet file that is to be added to the snapshot.
	Append(ctx context.Context, r io.Reader) error

	// DeleteDataFile deletes a data file from the table. The function passed to this method should return true if the file should be deleted.
	DeleteDataFile(ctx context.Context, del func(file iceberg.DataFile) bool) error

	// Close writes the new snapshot to the table and closes the writer. It is an error to call Append after Close.
	Close(ctx context.Context) error
}

type WriterOption func(*writerOptions)

type writerOptions struct {
	fastAppendMode              bool
	manifestSizeBytes           int
	mergeSchema                 bool
	expireSnapshotsOlderThan    time.Duration
	metadataDeleteAfterCommit   bool
	metadataPreviousVersionsMax int

	logger log.Logger
}

func ( log.Logger) WriterOption {
	return func( *writerOptions) {
		.logger = 
	}
}

func () WriterOption {
	return func( *writerOptions) {
		.metadataDeleteAfterCommit = true
	}
}

func ( int) WriterOption {
	return func( *writerOptions) {
		.metadataPreviousVersionsMax = 
	}
}

func ( time.Duration) WriterOption {
	return func( *writerOptions) {
		.expireSnapshotsOlderThan = 
	}
}

func () WriterOption {
	return func( *writerOptions) {
		.mergeSchema = true
	}
}

func () WriterOption {
	return func( *writerOptions) {
		.fastAppendMode = true
	}
}

func ( int) WriterOption {
	return func( *writerOptions) {
		.manifestSizeBytes = 
	}
}

func generateULID() ulid.ULID {
	 := time.Now()
	 := ulid.Monotonic(rand.New(rand.NewSource(.UnixNano())), 0)
	return ulid.MustNew(ulid.Timestamp(), )
}

func parquetSchemaToIcebergSchema( int,  *parquet.Schema) *iceberg.Schema {
	 := make([]iceberg.NestedField, 0, len(.Fields()))
	for ,  := range .Fields() {
		 = append(, iceberg.NestedField{
			Type:     parquetTypeToIcebergType(.Type()),
			ID:       ,
			Name:     .Name(),
			Required: .Required(),
		})
	}
	return iceberg.NewSchema(, ...)
}

func parquetTypeToIcebergType( parquet.Type) iceberg.Type {
	switch  := .Kind();  {
	case parquet.Boolean:
		return iceberg.BooleanType{}
	case parquet.Int32:
		return iceberg.Int32Type{}
	case parquet.Int64:
		return iceberg.Int64Type{}
	case parquet.Float:
		return iceberg.Float32Type{}
	case parquet.Double:
		return iceberg.Float64Type{}
	case parquet.ByteArray:
		return iceberg.BinaryType{}
	default:
		panic(fmt.Sprintf("unsupported parquet type: %v", ))
	}
}