// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ipc

import (
	
	
	

	
	
	
)

// swap the endianness of the array's buffers as needed in-place to save
// the cost of reallocation.
//
// assumes that nested data buffers are never re-used, if an *array.Data
// child is re-used among the children or the dictionary then this might
// end up double-swapping (putting it back into the original endianness).
// if it is needed to support re-using the buffers, then this can be
// re-factored to instead return a NEW array.Data object with newly
// allocated buffers, rather than doing it in place.
//
// For now this is intended to be used by the IPC readers after loading
// arrays from an IPC message which currently is guaranteed to not re-use
// buffers between arrays.
func swapEndianArrayData( *array.Data) error {
	if .Offset() != 0 {
		return errors.New("unsupported data format: data.offset != 0")
	}
	if  := swapType(.DataType(), );  != nil {
		return 
	}
	return swapChildren(.Children())
}

func swapChildren( []arrow.ArrayData) ( error) {
	for  := range  {
		if  = swapEndianArrayData([].(*array.Data));  != nil {
			break
		}
	}
	return
}

func swapType( arrow.DataType,  *array.Data) ( error) {
	switch .ID() {
	case arrow.BINARY, arrow.STRING:
		swapOffsets(1, 32, )
		return
	case arrow.LARGE_BINARY, arrow.LARGE_STRING:
		swapOffsets(1, 64, )
		return
	case arrow.NULL, arrow.BOOL, arrow.INT8, arrow.UINT8,
		arrow.FIXED_SIZE_BINARY, arrow.FIXED_SIZE_LIST, arrow.STRUCT:
		return
	}

	switch dt := .(type) {
	case *arrow.Decimal128Type:
		 := arrow.Uint64Traits.CastFromBytes(.Buffers()[1].Bytes())
		 := .Buffers()[1].Len() / arrow.Decimal128SizeBytes
		for  := 0;  < ; ++ {
			 :=  * 2
			 := bits.ReverseBytes64([])
			[] = bits.ReverseBytes64([+1])
			[+1] = 
		}
	case *arrow.Decimal256Type:
		 := arrow.Uint64Traits.CastFromBytes(.Buffers()[1].Bytes())
		 := .Buffers()[1].Len() / arrow.Decimal256SizeBytes
		for  := 0;  < ; ++ {
			 :=  * 4
			 := bits.ReverseBytes64([])
			 := bits.ReverseBytes64([+1])
			 := bits.ReverseBytes64([+2])
			[] = bits.ReverseBytes64([+3])
			[+1] = 
			[+2] = 
			[+3] = 
		}
	case arrow.UnionType:
		if .Mode() == arrow.DenseMode {
			swapOffsets(2, 32, )
		}
	case *arrow.ListType:
		swapOffsets(1, 32, )
	case *arrow.LargeListType:
		swapOffsets(1, 64, )
	case *arrow.MapType:
		swapOffsets(1, 32, )
	case *arrow.DayTimeIntervalType:
		byteSwapBuffer(32, .Buffers()[1])
	case *arrow.MonthDayNanoIntervalType:
		 := arrow.MonthDayNanoIntervalTraits.CastFromBytes(.Buffers()[1].Bytes())
		for ,  := range  {
			[].Days = int32(bits.ReverseBytes32(uint32(.Days)))
			[].Months = int32(bits.ReverseBytes32(uint32(.Months)))
			[].Nanoseconds = int64(bits.ReverseBytes64(uint64(.Nanoseconds)))
		}
	case arrow.ExtensionType:
		return (.StorageType(), )
	case *arrow.DictionaryType:
		// dictionary itself was already swapped in ReadDictionary calls
		return (.IndexType, )
	case arrow.FixedWidthDataType:
		byteSwapBuffer(.BitWidth(), .Buffers()[1])
	default:
		 = fmt.Errorf("%w: swapping endianness of %s", arrow.ErrNotImplemented, )
	}

	return
}

// this can get called on an invalid Array Data object by the IPC reader,
// so we won't rely on the data.length and will instead rely on the buffer's
// own size instead.
func byteSwapBuffer( int,  *memory.Buffer) {
	if  == 1 ||  == nil {
		// if byte width == 1, no need to swap anything
		return
	}

	switch  {
	case 16:
		 := arrow.Uint16Traits.CastFromBytes(.Bytes())
		for  := range  {
			[] = bits.ReverseBytes16([])
		}
	case 32:
		 := arrow.Uint32Traits.CastFromBytes(.Bytes())
		for  := range  {
			[] = bits.ReverseBytes32([])
		}
	case 64:
		 := arrow.Uint64Traits.CastFromBytes(.Bytes())
		for  := range  {
			[] = bits.ReverseBytes64([])
		}
	}
}

func swapOffsets( int,  int,  *array.Data) {
	if .Buffers()[] == nil || .Buffers()[].Len() == 0 {
		return
	}

	// other than unions, offset has one more element than the data.length
	// don't yet implement large types, so hardcode 32bit offsets for now
	byteSwapBuffer(, .Buffers()[])
}