// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ipc

import (
	
	
	
	
	

	
	
	
	
	
	flatbuffers 
)

// Magic string identifying an Apache Arrow file.
var Magic = []byte("ARROW1")

const (
	currentMetadataVersion = MetadataV5
	minMetadataVersion     = MetadataV4

	// constants for the extension type metadata keys for the type name and
	// any extension metadata to be passed to deserialize.
	ExtensionTypeKeyName     = "ARROW:extension:name"
	ExtensionMetadataKeyName = "ARROW:extension:metadata"

	// ARROW-109: We set this number arbitrarily to help catch user mistakes. For
	// deeply nested schemas, it is expected the user will indicate explicitly the
	// maximum allowed recursion depth
	kMaxNestingDepth = 64
)

type startVecFunc func(b *flatbuffers.Builder, n int) flatbuffers.UOffsetT

type fieldMetadata struct {
	Len    int64
	Nulls  int64
	Offset int64
}

type bufferMetadata struct {
	Offset int64 // relative offset into the memory page to the starting byte of the buffer
	Len    int64 // absolute length in bytes of the buffer
}

type fileBlock struct {
	offset int64
	meta   int32
	body   int64

	r   io.ReaderAt
	mem memory.Allocator
}

func ( fileBlock) () int64 { return .offset }
func ( fileBlock) () int32   { return .meta }
func ( fileBlock) () int64   { return .body }

func fileBlocksToFB( *flatbuffers.Builder,  []dataBlock,  startVecFunc) flatbuffers.UOffsetT {
	(, len())
	for  := len() - 1;  >= 0; -- {
		 := []
		flatbuf.CreateBlock(, .Offset(), .Meta(), .Body())
	}

	return .EndVector(len())
}

func ( fileBlock) () (*Message, error) {
	var (
		  error
		  []byte
		 *memory.Buffer
		 *memory.Buffer
		    = .section()
	)

	 = memory.NewResizableBuffer(.mem)
	.Resize(int(.meta))
	defer .Release()

	 = .Bytes()
	_,  = io.ReadFull(, )
	if  != nil {
		return nil, fmt.Errorf("arrow/ipc: could not read message metadata: %w", )
	}

	 := 0
	switch binary.LittleEndian.Uint32() {
	case 0:
	case kIPCContToken:
		 = 8
	default:
		// ARROW-6314: backwards compatibility for reading old IPC
		// messages produced prior to version 0.15.0
		 = 4
	}

	// drop buf-size already known from blk.Meta
	 = memory.SliceBuffer(, , int(.meta)-)
	defer .Release()

	 = memory.NewResizableBuffer(.mem)
	defer .Release()
	.Resize(int(.body))
	 = .Bytes()
	_,  = io.ReadFull(, )
	if  != nil {
		return nil, fmt.Errorf("arrow/ipc: could not read message body: %w", )
	}

	return NewMessage(, ), nil
}

func ( fileBlock) () io.Reader {
	return io.NewSectionReader(.r, .offset, int64(.meta)+.body)
}

func unitFromFB( flatbuf.TimeUnit) arrow.TimeUnit {
	switch  {
	case flatbuf.TimeUnitSECOND:
		return arrow.Second
	case flatbuf.TimeUnitMILLISECOND:
		return arrow.Millisecond
	case flatbuf.TimeUnitMICROSECOND:
		return arrow.Microsecond
	case flatbuf.TimeUnitNANOSECOND:
		return arrow.Nanosecond
	default:
		panic(fmt.Errorf("arrow/ipc: invalid flatbuf.TimeUnit(%d) value", ))
	}
}

func unitToFB( arrow.TimeUnit) flatbuf.TimeUnit {
	switch  {
	case arrow.Second:
		return flatbuf.TimeUnitSECOND
	case arrow.Millisecond:
		return flatbuf.TimeUnitMILLISECOND
	case arrow.Microsecond:
		return flatbuf.TimeUnitMICROSECOND
	case arrow.Nanosecond:
		return flatbuf.TimeUnitNANOSECOND
	default:
		panic(fmt.Errorf("arrow/ipc: invalid arrow.TimeUnit(%d) value", ))
	}
}

// initFB is a helper function to handle flatbuffers' polymorphism.
func initFB( interface {
	() flatbuffers.Table
	([]byte, flatbuffers.UOffsetT)
},  func( *flatbuffers.Table) bool) {
	 := .()
	if !(&) {
		panic(fmt.Errorf("arrow/ipc: could not initialize %T from flatbuffer", ))
	}
	.(.Bytes, .Pos)
}

func fieldFromFB( *flatbuf.Field,  dictutils.FieldPos,  *dictutils.Memo) (arrow.Field, error) {
	var (
		 error
		   arrow.Field
	)

	.Name = string(.Name())
	.Nullable = .Nullable()
	.Metadata,  = metadataFromFB()
	if  != nil {
		return , 
	}

	 := .ChildrenLength()
	 := make([]arrow.Field, )
	for  := range  {
		var  flatbuf.Field
		if !.Children(&, ) {
			return , fmt.Errorf("arrow/ipc: could not load field child %d", )

		}
		,  := (&, .Child(int32()), )
		if  != nil {
			return , fmt.Errorf("arrow/ipc: could not convert field child %d: %w", , )
		}
		[] = 
	}

	.Type,  = typeFromFB(, , , &.Metadata, )
	if  != nil {
		return , fmt.Errorf("arrow/ipc: could not convert field type: %w", )
	}

	return , nil
}

func fieldToFB( *flatbuffers.Builder,  dictutils.FieldPos,  arrow.Field,  *dictutils.Mapper) flatbuffers.UOffsetT {
	var  = fieldVisitor{b: , memo: , pos: , meta: make(map[string]string)}
	return .result()
}

type fieldVisitor struct {
	b      *flatbuffers.Builder
	memo   *dictutils.Mapper
	pos    dictutils.FieldPos
	dtype  flatbuf.Type
	offset flatbuffers.UOffsetT
	kids   []flatbuffers.UOffsetT
	meta   map[string]string
}

func ( *fieldVisitor) ( arrow.Field) {
	 := .Type
	switch dt := .(type) {
	case *arrow.NullType:
		.dtype = flatbuf.TypeNull
		flatbuf.NullStart(.b)
		.offset = flatbuf.NullEnd(.b)

	case *arrow.BooleanType:
		.dtype = flatbuf.TypeBool
		flatbuf.BoolStart(.b)
		.offset = flatbuf.BoolEnd(.b)

	case *arrow.Uint8Type:
		.dtype = flatbuf.TypeInt
		.offset = intToFB(.b, int32(.BitWidth()), false)

	case *arrow.Uint16Type:
		.dtype = flatbuf.TypeInt
		.offset = intToFB(.b, int32(.BitWidth()), false)

	case *arrow.Uint32Type:
		.dtype = flatbuf.TypeInt
		.offset = intToFB(.b, int32(.BitWidth()), false)

	case *arrow.Uint64Type:
		.dtype = flatbuf.TypeInt
		.offset = intToFB(.b, int32(.BitWidth()), false)

	case *arrow.Int8Type:
		.dtype = flatbuf.TypeInt
		.offset = intToFB(.b, int32(.BitWidth()), true)

	case *arrow.Int16Type:
		.dtype = flatbuf.TypeInt
		.offset = intToFB(.b, int32(.BitWidth()), true)

	case *arrow.Int32Type:
		.dtype = flatbuf.TypeInt
		.offset = intToFB(.b, int32(.BitWidth()), true)

	case *arrow.Int64Type:
		.dtype = flatbuf.TypeInt
		.offset = intToFB(.b, int32(.BitWidth()), true)

	case *arrow.Float16Type:
		.dtype = flatbuf.TypeFloatingPoint
		.offset = floatToFB(.b, int32(.BitWidth()))

	case *arrow.Float32Type:
		.dtype = flatbuf.TypeFloatingPoint
		.offset = floatToFB(.b, int32(.BitWidth()))

	case *arrow.Float64Type:
		.dtype = flatbuf.TypeFloatingPoint
		.offset = floatToFB(.b, int32(.BitWidth()))

	case arrow.DecimalType:
		.dtype = flatbuf.TypeDecimal
		flatbuf.DecimalStart(.b)
		flatbuf.DecimalAddPrecision(.b, .GetPrecision())
		flatbuf.DecimalAddScale(.b, .GetScale())
		flatbuf.DecimalAddBitWidth(.b, int32(.BitWidth()))
		.offset = flatbuf.DecimalEnd(.b)

	case *arrow.FixedSizeBinaryType:
		.dtype = flatbuf.TypeFixedSizeBinary
		flatbuf.FixedSizeBinaryStart(.b)
		flatbuf.FixedSizeBinaryAddByteWidth(.b, int32(.ByteWidth))
		.offset = flatbuf.FixedSizeBinaryEnd(.b)

	case *arrow.BinaryType:
		.dtype = flatbuf.TypeBinary
		flatbuf.BinaryStart(.b)
		.offset = flatbuf.BinaryEnd(.b)

	case *arrow.LargeBinaryType:
		.dtype = flatbuf.TypeLargeBinary
		flatbuf.LargeBinaryStart(.b)
		.offset = flatbuf.LargeBinaryEnd(.b)

	case *arrow.StringType:
		.dtype = flatbuf.TypeUtf8
		flatbuf.Utf8Start(.b)
		.offset = flatbuf.Utf8End(.b)

	case *arrow.LargeStringType:
		.dtype = flatbuf.TypeLargeUtf8
		flatbuf.LargeUtf8Start(.b)
		.offset = flatbuf.LargeUtf8End(.b)

	case *arrow.BinaryViewType:
		.dtype = flatbuf.TypeBinaryView
		flatbuf.BinaryViewStart(.b)
		.offset = flatbuf.BinaryViewEnd(.b)

	case *arrow.StringViewType:
		.dtype = flatbuf.TypeUtf8View
		flatbuf.Utf8ViewStart(.b)
		.offset = flatbuf.Utf8ViewEnd(.b)

	case *arrow.Date32Type:
		.dtype = flatbuf.TypeDate
		flatbuf.DateStart(.b)
		flatbuf.DateAddUnit(.b, flatbuf.DateUnitDAY)
		.offset = flatbuf.DateEnd(.b)

	case *arrow.Date64Type:
		.dtype = flatbuf.TypeDate
		flatbuf.DateStart(.b)
		flatbuf.DateAddUnit(.b, flatbuf.DateUnitMILLISECOND)
		.offset = flatbuf.DateEnd(.b)

	case *arrow.Time32Type:
		.dtype = flatbuf.TypeTime
		flatbuf.TimeStart(.b)
		flatbuf.TimeAddUnit(.b, unitToFB(.Unit))
		flatbuf.TimeAddBitWidth(.b, 32)
		.offset = flatbuf.TimeEnd(.b)

	case *arrow.Time64Type:
		.dtype = flatbuf.TypeTime
		flatbuf.TimeStart(.b)
		flatbuf.TimeAddUnit(.b, unitToFB(.Unit))
		flatbuf.TimeAddBitWidth(.b, 64)
		.offset = flatbuf.TimeEnd(.b)

	case *arrow.TimestampType:
		.dtype = flatbuf.TypeTimestamp
		 := unitToFB(.Unit)
		var  flatbuffers.UOffsetT
		if .TimeZone != "" {
			 = .b.CreateString(.TimeZone)
		}
		flatbuf.TimestampStart(.b)
		flatbuf.TimestampAddUnit(.b, )
		flatbuf.TimestampAddTimezone(.b, )
		.offset = flatbuf.TimestampEnd(.b)

	case *arrow.StructType:
		.dtype = flatbuf.TypeStruct_
		 := make([]flatbuffers.UOffsetT, .NumFields())
		for ,  := range .Fields() {
			[] = fieldToFB(.b, .pos.Child(int32()), , .memo)
		}
		flatbuf.Struct_Start(.b)
		for  := len() - 1;  >= 0; -- {
			.b.PrependUOffsetT([])
		}
		.offset = flatbuf.Struct_End(.b)
		.kids = append(.kids, ...)

	case *arrow.ListType:
		.dtype = flatbuf.TypeList
		.kids = append(.kids, fieldToFB(.b, .pos.Child(0), .ElemField(), .memo))
		flatbuf.ListStart(.b)
		.offset = flatbuf.ListEnd(.b)

	case *arrow.LargeListType:
		.dtype = flatbuf.TypeLargeList
		.kids = append(.kids, fieldToFB(.b, .pos.Child(0), .ElemField(), .memo))
		flatbuf.LargeListStart(.b)
		.offset = flatbuf.LargeListEnd(.b)

	case *arrow.ListViewType:
		.dtype = flatbuf.TypeListView
		.kids = append(.kids, fieldToFB(.b, .pos.Child(0), .ElemField(), .memo))
		flatbuf.ListViewStart(.b)
		.offset = flatbuf.ListViewEnd(.b)

	case *arrow.LargeListViewType:
		.dtype = flatbuf.TypeLargeListView
		.kids = append(.kids, fieldToFB(.b, .pos.Child(0), .ElemField(), .memo))
		flatbuf.LargeListViewStart(.b)
		.offset = flatbuf.LargeListViewEnd(.b)

	case *arrow.FixedSizeListType:
		.dtype = flatbuf.TypeFixedSizeList
		.kids = append(.kids, fieldToFB(.b, .pos.Child(0), .ElemField(), .memo))
		flatbuf.FixedSizeListStart(.b)
		flatbuf.FixedSizeListAddListSize(.b, .Len())
		.offset = flatbuf.FixedSizeListEnd(.b)

	case *arrow.MonthIntervalType:
		.dtype = flatbuf.TypeInterval
		flatbuf.IntervalStart(.b)
		flatbuf.IntervalAddUnit(.b, flatbuf.IntervalUnitYEAR_MONTH)
		.offset = flatbuf.IntervalEnd(.b)

	case *arrow.DayTimeIntervalType:
		.dtype = flatbuf.TypeInterval
		flatbuf.IntervalStart(.b)
		flatbuf.IntervalAddUnit(.b, flatbuf.IntervalUnitDAY_TIME)
		.offset = flatbuf.IntervalEnd(.b)

	case *arrow.MonthDayNanoIntervalType:
		.dtype = flatbuf.TypeInterval
		flatbuf.IntervalStart(.b)
		flatbuf.IntervalAddUnit(.b, flatbuf.IntervalUnitMONTH_DAY_NANO)
		.offset = flatbuf.IntervalEnd(.b)

	case *arrow.DurationType:
		.dtype = flatbuf.TypeDuration
		 := unitToFB(.Unit)
		flatbuf.DurationStart(.b)
		flatbuf.DurationAddUnit(.b, )
		.offset = flatbuf.DurationEnd(.b)

	case *arrow.MapType:
		.dtype = flatbuf.TypeMap
		.kids = append(.kids, fieldToFB(.b, .pos.Child(0), .ElemField(), .memo))
		flatbuf.MapStart(.b)
		flatbuf.MapAddKeysSorted(.b, .KeysSorted)
		.offset = flatbuf.MapEnd(.b)

	case *arrow.RunEndEncodedType:
		.dtype = flatbuf.TypeRunEndEncoded
		var  [2]flatbuffers.UOffsetT
		[0] = fieldToFB(.b, .pos.Child(0),
			arrow.Field{Name: "run_ends", Type: .RunEnds()}, .memo)
		[1] = fieldToFB(.b, .pos.Child(1),
			arrow.Field{Name: "values", Type: .Encoded(), Nullable: true}, .memo)
		flatbuf.RunEndEncodedStart(.b)
		.b.PrependUOffsetT([1])
		.b.PrependUOffsetT([0])
		.offset = flatbuf.RunEndEncodedEnd(.b)
		.kids = append(.kids, [0], [1])

	case arrow.ExtensionType:
		.Type = .StorageType()
		.()
		.meta[ExtensionTypeKeyName] = .ExtensionName()
		.meta[ExtensionMetadataKeyName] = string(.Serialize())

	case *arrow.DictionaryType:
		.Type = .ValueType
		.()

	case arrow.UnionType:
		.dtype = flatbuf.TypeUnion
		 := make([]flatbuffers.UOffsetT, .NumFields())
		for ,  := range .Fields() {
			[] = fieldToFB(.b, .pos.Child(int32()), , .memo)
		}

		 := .TypeCodes()
		flatbuf.UnionStartTypeIdsVector(.b, len())

		for  := len() - 1;  >= 0; -- {
			.b.PlaceInt32(int32([]))
		}
		 := .b.EndVector(len(.TypeCodes()))
		flatbuf.UnionStart(.b)
		switch .Mode() {
		case arrow.SparseMode:
			flatbuf.UnionAddMode(.b, flatbuf.UnionModeSparse)
		case arrow.DenseMode:
			flatbuf.UnionAddMode(.b, flatbuf.UnionModeDense)
		default:
			panic("invalid union mode")
		}
		flatbuf.UnionAddTypeIds(.b, )
		.offset = flatbuf.UnionEnd(.b)
		.kids = append(.kids, ...)

	default:
		 := fmt.Errorf("arrow/ipc: invalid data type %v", )
		panic() // FIXME(sbinet): implement all data-types.
	}
}

func ( *fieldVisitor) ( arrow.Field) flatbuffers.UOffsetT {
	 := .b.CreateString(.Name)

	.visit()

	flatbuf.FieldStartChildrenVector(.b, len(.kids))
	for  := len(.kids) - 1;  >= 0; -- {
		.b.PrependUOffsetT(.kids[])
	}
	 := .b.EndVector(len(.kids))

	 := .Type
	if .ID() == arrow.EXTENSION {
		 = .(arrow.ExtensionType).StorageType()
	}

	var  flatbuffers.UOffsetT
	if .ID() == arrow.DICTIONARY {
		 := .Type.(*arrow.DictionaryType).IndexType.(arrow.FixedWidthDataType)

		,  := .memo.GetFieldID(.pos.Path())
		if  != nil {
			panic()
		}
		var  bool
		switch .ID() {
		case arrow.UINT8, arrow.UINT16, arrow.UINT32, arrow.UINT64:
			 = false
		case arrow.INT8, arrow.INT16, arrow.INT32, arrow.INT64:
			 = true
		}
		 := intToFB(.b, int32(.BitWidth()), )
		flatbuf.DictionaryEncodingStart(.b)
		flatbuf.DictionaryEncodingAddId(.b, )
		flatbuf.DictionaryEncodingAddIndexType(.b, )
		flatbuf.DictionaryEncodingAddIsOrdered(.b, .Type.(*arrow.DictionaryType).Ordered)
		 = flatbuf.DictionaryEncodingEnd(.b)
	}

	var (
		 flatbuffers.UOffsetT
		    []flatbuffers.UOffsetT
	)
	for ,  := range .Metadata.Keys() {
		 := .Metadata.Values()[]
		 := .b.CreateString()
		 := .b.CreateString()
		flatbuf.KeyValueStart(.b)
		flatbuf.KeyValueAddKey(.b, )
		flatbuf.KeyValueAddValue(.b, )
		 = append(, flatbuf.KeyValueEnd(.b))
	}
	{
		 := make([]string, 0, len(.meta))
		for  := range .meta {
			 = append(, )
		}
		sort.Strings()
		for ,  := range  {
			 := .meta[]
			 := .b.CreateString()
			 := .b.CreateString()
			flatbuf.KeyValueStart(.b)
			flatbuf.KeyValueAddKey(.b, )
			flatbuf.KeyValueAddValue(.b, )
			 = append(, flatbuf.KeyValueEnd(.b))
		}
	}
	if len() > 0 {
		flatbuf.FieldStartCustomMetadataVector(.b, len())
		for  := len() - 1;  >= 0; -- {
			.b.PrependUOffsetT([])
		}
		 = .b.EndVector(len())
	}

	flatbuf.FieldStart(.b)
	flatbuf.FieldAddName(.b, )
	flatbuf.FieldAddNullable(.b, .Nullable)
	flatbuf.FieldAddTypeType(.b, .dtype)
	flatbuf.FieldAddType(.b, .offset)
	flatbuf.FieldAddDictionary(.b, )
	flatbuf.FieldAddChildren(.b, )
	flatbuf.FieldAddCustomMetadata(.b, )

	 := flatbuf.FieldEnd(.b)

	return 
}

func typeFromFB( *flatbuf.Field,  dictutils.FieldPos,  []arrow.Field,  *arrow.Metadata,  *dictutils.Memo) (arrow.DataType, error) {
	var  flatbuffers.Table
	if !.Type(&) {
		return nil, fmt.Errorf("arrow/ipc: could not load field type data")
	}

	,  := concreteTypeFromFB(.TypeType(), , )
	if  != nil {
		return , 
	}

	var (
		        = int64(-1)
		 arrow.DataType
		      = .Dictionary(nil)
	)
	if  != nil {
		var  flatbuf.Int
		.IndexType(&)
		,  := intFromFB()
		if  != nil {
			return nil, 
		}

		 = 
		 = &arrow.DictionaryType{IndexType: , ValueType: , Ordered: .IsOrdered()}
		 = .Id()

		if  = .Mapper.AddField(, .Path());  != nil {
			return , 
		}
		if  = .AddType(, );  != nil {
			return , 
		}

	}

	// look for extension metadata in custom metadata field.
	if .Len() > 0 {
		 := .FindKey(ExtensionTypeKeyName)
		if  < 0 {
			return , 
		}

		 := arrow.GetExtensionType(.Values()[])
		if  == nil {
			// if the extension type is unknown, we do not error here.
			// simply return the storage type.
			return , 
		}

		var (
			    string
			 int
		)

		if  = .FindKey(ExtensionMetadataKeyName);  >= 0 {
			 = .Values()[]
		}

		,  = .Deserialize(, )
		if  != nil {
			return , 
		}

		 := .Keys()
		 := .Values()
		if  < 0 {
			// if there was no extension metadata, just the name, we only have to
			// remove the extension name metadata key/value to ensure roundtrip
			// metadata consistency
			* = arrow.NewMetadata(append([:], [+1:]...), append([:], [+1:]...))
		} else {
			// if there was extension metadata, we need to remove both the type name
			// and the extension metadata keys and values.
			 := make([]string, 0, .Len()-2)
			 := make([]string, 0, .Len()-2)
			for  := range  {
				if  !=  &&  !=  { // copy everything except the extension metadata keys/values
					 = append(, [])
					 = append(, [])
				}
			}
			* = arrow.NewMetadata(, )
		}
	}

	return , 
}

func concreteTypeFromFB( flatbuf.Type,  flatbuffers.Table,  []arrow.Field) (arrow.DataType, error) {
	switch  {
	case flatbuf.TypeNONE:
		return nil, fmt.Errorf("arrow/ipc: Type metadata cannot be none")

	case flatbuf.TypeNull:
		return arrow.Null, nil

	case flatbuf.TypeInt:
		var  flatbuf.Int
		.Init(.Bytes, .Pos)
		return intFromFB()

	case flatbuf.TypeFloatingPoint:
		var  flatbuf.FloatingPoint
		.Init(.Bytes, .Pos)
		return floatFromFB()

	case flatbuf.TypeDecimal:
		var  flatbuf.Decimal
		.Init(.Bytes, .Pos)
		return decimalFromFB()

	case flatbuf.TypeBinary:
		return arrow.BinaryTypes.Binary, nil

	case flatbuf.TypeFixedSizeBinary:
		var  flatbuf.FixedSizeBinary
		.Init(.Bytes, .Pos)
		return &arrow.FixedSizeBinaryType{ByteWidth: int(.ByteWidth())}, nil

	case flatbuf.TypeUtf8:
		return arrow.BinaryTypes.String, nil

	case flatbuf.TypeLargeBinary:
		return arrow.BinaryTypes.LargeBinary, nil

	case flatbuf.TypeLargeUtf8:
		return arrow.BinaryTypes.LargeString, nil

	case flatbuf.TypeUtf8View:
		return arrow.BinaryTypes.StringView, nil

	case flatbuf.TypeBinaryView:
		return arrow.BinaryTypes.BinaryView, nil

	case flatbuf.TypeBool:
		return arrow.FixedWidthTypes.Boolean, nil

	case flatbuf.TypeList:
		if len() != 1 {
			return nil, fmt.Errorf("arrow/ipc: List must have exactly 1 child field (got=%d)", len())
		}
		 := arrow.ListOfField([0])
		return , nil

	case flatbuf.TypeLargeList:
		if len() != 1 {
			return nil, fmt.Errorf("arrow/ipc: LargeList must have exactly 1 child field (got=%d)", len())
		}
		 := arrow.LargeListOfField([0])
		return , nil

	case flatbuf.TypeListView:
		if len() != 1 {
			return nil, fmt.Errorf("arrow/ipc: ListView must have exactly 1 child field (got=%d)", len())
		}
		 := arrow.ListViewOfField([0])
		return , nil

	case flatbuf.TypeLargeListView:
		if len() != 1 {
			return nil, fmt.Errorf("arrow/ipc: LargeListView must have exactly 1 child field (got=%d)", len())
		}
		 := arrow.LargeListViewOfField([0])
		return , nil

	case flatbuf.TypeFixedSizeList:
		var  flatbuf.FixedSizeList
		.Init(.Bytes, .Pos)
		if len() != 1 {
			return nil, fmt.Errorf("arrow/ipc: FixedSizeList must have exactly 1 child field (got=%d)", len())
		}
		 := arrow.FixedSizeListOfField(.ListSize(), [0])
		return , nil

	case flatbuf.TypeStruct_:
		return arrow.StructOf(...), nil

	case flatbuf.TypeUnion:
		var  flatbuf.Union
		.Init(.Bytes, .Pos)
		var (
			    arrow.UnionMode
			 []arrow.UnionTypeCode
		)

		switch .Mode() {
		case flatbuf.UnionModeSparse:
			 = arrow.SparseMode
		case flatbuf.UnionModeDense:
			 = arrow.DenseMode
		}

		 := .TypeIdsLength()

		if  == 0 {
			for  := range  {
				 = append(, int8())
			}
		} else {
			for  := 0;  < ; ++ {
				 := .TypeIds()
				 := arrow.UnionTypeCode()
				if int32() !=  {
					return nil, errors.New("union type id out of bounds")
				}
				 = append(, )
			}
		}

		return arrow.UnionOf(, , ), nil

	case flatbuf.TypeTime:
		var  flatbuf.Time
		.Init(.Bytes, .Pos)
		return timeFromFB()

	case flatbuf.TypeTimestamp:
		var  flatbuf.Timestamp
		.Init(.Bytes, .Pos)
		return timestampFromFB()

	case flatbuf.TypeDate:
		var  flatbuf.Date
		.Init(.Bytes, .Pos)
		return dateFromFB()

	case flatbuf.TypeInterval:
		var  flatbuf.Interval
		.Init(.Bytes, .Pos)
		return intervalFromFB()

	case flatbuf.TypeDuration:
		var  flatbuf.Duration
		.Init(.Bytes, .Pos)
		return durationFromFB()

	case flatbuf.TypeMap:
		if len() != 1 {
			return nil, fmt.Errorf("arrow/ipc: Map must have exactly 1 child field")
		}

		if [0].Nullable || [0].Type.ID() != arrow.STRUCT || len([0].Type.(*arrow.StructType).Fields()) != 2 {
			return nil, fmt.Errorf("arrow/ipc: Map's key-item pairs must be non-nullable structs")
		}

		 := [0].Type.(*arrow.StructType)
		if .Field(0).Nullable {
			return nil, fmt.Errorf("arrow/ipc: Map's keys must be non-nullable")
		}

		var  flatbuf.Map
		.Init(.Bytes, .Pos)
		 := arrow.MapOf(.Field(0).Type, .Field(1).Type)
		.SetItemNullable(.Field(1).Nullable)
		.KeysSorted = .KeysSorted()
		return , nil

	case flatbuf.TypeRunEndEncoded:
		if len() != 2 {
			return nil, fmt.Errorf("%w: arrow/ipc: RunEndEncoded must have exactly 2 child fields", arrow.ErrInvalid)
		}
		switch [0].Type.ID() {
		case arrow.INT16, arrow.INT32, arrow.INT64:
		default:
			return nil, fmt.Errorf("%w: arrow/ipc: run-end encoded run_ends field must be one of int16, int32, or int64 type", arrow.ErrInvalid)
		}
		return arrow.RunEndEncodedOf([0].Type, [1].Type), nil

	default:
		panic(fmt.Errorf("arrow/ipc: type %v not implemented", flatbuf.EnumNamesType[]))
	}
}

func intFromFB( flatbuf.Int) (arrow.DataType, error) {
	 := .BitWidth()
	if  > 64 {
		return nil, fmt.Errorf("arrow/ipc: integers with more than 64 bits not implemented (bits=%d)", )
	}
	if  < 8 {
		return nil, fmt.Errorf("arrow/ipc: integers with less than 8 bits not implemented (bits=%d)", )
	}

	switch  {
	case 8:
		if !.IsSigned() {
			return arrow.PrimitiveTypes.Uint8, nil
		}
		return arrow.PrimitiveTypes.Int8, nil

	case 16:
		if !.IsSigned() {
			return arrow.PrimitiveTypes.Uint16, nil
		}
		return arrow.PrimitiveTypes.Int16, nil

	case 32:
		if !.IsSigned() {
			return arrow.PrimitiveTypes.Uint32, nil
		}
		return arrow.PrimitiveTypes.Int32, nil

	case 64:
		if !.IsSigned() {
			return arrow.PrimitiveTypes.Uint64, nil
		}
		return arrow.PrimitiveTypes.Int64, nil
	default:
		return nil, fmt.Errorf("arrow/ipc: integers not in cstdint are not implemented")
	}
}

func intToFB( *flatbuffers.Builder,  int32,  bool) flatbuffers.UOffsetT {
	flatbuf.IntStart()
	flatbuf.IntAddBitWidth(, )
	flatbuf.IntAddIsSigned(, )
	return flatbuf.IntEnd()
}

func floatFromFB( flatbuf.FloatingPoint) (arrow.DataType, error) {
	switch  := .Precision();  {
	case flatbuf.PrecisionHALF:
		return arrow.FixedWidthTypes.Float16, nil
	case flatbuf.PrecisionSINGLE:
		return arrow.PrimitiveTypes.Float32, nil
	case flatbuf.PrecisionDOUBLE:
		return arrow.PrimitiveTypes.Float64, nil
	default:
		return nil, fmt.Errorf("arrow/ipc: floating point type with %d precision not implemented", )
	}
}

func floatToFB( *flatbuffers.Builder,  int32) flatbuffers.UOffsetT {
	switch  {
	case 16:
		flatbuf.FloatingPointStart()
		flatbuf.FloatingPointAddPrecision(, flatbuf.PrecisionHALF)
		return flatbuf.FloatingPointEnd()
	case 32:
		flatbuf.FloatingPointStart()
		flatbuf.FloatingPointAddPrecision(, flatbuf.PrecisionSINGLE)
		return flatbuf.FloatingPointEnd()
	case 64:
		flatbuf.FloatingPointStart()
		flatbuf.FloatingPointAddPrecision(, flatbuf.PrecisionDOUBLE)
		return flatbuf.FloatingPointEnd()
	default:
		panic(fmt.Errorf("arrow/ipc: invalid floating point precision %d-bits", ))
	}
}

func decimalFromFB( flatbuf.Decimal) (arrow.DataType, error) {
	switch .BitWidth() {
	case 32:
		return &arrow.Decimal32Type{Precision: .Precision(), Scale: .Scale()}, nil
	case 64:
		return &arrow.Decimal64Type{Precision: .Precision(), Scale: .Scale()}, nil
	case 128:
		return &arrow.Decimal128Type{Precision: .Precision(), Scale: .Scale()}, nil
	case 256:
		return &arrow.Decimal256Type{Precision: .Precision(), Scale: .Scale()}, nil
	default:
		return nil, fmt.Errorf("arrow/ipc: invalid decimal bitwidth: %d", .BitWidth())
	}
}

func timeFromFB( flatbuf.Time) (arrow.DataType, error) {
	 := .BitWidth()
	 := unitFromFB(.Unit())

	switch  {
	case 32:
		switch  {
		case arrow.Millisecond:
			return arrow.FixedWidthTypes.Time32ms, nil
		case arrow.Second:
			return arrow.FixedWidthTypes.Time32s, nil
		default:
			return nil, fmt.Errorf("arrow/ipc: Time32 type with %v unit not implemented", )
		}
	case 64:
		switch  {
		case arrow.Nanosecond:
			return arrow.FixedWidthTypes.Time64ns, nil
		case arrow.Microsecond:
			return arrow.FixedWidthTypes.Time64us, nil
		default:
			return nil, fmt.Errorf("arrow/ipc: Time64 type with %v unit not implemented", )
		}
	default:
		return nil, fmt.Errorf("arrow/ipc: Time type with %d bitwidth not implemented", )
	}
}

func timestampFromFB( flatbuf.Timestamp) (arrow.DataType, error) {
	 := unitFromFB(.Unit())
	 := string(.Timezone())
	return &arrow.TimestampType{Unit: , TimeZone: }, nil
}

func dateFromFB( flatbuf.Date) (arrow.DataType, error) {
	switch .Unit() {
	case flatbuf.DateUnitDAY:
		return arrow.FixedWidthTypes.Date32, nil
	case flatbuf.DateUnitMILLISECOND:
		return arrow.FixedWidthTypes.Date64, nil
	}
	return nil, fmt.Errorf("arrow/ipc: Date type with %d unit not implemented", .Unit())
}

func intervalFromFB( flatbuf.Interval) (arrow.DataType, error) {
	switch .Unit() {
	case flatbuf.IntervalUnitYEAR_MONTH:
		return arrow.FixedWidthTypes.MonthInterval, nil
	case flatbuf.IntervalUnitDAY_TIME:
		return arrow.FixedWidthTypes.DayTimeInterval, nil
	case flatbuf.IntervalUnitMONTH_DAY_NANO:
		return arrow.FixedWidthTypes.MonthDayNanoInterval, nil
	}
	return nil, fmt.Errorf("arrow/ipc: Interval type with %d unit not implemented", .Unit())
}

func durationFromFB( flatbuf.Duration) (arrow.DataType, error) {
	switch .Unit() {
	case flatbuf.TimeUnitSECOND:
		return arrow.FixedWidthTypes.Duration_s, nil
	case flatbuf.TimeUnitMILLISECOND:
		return arrow.FixedWidthTypes.Duration_ms, nil
	case flatbuf.TimeUnitMICROSECOND:
		return arrow.FixedWidthTypes.Duration_us, nil
	case flatbuf.TimeUnitNANOSECOND:
		return arrow.FixedWidthTypes.Duration_ns, nil
	}
	return nil, fmt.Errorf("arrow/ipc: Duration type with %d unit not implemented", .Unit())
}

type customMetadataer interface {
	CustomMetadataLength() int
	CustomMetadata(*flatbuf.KeyValue, int) bool
}

func metadataFromFB( customMetadataer) (arrow.Metadata, error) {
	var (
		 = make([]string, .CustomMetadataLength())
		 = make([]string, .CustomMetadataLength())
	)

	for  := range  {
		var  flatbuf.KeyValue
		if !.CustomMetadata(&, ) {
			return arrow.Metadata{}, fmt.Errorf("arrow/ipc: could not read key-value %d from flatbuffer", )
		}
		[] = string(.Key())
		[] = string(.Value())
	}

	return arrow.NewMetadata(, ), nil
}

func metadataToFB( *flatbuffers.Builder,  arrow.Metadata,  startVecFunc) flatbuffers.UOffsetT {
	if .Len() == 0 {
		return 0
	}

	 := .Len()
	 := make([]flatbuffers.UOffsetT, )
	for  := range  {
		 := .CreateString(.Keys()[])
		 := .CreateString(.Values()[])
		flatbuf.KeyValueStart()
		flatbuf.KeyValueAddKey(, )
		flatbuf.KeyValueAddValue(, )
		[] = flatbuf.KeyValueEnd()
	}

	(, )
	for  :=  - 1;  >= 0; -- {
		.PrependUOffsetT([])
	}
	return .EndVector()
}

func schemaFromFB( *flatbuf.Schema,  *dictutils.Memo) (*arrow.Schema, error) {
	var (
		    error
		 = make([]arrow.Field, .FieldsLength())
		    = dictutils.NewFieldPos()
	)

	for  := range  {
		var  flatbuf.Field
		if !.Fields(&, ) {
			return nil, fmt.Errorf("arrow/ipc: could not read field %d from schema", )
		}

		[],  = fieldFromFB(&, .Child(int32()), )
		if  != nil {
			return nil, fmt.Errorf("arrow/ipc: could not convert field %d from flatbuf: %w", , )
		}
	}

	,  := metadataFromFB()
	if  != nil {
		return nil, fmt.Errorf("arrow/ipc: could not convert schema metadata from flatbuf: %w", )
	}

	return arrow.NewSchemaWithEndian(, &, endian.Endianness(.Endianness())), nil
}

func schemaToFB( *flatbuffers.Builder,  *arrow.Schema,  *dictutils.Mapper) flatbuffers.UOffsetT {
	 := make([]flatbuffers.UOffsetT, .NumFields())
	 := dictutils.NewFieldPos()
	for  := 0;  < .NumFields(); ++ {
		[] = fieldToFB(, .Child(int32()), .Field(), )
	}

	flatbuf.SchemaStartFieldsVector(, len())
	for  := len() - 1;  >= 0; -- {
		.PrependUOffsetT([])
	}
	 := .EndVector(len())

	 := metadataToFB(, .Metadata(), flatbuf.SchemaStartCustomMetadataVector)

	flatbuf.SchemaStart()
	flatbuf.SchemaAddEndianness(, flatbuf.Endianness(.Endianness()))
	flatbuf.SchemaAddFields(, )
	flatbuf.SchemaAddCustomMetadata(, )
	 := flatbuf.SchemaEnd()

	return 
}

// payloadFromSchema returns a slice of payloads corresponding to the given schema.
// Callers of payloadFromSchema will need to call Release after use.
func payloadFromSchema( *arrow.Schema,  memory.Allocator,  *dictutils.Mapper) payloads {
	 := make(payloads, 1)
	[0].msg = MessageSchema
	[0].meta = writeSchemaMessage(, , )

	return 
}

func writeFBBuilder( *flatbuffers.Builder,  memory.Allocator) *memory.Buffer {
	 := .FinishedBytes()
	 := memory.NewResizableBuffer()
	.Resize(len())
	copy(.Bytes(), )
	return 
}

func writeMessageFB( *flatbuffers.Builder,  memory.Allocator,  flatbuf.MessageHeader,  flatbuffers.UOffsetT,  int64) *memory.Buffer {

	flatbuf.MessageStart()
	flatbuf.MessageAddVersion(, flatbuf.MetadataVersion(currentMetadataVersion))
	flatbuf.MessageAddHeaderType(, )
	flatbuf.MessageAddHeader(, )
	flatbuf.MessageAddBodyLength(, )
	 := flatbuf.MessageEnd()
	.Finish()

	return writeFBBuilder(, )
}

func writeSchemaMessage( *arrow.Schema,  memory.Allocator,  *dictutils.Mapper) *memory.Buffer {
	 := flatbuffers.NewBuilder(1024)
	 := schemaToFB(, , )
	return writeMessageFB(, , flatbuf.MessageHeaderSchema, , 0)
}

func writeFileFooter( *arrow.Schema, ,  []dataBlock,  io.Writer) error {
	var (
		    = flatbuffers.NewBuilder(1024)
		 dictutils.Mapper
	)
	.ImportSchema()

	 := schemaToFB(, , &)
	 := fileBlocksToFB(, , flatbuf.FooterStartDictionariesVector)
	 := fileBlocksToFB(, , flatbuf.FooterStartRecordBatchesVector)

	flatbuf.FooterStart()
	flatbuf.FooterAddVersion(, flatbuf.MetadataVersion(currentMetadataVersion))
	flatbuf.FooterAddSchema(, )
	flatbuf.FooterAddDictionaries(, )
	flatbuf.FooterAddRecordBatches(, )
	 := flatbuf.FooterEnd()

	.Finish()

	,  := .Write(.FinishedBytes())
	return 
}

func writeRecordMessage( memory.Allocator, ,  int64,  []fieldMetadata,  []bufferMetadata,  flatbuf.CompressionType,  []int64) *memory.Buffer {
	 := flatbuffers.NewBuilder(0)
	 := recordToFB(, , , , , , )
	return writeMessageFB(, , flatbuf.MessageHeaderRecordBatch, , )
}

func writeDictionaryMessage( memory.Allocator,  int64,  bool, ,  int64,  []fieldMetadata,  []bufferMetadata,  flatbuf.CompressionType,  []int64) *memory.Buffer {
	 := flatbuffers.NewBuilder(0)
	 := recordToFB(, , , , , , )

	flatbuf.DictionaryBatchStart()
	flatbuf.DictionaryBatchAddId(, )
	flatbuf.DictionaryBatchAddData(, )
	flatbuf.DictionaryBatchAddIsDelta(, )
	 := flatbuf.DictionaryBatchEnd()
	return writeMessageFB(, , flatbuf.MessageHeaderDictionaryBatch, , )
}

func recordToFB( *flatbuffers.Builder, ,  int64,  []fieldMetadata,  []bufferMetadata,  flatbuf.CompressionType,  []int64) flatbuffers.UOffsetT {
	 := writeFieldNodes(, , flatbuf.RecordBatchStartNodesVector)
	 := writeBuffers(, , flatbuf.RecordBatchStartBuffersVector)
	var  flatbuffers.UOffsetT
	if  != -1 {
		 = writeBodyCompression(, )
	}

	var  *flatbuffers.UOffsetT
	if len() > 0 {
		flatbuf.RecordBatchStartVariadicBufferCountsVector(, len())
		for  := len() - 1;  >= 0; -- {
			.PrependInt64([])
		}
		 := .EndVector(len())
		 = &
	}

	flatbuf.RecordBatchStart()
	flatbuf.RecordBatchAddLength(, )
	flatbuf.RecordBatchAddNodes(, )
	flatbuf.RecordBatchAddBuffers(, )
	if  != nil {
		flatbuf.RecordBatchAddVariadicBufferCounts(, *)
	}

	if  != -1 {
		flatbuf.RecordBatchAddCompression(, )
	}

	return flatbuf.RecordBatchEnd()
}

func writeFieldNodes( *flatbuffers.Builder,  []fieldMetadata,  startVecFunc) flatbuffers.UOffsetT {

	(, len())
	for  := len() - 1;  >= 0; -- {
		 := []
		if .Offset != 0 {
			panic(fmt.Errorf("arrow/ipc: field metadata for IPC must have offset 0"))
		}
		flatbuf.CreateFieldNode(, .Len, .Nulls)
	}

	return .EndVector(len())
}

func writeBuffers( *flatbuffers.Builder,  []bufferMetadata,  startVecFunc) flatbuffers.UOffsetT {
	(, len())
	for  := len() - 1;  >= 0; -- {
		 := []
		flatbuf.CreateBuffer(, .Offset, .Len)
	}
	return .EndVector(len())
}

func writeBodyCompression( *flatbuffers.Builder,  flatbuf.CompressionType) flatbuffers.UOffsetT {
	flatbuf.BodyCompressionStart()
	flatbuf.BodyCompressionAddCodec(, )
	flatbuf.BodyCompressionAddMethod(, flatbuf.BodyCompressionMethodBUFFER)
	return flatbuf.BodyCompressionEnd()
}

func writeMessage( *memory.Buffer,  int32,  io.Writer) (int, error) {
	var (
		   int
		 error
	)

	// ARROW-3212: we do not make any assumption on whether the output stream is aligned or not.
	 := int32(.Len()) + 8
	 :=  % 
	if  != 0 {
		 +=  - 
	}

	 := make([]byte, 4)

	// write continuation indicator, to address 8-byte alignment requirement from FlatBuffers.
	binary.LittleEndian.PutUint32(, kIPCContToken)
	_,  = .Write()
	if  != nil {
		return 0, fmt.Errorf("arrow/ipc: could not write continuation bit indicator: %w", )
	}

	// the returned message size includes the length prefix, the flatbuffer, + padding
	 = int()

	// write the flatbuffer size prefix, including padding
	 :=  - 8
	binary.LittleEndian.PutUint32(, uint32())
	_,  = .Write()
	if  != nil {
		return , fmt.Errorf("arrow/ipc: could not write message flatbuffer size prefix: %w", )
	}

	// write the flatbuffer
	_,  = .Write(.Bytes())
	if  != nil {
		return , fmt.Errorf("arrow/ipc: could not write message flatbuffer: %w", )
	}

	// write any padding
	 :=  - int32(.Len()) - 8
	if  > 0 {
		_,  = .Write(paddingBytes[:])
		if  != nil {
			return , fmt.Errorf("arrow/ipc: could not write message padding bytes: %w", )
		}
	}

	return , 
}