package records

import (
	"reflect"
	"regexp"
	"slices"
	"sort"
	"strconv"
	"strings"

	// NOTE: the arrow major version below is an assumption; match it to the
	// version pinned in this module's go.mod.
	"github.com/apache/arrow/go/v16/arrow"
	"github.com/apache/arrow/go/v16/arrow/array"
	"github.com/apache/arrow/go/v16/arrow/memory"
	"github.com/google/uuid"

	schemapb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/schema/v1alpha1"
	"github.com/polarsignals/frostdb/pqarrow/arrowutils"
)

const (
	TagName = "frostdb"
)

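// Record wraps an arrow.Record together with the sorting columns derived from
// the `frostdb` struct tags of the type it was built from.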
type Record struct {
	arrow.Record
	SortingColumns []arrowutils.SortingColumn
}

// Build is a generic arrow.Record builder that ingests structs of type T. The
// generated record can be passed to (*Table).InsertRecord.
//
// Struct tag `frostdb` is used to pass schema options for T. Use
// (*Build[T]).Schema to obtain the schema v1alpha1 definition.
//
// This API is opinionated.
//
//   - Nested columns are not supported
//
// # Tags
//
// Use the `frostdb` tag to customize how a field is stored. Tags can express
// everything needed to construct a schema v1alpha1 definition.
//
// Tags are defined as a comma-separated list. The first item is the column
// name. The column name is optional; when omitted it is derived from the
// field name (snake_cased).
//
// Supported Tags
//
//	    delta_binary_packed | Delta binary packed encoding.
//	                 brotli | Brotli compression.
//	                    asc | Sorts in ascending order. Use asc(n) where n is an integer to set the sort order.
//	                   gzip | GZIP compression.
//	                 snappy | Snappy compression.
//	delta_length_byte_array | Delta Length Byte Array encoding.
//	       delta_byte_array | Delta Byte Array encoding.
//	                   desc | Sorts in descending order. Use desc(n) where n is an integer to set the sort order.
//	                lz4_raw | LZ4_RAW compression.
//	               pre_hash | Prehash the column before storing it.
//	             null_first | When used with asc, nulls are smallest; with desc, nulls are largest.
//	                   zstd | ZSTD compression.
//	               rle_dict | Dictionary run-length encoding.
//	                  plain | Plain encoding.
//
// Example tagged Sample struct
//
//	type Sample struct {
//		ExampleType string      `frostdb:"example_type,rle_dict,asc(0)"`
//		Labels      []Label     `frostdb:"labels,rle_dict,null,dyn,asc(1),null_first"`
//		Stacktrace  []uuid.UUID `frostdb:"stacktrace,rle_dict,asc(3),null_first"`
//		Timestamp   int64       `frostdb:"timestamp,asc(2)"`
//		Value       int64       `frostdb:"value"`
//	}
//
// # Dynamic columns
//
// Fields of type map[string]T are dynamic columns by default.
//
//	type Example struct {
//		// Use supported tags to customize the column value
//		Labels map[string]string `frostdb:"labels"`
//	}
//
// # Repeated columns
//
// Fields of type []int64, []float64, []bool, and []string are supported. These
// are represented as arrow.LIST.
//
// Generated schema for the repeated columns applies all supported tags. By
// default repeated fields are nullable. You can safely pass nil slices for
// repeated columns.
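//
//	type Example struct {
//		// Repeated column of int64 values, stored as an arrow LIST.
//		Values []int64 `frostdb:"values"`
//	}
//
// # Usage
//
// A minimal sketch of typical usage (the Metric struct, schema name, and
// allocator choice are illustrative, not prescribed by this package):
//
//	type Metric struct {
//		Name      string            `frostdb:"name,rle_dict,asc(0)"`
//		Labels    map[string]string `frostdb:"labels,rle_dict,asc(1),null_first"`
//		Timestamp int64             `frostdb:"timestamp,asc(2)"`
//		Value     float64           `frostdb:"value"`
//	}
//
//	build := NewBuild[Metric](memory.NewGoAllocator())
//	defer build.Release()
//
//	err := build.Append(Metric{
//		Name:      "cpu",
//		Labels:    map[string]string{"host": "a"},
//		Timestamp: 1,
//		Value:     0.5,
//	})
//	// handle err
//
//	record := build.NewRecord()       // pass to (*Table).InsertRecord
//	schema := build.Schema("metrics") // schema v1alpha1 definition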
type Build[T any] struct {
	fields []*fieldRecord
	buffer []arrow.Array
	sort   []*fieldRecord
}

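// NewBuild creates a Build for the struct type T, deriving one column builder
// per struct field from its `frostdb` tag. It panics if T is not a struct (or
// a pointer to one). mem is used to allocate all underlying arrow builders.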
func NewBuild[T any](mem memory.Allocator) *Build[T] {
	var value T
	r := reflect.TypeOf(value)
	for r.Kind() == reflect.Ptr {
		r = r.Elem()
	}
	if r.Kind() != reflect.Struct {
		panic("frostdb/dynschema: " + r.String() + " is not supported")
	}
	b := &Build[T]{}
	for i := 0; i < r.NumField(); i++ {
		f := r.Field(i)
		var (
			dt         arrow.DataType
			nullable   bool
			sorted     bool
			preHash    bool
			nullFirst  bool
			dictionary bool
			sortOrder  int
			direction  schemapb.SortingColumn_Direction

			encoding    schemapb.StorageLayout_Encoding
			compression schemapb.StorageLayout_Compression
			typ         schemapb.StorageLayout_Type
		)
		name, tag := fieldName(f)
		if tag != "" {
			walkTag(tag, func(key, value string) {
				switch key {
				case "null_first":
					nullFirst = true
				case "asc", "desc":
					sorted = true
					sortOrder, _ = strconv.Atoi(value)
					if key == "asc" {
						direction = schemapb.SortingColumn_DIRECTION_ASCENDING
					} else {
						direction = schemapb.SortingColumn_DIRECTION_DESCENDING
					}
				case "pre_hash":
					preHash = true
				case "plain":
					encoding = schemapb.StorageLayout_ENCODING_PLAIN_UNSPECIFIED
				case "rle_dict":
					encoding = schemapb.StorageLayout_ENCODING_RLE_DICTIONARY
					dictionary = true
				case "delta_binary_packed":
					encoding = schemapb.StorageLayout_ENCODING_DELTA_BINARY_PACKED
				case "delta_byte_array":
					encoding = schemapb.StorageLayout_ENCODING_DELTA_BYTE_ARRAY
				case "delta_length_byte_array":
					encoding = schemapb.StorageLayout_ENCODING_DELTA_LENGTH_BYTE_ARRAY
				case "snappy":
					compression = schemapb.StorageLayout_COMPRESSION_SNAPPY
				case "gzip":
					compression = schemapb.StorageLayout_COMPRESSION_GZIP
				case "brotli":
					compression = schemapb.StorageLayout_COMPRESSION_BROTLI
				case "lz4_raw":
					compression = schemapb.StorageLayout_COMPRESSION_LZ4_RAW
				case "zstd":
					compression = schemapb.StorageLayout_COMPRESSION_ZSTD
				}
			})
		}
		fr := &fieldRecord{
			name:        name,
			preHash:     preHash,
			sort:        sorted,
			sortOrder:   sortOrder,
			nullFirst:   nullFirst,
			direction:   direction,
			compression: compression,
			encoding:    encoding,
		}
		fieldType := f.Type
		for fieldType.Kind() == reflect.Ptr {
			nullable = true
			fieldType = fieldType.Elem()
		}
		switch fieldType.Kind() {
		case reflect.Map:
			dt, typ = baseType(fieldType.Elem(), dictionary)
			fr.typ = typ
			fr.dynamic = true
			fr.nullable = true
			fr.build = newMapFieldBuilder(newFieldFunc(dt, mem, name,
				// Pointer base types need to be handled properly even for dynamic columns,
				// so map[string]string and map[string]*string should all work the same.
				fieldType.Elem().Kind() == reflect.Ptr),
				newRowsBeforeFunc(i, b.numRowsBefore),
			)
		case reflect.Slice:
			switch {
			case isUUIDSlice(fieldType):
				fr.typ = schemapb.StorageLayout_TYPE_STRING
				fr.build = newUUIDSliceField(mem, name)
			default:
				dt, typ = baseType(fieldType.Elem(), dictionary)
				fr.typ = typ
				fr.repeated = true
				// Repeated columns are always nullable.
				fr.nullable = true
				dt = arrow.ListOf(dt)
				fr.build = newFieldBuild(dt, mem, name, true)
			}
		case reflect.Int64, reflect.Float64, reflect.Bool, reflect.String, reflect.Uint64:
			dt, typ = baseType(fieldType, dictionary)
			fr.typ = typ
			fr.nullable = nullable
			fr.build = newFieldBuild(dt, mem, name, nullable)
		default:
			panic("frostdb/dynschema: " + fieldType.String() + " is not supported")
		}
		b.fields = append(b.fields, fr)
	}
	return b
}

// numRowsBefore returns the number of rows already appended before the field
// at fieldIdx. Dynamic columns need this to backfill nulls so that all columns
// match the record row count.
//
// This handles the case where a series of T without any dynamic columns is
// followed by dynamic columns. For example, if values without dynamic columns
// were appended first, a dynamic column created later must first be padded
// with that many nulls.
func (b *Build[T]) numRowsBefore(fieldIdx int) int {
	for i := 0; i < len(b.fields) && i != fieldIdx; i++ {
		processed := i < fieldIdx
		f := b.fields[i]
		if f.dynamic {
			// If we have dynamic columns before/after fieldIdx, we can stop looking
			// once we find columns that were appended to.
			if n := f.build.Len(); n != 0 {
				if processed {
					// The field has already been processed for the current T. Adjust the
					// size because we care about the row count before this append.
					n--
				}
				return n
			}
			continue
		}
		n := b.fields[i].build.Len()
		if processed {
			// The field has already been processed for the current T. Adjust the
			// size because we care about the row count before this append.
			n--
		}
		return n
	}
	return 0
}

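// Append appends the fields of each value to the respective column builders.
// Pointer values are dereferenced first; nil or empty dynamic-column maps
// append nulls so all columns keep the same length.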
func (b *Build[T]) Append(values ...T) error {
	for _, value := range values {
		v := reflect.ValueOf(value)
		for v.Kind() == reflect.Ptr {
			v = v.Elem()
		}
		for i := 0; i < v.NumField(); i++ {
			err := b.fields[i].build.Append(v.Field(i))
			if err != nil {
				return err
			}
		}
	}
	return nil
}

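// NewRecord assembles an arrow record from everything appended so far, together
// with the sorting columns declared via asc/desc tags, and resets the internal
// buffers so the Build can be reused.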
func (b *Build[T]) NewRecord() *Record {
	fields := make([]arrow.Field, 0, len(b.fields))
	for _, f := range b.fields {
		fs := f.build.Fields()
		if f.sort {
			if f.dynamic {
				for range fs {
					b.sort = append(b.sort, f)
				}
			} else {
				b.sort = append(b.sort, f)
			}
		}
		fields = append(fields, fs...)
		b.buffer = f.build.NewArray(b.buffer)
	}
	defer func() {
		for i := range b.buffer {
			b.buffer[i].Release()
		}
		b.buffer = b.buffer[:0]
		b.sort = b.sort[:0]
	}()
	sort.Slice(b.sort, func(i, j int) bool {
		return b.sort[i].sortOrder < b.sort[j].sortOrder
	})
	sortingColumns := make([]arrowutils.SortingColumn, 0, len(b.sort))
	for i, f := range b.sort {
		direction := arrowutils.Ascending
		if f.direction == schemapb.SortingColumn_DIRECTION_DESCENDING {
			direction = arrowutils.Descending
		}
		sortingColumns = append(sortingColumns, arrowutils.SortingColumn{
			Index:      i,
			Direction:  direction,
			NullsFirst: f.nullFirst,
		})
	}
	return &Record{
		Record: array.NewRecord(
			arrow.NewSchema(fields, nil),
			b.buffer,
			int64(b.buffer[0].Len()),
		),
		SortingColumns: sortingColumns,
	}
}

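// Schema returns the schema v1alpha1 definition for T under the given schema
// name, including storage layout and sorting columns derived from struct tags.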
func (b Build[T]) Schema(name string) (s *schemapb.Schema) {
	s = &schemapb.Schema{Name: name, Columns: make([]*schemapb.Column, 0, len(b.fields))}
	var sortingColumns []*fieldRecord
	for _, f := range b.fields {
		s.Columns = append(s.Columns, &schemapb.Column{
			Name:    f.name,
			Dynamic: f.dynamic,
			Prehash: f.preHash,
			StorageLayout: &schemapb.StorageLayout{
				Type:        f.typ,
				Encoding:    f.encoding,
				Compression: f.compression,
				Nullable:    f.nullable,
				Repeated:    f.repeated,
			},
		})
		if f.sort {
			sortingColumns = append(sortingColumns, f)
		}
	}
	sort.Slice(sortingColumns, func(i, j int) bool {
		return sortingColumns[i].sortOrder < sortingColumns[j].sortOrder
	})
	for _, f := range sortingColumns {
		s.SortingColumns = append(s.SortingColumns, &schemapb.SortingColumn{
			Name:       f.name,
			Direction:  f.direction,
			NullsFirst: f.nullFirst,
		})
	}
	return
}

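// Release releases all underlying arrow builders and clears the array buffer.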
func (b *Build[T]) Release() {
	for _, f := range b.fields {
		f.build.Release()
	}
	b.buffer = b.buffer[:0]
}

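// fieldBuilder is the minimal interface every column builder implements, so
// that static, repeated, dynamic (map), and UUID-slice columns can be appended
// to and materialized uniformly.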
type fieldBuilder interface {
	Fields() []arrow.Field
	Len() int
	AppendNull()
	Append(reflect.Value) error
	NewArray([]arrow.Array) []arrow.Array
	Release()
}

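// mapFieldBuilder builds dynamic columns from map fields. Every distinct map
// key becomes its own column named "<field>.<key>"; columns that appear late
// are backfilled with nulls (via rowsBefore), and columns missing from a value
// receive nulls, so all columns stay the same length.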
type mapFieldBuilder struct {
	newField   func(string) fieldBuilder
	rowsBefore func() int
	columns    map[string]fieldBuilder
	seen       map[string]struct{}
	keys       []string
}

func newFieldFunc(dt arrow.DataType, mem memory.Allocator, name string, nullable bool) func(string) fieldBuilder {
	return func(key string) fieldBuilder {
		return newFieldBuild(dt, mem, name+"."+key, nullable)
	}
}

func newRowsBeforeFunc(fieldIdx int, f func(int) int) func() int {
	return func() int {
		return f(fieldIdx)
	}
}

func newMapFieldBuilder(newField func(string) fieldBuilder, rowsBefore func() int) *mapFieldBuilder {
	return &mapFieldBuilder{
		newField:   newField,
		rowsBefore: rowsBefore,
		columns:    make(map[string]fieldBuilder),
		seen:       make(map[string]struct{}),
	}
}

var _ fieldBuilder = (*mapFieldBuilder)(nil)

func (m *mapFieldBuilder) Fields() (fields []arrow.Field) {
	if len(m.columns) == 0 {
		return []arrow.Field{}
	}
	fields = make([]arrow.Field, 0, len(m.columns))
	m.keys = slices.Grow(m.keys, len(m.columns))
	for key := range m.columns {
		m.keys = append(m.keys, key)
	}
	sort.Strings(m.keys)
	for _, key := range m.keys {
		fields = append(fields, m.columns[key].Fields()...)
	}
	return
}

func (m *mapFieldBuilder) NewArray(arrays []arrow.Array) []arrow.Array {
	if len(m.columns) == 0 {
		return arrays
	}
	m.keys = m.keys[:0]
	for key := range m.columns {
		m.keys = append(m.keys, key)
	}
	sort.Strings(m.keys)
	for _, key := range m.keys {
		arrays = m.columns[key].NewArray(arrays)
	}
	for _, column := range m.columns {
		column.Release()
	}
	clear(m.columns)
	m.keys = m.keys[:0]
	return arrays
}

// AppendNull is a no-op for dynamic columns; per-key columns are backfilled
// with nulls when they are created or skipped during Append.
func (m *mapFieldBuilder) AppendNull() {}

func (m *mapFieldBuilder) Release() {
	for _, column := range m.columns {
		column.Release()
	}
	clear(m.columns)
	m.keys = m.keys[:0]
}

func (m *mapFieldBuilder) Append(v reflect.Value) error {
	if v.IsNil() || v.Len() == 0 {
		for _, column := range m.columns {
			column.AppendNull()
		}
		return nil
	}
	clear(m.seen)
	keys := v.MapKeys()
	size := m.Len()
	if size == 0 {
		// Maybe we never supplied dynamic columns before, but other columns were
		// appended.
		size = m.rowsBefore()
	}
	for _, key := range keys {
		name := key.Interface().(string)
		m.seen[name] = struct{}{}
		err := m.get(name, size).Append(v.MapIndex(key))
		if err != nil {
			return err
		}
	}
	for name, column := range m.columns {
		if _, ok := m.seen[name]; !ok {
			// All record columns must have the same length. Set columns not present
			// in v to null.
			column.AppendNull()
		}
	}
	return nil
}

func (m *mapFieldBuilder) Len() int {
	for _, column := range m.columns {
		return column.Len()
	}
	return 0
}

func (m *mapFieldBuilder) get(name string, size int) fieldBuilder {
	column, ok := m.columns[name]
	if ok {
		return column
	}
	column = m.newField(name)
	// Backfill the new column with nulls so it matches the current row count.
	for i := 0; i < size; i++ {
		column.AppendNull()
	}

	m.columns[name] = column
	return column
}

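// baseType maps a Go scalar kind to its arrow data type and storage layout
// type, optionally wrapping the arrow type in a uint32-indexed dictionary.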
func baseType(fieldType reflect.Type, dictionary bool) (dt arrow.DataType, typ schemapb.StorageLayout_Type) {
	for fieldType.Kind() == reflect.Ptr {
		fieldType = fieldType.Elem()
	}
	switch fieldType.Kind() {
	case reflect.Int64:
		dt = arrow.PrimitiveTypes.Int64
		typ = schemapb.StorageLayout_TYPE_INT64
	case reflect.Float64:
		dt = arrow.PrimitiveTypes.Float64
		typ = schemapb.StorageLayout_TYPE_DOUBLE
	case reflect.Bool:
		dt = arrow.FixedWidthTypes.Boolean
		typ = schemapb.StorageLayout_TYPE_BOOL
	case reflect.String:
		dt = arrow.BinaryTypes.String
		typ = schemapb.StorageLayout_TYPE_STRING
	case reflect.Uint64:
		dt = arrow.PrimitiveTypes.Uint64
		typ = schemapb.StorageLayout_TYPE_UINT64
	default:
		panic("frostdb/dynschema: " + fieldType.String() + " is not supported")
	}
	if dictionary {
		dt = &arrow.DictionaryType{
			IndexType: &arrow.Uint32Type{},
			ValueType: dt,
		}
	}
	return
}

func fieldName(f reflect.StructField) (name, tag string) {
	name, tag, _ = strings.Cut(f.Tag.Get(TagName), ",")
	if name == "" {
		name = ToSnakeCase(f.Name)
	}
	return
}

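// newFieldBuild creates a fieldBuilderFunc for a single static or repeated
// (list) column, selecting an append function based on the concrete arrow
// builder created for dt.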
func newFieldBuild(dt arrow.DataType, mem memory.Allocator, name string, nullable bool) (f *fieldBuilderFunc) {
	b := array.NewBuilder(mem, dt)
	f = &fieldBuilderFunc{
		col: arrow.Field{
			Name:     name,
			Type:     dt,
			Nullable: nullable,
		},
		releaseFunc: b.Release,
		nilFunc:     b.AppendNull,
		len:         b.Len,
		newArraysFunc: func(a []arrow.Array) []arrow.Array {
			return append(a, b.NewArray())
		},
	}
	switch e := b.(type) {
	case *array.Int64Builder:
		f.buildFunc = func(v reflect.Value) error {
			if nullable {
				if v.IsNil() {
					e.AppendNull()
					return nil
				}
				v = v.Elem()
			}
			e.Append(v.Int())
			return nil
		}
	case *array.Int64DictionaryBuilder:
		f.buildFunc = func(v reflect.Value) error {
			if nullable {
				if v.IsNil() {
					e.AppendNull()
					return nil
				}
				v = v.Elem()
			}
			return e.Append(v.Int())
		}
	case *array.Uint64Builder:
		f.buildFunc = func(v reflect.Value) error {
			if nullable {
				if v.IsNil() {
					e.AppendNull()
					return nil
				}
				v = v.Elem()
			}
			e.Append(v.Uint())
			return nil
		}
	case *array.Uint64DictionaryBuilder:
		f.buildFunc = func(v reflect.Value) error {
			if nullable {
				if v.IsNil() {
					e.AppendNull()
					return nil
				}
				v = v.Elem()
			}
			return e.Append(v.Uint())
		}
	case *array.Float64Builder:
		f.buildFunc = func(v reflect.Value) error {
			if nullable {
				if v.IsNil() {
					e.AppendNull()
					return nil
				}
				v = v.Elem()
			}
			e.Append(v.Float())
			return nil
		}
	case *array.Float64DictionaryBuilder:
		f.buildFunc = func(v reflect.Value) error {
			if nullable {
				if v.IsNil() {
					e.AppendNull()
					return nil
				}
				v = v.Elem()
			}
			return e.Append(v.Float())
		}
	case *array.BooleanBuilder:
		f.buildFunc = func(v reflect.Value) error {
			if nullable {
				if v.IsNil() {
					e.AppendNull()
					return nil
				}
				v = v.Elem()
			}
			e.Append(v.Bool())
			return nil
		}
	case *array.StringBuilder:
		f.buildFunc = func(v reflect.Value) error {
			if nullable {
				if v.IsNil() {
					e.AppendNull()
					return nil
				}
				v = v.Elem()
			}
			e.Append(v.Interface().(string))
			return nil
		}
	case *array.BinaryDictionaryBuilder:
		f.buildFunc = func(v reflect.Value) error {
			if nullable {
				if v.IsNil() {
					e.AppendNull()
					return nil
				}
				v = v.Elem()
			}
			return e.AppendString(v.Interface().(string))
		}
	case *array.ListBuilder:
		switch build := e.ValueBuilder().(type) {
		case *array.Int64Builder:
			f.buildFunc = func(v reflect.Value) error {
				if v.IsNil() {
					e.AppendNull()
					return nil
				}
				e.Append(true)
				build.Reserve(v.Len())
				return applyInt(v, func(i int64) error {
					build.Append(i)
					return nil
				})
			}
		case *array.Int64DictionaryBuilder:
			f.buildFunc = func(v reflect.Value) error {
				if v.IsNil() {
					e.AppendNull()
					return nil
				}
				e.Append(true)
				build.Reserve(v.Len())
				return applyInt(v, build.Append)
			}
		case *array.Uint64Builder:
			f.buildFunc = func(v reflect.Value) error {
				if v.IsNil() {
					e.AppendNull()
					return nil
				}
				e.Append(true)
				build.Reserve(v.Len())
				return applyUInt(v, func(i uint64) error {
					build.Append(i)
					return nil
				})
			}
		case *array.Float64Builder:
			f.buildFunc = func(v reflect.Value) error {
				if v.IsNil() {
					e.AppendNull()
					return nil
				}
				e.Append(true)
				build.Reserve(v.Len())
				return applyFloat64(v, func(i float64) error {
					build.Append(i)
					return nil
				})
			}
		case *array.Float64DictionaryBuilder:
			f.buildFunc = func(v reflect.Value) error {
				if v.IsNil() {
					e.AppendNull()
					return nil
				}
				e.Append(true)
				build.Reserve(v.Len())
				return applyFloat64(v, build.Append)
			}

		case *array.StringBuilder:
			f.buildFunc = func(v reflect.Value) error {
				if v.IsNil() {
					e.AppendNull()
					return nil
				}
				e.Append(true)
				build.Reserve(v.Len())
				return applyString(v, func(s string) error {
					build.Append(s)
					return nil
				})
			}
		case *array.BinaryDictionaryBuilder:
			f.buildFunc = func(v reflect.Value) error {
				if v.Len() == 0 {
					e.AppendNull()
					return nil
				}
				e.Append(true)
				build.Reserve(v.Len())
				return applyString(v, build.AppendString)
			}
		case *array.BooleanBuilder:
			f.buildFunc = func(v reflect.Value) error {
				if v.IsNil() {
					e.AppendNull()
					return nil
				}
				e.Append(true)
				build.Reserve(v.Len())
				return applyBool(v, func(b bool) error {
					build.Append(b)
					return nil
				})
			}
		}
	default:
		panic("frostdb/dynschema: unsupported array builder " + b.Type().String())
	}
	return
}

func applyString(list reflect.Value, apply func(string) error) error {
	return listApply[string](list, func(v reflect.Value) string {
		return v.Interface().(string)
	}, apply)
}

func applyFloat64(list reflect.Value, apply func(float64) error) error {
	return listApply[float64](list, func(v reflect.Value) float64 {
		return v.Float()
	}, apply)
}

func applyBool(list reflect.Value, apply func(bool) error) error {
	return listApply[bool](list, func(v reflect.Value) bool {
		return v.Bool()
	}, apply)
}

func applyInt(list reflect.Value, apply func(int64) error) error {
	return listApply[int64](list, func(v reflect.Value) int64 {
		return v.Int()
	}, apply)
}

func applyUInt(list reflect.Value, apply func(uint64) error) error {
	return listApply[uint64](list, func(v reflect.Value) uint64 {
		return v.Uint()
	}, apply)
}

func listApply[T any](list reflect.Value, value func(reflect.Value) T, apply func(T) error) error {
	for i := 0; i < list.Len(); i++ {
		if err := apply(value(list.Index(i))); err != nil {
			return err
		}
	}
	return nil
}

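// newUUIDSliceField builds a []uuid.UUID column as a dictionary-encoded binary
// column containing the packed 16-byte UUIDs.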
func newUUIDSliceField(mem memory.Allocator, name string) (f *fieldBuilderFunc) {
	dt := &arrow.DictionaryType{
		IndexType: &arrow.Int32Type{},
		ValueType: &arrow.BinaryType{},
	}
	b := array.NewBuilder(mem, dt)
	f = &fieldBuilderFunc{
		col: arrow.Field{
			Name: name,
			Type: dt,
		},
		releaseFunc: b.Release,
		nilFunc:     b.AppendNull,
		len:         b.Len,
		newArraysFunc: func(a []arrow.Array) []arrow.Array {
			return append(a, b.NewArray())
		},
	}
	build := b.(*array.BinaryDictionaryBuilder)
	f.buildFunc = func(v reflect.Value) error {
		return build.Append(ExtractLocationIDs(v.Interface().([]uuid.UUID)))
	}
	return
}

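// ExtractLocationIDs packs the UUIDs into a single byte slice, 16 bytes per
// UUID, iterating from the last element to the first.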
func ExtractLocationIDs(locs []uuid.UUID) []byte {
	b := make([]byte, len(locs)*16) // UUIDs are 16 bytes, hence multiply by 16.
	index := 0
	for i := len(locs) - 1; i >= 0; i-- {
		copy(b[index:index+16], locs[i][:])
		index += 16
	}
	return b
}

type fieldBuilderFunc struct {
	len           func() int
	col           arrow.Field
	nilFunc       func()
	buildFunc     func(reflect.Value) error
	newArraysFunc func([]arrow.Array) []arrow.Array
	releaseFunc   func()
}

var _ fieldBuilder = (*fieldBuilderFunc)(nil)

func (f *fieldBuilderFunc) Fields() []arrow.Field                  { return []arrow.Field{f.col} }
func (f *fieldBuilderFunc) Len() int                               { return f.len() }
func (f *fieldBuilderFunc) AppendNull()                            { f.nilFunc() }
func (f *fieldBuilderFunc) Append(v reflect.Value) error           { return f.buildFunc(v) }
func (f *fieldBuilderFunc) NewArray(a []arrow.Array) []arrow.Array { return f.newArraysFunc(a) }
func (f *fieldBuilderFunc) Release()                               { f.releaseFunc() }

type fieldRecord struct {
	name        string
	dynamic     bool
	preHash     bool
	nullable    bool
	repeated    bool
	sort        bool
	nullFirst   bool
	sortOrder   int
	direction   schemapb.SortingColumn_Direction
	encoding    schemapb.StorageLayout_Encoding
	compression schemapb.StorageLayout_Compression
	typ         schemapb.StorageLayout_Type
	build       fieldBuilder
}

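// walkTag calls f for every comma-separated item of a `frostdb` tag, splitting
// items of the form key(value) into key and value.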
func walkTag(tag string, f func(key, value string)) {
	if tag == "" {
		return
	}
	head, rest, _ := strings.Cut(tag, ",")
	if head != "" {
		key, value, _ := strings.Cut(head, "(")
		value, _, _ = strings.Cut(value, ")")
		f(key, value)
	}
	walkTag(rest, f)
}

var uuidSliceType = reflect.TypeOf([]uuid.UUID{})

func isUUIDSlice(t reflect.Type) bool {
	return t.AssignableTo(uuidSliceType)
}

var (
	matchFirstCap = regexp.MustCompile("(.)([A-Z][a-z]+)")
	matchAllCap   = regexp.MustCompile("([a-z0-9])([A-Z])")
)

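// ToSnakeCase converts a CamelCase identifier to snake_case.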
func ToSnakeCase(s string) string {
	snake := matchFirstCap.ReplaceAllString(s, "${1}_${2}")
	snake = matchAllCap.ReplaceAllString(snake, "${1}_${2}")
	return strings.ToLower(snake)
}