package array
import (
"bytes"
"fmt"
"math"
"reflect"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/encoded"
"github.com/apache/arrow-go/v18/arrow/internal/debug"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/internal/json"
"github.com/apache/arrow-go/v18/internal/utils"
)
// RunEndEncoded is an array whose data is stored as two child arrays:
// a run-ends array and a values array.
type RunEndEncoded struct {
	array

	// ends and values are built from child 0 and child 1 of the
	// underlying data, respectively (see setData).
	ends, values arrow.Array
}
// NewRunEndEncodedArray builds a RunEndEncoded array from a run-ends array
// and a values array, using the given logical length and offset.
//
// The intermediate Data created here is released before returning; the
// returned array is constructed through NewRunEndEncodedData.
func NewRunEndEncodedArray(runEnds, values arrow.Array, logicalLength, offset int) *RunEndEncoded {
	dt := arrow.RunEndEncodedOf(runEnds.DataType(), values.DataType())
	buffers := []*memory.Buffer{nil}
	children := []arrow.ArrayData{runEnds.Data(), values.Data()}

	data := NewData(dt, logicalLength, buffers, children, 0, offset)
	defer data.Release()

	return NewRunEndEncodedData(data)
}
// NewRunEndEncodedData wraps the given array data in a RunEndEncoded array
// with an initial reference count of one.
//
// data must be a *Data; any other implementation makes the type assertion
// panic. setData panics as well if the data fails its validation.
func NewRunEndEncodedData(data arrow.ArrayData) *RunEndEncoded {
	arr := new(RunEndEncoded)
	arr.refCount.Add(1)
	arr.setData(data.(*Data))
	return arr
}
// Values returns the values child array held by r. No additional reference
// is taken here.
func (r *RunEndEncoded) Values() arrow.Array {
	return r.values
}
// RunEndsArr returns the run-ends child array held by r. No additional
// reference is taken here.
func (r *RunEndEncoded) RunEndsArr() arrow.Array {
	return r.ends
}
// Retain takes a reference on the array itself and then on each of its
// child arrays (values first, then run ends).
func (r *RunEndEncoded) Retain() {
	r.array.Retain()
	for _, child := range [...]arrow.Array{r.values, r.ends} {
		child.Retain()
	}
}
// Release drops a reference on the array itself and then on each of its
// child arrays (values first, then run ends), mirroring Retain.
func (r *RunEndEncoded) Release() {
	r.array.Release()
	for _, child := range [...]arrow.Array{r.values, r.ends} {
		child.Release()
	}
}
// LogicalValuesArray returns a new array that is the slice of the values
// child running from the physical offset up to the physical offset plus the
// physical length of r.
//
// The intermediate slice data is released before returning; the result is
// produced by MakeFromData.
func (r *RunEndEncoded) LogicalValuesArray() arrow.Array {
	first := r.GetPhysicalOffset()
	last := first + r.GetPhysicalLength()

	sliced := NewSliceData(r.data.Children()[1], int64(first), int64(last))
	defer sliced.Release()

	return MakeFromData(sliced)
}
// LogicalRunEndsArray returns an array of run ends adjusted to the logical
// view of r.
//
// When the logical offset is zero, the result is simply a slice of the first
// physical-length entries of the run-ends child. Otherwise a new array is
// built with mem: each run end in the physical range has the logical offset
// subtracted and is then capped at the logical length.
func (r *RunEndEncoded) LogicalRunEndsArray(mem memory.Allocator) arrow.Array {
	physOffset := r.GetPhysicalOffset()
	physLength := r.GetPhysicalLength()

	if r.data.offset == 0 {
		sliced := NewSliceData(r.data.childData[0], 0, int64(physLength))
		defer sliced.Release()
		return MakeFromData(sliced)
	}

	bldr := NewBuilder(mem, r.data.childData[0].DataType())
	defer bldr.Release()
	bldr.Resize(physLength)

	physEnd := physOffset + physLength
	switch ends := r.ends.(type) {
	case *Int16:
		out := bldr.(*Int16Builder)
		for _, runEnd := range ends.Int16Values()[physOffset:physEnd] {
			shifted := runEnd - int16(r.data.offset)
			out.Append(int16(utils.Min(int(shifted), r.data.length)))
		}
	case *Int32:
		out := bldr.(*Int32Builder)
		for _, runEnd := range ends.Int32Values()[physOffset:physEnd] {
			shifted := runEnd - int32(r.data.offset)
			out.Append(int32(utils.Min(int(shifted), r.data.length)))
		}
	case *Int64:
		out := bldr.(*Int64Builder)
		for _, runEnd := range ends.Int64Values()[physOffset:physEnd] {
			shifted := runEnd - int64(r.data.offset)
			out.Append(int64(utils.Min(int(shifted), r.data.length)))
		}
	}

	return bldr.NewArray()
}
// setData validates data and installs it as the backing data of r, then
// creates the run-ends and values arrays from its two children.
//
// It panics with an error wrapping arrow.ErrInvalid when data does not have
// exactly two children, when the first child's type is not a valid run-ends
// type, or when the first child reports any nulls.
func (r *RunEndEncoded) setData(data *Data) {
	if n := len(data.childData); n != 2 {
		panic(fmt.Errorf("%w: arrow/array: RLE array must have exactly 2 children", arrow.ErrInvalid))
	}
	debug.Assert(data.dtype.ID() == arrow.RUN_END_ENCODED, "invalid type for RunLengthEncoded")

	reeType := data.dtype.(*arrow.RunEndEncodedType)
	if valid := reeType.ValidRunEndsType(data.childData[0].DataType()); !valid {
		panic(fmt.Errorf("%w: arrow/array: run ends array must be int16, int32, or int64", arrow.ErrInvalid))
	}
	if nulls := data.childData[0].NullN(); nulls > 0 {
		panic(fmt.Errorf("%w: arrow/array: run ends array cannot contain nulls", arrow.ErrInvalid))
	}

	r.array.setData(data)

	children := r.data.childData
	r.ends = MakeFromData(children[0])
	r.values = MakeFromData(children[1])
}
// GetPhysicalOffset returns the value computed by encoded.FindPhysicalOffset
// for the data backing r.
func (r *RunEndEncoded) GetPhysicalOffset() int {
	physOffset := encoded.FindPhysicalOffset(r.data)
	return physOffset
}
// GetPhysicalLength returns the value computed by encoded.GetPhysicalLength
// for the data backing r.
func (r *RunEndEncoded) GetPhysicalLength() int {
	physLength := encoded.GetPhysicalLength(r.data)
	return physLength
}
// GetPhysicalIndex maps logical index i to a physical index by adding the
// array's logical offset to i and passing the sum to
// encoded.FindPhysicalIndex.
func (r *RunEndEncoded) GetPhysicalIndex(i int) int {
	absolute := i + r.data.offset
	return encoded.FindPhysicalIndex(r.data, absolute)
}
// ValueStr returns the string form of the value at logical index i, obtained
// from the values child at the corresponding physical index.
func (r *RunEndEncoded) ValueStr(i int) string {
	phys := r.GetPhysicalIndex(i)
	return r.values.ValueStr(phys)
}
// String renders the array as a bracketed, comma-separated list with one
// "{runEnd -> value}" entry per element of the run-ends child.
//
// Values that come back as json.RawMessage are converted to strings before
// formatting.
func (r *RunEndEncoded) String() string {
	var out bytes.Buffer
	out.WriteByte('[')

	for idx, n := 0, r.ends.Len(); idx < n; idx++ {
		if idx > 0 {
			out.WriteByte(',')
		}

		val := r.values.GetOneForMarshal(idx)
		if raw, isRaw := val.(json.RawMessage); isRaw {
			val = string(raw)
		}
		fmt.Fprintf(&out, "{%d -> %v}", r.ends.GetOneForMarshal(idx), val)
	}

	out.WriteByte(']')
	return out.String()
}
// GetOneForMarshal returns the marshal-ready value at logical index i, taken
// from the values child at the corresponding physical index.
func (r *RunEndEncoded) GetOneForMarshal(i int) interface{} {
	phys := r.GetPhysicalIndex(i)
	return r.values.GetOneForMarshal(phys)
}
// MarshalJSON encodes the array as a JSON array with one element per logical
// index, i.e. the runs are expanded in the output.
//
// The first encoding error encountered is returned with a nil byte slice.
func (r *RunEndEncoded) MarshalJSON() ([]byte, error) {
	var out bytes.Buffer
	enc := json.NewEncoder(&out)

	out.WriteByte('[')
	for idx, n := 0, r.Len(); idx < n; idx++ {
		if idx > 0 {
			out.WriteByte(',')
		}
		if err := enc.Encode(r.GetOneForMarshal(idx)); err != nil {
			return nil, err
		}
	}
	out.WriteByte(']')

	return out.Bytes(), nil
}
// arrayRunEndEncodedEqual reports whether l and r are equal by walking their
// merged runs and comparing, for each step, the single value each side
// refers to with SliceEqual.
func arrayRunEndEncodedEqual(l, r *RunEndEncoded) bool {
	runs := encoded.NewMergedRuns([2]arrow.Array{l, r})
	for runs.Next() {
		li := runs.IndexIntoArray(0)
		ri := runs.IndexIntoArray(1)
		if SliceEqual(l.values, li, li+1, r.values, ri, ri+1) {
			continue
		}
		return false
	}
	return true
}
// arrayRunEndEncodedApproxEqual reports whether l and r are approximately
// equal under opt by walking their merged runs and comparing, for each step,
// the single value each side refers to with sliceApproxEqual.
func arrayRunEndEncodedApproxEqual(l, r *RunEndEncoded, opt equalOption) bool {
	runs := encoded.NewMergedRuns([2]arrow.Array{l, r})
	for runs.Next() {
		li := runs.IndexIntoArray(0)
		ri := runs.IndexIntoArray(1)
		if sliceApproxEqual(l.values, li, li+1, r.values, ri, ri+1, opt) {
			continue
		}
		return false
	}
	return true
}
// RunEndEncodedBuilder builds RunEndEncoded arrays. It keeps one child
// builder for the run ends and one for the values.
type RunEndEncodedBuilder struct {
	builder

	// dt is the run-end encoded data type produced by this builder.
	dt arrow.DataType
	// runEnds and values are the child builders for the two children.
	runEnds, values Builder
	// maxRunEnd is the upper bound enforced on the total logical length; it
	// is derived from the run-ends type in NewRunEndEncodedBuilder.
	maxRunEnd uint64

	// lastUnmarshalled is the value most recently started as a run by
	// UnmarshalOne; it is cleared whenever a run is finished.
	lastUnmarshalled interface{}
	// unmarshalled is set once UnmarshalOne has started the current run.
	unmarshalled bool
	// lastStr is the string most recently started as a run by
	// AppendValueFromString; it is cleared whenever a run is finished.
	lastStr *string
}
// NewRunEndEncodedBuilder creates a builder for run-end encoded arrays whose
// run ends have type runEnds and whose values have type encoded. The builder
// starts with a reference count of one.
//
// It panics if runEnds is not accepted by ValidRunEndsType. The maximum
// total length is set to the largest value of the run-ends integer type
// (int16, int32 or int64).
func NewRunEndEncodedBuilder(mem memory.Allocator, runEnds, encoded arrow.DataType) *RunEndEncodedBuilder {
	dt := arrow.RunEndEncodedOf(runEnds, encoded)
	if valid := dt.ValidRunEndsType(runEnds); !valid {
		panic("arrow/ree: invalid runEnds type for run length encoded array")
	}

	var limit uint64
	switch id := runEnds.ID(); id {
	case arrow.INT16:
		limit = math.MaxInt16
	case arrow.INT32:
		limit = math.MaxInt32
	case arrow.INT64:
		limit = math.MaxInt64
	}

	reb := &RunEndEncodedBuilder{
		builder:          builder{mem: mem},
		dt:               dt,
		runEnds:          NewBuilder(mem, runEnds),
		values:           NewBuilder(mem, encoded),
		maxRunEnd:        limit,
		lastUnmarshalled: nil,
	}
	reb.refCount.Add(1)
	return reb
}
// Type returns the run-end encoded data type this builder was created with.
func (b *RunEndEncodedBuilder) Type() arrow.DataType {
	dt := b.dt
	return dt
}
// Release decrements the builder's reference count. When the count reaches
// zero, the values and run-ends child builders are released as well.
func (b *RunEndEncodedBuilder) Release() {
	debug.Assert(b.refCount.Load() > 0, "too many releases")

	if remaining := b.refCount.Add(-1); remaining != 0 {
		return
	}
	b.values.Release()
	b.runEnds.Release()
}
// addLength grows the builder's logical length by n.
//
// It panics with an error wrapping arrow.ErrInvalid if the resulting length
// would exceed b.maxRunEnd.
func (b *RunEndEncodedBuilder) addLength(n uint64) {
	// Compare n against the remaining headroom instead of computing
	// uint64(b.length)+n: that sum can wrap around for very large n, which
	// would let an oversized run slip past the check and corrupt b.length.
	cur := uint64(b.length)
	if cur > b.maxRunEnd || n > b.maxRunEnd-cur {
		panic(fmt.Errorf("%w: %s array length must fit be less than %d", arrow.ErrInvalid, b.dt, b.maxRunEnd))
	}

	b.length += int(n)
}
// finishRun closes the current run: it clears the state used to detect
// repeated values and, unless the builder is still empty, appends the
// current logical length to the run-ends builder.
func (b *RunEndEncodedBuilder) finishRun() {
	b.lastUnmarshalled, b.lastStr, b.unmarshalled = nil, nil, false

	total := b.length
	if total == 0 {
		// Nothing has been appended yet, so there is no run to close.
		return
	}

	switch ends := b.runEnds.(type) {
	case *Int16Builder:
		ends.Append(int16(total))
	case *Int32Builder:
		ends.Append(int32(total))
	case *Int64Builder:
		ends.Append(int64(total))
	}
}
// ValueBuilder returns the child builder used for the values array.
func (b *RunEndEncodedBuilder) ValueBuilder() Builder {
	return b.values
}
// Append finishes the current run and starts a new one of length n. The
// value for the new run is not added here; callers add it through
// ValueBuilder.
//
// It panics (via addLength) if the total length would exceed the limit of
// the run-ends type.
func (b *RunEndEncodedBuilder) Append(n uint64) {
	// Close the previous run before accounting for the new one.
	b.finishRun()
	b.addLength(n)
}
// AppendRuns starts one new run per entry of runs, in order, each with the
// given length. It is equivalent to calling Append for every element.
func (b *RunEndEncodedBuilder) AppendRuns(runs []uint64) {
	for idx := range runs {
		b.Append(runs[idx])
	}
}
// ContinueRun extends the current run by n without finishing it.
//
// It panics (via addLength) if the total length would exceed the limit of
// the run-ends type.
func (b *RunEndEncodedBuilder) ContinueRun(n uint64) {
	extra := n
	b.addLength(extra)
}
// AppendNull finishes the current run and starts a new run of length one
// whose value is null.
func (b *RunEndEncodedBuilder) AppendNull() {
	const runLength = 1

	b.finishRun()
	b.values.AppendNull()
	b.addLength(runLength)
}
// AppendNulls calls AppendNull n times, so each null becomes its own run of
// length one. A non-positive n appends nothing.
func (b *RunEndEncodedBuilder) AppendNulls(n int) {
	for remaining := n; remaining > 0; remaining-- {
		b.AppendNull()
	}
}
// NullN always returns UnknownNullCount; this builder does not report a
// null count.
func (b *RunEndEncodedBuilder) NullN() int {
	const nulls = UnknownNullCount
	return nulls
}
// AppendEmptyValue appends a null; for this builder an empty value is
// represented the same way as AppendNull.
func (b *RunEndEncodedBuilder) AppendEmptyValue() {
	// Delegates directly: there is no distinct "empty" representation here.
	b.AppendNull()
}
// AppendEmptyValues appends n nulls by delegating to AppendNulls.
func (b *RunEndEncodedBuilder) AppendEmptyValues(n int) {
	count := n
	b.AppendNulls(count)
}
// Reserve forwards the request to reserve room for n elements to both child
// builders (values first, then run ends).
func (b *RunEndEncodedBuilder) Reserve(n int) {
	for _, child := range [...]Builder{b.values, b.runEnds} {
		child.Reserve(n)
	}
}
// Resize forwards the resize request for n elements to both child builders
// (values first, then run ends).
func (b *RunEndEncodedBuilder) Resize(n int) {
	for _, child := range [...]Builder{b.values, b.runEnds} {
		child.Resize(n)
	}
}
// NewRunEndEncodedArray finishes the builder's pending state and returns the
// accumulated contents as a RunEndEncoded array. The builder is reset as
// part of producing the data (see newData).
func (b *RunEndEncodedBuilder) NewRunEndEncodedArray() *RunEndEncoded {
	built := b.newData()
	// The array created below is built from this data, so the local
	// reference is dropped on return.
	defer built.Release()

	return NewRunEndEncodedData(built)
}
// NewArray returns the built contents as a generic arrow.Array. It is the
// same as NewRunEndEncodedArray with the result typed as the interface.
func (b *RunEndEncodedBuilder) NewArray() arrow.Array {
	arr := b.NewRunEndEncodedArray()
	return arr
}
// newData closes the current run, materializes both child builders into
// arrays, and assembles them into a Data with the builder's type and logical
// length. The builder is reset before returning.
//
// The temporary child arrays are released on return; the returned Data is
// built from their underlying data.
func (b *RunEndEncodedBuilder) newData() (data *Data) {
	b.finishRun()

	valuesArr := b.values.NewArray()
	defer valuesArr.Release()
	runEndsArr := b.runEnds.NewArray()
	defer runEndsArr.Release()

	children := []arrow.ArrayData{runEndsArr.Data(), valuesArr.Data()}
	data = NewData(b.dt, b.length, []*memory.Buffer{}, children, 0, 0)

	b.reset()
	return data
}
// AppendValueFromString appends one logical value parsed from s.
//
// NullValueStr appends a null. A string equal to the one that started the
// current run extends that run by one; any other string starts a new run of
// length one and is forwarded to the value builder for parsing.
//
// It returns an error wrapping arrow.ErrNotImplemented if the current run
// was started by UnmarshalOne, and otherwise whatever error the value
// builder reports.
func (b *RunEndEncodedBuilder) AppendValueFromString(s string) error {
	switch {
	case b.unmarshalled:
		return fmt.Errorf("%w: mixing AppendValueFromString & UnmarshalOne not yet implemented", arrow.ErrNotImplemented)
	case s == NullValueStr:
		b.AppendNull()
		return nil
	case b.lastStr != nil && s == *b.lastStr:
		b.ContinueRun(1)
		return nil
	}

	b.Append(1)
	// Remember a private copy so the next call can detect a repeat.
	remembered := s
	b.lastStr = &remembered
	return b.ValueBuilder().AppendValueFromString(s)
}
// UnmarshalOne decodes a single JSON value from dec and appends it as one
// logical value.
//
// If the decoded value deep-equals the value that started the current run,
// the run is extended by one. For a decoded null this only applies when the
// run-ends and values builders differ in length. Otherwise a new run of
// length one is started and the value is re-encoded and handed to the value
// builder.
//
// It returns an error wrapping arrow.ErrNotImplemented if the current run
// was started by AppendValueFromString, and otherwise any decode, marshal or
// value-builder error.
func (b *RunEndEncodedBuilder) UnmarshalOne(dec *json.Decoder) error {
	if b.lastStr != nil {
		return fmt.Errorf("%w: mixing AppendValueFromString & UnmarshalOne not yet implemented", arrow.ErrNotImplemented)
	}

	var decoded interface{}
	if err := dec.Decode(&decoded); err != nil {
		return err
	}

	repeats := reflect.DeepEqual(decoded, b.lastUnmarshalled)
	if repeats && (decoded != nil || b.runEnds.Len() != b.values.Len()) {
		b.ContinueRun(1)
		return nil
	}

	raw, err := json.Marshal(decoded)
	if err != nil {
		return err
	}

	b.Append(1)
	b.lastUnmarshalled = decoded
	b.unmarshalled = true

	valueDec := json.NewDecoder(bytes.NewReader(raw))
	return b.ValueBuilder().UnmarshalOne(valueDec)
}
// Unmarshal finishes the current run and then appends values from dec one at
// a time for as long as dec.More reports further elements. It stops at and
// returns the first error from UnmarshalOne.
func (b *RunEndEncodedBuilder) Unmarshal(dec *json.Decoder) error {
	b.finishRun()

	for {
		if !dec.More() {
			return nil
		}
		if err := b.UnmarshalOne(dec); err != nil {
			return err
		}
	}
}
// UnmarshalJSON appends the elements of the JSON array in data to the
// builder.
//
// It returns an error if the first token cannot be read, if data does not
// start with '[', or if unmarshalling any element fails.
func (b *RunEndEncodedBuilder) UnmarshalJSON(data []byte) error {
	dec := json.NewDecoder(bytes.NewReader(data))
	t, err := dec.Token()
	if err != nil {
		return err
	}

	if delim, ok := t.(json.Delim); !ok || delim != '[' {
		// Report the token that was actually read. When t is not a
		// json.Delim, delim is only the zero value and would render as a
		// NUL character in the message.
		return fmt.Errorf("list builder must unpack from json array, found %v", t)
	}

	return b.Unmarshal(dec)
}
// Compile-time checks that the types in this file satisfy the interfaces
// they are meant to implement.
var _ arrow.Array = (*RunEndEncoded)(nil)
var _ Builder = (*RunEndEncodedBuilder)(nil)
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.