package dynparquet
import (
"encoding/binary"
"fmt"
"math"
"strings"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/cespare/xxhash/v2"
"github.com/dgryski/go-metro"
)
// prehashedPrefix is the leading name component of columns that hold
// precomputed hashes; HashedColumnName joins it to the source column name
// with a dot.
const prehashedPrefix = "hashed"
// HashedColumnName returns the name of the column that stores the hashes of
// col: the prehashed prefix, a dot, and then col itself.
func HashedColumnName(col string) string {
	const sep = "."
	return prehashedPrefix + sep + col
}
// IsHashedColumn reports whether col is named like a column produced by
// HashedColumnName, i.e. whether it starts with the prehashed prefix followed
// by a dot.
//
// The dot is part of the check so that ordinary columns whose names merely
// begin with the same letters as the prefix (for example "hashedpassword")
// are not mistaken for prehashed columns.
func IsHashedColumn(col string) bool {
	return strings.HasPrefix(col, prehashedPrefix+".")
}
// FindHashedColumn returns the index within fields of the field whose name is
// the hashed column name for col, or -1 when no such field is present.
func FindHashedColumn(col string, fields []arrow.Field) int {
	// The name being searched for does not depend on the field under
	// inspection, so build it once instead of once per field.
	want := HashedColumnName(col)
	for i := range fields {
		if fields[i].Name == want {
			return i
		}
	}
	return -1
}
// PrehashColumns returns a record holding every column of r plus one Int64
// column of hashes for each field of r that matches a schema column marked
// PreHash. A field matches when its name equals the schema column's name or,
// for dynamic schema columns, when its name starts with that name. Each added
// column is named HashedColumnName(field name) and holds the HashArray result
// for the field, converted to int64.
//
// When no field matches, r itself is returned after an extra Retain, so in
// both cases the caller receives a reference it is responsible for releasing.
//
// HashArray panics on unsupported column types, and so does this function.
func PrehashColumns(schema *Schema, r arrow.Record) arrow.Record {
	bldr := array.NewInt64Builder(memory.DefaultAllocator)
	defer bldr.Release()

	fields := r.Schema().Fields()
	additionalFields := make([]arrow.Field, 0, len(fields))
	additionalColumns := make([]arrow.Array, 0, len(fields))
	// Drop this function's references to the freshly built arrays on exit.
	// NOTE(review): this relies on array.NewRecord retaining the columns it
	// is handed, as described in the arrow-go documentation.
	defer func() {
		for _, col := range additionalColumns {
			col.Release()
		}
	}()

	for _, col := range schema.Columns() {
		if !col.PreHash {
			continue
		}
		for i, f := range fields {
			// NOTE(review): the dynamic match is a bare prefix test with no
			// separator, so it also accepts fields that merely share leading
			// characters with the column name — confirm this is intended.
			matches := col.Name == f.Name || (col.Dynamic && strings.HasPrefix(f.Name, col.Name))
			if !matches {
				continue
			}

			additionalFields = append(additionalFields, arrow.Field{
				Name: HashedColumnName(f.Name),
				Type: arrow.PrimitiveTypes.Int64,
			})

			hashed := HashArray(r.Column(i))
			bldr.Reserve(len(hashed))
			for _, v := range hashed {
				bldr.UnsafeAppend(int64(v))
			}
			additionalColumns = append(additionalColumns, bldr.NewArray())
		}
	}

	if len(additionalColumns) == 0 {
		r.Retain()
		return r
	}

	// Assemble the combined slices in freshly allocated storage rather than
	// appending onto slices obtained from the record: an append onto a slice
	// with spare capacity would write into the record's own backing array.
	allFields := make([]arrow.Field, 0, len(fields)+len(additionalFields))
	allFields = append(allFields, fields...)
	allFields = append(allFields, additionalFields...)

	origColumns := r.Columns()
	allColumns := make([]arrow.Array, 0, len(origColumns)+len(additionalColumns))
	allColumns = append(allColumns, origColumns...)
	allColumns = append(allColumns, additionalColumns...)

	return array.NewRecord(arrow.NewSchema(allFields, nil), allColumns, r.NumRows())
}
// HashArray returns one uint64 hash per element of arr by dispatching on the
// array's concrete type. String, Binary, Int64, Uint64, Boolean, Dictionary
// and List arrays are supported; any other type causes a panic.
func HashArray(arr arrow.Array) []uint64 {
	var hashes []uint64
	switch typed := arr.(type) {
	case *array.String:
		hashes = hashStringArray(typed)
	case *array.Binary:
		hashes = hashBinaryArray(typed)
	case *array.Int64:
		hashes = hashInt64Array(typed)
	case *array.Uint64:
		hashes = hashUint64Array(typed)
	case *array.Boolean:
		hashes = hashBooleanArray(typed)
	case *array.Dictionary:
		hashes = hashDictionaryArray(typed)
	case *array.List:
		hashes = hashListArray(typed)
	default:
		panic(fmt.Sprintf("unsupported array type %T", arr))
	}
	return hashes
}
// hashListArray computes one hash per list in arr by streaming the list's
// elements, in order, through a single reused xxhash digest.
//
// Element encodings: Int64 values and the bit patterns of Float64 values are
// written as 8 big-endian bytes; booleans as one byte (2 for true, 1 for
// false); Binary and String values, whether plain or dictionary-encoded, as
// their raw bytes. Element bytes are written back to back with no separator.
// Any other element type, or any other dictionary value type, causes a panic.
func hashListArray(arr *array.List) (res []uint64) {
	res = make([]uint64, arr.Len())
	digest := xxhash.New()

	// hashRows runs writeRange over the element range of every list, then
	// stores the digest of what was written and resets it for the next list.
	hashRows := func(writeRange func(lo, hi int64)) []uint64 {
		for row := range res {
			lo, hi := arr.ValueOffsets(row)
			writeRange(lo, hi)
			res[row] = digest.Sum64()
			digest.Reset()
		}
		return res
	}

	// The results of the digest's Write and WriteString are deliberately
	// discarded throughout.
	switch elems := arr.ListValues().(type) {
	case *array.Int64:
		var scratch [8]byte
		return hashRows(func(lo, hi int64) {
			for j := lo; j < hi; j++ {
				binary.BigEndian.PutUint64(scratch[:], uint64(elems.Value(int(j))))
				_, _ = digest.Write(scratch[:])
			}
		})
	case *array.Float64:
		var scratch [8]byte
		return hashRows(func(lo, hi int64) {
			for j := lo; j < hi; j++ {
				binary.BigEndian.PutUint64(scratch[:], math.Float64bits(elems.Value(int(j))))
				_, _ = digest.Write(scratch[:])
			}
		})
	case *array.Boolean:
		var scratch [1]byte
		return hashRows(func(lo, hi int64) {
			for j := lo; j < hi; j++ {
				scratch[0] = 1
				if elems.Value(int(j)) {
					scratch[0] = 2
				}
				_, _ = digest.Write(scratch[:])
			}
		})
	case *array.Binary:
		return hashRows(func(lo, hi int64) {
			for j := lo; j < hi; j++ {
				_, _ = digest.Write(elems.Value(int(j)))
			}
		})
	case *array.String:
		return hashRows(func(lo, hi int64) {
			for j := lo; j < hi; j++ {
				_, _ = digest.WriteString(elems.Value(int(j)))
			}
		})
	case *array.Dictionary:
		// Dictionary-encoded elements are hashed by the value they refer to.
		switch values := elems.Dictionary().(type) {
		case *array.Binary:
			return hashRows(func(lo, hi int64) {
				for j := lo; j < hi; j++ {
					_, _ = digest.Write(values.Value(elems.GetValueIndex(int(j))))
				}
			})
		case *array.String:
			return hashRows(func(lo, hi int64) {
				for j := lo; j < hi; j++ {
					_, _ = digest.WriteString(values.Value(elems.GetValueIndex(int(j))))
				}
			})
		default:
			panic(fmt.Sprintf("list dictionary not of expected type: %T", values))
		}
	default:
		panic(fmt.Sprintf("list not of expected type: %T", elems))
	}
}
// hashDictionaryArray returns one hash per element of arr: the metro hash
// (seed 0) of the dictionary value the element refers to. Null elements keep
// the zero hash. It panics when a non-null element is encountered and the
// dictionary values are neither Binary nor String.
func hashDictionaryArray(arr *array.Dictionary) []uint64 {
	hashes := make([]uint64, arr.Len())
	for i := range hashes {
		if arr.IsNull(i) {
			continue
		}
		switch values := arr.Dictionary().(type) {
		case *array.Binary:
			raw := values.Value(arr.GetValueIndex(i))
			hashes[i] = metro.Hash64(raw, 0)
		case *array.String:
			str := values.Value(arr.GetValueIndex(i))
			hashes[i] = metro.Hash64([]byte(str), 0)
		default:
			panic(fmt.Sprintf("unsupported dictionary type %T", values))
		}
	}
	return hashes
}
// hashBinaryArray returns the metro hash (seed 0) of every value in arr.
// Null entries are skipped and therefore keep the zero hash.
func hashBinaryArray(arr *array.Binary) []uint64 {
	hashes := make([]uint64, arr.Len())
	for i := range hashes {
		if arr.IsNull(i) {
			continue
		}
		hashes[i] = metro.Hash64(arr.Value(i), 0)
	}
	return hashes
}
// hashBooleanArray maps every element of arr to a fixed code: 0 for null,
// 1 for false and 2 for true.
func hashBooleanArray(arr *array.Boolean) []uint64 {
	hashes := make([]uint64, arr.Len())
	for i := range hashes {
		switch {
		case arr.IsNull(i):
			hashes[i] = 0
		case arr.Value(i):
			hashes[i] = 2
		default:
			hashes[i] = 1
		}
	}
	return hashes
}
// hashStringArray returns the metro hash (seed 0) of the bytes of every
// string in arr. Null entries are skipped and therefore keep the zero hash.
func hashStringArray(arr *array.String) []uint64 {
	hashes := make([]uint64, arr.Len())
	for i := range hashes {
		if arr.IsNull(i) {
			continue
		}
		value := arr.Value(i)
		hashes[i] = metro.Hash64([]byte(value), 0)
	}
	return hashes
}
// hashInt64Array uses each value of arr, converted to uint64, as its own
// hash. Null entries are skipped and therefore keep the zero hash.
func hashInt64Array(arr *array.Int64) []uint64 {
	hashes := make([]uint64, arr.Len())
	for i := range hashes {
		if arr.IsNull(i) {
			continue
		}
		hashes[i] = uint64(arr.Value(i))
	}
	return hashes
}
// hashUint64Array uses each value of arr directly as its own hash. Null
// entries are skipped and therefore keep the zero hash.
func hashUint64Array(arr *array.Uint64) []uint64 {
	hashes := make([]uint64, arr.Len())
	for i := range hashes {
		if arr.IsNull(i) {
			continue
		}
		hashes[i] = arr.Value(i)
	}
	return hashes
}
// RemoveHashedColumns builds a new record from r that keeps, in their original
// order, only the columns whose field names IsHashedColumn rejects. A new
// record is constructed even when nothing is filtered out, and its schema is
// created from the kept fields with nil passed as NewSchema's second argument.
func RemoveHashedColumns(r arrow.Record) arrow.Record {
	schema := r.Schema()
	n := schema.NumFields()

	keptCols := make([]arrow.Array, 0, n)
	keptFields := make([]arrow.Field, 0, n)
	for i := 0; i < n; i++ {
		field := schema.Field(i)
		if IsHashedColumn(field.Name) {
			continue
		}
		keptCols = append(keptCols, r.Column(i))
		keptFields = append(keptFields, field)
	}

	return array.NewRecord(arrow.NewSchema(keptFields, nil), keptCols, r.NumRows())
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.