// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package iceberg

import (
	"encoding/json"
	"fmt"
	"slices"
	"strings"
)

const (
	partitionDataIDStart   = 1000
	InitialPartitionSpecID = 0
)

// UnpartitionedSpec is the default unpartitioned spec which can
// be used for comparisons or to just provide a convenience for referencing
// the same unpartitioned spec object.
var UnpartitionedSpec = &PartitionSpec{id: 0}

// PartitionField represents how one partition value is derived from the
// source column by transformation.
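//
// A field serializes through its JSON tags, so a bucket partition over source
// column 1 would look roughly like this (illustrative values):
//
//	{"source-id": 1, "field-id": 1000, "name": "id_bucket", "transform": "bucket[16]"}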
type PartitionField struct {
	// SourceID is the source column id of the table's schema
	SourceID int `json:"source-id"`
	// FieldID is the partition field id across all the table partition specs
	FieldID int `json:"field-id,omitempty"`
	// Name is the name of the partition field itself
	Name string `json:"name"`
	// Transform is the transform used to produce the partition value
	Transform Transform `json:"transform"`
}

func (p *PartitionField) String() string {
	return fmt.Sprintf("%d: %s: %s(%d)", p.FieldID, p.Name, p.Transform, p.SourceID)
}

func (p *PartitionField) UnmarshalJSON(b []byte) error {
	// Alias avoids recursing back into this UnmarshalJSON method, while the
	// transform is captured as its raw string form for parsing below.
	type Alias PartitionField
	aux := struct {
		TransformString string `json:"transform"`
		*Alias
	}{
		Alias: (*Alias)(p),
	}

	err := json.Unmarshal(b, &aux)
	if err != nil {
		return err
	}

	if p.Transform, err = ParseTransform(aux.TransformString); err != nil {
		return err
	}

	return nil
}

// PartitionSpec captures the transformation from table data to partition values
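//
// A minimal construction sketch (illustrative values, assuming an identity
// transform over source column 1):
//
//	spec := NewPartitionSpecID(1, PartitionField{
//		SourceID: 1, FieldID: 1000, Name: "id", Transform: IdentityTransform{},
//	})
//	data, _ := json.Marshal(spec) // {"spec-id":1,"fields":[...]}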
type PartitionSpec struct {
	// any change to a PartitionSpec will produce a new spec id
	id     int
	fields []PartitionField

	// this is populated by initialize after creation
	sourceIdToFields map[int][]PartitionField
}

// NewPartitionSpec creates a new PartitionSpec with the given fields and the
// initial spec ID.
func NewPartitionSpec(fields ...PartitionField) PartitionSpec {
	return NewPartitionSpecID(InitialPartitionSpecID, fields...)
}

// NewPartitionSpecID creates a new PartitionSpec with the given fields and
// the provided spec ID.
func NewPartitionSpecID(id int, fields ...PartitionField) PartitionSpec {
	spec := PartitionSpec{id: id, fields: fields}
	spec.initialize()
	return spec
}

// CompatibleWith returns true if this partition spec is considered
// compatible with the passed in partition spec. This means that the two
// specs have equivalent field lists regardless of the spec id.
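//
// For example, given some PartitionField f (illustrative), two specs that
// share the same field but have different spec ids are compatible, while
// Equals reports false:
//
//	a := NewPartitionSpecID(1, f)
//	b := NewPartitionSpecID(2, f)
//	_ = a.CompatibleWith(&b) // true
//	_ = a.Equals(b)          // false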
func (ps *PartitionSpec) CompatibleWith(other *PartitionSpec) bool {
	if ps == other {
		return true
	}

	if len(ps.fields) != len(other.fields) {
		return false
	}

	return slices.EqualFunc(ps.fields, other.fields, func(left, right PartitionField) bool {
		return left.SourceID == right.SourceID && left.Name == right.Name &&
			left.Transform == right.Transform
	})
}

// Equals returns true iff the field lists are the same AND the spec id
// is the same between this partition spec and the provided one.
func (ps *PartitionSpec) Equals(other PartitionSpec) bool {
	return ps.id == other.id && slices.Equal(ps.fields, other.fields)
}

func (ps PartitionSpec) MarshalJSON() ([]byte, error) {
	if ps.fields == nil {
		ps.fields = []PartitionField{}
	}
	return json.Marshal(struct {
		ID     int              `json:"spec-id"`
		Fields []PartitionField `json:"fields"`
	}{ps.id, ps.fields})
}

func (ps *PartitionSpec) UnmarshalJSON(b []byte) error {
	aux := struct {
		ID     int              `json:"spec-id"`
		Fields []PartitionField `json:"fields"`
	}{ID: ps.id, Fields: ps.fields}

	if err := json.Unmarshal(b, &aux); err != nil {
		return err
	}

	ps.id, ps.fields = aux.ID, aux.Fields
	ps.initialize()
	return nil
}

// initialize populates the sourceIdToFields lookup so that all partition
// fields derived from a given source column can be found quickly.
func (ps *PartitionSpec) initialize() {
	ps.sourceIdToFields = make(map[int][]PartitionField)
	for _, f := range ps.fields {
		ps.sourceIdToFields[f.SourceID] = append(ps.sourceIdToFields[f.SourceID], f)
	}
}

func (ps *PartitionSpec) ID() int                    { return ps.id }
func (ps *PartitionSpec) NumFields() int             { return len(ps.fields) }
func (ps *PartitionSpec) Field(i int) PartitionField { return ps.fields[i] }

// IsUnpartitioned returns true if the spec has no fields, or if every field
// uses the void transform (which always produces null partition values).
func (ps *PartitionSpec) IsUnpartitioned() bool {
	if len(ps.fields) == 0 {
		return true
	}

	for _, f := range ps.fields {
		if _, ok := f.Transform.(VoidTransform); !ok {
			return false
		}
	}

	return true
}

// FieldsBySourceID returns a copy of the partition fields that are derived
// from the given source column id.
func (ps *PartitionSpec) FieldsBySourceID(fieldID int) []PartitionField {
	return slices.Clone(ps.sourceIdToFields[fieldID])
}

func (ps PartitionSpec) String() string {
	var b strings.Builder
	b.WriteByte('[')
	for i, f := range ps.fields {
		if i == 0 {
			b.WriteString("\n")
		}
		b.WriteString("\t")
		b.WriteString(f.String())
		b.WriteString("\n")
	}
	b.WriteByte(']')

	return b.String()
}

// LastAssignedFieldID returns the highest field id in the spec, or
// partitionDataIDStart - 1 if the spec has no fields.
func (ps *PartitionSpec) LastAssignedFieldID() int {
	if len(ps.fields) == 0 {
		return partitionDataIDStart - 1
	}

	id := ps.fields[0].FieldID
	for _, f := range ps.fields[1:] {
		if f.FieldID > id {
			id = f.FieldID
		}
	}
	return id
}

// PartitionType produces a struct of the partition spec.
//
// The partition fields should be optional:
//   - All partition transforms are required to produce null if the input value
//     is null. This can happen when the source column is optional.
//   - Partition fields may be added later, in which case not all files would
//     have the result field and it may be null.
//
// There is a case where we can guarantee that a partition field in the first
// and only partition spec that uses a required source column will never be
// null, but it doesn't seem worth tracking this case.
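//
// As a sketch (illustrative ids and names, assuming the schema contains a
// column with id 1), an identity-transformed required column still yields an
// optional partition field of the same type:
//
//	spec := NewPartitionSpec(PartitionField{
//		SourceID: 1, FieldID: 1000, Name: "id", Transform: IdentityTransform{},
//	})
//	partType := spec.PartitionType(schema) // one optional field "id" with the source column's type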
func (ps *PartitionSpec) PartitionType(schema *Schema) *StructType {
	nestedFields := []NestedField{}
	for _, field := range ps.fields {
		srcType, ok := schema.FindTypeByID(field.SourceID)
		if !ok {
			continue
		}
		resultType := field.Transform.ResultType(srcType)
		nestedFields = append(nestedFields, NestedField{
			ID:       field.FieldID,
			Name:     field.Name,
			Type:     resultType,
			Required: false,
		})
	}
	return &StructType{FieldList: nestedFields}
}