// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dictutils

import (
	
	
	

	
	
	
)

// Kind is an int8-backed enumeration whose values are declared below.
// NOTE(review): presumably this classifies a dictionary as new, a delta, or
// a replacement of an earlier one; no use of Kind is visible in this file,
// so confirm against callers.
type Kind int8

// The values are assigned with iota: KindNew is 0, KindDelta is 1 and
// KindReplacement is 2.
const (
	KindNew Kind = iota
	KindDelta
	KindReplacement
)

// FieldPos is one node of a parent-linked chain of field indices. A node
// stores its own index, a pointer to the node it was derived from, and its
// depth, which is one more than its parent's depth.
type FieldPos struct {
	parent       *FieldPos
	index, depth int32
}

// Returns the root position: no parent, depth 0, and index -1. The path
// builder for FieldPos walks exactly depth nodes, so the root's index never
// appears in a path.
// NOTE(review): the function name is not visible in this view; call sites in
// this file use NewFieldPos() as the starting position — confirm.
func () FieldPos { return FieldPos{index: -1} }

// Returns a new FieldPos one level deeper than the receiver (depth + 1),
// built from the receiver pointer and the int32 argument.
// NOTE(review): the values assigned to parent and index are not visible in
// this view; presumably parent is the receiver and index is the argument.
// Call sites refer to this method as Child — confirm.
func ( *FieldPos) ( int32) FieldPos {
	return FieldPos{parent: , index: , depth: .depth + 1}
}

// Returns the chain of indices for this position as a freshly allocated
// []int32 whose length equals the receiver's depth. For a depth of 0 the
// result is an empty slice.
// NOTE(review): call sites refer to this method as Path — confirm.
func ( *FieldPos) () []int32 {
	 := make([]int32, .depth)
	 := 
	// Fill from the last slot back to the first while following the parent
	// links, so the deepest index is written first into the final slot.
	for  := .depth - 1;  >= 0; -- {
		[] = int32(.index)
		 = .parent
	}
	return 
}

// Mapper associates field paths with int64 ids.
//
// A path is not stored itself: pathToID is keyed by the 64-bit hash that
// hasher produces from the path's bytes. Two different paths with the same
// hash would therefore share one entry.
// NOTE(review): hasher is a maphash.Hash that is never explicitly seeded in
// this file, so hash values are presumably only meaningful within a single
// Mapper instance — confirm.
type Mapper struct {
	pathToID map[uint64]int64
	hasher   maphash.Hash
}

// Returns the number of distinct int64 values held in pathToID, computed by
// collecting them into a temporary set. The result can be smaller than the
// number of entries when several paths map to the same id.
// NOTE(review): the method name is not visible in this view.
func ( *Mapper) () int {
	 := make(map[int64]bool)
	for ,  := range .pathToID {
		[] = true
	}
	return len()
}

// Registers a mapping for a path given as []int32, hashing its bytes to form
// the map key. Returns an error if that hash already has an entry, otherwise
// stores the mapping and returns nil.
// NOTE(review): the stored value is presumably the int64 argument; the
// method name is not visible in this view.
func ( *Mapper) ( int64,  []int32) error {
	.hasher.Write(arrow.Int32Traits.CastToBytes())
	// The hasher is shared state on the Mapper; reset it on every exit path
	// so the next call starts from an empty hash.
	defer .hasher.Reset()

	 := .hasher.Sum64()
	if ,  := .pathToID[];  {
		return errors.New("field already mapped to id")
	}

	.pathToID[] = 
	return nil
}

// Looks up the id stored for a path given as []int32. The path's bytes are
// hashed and the hash is used as the key into pathToID. Returns the stored
// id and nil on a hit, or -1 and an error when there is no entry.
// NOTE(review): call sites refer to this method as GetFieldID — confirm.
func ( *Mapper) ( []int32) (int64, error) {
	.hasher.Write(arrow.Int32Traits.CastToBytes())
	// Reset the shared hasher on return so later calls are not affected.
	defer .hasher.Reset()

	,  := .pathToID[.hasher.Sum64()]
	if ! {
		return -1, errors.New("arrow/ipc: dictionary field not found")
	}
	return , nil
}

// Returns the number of entries currently held in pathToID.
// NOTE(review): the method name is not visible in this view; the collector
// in this file calls NumFields() on its mapper to size a slice, which is
// presumably this method — confirm.
func ( *Mapper) () int {
	return len(.pathToID)
}

// Registers the path of the given FieldPos. The value stored is the number
// of entries in pathToID before the insertion, converted to int64, so ids
// are handed out in insertion order starting at 0.
//
// Unlike the error-returning add method on Mapper, this does not check for
// an existing entry: a path whose hash is already present is overwritten.
// NOTE(review): call sites refer to this method as InsertPath — confirm.
func ( *Mapper) ( FieldPos) {
	 := len(.pathToID)
	.hasher.Write(arrow.Int32Traits.CastToBytes(.Path()))

	.pathToID[.hasher.Sum64()] = int64()
	// Clear the shared hasher so the next path is hashed from scratch.
	.hasher.Reset()
}

// Walks one field at the given position and registers the position of every
// dictionary-typed field found in it, including nested ones.
//
// An extension type is replaced by its storage type before inspection. For a
// dictionary type the position itself is registered and, if the dictionary's
// value type is nested, that value type's fields are imported as well. For
// any other nested type only its fields are imported.
// NOTE(review): call sites refer to this method as ImportField — confirm.
func ( *Mapper) ( FieldPos,  arrow.Field) {
	 := .Type
	if .ID() == arrow.EXTENSION {
		 = .(arrow.ExtensionType).StorageType()
	}

	if .ID() == arrow.DICTIONARY {
		.InsertPath()
		// import nested dicts
		if ,  := .(*arrow.DictionaryType).ValueType.(arrow.NestedType);  {
			.ImportFields(, .Fields())
		}
		return
	}

	if ,  := .(arrow.NestedType);  {
		.ImportFields(, .Fields())
	}
}

// Imports a list of fields under the given position: the field at slice
// index i is imported at the child position with index i.
// NOTE(review): call sites refer to this method as ImportFields — confirm.
func ( *Mapper) ( FieldPos,  []arrow.Field) {
	for  := range  {
		.ImportField(.Child(int32()), [])
	}
}

// Rebuilds the mapper from a schema. Any existing mappings are dropped by
// replacing pathToID with a fresh map, and then every top-level field of the
// schema is imported at its index below a new root position.
// NOTE(review): the method name is not visible in this view.
func ( *Mapper) ( *arrow.Schema) {
	.pathToID = make(map[uint64]int64)
	// This code path intentionally avoids calling ImportFields with
	// schema.Fields to avoid allocations.
	 := NewFieldPos()
	for  := 0;  < .NumFields(); ++ {
		.ImportField(.Child(int32()), .Field())
	}
}

// hasUnresolvedNestedDict reports whether the given array data, or anything
// reachable from it through dictionaries and children, is dictionary-typed
// but has no dictionary attached.
//
// The argument is converted with an unchecked type assertion to *array.Data,
// so any other arrow.ArrayData implementation causes a panic.
func hasUnresolvedNestedDict( arrow.ArrayData) bool {
	 := .(*array.Data)
	if .DataType().ID() == arrow.DICTIONARY {
		// A dictionary-typed node whose dictionary is a nil *array.Data
		// is unresolved.
		if .Dictionary().(*array.Data) == nil {
			return true
		}
		// NOTE(review): the callee name is not visible here; presumably a
		// recursive check of the attached dictionary — confirm.
		if (.Dictionary()) {
			return true
		}
	}
	// NOTE(review): presumably the same recursive check for each child.
	for ,  := range .Children() {
		if () {
			return true
		}
	}
	return false
}

// dictpair couples a dictionary array with the int64 id under which it was
// found in the mapper.
type dictpair struct {
	ID   int64
	Dict arrow.Array
}

// dictCollector accumulates the dictionaries found while walking arrays.
// Ids are looked up through mapper, and results are appended to dictionaries
// in the order they are discovered.
type dictCollector struct {
	dictionaries []dictpair
	mapper       *Mapper
}

// Visits every child of the given array. Each child's data is wrapped in an
// array and visited at the child position matching its index. Stops at the
// first error and returns it, otherwise returns nil.
//
// The temporary arrays are released with defer, so all of them stay alive
// until this method returns rather than until the end of each iteration.
// NOTE(review): call sites refer to this method as visitChildren — confirm.
func ( *dictCollector) ( FieldPos,  arrow.DataType,  arrow.Array) error {
	for ,  := range .Data().Children() {
		 := array.MakeFromData()
		defer .Release()
		if  := .visit(.Child(int32()), );  != nil {
			return 
		}
	}
	return nil
}

// Visits one array at the given position and records any dictionaries found
// in it or below it.
//
// An extension array is replaced by its storage type and storage array. For
// a dictionary array, the children of its dictionary are visited first, then
// the id for this position is looked up in the mapper, the dictionary is
// retained, and an (id, dictionary) pair is appended. Any other array just
// has its children visited.
//
// Returns the mapper's lookup error, or an error from visiting children in
// the non-dictionary case.
// NOTE(review): call sites refer to this method as visit — confirm.
func ( *dictCollector) ( FieldPos,  arrow.Array) error {
	 := .DataType()
	if .ID() == arrow.EXTENSION {
		 = .(arrow.ExtensionType).StorageType()
		 = .(array.ExtensionArray).Storage()
	}

	if .ID() == arrow.DICTIONARY {
		 := .(*array.Dictionary)
		 := .Dictionary()

		// traverse the dictionary to first gather any nested dictionaries
		// so they appear in the output before their respective parents
		 := .(*arrow.DictionaryType)
		// NOTE(review): the error returned by this call is discarded, so a
		// failure while collecting nested dictionaries is not reported.
		.visitChildren(, .ValueType, )

		,  := .mapper.GetFieldID(.Path())
		if  != nil {
			return 
		}
		// The collected pair holds its own reference to the dictionary.
		.Retain()
		.dictionaries = append(.dictionaries, dictpair{ID: , Dict: })
		return nil
	}
	return .visitChildren(, , )
}

// Gathers the dictionaries of a record batch. The dictionaries slice is
// replaced by an empty one with capacity equal to the mapper's NumFields(),
// and then each column is visited at the child position of the root matching
// its index in the schema. Returns the first error from a visit, or nil.
//
// On error, pairs appended before the failure remain in dictionaries.
// NOTE(review): the wrapper function in this file calls this as collect.
func ( *dictCollector) ( arrow.RecordBatch) error {
	var (
		    = NewFieldPos()
		 = .Schema()
	)
	.dictionaries = make([]dictpair, 0, .mapper.NumFields())
	for  := range .Fields() {
		if  := .visit(.Child(int32()), .Column());  != nil {
			return 
		}
	}
	return nil
}

// dictMap maps a dictionary id to a list of array data. Memo stores the
// first dictionary for an id at position 0 and appends deltas after it.
type dictMap map[int64][]arrow.ArrayData

// dictTypeMap maps a dictionary id to a data type.
type dictTypeMap map[int64]arrow.DataType

// Memo is a table of dictionaries keyed by int64 id.
//
// Mapper translates field paths to ids. dict2id is a reverse index from
// array data to id; in this file it is only written when an id is first
// added and cleared again when the table is emptied. id2type records one
// data type per id.
type Memo struct {
	Mapper  Mapper
	dict2id map[arrow.ArrayData]int64

	id2type dictTypeMap
	id2dict dictMap // map of dictionary ID to dictionary array
}

// Returns a Memo with all of its maps allocated and empty, including the
// embedded Mapper's pathToID. The Mapper's hasher is left at its zero value.
// NOTE(review): the function name is not visible in this view.
func () Memo {
	return Memo{
		dict2id: make(map[arrow.ArrayData]int64),
		id2dict: make(dictMap),
		id2type: make(dictTypeMap),
		Mapper: Mapper{
			pathToID: make(map[uint64]int64),
		},
	}
}

// Returns the number of ids that currently have an entry in id2dict.
// NOTE(review): the method name is not visible in this view.
func ( *Memo) () int { return len(.id2dict) }

// Empties the dictionary table: every id is removed from id2dict, and each
// array data stored under it is removed from dict2id and released.
//
// id2type and the Mapper are not touched by this method.
// NOTE(review): the method name is not visible in this view.
func ( *Memo) () {
	for ,  := range .id2dict {
		delete(.id2dict, )
		for ,  := range  {
			delete(.dict2id, )
			.Release()
		}
	}
}

// Returns a single array data for the given id, merging deltas if needed.
//
// If the id has no entry an error is returned. If exactly one array data is
// stored it is returned as is. Otherwise every stored array data is wrapped
// in an array, the arrays are concatenated using the supplied allocator, and
// the id's entry is replaced by a one-element list holding the result.
//
// Concatenation is refused with an error if any stored array data still has
// an unresolved nested dictionary. A concatenation error is returned as is.
// NOTE(review): the Dict wrapper on Memo calls this as reify — confirm.
func ( *Memo) ( int64,  memory.Allocator) (arrow.ArrayData, error) {
	,  := .id2dict[]
	if ! {
		return nil, fmt.Errorf("arrow/ipc: no dictionaries found for id=%d", )
	}

	if len() == 1 {
		return [0], nil
	}

	// there are deltas we need to concatenate them with the first dictionary
	 := make([]arrow.Array, 0, len())
	// NOTE: at this point the dictionary data may not be trusted. it needs to
	// be validated as concatenation can crash on invalid or corrupted data.
	for ,  := range  {
		if hasUnresolvedNestedDict() {
			return nil, fmt.Errorf("arrow/ipc: delta dict with unresolved nested dictionary not implemented")
		}
		 := array.MakeFromData()
		// Two releases are deferred per element and run when this method
		// returns, including on the error returns above and below.
		// NOTE(review): presumably one is for the temporary array and one
		// for the stored array data being replaced — confirm.
		defer .Release()

		 = append(, )
		defer .Release()
	}

	,  := array.Concatenate(, )
	if  != nil {
		return nil, 
	}
	// The concatenated array is released on return; its data is retained
	// first so the copy stored in the table stays valid.
	defer .Release()
	.Data().Retain()

	.id2dict[] = []arrow.ArrayData{.Data()}
	return .Data(), nil
}

// Returns the array data for the given id by delegating to reify with the
// same arguments, and returns reify's result and error unchanged.
// NOTE(review): a caller in this file invokes Dict(id, allocator) on a Memo,
// which is presumably this method — confirm.
func ( *Memo) ( int64,  memory.Allocator) (arrow.ArrayData, error) {
	return .reify(, )
}

// Records the data type for an id. If a type is already recorded for the id
// and arrow.TypeEqual reports a difference, an error is returned and the
// table is left unchanged. Otherwise the type is stored and nil is returned.
// NOTE(review): the method name is not visible in this view.
func ( *Memo) ( int64,  arrow.DataType) error {
	if ,  := .id2type[];  && !arrow.TypeEqual(, ) {
		return fmt.Errorf("arrow/ipc: conflicting dictionary types for id %d", )
	}

	.id2type[] = 
	return nil
}

// Looks up the data type recorded for an id. The boolean result is the
// presence flag of the id2type lookup.
// NOTE(review): the method name is not visible in this view.
func ( *Memo) ( int64) (arrow.DataType, bool) {
	,  := .id2type[]
	return , 
}

// func (memo *dictMemo) ID(v arrow.Array) int64 {
// 	id, ok := memo.dict2id[v]
// 	if ok {
// 		return id
// 	}

// 	v.Retain()
// 	id = int64(len(memo.dict2id))
// 	memo.dict2id[v] = id
// 	memo.id2dict[id] = v
// 	return id
// }

// Reports whether the given array data has an entry in dict2id. The lookup
// is by map-key equality on the arrow.ArrayData interface value.
//
// This method uses a value receiver, unlike most Memo methods.
// NOTE(review): the method name is not visible in this view.
func ( Memo) ( arrow.ArrayData) bool {
	,  := .dict2id[]
	return 
}

// Reports whether the given id has an entry in id2dict.
//
// This method uses a value receiver, unlike most Memo methods.
// NOTE(review): the method name is not visible in this view.
func ( Memo) ( int64) bool {
	,  := .id2dict[]
	return 
}

// Registers array data under an id that must not be in use yet. The data is
// retained, stored as the only element of the id's list in id2dict, and
// recorded in dict2id.
//
// Panics if the id already has an entry in id2dict.
// NOTE(review): the method name is not visible in this view.
func ( *Memo) ( int64,  arrow.ArrayData) {
	if ,  := .id2dict[];  {
		panic(fmt.Errorf("arrow/ipc: duplicate id=%d", ))
	}
	.Retain()
	.id2dict[] = []arrow.ArrayData{}
	.dict2id[] = 
}

// Appends array data to the list already stored for an id. The data is
// retained before it is appended. dict2id is not updated.
//
// Panics if the id has no entry in id2dict.
// NOTE(review): the method name is not visible in this view; the panic
// message describes the operation as adding a delta.
func ( *Memo) ( int64,  arrow.ArrayData) {
	,  := .id2dict[]
	if ! {
		panic(fmt.Errorf("arrow/ipc: adding delta to non-existing id=%d", ))
	}
	.Retain()
	.id2dict[] = append(, )
}

// AddOrReplace puts the provided dictionary into the memo table. If the id
// already has an entry, everything stored under it is released and the new
// data replaces it; otherwise the new data is added. Either way the id ends
// up with a one-element list and the new data is retained. dict2id is not
// updated by this method.
//
// NOTE(review): the boolean result is the negation of a value not visible in
// this view, presumably the lookup's presence flag, which would make it true
// when the id was newly added and false when it replaced an entry — confirm.
// NOTE(review): the old entries are released before the new data is
// retained; confirm callers never pass data that is already stored under
// the same id.
func ( *Memo) ( int64,  arrow.ArrayData) bool {
	,  := .id2dict[]
	if  {
		// replace the dictionary and release any existing ones
		for ,  := range  {
			.Release()
		}
		// Reuse the existing list: overwrite slot 0 and truncate to length 1.
		[0] = 
		 = [:1]
	} else {
		 = []arrow.ArrayData{}
	}
	.Retain()
	.id2dict[] = 
	return !
}

// Collects the dictionaries of a record batch, resolving ids through the
// given Mapper. A dictCollector is created around the mapper and run over
// the batch, and its accumulated pairs are returned with the error.
//
// The pairs are assigned to the result even when collection failed, so on
// error the slice may hold the pairs gathered before the failure.
// NOTE(review): the function name is not visible in this view.
func ( arrow.RecordBatch,  *Mapper) ( []dictpair,  error) {
	 := dictCollector{mapper: }
	 = .collect()
	 = .dictionaries
	return
}

// Attaches dictionaries from the Memo to one array data and to everything
// below it.
//
// An extension type is replaced by its storage type before inspection. For
// dictionary-typed data, the id for the position is looked up in the Memo's
// Mapper, the dictionary is fetched with Dict using the supplied allocator,
// and it is set on the data. The data's children are resolved in all cases.
//
// Returns the first error from the id lookup, the dictionary fetch, or a
// nested resolution. The data is converted to *array.Data with an unchecked
// type assertion before the dictionary is set, so another implementation of
// arrow.ArrayData causes a panic on that path.
// NOTE(review): the function name is not visible in this view; the sibling
// function in this file calls ResolveFieldDict with this argument shape.
func ( *Memo,  arrow.ArrayData,  FieldPos,  memory.Allocator) error {
	 := .DataType()
	if .ID() == arrow.EXTENSION {
		 = .(arrow.ExtensionType).StorageType()
	}
	if .ID() == arrow.DICTIONARY {
		,  := .Mapper.GetFieldID(.Path())
		if  != nil {
			return 
		}
		,  := .Dict(, )
		if  != nil {
			return 
		}
		.(*array.Data).SetDictionary()
		// NOTE(review): the callee name is not visible here; presumably a
		// recursive call that resolves dictionaries nested inside the one
		// just attached — confirm.
		if  := (, , , );  != nil {
			return 
		}
	}
	return ResolveDictionaries(, .Children(), , )
}

// Resolves dictionaries for a list of array data under a parent position.
// The element at slice index i is resolved at the child position with index
// i. Nil elements are skipped. Stops at the first error and returns it,
// otherwise returns nil.
// NOTE(review): the function name is not visible in this view; a call site
// in this file refers to it as ResolveDictionaries.
func ( *Memo,  []arrow.ArrayData,  FieldPos,  memory.Allocator) error {
	for ,  := range  {
		if  == nil {
			continue
		}
		if  := ResolveFieldDict(, , .Child(int32()), );  != nil {
			return 
		}
	}
	return nil
}