// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ipc

import (
	
	
	
	
	
	
	
	

	
	
	
	
	
	
	
	
	
)

// streamWriter wraps an io.Writer and keeps a running count of the bytes
// written through it. NewWriter installs it as the default payload writer.
type streamWriter struct {
	// w is the destination for all bytes.
	w io.Writer
	// pos is the total number of bytes the underlying writer reported as written.
	pos int64
}

// No-op hook: performs no work and always returns nil.
// NOTE(review): the method name is not visible in this view; presumably the
// start hook of the payload writer interface — confirm.
func ( *streamWriter) () error { return nil }
// Writes the kEOS marker bytes with a single Write call and returns the error
// from that write. The byte count is not inspected.
func ( *streamWriter) () error {
	,  := .Write(kEOS[:])
	return 
}

// Serializes one Payload with writeIPCPayload. Only the error result of
// writeIPCPayload is used; it is returned unchanged, or nil on success.
func ( *streamWriter) ( Payload) error {
	,  := writeIPCPayload(, )
	if  != nil {
		return 
	}
	return nil
}

// Forwards the bytes to the wrapped io.Writer and returns its results
// unchanged. pos is advanced by the count the underlying writer reports, even
// when that write also returns an error.
func ( *streamWriter) ( []byte) (int, error) {
	,  := .w.Write()
	.pos += int64()
	return , 
}

// hasNestedDict reports whether the given array data has the DICTIONARY type
// id, or whether the check inside the Children loop succeeds for any child.
func hasNestedDict( arrow.ArrayData) bool {
	if .DataType().ID() == arrow.DICTIONARY {
		return true
	}
	// NOTE(review): the callee below is not visible in this view; presumably a
	// recursive hasNestedDict call on the child — confirm.
	for ,  := range .Children() {
		if () {
			return true
		}
	}
	return false
}

// Writer is an Arrow stream writer.
//
// It emits schema, dictionary and record batch payloads to a PayloadWriter.
type Writer struct {
	// w is the output stream handed to NewWriter; NewWriterWithPayloadWriter
	// leaves it unset.
	w io.Writer

	// mem is the allocator passed to the encoders and the schema payload builder.
	mem memory.Allocator
	// pw receives every payload; it is set to nil after a successful close.
	pw  PayloadWriter

	// started is set once the schema payloads have been attempted.
	started         bool
	// schema is the configured schema, or the first record's schema when none
	// was configured.
	schema          *arrow.Schema
	// mapper is populated from schema when the writer starts.
	mapper          dictutils.Mapper
	// codec, compressNP, compressors and minSpaceSavings are handed to the
	// record encoder to control body-buffer compression.
	codec           flatbuf.CompressionType
	compressNP      int
	compressors     []compressor
	minSpaceSavings *float64

	// map of the last written dictionaries by id
	// so we can avoid writing the same dictionary over and over
	lastWrittenDicts map[int64]arrow.Array
	// emitDictDeltas is forwarded to writeDictionaryPayloads for each record.
	emitDictDeltas   bool
}

// NewWriterWithPayloadWriter constructs a writer with the provided payload writer
// instead of the default stream payload writer. This makes the writer more
// reusable such as by the Arrow Flight writer.
//
// The options are resolved through newConfig; allocator, schema, codec,
// compression parallelism, minimum space savings and dictionary-delta emission
// are copied from the resulting config. The Writer's w field is left unset, and
// one compressor slot is allocated per configured compression worker.
func ( PayloadWriter,  ...Option) *Writer {
	 := newConfig(...)
	return &Writer{
		mem:             .alloc,
		pw:              ,
		schema:          .schema,
		codec:           .codec,
		compressNP:      .compressNP,
		minSpaceSavings: .minSpaceSavings,
		emitDictDeltas:  .emitDictDeltas,
		compressors:     make([]compressor, .compressNP),
	}
}

// NewWriter returns a writer that writes records to the provided output stream.
func ( io.Writer,  ...Option) *Writer {
	 := newConfig(...)
	return &Writer{
		w:              ,
		mem:            .alloc,
		pw:             &streamWriter{w: },
		schema:         .schema,
		codec:          .codec,
		emitDictDeltas: .emitDictDeltas,
		compressNP:     .compressNP,
		compressors:    make([]compressor, .compressNP),
	}
}

// Finishes the stream. If the writer has not been started yet it is started
// first, which writes the schema payloads. The payload writer is then closed
// and every dictionary remembered in lastWrittenDicts is released.
//
// After a successful close pw is nil, so a later call returns nil without
// doing further work. If closing the payload writer fails, the wrapped error
// is returned and the remembered dictionaries are not released.
func ( *Writer) () error {
	if !.started {
		 := .start()
		if  != nil {
			return 
		}
	}

	// already closed (or no payload writer to close)
	if .pw == nil {
		return nil
	}

	 := .pw.Close()
	if  != nil {
		return fmt.Errorf("arrow/ipc: could not close payload writer: %w", )
	}
	.pw = nil

	// drop the references taken when each dictionary was recorded
	for ,  := range .lastWrittenDicts {
		.Release()
	}

	return nil
}

// Writes one record batch to the payload writer: dictionary payloads first
// (via writeDictionaryPayloads), then the encoded record batch payload.
//
// On first use the writer is started; if no schema was configured, the
// record's schema is adopted. A record whose schema is nil or not Equal to the
// writer's schema yields errInconsistentSchema. Any panic raised while writing
// is recovered and converted to the returned error through
// utils.FormatRecoveredError.
func ( *Writer) ( arrow.RecordBatch) ( error) {
	// turn panics from the encoding path into an ordinary error result
	defer func() {
		if  := recover();  != nil {
			 = utils.FormatRecoveredError("arrow/ipc: unknown error while writing", )
		}
	}()

	 := .Schema()

	if !.started {
		if .schema == nil {
			.schema = 
		}
		 := .start()
		if  != nil {
			return 
		}
	}

	if  == nil || !.Equal(.schema) {
		return errInconsistentSchema
	}

	const  = true
	var (
		 = Payload{msg: MessageRecordBatch}
		  = newRecordEncoder(
			.mem,
			0,
			kMaxNestingDepth,
			,
			.codec,
			.compressNP,
			.minSpaceSavings,
			.compressors,
		)
	)
	// the payload's buffers are released when this call returns, success or not
	defer .Release()

	 = writeDictionaryPayloads(.mem, , false, .emitDictDeltas, &.mapper, .lastWrittenDicts, .pw, )
	if  != nil {
		return fmt.Errorf("arrow/ipc: failure writing dictionary batches: %w", )
	}

	// the same encoder was used for the dictionaries; clear its state first
	.reset()
	if  := .Encode(&, );  != nil {
		return fmt.Errorf("arrow/ipc: could not encode record to payload: %w", )
	}

	return .pw.WritePayload()
}

// writeDictionaryPayloads writes a dictionary batch payload for each
// dictionary collected from the record by dictutils.CollectDictionaries.
//
// The map argument remembers the last dictionary written per id. A dictionary
// is skipped when it shares its Data with the remembered one, or when it has
// the same length and compares approximately equal (NaNs treated as equal).
// When a remembered dictionary differs and the first bool argument is set, an
// error is returned because the IPC file format allows only one dictionary per
// field. When the new dictionary is longer, the second bool argument is set,
// the dictionary has no nested dictionary, and its leading elements match the
// remembered one, only the new tail is written and the payload is flagged as a
// delta.
//
// NOTE(review): the bool parameter names are not visible in this view; the
// stream writer passes false and its emitDictDeltas setting, in that order.
//
// Each written dictionary is retained and stored in the map, and the entry it
// replaces is released. All collected dictionaries are released on return.
func writeDictionaryPayloads( memory.Allocator,  arrow.RecordBatch,  bool,  bool,  *dictutils.Mapper,  map[int64]arrow.Array,  PayloadWriter,  *recordEncoder) error {
	,  := dictutils.CollectDictionaries(, )
	if  != nil {
		return 
	}
	defer func() {
		for ,  := range  {
			.Dict.Release()
		}
	}()

	 := array.WithNaNsEqual(true)
	for ,  := range  {
		.reset()
		var (
			 int64
			        = dictEncoder{}
		)
		,  := [.ID]
		if  {
			// identical underlying data: nothing new to send
			if .Data() == .Dict.Data() {
				continue
			}
			,  := .Dict.Len(), .Len()
			if  ==  && array.ApproxEqual(, .Dict, ) {
				// same dictionary by value
				// might cost CPU, but required for IPC file format
				continue
			}
			if  {
				return errors.New("arrow/ipc: Dictionary replacement detected when writing IPC file format. Arrow IPC File only supports single dictionary per field")
			}

			// delta candidate: longer than before and the shared prefix is unchanged
			if  >  &&
				 &&
				!hasNestedDict(.Dict.Data()) &&
				(array.SliceApproxEqual(, 0, int64(), .Dict, 0, int64(), )) {
				 = int64()
			}
		}

		var  = Payload{msg: MessageDictionaryBatch}
		// deferred inside the loop: every payload is held until the function returns
		defer .Release()

		 := .Dict
		if  > 0 {
			// send only the elements past the previously written prefix
			 = array.NewSlice(, , int64(.Len()))
			defer .Release()
		}
		if  := .Encode(&, .ID,  > 0, );  != nil {
			return 
		}

		if  := .WritePayload();  != nil {
			return 
		}

		// remember the full dictionary (not the delta slice) for the next record
		[.ID] = .Dict
		if  != nil {
			.Release()
		}
		.Dict.Retain()
	}
	return nil
}

// start marks the writer as started, imports the schema into the dictionary
// mapper, creates the empty lastWrittenDicts map, and writes the payloads
// produced by payloadFromSchema to the payload writer.
//
// started is set before any payload is written, so a failed write here is not
// retried by later calls. The schema payloads are released on return.
func ( *Writer) () error {
	.started = true

	.mapper.ImportSchema(.schema)
	.lastWrittenDicts = make(map[int64]arrow.Array)

	// write out schema payloads
	 := payloadFromSchema(.schema, .mem, &.mapper)
	defer .Release()

	for ,  := range  {
		 := .pw.WritePayload()
		if  != nil {
			return 
		}
	}

	return nil
}

// dictEncoder encodes a dictionary array as a dictionary batch payload. It
// embeds *recordEncoder and reuses its buffer encoding, replacing only the
// metadata message.
type dictEncoder struct {
	*recordEncoder
}

// encodeMetadata stores the message built by writeDictionaryMessage in the
// payload's meta field. The bool and the two int64 arguments are forwarded to
// writeDictionaryMessage together with the payload size and the encoder's
// field metadata, buffer metadata, codec and variadic counts.
// It always returns nil.
func ( *dictEncoder) ( *Payload,  bool, ,  int64) error {
	.meta = writeDictionaryMessage(.mem, , , , .size, .fields, .meta, .codec, .variadicCounts)
	return nil
}

// Encode fills the payload for a dictionary batch. The dictionary array is
// wrapped in a temporary single-column record (one nullable field named
// "dictionary"), encoded with the embedded encoder's encode, and then the
// dictionary metadata is written with the given id and bool flag and the
// record's row count.
//
// start is reset to 0 on entry and again on return. The temporary record is
// released on return.
func ( *dictEncoder) ( *Payload,  int64,  bool,  arrow.Array) error {
	.start = 0
	defer func() {
		.start = 0
	}()

	 := arrow.NewSchema([]arrow.Field{{Name: "dictionary", Type: .DataType(), Nullable: true}}, nil)
	 := array.NewRecord(, []arrow.Array{}, int64(.Len()))
	defer .Release()
	if  := .encode(, );  != nil {
		return 
	}

	return .encodeMetadata(, , , .NumRows())
}

// recordEncoder turns the columns of a record batch into the body buffers and
// metadata of a Payload.
type recordEncoder struct {
	// mem allocates any buffers the encoder has to create.
	mem memory.Allocator

	// fields collects one entry per visited array (length, null count, offset).
	fields         []fieldMetadata
	// meta holds the offset and length of each body buffer.
	meta           []bufferMetadata
	// variadicCounts collects, per binary-view array, its number of buffers minus two.
	variadicCounts []int64

	// depth is the remaining nesting budget; visiting fails once it is <= 0.
	depth           int64
	// start is the offset at which the first body buffer is placed.
	start           int64
	// allow64b permits arrays longer than math.MaxInt32.
	allow64b        bool
	// codec selects body compression; -1 disables it.
	codec           flatbuf.CompressionType
	// compressNP is the number of compression workers; <= 1 compresses serially.
	compressNP      int
	// compressors caches one compressor per worker, created on first use.
	compressors     []compressor
	// minSpaceSavings, when non-nil, is the least savings ratio for which a
	// compressed buffer is kept.
	minSpaceSavings *float64
}

// newRecordEncoder builds a recordEncoder from its arguments: the allocator,
// two int64 values stored in start and depth, the allow64b flag, the codec,
// the compression worker count, the minimum space savings and the compressor
// slots. fields, meta and variadicCounts are left at their zero values.
func newRecordEncoder(
	 memory.Allocator,
	,
	 int64,
	 bool,
	 flatbuf.CompressionType,
	 int,
	 *float64,
	 []compressor,
) *recordEncoder {
	return &recordEncoder{
		mem:             ,
		start:           ,
		depth:           ,
		allow64b:        ,
		codec:           ,
		compressNP:      ,
		compressors:     ,
		minSpaceSavings: ,
	}
}

// shouldCompress decides whether a compressed buffer is worth keeping. With no
// minSpaceSavings configured it always returns true. Otherwise it returns
// whether one minus the ratio of the two size arguments is at least
// *minSpaceSavings. One of the sizes is asserted to be positive via debug.Assert.
func ( *recordEncoder) (,  int) bool {
	debug.Assert( > 0, "uncompressed size is 0")
	if .minSpaceSavings == nil {
		return true
	}

	 := 1.0 - float64()/float64()
	return  >= *.minSpaceSavings
}

// reset prepares the encoder for another batch: start goes back to 0 and
// fields is replaced with a new empty slice. meta and variadicCounts are not
// touched here.
func ( *recordEncoder) () {
	.start = 0
	.fields = make([]fieldMetadata, 0)
}

// getCompressor returns the compressor cached at the given index of
// compressors, creating it from the configured codec the first time that slot
// is requested. The index must be within the compressors slice.
func ( *recordEncoder) ( int) compressor {
	if .compressors[] == nil {
		.compressors[] = getCompressor(.codec)
	}
	return .compressors[]
}

func ( *recordEncoder) ( *Payload) error {
	 := func( int,  compressor) error {
		if .body[] == nil || .body[].Len() == 0 {
			return nil
		}

		 := memory.NewResizableBuffer(.mem)
		.Reserve(.MaxCompressedLen(.body[].Len()) + arrow.Int64SizeBytes)

		binary.LittleEndian.PutUint64(.Buf(), uint64(.body[].Len()))
		 := &bufferWriter{buf: , pos: arrow.Int64SizeBytes}
		.Reset()

		,  := .Write(.body[].Bytes())
		if  != nil {
			return 
		}
		if  := .Close();  != nil {
			return 
		}

		 := .pos
		 := .pos - arrow.Int64SizeBytes
		if !.shouldCompress(, ) {
			 = copy(.Buf()[arrow.Int64SizeBytes:], .body[].Bytes())
			// size of -1 indicates to the reader that the body
			// doesn't need to be decompressed
			var  int64 = -1
			binary.LittleEndian.PutUint64(.Buf(), uint64())
			 =  + arrow.Int64SizeBytes
		}
		.buf.Resize()
		.body[].Release()
		.body[] = 
		return nil
	}

	if .compressNP <= 1 {
		 := .getCompressor(0)
		for  := range .body {
			if  := (, );  != nil {
				return 
			}
		}
		return nil
	}

	var (
		          sync.WaitGroup
		          = make(chan int)
		       = make(chan error)
		,  = context.WithCancel(context.Background())
	)
	defer ()

	for  := 0;  < .compressNP; ++ {
		.Add(1)
		go func( int) {
			defer .Done()
			 := .getCompressor()
			for {
				select {
				case ,  := <-:
					if ! {
						// we're done, channel is closed!
						return
					}

					if  := (, );  != nil {
						 <- 
						()
						return
					}
				case <-.Done():
					// cancelled, return early
					return
				}
			}
		}()
	}

	for  := range .body {
		 <- 
	}

	close()
	.Wait()
	close()

	return <-
}

func ( *recordEncoder) ( *Payload,  arrow.RecordBatch) error {
	// perform depth-first traversal of the row-batch
	for ,  := range .Columns() {
		 := .visit(, )
		if  != nil {
			return fmt.Errorf("arrow/ipc: could not encode column %d (%q): %w", , .ColumnName(), )
		}
	}

	if .codec != -1 {
		if .minSpaceSavings != nil {
			 := *.minSpaceSavings
			if  < 0 ||  > 1 {
				.Release()
				return fmt.Errorf("%w: minSpaceSavings not in range [0,1]. Provided %.05f",
					arrow.ErrInvalid, )
			}
		}
		.compressBodyBuffers()
	}

	// position for the start of a buffer relative to the passed frame of reference.
	// may be 0 or some other position in an address space.
	 := .start
	.meta = make([]bufferMetadata, len(.body))

	// construct the metadata for the record batch header
	for ,  := range .body {
		var (
			    int64
			 int64
		)
		// the buffer might be null if we are handling zero row lengths.
		if  != nil {
			 = int64(.Len())
			 = bitutil.CeilByte64() - 
		}
		.meta[] = bufferMetadata{
			Offset: ,
			// even though we add padding, we need the Len to be correct
			// so that decompressing works properly.
			Len: ,
		}
		 +=  + 
	}

	.size =  - .start
	if !bitutil.IsMultipleOf8(.size) {
		panic("not aligned")
	}

	return nil
}

// visit appends the field metadata and body buffers for one array to the
// payload, recursing into child arrays for nested types.
//
// It returns errMaxRecursion when the remaining depth budget is exhausted and
// errBigArray when 64-bit lengths are disallowed and the array is longer than
// math.MaxInt32. Extension arrays are encoded as their storage array and
// dictionary arrays as their indices. For every other array one fieldMetadata
// entry is appended, then a validity bitmap slot when the type has one, then
// the type-specific buffers. An array type that matches none of the cases
// below causes a panic.
//
// Nested cases decrement depth before visiting children and restore it
// afterwards; on an error return from a child the restore is skipped.
func ( *recordEncoder) ( *Payload,  arrow.Array) error {
	if .depth <= 0 {
		return errMaxRecursion
	}

	if !.allow64b && .Len() > math.MaxInt32 {
		return errBigArray
	}

	if .DataType().ID() == arrow.EXTENSION {
		 := .(array.ExtensionArray)
		 := .(, .Storage())
		if  != nil {
			return fmt.Errorf("failed visiting storage of for array %T: %w", , )
		}
		return nil
	}

	if .DataType().ID() == arrow.DICTIONARY {
		 := .(*array.Dictionary)
		return .(, .Indices())
	}

	// add all common elements
	.fields = append(.fields, fieldMetadata{
		Len:    int64(.Len()),
		Nulls:  int64(.NullN()),
		Offset: 0,
	})

	if .DataType().ID() == arrow.NULL {
		return nil
	}

	if internal.HasValidityBitmap(.DataType().ID(), flatbuf.MetadataVersion(currentMetadataVersion)) {
		switch .NullN() {
		case 0:
			// there are no null values, drop the null bitmap
			.body = append(.body, nil)
		default:
			 := .Data()
			var  *memory.Buffer
			if .NullN() == .Len() {
				// every value is null, just use a new zero-initialized bitmap to avoid the expense of copying
				 = memory.NewResizableBuffer(.mem)
				 := paddedLength(bitutil.BytesForBits(int64(.Len())), kArrowAlignment)
				.Resize(int())
			} else {
				// otherwise truncate and copy the bits
				 = newTruncatedBitmap(.mem, int64(.Offset()), int64(.Len()), .Buffers()[0])
			}
			.body = append(.body, )
		}
	}

	switch dtype := .DataType().(type) {
	case *arrow.NullType:
		// ok. NullArrays are completely empty.

	case *arrow.BooleanType:
		// values are bit-packed, so they go through the bitmap truncation helper;
		// a zero-length array contributes a nil buffer
		var (
			 = .Data()
			 *memory.Buffer
		)

		if .Len() != 0 {
			 = newTruncatedBitmap(.mem, int64(.Offset()), int64(.Len()), .Buffers()[1])
		}
		.body = append(.body, )

	case arrow.FixedWidthDataType:
		// one values buffer; element width is the type's bit width divided by 8
		 := .Data()
		 := .Buffers()[1]
		 := int64(.Len())
		 := int64(.BitWidth() / 8)
		 := paddedLength(*, kArrowAlignment)

		switch {
		case needTruncate(int64(.Offset()), , ):
			// non-zero offset: slice the buffer
			 := int64(.Offset()) * 
			// send padding if available
			 := min(bitutil.CeilByte64(*), int64(.Len())-)
			 = memory.NewBufferBytes(.Bytes()[ : +])
		default:
			if  != nil {
				.Retain()
			}
		}
		.body = append(.body, )

	case *arrow.BinaryType, *arrow.LargeBinaryType, *arrow.StringType, *arrow.LargeStringType:
		// two buffers are appended: the zero-based offsets, then the value bytes
		 := .(array.BinaryLike)
		 := .getZeroBasedValueOffsets()
		 := .Data()
		 := .Buffers()[2]

		var  int64
		if  != nil {
			 = int64(len(.ValueBytes()))
		}

		switch {
		case needTruncate(int64(.Offset()), , ):
			// slice data buffer to include the range we need now.
			var (
				 int64 = 0
				       = min(paddedLength(, kArrowAlignment), int64())
			)
			if .Len() > 0 {
				 = .ValueOffset64(0)
			}

			 = memory.NewBufferBytes(.Buffers()[2].Bytes()[ : +])
		default:
			if  != nil {
				.Retain()
			}
		}
		.body = append(.body, )
		.body = append(.body, )

	case arrow.BinaryViewDataType:
		// the views buffer (element width arrow.ViewHeaderSizeBytes) is followed by
		// every remaining buffer of the array, each retained and appended unchanged
		 := .Data()
		 := .Buffers()[1]
		 := int64(.Len())
		 := int64(arrow.ViewHeaderSizeBytes)
		 := paddedLength(*, kArrowAlignment)

		switch {
		case needTruncate(int64(.Offset()), , ):
			// non-zero offset: slice the buffer
			 := .Offset() * int()
			// send padding if available
			 := int(min(bitutil.CeilByte64(*), int64(.Len()-)))
			 = memory.SliceBuffer(, , )
		default:
			if  != nil {
				.Retain()
			}
		}
		.body = append(.body, )

		.variadicCounts = append(.variadicCounts, int64(len(.Buffers())-2))
		for ,  := range .Buffers()[2:] {
			.Retain()
			.body = append(.body, )
		}

	case *arrow.StructType:
		// no buffers of its own beyond validity; each field is visited in order
		.depth--
		 := .(*array.Struct)
		for  := 0;  < .NumField(); ++ {
			 := .(, .Field())
			if  != nil {
				return fmt.Errorf("could not visit field %d of struct-array: %w", , )
			}
		}
		.depth++

	case *arrow.SparseUnionType:
		// type codes buffer, then every child
		,  := .Data().Offset(), .Len()
		 := .(*array.SparseUnion)
		 := getTruncatedBuffer(int64(), int64(), int32(unsafe.Sizeof(arrow.UnionTypeCode(0))), .TypeCodes())
		.body = append(.body, )

		.depth--
		for  := 0;  < .NumFields(); ++ {
			 := .(, .Field())
			if  != nil {
				return fmt.Errorf("could not visit field %d of sparse union array: %w", , )
			}
		}
		.depth++
	case *arrow.DenseUnionType:
		// type codes buffer, value offsets buffer (rebased when the array has a
		// non-zero offset), then every child
		,  := .Data().Offset(), .Len()
		 := .(*array.DenseUnion)
		 := getTruncatedBuffer(int64(), int64(), int32(unsafe.Sizeof(arrow.UnionTypeCode(0))), .TypeCodes())
		.body = append(.body, )

		.depth--
		 := .UnionType()

		// union type codes are not necessarily 0-indexed
		 := .MaxTypeCode()

		// allocate an array of child offsets. Set all to -1 to indicate we
		// haven't observed a first occurrence of a particular child yet
		 := make([]int32, +1)
		 := make([]int32, +1)
		[0], [0] = -1, 0
		// fill both slices by repeatedly doubling the initialized prefix
		for  := 1;  < len();  *= 2 {
			copy([:], [:])
			copy([:], [:])
		}

		var  *memory.Buffer
		if  != 0 {
			 = .rebaseDenseUnionValueOffsets(, , )
		} else {
			 = getTruncatedBuffer(int64(), int64(), int32(arrow.Int32SizeBytes), .ValueOffsets())
		}
		.body = append(.body, )

		// visit children and slice accordingly
		for  := range .Fields() {
			 := .Field()
			// for sliced unions it's tricky to know how much to truncate
			// the children. For now we'll truncate the children to be
			// no longer than the parent union.

			if  != 0 {
				 := .TypeCodes()[]
				 := []
				 := []

				// the slices created here are released when visit returns,
				// not at the end of each loop iteration
				if  > 0 {
					 = array.NewSlice(, int64(), int64(+))
					defer .Release()
				} else if  < int32(.Len()) {
					 = array.NewSlice(, 0, int64())
					defer .Release()
				}
			}
			if  := .(, );  != nil {
				return fmt.Errorf("could not visit field %d of dense union array: %w", , )
			}
		}
		.depth++
	case *arrow.MapType, *arrow.ListType, *arrow.LargeListType:
		// zero-based offsets buffer, then the (possibly sliced) child values
		 := .(array.ListLike)
		 := .getZeroBasedValueOffsets()
		.body = append(.body, )

		.depth--
		var (
			        = .ListValues()
			   = false
			 int64
			    int64
		)
		// release the values slice only if one was created below
		defer func() {
			if  {
				.Release()
			}
		}()

		if .Len() > 0 &&  != nil {
			, _ = .ValueOffsets(0)
			_,  = .ValueOffsets(.Len() - 1)
		}

		if .Len() != 0 ||  < int64(.Len()) {
			// must also slice the values
			 = array.NewSlice(, , )
			 = true
		}
		 := .(, )

		if  != nil {
			return fmt.Errorf("could not visit list element for array %T: %w", , )
		}
		.depth++

	case *arrow.ListViewType, *arrow.LargeListViewType:
		// zero-based offsets buffer, sizes buffer, then the (possibly sliced) child values
		 := .(array.VarLenListLike)

		, ,  := .getZeroBasedListViewOffsets()
		 := .getListViewSizes()

		.body = append(.body, )
		.body = append(.body, )

		.depth--
		var (
			 = .ListValues()
		)

		if  != 0 ||  < int64(.Len()) {
			 = array.NewSlice(, , )
			defer .Release()
		}
		 := .(, )

		if  != nil {
			return fmt.Errorf("could not visit list element for array %T: %w", , )
		}
		.depth++

	case *arrow.FixedSizeListType:
		// no offsets buffer: the child range is computed from the array offset,
		// the array length and the fixed list size
		 := .(*array.FixedSizeList)

		.depth--

		 := int64(.DataType().(*arrow.FixedSizeListType).Len())
		 := int64(.Offset()) * 
		 := int64(.Offset()+.Len()) * 

		 := array.NewSlice(.ListValues(), , )
		defer .Release()

		 := .(, )

		if  != nil {
			return fmt.Errorf("could not visit list element for array %T: %w", , )
		}
		.depth++

	case *arrow.RunEndEncodedType:
		// two children are visited: the logical run ends, then the logical values
		 := .(*array.RunEndEncoded)
		.depth--
		 := .LogicalRunEndsArray(.mem)
		defer .Release()
		if  := .(, );  != nil {
			return 
		}
		 = .LogicalValuesArray()
		defer .Release()
		if  := .(, );  != nil {
			return 
		}
		.depth++

	default:
		panic(fmt.Errorf("arrow/ipc: unknown array %T (dtype=%T)", , ))
	}

	return nil
}

// getZeroBasedValueOffsets returns the offsets buffer (buffer index 1) to emit
// for an array whose type implements arrow.OffsetsDataType.
//
// It returns nil when the nil / zero-length guard near the top matches. When
// the array has a non-zero offset, when the size comparison below detects
// trailing offsets, or when the first stored offset is not zero, a new buffer
// is allocated and filled with the array's Len()+1 offsets, each reduced by
// the first one. Otherwise the existing buffer is retained and returned as-is.
// Offsets are handled as int64 when the layout's byte width is 8 and as int32
// otherwise.
func ( *recordEncoder) ( arrow.Array) *memory.Buffer {
	 := .Data()
	 := .Buffers()[1]
	 := .DataType().(arrow.OffsetsDataType).OffsetTypeTraits()
	 := .BytesRequired(.Len() + 1)

	if  == nil || .Len() == 0 {
		return nil
	}

	 := .DataType().Layout().Buffers[1].ByteWidth

	// if we have a non-zero offset, then the value offsets do not start at
	// zero. we must a) create a new offsets array with shifted offsets and
	// b) slice the values array accordingly
	 := .Offset() != 0

	// or if there are more value offsets than values (the array has been sliced)
	// we need to trim off the trailing offsets
	 :=  < .Len()

	// or if the offsets do not start from the zero index, we need to shift them
	// and slice the values array
	var  int64
	if  == 8 {
		 = arrow.Int64Traits.CastFromBytes(.Bytes())[0]
	} else {
		 = int64(arrow.Int32Traits.CastFromBytes(.Bytes())[0])
	}
	 :=  != 0

	// determine whether the offsets array should be shifted
	 :=  ||  || 

	if  {
		 := memory.NewResizableBuffer(.mem)
		.Resize()

		switch  {
		case 8:
			 := arrow.Int64Traits.CastFromBytes(.Bytes())
			 := arrow.Int64Traits.CastFromBytes(.Bytes())[.Offset() : .Offset()+.Len()+1]

			// rebase every offset on the first one in the window
			 := [0]
			for ,  := range  {
				[] =  - 
			}

		default:
			debug.Assert(.DataType().Layout().Buffers[1].ByteWidth == 4, "invalid offset bytewidth")
			 := arrow.Int32Traits.CastFromBytes(.Bytes())
			 := arrow.Int32Traits.CastFromBytes(.Bytes())[.Offset() : .Offset()+.Len()+1]

			// rebase every offset on the first one in the window
			 := [0]
			for ,  := range  {
				[] =  - 
			}
		}

		 = 
	} else {
		// unchanged buffer: take a reference for the payload
		.Retain()
	}

	return 
}

func getZeroBasedListViewOffsets[ int32 | int64]( memory.Allocator,  array.VarLenListLike) ( *memory.Buffer, ,  ) {
	 := int(unsafe.Sizeof()) * .Len()
	if .Data().Offset() == 0 {
		// slice offsets to used extent, in case we have truncated slice
		,  = 0, (.ListValues().Len())
		 = .Data().Buffers()[1]
		if .Len() >  {
			 = memory.SliceBuffer(, 0, )
		} else {
			.Retain()
		}
		return
	}

	// non-zero offset, it's likely that the smallest offset is not zero
	// we must a) create a new offsets array with shifted offsets and
	// b) slice the values array accordingly

	 = memory.NewResizableBuffer()
	.Resize()
	if .Len() > 0 {
		// max value of int32/int64 based on type
		 = (^(0)) << ((8 * unsafe.Sizeof()) - 1)
		for  := 0;  < .Len(); ++ {
			,  := .ValueOffsets()
			 = utils.Min(, ())
			 = utils.Max(, ())
		}
	}

	 := arrow.GetData[](.Data().Buffers()[1].Bytes())[.Data().Offset():]
	 := arrow.GetData[](.Bytes())
	for  := 0;  < .Len(); ++ {
		[] = [] - 
	}
	return
}

// getListViewSizes returns the sizes buffer (buffer index 2) to emit for a
// list-view array. When the array has a non-zero offset, or the length
// comparison below holds, the buffer is sliced with memory.SliceBuffer starting
// at the array offset scaled by the element size; otherwise the existing buffer
// is retained and returned unchanged.
func getListViewSizes[ int32 | int64]( array.VarLenListLike) *memory.Buffer {
	var  
	 := int(unsafe.Sizeof()) * .Len()
	 := .Data().Buffers()[2]

	if .Data().Offset() != 0 || .Len() >  {
		// slice offsets to used extent, in case we have truncated slice
		 := .Data().Offset() * int(unsafe.Sizeof())
		 = memory.SliceBuffer(, , )
	} else {
		.Retain()
	}
	return 
}

// getZeroBasedListViewOffsets dispatches to the generic helper of the same
// name using the offset width of the concrete array type: int32 for
// *array.ListView and int64 for *array.LargeListView. The two numeric results
// are returned as int64.
//
// A zero-length array yields (nil, 0, 0), as does any other concrete type.
func ( *recordEncoder) ( array.VarLenListLike) (*memory.Buffer, int64, int64) {
	if .Len() == 0 {
		return nil, 0, 0
	}

	var (
		     *memory.Buffer
		,  int64
	)

	switch v := .(type) {
	case *array.ListView:
		// int32 results need widening before they can be returned
		, ,  := getZeroBasedListViewOffsets[int32](.mem, )
		 = 
		,  = int64(), int64()
	case *array.LargeListView:
		, ,  = getZeroBasedListViewOffsets[int64](.mem, )
	}
	return , , 
}

// getListViewSizes dispatches to the generic getListViewSizes using int32 for
// *array.ListView and int64 for *array.LargeListView. It returns nil for a
// zero-length array and for any other concrete type.
func ( *recordEncoder) ( array.VarLenListLike) *memory.Buffer {
	if .Len() == 0 {
		return nil
	}

	switch v := .(type) {
	case *array.ListView:
		return getListViewSizes[int32]()
	case *array.LargeListView:
		return getListViewSizes[int64]()
	}
	return nil
}

// rebaseDenseUnionValueOffsets builds a new int32 offsets buffer for a sliced
// dense union, sized for the array's Len() entries.
//
// The two int32 slices are indexed per child and are updated in place. One
// records the first raw offset seen for each child (-1 meaning none seen yet);
// every emitted offset for that child is reduced by it, so the first is 0. The
// other is raised to at least the emitted offset plus one for each element.
func ( *recordEncoder) ( *array.DenseUnion, ,  []int32) *memory.Buffer {
	// this case sucks. Because the offsets are different for each
	// child array, when we have a sliced array, we need to re-base
	// the value offsets for each array! ew.
	 := .RawValueOffsets()
	 := .RawTypeCodes()

	 := memory.NewResizableBuffer(.mem)
	.Resize(arrow.Int32Traits.BytesRequired(.Len()))
	 := arrow.Int32Traits.CastFromBytes(.Bytes())

	// compute shifted offsets by subtracting child offset
	for ,  := range  {
		if [] == -1 {
			// offsets are guaranteed to be increasing according to the spec
			// so the first offset we find for a child is the initial offset
			// and will become the "0" for this child.
			[] = []
			[] = 0
		} else {
			[] = [] - []
		}
		[] = max([], []+1)
	}
	return 
}

// Encode fills the payload for a record batch: encode produces the body
// buffers and their layout, then encodeMetadata writes the record batch
// message using the batch's row count. An error from encode is returned
// unchanged and the metadata step is skipped.
func ( *recordEncoder) ( *Payload,  arrow.RecordBatch) error {
	if  := .encode(, );  != nil {
		return 
	}
	return .encodeMetadata(, .NumRows())
}

// encodeMetadata stores the message built by writeRecordMessage in the
// payload's meta field. The int64 argument is forwarded together with the
// payload size and the encoder's field metadata, buffer metadata, codec and
// variadic counts. It always returns nil.
func ( *recordEncoder) ( *Payload,  int64) error {
	.meta = writeRecordMessage(.mem, , .size, .fields, .meta, .codec, .variadicCounts)
	return nil
}

// newTruncatedBitmap returns a bitmap buffer suitable for the payload body.
//
// A nil input yields nil. When the first int64 argument is non-zero, or the
// padded byte length computed for the bitmap is smaller than the input
// buffer's length, a new buffer of the padded length is allocated and the bits
// are copied into it starting at destination bit 0. Otherwise the input buffer
// is retained and returned as-is.
func newTruncatedBitmap( memory.Allocator, ,  int64,  *memory.Buffer) *memory.Buffer {
	if  == nil {
		return nil
	}

	 := paddedLength(bitutil.BytesForBits(), kArrowAlignment)
	switch {
	case  != 0 ||  < int64(.Len()):
		// with a sliced array / non-zero offset, we must copy the bitmap
		 := memory.NewResizableBuffer()
		.Resize(int())
		bitutil.CopyBitmap(.Bytes(), int(), int(), .Bytes(), 0)
		return 
	default:
		.Retain()
		return 
	}
}

// getTruncatedBuffer returns the part of a fixed-width buffer that a sliced
// array needs, given two int64 values and an int32 element byte width.
//
// A nil buffer is returned unchanged. When the first int64 argument is
// non-zero, or the padded byte length is smaller than the buffer's length, the
// buffer is sliced with memory.SliceBuffer; the slice length is capped at the
// buffer's length. Otherwise the buffer is retained and returned as-is.
func getTruncatedBuffer(,  int64,  int32,  *memory.Buffer) *memory.Buffer {
	if  == nil {
		return 
	}

	 := paddedLength(*int64(), kArrowAlignment)
	if  != 0 ||  < int64(.Len()) {
		return memory.SliceBuffer(, int(*int64()), int(min(, int64(.Len()))))
	}
	.Retain()
	return 
}

// needTruncate reports whether a buffer must be sliced before it is emitted:
// true when the first int64 argument is non-zero or the second is smaller
// than the buffer's length. A nil buffer never needs truncation.
func needTruncate( int64,  *memory.Buffer,  int64) bool {
	if  == nil {
		return false
	}
	return  != 0 ||  < int64(.Len())
}

// GetRecordBatchPayload produces the ipc payload for a given record batch.
// The resulting payload itself must be released by the caller via the Release
// method after it is no longer needed.
func ( arrow.RecordBatch,  ...Option) (Payload, error) {
	 := newConfig(...)
	var (
		 = Payload{msg: MessageRecordBatch}
		  = newRecordEncoder(
			.alloc,
			0,
			kMaxNestingDepth,
			true,
			.codec,
			.compressNP,
			.minSpaceSavings,
			make([]compressor, .compressNP),
		)
	)

	 := .Encode(&, )
	if  != nil {
		return Payload{}, 
	}

	return , nil
}

// GetSchemaPayload produces the ipc payload for a given schema.
//
// The schema is imported into a fresh dictutils.Mapper and passed to
// payloadFromSchema with the given allocator; only the first payload of the
// result is returned.
func ( *arrow.Schema,  memory.Allocator) Payload {
	var  dictutils.Mapper
	.ImportSchema()
	 := payloadFromSchema(, , &)
	return [0]
}