package array

import (
	"errors"
	"fmt"
	"math"
	"strings"
	"sync/atomic"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/internal/debug"
)
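
// NewColumnSlice returns a new zero-copy slice of the column with the
// indicated indices i and j, corresponding to rows [i:j) of the column's
// underlying chunked data. The returned column must be Release()'d after use.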
func NewColumnSlice(col *arrow.Column, i, j int64) *arrow.Column {
	slice := NewChunkedSlice(col.Data(), i, j)
	defer slice.Release()
	return arrow.NewColumn(col.Field(), slice)
}
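
// NewChunkedSlice constructs a zero-copy slice of the chunked array with the
// indicated indices i and j, corresponding to rows [i:j). It panics if the
// indices are out of range, and the returned chunked array must be
// Release()'d after use.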
func NewChunkedSlice(a *arrow.Chunked, i, j int64) *arrow.Chunked {
	if j > int64(a.Len()) || i > j || i > int64(a.Len()) {
		panic("arrow/array: index out of range")
	}

	var (
		cur    = 0
		beg    = i
		sz     = j - i
		chunks = make([]arrow.Array, 0, len(a.Chunks()))
	)

	// Skip whole chunks that lie entirely before the start of the slice.
	for cur < len(a.Chunks()) && beg >= int64(a.Chunks()[cur].Len()) {
		beg -= int64(a.Chunks()[cur].Len())
		cur++
	}

	// Slice the remaining chunks until the requested number of rows is covered.
	for cur < len(a.Chunks()) && sz > 0 {
		arr := a.Chunks()[cur]
		end := beg + sz
		if end > int64(arr.Len()) {
			end = int64(arr.Len())
		}
		chunks = append(chunks, NewSlice(arr, beg, end))
		sz -= int64(arr.Len()) - beg
		beg = 0
		cur++
	}
	chunks = chunks[:len(chunks):len(chunks)]

	// NewChunked retains the chunks, so drop this function's references.
	defer func() {
		for _, chunk := range chunks {
			chunk.Release()
		}
	}()

	return arrow.NewChunked(a.DataType(), chunks)
}
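
// simpleTable is a basic, non-lazy in-memory table backed by a slice of
// columns sharing a common schema.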
type simpleTable struct {
	refCount atomic.Int64
	rows     int64
	cols     []arrow.Column
	schema   *arrow.Schema
}
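
// NewTable returns a new basic, non-lazy in-memory table. If rows is
// negative, the number of rows is inferred from the length of the first
// column (or zero when there are no columns). NewTable panics if the columns
// do not conform to the schema; it retains the provided columns, and the
// returned table must be Release()'d after use.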
func NewTable(schema *arrow.Schema, cols []arrow.Column, rows int64) arrow.Table {
	tbl := simpleTable{
		rows:   rows,
		cols:   cols,
		schema: schema,
	}
	tbl.refCount.Add(1)

	// Infer the row count from the first column when it was not provided.
	if tbl.rows < 0 {
		switch len(tbl.cols) {
		case 0:
			tbl.rows = 0
		default:
			tbl.rows = int64(tbl.cols[0].Len())
		}
	}

	// validate panics on schema/column mismatch before any column is retained.
	tbl.validate()

	for i := range tbl.cols {
		tbl.cols[i].Retain()
	}

	return &tbl
}
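
// NewTableFromSlice constructs a table from a two-dimensional slice of
// arrays, where data[i] holds the chunks of column i. It panics if the
// number of column slices does not match the schema or if the resulting
// columns are inconsistent with it.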
func NewTableFromSlice(schema *arrow.Schema, data [][]arrow.Array) arrow.Table {
	if len(data) != schema.NumFields() {
		panic("array/table: mismatch in number of columns and data for creating a table")
	}

	cols := make([]arrow.Column, schema.NumFields())
	for i, arrs := range data {
		field := schema.Field(i)
		chunked := arrow.NewChunked(field.Type, arrs)
		cols[i] = *arrow.NewColumn(field, chunked)
		chunked.Release()
	}

	tbl := simpleTable{
		schema: schema,
		cols:   cols,
		rows:   int64(cols[0].Len()),
	}
	tbl.refCount.Add(1)

	defer func() {
		if r := recover(); r != nil {
			// Release the columns created so far before re-panicking.
			for _, c := range cols {
				c.Release()
			}
			panic(r)
		}
	}()

	tbl.validate()

	return &tbl
}
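
// NewTableFromRecords constructs a table from a slice of record batches,
// using record i's column j as chunk i of the table's column j. The records
// are expected to conform to the provided schema.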
func NewTableFromRecords(schema *arrow.Schema, recs []arrow.RecordBatch) arrow.Table {
	arrs := make([]arrow.Array, len(recs))
	cols := make([]arrow.Column, schema.NumFields())

	defer func(cols []arrow.Column) {
		for i := range cols {
			cols[i].Release()
		}
	}(cols)

	for i := range cols {
		field := schema.Field(i)
		for j, rec := range recs {
			arrs[j] = rec.Column(i)
		}
		chunk := arrow.NewChunked(field.Type, arrs)
		cols[i] = *arrow.NewColumn(field, chunk)
		chunk.Release()
	}

	return NewTable(schema, cols, -1)
}

func (tbl *simpleTable) Schema() *arrow.Schema { return tbl.schema }
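
// AddColumn returns a new table with the provided column inserted at index i.
// It returns an error if the column's length or data type does not match the
// table, or if i is out of range for the schema.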
func (tbl *simpleTable) AddColumn(i int, field arrow.Field, column arrow.Column) (arrow.Table, error) {
	if int64(column.Len()) != tbl.rows {
		return nil, fmt.Errorf("arrow/array: column length mismatch: %d != %d", column.Len(), tbl.rows)
	}
	if field.Type != column.DataType() {
		return nil, fmt.Errorf("arrow/array: column type mismatch: %v != %v", field.Type, column.DataType())
	}
	newSchema, err := tbl.schema.AddField(i, field)
	if err != nil {
		return nil, err
	}
	cols := make([]arrow.Column, len(tbl.cols)+1)
	copy(cols[:i], tbl.cols[:i])
	cols[i] = column
	copy(cols[i+1:], tbl.cols[i:])
	newTable := NewTable(newSchema, cols, tbl.rows)
	return newTable, nil
}

func (tbl *simpleTable) NumRows() int64 { return tbl.rows }
func (tbl *simpleTable) NumCols() int64 { return int64(len(tbl.cols)) }
func (tbl *simpleTable) Column(i int) *arrow.Column { return &tbl.cols[i] }
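
// validate panics if the number of columns does not match the schema, if a
// column's field is inconsistent with the schema's field, or if a column is
// shorter than the table's row count.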
func (tbl *simpleTable) validate() {
	if len(tbl.cols) != tbl.schema.NumFields() {
		panic(errors.New("arrow/array: table schema mismatch"))
	}
	for i, col := range tbl.cols {
		if !col.Field().Equal(tbl.schema.Field(i)) {
			panic(fmt.Errorf("arrow/array: column field %q is inconsistent with schema", col.Name()))
		}
		if int64(col.Len()) < tbl.rows {
			panic(fmt.Errorf("arrow/array: column %q expected length >= %d but got length %d", col.Name(), tbl.rows, col.Len()))
		}
	}
}
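
// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.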
func (tbl *simpleTable) Retain() {
	tbl.refCount.Add(1)
}
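
// Release decreases the reference count by 1; when it reaches zero the table
// releases its columns.
// Release may be called simultaneously from multiple goroutines.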
func (tbl *simpleTable) Release() {
	debug.Assert(tbl.refCount.Load() > 0, "too many releases")

	if tbl.refCount.Add(-1) == 0 {
		for i := range tbl.cols {
			tbl.cols[i].Release()
		}
		tbl.cols = nil
	}
}

func (tbl *simpleTable) String() string {
	o := new(strings.Builder)
	o.WriteString(tbl.Schema().String())
	o.WriteString("\n")
	for i := 0; i < int(tbl.NumCols()); i++ {
		col := tbl.Column(i)
		o.WriteString(col.Field().Name + ": [")
		for j, chunk := range col.Data().Chunks() {
			if j != 0 {
				o.WriteString(", ")
			}
			o.WriteString(chunk.String())
		}
		o.WriteString("]\n")
	}
	return o.String()
}
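
// TableReader is a RecordBatch iterator over a (possibly chunked) Table,
// yielding batches of at most the configured chunk size.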
type TableReader struct {
	refCount atomic.Int64

	tbl   arrow.Table
	cur   int64             // current row position
	max   int64             // total number of rows
	rec   arrow.RecordBatch // current record batch
	chksz int64             // chunk size

	chunks  []*arrow.Chunked
	slots   []int   // chunk indices, per column
	offsets []int64 // offsets into the current chunk, per column
}
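
// NewTableReader returns a reader that iterates over tbl in record batches of
// at most chunkSize rows; if chunkSize <= 0, batch sizes are bounded only by
// the table's underlying chunk layout. The reader must be Release()'d after
// use.
//
// A minimal usage sketch:
//
//	tr := NewTableReader(tbl, 64)
//	defer tr.Release()
//	for tr.Next() {
//		rec := tr.RecordBatch()
//		// rec is valid until the next call to Next or Release.
//	}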
func NewTableReader(tbl arrow.Table, chunkSize int64) *TableReader {
	ncols := tbl.NumCols()
	tr := &TableReader{
		tbl:     tbl,
		cur:     0,
		max:     int64(tbl.NumRows()),
		chksz:   chunkSize,
		chunks:  make([]*arrow.Chunked, ncols),
		slots:   make([]int, ncols),
		offsets: make([]int64, ncols),
	}
	tr.refCount.Add(1)
	tr.tbl.Retain()

	if tr.chksz <= 0 {
		tr.chksz = math.MaxInt64
	}

	for i := range tr.chunks {
		col := tr.tbl.Column(i)
		tr.chunks[i] = col.Data()
		tr.chunks[i].Retain()
	}
	return tr
}

func (tr *TableReader) Schema() *arrow.Schema          { return tr.tbl.Schema() }
func (tr *TableReader) RecordBatch() arrow.RecordBatch { return tr.rec }
func (tr *TableReader) Record() arrow.Record           { return tr.RecordBatch() }
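
// Next advances the reader to the next record batch and reports whether any
// rows remain. The batch it produces is owned by the reader and is released
// on the following call to Next or on Release.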
func (tr *TableReader) Next() bool {
	if tr.cur >= tr.max {
		return false
	}

	if tr.rec != nil {
		tr.rec.Release()
	}

	// Start from the configured chunk size and shrink the batch so that it
	// does not cross a chunk boundary in any column.
	chunksz := imin64(tr.max, tr.chksz)
	chunks := make([]arrow.Array, len(tr.chunks))
	for i := range chunks {
		j := tr.slots[i]
		chunk := tr.chunks[i].Chunk(j)
		remain := int64(chunk.Len()) - tr.offsets[i]
		if remain < chunksz {
			chunksz = remain
		}
		chunks[i] = chunk
	}

	// Slice each column's current chunk into the next record batch.
	batch := make([]arrow.Array, len(tr.chunks))
	for i, chunk := range chunks {
		var slice arrow.Array
		offset := tr.offsets[i]
		switch int64(chunk.Len()) - offset {
		case chunksz:
			// This chunk is fully consumed; move on to the next one.
			tr.slots[i]++
			tr.offsets[i] = 0
			if offset > 0 {
				slice = NewSlice(chunk, offset, offset+chunksz)
			} else {
				slice = chunk
				slice.Retain()
			}
		default:
			tr.offsets[i] += chunksz
			slice = NewSlice(chunk, offset, offset+chunksz)
		}
		batch[i] = slice
	}

	tr.cur += chunksz
	tr.rec = NewRecord(tr.tbl.Schema(), batch, chunksz)

	// NewRecord retains the column arrays, so drop this function's references.
	for _, arr := range batch {
		arr.Release()
	}

	return true
}

func (tr *TableReader) Retain() {
	tr.refCount.Add(1)
}

func (tr *TableReader) Release() {
	debug.Assert(tr.refCount.Load() > 0, "too many releases")

	if tr.refCount.Add(-1) == 0 {
		tr.tbl.Release()
		for _, chk := range tr.chunks {
			chk.Release()
		}
		if tr.rec != nil {
			tr.rec.Release()
		}
		tr.tbl = nil
		tr.chunks = nil
		tr.slots = nil
		tr.offsets = nil
	}
}

func (tr *TableReader) Err() error { return nil }
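
// imin64 returns the smaller of two int64 values.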
func imin64(a, b int64) int64 {
	if a < b {
		return a
	}
	return b
}
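
// Compile-time checks that the concrete types satisfy their interfaces.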
var (
	_ arrow.Table  = (*simpleTable)(nil)
	_ RecordReader = (*TableReader)(nil)
)