// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ipc

import (
	
	
	
	
	

	
	
	
	
	
	
	
	
)

type readerImpl interface {
	getFooterEnd() (int64, error)
	getBytes(offset, length int64) ([]byte, error)
	dict(memory.Allocator, *footerBlock, int) (dataBlock, error)
	block(memory.Allocator, *footerBlock, int) (dataBlock, error)
}

type footerBlock struct {
	offset int64
	buffer *memory.Buffer
	data   *flatbuf.Footer
}

type dataBlock interface {
	Offset() int64
	Meta() int32
	Body() int64
	NewMessage() (*Message, error)
}

const footerSizeLen = 4

var minimumOffsetSize = int64(len(Magic)*2 + footerSizeLen)

type basicReaderImpl struct {
	r ReadAtSeeker
}

func ( *basicReaderImpl) (,  int64) ([]byte, error) {
	 := make([]byte, )
	,  := .r.ReadAt(, )
	if  != nil {
		return nil, fmt.Errorf("arrow/ipc: could not read %d bytes at offset %d: %w", , , )
	}
	if int64() !=  {
		return nil, fmt.Errorf("arrow/ipc: could not read %d bytes at offset %d", , )
	}
	return , nil
}

func ( *basicReaderImpl) () (int64, error) {
	return .r.Seek(0, io.SeekEnd)
}

func ( *basicReaderImpl) ( memory.Allocator,  *footerBlock,  int) (dataBlock, error) {
	var  flatbuf.Block
	if !.data.RecordBatches(&, ) {
		return fileBlock{}, fmt.Errorf("arrow/ipc: could not extract file block %d", )
	}

	return fileBlock{
		offset: .Offset(),
		meta:   .MetaDataLength(),
		body:   .BodyLength(),
		r:      .r,
		mem:    ,
	}, nil
}

func ( *basicReaderImpl) ( memory.Allocator,  *footerBlock,  int) (dataBlock, error) {
	var  flatbuf.Block
	if !.data.Dictionaries(&, ) {
		return fileBlock{}, fmt.Errorf("arrow/ipc: could not extract dictionary block %d", )
	}

	return fileBlock{
		offset: .Offset(),
		meta:   .MetaDataLength(),
		body:   .BodyLength(),
		r:      .r,
		mem:    ,
	}, nil
}

type mappedReaderImpl struct {
	data []byte
}

func ( *mappedReaderImpl) (,  int64) ([]byte, error) {
	if  < 0 || +int64() > int64(len(.data)) {
		return nil, fmt.Errorf("arrow/ipc: invalid offset=%d or length=%d", , )
	}

	return .data[ : +], nil
}

func ( *mappedReaderImpl) () (int64, error) { return int64(len(.data)), nil }

func ( *mappedReaderImpl) ( memory.Allocator,  *footerBlock,  int) (dataBlock, error) {
	var  flatbuf.Block
	if !.data.RecordBatches(&, ) {
		return mappedFileBlock{}, fmt.Errorf("arrow/ipc: could not extract file block %d", )
	}

	return mappedFileBlock{
		offset: .Offset(),
		meta:   .MetaDataLength(),
		body:   .BodyLength(),
		data:   .data,
	}, nil
}

func ( *mappedReaderImpl) ( memory.Allocator,  *footerBlock,  int) (dataBlock, error) {
	var  flatbuf.Block
	if !.data.Dictionaries(&, ) {
		return mappedFileBlock{}, fmt.Errorf("arrow/ipc: could not extract dictionary block %d", )
	}

	return mappedFileBlock{
		offset: .Offset(),
		meta:   .MetaDataLength(),
		body:   .BodyLength(),
		data:   .data,
	}, nil
}

// FileReader is an Arrow file reader.
type FileReader struct {
	r readerImpl

	footer footerBlock

	// fields dictTypeMap
	memo dictutils.Memo

	schema *arrow.Schema
	record arrow.RecordBatch

	irec int   // current record index. used for the arrio.Reader interface
	err  error // last error

	mem            memory.Allocator
	swapEndianness bool
}

// NewMappedFileReader is like NewFileReader but instead of using a ReadAtSeeker,
// which will force copies through the Read/ReadAt methods, it uses a byte slice
// and pulls slices directly from the data. This is useful specifically when
// dealing with mmapped data so that you can lazily load the buffers and avoid
// extraneous copies. The slices used for the record column buffers will simply
// reference the existing data instead of performing copies via ReadAt/Read.
//
// For example, syscall.Mmap returns a byte slice which could be referencing
// a shared memory region or otherwise a memory-mapped file.
func ( []byte,  ...Option) (*FileReader, error) {
	var (
		 = newConfig(...)
		   = FileReader{
			r:   &mappedReaderImpl{data: },
			mem: .alloc,
		}
	)

	if  := .init();  != nil {
		return nil, 
	}
	return &, nil
}

// NewFileReader opens an Arrow file using the provided reader r.
func ( ReadAtSeeker,  ...Option) (*FileReader, error) {
	var (
		 = newConfig(...)
		   = FileReader{
			r:    &basicReaderImpl{r: },
			memo: dictutils.NewMemo(),
			mem:  .alloc,
		}
	)

	if  := .init();  != nil {
		return nil, 
	}
	return &, nil
}

func ( *FileReader) ( *config) error {
	var  error
	if .footer.offset <= 0 {
		.footer.offset,  = .r.getFooterEnd()
		if  != nil {
			return fmt.Errorf("arrow/ipc: could retrieve footer offset: %w", )
		}
	}
	.footer.offset = .footer.offset

	 = .readFooter()
	if  != nil {
		return fmt.Errorf("arrow/ipc: could not decode footer: %w", )
	}

	 = .readSchema(.ensureNativeEndian)
	if  != nil {
		return fmt.Errorf("arrow/ipc: could not decode schema: %w", )
	}

	if .schema != nil && !.schema.Equal(.schema) {
		return fmt.Errorf("arrow/ipc: inconsistent schema for reading (got: %v, want: %v)", .schema, .schema)
	}

	return 
}

func ( *FileReader) ( bool) error {
	var (
		  error
		 dictutils.Kind
	)

	 := .footer.data.Schema(nil)
	if  == nil {
		return fmt.Errorf("arrow/ipc: could not load schema from flatbuffer data")
	}
	.schema,  = schemaFromFB(, &.memo)
	if  != nil {
		return fmt.Errorf("arrow/ipc: could not read schema: %w", )
	}

	if  && !.schema.IsNativeEndian() {
		.swapEndianness = true
		.schema = .schema.WithEndianness(endian.NativeEndian)
	}

	for  := 0;  < .NumDictionaries(); ++ {
		,  := .r.dict(.mem, &.footer, )
		if  != nil {
			return fmt.Errorf("arrow/ipc: could not read dictionary[%d]: %w", , )
		}
		switch {
		case !bitutil.IsMultipleOf8(.Offset()):
			return fmt.Errorf("arrow/ipc: invalid file offset=%d for dictionary %d", .Offset(), )
		case !bitutil.IsMultipleOf8(int64(.Meta())):
			return fmt.Errorf("arrow/ipc: invalid file metadata=%d position for dictionary %d", .Meta(), )
		case !bitutil.IsMultipleOf8(.Body()):
			return fmt.Errorf("arrow/ipc: invalid file body=%d position for dictionary %d", .Body(), )
		}

		,  := .NewMessage()
		if  != nil {
			return 
		}

		,  = readDictionary(&.memo, .meta, .body, .swapEndianness, .mem)
		if  != nil {
			return 
		}
		if  == dictutils.KindReplacement {
			return errors.New("arrow/ipc: unsupported dictionary replacement in IPC file")
		}
	}

	return 
}

func ( *FileReader) () error {
	if .footer.offset <= minimumOffsetSize {
		return fmt.Errorf("arrow/ipc: file too small (size=%d)", .footer.offset)
	}

	 := int64(len(Magic) + footerSizeLen)
	,  := .r.getBytes(.footer.offset-, )
	if  != nil {
		return 
	}

	if !bytes.Equal([4:], Magic) {
		return errNotArrowFile
	}

	 := int64(binary.LittleEndian.Uint32([:footerSizeLen]))
	if  <= 0 || +minimumOffsetSize > .footer.offset {
		return errInconsistentFileMetadata
	}

	,  = .r.getBytes(.footer.offset--, )
	if  != nil {
		return 
	}

	.footer.buffer = memory.NewBufferBytes()
	.footer.data = flatbuf.GetRootAsFooter(, 0)
	return nil
}

func ( *FileReader) () *arrow.Schema {
	return .schema
}

func ( *FileReader) () int {
	if .footer.data == nil {
		return 0
	}
	return .footer.data.DictionariesLength()
}

func ( *FileReader) () int {
	return .footer.data.RecordBatchesLength()
}

func ( *FileReader) () MetadataVersion {
	return MetadataVersion(.footer.data.Version())
}

// Close cleans up resources used by the File.
// Close does not close the underlying reader.
func ( *FileReader) () error {
	if .footer.data != nil {
		.footer.data = nil
	}

	if .footer.buffer != nil {
		.footer.buffer.Release()
		.footer.buffer = nil
	}

	if .record != nil {
		.record.Release()
		.record = nil
	}
	return nil
}

// RecordBatch returns the i-th record batch from the file.
// The returned value is valid until the next call to RecordBatch.
// Users need to call Retain on that RecordBatch to keep it valid for longer.
func ( *FileReader) ( int) (arrow.RecordBatch, error) {
	,  := .RecordBatchAt()
	if  != nil {
		return nil, 
	}

	if .record != nil {
		.record.Release()
	}

	.record = 
	return , nil
}

// Record returns the i-th record from the file.
// The returned value is valid until the next call to Record.
// Users need to call Retain on that Record to keep it valid for longer.
//
// Deprecated: Use [RecordBatch] instead.
func ( *FileReader) ( int) (arrow.Record, error) {
	return .RecordBatch()
}

// RecordBatchAt returns the i-th record batch from the file. Ownership is transferred to the
// caller and must call Release() to free the memory. This method is safe to
// call concurrently.
func ( *FileReader) ( int) (arrow.RecordBatch, error) {
	if  < 0 ||  > .NumRecords() {
		panic("arrow/ipc: record index out of bounds")
	}

	,  := .r.block(.mem, &.footer, )
	if  != nil {
		return nil, 
	}
	switch {
	case !bitutil.IsMultipleOf8(.Offset()):
		return nil, fmt.Errorf("arrow/ipc: invalid file offset=%d for record %d", .Offset(), )
	case !bitutil.IsMultipleOf8(int64(.Meta())):
		return nil, fmt.Errorf("arrow/ipc: invalid file metadata=%d position for record %d", .Meta(), )
	case !bitutil.IsMultipleOf8(.Body()):
		return nil, fmt.Errorf("arrow/ipc: invalid file body=%d position for record %d", .Body(), )
	}

	,  := .NewMessage()
	if  != nil {
		return nil, 
	}
	defer .Release()

	if .Type() != MessageRecordBatch {
		return nil, fmt.Errorf("arrow/ipc: message %d is not a RecordBatch", )
	}

	return newRecordBatch(.schema, &.memo, .meta, .body, .swapEndianness, .mem), nil
}

// RecordAt returns the i-th record from the file. Ownership is transferred to the
// caller and must call Release() to free the memory. This method is safe to
// call concurrently.
//
// Deprecated: Use [RecordBatchAt] instead.
func ( *FileReader) ( int) (arrow.Record, error) {
	return .RecordBatchAt()
}

// Read reads the current record batch from the underlying stream and an error, if any.
// When the Reader reaches the end of the underlying stream, it returns (nil, io.EOF).
//
// The returned record batch value is valid until the next call to Read.
// Users need to call Retain on that RecordBatch to keep it valid for longer.
func ( *FileReader) () ( arrow.RecordBatch,  error) {
	if .irec == .NumRecords() {
		return nil, io.EOF
	}
	, .err = .RecordBatch(.irec)
	.irec++
	return , .err
}

// ReadAt reads the i-th record batch from the underlying stream and an error, if any.
func ( *FileReader) ( int64) (arrow.RecordBatch, error) {
	return .RecordBatch(int())
}

func newRecordBatch( *arrow.Schema,  *dictutils.Memo,  *memory.Buffer,  *memory.Buffer,  bool,  memory.Allocator) arrow.RecordBatch {
	var (
		   = flatbuf.GetRootAsMessage(.Bytes(), 0)
		    flatbuf.RecordBatch
		 decompressor
	)
	initFB(&, .Header)
	 := .Length()

	 := .Compression(nil)
	if  != nil {
		 = getDecompressor(.Codec())
		defer .Close()
	}

	 := &arrayLoaderContext{
		src: ipcSource{
			meta:     &,
			rawBytes: ,
			codec:    ,
			mem:      ,
		},
		memo:    ,
		max:     kMaxNestingDepth,
		version: MetadataVersion(.Version()),
	}

	 := dictutils.NewFieldPos()
	 := make([]arrow.Array, .NumFields())
	for  := 0;  < .NumFields(); ++ {
		 := .loadArray(.Field().Type)
		defer .Release()

		if  := dictutils.ResolveFieldDict(, , .Child(int32()), );  != nil {
			panic()
		}

		if  {
			swapEndianArrayData(.(*array.Data))
		}

		[] = array.MakeFromData()
		defer [].Release()
	}

	return array.NewRecord(, , )
}

type ipcSource struct {
	meta     *flatbuf.RecordBatch
	rawBytes *memory.Buffer
	codec    decompressor
	mem      memory.Allocator
}

func ( *ipcSource) ( int) *memory.Buffer {
	var  flatbuf.Buffer
	if !.meta.Buffers(&, ) {
		panic("arrow/ipc: buffer index out of bound")
	}

	if .Length() == 0 {
		return memory.NewBufferBytes(nil)
	}

	var  *memory.Buffer
	if .codec == nil {
		 = memory.SliceBuffer(.rawBytes, int(.Offset()), int(.Length()))
	} else {
		 := .rawBytes.Bytes()[.Offset() : .Offset()+.Length()]
		 := int64(binary.LittleEndian.Uint64([:8]))

		// check for an uncompressed buffer
		if  != -1 {
			 = memory.NewResizableBuffer(.mem)
			.Resize(int())
			.codec.Reset(bytes.NewReader([8:]))
			if ,  := io.ReadFull(.codec, .Bytes());  != nil {
				panic()
			}
		} else {
			 = memory.SliceBuffer(.rawBytes, int(.Offset())+8, int(.Length())-8)
		}
	}

	return 
}

func ( *ipcSource) ( int) *flatbuf.FieldNode {
	var  flatbuf.FieldNode
	if !.meta.Nodes(&, ) {
		panic("arrow/ipc: field metadata out of bound")
	}
	return &
}

func ( *ipcSource) ( int) int64 {
	return .meta.VariadicBufferCounts()
}

type arrayLoaderContext struct {
	src       ipcSource
	ifield    int
	ibuffer   int
	ivariadic int
	max       int
	memo      *dictutils.Memo
	version   MetadataVersion
}

func ( *arrayLoaderContext) () *flatbuf.FieldNode {
	 := .src.fieldMetadata(.ifield)
	.ifield++
	return 
}

func ( *arrayLoaderContext) () *memory.Buffer {
	 := .src.buffer(.ibuffer)
	.ibuffer++
	return 
}

func ( *arrayLoaderContext) () int64 {
	 := .src.variadicCount(.ivariadic)
	.ivariadic++
	return 
}

func ( *arrayLoaderContext) ( arrow.DataType) arrow.ArrayData {
	switch dt := .(type) {
	case *arrow.NullType:
		return .loadNull()

	case *arrow.DictionaryType:
		 := .loadPrimitive(.IndexType)
		defer .Release()
		return array.NewData(, .Len(), .Buffers(), .Children(), .NullN(), .Offset())

	case *arrow.BooleanType,
		*arrow.Int8Type, *arrow.Int16Type, *arrow.Int32Type, *arrow.Int64Type,
		*arrow.Uint8Type, *arrow.Uint16Type, *arrow.Uint32Type, *arrow.Uint64Type,
		*arrow.Float16Type, *arrow.Float32Type, *arrow.Float64Type,
		arrow.DecimalType,
		*arrow.Time32Type, *arrow.Time64Type,
		*arrow.TimestampType,
		*arrow.Date32Type, *arrow.Date64Type,
		*arrow.MonthIntervalType, *arrow.DayTimeIntervalType, *arrow.MonthDayNanoIntervalType,
		*arrow.DurationType:
		return .loadPrimitive()

	case *arrow.BinaryType, *arrow.StringType, *arrow.LargeStringType, *arrow.LargeBinaryType:
		return .loadBinary()

	case arrow.BinaryViewDataType:
		return .loadBinaryView()

	case *arrow.FixedSizeBinaryType:
		return .loadFixedSizeBinary()

	case *arrow.ListType:
		return .loadList()

	case *arrow.LargeListType:
		return .loadList()

	case *arrow.ListViewType:
		return .loadListView()

	case *arrow.LargeListViewType:
		return .loadListView()

	case *arrow.FixedSizeListType:
		return .loadFixedSizeList()

	case *arrow.StructType:
		return .loadStruct()

	case *arrow.MapType:
		return .loadMap()

	case arrow.ExtensionType:
		 := .(.StorageType())
		defer .Release()
		return array.NewData(, .Len(), .Buffers(), .Children(), .NullN(), .Offset())

	case *arrow.RunEndEncodedType:
		,  := .loadCommon(.ID(), 1)
		defer memory.ReleaseBuffers()

		 := .loadChild(.RunEnds())
		defer .Release()
		 := .loadChild(.Encoded())
		defer .Release()

		return array.NewData(, int(.Length()), , []arrow.ArrayData{, }, int(.NullCount()), 0)

	case arrow.UnionType:
		return .loadUnion()

	default:
		panic(fmt.Errorf("arrow/ipc: array type %T not handled yet", ))
	}
}

func ( *arrayLoaderContext) ( arrow.Type,  int) (*flatbuf.FieldNode, []*memory.Buffer) {
	 := make([]*memory.Buffer, 0, )
	 := .field()

	var  *memory.Buffer

	if internal.HasValidityBitmap(, flatbuf.MetadataVersion(.version)) {
		switch .NullCount() {
		case 0:
			.ibuffer++
		default:
			 = .buffer()
		}
	}
	 = append(, )

	return , 
}

func ( *arrayLoaderContext) ( arrow.DataType) arrow.ArrayData {
	if .max == 0 {
		panic("arrow/ipc: nested type limit reached")
	}
	.max--
	 := .loadArray()
	.max++
	return 
}

func ( *arrayLoaderContext) () arrow.ArrayData {
	 := .field()
	return array.NewData(arrow.Null, int(.Length()), nil, nil, int(.NullCount()), 0)
}

func ( *arrayLoaderContext) ( arrow.DataType) arrow.ArrayData {
	,  := .loadCommon(.ID(), 2)

	switch .Length() {
	case 0:
		 = append(, nil)
		.ibuffer++
	default:
		 = append(, .buffer())
	}

	defer memory.ReleaseBuffers()

	return array.NewData(, int(.Length()), , nil, int(.NullCount()), 0)
}

func ( *arrayLoaderContext) ( arrow.DataType) arrow.ArrayData {
	,  := .loadCommon(.ID(), 3)
	 = append(, .buffer(), .buffer())
	defer memory.ReleaseBuffers()

	return array.NewData(, int(.Length()), , nil, int(.NullCount()), 0)
}

func ( *arrayLoaderContext) ( arrow.DataType) arrow.ArrayData {
	 := .variadic()
	,  := .loadCommon(.ID(), 2+int())
	 = append(, .buffer())
	for  := 0;  < int(); ++ {
		 = append(, .buffer())
	}
	defer memory.ReleaseBuffers()

	return array.NewData(, int(.Length()), , nil, int(.NullCount()), 0)
}

func ( *arrayLoaderContext) ( *arrow.FixedSizeBinaryType) arrow.ArrayData {
	,  := .loadCommon(.ID(), 2)
	 = append(, .buffer())
	defer memory.ReleaseBuffers()

	return array.NewData(, int(.Length()), , nil, int(.NullCount()), 0)
}

func ( *arrayLoaderContext) ( *arrow.MapType) arrow.ArrayData {
	,  := .loadCommon(.ID(), 2)
	 = append(, .buffer())
	defer memory.ReleaseBuffers()

	 := .loadChild(.Elem())
	defer .Release()

	return array.NewData(, int(.Length()), , []arrow.ArrayData{}, int(.NullCount()), 0)
}

func ( *arrayLoaderContext) ( arrow.ListLikeType) arrow.ArrayData {
	,  := .loadCommon(.ID(), 2)
	 = append(, .buffer())
	defer memory.ReleaseBuffers()

	 := .loadChild(.Elem())
	defer .Release()

	return array.NewData(, int(.Length()), , []arrow.ArrayData{}, int(.NullCount()), 0)
}

func ( *arrayLoaderContext) ( arrow.VarLenListLikeType) arrow.ArrayData {
	,  := .loadCommon(.ID(), 3)
	 = append(, .buffer(), .buffer())
	defer memory.ReleaseBuffers()

	 := .loadChild(.Elem())
	defer .Release()

	return array.NewData(, int(.Length()), , []arrow.ArrayData{}, int(.NullCount()), 0)
}

func ( *arrayLoaderContext) ( *arrow.FixedSizeListType) arrow.ArrayData {
	,  := .loadCommon(.ID(), 1)
	defer memory.ReleaseBuffers()

	 := .loadChild(.Elem())
	defer .Release()

	return array.NewData(, int(.Length()), , []arrow.ArrayData{}, int(.NullCount()), 0)
}

func ( *arrayLoaderContext) ( *arrow.StructType) arrow.ArrayData {
	,  := .loadCommon(.ID(), 1)
	defer memory.ReleaseBuffers()

	 := make([]arrow.ArrayData, .NumFields())
	for ,  := range .Fields() {
		[] = .loadChild(.Type)
	}
	defer func() {
		for  := range  {
			[].Release()
		}
	}()

	return array.NewData(, int(.Length()), , , int(.NullCount()), 0)
}

func ( *arrayLoaderContext) ( arrow.UnionType) arrow.ArrayData {
	// Sparse unions have 2 buffers (a nil validity bitmap, and the type ids)
	 := 2
	// Dense unions have a third buffer, the offsets
	if .Mode() == arrow.DenseMode {
		 = 3
	}

	,  := .loadCommon(.ID(), )
	if .NullCount() != 0 && [0] != nil {
		panic("arrow/ipc: cannot read pre-1.0.0 union array with top-level validity bitmap")
	}

	switch .Length() {
	case 0:
		 = append(, memory.NewBufferBytes([]byte{}))
		.ibuffer++
		if .Mode() == arrow.DenseMode {
			 = append(, nil)
			.ibuffer++
		}
	default:
		 = append(, .buffer())
		if .Mode() == arrow.DenseMode {
			 = append(, .buffer())
		}
	}

	defer memory.ReleaseBuffers()
	 := make([]arrow.ArrayData, .NumFields())
	for ,  := range .Fields() {
		[] = .loadChild(.Type)
	}
	defer func() {
		for  := range  {
			[].Release()
		}
	}()
	return array.NewData(, int(.Length()), , , 0, 0)
}

func readDictionary( *dictutils.Memo,  *memory.Buffer,  *memory.Buffer,  bool,  memory.Allocator) (dictutils.Kind, error) {
	var (
		   = flatbuf.GetRootAsMessage(.Bytes(), 0)
		    flatbuf.DictionaryBatch
		  flatbuf.RecordBatch
		 decompressor
	)
	initFB(&, .Header)

	.Data(&)
	 := .Compression(nil)
	if  != nil {
		 = getDecompressor(.Codec())
		defer .Close()
	}

	 := .Id()
	// look up the dictionary value type, which must have been added to the
	// memo already before calling this function
	,  := .Type()
	if ! {
		return 0, fmt.Errorf("arrow/ipc: no dictionary type found with id: %d", )
	}

	 := &arrayLoaderContext{
		src: ipcSource{
			meta:     &,
			codec:    ,
			rawBytes: ,
			mem:      ,
		},
		memo: ,
		max:  kMaxNestingDepth,
	}

	 := .loadArray()
	defer .Release()

	if  {
		swapEndianArrayData(.(*array.Data))
	}

	if .IsDelta() {
		.AddDelta(, )
		return dictutils.KindDelta, nil
	}
	if .AddOrReplace(, ) {
		return dictutils.KindNew, nil
	}
	return dictutils.KindReplacement, nil
}

type mappedFileBlock struct {
	offset int64
	meta   int32
	body   int64

	data []byte
}

func ( mappedFileBlock) () int64 { return .offset }
func ( mappedFileBlock) () int32   { return .meta }
func ( mappedFileBlock) () int64   { return .body }

func ( mappedFileBlock) () []byte {
	return .data[.offset : .offset+int64(.meta)+.body]
}

func ( mappedFileBlock) () (*Message, error) {
	var (
		 *memory.Buffer
		 *memory.Buffer
		  = .section()
	)

	 := [:.meta]

	 := 0
	switch binary.LittleEndian.Uint32() {
	case 0:
	case kIPCContToken:
		 = 8
	default:
		// ARROW-6314: backwards compatibility for reading old IPC
		// messages produced prior to version 0.15.0
		 = 4
	}

	 = memory.NewBufferBytes([:])
	 = memory.NewBufferBytes([.meta : int64(.meta)+.body])
	return NewMessage(, ), nil
}