// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package array

import (
	
	
	
	
	

	
	
	
	
)

// RecordReader reads a stream of records.
//
// A RecordReader is reference counted through Retain and Release. The
// implementations in this file release the batch returned by RecordBatch on
// the following call to Next, so a caller that needs a batch for longer must
// call Retain on that batch.
type RecordReader interface {
	// Retain increases the reference count of the reader by 1.
	Retain()
	// Release decreases the reference count of the reader by 1.
	Release()

	// Schema returns the schema associated with the reader.
	Schema() *arrow.Schema

	// Next advances the reader to the next record batch and reports whether
	// one is available.
	Next() bool
	// RecordBatch returns the current record batch, i.e. the one made
	// current by the most recent call to Next.
	RecordBatch() arrow.RecordBatch
	// Record returns the current record.
	//
	// Deprecated: Use [RecordBatch] instead.
	Record() arrow.Record
	// Err returns the error, if any, that was encountered while reading.
	Err() error
}

// simpleRecords is a simple iterator over a collection of records.
// It hands out the records of an in-memory slice one at a time.
type simpleRecords struct {
	// refCount is the number of outstanding references to the iterator.
	refCount atomic.Int64

	// schema is the schema every record was checked against on construction.
	schema *arrow.Schema
	// recs holds the records that have not been returned yet; the first
	// element is removed on every successful call to Next.
	recs   []arrow.RecordBatch
	// cur is the record most recently made current by Next, or nil.
	cur    arrow.RecordBatch
}

// NewRecordReader returns a simple iterator over the given slice of records.
//
// The reader starts with a reference count of 1 and retains every record of
// the slice; those references are dropped again as the reader advances or is
// released. The slice itself is stored, not copied.
//
// An error is returned if the schema of any record is not Equal to the given
// schema. In that case the reader is released first, which drops the
// references taken on the records.
func ( *arrow.Schema,  []arrow.RecordBatch) (RecordReader, error) {
	 := &simpleRecords{
		schema: ,
		recs:   ,
		cur:    nil,
	}
	.refCount.Add(1)

	// Take a reference on every record before validating, so that the
	// Release on the error path below balances out for all of them.
	for ,  := range .recs {
		.Retain()
	}

	for ,  := range  {
		if !.Schema().Equal(.schema) {
			.Release()
			return nil, fmt.Errorf("arrow/array: mismatch schema")
		}
	}

	return , nil
}

// Retain increases the reference count by 1.
// The count is kept in an atomic.Int64, so Retain may be called
// simultaneously from multiple goroutines.
func ( *simpleRecords) () {
	.refCount.Add(1)
}

// Release decreases the reference count by 1.
// When the reference count goes to zero, the current record (if any) and all
// records that have not been returned yet are released.
// Release may be called simultaneously from multiple goroutines.
func ( *simpleRecords) () {
	// Guard against releasing more often than the reader was retained.
	debug.Assert(.refCount.Load() > 0, "too many releases")

	if .refCount.Add(-1) == 0 {
		if .cur != nil {
			.cur.Release()
		}
		// Records still queued in recs were retained on construction and
		// never handed out, so their references are dropped here.
		for ,  := range .recs {
			.Release()
		}
		.recs = nil
	}
}

// Schema returns the schema the reader was created with.
func ( *simpleRecords) () *arrow.Schema          { return .schema }
// RecordBatch returns the current record, or nil if there is none.
func ( *simpleRecords) () arrow.RecordBatch { return .cur }

// Record returns the current record by forwarding to RecordBatch.
//
// Deprecated: Use [RecordBatch] instead.
func ( *simpleRecords) () arrow.Record { return .RecordBatch() }
// Next makes the first pending record the current one and removes it from
// the pending list. It returns false, leaving the current record untouched,
// when no records are pending.
func ( *simpleRecords) () bool {
	if len(.recs) == 0 {
		return false
	}
	// The previous current record is no longer handed out; drop the
	// reference that was taken on it at construction time.
	if .cur != nil {
		.cur.Release()
	}
	.cur = .recs[0]
	.recs = .recs[1:]
	return true
}
// Err always returns nil: none of the operations of this reader set an error.
func ( *simpleRecords) () error { return nil }

// simpleRecord is a basic, non-lazy in-memory record batch.
type simpleRecord struct {
	// refCount is the number of outstanding references to the record.
	refCount atomic.Int64

	// schema describes the fields; validation matches columns against it by
	// position.
	schema *arrow.Schema

	// rows is the number of rows of the record. Validation only requires
	// every column to hold at least this many elements.
	rows int64
	// arrs holds the columns, one per schema field. The record keeps its own
	// slice and a reference on every array in it.
	arrs []arrow.Array
}

// NewRecordBatch returns a basic, non-lazy in-memory record batch.
//
// The column slice is copied and every column is retained, so the caller
// keeps ownership of its own references. The returned record starts with a
// reference count of 1.
//
// A negative row count selects the length of the first column, or 0 when
// there are no columns.
//
// NewRecordBatch panics if the columns and schema are inconsistent.
// NewRecordBatch panics if rows is larger than the height of the columns.
// Before panicking, the partially built record is released so the references
// taken on the columns are dropped.
func ( *arrow.Schema,  []arrow.Array,  int64) arrow.RecordBatch {
	 := &simpleRecord{
		schema: ,
		rows:   ,
		arrs:   make([]arrow.Array, len()),
	}
	.refCount.Add(1)

	// Work on a private copy of the slice so later changes to the caller's
	// slice do not affect the record.
	copy(.arrs, )
	for ,  := range .arrs {
		.Retain()
	}

	// A negative row count means "derive it from the columns".
	if .rows < 0 {
		switch len(.arrs) {
		case 0:
			.rows = 0
		default:
			.rows = int64(.arrs[0].Len())
		}
	}

	 := .validate()
	if  != nil {
		.Release()
		panic()
	}

	return 
}

// Forwards its three arguments unchanged to NewRecordBatch and returns the
// result.
//
// Deprecated: Use [NewRecordBatch] instead.
func ( *arrow.Schema,  []arrow.Array,  int64) arrow.Record {
	return NewRecordBatch(, , )
}

// Returns a new record batch that shares this record's schema, row count and
// columns, except that the column at the given index is replaced by the
// given array. The receiver is not modified: the replacement is made in a
// copy of the column slice.
//
// An error is returned if the index is outside [0, number of columns), if
// the length of the array differs from the record's row count, or if the
// array's data type is not equal to the type of the schema field at that
// index.
func ( *simpleRecord) ( int,  arrow.Array) (arrow.RecordBatch, error) {
	if  < 0 ||  >= len(.arrs) {
		return nil, fmt.Errorf("arrow/array: column index out of range [0, %d): got=%d", len(.arrs), )
	}

	// Unlike validate, which accepts longer columns, a replacement column
	// must match the row count exactly.
	if .Len() != int(.rows) {
		return nil, fmt.Errorf("arrow/array: mismatch number of rows in column %q: got=%d, want=%d",
			.schema.Field().Name,
			.Len(), .rows,
		)
	}

	 := .schema.Field()
	if !arrow.TypeEqual(.Type, .DataType()) {
		return nil, fmt.Errorf("arrow/array: column %q type mismatch: got=%v, want=%v",
			.Name,
			.DataType(), .Type,
		)
	}
	// Build the new column list in a fresh slice; NewRecordBatch takes its
	// own references on every column it is given.
	 := make([]arrow.Array, len(.arrs))
	copy(, .arrs)
	[] = 

	return NewRecordBatch(.schema, , .rows), nil
}

// validate checks that the columns of the record are consistent with its
// schema and row count. It returns nil when the record is consistent and a
// descriptive error otherwise.
//
// The checks are, in order:
//   - a record with zero rows and zero columns is accepted without looking
//     at the schema;
//   - the number of columns must equal the number of schema fields;
//   - every column must hold at least rows elements (longer columns are
//     accepted) and its data type must equal the type of the schema field at
//     the same position.
func ( *simpleRecord) () error {
	if .rows == 0 && len(.arrs) == 0 {
		return nil
	}

	if len(.arrs) != .schema.NumFields() {
		return fmt.Errorf("arrow/array: number of columns/fields mismatch")
	}

	for ,  := range .arrs {
		 := .schema.Field()
		// Only columns that are too short are rejected.
		if int64(.Len()) < .rows {
			return fmt.Errorf("arrow/array: mismatch number of rows in column %q: got=%d, want=%d",
				.Name,
				.Len(), .rows,
			)
		}
		if !arrow.TypeEqual(.Type, .DataType()) {
			return fmt.Errorf("arrow/array: column %q type mismatch: got=%v, want=%v",
				.Name,
				.DataType(), .Type,
			)
		}
	}
	return nil
}

// Retain increases the reference count by 1.
// The count is kept in an atomic.Int64, so Retain may be called
// simultaneously from multiple goroutines.
func ( *simpleRecord) () {
	.refCount.Add(1)
}

// Release decreases the reference count by 1.
// When the reference count goes to zero, every column of the record is
// released and the column slice is dropped.
// Release may be called simultaneously from multiple goroutines.
func ( *simpleRecord) () {
	// Guard against releasing more often than the record was retained.
	debug.Assert(.refCount.Load() > 0, "too many releases")

	if .refCount.Add(-1) == 0 {
		// Drop the references taken on the columns at construction time.
		for ,  := range .arrs {
			.Release()
		}
		.arrs = nil
	}
}

// Returns the schema of the record.
func ( *simpleRecord) () *arrow.Schema    { return .schema }
// Returns the number of rows of the record.
func ( *simpleRecord) () int64           { return .rows }
// Returns the number of columns of the record.
func ( *simpleRecord) () int64           { return int64(len(.arrs)) }
// Returns the record's own column slice; it is not copied and the arrays are
// not retained for the caller.
func ( *simpleRecord) () []arrow.Array   { return .arrs }
// Returns the column at the given index without retaining it; an index
// outside the column slice panics.
func ( *simpleRecord) ( int) arrow.Array { return .arrs[] }
// Returns the name of the schema field at the given index.
func ( *simpleRecord) ( int) string  { return .schema.Field().Name }

// NewSlice constructs a zero-copy slice of the record with the indicated
// indices i and j, corresponding to array[i:j].
// The returned record must be Release()'d after use.
//
// Every column is sliced with the package-level NewSlice function and the
// resulting record has j-i rows.
//
// NewSlice panics if the slice is outside the valid range of the record array.
// NewSlice panics if j < i.
func ( *simpleRecord) (,  int64) arrow.RecordBatch {
	 := make([]arrow.Array, len(.arrs))
	for ,  := range .arrs {
		[] = NewSlice(, , )
	}
	// NewRecordBatch retains the sliced columns itself, so the references
	// created above are dropped once the record has been built.
	defer func() {
		for ,  := range  {
			.Release()
		}
	}()
	return NewRecordBatch(.schema, , -)
}

// Returns a multi-line, human-readable description of the record: a
// "record:" header followed by the schema, the row count, and one line per
// column with its index, field name and contents.
func ( *simpleRecord) () string {
	 := new(strings.Builder)
	fmt.Fprintf(, "record:\n  %v\n", .schema)
	fmt.Fprintf(, "  rows: %d\n", .rows)
	for ,  := range .arrs {
		fmt.Fprintf(, "  col[%d][%s]: %v\n", , .schema.Field().Name, )
	}

	return .String()
}

// Returns the JSON encoding of the record. The record is first converted
// with RecordToStructArray (defined elsewhere in the package) and the result
// of that value's MarshalJSON method is returned; the intermediate value is
// released before returning.
func ( *simpleRecord) () ([]byte, error) {
	 := RecordToStructArray()
	defer .Release()
	return .MarshalJSON()
}

// RecordBuilder eases the process of building a Record, iteratively, from
// a known Schema. It owns one column builder per schema field.
type RecordBuilder struct {
	// refCount is the number of outstanding references to the builder.
	refCount atomic.Int64
	// mem is the allocator handed to every field builder.
	mem      memory.Allocator
	// schema is the schema of the records being built.
	schema   *arrow.Schema
	// fields holds the column builders, in schema field order.
	fields   []Builder
}

// NewRecordBuilder returns a builder, using the provided memory allocator and a schema.
//
// One column builder is created per schema field with NewBuilder, using the
// field's data type. The returned builder starts with a reference count of 1.
func ( memory.Allocator,  *arrow.Schema) *RecordBuilder {
	 := &RecordBuilder{
		mem:    ,
		schema: ,
		fields: make([]Builder, .NumFields()),
	}
	.refCount.Add(1)

	for  := 0;  < .NumFields(); ++ {
		.fields[] = NewBuilder(.mem, .Field().Type)
	}

	return 
}

// Retain increases the reference count by 1.
// The count is kept in an atomic.Int64, so Retain may be called
// simultaneously from multiple goroutines.
func ( *RecordBuilder) () {
	.refCount.Add(1)
}

// Release decreases the reference count by 1.
// When the reference count goes to zero, every field builder is released and
// the builder slice is dropped.
func ( *RecordBuilder) () {
	// Guard against releasing more often than the builder was retained.
	debug.Assert(.refCount.Load() > 0, "too many releases")

	if .refCount.Add(-1) == 0 {
		for ,  := range .fields {
			.Release()
		}
		.fields = nil
	}
}

// Returns the schema the builder was created with.
func ( *RecordBuilder) () *arrow.Schema { return .schema }
// Returns the builder's own slice of field builders; it is not copied.
func ( *RecordBuilder) () []Builder     { return .fields }
// Returns the field builder at the given index; an index outside the slice
// panics.
func ( *RecordBuilder) ( int) Builder   { return .fields[] }

// Calls Reserve with the given size on every field builder.
func ( *RecordBuilder) ( int) {
	for ,  := range .fields {
		.Reserve()
	}
}

// NewRecordBatch creates a new record batch from the memory buffers and resets the
// RecordBuilder so it can be used to build a new record batch.
//
// One array is obtained from every field builder via NewArray, in field
// order, and the arrays are passed to the package-level NewRecordBatch.
//
// The returned RecordBatch must be Release()'d after use.
//
// NewRecordBatch panics if the fields' builder do not have the same length.
func ( *RecordBuilder) () arrow.RecordBatch {
	 := make([]arrow.Array, len(.fields))
	 := int64(0)

	// The record retains the arrays itself, so the references created by
	// NewArray are dropped on the way out. This also runs when the loop
	// below panics; entries that were never filled are still nil and skipped.
	defer func( []arrow.Array) {
		for ,  := range  {
			if  == nil {
				continue
			}
			.Release()
		}
	}()

	for ,  := range .fields {
		[] = .NewArray()
		 := int64([].Len())
		// The comparison is skipped while the running row count is 0; a
		// shorter earlier column is then rejected by the validation inside
		// the package-level NewRecordBatch instead.
		if  > 0 &&  !=  {
			panic(fmt.Errorf("arrow/array: field %d has %d rows. want=%d", , , ))
		}
		 = 
	}

	return NewRecordBatch(.schema, , )
}

// Returns the result of calling NewRecordBatch on the builder.
//
// Deprecated: Use [NewRecordBatch] instead.
func ( *RecordBuilder) () arrow.Record {
	return .NewRecordBatch()
}

// UnmarshalJSON reads a single JSON object and appends one value to each
// field builder of the record builder. Fields that are missing from the
// object get a null, and keys that do not match any schema field are ignored.
// To read an array of records as a single batch, use a struct builder and
// RecordFromStruct instead.
//
// An error is returned if the input does not start with '{', if a key occurs
// more than once, or if decoding a value fails. Values appended before the
// error was detected are not rolled back.
func ( *RecordBuilder) ( []byte) error {
	 := json.NewDecoder(bytes.NewReader())
	// The first token must be the opening '{' of the object.
	,  := .Token()
	if  != nil {
		return 
	}

	if ,  := .(json.Delim); ! ||  != '{' {
		return fmt.Errorf("record should start with '{', not %s", )
	}

	// Tracks the keys seen so far, to reject duplicates and to find the
	// fields that have to be filled with nulls afterwards.
	 := make(map[string]bool)
	for .More() {
		,  := .Token()
		if  != nil {
			return 
		}

		 := .(string)
		if [] {
			return fmt.Errorf("key %s shows up twice in row to be decoded", )
		}
		[] = true

		 := .schema.FieldIndices()
		if len() == 0 {
			// Unknown key: decode its value into a throwaway variable so
			// the decoder moves past it.
			var  interface{}
			if  := .Decode(&);  != nil {
				return 
			}
			continue
		}

		// Only the first matching field index receives the value.
		if  := .fields[[0]].UnmarshalOne();  != nil {
			return 
		}
	}

	// Every field whose name did not appear as a key gets a null.
	for  := 0;  < .schema.NumFields(); ++ {
		if ![.schema.Field().Name] {
			.fields[].AppendNull()
		}
	}
	return nil
}

// iterReader is a RecordReader backed by a pair of pull functions, as set up
// by ReaderFromIter.
type iterReader struct {
	// refCount is the number of outstanding references to the reader.
	refCount atomic.Int64

	// schema is the schema reported by the reader; cleared on final release.
	schema *arrow.Schema
	// cur is the record batch produced by the most recent call to next.
	cur    arrow.RecordBatch

	// next yields the next record batch and error, plus a flag reporting
	// whether a value was produced; cleared on final release.
	next func() (arrow.RecordBatch, error, bool)
	// stop ends the underlying iteration.
	stop func()

	// err is the error produced by the most recent call to next.
	err error
}

// Schema returns the schema the reader was created with, or nil once the
// reader has been fully released.
func ( *iterReader) () *arrow.Schema { return .schema }

// Retain increases the reference count by 1.
func ( *iterReader) () { .refCount.Add(1) }
// Release decreases the reference count by 1.
// When the reference count goes to zero, the underlying iteration is
// stopped, the schema and the next function are cleared, and the current
// record batch (if any) is released. Because next is cleared, Next must not
// be called afterwards.
func ( *iterReader) () {
	// Guard against releasing more often than the reader was retained.
	debug.Assert(.refCount.Load() > 0, "too many releases")

	if .refCount.Add(-1) == 0 {
		.stop()
		.schema, .next = nil, nil
		if .cur != nil {
			.cur.Release()
		}
	}
}

// RecordBatch returns the current record batch, i.e. the one produced by the
// most recent call to Next.
func ( *iterReader) () arrow.RecordBatch { return .cur }

// Record returns the current record by forwarding to RecordBatch.
//
// Deprecated: Use [RecordBatch] instead.
func ( *iterReader) () arrow.Record { return .RecordBatch() }
// Err returns the error stored by the most recent call to Next, if any.
func ( *iterReader) () error           { return .err }

// Next releases the current record batch, if any, and pulls the next batch
// and error from the underlying iteration. It returns false when the
// iteration reported an error, which is then available through Err, or when
// the iteration produced no further value.
func ( *iterReader) () bool {
	if .cur != nil {
		.cur.Release()
	}

	var  bool
	.cur, .err,  = .next()
	if .err != nil {
		// An error ends the stream: stop the underlying iteration.
		.stop()
		return false
	}

	return 
}

// ReaderFromIter wraps a go iterator for arrow.RecordBatch + error into a RecordReader
// interface object for ease of use.
//
// The iterator is converted into a pair of pull functions with iter.Pull2.
// The given schema is stored and reported by the reader's Schema method; it
// is not compared against the batches the iterator yields. The returned
// reader starts with a reference count of 1, and releasing it to zero stops
// the underlying iteration.
func ( *arrow.Schema,  iter.Seq2[arrow.RecordBatch, error]) RecordReader {
	,  := iter.Pull2()
	 := &iterReader{
		schema: ,
		next:   ,
		stop:   ,
	}
	.refCount.Add(1)
	return 
}

// IterFromReader converts a RecordReader interface into an iterator that
// you can use range on. The usual ownership rules still apply: if a record
// that is yielded is needed beyond the scope of one iteration, Retain must
// be called on it.
//
// Every record batch is yielded with a nil error. If the reader reports an
// error once Next returns false, a final pair of a nil batch and that error
// is yielded. Nothing more is yielded after the loop body stops the
// iteration.
//
// The reader is retained once when IterFromReader is called and released
// when the returned iterator function returns.
// NOTE(review): the reference is taken once but dropped on every run of the
// iterator, so the iterator looks intended to be ranged over exactly once;
// confirm against callers.
func ( RecordReader) iter.Seq2[arrow.RecordBatch, error] {
	.Retain()
	return func( func(arrow.RecordBatch, error) bool) {
		defer .Release()
		for .Next() {
			if !(.RecordBatch(), nil) {
				return
			}
		}

		if .Err() != nil {
			(nil, .Err())
		}
	}
}

// Compile-time checks that the concrete types of this file satisfy the
// interfaces they are meant to implement.
var (
	_ arrow.RecordBatch = (*simpleRecord)(nil)
	_ RecordReader      = (*simpleRecords)(nil)
)