// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ipc

import (
	"encoding/binary"
	"fmt"
	"io"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/bitutil"
	"github.com/apache/arrow-go/v18/arrow/internal/dictutils"
	"github.com/apache/arrow-go/v18/arrow/internal/flatbuf"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

// PayloadWriter is an interface for injecting a different payload writer,
// allowing the Writer logic to be reused in other scenarios,
// such as writing Flight data.
type PayloadWriter interface {
	Start() error
	WritePayload(Payload) error
	Close() error
}
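
// As an illustration of why the payload writer is injectable, here is a
// minimal sketch (hypothetical, not part of the package API) of an
// alternative implementation that counts payloads while forwarding them
// to an underlying stream:
type countingPayloadWriter struct {
	w io.Writer
	n int // number of payloads written
}

var _ PayloadWriter = (*countingPayloadWriter)(nil)

func (c *countingPayloadWriter) Start() error { return nil }

func (c *countingPayloadWriter) WritePayload(p Payload) error {
	c.n++
	_, err := writeIPCPayload(c.w, p)
	return err
}

func (c *countingPayloadWriter) Close() error { return nil }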

type fileWriter struct {
	streamWriter

	schema *arrow.Schema
	dicts  []dataBlock
	recs   []dataBlock
}

func (w *fileWriter) Start() error {
	var err error

	// only necessary to align to 8-byte boundary at the start of the file
	_, err = w.Write(Magic)
	if err != nil {
		return fmt.Errorf("arrow/ipc: could not write magic Arrow bytes: %w", err)
	}

	err = w.align(kArrowIPCAlignment)
	if err != nil {
		return fmt.Errorf("arrow/ipc: could not align start block: %w", err)
	}

	return w.streamWriter.Start()
}

func (w *fileWriter) WritePayload(p Payload) error {
	blk := fileBlock{offset: w.pos, meta: 0, body: p.size}
	n, err := writeIPCPayload(w, p)
	if err != nil {
		return err
	}

	blk.meta = int32(n)

	switch flatbuf.MessageHeader(p.msg) {
	case flatbuf.MessageHeaderDictionaryBatch:
		w.dicts = append(w.dicts, blk)
	case flatbuf.MessageHeaderRecordBatch:
		w.recs = append(w.recs, blk)
	}

	return nil
}

func (w *fileWriter) Close() error {
	var err error

	if err = w.streamWriter.Close(); err != nil {
		return err
	}

	pos := w.pos
	if err = writeFileFooter(w.schema, w.dicts, w.recs, w); err != nil {
		return fmt.Errorf("arrow/ipc: could not write file footer: %w", err)
	}

	size := w.pos - pos
	if size <= 0 {
		return fmt.Errorf("arrow/ipc: invalid file footer size (size=%d)", size)
	}

	buf := make([]byte, 4)
	binary.LittleEndian.PutUint32(buf, uint32(size))
	_, err = w.Write(buf)
	if err != nil {
		return fmt.Errorf("arrow/ipc: could not write file footer size: %w", err)
	}

	_, err = w.Write(Magic)
	if err != nil {
		return fmt.Errorf("arrow/ipc: could not write Arrow magic bytes: %w", err)
	}

	return nil
}
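
// For orientation, the overall layout this writer produces follows the
// Arrow IPC file format:
//
//	ARROW1 + padding to 8 bytes    magic bytes (Start)
//	<schema message>
//	<dictionary batches> <record batches> ...
//	<EOS marker>                   written when the embedded streamWriter closes
//	<file footer flatbuffer>       the block offsets recorded in dicts/recs
//	<footer size, int32 LE>
//	ARROW1                         trailing magic bytes (Close)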

// align pads the output with zero bytes so that w.pos lands on a
// multiple of the given alignment.
func (w *fileWriter) align(align int32) error {
	// e.g. with w.pos == 13 and align == 8, paddedLength returns 16,
	// so 3 padding bytes are written.
	remainder := paddedLength(w.pos, align) - w.pos
	if remainder == 0 {
		return nil
	}

	_, err := w.Write(paddingBytes[:int(remainder)])
	return err
}

func writeIPCPayload(w io.Writer, p Payload) (int, error) {
	n, err := writeMessage(p.meta, kArrowIPCAlignment, w)
	if err != nil {
		return n, err
	}

	// now write the buffers
	for _, buf := range p.body {
		var (
			size    int64
			padding int64
		)

		// the buffer might be null if we are handling zero row lengths.
		if buf != nil {
			size = int64(buf.Len())
			padding = bitutil.CeilByte64(size) - size
		}

		if size > 0 {
			_, err = w.Write(buf.Bytes())
			if err != nil {
				return n, fmt.Errorf("arrow/ipc: could not write payload message body: %w", err)
			}
		}

		if padding > 0 {
			_, err = w.Write(paddingBytes[:padding])
			if err != nil {
				return n, fmt.Errorf("arrow/ipc: could not write payload message padding: %w", err)
			}
		}
	}

	return n, err
}
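
// To make the padding arithmetic above concrete: for a body buffer of 13
// bytes (an illustrative size), bitutil.CeilByte64(13) == 16, so the writer
// emits the 13 body bytes followed by 3 zero bytes of padding:
//
//	[ message metadata (writeMessage, 8-byte aligned) ]
//	[ 13 body bytes ][ 3 padding bytes ]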

// Payload is the underlying message object that is passed to the payload
// writer for writing out IPC messages.
type Payload struct {
	msg  MessageType
	meta *memory.Buffer
	body []*memory.Buffer
	size int64 // length of body
}

// Meta returns the buffer containing the metadata for this payload.
// Callers must call Release on the returned buffer when done with it.
func (p *Payload) Meta() *memory.Buffer {
	if p.meta != nil {
		p.meta.Retain()
	}
	return p.meta
}
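
// A short usage sketch of the Meta contract (the caller-side code is
// illustrative):
//
//	meta := p.Meta() // retained for the caller
//	defer meta.Release()
//	process(meta.Bytes())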

// SerializeBody serializes the body buffers and writes them to the provided
// writer.
func (p *Payload) SerializeBody(w io.Writer) error {
	for _, data := range p.body {
		if data == nil {
			continue
		}

		size := int64(data.Len())
		padding := bitutil.CeilByte64(size) - size
		if size > 0 {
			if _, err := w.Write(data.Bytes()); err != nil {
				return fmt.Errorf("arrow/ipc: could not write payload message body: %w", err)
			}

			if padding > 0 {
				if _, err := w.Write(paddingBytes[:padding]); err != nil {
					return fmt.Errorf("arrow/ipc: could not write payload message padding bytes: %w", err)
				}
			}
		}
	}
	return nil
}
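
// A hedged sketch of shipping the metadata and body separately, roughly the
// shape a Flight-style transport wants (send is a hypothetical function):
//
//	meta := p.Meta()
//	defer meta.Release()
//	var body bytes.Buffer
//	if err := p.SerializeBody(&body); err != nil {
//		return err
//	}
//	send(meta.Bytes(), body.Bytes())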

// WritePayload serializes the payload in IPC format
// into the provided writer.
func (p *Payload) WritePayload(w io.Writer) (int, error) {
	return writeIPCPayload(w, *p)
}

// Release releases the payload's metadata and body buffers.
func (p *Payload) Release() {
	if p.meta != nil {
		p.meta.Release()
		p.meta = nil
	}
	for i, b := range p.body {
		if b == nil {
			continue
		}
		b.Release()
		p.body[i] = nil
	}
}

type payloads []Payload

func (ps payloads) Release() {
	for i := range ps {
		ps[i].Release()
	}
}

// FileWriter is an Arrow file writer.
type FileWriter struct {
	w io.Writer

	mem memory.Allocator

	headerStarted bool
	footerWritten bool

	pw PayloadWriter

	schema          *arrow.Schema
	mapper          dictutils.Mapper
	codec           flatbuf.CompressionType
	compressNP      int
	compressors     []compressor
	minSpaceSavings *float64

	// map of the last dictionary written for each dictionary id,
	// so we can avoid writing the same dictionary over and over.
	// Also needed for correctness: the IPC file format does not
	// allow dictionary replacements or deltas.
	lastWrittenDicts map[int64]arrow.Array
}

// NewFileWriter opens an Arrow file using the provided writer w.
func NewFileWriter(w io.Writer, opts ...Option) (*FileWriter, error) {
	var (
		cfg = newConfig(opts...)
		err error
	)

	f := FileWriter{
		w:               w,
		pw:              &fileWriter{streamWriter: streamWriter{w: w}, schema: cfg.schema},
		mem:             cfg.alloc,
		schema:          cfg.schema,
		codec:           cfg.codec,
		compressNP:      cfg.compressNP,
		minSpaceSavings: cfg.minSpaceSavings,
		compressors:     make([]compressor, cfg.compressNP),
	}

	return &f, err
}
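
// A minimal usage sketch, assuming the WithSchema option and a prebuilt
// record batch rec (builder code omitted):
//
//	var buf bytes.Buffer
//	fw, err := ipc.NewFileWriter(&buf, ipc.WithSchema(schema))
//	if err != nil {
//		return err
//	}
//	if err := fw.Write(rec); err != nil {
//		return err
//	}
//	// Close writes the footer; buf then holds a complete Arrow file.
//	if err := fw.Close(); err != nil {
//		return err
//	}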

// Close writes the file footer, if it has not been written yet, and
// closes the underlying payload writer.
func (f *FileWriter) Close() error {
	err := f.checkStarted()
	if err != nil {
		return fmt.Errorf("arrow/ipc: could not write empty file: %w", err)
	}

	if f.footerWritten {
		return nil
	}

	err = f.pw.Close()
	if err != nil {
		return fmt.Errorf("arrow/ipc: could not close payload writer: %w", err)
	}
	f.footerWritten = true

	return nil
}

// Write writes rec to the file, starting the file (magic bytes and schema
// payloads) if it has not been started yet. It returns an error if rec's
// schema does not match the writer's schema.
func (f *FileWriter) Write(rec arrow.RecordBatch) error {
	schema := rec.Schema()
	if schema == nil || !schema.Equal(f.schema) {
		return errInconsistentSchema
	}

	if err := f.checkStarted(); err != nil {
		return fmt.Errorf("arrow/ipc: could not write header: %w", err)
	}

	const allow64b = true
	var (
		data = Payload{msg: MessageRecordBatch}
		enc  = newRecordEncoder(
			f.mem, 0, kMaxNestingDepth, allow64b, f.codec, f.compressNP, f.minSpaceSavings, f.compressors,
		)
	)
	defer data.Release()

	err := writeDictionaryPayloads(f.mem, rec, true, false, &f.mapper, f.lastWrittenDicts, f.pw, enc)
	if err != nil {
		return fmt.Errorf("arrow/ipc: failure writing dictionary batches: %w", err)
	}

	enc.reset()
	if err := enc.Encode(&data, rec); err != nil {
		return fmt.Errorf("arrow/ipc: could not encode record to payload: %w", err)
	}

	return f.pw.WritePayload(data)
}

func (f *FileWriter) checkStarted() error {
	if !f.headerStarted {
		return f.start()
	}
	return nil
}

func (f *FileWriter) start() error {
	f.headerStarted = true
	err := f.pw.Start()
	if err != nil {
		return err
	}

	f.mapper.ImportSchema(f.schema)
	f.lastWrittenDicts = make(map[int64]arrow.Array)

	// write out schema payloads
	ps := payloadFromSchema(f.schema, f.mem, &f.mapper)
	defer ps.Release()

	for _, data := range ps {
		err = f.pw.WritePayload(data)
		if err != nil {
			return err
		}
	}

	return nil
}