package writer
import (
"errors"
"fmt"
"io"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/parquet-go/parquet-go"
"github.com/polarsignals/frostdb/pqarrow/builder"
)
// ValueWriter appends a batch of parquet values to the column builder it
// wraps.
type ValueWriter interface {
	// Write appends values, in order, to the underlying builder.
	Write(values []parquet.Value)
}
// PageWriter is a ValueWriter that can additionally consume a whole parquet
// page in a single call.
type PageWriter interface {
	ValueWriter

	// WritePage appends the contents of p. The implementations in this file
	// return ErrCannotWritePageDirectly when p is not eligible for a direct
	// write.
	WritePage(p parquet.Page) error
}
// ErrCannotWritePageDirectly is returned by the WritePage methods in this file
// when a page is not eligible to be bulk-copied into the underlying builder.
var (
	ErrCannotWritePageDirectly = errors.New("cannot write page directly")
)
// writerBase carries helpers shared by the writers that support WritePage.
type writerBase struct{}

// canWritePageDirectly reports whether p contains no nulls and has no
// dictionary. This is the precondition the WritePage methods here check before
// copying p.Data() straight into their builder.
func (writerBase) canWritePageDirectly(p parquet.Page) bool {
	if p.NumNulls() != 0 {
		return false
	}
	return p.Dictionary() == nil
}
// binaryValueWriter writes binary column values into an OptBinaryBuilder. The
// embedded writerBase supplies the direct page-write precondition check.
type binaryValueWriter struct {
	writerBase
	b *builder.OptBinaryBuilder // destination builder
}
// NewWriterFunc builds a ValueWriter on top of a column builder. numValues is a
// size hint: some constructors in this file use it to reserve builder capacity,
// while others ignore it.
type NewWriterFunc func(b builder.ColumnBuilder, numValues int) ValueWriter
// NewBinaryValueWriter returns a ValueWriter backed by b, which must be a
// *builder.OptBinaryBuilder (the type assertion panics otherwise). The size
// hint is ignored.
func NewBinaryValueWriter(b builder.ColumnBuilder, _ int) ValueWriter {
	binBuilder := b.(*builder.OptBinaryBuilder)
	return &binaryValueWriter{b: binBuilder}
}
// Write appends values to the binary builder via AppendParquetValues. It
// panics if the builder rejects the values, and the panic message includes the
// underlying error.
func (w *binaryValueWriter) Write(values []parquet.Value) {
	if err := w.b.AppendParquetValues(values); err != nil {
		// Keep the cause in the message; a bare "unable to write value" gives
		// no hint as to why the append failed.
		panic(fmt.Sprintf("unable to write value: %v", err))
	}
}
// WritePage copies the raw byte-array data and offsets of p into the builder
// in one call. It returns ErrCannotWritePageDirectly when p has nulls or a
// dictionary; otherwise it returns whatever AppendData returns.
func (w *binaryValueWriter) WritePage(p parquet.Page) error {
	if w.canWritePageDirectly(p) {
		return w.b.AppendData(p.Data().ByteArray())
	}
	return ErrCannotWritePageDirectly
}
// int64ValueWriter writes int64 column values into an OptInt64Builder. The
// embedded writerBase supplies the direct page-write precondition check.
type int64ValueWriter struct {
	writerBase
	b *builder.OptInt64Builder // destination builder
}
// NewInt64ValueWriter returns a ValueWriter backed by b, which must be a
// *builder.OptInt64Builder (the type assertion panics otherwise). The size
// hint is ignored.
func NewInt64ValueWriter(b builder.ColumnBuilder, _ int) ValueWriter {
	return &int64ValueWriter{b: b.(*builder.OptInt64Builder)}
}
// Write hands the whole batch of values to the builder's AppendParquetValues.
func (w *int64ValueWriter) Write(values []parquet.Value) {
	dst := w.b
	dst.AppendParquetValues(values)
}
// WritePage bulk-appends the int64 data of p. It returns
// ErrCannotWritePageDirectly when p has nulls or a dictionary, and nil
// otherwise.
func (w *int64ValueWriter) WritePage(p parquet.Page) error {
	if ok := w.canWritePageDirectly(p); !ok {
		return ErrCannotWritePageDirectly
	}
	w.b.AppendData(p.Data().Int64())
	return nil
}
// uint64ValueWriter writes values into an arrow Uint64Builder.
type uint64ValueWriter struct {
	b *array.Uint64Builder // destination builder
}
// NewUint64ValueWriter returns a ValueWriter backed by b, which must be an
// *array.Uint64Builder (the type assertion panics otherwise). It reserves
// capacity for numValues values up front.
func NewUint64ValueWriter(b builder.ColumnBuilder, numValues int) ValueWriter {
	ub := b.(*array.Uint64Builder)
	ub.Reserve(numValues)
	return &uint64ValueWriter{b: ub}
}
// Write appends each value to the builder, appending a null for null values
// and otherwise the value's Int64 reinterpreted as uint64.
func (w *uint64ValueWriter) Write(values []parquet.Value) {
	for i := range values {
		if values[i].IsNull() {
			w.b.AppendNull()
			continue
		}
		w.b.Append(uint64(values[i].Int64()))
	}
}
// repeatedValueWriter writes repeated (list) columns: list entries go to b,
// while the element values of each list are forwarded to values.
type repeatedValueWriter struct {
	b      *builder.ListBuilder // receives one entry per row
	values ValueWriter          // receives the element values
}
// NewListValueWriter wraps newValueWriter so it can be used for list columns.
// The returned constructor expects b to be a *builder.ListBuilder (the type
// assertion panics otherwise). It builds the element writer with
// newValueWriter on the list's value builder, passing numValues through.
func NewListValueWriter(newValueWriter func(b builder.ColumnBuilder, numValues int) ValueWriter) func(b builder.ColumnBuilder, numValues int) ValueWriter {
	return func(b builder.ColumnBuilder, numValues int) ValueWriter {
		// Named lb rather than "builder" so the local does not shadow the
		// imported builder package.
		lb := b.(*builder.ListBuilder)
		return &repeatedValueWriter{
			b:      lb,
			values: newValueWriter(lb.ValueBuilder(), numValues),
		}
	}
}
// Write splits values into rows and appends one list entry per row to the
// list builder.
//
// A value with RepetitionLevel() == 0 starts a new row. If that value also has
// DefinitionLevel() == 0, the row is appended as a null list and the value is
// not forwarded. Otherwise a non-null list is appended, and the row's values
// (from the row-starting value up to, but excluding, the next row start) are
// passed to the element writer in a single Write call. The last row is
// flushed after the loop.
//
// NOTE(review): if values[0] does not start a row (RepetitionLevel() != 0),
// the leading values are dropped when a later row start exists, but are
// written as one list when none does. Confirm callers always pass whole rows.
func (w *repeatedValueWriter) Write(values []parquet.Value) {
	listStart := false
	start := 0
	for i, v := range values {
		if v.RepetitionLevel() == 0 {
			// A new row begins: flush the list accumulated so far, if any.
			if listStart {
				w.b.Append(true)
				w.values.Write(values[start:i])
			}
			if v.DefinitionLevel() == 0 {
				// The whole list is null; this value is not forwarded.
				w.b.AppendNull()
				listStart = false
				start = i + 1
			} else {
				listStart = true
				start = i
			}
		}
	}
	// Flush any values left after the last row start.
	if len(values[start:]) > 0 {
		w.b.Append(true)
		w.values.Write(values[start:])
	}
}
// float64ValueWriter writes values into an arrow Float64Builder.
type float64ValueWriter struct {
	b   *array.Float64Builder // destination builder
	buf []float64             // scratch buffer reused across WritePage calls
}
// NewFloat64ValueWriter returns a ValueWriter backed by b, which must be an
// *array.Float64Builder (the type assertion panics otherwise). It reserves
// capacity for numValues values up front.
func NewFloat64ValueWriter(b builder.ColumnBuilder, numValues int) ValueWriter {
	fb := b.(*array.Float64Builder)
	fb.Reserve(numValues)
	return &float64ValueWriter{b: fb}
}
// Write appends each value to the builder, appending a null for null values
// and otherwise the value's Double.
func (w *float64ValueWriter) Write(values []parquet.Value) {
	for i := range values {
		if values[i].IsNull() {
			w.b.AppendNull()
			continue
		}
		w.b.Append(values[i].Double())
	}
}
// WritePage bulk-appends every value of p to the Float64 builder by reading
// through the page's parquet.DoubleReader, reusing w.buf as scratch space.
//
// It returns ErrCannotWritePageDirectly if p.Values() does not implement
// parquet.DoubleReader, and a wrapped error if reading fails. Values read
// before a failure remain appended.
func (w *float64ValueWriter) WritePage(p parquet.Page) error {
	ireader, ok := p.Values().(parquet.DoubleReader)
	if !ok {
		return ErrCannotWritePageDirectly
	}
	numValues := int(p.NumValues())
	if numValues == 0 {
		return nil
	}
	// Grow the scratch buffer to fit this page. Sizing it only once, from the
	// first page, could leave it zero-length when that page was empty; with an
	// empty buffer ReadDoubles cannot make progress and the loop never ends.
	if len(w.buf) < numValues {
		w.buf = make([]float64, numValues)
	}
	values := w.buf
	for {
		n, err := ireader.ReadDoubles(values)
		if err != nil && err != io.EOF {
			return fmt.Errorf("read values: %w", err)
		}
		w.b.AppendValues(values[:n], nil)
		if err == io.EOF {
			break
		}
		if n == 0 {
			// Defensive: a reader that neither makes progress nor reports EOF
			// would otherwise spin forever.
			return fmt.Errorf("read values: %w", io.ErrNoProgress)
		}
	}
	return nil
}
// booleanValueWriter writes boolean column values into an OptBooleanBuilder.
// The embedded writerBase supplies the direct page-write precondition check.
type booleanValueWriter struct {
	writerBase
	b *builder.OptBooleanBuilder // destination builder
}
// NewBooleanValueWriter returns a ValueWriter backed by b, which must be a
// *builder.OptBooleanBuilder (the type assertion panics otherwise). It
// reserves capacity for numValues values up front.
func NewBooleanValueWriter(b builder.ColumnBuilder, numValues int) ValueWriter {
	boolBuilder := b.(*builder.OptBooleanBuilder)
	boolBuilder.Reserve(numValues)
	return &booleanValueWriter{b: boolBuilder}
}
// Write hands the whole batch of values to the builder's AppendParquetValues.
func (w *booleanValueWriter) Write(values []parquet.Value) {
	dst := w.b
	dst.AppendParquetValues(values)
}
// WritePage bulk-appends the boolean data of p, together with its value count,
// to the builder. It returns ErrCannotWritePageDirectly when p has nulls or a
// dictionary, and nil otherwise.
func (w *booleanValueWriter) WritePage(p parquet.Page) error {
	if ok := w.canWritePageDirectly(p); !ok {
		return ErrCannotWritePageDirectly
	}
	w.b.Append(p.Data().Boolean(), int(p.NumValues()))
	return nil
}
// structWriter routes values into the leaf builders of an arrow StructBuilder.
type structWriter struct {
	// offset is the leaf index assigned to b when searching for the leaf that
	// matches a value's column index.
	offset int
	b      *array.StructBuilder
}
// NewStructWriterFromOffset returns a NewWriterFunc whose writers treat offset
// as the starting leaf index of the struct. The builder passed to the returned
// func must be an *array.StructBuilder (the type assertion panics otherwise);
// the size hint is ignored.
func NewStructWriterFromOffset(offset int) NewWriterFunc {
	return func(b builder.ColumnBuilder, _ int) ValueWriter {
		sb := b.(*array.StructBuilder)
		return &structWriter{b: sb, offset: offset}
	}
}
// Write appends values to the leaf builder whose leaf index equals
// values[0].Column().
//
// Before that, it counts the values that start a row (RepetitionLevel() == 0)
// and appends a struct slot whenever the running count exceeds s.b.Len().
// An empty values slice is a no-op. Write panics if no matching leaf builder
// is found.
//
// NOTE(review): the row counter is per call while s.b.Len() is cumulative
// across calls; confirm this is the intended way of keeping the struct's
// length in step with its fields.
func (s *structWriter) Write(values []parquet.Value) {
	if len(values) == 0 {
		// Nothing to write; this also avoids indexing values[0] below.
		return
	}
	rows := 0
	for _, v := range values {
		if v.RepetitionLevel() != 0 {
			continue
		}
		rows++
		if rows > s.b.Len() {
			s.b.Append(true)
		}
	}
	if _, ok := s.findLeafBuilder(values[0].Column(), s.offset, s.b, values); !ok {
		panic("unable to write values to builder")
	}
}
// findLeafBuilder searches bldr depth-first for the leaf builder whose leaf
// index equals searchIndex, and appends every value in values to it.
// currentIndex is the leaf index assigned to bldr itself, or to its first leaf
// when bldr is a struct builder.
//
// It returns the number of leaves under bldr (1 for a non-matching leaf) and
// whether the values were appended. When a struct field receives the values
// and that field's length differs from the struct's length, a struct slot is
// appended. It panics for leaf builder types it does not support, and if a
// BinaryDictionaryBuilder append fails.
func (s *structWriter) findLeafBuilder(searchIndex, currentIndex int, bldr array.Builder, values []parquet.Value) (int, bool) {
	switch b := bldr.(type) {
	case *array.StructBuilder:
		totalLeaves := 0
		for i := 0; i < b.NumField(); i++ {
			leaves, appended := s.findLeafBuilder(searchIndex, currentIndex+totalLeaves, b.FieldBuilder(i), values)
			if appended {
				if b.FieldBuilder(i).Len() != b.Len() {
					b.Append(true)
				}
				return b.NumField(), true
			}
			totalLeaves += leaves
		}
		return totalLeaves, false
	case *array.Int64Builder:
		if searchIndex == currentIndex {
			for _, v := range values {
				if v.IsNull() {
					b.AppendNull()
				} else {
					b.Append(v.Int64())
				}
			}
			return 1, true
		}
	case *array.Uint64Builder:
		if searchIndex == currentIndex {
			for _, v := range values {
				if v.IsNull() {
					b.AppendNull()
				} else {
					b.Append(v.Uint64())
				}
			}
			return 1, true
		}
	case *array.Float64Builder:
		if searchIndex == currentIndex {
			for _, v := range values {
				if v.IsNull() {
					b.AppendNull()
				} else {
					b.Append(v.Double())
				}
			}
			return 1, true
		}
	case *array.StringBuilder:
		if searchIndex == currentIndex {
			for _, v := range values {
				if v.IsNull() {
					b.AppendNull()
				} else {
					b.Append(string(v.ByteArray()))
				}
			}
			return 1, true
		}
	case *array.BinaryBuilder:
		if searchIndex == currentIndex {
			for _, v := range values {
				if v.IsNull() {
					b.AppendNull()
				} else {
					b.Append(v.ByteArray())
				}
			}
			return 1, true
		}
	case *array.BinaryDictionaryBuilder:
		if searchIndex == currentIndex {
			for _, v := range values {
				if v.IsNull() {
					b.AppendNull()
					continue
				}
				if err := b.Append(v.ByteArray()); err != nil {
					panic("failed to append to dictionary")
				}
			}
			return 1, true
		}
	default:
		// %T names the concrete builder type; %v would dump the builder's
		// internal state.
		panic(fmt.Sprintf("unsupported value type: %T", b))
	}
	// A supported leaf that is not the one being searched for.
	return 1, false
}
// mapWriter is the writer for map columns. Its Write is not implemented and
// always panics.
type mapWriter struct {
	b *array.MapBuilder // destination builder
}
// NewMapWriter returns a ValueWriter backed by b, which must be an
// *array.MapBuilder (the type assertion panics otherwise). The size hint is
// ignored.
func NewMapWriter(b builder.ColumnBuilder, _ int) ValueWriter {
	mb := b.(*array.MapBuilder)
	return &mapWriter{b: mb}
}
// Write is not implemented for map columns and always panics.
func (*mapWriter) Write([]parquet.Value) {
	panic("not implemented")
}
// dictionaryValueWriter writes values into a dictionary-encoded column
// builder.
type dictionaryValueWriter struct {
	b builder.ColumnBuilder // destination builder; only BinaryDictionaryBuilder is handled by Write
}
// NewDictionaryValueWriter returns a ValueWriter that writes into b. No type
// check is done here, and the size hint is ignored.
func NewDictionaryValueWriter(b builder.ColumnBuilder, _ int) ValueWriter {
	return &dictionaryValueWriter{b: b}
}
// Write appends each value to the dictionary builder: nulls via AppendNull,
// everything else via Append(v.Bytes()).
//
// Only *array.BinaryDictionaryBuilder is supported. Any other builder type
// panics rather than silently dropping the values, since a silent drop would
// leave this column shorter than its siblings. Write also panics if an Append
// fails.
func (w *dictionaryValueWriter) Write(values []parquet.Value) {
	db, ok := w.b.(*array.BinaryDictionaryBuilder)
	if !ok {
		panic(fmt.Sprintf("unsupported dictionary builder type: %T", w.b))
	}
	for _, v := range values {
		if v.IsNull() {
			db.AppendNull()
			continue
		}
		if err := db.Append(v.Bytes()); err != nil {
			panic("failed to append to dictionary")
		}
	}
}
// The pages are generated with Golds v0.8.2 (GOOS=linux GOARCH=amd64).
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.