package dynparquet

import (
	
	
	
	

	
	
	
	
	
)

// prehashedPrefix is the leading name component of every column that holds
// precomputed hashes of another column.
const prehashedPrefix = "hashed"

// HashedColumnName returns the name of the column that stores the hashes of
// col: the prefix, a dot, then the original column name.
func HashedColumnName(col string) string {
	return prehashedPrefix + "." + col
}

// IsHashedColumn reports whether col is a name produced by HashedColumnName.
//
// The separator is part of the check so that an ordinary column whose name
// merely begins with the prefix text (for example "hashed_password") is not
// mistaken for a hashed column.
func IsHashedColumn(col string) bool {
	return strings.HasPrefix(col, prehashedPrefix+".")
}

// findHashedColumn finds the index of the column in the given fields that have been prehashed.
func ( string,  []arrow.Field) int {
	for ,  := range  {
		if HashedColumnName() == .Name {
			return 
		}
	}

	return -1
}

// prehashColumns prehashes the columns in the given record that have been marked as prehashed in the given schema.
func ( *Schema,  arrow.Record) arrow.Record {
	 := array.NewInt64Builder(memory.DefaultAllocator) // TODO pass in allocator
	defer .Release()

	 := .Schema().Fields()
	 := make([]arrow.Field, 0, len())
	 := make([]arrow.Array, 0, len())
	defer func() {
		for ,  := range  {
			.Release()
		}
	}()

	for ,  := range .Columns() {
		if !.PreHash {
			continue
		}

		for ,  := range  {
			if .Name == .Name || (.Dynamic && strings.HasPrefix(.Name, .Name)) {
				 = append(, arrow.Field{
					Name: HashedColumnName(.Name),
					Type: arrow.PrimitiveTypes.Int64,
				})

				// Hash the column
				 := HashArray(.Column())

				// Build the new column
				.Reserve(len())
				for ,  := range  {
					.UnsafeAppend(int64())
				}

				 = append(, .NewArray())
			}
		}
	}

	if len() == 0 {
		.Retain() // NOTE: we retain here because we expect the caller to release the record that we're returning
		return 
	}

	 := arrow.NewSchema(append(, ...), nil)
	return array.NewRecord(, append(.Columns(), ...), .NumRows())
}

func ( arrow.Array) []uint64 {
	switch ar := .(type) {
	case *array.String:
		return hashStringArray()
	case *array.Binary:
		return hashBinaryArray()
	case *array.Int64:
		return hashInt64Array()
	case *array.Uint64:
		return hashUint64Array()
	case *array.Boolean:
		return hashBooleanArray()
	case *array.Dictionary:
		return hashDictionaryArray()
	case *array.List:
		return hashListArray()
	default:
		panic("unsupported array type " + fmt.Sprintf("%T", ))
	}
}

func hashListArray( *array.List) ( []uint64) {
	 = make([]uint64, .Len())
	 := xxhash.New()
	switch e := .ListValues().(type) {
	case *array.Int64:
		var  [8]byte
		for  := 0;  < .Len(); ++ {
			,  := .ValueOffsets()
			for  := ;  < ; ++ {
				_, _ = .Write(binary.BigEndian.AppendUint64([:0],
					uint64(.Value(int()))))
			}
			[] = .Sum64()
			.Reset()
		}
		return
	case *array.Float64:
		var  [8]byte
		for  := 0;  < .Len(); ++ {
			,  := .ValueOffsets()
			for  := ;  < ; ++ {
				_, _ = .Write(binary.BigEndian.AppendUint64([:0],
					math.Float64bits(.Value(int()))))
			}
			[] = .Sum64()
			.Reset()
		}
		return
	case *array.Boolean:
		var  [1]byte
		for  := 0;  < .Len(); ++ {
			,  := .ValueOffsets()
			for  := ;  < ; ++ {
				if .Value(int()) {
					[0] = 2
				} else {
					[0] = 1
				}
				_, _ = .Write([:])
			}
			[] = .Sum64()
			.Reset()
		}
		return
	case *array.Binary:
		for  := 0;  < .Len(); ++ {
			,  := .ValueOffsets()
			for  := ;  < ; ++ {
				_, _ = .Write(.Value(int()))
			}
			[] = .Sum64()
			.Reset()
		}
		return
	case *array.String:
		for  := 0;  < .Len(); ++ {
			,  := .ValueOffsets()
			for  := ;  < ; ++ {
				_, _ = .WriteString(.Value(int()))
			}
			[] = .Sum64()
			.Reset()
		}
		return
	case *array.Dictionary:
		switch dict := .Dictionary().(type) {
		case *array.Binary:
			for  := 0;  < .Len(); ++ {
				,  := .ValueOffsets()
				for  := ;  < ; ++ {
					_, _ = .Write(.Value(.GetValueIndex(int())))
				}
				[] = .Sum64()
				.Reset()
			}
			return
		case *array.String:
			for  := 0;  < .Len(); ++ {
				,  := .ValueOffsets()
				for  := ;  < ; ++ {
					_, _ = .WriteString(.Value(.GetValueIndex(int())))
				}
				[] = .Sum64()
				.Reset()
			}
			return
		default:
			panic(fmt.Sprintf("list dictionary not of expected type: %T", ))
		}
	default:
		panic(fmt.Sprintf("list not of expected type: %T", ))
	}
}

func hashDictionaryArray( *array.Dictionary) []uint64 {
	 := make([]uint64, .Len())
	for  := 0;  < .Len(); ++ {
		if !.IsNull() {
			switch dict := .Dictionary().(type) {
			case *array.Binary:
				[] = metro.Hash64(.Value(.GetValueIndex()), 0)
			case *array.String:
				[] = metro.Hash64([]byte(.Value(.GetValueIndex())), 0)
			default:
				panic("unsupported dictionary type " + fmt.Sprintf("%T", ))
			}
		}
	}
	return 
}

func hashBinaryArray( *array.Binary) []uint64 {
	 := make([]uint64, .Len())
	for  := 0;  < .Len(); ++ {
		if !.IsNull() {
			[] = metro.Hash64(.Value(), 0)
		}
	}
	return 
}

func hashBooleanArray( *array.Boolean) []uint64 {
	 := make([]uint64, .Len())
	for  := 0;  < .Len(); ++ {
		if .IsNull() {
			[] = 0
			continue
		}
		if .Value() {
			[] = 2
		} else {
			[] = 1
		}
	}
	return 
}

func hashStringArray( *array.String) []uint64 {
	 := make([]uint64, .Len())
	for  := 0;  < .Len(); ++ {
		if !.IsNull() {
			[] = metro.Hash64([]byte(.Value()), 0)
		}
	}
	return 
}

func hashInt64Array( *array.Int64) []uint64 {
	 := make([]uint64, .Len())
	for  := 0;  < .Len(); ++ {
		if !.IsNull() {
			[] = uint64(.Value())
		}
	}
	return 
}

func hashUint64Array( *array.Uint64) []uint64 {
	 := make([]uint64, .Len())
	for  := 0;  < .Len(); ++ {
		if !.IsNull() {
			[] = .Value()
		}
	}
	return 
}

// RemoveHashedColumns removes the hashed columns from the record.
func ( arrow.Record) arrow.Record {
	 := make([]arrow.Array, 0, .Schema().NumFields())
	 := make([]arrow.Field, 0, .Schema().NumFields())
	for  := 0;  < .Schema().NumFields(); ++ {
		if !IsHashedColumn(.Schema().Field().Name) {
			 = append(, .Column())
			 = append(, .Schema().Field())
		}
	}

	return array.NewRecord(arrow.NewSchema(, nil), , .NumRows())
}