package pqarrow

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sort"
	"sync"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/parquet-go/parquet-go"

	"github.com/polarsignals/frostdb/dynparquet"
	"github.com/polarsignals/frostdb/pqarrow/builder"
	"github.com/polarsignals/frostdb/pqarrow/convert"
	"github.com/polarsignals/frostdb/pqarrow/writer"
	"github.com/polarsignals/frostdb/query/logicalplan"
)
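
// ParquetRowGroupToArrowSchema converts a parquet row group to an arrow schema.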
func ParquetRowGroupToArrowSchema(ctx context.Context, rg parquet.RowGroup, s *dynparquet.Schema, options logicalplan.IterOptions) (*arrow.Schema, error) {
	return ParquetSchemaToArrowSchema(ctx, rg.Schema(), s, options)
}
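
// ParquetSchemaToArrowSchema converts a parquet schema to an arrow schema,
// pruning columns according to the physical projection and appending computed
// distinct-expression columns as nullable fields.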
func ParquetSchemaToArrowSchema(ctx context.Context, schema *parquet.Schema, s *dynparquet.Schema, options logicalplan.IterOptions) (*arrow.Schema, error) {
	parquetFields := schema.Fields()

	if len(options.DistinctColumns) == 1 && options.Filter == nil {
		fields := make([]arrow.Field, 0, 1)
		for _, field := range parquetFields {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			default:
				af, err := parquetFieldToArrowField("", field, options.DistinctColumns)
				if err != nil {
					return nil, err
				}
				if af.Name != "" {
					fields = append(fields, af)
				}
			}
		}
		return arrow.NewSchema(fields, nil), nil
	}

	fields := make([]arrow.Field, 0, len(parquetFields))
	for _, f := range parquetFields {
		f, err := parquetFieldToArrowField("", f, options.PhysicalProjection)
		if err != nil {
			return nil, err
		}
		if f.Name != "" {
			fields = append(fields, f)
		}
	}

	if len(options.DistinctColumns) > 0 {
		for _, distinctExpr := range options.DistinctColumns {
			if distinctExpr.Computed() {
				dataType, err := distinctExpr.DataType(&exprTypeFinder{s: s})
				if err != nil {
					return nil, err
				}
				fields = append(fields, arrow.Field{
					Name:     distinctExpr.Name(),
					Type:     dataType,
					Nullable: true,
				})
			}
		}
		sort.Slice(fields, func(i, j int) bool {
			return fields[i].Name < fields[j].Name
		})
	}

	return arrow.NewSchema(fields, nil), nil
}

type exprTypeFinder struct {
	s *dynparquet.Schema
}

func (e *exprTypeFinder) DataTypeForExpr(expr logicalplan.Expr) (arrow.DataType, error) {
	return logicalplan.DataTypeForExprWithSchema(expr, e.s)
}
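
// parquetFieldToArrowField converts a parquet field into an arrow field if it
// is included in the given projections, returning the zero arrow.Field
// otherwise. Group fields are converted recursively into struct fields.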
func parquetFieldToArrowField(prefix string, field parquet.Field, physicalProjections []logicalplan.Expr) (arrow.Field, error) {
	if includedProjection(physicalProjections, fullPath(prefix, field)) {
		af, err := convert.ParquetFieldToArrowField(field)
		if err != nil {
			return arrow.Field{}, err
		}
		return af, nil
	}

	if !field.Leaf() && includedPathProjection(physicalProjections, fullPath(prefix, field)) {
		group := []arrow.Field{}
		for _, f := range field.Fields() {
			af, err := parquetFieldToArrowField(fullPath(prefix, field), f, physicalProjections)
			if err != nil {
				return arrow.Field{}, err
			}
			if af.Name != "" {
				group = append(group, af)
			}
		}
		if len(group) > 0 {
			return arrow.Field{
				Name:     field.Name(),
				Type:     arrow.StructOf(group...),
				Nullable: field.Optional(),
			}, nil
		}
	}

	return arrow.Field{}, nil
}

func fullPath(prefix string, parquetField parquet.Field) string {
	if prefix == "" {
		return parquetField.Name()
	}
	return prefix + "." + parquetField.Name()
}

func includedPathProjection(projections []logicalplan.Expr, name string) bool {
	if len(projections) == 0 {
		return true
	}
	for _, p := range projections {
		if p.MatchPath(name) {
			return true
		}
	}
	return false
}

func includedProjection(projections []logicalplan.Expr, name string) bool {
	if len(projections) == 0 {
		return true
	}
	for _, p := range projections {
		if p.MatchColumn(name) {
			return true
		}
	}
	return false
}

type parquetConverterMode int

const (
	normal parquetConverterMode = iota
	singleDistinctColumn
	multiDistinctColumn
)

var _ = singleDistinctColumn

type distinctColInfo struct {
	parquetIndex int
	v            *parquet.Value
	w            writer.ValueWriter
	b            builder.ColumnBuilder
}
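
// ParquetConverter converts parquet row groups into a single arrow record,
// accumulating rows in an internal RecordBuilder across calls to Convert.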
type ParquetConverter struct {
	mode parquetConverterMode
	pool memory.Allocator

	distinctColInfos []*distinctColInfo
	outputSchema     *arrow.Schema
	iterOpts         logicalplan.IterOptions

	builder *builder.RecordBuilder
	writers []MultiColumnWriter

	prevSchema    *parquet.Schema
	scratchValues []parquet.Value
}
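
// NewParquetConverter returns a ParquetConverter. When no filter is given and
// every distinct expression refers to exactly one static column, the converter
// enables the multiDistinctColumn fast path.
//
// A minimal usage sketch (assuming rg is a parquet.RowGroup and s is the
// corresponding *dynparquet.Schema):
//
//	c := NewParquetConverter(memory.NewGoAllocator(), logicalplan.IterOptions{})
//	defer c.Close()
//	if err := c.Convert(ctx, rg, s); err != nil {
//		return err
//	}
//	r := c.NewRecord()
//	defer r.Release()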
func NewParquetConverter(
	pool memory.Allocator,
	iterOpts logicalplan.IterOptions,
) *ParquetConverter {
	c := &ParquetConverter{
		mode:             normal,
		pool:             pool,
		iterOpts:         iterOpts,
		distinctColInfos: make([]*distinctColInfo, len(iterOpts.DistinctColumns)),
	}

	if iterOpts.Filter == nil && len(iterOpts.DistinctColumns) != 0 {
		simpleDistinctExprs := true
		for _, distinctColumn := range iterOpts.DistinctColumns {
			if _, ok := distinctColumn.(*logicalplan.DynamicColumn); ok ||
				len(distinctColumn.ColumnsUsedExprs()) != 1 {
				simpleDistinctExprs = false
				break
			}
		}
		if simpleDistinctExprs {
			c.mode = multiDistinctColumn
		}
	}

	return c
}
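
// Convert converts the given row group into arrow format, appending the
// resulting rows to the converter's internal builder. Call NewRecord to
// materialize the accumulated rows.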
func (c *ParquetConverter) Convert(ctx context.Context, rg parquet.RowGroup, s *dynparquet.Schema) error {
	schema, err := ParquetRowGroupToArrowSchema(ctx, rg, s, c.iterOpts)
	if err != nil {
		return err
	}
	if schema.NumFields() == 0 {
		return nil
	}

	if c.outputSchema == nil {
		c.outputSchema = schema
		c.builder = builder.NewRecordBuilder(c.pool, c.outputSchema)
	} else if !schema.Equal(c.outputSchema) {
		c.outputSchema = mergeArrowSchemas([]*arrow.Schema{c.outputSchema, schema})
		c.builder.ExpandSchema(c.outputSchema)
		// After expanding the schema, backfill any shorter fields with nulls
		// so all builders are the same length again.
		if maxLen, _, anomaly := recordBuilderLength(c.builder); anomaly {
			for _, field := range c.builder.Fields() {
				if fieldLen := field.Len(); fieldLen < maxLen {
					if ob, ok := field.(builder.OptimizedBuilder); ok {
						ob.AppendNulls(maxLen - fieldLen)
						continue
					}
					for i := 0; i < maxLen-fieldLen; i++ {
						field.AppendNull()
					}
				}
			}
		}
	}

	if _, ok := rg.(*dynparquet.MergedRowGroup); ok {
		return rowBasedParquetRowGroupToArrowRecord(ctx, rg, c.outputSchema, c.builder)
	}

	parquetSchema := rg.Schema()
	parquetColumns := rg.ColumnChunks()
	parquetFields := parquetSchema.Fields()

	if !parquetSchemaEqual(c.prevSchema, parquetSchema) {
		if err := c.schemaChanged(parquetFields); err != nil {
			return err
		}
		c.prevSchema = parquetSchema
	}

	if c.mode == multiDistinctColumn {
		appliedOptimization, err := c.writeDistinctAllColumns(
			ctx,
			parquetFields,
			parquetColumns,
		)
		if err != nil {
			return err
		}
		if appliedOptimization {
			return nil
		}
	}

	for _, w := range c.writers {
		for _, col := range w.colIdx {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
				if err := c.writeColumnToArray(
					parquetFields[w.fieldIdx],
					parquetColumns[col],
					false,
					w.writer,
				); err != nil {
					return fmt.Errorf("convert parquet column to arrow array: %w", err)
				}
			}
		}
	}

	maxLen, _, anomaly := recordBuilderLength(c.builder)
	if !anomaly {
		return nil
	}

	// Fields that had no corresponding parquet column are padded with nulls
	// to keep all builders at the same length.
	for _, field := range c.builder.Fields() {
		if fieldLen := field.Len(); fieldLen < maxLen {
			if ob, ok := field.(builder.OptimizedBuilder); ok {
				ob.AppendNulls(maxLen - fieldLen)
				continue
			}
			for i := 0; i < maxLen-fieldLen; i++ {
				field.AppendNull()
			}
		}
	}
	return nil
}

func (c *ParquetConverter) Fields() []builder.ColumnBuilder {
	if c.builder == nil {
		return nil
	}
	return c.builder.Fields()
}

func (c *ParquetConverter) NumRows() int {
	return c.builder.Field(0).Len()
}

func (c *ParquetConverter) NewRecord() arrow.Record {
	if c.builder != nil {
		return c.builder.NewRecord()
	}
	return nil
}

func (c *ParquetConverter) Reset() {
	if c.builder != nil {
		c.builder.Reset()
	}
}

func (c *ParquetConverter) Close() {
	if c.builder != nil {
		c.builder.Release()
	}
}

func numLeaves(f parquet.Field) int {
	if f.Leaf() {
		return 1
	}
	leaves := 0
	for _, field := range f.Fields() {
		if field.Leaf() {
			leaves++
		} else {
			leaves += numLeaves(field)
		}
	}
	return leaves
}
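
// schemaChanged rebuilds the converter's column writers. It must be called
// whenever a row group's physical schema differs from the previous one.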
func (c *ParquetConverter) schemaChanged(parquetFields []parquet.Field) error {
	c.writers = c.writers[:0]
	parquetIndexToWriterMap := make(map[int]writer.ValueWriter)
	colOffset := 0
	for i, field := range parquetFields {
		indices := c.outputSchema.FieldIndices(field.Name())
		if len(indices) == 0 {
			colOffset += numLeaves(field)
			continue
		}

		newWriter, err := convert.GetWriter(i, field)
		if err != nil {
			return err
		}
		writer := newWriter(c.builder.Field(indices[0]), 0)

		cols := make([]int, numLeaves(field))
		for i := range cols {
			cols[i] = colOffset
			colOffset++
		}

		c.writers = append(c.writers, MultiColumnWriter{
			writer:   writer,
			fieldIdx: i,
			colIdx:   cols,
		})
		parquetIndexToWriterMap[i] = writer
	}

	if c.mode != multiDistinctColumn {
		return nil
	}

	for i := range c.iterOpts.DistinctColumns {
		c.distinctColInfos[i] = nil
	}
	for i, expr := range c.iterOpts.DistinctColumns {
		for j, field := range parquetFields {
			if !expr.ColumnsUsedExprs()[0].MatchColumn(field.Name()) {
				continue
			}
			c.distinctColInfos[i] = &distinctColInfo{
				parquetIndex: j,
				w:            parquetIndexToWriterMap[j],
				b:            c.builder.Field(c.outputSchema.FieldIndices(expr.Name())[0]),
			}
		}
	}

	return nil
}
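
// writeDistinctAllColumns attempts to write only the distinct values of each
// distinct column. It returns true if this optimization could be applied;
// otherwise it rolls back any partially written values so the caller can fall
// back to the normal read path.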
func (c *ParquetConverter) writeDistinctAllColumns(
	ctx context.Context,
	parquetFields []parquet.Field,
	parquetColumns []parquet.ColumnChunk,
) (bool, error) {
	initialLength := c.NumRows()
	for i, info := range c.distinctColInfos {
		if info == nil {
			continue
		}
		select {
		case <-ctx.Done():
			return false, ctx.Err()
		default:
			optimizationApplied, err := c.writeDistinctSingleColumn(
				parquetFields[info.parquetIndex],
				parquetColumns[info.parquetIndex],
				c.iterOpts.DistinctColumns[i],
				info,
			)
			if err != nil {
				return false, err
			}
			if !optimizationApplied {
				// Roll back any values written for other columns.
				for _, field := range c.builder.Fields() {
					if field.Len() == initialLength {
						continue
					}
					resetBuilderToLength(field, initialLength)
				}
				return false, nil
			}
		}
	}

	atLeastOneRow := false
	for _, field := range c.builder.Fields() {
		if field.Len() > initialLength {
			atLeastOneRow = true
			break
		}
	}
	if !atLeastOneRow {
		return true, nil
	}

	// The optimization only holds if at most one column wrote more than one
	// distinct value; otherwise roll back and fall back to the normal path.
	countRowsLargerThanOne := 0
	maxLen := 0
	for _, field := range c.builder.Fields() {
		fieldLen := field.Len()
		if fieldLen > initialLength+1 {
			countRowsLargerThanOne++
			if countRowsLargerThanOne > 1 {
				break
			}
		}
		if fieldLen > maxLen {
			maxLen = fieldLen
		}
	}
	if countRowsLargerThanOne > 1 {
		for _, field := range c.builder.Fields() {
			if field.Len() == initialLength {
				continue
			}
			resetBuilderToLength(field, initialLength)
		}
		return false, nil
	}

	// Pad shorter columns: columns that wrote nothing get nulls, columns that
	// wrote fewer values have their last value repeated up to maxLen.
	for _, field := range c.builder.Fields() {
		if fieldLen := field.Len(); fieldLen == initialLength {
			if ob, ok := field.(builder.OptimizedBuilder); ok {
				ob.AppendNulls(maxLen - initialLength)
				continue
			}
			for j := initialLength; j < maxLen; j++ {
				field.AppendNull()
			}
		} else if fieldLen < maxLen {
			repeatTimes := maxLen - fieldLen
			if ob, ok := field.(builder.OptimizedBuilder); ok {
				if err := ob.RepeatLastValue(repeatTimes); err != nil {
					return false, err
				}
				continue
			}
			arr := field.NewArray()
			copyArrToBuilder(field, arr, fieldLen)
			repeatLastValue(field, arr, repeatTimes)
			arr.Release()
		}
	}

	return true, nil
}
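
// writeDistinctSingleColumn writes the distinct values of a single column
// chunk, dispatching on the type of the distinct expression. It returns false
// if the expression is not supported by the fast path.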
func (c *ParquetConverter) writeDistinctSingleColumn(
	node parquet.Node,
	columnChunk parquet.ColumnChunk,
	distinctExpr logicalplan.Expr,
	info *distinctColInfo,
) (bool, error) {
	switch expr := distinctExpr.(type) {
	case *logicalplan.BinaryExpr:
		return binaryDistinctExpr(
			node.Type(),
			columnChunk,
			expr,
			info,
		)
	case *logicalplan.Column:
		if err := c.writeColumnToArray(
			node,
			columnChunk,
			true,
			info.w,
		); err != nil {
			return false, err
		}
		return true, nil
	default:
		return false, nil
	}
}

var rowBufPool = &sync.Pool{
	New: func() interface{} {
		return make([]parquet.Row, 64)
	},
}
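
// rowBasedParquetRowGroupToArrowRecord converts a parquet row group to arrow
// by reading it row by row. It is used for merged row groups, whose columns
// cannot be read as contiguous column chunks.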
func rowBasedParquetRowGroupToArrowRecord(
	ctx context.Context,
	rg parquet.RowGroup,
	schema *arrow.Schema,
	builder *builder.RecordBuilder,
) error {
	parquetFields := rg.Schema().Fields()
	if schema.NumFields() != len(parquetFields) {
		return fmt.Errorf("inconsistent schema between arrow and parquet")
	}

	writers := make([]writer.ValueWriter, len(parquetFields))
	for i, field := range builder.Fields() {
		newValueWriter, err := convert.GetWriter(i, parquetFields[i])
		if err != nil {
			return err
		}
		writers[i] = newValueWriter(field, 0)
	}

	rows := rg.Rows()
	defer rows.Close()

	rowBuf := rowBufPool.Get().([]parquet.Row)
	defer rowBufPool.Put(rowBuf[:cap(rowBuf)])

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		rowBuf = rowBuf[:cap(rowBuf)]
		n, err := rows.ReadRows(rowBuf)
		if err == io.EOF && n == 0 {
			break
		}
		if err != nil && err != io.EOF {
			return fmt.Errorf("read row: %w", err)
		}
		rowBuf = rowBuf[:n]

		for i, writer := range writers {
			for _, row := range rowBuf {
				values := dynparquet.ValuesForIndex(row, i)
				writer.Write(values)
			}
		}
		if err == io.EOF {
			break
		}
	}

	return nil
}
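
// writeColumnToArray writes the values of a single parquet column chunk to the
// given value writer. If dictionaryOnly is true, the writer receives only
// distinct values: either the single value proven by the column index, or each
// page's dictionary values.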
func (c *ParquetConverter) writeColumnToArray(
	n parquet.Node,
	columnChunk parquet.ColumnChunk,
	dictionaryOnly bool,
	w writer.ValueWriter,
) error {
	repeated := n.Repeated()
	if !repeated && dictionaryOnly {
		// Check the column index: if no page contains nulls, every page's min
		// equals its max, and all pages share the first page's min value, the
		// chunk holds a single distinct value that can be written directly.
		columnIndex, err := columnChunk.ColumnIndex()
		if err != nil {
			return err
		}
		columnType := columnChunk.Type()
		globalMinValue := columnIndex.MinValue(0)
		readPages := false
		for pageIdx := 0; pageIdx < columnIndex.NumPages(); pageIdx++ {
			if columnIndex.NullCount(pageIdx) > 0 {
				readPages = true
				break
			}

			minValue := globalMinValue
			if pageIdx != 0 {
				minValue = columnIndex.MinValue(pageIdx)
			}
			maxValue := columnIndex.MaxValue(pageIdx)

			if columnType.Length() == 0 {
				// Variable-length min/max values may be truncated in the
				// column index, in which case they cannot be trusted.
				if len(minValue.Bytes()) >= dynparquet.ColumnIndexSize ||
					len(maxValue.Bytes()) >= dynparquet.ColumnIndexSize {
					readPages = true
					break
				}
			}

			if columnType.Compare(minValue, maxValue) != 0 ||
				(pageIdx != 0 && columnType.Compare(globalMinValue, minValue) != 0) {
				readPages = true
				break
			}
		}

		if !readPages {
			w.Write([]parquet.Value{globalMinValue})
			return nil
		}
	}

	pages := columnChunk.Pages()
	defer pages.Close()
	for {
		p, err := pages.ReadPage()
		if err != nil {
			if err == io.EOF {
				break
			}
			return fmt.Errorf("read page: %w", err)
		}

		dict := p.Dictionary()
		if dict != nil && dictionaryOnly {
			// Only distinct values are requested: write the dictionary page
			// instead of the data page, preceded by a null if any are present.
			if p.NumNulls() > 0 {
				w.Write([]parquet.Value{parquet.NullValue()})
			}
			p = dict.Page()
		}

		if pw, ok := w.(writer.PageWriter); ok {
			err := pw.WritePage(p)
			if err == nil {
				continue
			}
			if !errors.Is(err, writer.ErrCannotWritePageDirectly) {
				return fmt.Errorf("write page: %w", err)
			}
			// Fall through to value-by-value writing below.
		}

		n := p.NumValues()
		if int64(cap(c.scratchValues)) < n {
			c.scratchValues = make([]parquet.Value, n)
		}
		c.scratchValues = c.scratchValues[:n]

		reader := p.Values()
		if _, err := reader.ReadValues(c.scratchValues); err != nil && err != io.EOF {
			return fmt.Errorf("read values: %w", err)
		}

		w.Write(c.scratchValues)
	}

	return nil
}
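
// SingleMatchingColumn returns true if exactly one of the given fields is
// matched by the distinct column expressions.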
func SingleMatchingColumn(distinctColumns []logicalplan.Expr, fields []parquet.Field) bool {
	count := 0
	for _, col := range distinctColumns {
		for _, field := range fields {
			name := field.Name()
			if col.MatchColumn(name) {
				count++
				if count > 1 {
					return false
				}
			}
		}
	}
	return count == 1
}
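
// recordBuilderLength returns the maximum length of the record builder's
// fields, the number of fields with that length, and whether any fields have
// differing lengths (an anomaly the caller should repair by padding).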
func recordBuilderLength(rb *builder.RecordBuilder) (maxLength, maxLengthFields int, anomaly bool) {
	fields := rb.Fields()
	maxLength = fields[0].Len()
	maxLengthFields = 0
	for _, field := range fields {
		if fieldLen := field.Len(); fieldLen != maxLength {
			if fieldLen > maxLength {
				maxLengthFields = 1
				maxLength = fieldLen
			}
		} else {
			maxLengthFields++
		}
	}
	return maxLength, maxLengthFields, maxLengthFields != len(rb.Fields())
}
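
// parquetSchemaEqual returns whether the two schemas have the same field
// names in the same order.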
func parquetSchemaEqual(schema1, schema2 *parquet.Schema) bool {
	switch {
	case schema1 == schema2:
		return true
	case schema1 == nil || schema2 == nil:
		return false
	case len(schema1.Fields()) != len(schema2.Fields()):
		return false
	}

	s1Fields := schema1.Fields()
	s2Fields := schema2.Fields()
	for i := range s1Fields {
		if s1Fields[i].Name() != s2Fields[i].Name() {
			return false
		}
	}
	return true
}
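
// PreExprVisitorFunc is a logicalplan expression visitor that runs the given
// function on PreVisit only.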
type PreExprVisitorFunc func(expr logicalplan.Expr) bool

func (f PreExprVisitorFunc) PreVisit(expr logicalplan.Expr) bool {
	return f(expr)
}

func (f PreExprVisitorFunc) Visit(_ logicalplan.Expr) bool {
	return false
}

func (f PreExprVisitorFunc) PostVisit(_ logicalplan.Expr) bool {
	return false
}
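
// binaryDistinctExpr evaluates a distinct binary expression (currently only >)
// against a column chunk's index. If the comparison has the same result for
// every page, a single constant boolean is appended and true is returned.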
func binaryDistinctExpr(
	typ parquet.Type,
	columnChunk parquet.ColumnChunk,
	expr *logicalplan.BinaryExpr,
	info *distinctColInfo,
) (bool, error) {
	if info.v == nil {
		var (
			value parquet.Value
			err   error
		)
		expr.Right.Accept(PreExprVisitorFunc(func(expr logicalplan.Expr) bool {
			switch e := expr.(type) {
			case *logicalplan.LiteralExpr:
				value, err = ArrowScalarToParquetValue(e.Value)
				return false
			}
			return true
		}))
		if err != nil {
			return false, err
		}
		info.v = &value
	}

	value := *info.v
	switch expr.Op {
	case logicalplan.OpGt:
		index, err := columnChunk.ColumnIndex()
		if err != nil {
			return false, err
		}
		allGreater, noneGreater := allOrNoneGreaterThan(
			typ,
			index,
			value,
		)
		if allGreater || noneGreater {
			b := info.b.(*builder.OptBooleanBuilder)
			if allGreater {
				b.AppendParquetValues([]parquet.Value{parquet.ValueOf(true)})
			}
			if noneGreater {
				b.AppendParquetValues([]parquet.Value{parquet.ValueOf(false)})
			}
			return true, nil
		}
	default:
		return false, nil
	}

	return false, nil
}
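
// allOrNoneGreaterThan reports, based on the column index page bounds, whether
// all values in the column chunk are greater than value and whether none are.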
func allOrNoneGreaterThan(
	typ parquet.Type,
	index parquet.ColumnIndex,
	value parquet.Value,
) (bool, bool) {
	numPages := index.NumPages()
	allTrue := true
	allFalse := true
	for i := 0; i < numPages; i++ {
		minValue := index.MinValue(i)
		maxValue := index.MaxValue(i)
		if typ.Compare(maxValue, value) <= 0 {
			allTrue = false
		}
		if typ.Compare(minValue, value) > 0 {
			allFalse = false
		}
	}
	return allTrue, allFalse
}
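
// resetBuilderToLength truncates the given column builder to length l.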
func resetBuilderToLength(b builder.ColumnBuilder, l int) {
	if ob, ok := b.(builder.OptimizedBuilder); ok {
		ob.ResetToLength(l)
		return
	}
	arr := b.NewArray()
	copyArrToBuilder(b, arr, l)
	arr.Release()
}

func copyArrToBuilder(builder builder.ColumnBuilder, arr arrow.Array, toCopy int) {
	builder.Reserve(toCopy)
	switch arr := arr.(type) {
	case *array.Boolean:
		b := builder.(*array.BooleanBuilder)
		for i := 0; i < toCopy; i++ {
			if arr.IsNull(i) {
				b.UnsafeAppendBoolToBitmap(false)
			} else {
				b.UnsafeAppend(arr.Value(i))
			}
		}
	case *array.Binary:
		b := builder.(*array.BinaryBuilder)
		for i := 0; i < toCopy; i++ {
			if arr.IsNull(i) {
				b.AppendNull()
			} else {
				b.Append(arr.Value(i))
			}
		}
	case *array.String:
		b := builder.(*array.BinaryBuilder)
		for i := 0; i < toCopy; i++ {
			if arr.IsNull(i) {
				b.AppendNull()
			} else {
				b.AppendString(arr.Value(i))
			}
		}
	case *array.Int64:
		b := builder.(*array.Int64Builder)
		for i := 0; i < toCopy; i++ {
			if arr.IsNull(i) {
				b.UnsafeAppendBoolToBitmap(false)
			} else {
				b.UnsafeAppend(arr.Value(i))
			}
		}
	case *array.Uint64:
		b := builder.(*array.Uint64Builder)
		for i := 0; i < toCopy; i++ {
			if arr.IsNull(i) {
				b.UnsafeAppendBoolToBitmap(false)
			} else {
				b.UnsafeAppend(arr.Value(i))
			}
		}
	case *array.Float64:
		b := builder.(*array.Float64Builder)
		for i := 0; i < toCopy; i++ {
			if arr.IsNull(i) {
				b.UnsafeAppendBoolToBitmap(false)
			} else {
				b.UnsafeAppend(arr.Value(i))
			}
		}
	case *array.Dictionary:
		b := builder.(*array.BinaryDictionaryBuilder)
		switch dict := arr.Dictionary().(type) {
		case *array.Binary:
			for i := 0; i < toCopy; i++ {
				if arr.IsNull(i) {
					b.AppendNull()
				} else if err := b.Append(dict.Value(arr.GetValueIndex(i))); err != nil {
					panic("failed to append to dictionary")
				}
			}
		case *array.String:
			for i := 0; i < toCopy; i++ {
				if arr.IsNull(i) {
					b.AppendNull()
				} else if err := b.AppendString(dict.Value(arr.GetValueIndex(i))); err != nil {
					panic("failed to append to dictionary")
				}
			}
		default:
			panic(fmt.Sprintf("unsupported dictionary type: %T", dict))
		}
	default:
		panic(fmt.Sprintf("unsupported array type: %T", arr))
	}
}

func repeatLastValue(
	builder builder.ColumnBuilder,
	arr arrow.Array,
	count int,
) {
	switch arr := arr.(type) {
	case *array.Boolean:
		repeatBooleanArray(builder.(*array.BooleanBuilder), arr, count)
	case *array.Binary:
		repeatBinaryArray(builder.(*array.BinaryBuilder), arr, count)
	case *array.Int64:
		repeatInt64Array(builder.(*array.Int64Builder), arr, count)
	case *array.Uint64:
		repeatUint64Array(builder.(*array.Uint64Builder), arr, count)
	case *array.Float64:
		repeatFloat64Array(builder.(*array.Float64Builder), arr, count)
	case *array.Dictionary:
		repeatDictionaryArray(builder.(array.DictionaryBuilder), arr, count)
	default:
		panic(fmt.Sprintf("unsupported array type: %T", arr))
	}
}

func repeatBooleanArray(
	b *array.BooleanBuilder,
	arr *array.Boolean,
	count int,
) {
	val := arr.Value(arr.Len() - 1)
	vals := make([]bool, count)
	for i := 0; i < count; i++ {
		vals[i] = val
	}
	b.AppendValues(vals, nil)
}

func repeatBinaryArray(
	b *array.BinaryBuilder,
	arr *array.Binary,
	count int,
) {
	val := arr.Value(arr.Len() - 1)
	vals := make([][]byte, count)
	for i := 0; i < count; i++ {
		vals[i] = val
	}
	b.AppendValues(vals, nil)
}

func repeatInt64Array(
	b *array.Int64Builder,
	arr *array.Int64,
	count int,
) {
	val := arr.Value(arr.Len() - 1)
	vals := make([]int64, count)
	for i := 0; i < count; i++ {
		vals[i] = val
	}
	b.AppendValues(vals, nil)
}

func repeatUint64Array(
	b *array.Uint64Builder,
	arr *array.Uint64,
	count int,
) {
	val := arr.Value(arr.Len() - 1)
	vals := make([]uint64, count)
	for i := 0; i < count; i++ {
		vals[i] = val
	}
	b.AppendValues(vals, nil)
}

func repeatFloat64Array(
	b *array.Float64Builder,
	arr *array.Float64,
	count int,
) {
	val := arr.Value(arr.Len() - 1)
	vals := make([]float64, count)
	for i := 0; i < count; i++ {
		vals[i] = val
	}
	b.AppendValues(vals, nil)
}

func repeatDictionaryArray(b array.DictionaryBuilder, arr *array.Dictionary, count int) {
	switch db := b.(type) {
	case *array.BinaryDictionaryBuilder:
		switch dict := arr.Dictionary().(type) {
		case *array.Binary:
			if arr.IsNull(arr.Len() - 1) {
				for i := 0; i < count; i++ {
					db.AppendNull()
				}
			} else {
				val := dict.Value(arr.GetValueIndex(arr.Len() - 1))
				for i := 0; i < count; i++ {
					if err := db.Append(val); err != nil {
						panic("failed to append value to dict")
					}
				}
			}
		default:
			panic(fmt.Sprintf("unsupported dictionary array for builder %T", dict))
		}
	default:
		panic(fmt.Sprintf("unsupported dictionary builder %T", db))
	}
}
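
// mergeArrowSchemas merges the fields of the given schemas, deduplicated by
// field name, into a single schema with fields sorted by name.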
func mergeArrowSchemas(schemas []*arrow.Schema) *arrow.Schema {
	fieldNames := make([]string, 0, 16)
	fieldsMap := make(map[string]arrow.Field)

	for _, schema := range schemas {
		for i := 0; i < schema.NumFields(); i++ {
			f := schema.Field(i)
			if _, ok := fieldsMap[f.Name]; !ok {
				fieldNames = append(fieldNames, f.Name)
				fieldsMap[f.Name] = f
			}
		}
	}

	sort.Strings(fieldNames)

	fields := make([]arrow.Field, 0, len(fieldNames))
	for _, name := range fieldNames {
		fields = append(fields, fieldsMap[name])
	}
	return arrow.NewSchema(fields, nil)
}
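
// MultiColumnWriter maps a single parquet field to the leaf column indexes it
// spans and the value writer that writes those columns to arrow.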
type MultiColumnWriter struct {
	writer   writer.ValueWriter
	fieldIdx int
	colIdx   []int
}

// ColToWriter returns the value writer responsible for the given leaf column
// index, or nil if no writer covers it.
func ColToWriter(col int, writers []MultiColumnWriter) writer.ValueWriter {
	for _, w := range writers {
		for _, idx := range w.colIdx {
			if col == idx {
				return w.writer
			}
		}
	}
	return nil
}
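
// Project returns a record with only the columns that match the given
// projections. If all columns match, or no projections are given, the input
// record is retained and returned as-is.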
func Project(r arrow.Record, projections []logicalplan.Expr) arrow.Record {
	if len(projections) == 0 {
		r.Retain()
		return r
	}

	cols := make([]arrow.Array, 0, r.Schema().NumFields())
	fields := make([]arrow.Field, 0, r.Schema().NumFields())
	for i := 0; i < r.Schema().NumFields(); i++ {
		for _, projection := range projections {
			if projection.MatchColumn(r.Schema().Field(i).Name) {
				cols = append(cols, r.Column(i))
				fields = append(fields, r.Schema().Field(i))
				break
			}
		}
	}

	if len(cols) == r.Schema().NumFields() {
		// All columns were selected; return the record unchanged.
		r.Retain()
		return r
	}

	return array.NewRecord(arrow.NewSchema(fields, nil), cols, r.NumRows())
}