// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package table

import (
	
	
	
	

	

	
)

// Metadata for an iceberg table as specified in the Iceberg spec
//
// https://iceberg.apache.org/spec/#iceberg-table-spec
type Metadata interface {
	// Version indicates the version of this metadata, 1 for V1, 2 for V2, etc.
	Version() int
	// TableUUID returns a UUID that identifies the table, generated when the
	// table is created. Implementations must throw an exception if a table's
	// UUID does not match the expected UUID after refreshing metadata.
	TableUUID() uuid.UUID
	// Location is the table's base location. This is used by writers to determine
	// where to store data files, manifest files, and table metadata files.
	Location() string
	// LastUpdatedMillis is the timestamp in milliseconds from the unix epoch when
	// the table was last updated. Each table metadata file should update this
	// field just before writing.
	LastUpdatedMillis() int64
	// LastColumnID returns the highest assigned column ID for the table.
	// This is used to ensure fields are always assigned an unused ID when
	// evolving schemas.
	LastColumnID() int
	// Schemas returns the list of schemas, stored as objects with their
	// schema-id.
	Schemas() []*iceberg.Schema
	// SchemaID returns the ID of the current schema.
	SchemaID() int
	// CurrentSchema returns the table's current schema.
	CurrentSchema() *iceberg.Schema
	// PartitionSpecs returns the list of all partition specs in the table.
	PartitionSpecs() []iceberg.PartitionSpec
	// PartitionSpec returns the current partition spec that the table is using.
	PartitionSpec() iceberg.PartitionSpec
	// DefaultPartitionSpec is the ID of the current spec that writers should
	// use by default.
	DefaultPartitionSpec() int
	// LastPartitionSpecID is the highest assigned partition field ID across
	// all partition specs for the table. This is used to ensure partition
	// fields are always assigned an unused ID when evolving specs.
	LastPartitionSpecID() *int
	// Snapshots returns the list of valid snapshots. Valid snapshots are
	// snapshots for which all data files exist in the file system. A data
	// file must not be deleted from the file system until the last snapshot
	// in which it was listed is garbage collected.
	Snapshots() []Snapshot
	// SnapshotByID find and return a specific snapshot by its ID. Returns
	// nil if the ID is not found in the list of snapshots.
	SnapshotByID(int64) *Snapshot
	// SnapshotID returns the ID of the current snapshot.
	SnapshotID() *int64
	// SnapshotByName searches the list of snapshots for a snapshot with a given
	// ref name. Returns nil if there's no ref with this name for a snapshot.
	SnapshotByName(name string) *Snapshot
	// CurrentSnapshot returns the table's current snapshot.
	CurrentSnapshot() *Snapshot
	// SortOrder returns the table's current sort order, ie: the one with the
	// ID that matches the default-sort-order-id.
	SortOrder() SortOrder
	// SortOrders returns the list of sort orders in the table.
	SortOrders() []SortOrder
	// Properties is a string to string map of table properties. This is used
	// to control settings that affect reading and writing and is not intended
	// to be used for arbitrary metadata. For example, commit.retry.num-retries
	// is used to control the number of commit retries.
	Properties() iceberg.Properties

	// GetSnapshotLog is a list of snapshot log entries. This is used to track
	GetSnapshotLog() []SnapshotLogEntry
	// GetMetadataLog is a list of metadata log entries. This is used to track
	GetMetadataLog() []MetadataLogEntry
	// SortOrderID returns the ID of the current sort order.
	SortOrderID() int
	// SnapshotRefs is a map of snapshot ref names to snapshot refs. This is used to
	SnapshotRefs() map[string]SnapshotRef
}

var (
	ErrInvalidMetadataFormatVersion = errors.New("invalid or missing format-version in table metadata")
	ErrInvalidMetadata              = errors.New("invalid metadata")
)

// ParseMetadata parses json metadata provided by the passed in reader,
// returning an error if one is encountered.
func ( io.Reader) (Metadata, error) {
	,  := io.ReadAll()
	if  != nil {
		return nil, 
	}

	return ParseMetadataBytes()
}

// ParseMetadataString is like [ParseMetadata], but for a string rather than
// an io.Reader.
func ( string) (Metadata, error) {
	return ParseMetadataBytes([]byte())
}

// ParseMetadataBytes is like [ParseMetadataString] but for a byte slice.
func ( []byte) (Metadata, error) {
	 := struct {
		 int `json:"format-version"`
	}{}
	if  := json.Unmarshal(, &);  != nil {
		return nil, 
	}

	var  Metadata
	switch . {
	case 1:
		 = &MetadataV1{}
	case 2:
		 = &MetadataV2{}
	default:
		return nil, ErrInvalidMetadataFormatVersion
	}

	return , json.Unmarshal(, )
}

// https://iceberg.apache.org/spec/#iceberg-table-spec
type commonMetadata struct {
	FormatVersion      int                     `json:"format-version"`
	UUID               uuid.UUID               `json:"table-uuid"`
	Loc                string                  `json:"location"`
	LastUpdatedMS      int64                   `json:"last-updated-ms"`
	LastColumnId       int                     `json:"last-column-id"`
	SchemaList         []*iceberg.Schema       `json:"schemas,omitempty"`
	CurrentSchemaID    int                     `json:"current-schema-id"`
	Specs              []iceberg.PartitionSpec `json:"partition-specs"`
	DefaultSpecID      int                     `json:"default-spec-id"`
	LastPartitionID    *int                    `json:"last-partition-id,omitempty"`
	Props              iceberg.Properties      `json:"properties"`
	SnapshotList       []Snapshot              `json:"snapshots,omitempty"`
	CurrentSnapshotID  *int64                  `json:"current-snapshot-id,omitempty"`
	SnapshotLog        []SnapshotLogEntry      `json:"snapshot-log,omitempty"`
	MetadataLog        []MetadataLogEntry      `json:"metadata-log,omitempty"`
	SortOrderList      []SortOrder             `json:"sort-orders,omitempty"`
	DefaultSortOrderID int                     `json:"default-sort-order-id,omitempty"`
	Refs               map[string]SnapshotRef  `json:"refs,omitempty"`
}

func ( *commonMetadata) () []SnapshotLogEntry   { return .SnapshotLog }
func ( *commonMetadata) () []MetadataLogEntry   { return .MetadataLog }
func ( *commonMetadata) () map[string]SnapshotRef { return .Refs }
func ( *commonMetadata) () int                     { return .DefaultSortOrderID }
func ( *commonMetadata) () *int64                   { return .CurrentSnapshotID }
func ( *commonMetadata) () int                        { return .CurrentSchemaID }
func ( *commonMetadata) () uuid.UUID                 { return .UUID }
func ( *commonMetadata) () string                     { return .Loc }
func ( *commonMetadata) () int64             { return .LastUpdatedMS }
func ( *commonMetadata) () int                    { return .LastColumnId }
func ( *commonMetadata) () []*iceberg.Schema           { return .SchemaList }
func ( *commonMetadata) () *iceberg.Schema {
	for ,  := range .SchemaList {
		if .ID == .CurrentSchemaID {
			return 
		}
	}
	panic("should never get here")
}

func ( *commonMetadata) () []iceberg.PartitionSpec {
	return .Specs
}

func ( *commonMetadata) () int {
	return .DefaultSpecID
}

func ( *commonMetadata) () iceberg.PartitionSpec {
	for ,  := range .Specs {
		if .ID() == .DefaultSpecID {
			return 
		}
	}
	return *iceberg.UnpartitionedSpec
}

func ( *commonMetadata) () *int { return .LastPartitionID }
func ( *commonMetadata) () []Snapshot     { return .SnapshotList }
func ( *commonMetadata) ( int64) *Snapshot {
	for  := range .SnapshotList {
		if .SnapshotList[].SnapshotID ==  {
			return &.SnapshotList[]
		}
	}
	return nil
}

func ( *commonMetadata) ( string) *Snapshot {
	if ,  := .Refs[];  {
		return .SnapshotByID(.SnapshotID)
	}
	return nil
}

func ( *commonMetadata) () *Snapshot {
	if .CurrentSnapshotID == nil {
		return nil
	}
	return .SnapshotByID(*.CurrentSnapshotID)
}

func ( *commonMetadata) () []SortOrder { return .SortOrderList }
func ( *commonMetadata) () SortOrder {
	for ,  := range .SortOrderList {
		if .OrderID == .DefaultSortOrderID {
			return 
		}
	}
	return UnsortedSortOrder
}

func ( *commonMetadata) () iceberg.Properties {
	return .Props
}

// preValidate updates values in the metadata struct with defaults based on
// combinations of struct members. Such as initializing slices as empty slices
// if they were null in the metadata, or normalizing inconsistencies between
// metadata versions.
func ( *commonMetadata) () {
	if .CurrentSnapshotID != nil && *.CurrentSnapshotID == -1 {
		// treat -1 as the same as nil, clean this up in pre-validation
		// to make the validation logic simplified later
		.CurrentSnapshotID = nil
	}

	if .Refs == nil {
		.Refs = make(map[string]SnapshotRef)
	}

	if .CurrentSnapshotID != nil {
		if ,  := .Refs[MainBranch]; ! {
			.Refs[MainBranch] = SnapshotRef{
				SnapshotID:      *.CurrentSnapshotID,
				SnapshotRefType: BranchRef,
			}
		}
	}

	if .MetadataLog == nil {
		.MetadataLog = []MetadataLogEntry{}
	}

	if .SnapshotLog == nil {
		.SnapshotLog = []SnapshotLogEntry{}
	}

	if .Props == nil {
		.Props = iceberg.Properties{}
	}
}

func ( *commonMetadata) () error {
	// check that current-schema-id is present in schemas
	for ,  := range .SchemaList {
		if .ID == .CurrentSchemaID {
			return nil
		}
	}

	return fmt.Errorf("%w: current-schema-id %d can't be found in any schema",
		ErrInvalidMetadata, .CurrentSchemaID)
}

func ( *commonMetadata) () error {
	for ,  := range .Specs {
		if .ID() == .DefaultSpecID {
			return nil
		}
	}

	return fmt.Errorf("%w: default-spec-id %d can't be found",
		ErrInvalidMetadata, .DefaultSpecID)
}

func ( *commonMetadata) () error {
	if .DefaultSortOrderID == UnsortedSortOrderID {
		return nil
	}

	for ,  := range .SortOrderList {
		if .OrderID == .DefaultSortOrderID {
			return nil
		}
	}

	return fmt.Errorf("%w: default-sort-order-id %d can't be found in %+v",
		ErrInvalidMetadata, .DefaultSortOrderID, .SortOrderList)
}

func ( *commonMetadata) () error {
	if  := .checkSchemas();  != nil {
		return 
	}

	if  := .checkPartitionSpecs();  != nil {
		return 
	}

	if  := .checkSortOrders();  != nil {
		return 
	}

	switch {
	case .LastUpdatedMS == 0:
		// last-updated-ms is required
		return fmt.Errorf("%w: missing last-updated-ms", ErrInvalidMetadata)
	case .LastColumnId == 0:
		// last-column-id is required
		return fmt.Errorf("%w: missing last-column-id", ErrInvalidMetadata)
	}

	return nil
}

func ( *commonMetadata) () int { return .FormatVersion }

type MetadataV1 struct {
	commonMetadata
	Schema    *iceberg.Schema          `json:"schema"`
	Partition []iceberg.PartitionField `json:"partition-spec"`
}

func ( *MetadataV1) () {
	if len(.Partition) == 0 {
		.Partition = []iceberg.PartitionField{}
	}

	if len(.SchemaList) == 0 {
		.SchemaList = []*iceberg.Schema{.Schema}
	}

	if len(.Specs) == 0 {
		.Specs = []iceberg.PartitionSpec{
			iceberg.NewPartitionSpec(.Partition...)}
		.DefaultSpecID = .Specs[0].ID()
	}

	if .LastPartitionID == nil {
		 := .Specs[0].LastAssignedFieldID()
		for ,  := range .Specs[1:] {
			 := .LastAssignedFieldID()
			if  >  {
				 = 
			}
		}
		.LastPartitionID = &
	}

	if len(.SortOrderList) == 0 {
		.SortOrderList = []SortOrder{UnsortedSortOrder}
	}

	.commonMetadata.preValidate()
}

func ( *MetadataV1) () *iceberg.Schema {
	if .Schema != nil {
		return .Schema
	}

	return .commonMetadata.CurrentSchema()
}

func ( *MetadataV1) ( []byte) error {
	type  MetadataV1
	 := (*)()

	if  := json.Unmarshal(, );  != nil {
		return 
	}

	.preValidate()
	return .validate()
}

func ( *MetadataV1) () MetadataV2 {
	 := .commonMetadata
	.FormatVersion = 2
	if .UUID.String() == "" {
		.UUID = uuid.New()
	}

	return MetadataV2{commonMetadata: }
}

type MetadataV2 struct {
	LastSequenceNumber int `json:"last-sequence-number"`

	commonMetadata
}

func ( *MetadataV2) ( []byte) error {
	type  MetadataV2
	 := (*)()

	if  := json.Unmarshal(, );  != nil {
		return 
	}

	.preValidate()
	return .validate()
}