// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ipc

import (
	
	
	
	

	
	
	
)

// MetadataVersion represents the Arrow metadata version.
type MetadataVersion flatbuf.MetadataVersion

const (
	MetadataV1 = MetadataVersion(flatbuf.MetadataVersionV1) // version for Arrow Format-0.1.0
	MetadataV2 = MetadataVersion(flatbuf.MetadataVersionV2) // version for Arrow Format-0.2.0
	MetadataV3 = MetadataVersion(flatbuf.MetadataVersionV3) // version for Arrow Format-0.3.0 to 0.7.1
	MetadataV4 = MetadataVersion(flatbuf.MetadataVersionV4) // version for >= Arrow Format-0.8.0
	MetadataV5 = MetadataVersion(flatbuf.MetadataVersionV5) // version for >= Arrow Format-1.0.0, backward compatible with v4
)

func ( MetadataVersion) () string {
	if ,  := flatbuf.EnumNamesMetadataVersion[flatbuf.MetadataVersion()];  {
		return 
	}
	return fmt.Sprintf("MetadataVersion(%d)", int16())
}

// MessageType represents the type of Message in an Arrow format.
type MessageType flatbuf.MessageHeader

const (
	MessageNone            = MessageType(flatbuf.MessageHeaderNONE)
	MessageSchema          = MessageType(flatbuf.MessageHeaderSchema)
	MessageDictionaryBatch = MessageType(flatbuf.MessageHeaderDictionaryBatch)
	MessageRecordBatch     = MessageType(flatbuf.MessageHeaderRecordBatch)
	MessageTensor          = MessageType(flatbuf.MessageHeaderTensor)
	MessageSparseTensor    = MessageType(flatbuf.MessageHeaderSparseTensor)
)

func ( MessageType) () string {
	if ,  := flatbuf.EnumNamesMessageHeader[flatbuf.MessageHeader()];  {
		return 
	}
	return fmt.Sprintf("MessageType(%d)", int())
}

// Message is an IPC message, including metadata and body.
type Message struct {
	refCount atomic.Int64
	msg      *flatbuf.Message
	meta     *memory.Buffer
	body     *memory.Buffer
}

// NewMessage creates a new message from the metadata and body buffers.
// NewMessage panics if any of these buffers is nil.
func (,  *memory.Buffer) *Message {
	if  == nil ||  == nil {
		panic("arrow/ipc: nil buffers")
	}
	.Retain()
	.Retain()
	 := &Message{
		msg:  flatbuf.GetRootAsMessage(.Bytes(), 0),
		meta: ,
		body: ,
	}
	.refCount.Add(1)
	return 
}

func newMessageFromFB( *flatbuf.Message,  *memory.Buffer) *Message {
	if  == nil ||  == nil {
		panic("arrow/ipc: nil buffers")
	}
	.Retain()
	 := &Message{
		msg:  ,
		meta: memory.NewBufferBytes(.Table().Bytes),
		body: ,
	}
	.refCount.Add(1)
	return 
}

// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func ( *Message) () {
	.refCount.Add(1)
}

// Release decreases the reference count by 1.
// Release may be called simultaneously from multiple goroutines.
// When the reference count goes to zero, the memory is freed.
func ( *Message) () {
	debug.Assert(.refCount.Load() > 0, "too many releases")

	if .refCount.Add(-1) == 0 {
		.meta.Release()
		.body.Release()
		.msg = nil
		.meta = nil
		.body = nil
	}
}

func ( *Message) () MetadataVersion {
	return MetadataVersion(.msg.Version())
}

func ( *Message) () MessageType {
	return MessageType(.msg.HeaderType())
}

func ( *Message) () int64 {
	return .msg.BodyLength()
}

type MessageReader interface {
	Message() (*Message, error)
	Release()
	Retain()
}

// MessageReader reads messages from an io.Reader.
type messageReader struct {
	r io.Reader

	refCount atomic.Int64
	msg      *Message

	mem memory.Allocator
}

// NewMessageReader returns a reader that reads messages from an input stream.
func ( io.Reader,  ...Option) MessageReader {
	 := newConfig()
	for ,  := range  {
		()
	}

	 := &messageReader{r: , mem: .alloc}
	.refCount.Add(1)
	return 
}

// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func ( *messageReader) () {
	.refCount.Add(1)
}

// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func ( *messageReader) () {
	debug.Assert(.refCount.Load() > 0, "too many releases")

	if .refCount.Add(-1) == 0 {
		if .msg != nil {
			.msg.Release()
			.msg = nil
		}
	}
}

// Message returns the current message that has been extracted from the
// underlying stream.
// It is valid until the next call to Message.
func ( *messageReader) () (*Message, error) {
	 := make([]byte, 4)
	,  := io.ReadFull(.r, )
	if  != nil {
		return nil, fmt.Errorf("arrow/ipc: could not read continuation indicator: %w", )
	}
	var (
		    = binary.LittleEndian.Uint32()
		 int32
	)
	switch  {
	case 0:
		// EOS message.
		return nil, io.EOF // FIXME(sbinet): send nil instead? or a special EOS error?
	case kIPCContToken:
		_,  = io.ReadFull(.r, )
		if  != nil {
			return nil, fmt.Errorf("arrow/ipc: could not read message length: %w", )
		}
		 = int32(binary.LittleEndian.Uint32())
		if  == 0 {
			// optional 0 EOS control message
			return nil, io.EOF // FIXME(sbinet): send nil instead? or a special EOS error?
		}

	default:
		// ARROW-6314: backwards compatibility for reading old IPC
		// messages produced prior to version 0.15.0
		 = int32()
	}

	 = make([]byte, )
	_,  = io.ReadFull(.r, )
	if  != nil {
		return nil, fmt.Errorf("arrow/ipc: could not read message metadata: %w", )
	}

	 := flatbuf.GetRootAsMessage(, 0)
	 := .BodyLength()

	 := memory.NewResizableBuffer(.mem)
	defer .Release()
	.Resize(int())

	_,  = io.ReadFull(.r, .Bytes())
	if  != nil {
		return nil, fmt.Errorf("arrow/ipc: could not read message body: %w", )
	}

	if .msg != nil {
		.msg.Release()
		.msg = nil
	}
	.msg = newMessageFromFB(, )

	return .msg, nil
}