package parts
import (
"io"
"sort"
"github.com/apache/arrow-go/v18/arrow"
"github.com/polarsignals/frostdb/dynparquet"
)
// Part is the interface this package uses for a unit of stored data.
// Within this file only a few methods are exercised:
//   - PartSorter orders parts by Least.
//   - FindMaximumNonOverlappingSet treats [Least, Most] as a part's row range.
// NOTE(review): descriptions of the remaining methods are inferred from their
// names and signatures; confirm them against the implementations.
type Part interface {
// Record presumably exposes the part's data as an Arrow record.
Record () arrow .Record
// Release and Retain look like reference-count management (basePart carries
// an optional release callback set via WithRelease). TODO: confirm the exact
// ownership contract.
Release ()
Retain ()
// SerializeBuffer presumably writes the part's rows, interpreted with schema,
// to w.
SerializeBuffer (schema *dynparquet .Schema , w dynparquet .ParquetWriter ) error
// AsSerializedBuffer presumably returns the part's contents as a
// dynparquet.SerializedBuffer built with schema.
AsSerializedBuffer (schema *dynparquet .Schema ) (*dynparquet .SerializedBuffer , error )
// NumRows presumably reports the number of rows in the part. Size reports its
// size; the units are not visible here (likely bytes — TODO confirm).
NumRows () int64
Size () int64
// basePart in this file implements CompactionLevel and TX by returning its
// compactionLevel and tx fields. NOTE(review): presumably concrete parts embed
// basePart; confirm.
CompactionLevel () int
TX () uint64
// Least and Most are the lower and upper bounds of the part's row range.
// PartSorter sorts by Least. FindMaximumNonOverlappingSet compares one part's
// Most against the next part's Least to detect overlap.
Least () (*dynparquet .DynamicRow , error )
Most () (*dynparquet .DynamicRow , error )
// OverlapsWith presumably reports whether this part's row range intersects
// otherPart's under schema's ordering.
OverlapsWith (schema *dynparquet .Schema , otherPart Part ) (bool , error )
// Write presumably serializes the part to the given writer.
Write (io .Writer ) error
}
// basePart holds state that Part implementations can share. It is configured
// through Option values such as WithCompactionLevel and WithRelease.
// NOTE(review): presumably concrete part types embed basePart; this is not
// visible here.
type basePart struct {
// tx is returned by TX. Presumably it is the ID of the transaction that
// produced the part — TODO confirm.
tx uint64
// compactionLevel is returned by CompactionLevel and set by
// WithCompactionLevel.
compactionLevel int
// minRow and maxRow are not read in this section. Presumably they are cached
// bounds backing Least/Most — TODO confirm.
minRow *dynparquet .DynamicRow
maxRow *dynparquet .DynamicRow
// release is set by WithRelease. Presumably it is invoked when the part is
// released — TODO confirm.
release func ()
}
// CompactionLevel reports the compaction level stored on the part, as set by
// WithCompactionLevel (zero if never set).
func (p *basePart) CompactionLevel() int { return p.compactionLevel }
// TX returns the transaction value recorded on the part (its tx field).
func (p *basePart) TX() uint64 {
	return p.tx
}
// Option configures a basePart by mutating it in place; see
// WithCompactionLevel and WithRelease.
type Option func (*basePart )
// WithCompactionLevel returns an Option that stores level as the part's
// compaction level, later reported by CompactionLevel.
func WithCompactionLevel(level int) Option {
	setLevel := func(p *basePart) {
		p.compactionLevel = level
	}
	return setLevel
}
// WithRelease returns an Option that stores release as the part's release
// callback. This option only records the callback; when it is invoked is
// decided elsewhere.
func WithRelease(release func()) Option {
	setRelease := func(p *basePart) {
		p.release = release
	}
	return setRelease
}
// PartSorter implements sort.Interface over a slice of Parts, ordering them by
// their Least row using schema.RowLessThan. Less cannot return an error, so it
// records failures in err; check Err after sorting.
type PartSorter struct {
// schema supplies the row ordering (RowLessThan).
schema *dynparquet .Schema
// parts is sorted in place and aliases the slice passed to NewPartSorter.
parts []Part
// err holds the most recent error returned by Least during a comparison.
err error
}
// NewPartSorter returns a PartSorter that orders parts by their Least row
// according to schema. The slice is not copied: sorting reorders the caller's
// parts in place.
func NewPartSorter(schema *dynparquet.Schema, parts []Part) *PartSorter {
	sorter := new(PartSorter)
	sorter.schema = schema
	sorter.parts = parts
	return sorter
}
// Len reports how many parts the sorter holds (sort.Interface).
func (p *PartSorter) Len() int { return len(p.parts) }
// Less reports whether part i's Least row sorts before part j's according to
// the schema (sort.Interface).
//
// If either Least call fails, the error is stored (overwriting any previous
// one) and Less reports false. sort.Sort has no way to abort mid-sort, so
// callers must consult Err afterwards.
func (p *PartSorter) Less(i, j int) bool {
	lhs, err := p.parts[i].Least()
	if err == nil {
		rhs, rerr := p.parts[j].Least()
		if rerr == nil {
			return p.schema.RowLessThan(lhs, rhs)
		}
		err = rerr
	}
	p.err = err
	return false
}
// Swap exchanges the parts at positions i and j (sort.Interface).
func (p *PartSorter) Swap(i, j int) {
	tmp := p.parts[i]
	p.parts[i] = p.parts[j]
	p.parts[j] = tmp
}
// Err returns the last error recorded by Less, or nil. Check it after
// sort.Sort, since a failed comparison is otherwise visible only as "not less".
func (p *PartSorter) Err() error { return p.err }
func FindMaximumNonOverlappingSet (schema *dynparquet .Schema , parts []Part ) ([]Part , []Part , error ) {
if len (parts ) < 2 {
return parts , nil , nil
}
sorter := NewPartSorter (schema , parts )
sort .Sort (sorter )
if sorter .Err () != nil {
return nil , nil , sorter .Err ()
}
prev := 0
prevEnd , err := parts [0 ].Most ()
if err != nil {
return nil , nil , err
}
nonOverlapping := make ([]Part , 0 , len (parts ))
overlapping := make ([]Part , 0 , len (parts ))
var missing Part
for i := 1 ; i < len (parts ); i ++ {
start , err := parts [i ].Least ()
if err != nil {
return nil , nil , err
}
curEnd , err := parts [i ].Most ()
if err != nil {
return nil , nil , err
}
if schema .Cmp (prevEnd , start ) <= 0 {
nonOverlapping = append (nonOverlapping , parts [prev ])
prevEnd = curEnd
prev = i
continue
}
if schema .Cmp (prevEnd , curEnd ) >= 0 {
overlapping = append (overlapping , parts [prev ])
prevEnd = curEnd
prev = i
} else {
overlapping = append (overlapping , parts [i ])
if i == len (parts )-1 {
missing = parts [prev ]
}
}
}
if len (overlapping ) == 0 || overlapping [len (overlapping )-1 ] != parts [len (parts )-1 ] {
nonOverlapping = append (nonOverlapping , parts [len (parts )-1 ])
} else if missing != nil {
overlapping = append (overlapping , missing )
}
return nonOverlapping , overlapping , nil
}
The pages are generated with Golds v0.8.2 (GOOS=linux GOARCH=amd64).
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.