// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package array

import (
	"errors"
	"fmt"
	"math"
	"strings"
	"sync/atomic"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/internal/debug"
)

// NewColumnSlice returns a new zero-copy slice of the column with the indicated
// indices i and j, corresponding to the column's array[i:j].
// The returned column must be Release()'d after use.
//
// NewColumnSlice panics if the slice is outside the valid range of the column's array.
// NewColumnSlice panics if j < i.
func NewColumnSlice(col *arrow.Column, i, j int64) *arrow.Column {
	slice := NewChunkedSlice(col.Data(), i, j)
	defer slice.Release()
	return arrow.NewColumn(col.Field(), slice)
}
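
// A minimal usage sketch (the column col and its row count are hypothetical):
//
//	head := NewColumnSlice(col, 2, 5) // zero-copy view of rows [2, 5)
//	defer head.Release()
//	fmt.Println(head.Len()) // 3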

// NewChunkedSlice constructs a zero-copy slice of the chunked array with the indicated
// indices i and j, corresponding to array[i:j].
// The returned chunked array must be Release()'d after use.
//
// NewChunkedSlice panics if the slice is outside the valid range of the input array.
// NewChunkedSlice panics if j < i.
func NewChunkedSlice(a *arrow.Chunked, i, j int64) *arrow.Chunked {
	if j > int64(a.Len()) || i > j || i > int64(a.Len()) {
		panic("arrow/array: index out of range")
	}

	var (
		cur    = 0     // index of the chunk under consideration
		beg    = i     // offset of the slice start within the current chunk
		sz     = j - i // number of elements still to take
		chunks = make([]arrow.Array, 0, len(a.Chunks()))
	)

	// skip whole chunks that precede the start of the slice.
	for cur < len(a.Chunks()) && beg >= int64(a.Chunks()[cur].Len()) {
		beg -= int64(a.Chunks()[cur].Len())
		cur++
	}

	// take a zero-copy slice of each chunk until sz elements are consumed.
	for cur < len(a.Chunks()) && sz > 0 {
		arr := a.Chunks()[cur]
		end := beg + sz
		if end > int64(arr.Len()) {
			end = int64(arr.Len())
		}
		chunks = append(chunks, NewSlice(arr, beg, end))
		sz -= int64(arr.Len()) - beg
		beg = 0
		cur++
	}
	chunks = chunks[:len(chunks):len(chunks)]
	defer func() {
		// NewChunked retains the chunks, so release our references.
		for _, chunk := range chunks {
			chunk.Release()
		}
	}()

	return arrow.NewChunked(a.DataType(), chunks)
}
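
// Illustrative sketch: for a chunked array a built from two chunks of
// lengths 3 and 4 (hypothetical values), a slice spanning the chunk
// boundary yields zero-copy sub-slices of both chunks:
//
//	s := NewChunkedSlice(a, 2, 5) // rows [2, 5)
//	defer s.Release()
//	fmt.Println(s.Len(), len(s.Chunks())) // 3 2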

// simpleTable is a basic, non-lazy in-memory table.
type simpleTable struct {
	refCount atomic.Int64

	rows int64
	cols []arrow.Column

	schema *arrow.Schema
}

// NewTable returns a new basic, non-lazy in-memory table.
// If rows is negative, the number of rows will be inferred from the height
// of the columns.
//
// NewTable panics if the columns and schema are inconsistent.
// NewTable panics if rows is larger than the height of the columns.
func NewTable(schema *arrow.Schema, cols []arrow.Column, rows int64) arrow.Table {
	tbl := simpleTable{
		rows:   rows,
		cols:   cols,
		schema: schema,
	}
	tbl.refCount.Add(1)

	if tbl.rows < 0 {
		switch len(tbl.cols) {
		case 0:
			tbl.rows = 0
		default:
			tbl.rows = int64(tbl.cols[0].Len())
		}
	}

	// validate the table and its constituents.
	// note we retain the columns after having validated the table
	// in case the validation fails and panics (and would otherwise leak
	// a ref-count on the columns.)
	tbl.validate()

	for i := range tbl.cols {
		tbl.cols[i].Retain()
	}

	return &tbl
}
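
// End-to-end usage sketch (assumes the arrow-go v18 "arrow", "arrow/array"
// and "arrow/memory" packages; the field name "f0" and the values are
// illustrative only):
//
//	mem := memory.NewGoAllocator()
//	bld := array.NewInt64Builder(mem)
//	defer bld.Release()
//	bld.AppendValues([]int64{1, 2, 3}, nil)
//	arr := bld.NewArray()
//	defer arr.Release()
//
//	schema := arrow.NewSchema([]arrow.Field{{Name: "f0", Type: arrow.PrimitiveTypes.Int64}}, nil)
//	chunked := arrow.NewChunked(arr.DataType(), []arrow.Array{arr})
//	defer chunked.Release()
//	col := arrow.NewColumn(schema.Field(0), chunked)
//	defer col.Release()
//
//	tbl := array.NewTable(schema, []arrow.Column{*col}, -1) // rows inferred: 3
//	defer tbl.Release()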

// NewTableFromSlice is a convenience function to create a table from a slice
// of slices of arrow.Array.
//
// Like other NewTable functions this can panic if:
//   - len(schema.Fields) != len(data)
//   - the total length of each column's array slices (i.e. the number of rows
//     in the column) isn't the same for all columns.
func NewTableFromSlice(schema *arrow.Schema, data [][]arrow.Array) arrow.Table {
	if len(data) != schema.NumFields() {
		panic("array/table: mismatch in number of columns and data for creating a table")
	}

	cols := make([]arrow.Column, schema.NumFields())
	for i, arrs := range data {
		field := schema.Field(i)
		chunked := arrow.NewChunked(field.Type, arrs)
		cols[i] = *arrow.NewColumn(field, chunked)
		chunked.Release()
	}

	// infer the row count from the first column; an empty schema yields
	// an empty table rather than an index panic.
	rows := int64(0)
	if len(cols) > 0 {
		rows = int64(cols[0].Len())
	}

	tbl := simpleTable{
		schema: schema,
		cols:   cols,
		rows:   rows,
	}
	tbl.refCount.Add(1)

	defer func() {
		if r := recover(); r != nil {
			// if validate panics, release the columns so that we
			// don't leak them, then propagate the panic.
			for _, col := range cols {
				col.Release()
			}
			panic(r)
		}
	}()
	// validate the table and its constituents.
	tbl.validate()

	return &tbl
}
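
// Usage sketch (reusing the hypothetical schema and array from the NewTable
// example above; one outer slice entry per schema field, each inner slice
// becoming that column's chunks):
//
//	tbl := array.NewTableFromSlice(schema, [][]arrow.Array{{arr}})
//	defer tbl.Release()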

// NewTableFromRecords returns a new basic, non-lazy in-memory table.
//
// NewTableFromRecords panics if the records and schema are inconsistent.
func NewTableFromRecords(schema *arrow.Schema, recs []arrow.RecordBatch) arrow.Table {
	arrs := make([]arrow.Array, len(recs))
	cols := make([]arrow.Column, schema.NumFields())

	defer func(cols []arrow.Column) {
		for i := range cols {
			cols[i].Release()
		}
	}(cols)

	for i := range cols {
		field := schema.Field(i)
		for j, rec := range recs {
			arrs[j] = rec.Column(i)
		}
		chunked := arrow.NewChunked(field.Type, arrs)
		cols[i] = *arrow.NewColumn(field, chunked)
		chunked.Release()
	}

	return NewTable(schema, cols, -1)
}
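
// Usage sketch (assumes recs is a []arrow.RecordBatch produced elsewhere,
// e.g. by an array.RecordBuilder, all sharing the given schema):
//
//	tbl := array.NewTableFromRecords(schema, recs)
//	defer tbl.Release()
//	fmt.Println(tbl.NumRows(), tbl.NumCols())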

func (tbl *simpleTable) Schema() *arrow.Schema { return tbl.schema }

func (tbl *simpleTable) AddColumn(i int, field arrow.Field, column arrow.Column) (arrow.Table, error) {
	if int64(column.Len()) != tbl.rows {
		return nil, fmt.Errorf("arrow/array: column length mismatch: %d != %d", column.Len(), tbl.rows)
	}
	if field.Type != column.DataType() {
		return nil, fmt.Errorf("arrow/array: column type mismatch: %v != %v", field.Type, column.DataType())
	}
	newSchema, err := tbl.schema.AddField(i, field)
	if err != nil {
		return nil, err
	}
	cols := make([]arrow.Column, len(tbl.cols)+1)
	copy(cols[:i], tbl.cols[:i])
	cols[i] = column
	copy(cols[i+1:], tbl.cols[i:])
	newTable := NewTable(newSchema, cols, tbl.rows)
	return newTable, nil
}
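
// Usage sketch (the position, field newField, and column newCol are
// hypothetical; the returned table is independent of the original and
// must be released separately):
//
//	tbl2, err := tbl.AddColumn(1, newField, *newCol)
//	if err != nil {
//		return err // length or type mismatch, or invalid position
//	}
//	defer tbl2.Release()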

func (tbl *simpleTable) NumRows() int64             { return tbl.rows }
func (tbl *simpleTable) NumCols() int64             { return int64(len(tbl.cols)) }
func (tbl *simpleTable) Column(i int) *arrow.Column { return &tbl.cols[i] }

func (tbl *simpleTable) validate() {
	if len(tbl.cols) != tbl.schema.NumFields() {
		panic(errors.New("arrow/array: table schema mismatch"))
	}
	for i, col := range tbl.cols {
		if !col.Field().Equal(tbl.schema.Field(i)) {
			panic(fmt.Errorf("arrow/array: column field %q is inconsistent with schema", col.Name()))
		}

		if int64(col.Len()) < tbl.rows {
			panic(fmt.Errorf("arrow/array: column %q expected length >= %d but got length %d", col.Name(), tbl.rows, col.Len()))
		}
	}
}

// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (tbl *simpleTable) Retain() {
	tbl.refCount.Add(1)
}

// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (tbl *simpleTable) Release() {
	debug.Assert(tbl.refCount.Load() > 0, "too many releases")

	if tbl.refCount.Add(-1) == 0 {
		for i := range tbl.cols {
			tbl.cols[i].Release()
		}
		tbl.cols = nil
	}
}

func (tbl *simpleTable) String() string {
	o := new(strings.Builder)
	o.WriteString(tbl.Schema().String())
	o.WriteString("\n")

	for i := 0; i < int(tbl.NumCols()); i++ {
		col := tbl.Column(i)
		o.WriteString(col.Field().Name + ": [")
		for j, chunk := range col.Data().Chunks() {
			if j != 0 {
				o.WriteString(", ")
			}
			o.WriteString(chunk.String())
		}
		o.WriteString("]\n")
	}
	return o.String()
}

// TableReader is a RecordBatch iterator over a (possibly chunked) Table.
type TableReader struct {
	refCount atomic.Int64

	tbl   arrow.Table
	cur   int64             // current row
	max   int64             // total number of rows
	rec   arrow.RecordBatch // current RecordBatch
	chksz int64             // chunk size

	chunks  []*arrow.Chunked
	slots   []int   // chunk indices
	offsets []int64 // chunk offsets
}

// NewTableReader returns a new TableReader to iterate over the (possibly chunked) Table.
// If chunkSize is <= 0, the biggest possible chunk will be selected.
func NewTableReader(tbl arrow.Table, chunkSize int64) *TableReader {
	ncols := tbl.NumCols()
	tr := &TableReader{
		tbl:     tbl,
		cur:     0,
		max:     tbl.NumRows(),
		chksz:   chunkSize,
		chunks:  make([]*arrow.Chunked, ncols),
		slots:   make([]int, ncols),
		offsets: make([]int64, ncols),
	}
	tr.refCount.Add(1)
	tr.tbl.Retain()

	if tr.chksz <= 0 {
		tr.chksz = math.MaxInt64
	}

	for i := range tr.chunks {
		col := tr.tbl.Column(i)
		tr.chunks[i] = col.Data()
		tr.chunks[i].Retain()
	}
	return tr
}
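
// Typical iteration pattern (a sketch; the chunk size of 64 is arbitrary).
// Each RecordBatch is owned by the reader and is released on the next call
// to Next, so Retain it if it must outlive the iteration step:
//
//	tr := array.NewTableReader(tbl, 64)
//	defer tr.Release()
//	for tr.Next() {
//		rec := tr.RecordBatch()
//		// process rec ...
//	}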

func (tr *TableReader) Schema() *arrow.Schema          { return tr.tbl.Schema() }
func (tr *TableReader) RecordBatch() arrow.RecordBatch { return tr.rec }

// Deprecated: Use [TableReader.RecordBatch] instead.
func (tr *TableReader) Record() arrow.Record { return tr.RecordBatch() }

func (tr *TableReader) Next() bool {
	if tr.cur >= tr.max {
		return false
	}

	if tr.rec != nil {
		tr.rec.Release()
	}

	// determine the minimum contiguous slice across all columns
	chunksz := imin64(tr.max, tr.chksz)
	chunks := make([]arrow.Array, len(tr.chunks))
	for i := range chunks {
		j := tr.slots[i]
		chunk := tr.chunks[i].Chunk(j)
		remain := int64(chunk.Len()) - tr.offsets[i]
		if remain < chunksz {
			chunksz = remain
		}

		chunks[i] = chunk
	}

	// slice the chunks, advance each chunk slot as appropriate.
	batch := make([]arrow.Array, len(tr.chunks))
	for i, chunk := range chunks {
		var slice arrow.Array
		offset := tr.offsets[i]
		switch int64(chunk.Len()) - offset {
		case chunksz:
			// this chunk is fully consumed; move to the next one.
			tr.slots[i]++
			tr.offsets[i] = 0
			if offset > 0 {
				// need to slice
				slice = NewSlice(chunk, offset, offset+chunksz)
			} else {
				// no need to slice
				slice = chunk
				slice.Retain()
			}
		default:
			tr.offsets[i] += chunksz
			slice = NewSlice(chunk, offset, offset+chunksz)
		}
		batch[i] = slice
	}

	tr.cur += chunksz
	tr.rec = NewRecord(tr.tbl.Schema(), batch, chunksz)

	for _, arr := range batch {
		arr.Release()
	}

	return true
}

// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (tr *TableReader) Retain() {
	tr.refCount.Add(1)
}

// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (tr *TableReader) Release() {
	debug.Assert(tr.refCount.Load() > 0, "too many releases")

	if tr.refCount.Add(-1) == 0 {
		tr.tbl.Release()
		for _, chk := range tr.chunks {
			chk.Release()
		}
		if tr.rec != nil {
			tr.rec.Release()
		}
		tr.tbl = nil
		tr.chunks = nil
		tr.slots = nil
		tr.offsets = nil
	}
}

func (tr *TableReader) Err() error { return nil }

func imin64(a, b int64) int64 {
	if a < b {
		return a
	}
	return b
}

var (
	_ arrow.Table  = (*simpleTable)(nil)
	_ RecordReader = (*TableReader)(nil)
)