// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ipc

import (
	
	
	
	

	
	
	
	
	
	
	
	
)

// Reader reads records from an io.Reader.
// Reader expects a schema (plus any dictionaries) as the first messages
// in the stream, followed by records.
type Reader struct {
	r      MessageReader
	schema *arrow.Schema

	refCount atomic.Int64
	rec      arrow.RecordBatch
	err      error

	// types dictTypeMap
	memo               dictutils.Memo
	readInitialDicts   bool
	done               bool
	swapEndianness     bool
	ensureNativeEndian bool
	expectedSchema     *arrow.Schema

	mem memory.Allocator
}

// NewReaderFromMessageReader allows constructing a new reader object with the
// provided MessageReader allowing injection of reading messages other than
// by simple streaming bytes such as Arrow Flight which receives a protobuf message
func ( MessageReader,  ...Option) ( *Reader,  error) {
	defer func() {
		if  := recover();  != nil {
			 = utils.FormatRecoveredError("arrow/ipc: unknown error while reading", )
		}
	}()
	 := newConfig()
	for ,  := range  {
		()
	}

	 := &Reader{
		r:        ,
		refCount: atomic.Int64{},
		// types:    make(dictTypeMap),
		memo:               dictutils.NewMemo(),
		mem:                .alloc,
		ensureNativeEndian: .ensureNativeEndian,
		expectedSchema:     .schema,
	}
	.refCount.Add(1)

	if !.noAutoSchema {
		if  := .readSchema(.schema);  != nil {
			return nil, 
		}
	}

	return , nil
}

// NewReader returns a reader that reads records from an input stream.
func ( io.Reader,  ...Option) (*Reader, error) {
	return NewReaderFromMessageReader(NewMessageReader(, ...), ...)
}

// Err returns the last error encountered during the iteration over the
// underlying stream.
func ( *Reader) () error { return .err }

func ( *Reader) () *arrow.Schema {
	if .schema == nil {
		if  := .readSchema(.expectedSchema);  != nil {
			.err = fmt.Errorf("arrow/ipc: could not read schema from stream: %w", )
			.done = true
		}
	}
	return .schema
}

func ( *Reader) ( *arrow.Schema) error {
	,  := .r.Message()
	if  != nil {
		return fmt.Errorf("arrow/ipc: could not read message schema: %w", )
	}

	if .Type() != MessageSchema {
		return fmt.Errorf("arrow/ipc: invalid message type (got=%v, want=%v)", .Type(), MessageSchema)
	}

	// FIXME(sbinet) refactor msg-header handling.
	var  flatbuf.Schema
	initFB(&, .msg.Header)

	.schema,  = schemaFromFB(&, &.memo)
	if  != nil {
		return fmt.Errorf("arrow/ipc: could not decode schema from message schema: %w", )
	}

	// check the provided schema match the one read from stream.
	if  != nil && !.Equal(.schema) {
		return errInconsistentSchema
	}

	if .ensureNativeEndian && !.schema.IsNativeEndian() {
		.swapEndianness = true
		.schema = .schema.WithEndianness(endian.NativeEndian)
	}

	return nil
}

// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func ( *Reader) () {
	.refCount.Add(1)
}

// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func ( *Reader) () {
	debug.Assert(.refCount.Load() > 0, "too many releases")

	if .refCount.Add(-1) == 0 {
		if .rec != nil {
			.rec.Release()
			.rec = nil
		}
		if .r != nil {
			.r.Release()
			.r = nil
		}
		.memo.Clear()
	}
}

// Next returns whether a RecordBatch could be extracted from the underlying stream.
func ( *Reader) () bool {
	if .rec != nil {
		.rec.Release()
		.rec = nil
	}

	if .err != nil || .done {
		return false
	}

	return .next()
}

func ( *Reader) () bool {
	var  *Message
	// we have to get all dictionaries before reconstructing the first
	// record. subsequent deltas and replacements modify the memo
	 := .memo.Mapper.NumDicts()
	// there should be numDicts dictionary messages
	for  := 0;  < ; ++ {
		, .err = .r.Message()
		if .err != nil {
			.done = true
			if .err == io.EOF {
				if  == 0 {
					.err = nil
				} else {
					.err = fmt.Errorf("arrow/ipc: IPC stream ended without reading the expected (%d) dictionaries", )
				}
			}
			return false
		}

		if .Type() != MessageDictionaryBatch {
			.err = fmt.Errorf("arrow/ipc: IPC stream did not have the expected (%d) dictionaries at the start of the stream", )
		}
		if ,  := readDictionary(&.memo, .meta, .body, .swapEndianness, .mem);  != nil {
			.done = true
			.err = 
			return false
		}
	}
	.readInitialDicts = true
	return true
}

func ( *Reader) () bool {
	defer func() {
		if  := recover();  != nil {
			.err = utils.FormatRecoveredError("arrow/ipc: unknown error while reading", )
		}
	}()
	if .schema == nil {
		if  := .readSchema(.expectedSchema);  != nil {
			.err = fmt.Errorf("arrow/ipc: could not read schema from stream: %w", )
			.done = true
			return false
		}
	}

	if !.readInitialDicts && !.getInitialDicts() {
		return false
	}

	var  *Message
	, .err = .r.Message()

	for  != nil && .Type() == MessageDictionaryBatch {
		if _, .err = readDictionary(&.memo, .meta, .body, .swapEndianness, .mem); .err != nil {
			.done = true
			return false
		}
		, .err = .r.Message()
	}
	if .err != nil {
		.done = true
		if errors.Is(.err, io.EOF) {
			.err = nil
		}
		return false
	}

	if ,  := .Type(), MessageRecordBatch;  !=  {
		.err = fmt.Errorf("arrow/ipc: invalid message type (got=%v, want=%v", , )
		return false
	}

	.rec = newRecordBatch(.schema, &.memo, .meta, .body, .swapEndianness, .mem)
	return true
}

// RecordBatch returns the current record batch that has been extracted from the
// underlying stream.
// It is valid until the next call to Next.
func ( *Reader) () arrow.RecordBatch {
	return .rec
}

// Record returns the current record that has been extracted from the
// underlying stream.
// It is valid until the next call to Next.
//
// Deprecated: Use [RecordBatch] instead.
func ( *Reader) () arrow.Record {
	return .RecordBatch()
}

// Read reads the current record batch from the underlying stream and an error, if any.
// When the Reader reaches the end of the underlying stream, it returns (nil, io.EOF).
func ( *Reader) () (arrow.RecordBatch, error) {
	if .rec != nil {
		.rec.Release()
		.rec = nil
	}

	if !.next() {
		if .done && .err == nil {
			return nil, io.EOF
		}
		return nil, .err
	}

	return .rec, nil
}

var _ array.RecordReader = (*Reader)(nil)