package parquet
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"math"
"reflect"
"strconv"
"time"
"unsafe"
"github.com/google/uuid"
"github.com/parquet-go/parquet-go/deprecated"
"github.com/parquet-go/parquet-go/format"
)
const (
// defaultValueBufferSize is the number of values allocated by copyValues
// when the caller does not supply a buffer.
defaultValueBufferSize = 170
// offsetOfPtr is the byte offset of the ptr field within a Value.
offsetOfPtr = unsafe .Offsetof (Value {}.ptr )
// offsetOfU64 is the byte offset of the u64 field within a Value.
offsetOfU64 = unsafe .Offsetof (Value {}.u64 )
// offsetOfU32 is the offset of u64 adjusted by firstByteOffsetOf32BitsValue,
// which is defined outside this view.
// NOTE(review): presumably the adjustment accounts for byte order so the
// offset addresses the low 32 bits of u64 — confirm against its definition.
offsetOfU32 = offsetOfU64 + firstByteOffsetOf32BitsValue
// offsetOfBool is the offset of u64 adjusted by firstByteOffsetOfBooleanValue,
// which is defined outside this view.
// NOTE(review): presumably addresses the byte of u64 holding a boolean —
// confirm against its definition.
offsetOfBool = offsetOfU64 + firstByteOffsetOfBooleanValue
)
// Value represents a single parquet value together with its repetition level,
// definition level and column index.
//
// The zero Value is the null value: kind holds the bitwise complement of the
// Kind, so a zero kind field means "no kind" and isNull reports true.
type Value struct {
// ptr points at the first byte of the data for kinds backed by a byte
// sequence (the byte-array constructors and makeValueInt96 set it); the
// scalar convertTo* methods reset it to nil.
ptr *byte
// u64 holds the bits of scalar values (boolean, integers, floats), or the
// length in bytes of the data referenced by ptr.
u64 uint64
// kind is the bitwise complement of the value's Kind (see Kind and the
// makeValue* constructors); zero means null.
kind int8
// definitionLevel is the definition level assigned by Level.
definitionLevel byte
// repetitionLevel is the repetition level assigned by Level.
repetitionLevel byte
// columnIndex is the bitwise complement of the column index assigned by
// Level, so a zero field makes column() report -1.
columnIndex int16
}
// ValueReader is implemented by types that read a batch of values into a
// caller-supplied slice.
//
// copyValues in this file treats the int result as the number of values
// placed at the front of the slice, and io.EOF as the normal end of the
// sequence.
type ValueReader interface {
ReadValues ([]Value ) (int , error )
}
// ValueReaderAt is the variant of ValueReader whose read method takes an
// additional int64 argument.
// NOTE(review): presumably the int64 is the position to start reading from —
// confirm against implementations.
type ValueReaderAt interface {
ReadValuesAt ([]Value , int64 ) (int , error )
}
// ValueReaderFrom is implemented by value writers that can consume a
// ValueReader directly. copyValues delegates to ReadValuesFrom when its
// destination implements this interface and returns its results unchanged.
type ValueReaderFrom interface {
ReadValuesFrom (ValueReader ) (int64 , error )
}
// ValueWriter is implemented by types that accept a batch of values.
//
// copyValues in this file adds the int result to its running count of written
// values.
type ValueWriter interface {
WriteValues ([]Value ) (int , error )
}
// ValueWriterTo is implemented by value readers that can write themselves to
// a ValueWriter directly. copyValues delegates to WriteValuesTo when its
// source implements this interface and returns its results unchanged.
type ValueWriterTo interface {
WriteValuesTo (ValueWriter ) (int64 , error )
}
// ValueReaderFunc is a function type that satisfies the ValueReader interface.
type ValueReaderFunc func([]Value) (int, error)

// ReadValues invokes f with values and returns its results unchanged.
func (f ValueReaderFunc) ReadValues(values []Value) (int, error) {
	n, err := f(values)
	return n, err
}

// ValueWriterFunc is a function type that satisfies the ValueWriter interface.
type ValueWriterFunc func([]Value) (int, error)

// WriteValues invokes f with values and returns its results unchanged.
func (f ValueWriterFunc) WriteValues(values []Value) (int, error) {
	n, err := f(values)
	return n, err
}
// CopyValues copies values from src to dst and returns the number of values
// written along with any error.
//
// It calls copyValues without a buffer, so a temporary buffer is allocated
// unless src or dst provides a direct copy method.
func CopyValues(dst ValueWriter, src ValueReader) (int64, error) {
	var noBuffer []Value
	return copyValues(dst, src, noBuffer)
}
// copyValues copies values from src to dst using buf as scratch space and
// returns the number of values written.
//
// When src implements ValueWriterTo or dst implements ValueReaderFrom the copy
// is delegated to that method. Otherwise values are moved through buf; a
// buffer of defaultValueBufferSize values is allocated when buf is empty. The
// buffer is cleared before returning so it does not keep referencing the
// copied values.
//
// io.EOF from the reader ends the copy without error. A read that returns no
// values and no error yields io.ErrNoProgress.
func copyValues(dst ValueWriter, src ValueReader, buf []Value) (written int64, err error) {
	if writerTo, ok := src.(ValueWriterTo); ok {
		return writerTo.WriteValuesTo(dst)
	}
	if readerFrom, ok := dst.(ValueReaderFrom); ok {
		return readerFrom.ReadValuesFrom(src)
	}

	if len(buf) == 0 {
		buf = make([]Value, defaultValueBufferSize)
	}
	defer clearValues(buf)

	for {
		count, readErr := src.ReadValues(buf)

		// Flush whatever was read before looking at the read error, so values
		// returned alongside an error (including io.EOF) are not dropped.
		if count > 0 {
			wrote, writeErr := dst.WriteValues(buf[:count])
			written += int64(wrote)
			if writeErr != nil {
				return written, writeErr
			}
		}

		switch {
		case readErr == io.EOF:
			return written, nil
		case readErr != nil:
			return written, readErr
		case count == 0:
			return written, io.ErrNoProgress
		}
	}
}
// ValueOf constructs a parquet value from a Go value.
//
// A nil interface yields the null value. uuid.UUID becomes a
// FixedLenByteArray and deprecated.Int96 becomes an Int96. time.Time maps to
// Int64. Every other value is mapped from its reflect kind: booleans to
// Boolean, integers of at most 32 bits to Int32, wider integers to Int64,
// float32 to Float, float64 to Double, strings and byte slices to ByteArray,
// and byte arrays to FixedLenByteArray.
//
// The function panics when the Go type has no parquet equivalent.
func ValueOf(v interface{}) Value {
	if v == nil {
		return Value{}
	}

	kind := Kind(-1)
	switch special := v.(type) {
	case uuid.UUID:
		return makeValueBytes(FixedLenByteArray, special[:])
	case deprecated.Int96:
		return makeValueInt96(special)
	case time.Time:
		kind = Int64
	}

	goType := reflect.TypeOf(v)
	switch goType.Kind() {
	case reflect.String:
		kind = ByteArray
	case reflect.Slice:
		if goType.Elem().Kind() == reflect.Uint8 {
			kind = ByteArray
		}
	case reflect.Array:
		if goType.Elem().Kind() == reflect.Uint8 {
			kind = FixedLenByteArray
		}
	case reflect.Bool:
		kind = Boolean
	case reflect.Float32:
		kind = Float
	case reflect.Float64:
		kind = Double
	case reflect.Int8, reflect.Int16, reflect.Int32, reflect.Uint8, reflect.Uint16, reflect.Uint32:
		kind = Int32
	case reflect.Int64, reflect.Int, reflect.Uint64, reflect.Uint, reflect.Uintptr:
		kind = Int64
	}

	if kind < 0 {
		panic("cannot create parquet value from go value of type " + goType.String())
	}
	return makeValue(kind, nil, reflect.ValueOf(v))
}
// NullValue returns the null value, which is the zero Value.
func NullValue() Value {
	return Value{}
}

// ZeroValue returns a value of the given kind with no data set.
func ZeroValue(kind Kind) Value {
	return makeValueKind(kind)
}

// BooleanValue returns a Boolean value holding value.
func BooleanValue(value bool) Value {
	return makeValueBoolean(value)
}

// Int32Value returns an Int32 value holding value.
func Int32Value(value int32) Value {
	return makeValueInt32(value)
}

// Int64Value returns an Int64 value holding value.
func Int64Value(value int64) Value {
	return makeValueInt64(value)
}

// Int96Value returns an Int96 value holding value.
func Int96Value(value deprecated.Int96) Value {
	return makeValueInt96(value)
}

// FloatValue returns a Float value holding value.
func FloatValue(value float32) Value {
	return makeValueFloat(value)
}

// DoubleValue returns a Double value holding value.
func DoubleValue(value float64) Value {
	return makeValueDouble(value)
}

// ByteArrayValue returns a ByteArray value referencing the bytes of value;
// the bytes are not copied.
func ByteArrayValue(value []byte) Value {
	return makeValueBytes(ByteArray, value)
}

// FixedLenByteArrayValue returns a FixedLenByteArray value referencing the
// bytes of value; the bytes are not copied.
func FixedLenByteArrayValue(value []byte) Value {
	return makeValueBytes(FixedLenByteArray, value)
}
// makeValue builds a parquet value of kind k from the reflected Go value v.
//
// Interface values are unwrapped first; a nil interface, or an interface
// holding a nil pointer, produces the null value. A time.Time is always
// encoded as an Int64 count of milliseconds, microseconds or nanoseconds since
// the Unix epoch, with the unit taken from the timestamp logical type lt when
// one is given and nanoseconds otherwise.
//
// The function panics when v cannot be represented as kind k.
func makeValue(k Kind, lt *format.LogicalType, v reflect.Value) Value {
	if v.Kind() == reflect.Interface {
		if v.IsNil() {
			return Value{}
		}
		v = v.Elem()
		if v.Kind() == reflect.Pointer && v.IsNil() {
			return Value{}
		}
	}

	if v.Type() == reflect.TypeOf(time.Time{}) {
		unit := Nanosecond.TimeUnit()
		if lt != nil && lt.Timestamp != nil {
			unit = lt.Timestamp.Unit
		}
		ts := v.Interface().(time.Time)
		if unit.Millis != nil {
			return makeValueInt64(ts.UnixMilli())
		}
		if unit.Micros != nil {
			return makeValueInt64(ts.UnixMicro())
		}
		return makeValueInt64(ts.UnixNano())
	}

	goKind := v.Kind()
	// holdsBytes reports whether v, assumed to be a slice or array, has
	// byte elements.
	holdsBytes := func() bool {
		return v.Type().Elem().Kind() == reflect.Uint8
	}

	switch k {
	case Boolean:
		return makeValueBoolean(v.Bool())

	case Int32:
		switch goKind {
		case reflect.Int8, reflect.Int16, reflect.Int32:
			return makeValueInt32(int32(v.Int()))
		case reflect.Uint8, reflect.Uint16, reflect.Uint32:
			return makeValueInt32(int32(v.Uint()))
		}

	case Int64:
		switch goKind {
		case reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, reflect.Int:
			return makeValueInt64(v.Int())
		case reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uint, reflect.Uintptr:
			return makeValueUint64(v.Uint())
		}

	case Int96:
		if v.Type() == reflect.TypeOf(deprecated.Int96{}) {
			return makeValueInt96(v.Interface().(deprecated.Int96))
		}

	case Float:
		if goKind == reflect.Float32 {
			return makeValueFloat(float32(v.Float()))
		}

	case Double:
		if goKind == reflect.Float32 || goKind == reflect.Float64 {
			return makeValueDouble(v.Float())
		}

	case ByteArray:
		if goKind == reflect.String {
			return makeValueString(k, v.String())
		}
		if goKind == reflect.Slice && holdsBytes() {
			return makeValueBytes(k, v.Bytes())
		}

	case FixedLenByteArray:
		if goKind == reflect.String {
			return makeValueString(k, v.String())
		}
		if goKind == reflect.Array && holdsBytes() {
			return makeValueFixedLenByteArray(v)
		}
		if goKind == reflect.Slice && holdsBytes() {
			return makeValueBytes(k, v.Bytes())
		}
	}

	panic("cannot create parquet value of type " + k.String() + " from go value of type " + v.Type().String())
}
// makeValueKind returns a value of the given kind with no data set. The kind
// is stored as its bitwise complement so the zero Value remains null.
func makeValueKind(kind Kind) Value {
	var out Value
	out.kind = ^int8(kind)
	return out
}

// makeValueBoolean returns a Boolean value; true is stored as 1 and false
// as 0.
func makeValueBoolean(value bool) Value {
	var bit uint64
	if value {
		bit = 1
	}
	return Value{kind: ^int8(Boolean), u64: bit}
}

// makeValueInt32 returns an Int32 value holding value.
func makeValueInt32(value int32) Value {
	var out Value
	out.kind = ^int8(Int32)
	out.u64 = uint64(value)
	return out
}

// makeValueInt64 returns an Int64 value holding value.
func makeValueInt64(value int64) Value {
	var out Value
	out.kind = ^int8(Int64)
	out.u64 = uint64(value)
	return out
}
// makeValueInt96 returns an Int96 value holding value.
//
// The three 32-bit words are written in little-endian order into a freshly
// allocated 12-byte array referenced by the returned value.
func makeValueInt96(value deprecated.Int96) Value {
	encoded := new([12]byte)
	binary.LittleEndian.PutUint32(encoded[0:4], value[0])
	binary.LittleEndian.PutUint32(encoded[4:8], value[1])
	binary.LittleEndian.PutUint32(encoded[8:12], value[2])

	var out Value
	out.kind = ^int8(Int96)
	out.ptr = &encoded[0]
	out.u64 = 12
	return out
}
// makeValueUint32 returns an Int32 value holding the bits of value.
func makeValueUint32(value uint32) Value {
	var out Value
	out.kind = ^int8(Int32)
	out.u64 = uint64(value)
	return out
}

// makeValueUint64 returns an Int64 value holding the bits of value.
func makeValueUint64(value uint64) Value {
	var out Value
	out.kind = ^int8(Int64)
	out.u64 = value
	return out
}

// makeValueFloat returns a Float value holding the IEEE 754 bits of value.
func makeValueFloat(value float32) Value {
	bits := math.Float32bits(value)

	var out Value
	out.kind = ^int8(Float)
	out.u64 = uint64(bits)
	return out
}

// makeValueDouble returns a Double value holding the IEEE 754 bits of value.
func makeValueDouble(value float64) Value {
	var out Value
	out.kind = ^int8(Double)
	out.u64 = math.Float64bits(value)
	return out
}
// makeValueBytes returns a value of the given kind referencing the bytes of
// value; the bytes are not copied.
func makeValueBytes(kind Kind, value []byte) Value {
	data, size := unsafe.SliceData(value), len(value)
	return makeValueByteArray(kind, data, size)
}

// makeValueString returns a value of the given kind referencing the bytes of
// value; the bytes are not copied.
func makeValueString(kind Kind, value string) Value {
	data, size := unsafe.StringData(value), len(value)
	return makeValueByteArray(kind, data, size)
}
// makeValueFixedLenByteArray returns a FixedLenByteArray value referencing the
// memory of the array held by v.
//
// When v is not addressable the array is first copied into newly allocated
// memory so a pointer to it can be taken.
func makeValueFixedLenByteArray(v reflect.Value) Value {
	arrayType := v.Type()

	var addr reflect.Value
	if v.CanAddr() {
		addr = v.Addr()
	} else {
		addr = reflect.New(arrayType)
		addr.Elem().Set(v)
	}

	first := (*byte)(addr.UnsafePointer())
	return makeValueByteArray(FixedLenByteArray, first, arrayType.Len())
}
// makeValueByteArray returns a value of the given kind whose data starts at
// data and spans size bytes.
func makeValueByteArray(kind Kind, data *byte, size int) Value {
	var out Value
	out.kind = ^int8(kind)
	out.ptr = data
	out.u64 = uint64(size)
	return out
}
// isNull reports whether v is the null value (no kind set).
func (v *Value) isNull() bool {
	return v.kind == 0
}

// byte returns the low 8 bits of the scalar payload.
func (v *Value) byte() byte {
	return byte(v.u64)
}

// boolean reports whether the scalar payload is non-zero.
func (v *Value) boolean() bool {
	return v.u64 != 0
}

// int32 returns the low 32 bits of the scalar payload as a signed integer.
func (v *Value) int32() int32 {
	return int32(v.u64)
}

// int64 returns the scalar payload as a signed integer.
func (v *Value) int64() int64 {
	return int64(v.u64)
}

// int96 decodes the referenced bytes as an Int96.
func (v *Value) int96() deprecated.Int96 {
	return makeInt96(v.byteArray())
}

// float interprets the low 32 bits of the scalar payload as a float32.
func (v *Value) float() float32 {
	return math.Float32frombits(uint32(v.u64))
}

// double interprets the scalar payload as a float64.
func (v *Value) double() float64 {
	return math.Float64frombits(v.u64)
}

// uint32 returns the low 32 bits of the scalar payload.
func (v *Value) uint32() uint32 {
	return uint32(v.u64)
}

// uint64 returns the scalar payload.
func (v *Value) uint64() uint64 {
	return v.u64
}

// byteArray returns the referenced bytes as a slice, without copying; u64 is
// used as the length.
func (v *Value) byteArray() []byte {
	return unsafe.Slice(v.ptr, v.u64)
}

// string returns the referenced bytes as a string, without copying; u64 is
// used as the length.
func (v *Value) string() string {
	return unsafe.String(v.ptr, v.u64)
}

// be128 reinterprets the data pointer as a pointer to a 16-byte array. The
// length stored in the value is not checked.
func (v *Value) be128() *[16]byte {
	return (*[16]byte)(unsafe.Pointer(v.ptr))
}

// column returns the column index, which is stored as its bitwise complement.
func (v *Value) column() int {
	return int(^v.columnIndex)
}
// convertToBoolean returns a copy of v holding the boolean x. The levels and
// column index of v are kept.
func (v Value) convertToBoolean(x bool) Value {
	var bit uint64
	if x {
		bit = 1
	}
	v.kind, v.ptr, v.u64 = ^int8(Boolean), nil, bit
	return v
}

// convertToInt32 returns a copy of v holding the int32 x. The levels and
// column index of v are kept.
func (v Value) convertToInt32(x int32) Value {
	v.kind, v.ptr, v.u64 = ^int8(Int32), nil, uint64(x)
	return v
}

// convertToInt64 returns a copy of v holding the int64 x. The levels and
// column index of v are kept.
func (v Value) convertToInt64(x int64) Value {
	v.kind, v.ptr, v.u64 = ^int8(Int64), nil, uint64(x)
	return v
}

// convertToInt96 returns a copy of v holding the Int96 x. The levels and
// column index of v are kept.
func (v Value) convertToInt96(x deprecated.Int96) Value {
	encoded := makeValueInt96(x)
	v.kind, v.ptr, v.u64 = encoded.kind, encoded.ptr, encoded.u64
	return v
}

// convertToFloat returns a copy of v holding the float32 x. The levels and
// column index of v are kept.
func (v Value) convertToFloat(x float32) Value {
	v.kind, v.ptr, v.u64 = ^int8(Float), nil, uint64(math.Float32bits(x))
	return v
}

// convertToDouble returns a copy of v holding the float64 x. The levels and
// column index of v are kept.
func (v Value) convertToDouble(x float64) Value {
	v.kind, v.ptr, v.u64 = ^int8(Double), nil, math.Float64bits(x)
	return v
}
// convertToByteArray returns a copy of v of kind ByteArray referencing the
// bytes of x without copying them. The levels and column index of v are kept.
func (v Value) convertToByteArray(x []byte) Value {
	v.kind, v.ptr, v.u64 = ^int8(ByteArray), unsafe.SliceData(x), uint64(len(x))
	return v
}

// convertToFixedLenByteArray returns a copy of v of kind FixedLenByteArray
// referencing the bytes of x without copying them. The levels and column
// index of v are kept.
func (v Value) convertToFixedLenByteArray(x []byte) Value {
	v.kind, v.ptr, v.u64 = ^int8(FixedLenByteArray), unsafe.SliceData(x), uint64(len(x))
	return v
}
// Kind returns the kind of v. The null value reports the complement of zero,
// i.e. -1.
func (v Value) Kind() Kind {
	return ^Kind(v.kind)
}

// IsNull reports whether v is the null value.
func (v Value) IsNull() bool {
	return v.isNull()
}

// Byte returns the low 8 bits of the scalar payload of v.
func (v Value) Byte() byte {
	return v.byte()
}

// Boolean reports whether the scalar payload of v is non-zero.
func (v Value) Boolean() bool {
	return v.boolean()
}

// Int32 returns the scalar payload of v truncated to a signed 32-bit integer.
func (v Value) Int32() int32 {
	return v.int32()
}

// Int64 returns the scalar payload of v as a signed 64-bit integer.
func (v Value) Int64() int64 {
	return v.int64()
}

// Int96 decodes the bytes referenced by v as an Int96, or returns the zero
// Int96 when v is null.
func (v Value) Int96() deprecated.Int96 {
	if v.isNull() {
		return deprecated.Int96{}
	}
	return v.int96()
}

// Float interprets the low 32 bits of the scalar payload of v as a float32.
func (v Value) Float() float32 {
	return v.float()
}

// Double interprets the scalar payload of v as a float64.
func (v Value) Double() float64 {
	return v.double()
}

// Uint32 returns the low 32 bits of the scalar payload of v.
func (v Value) Uint32() uint32 {
	return v.uint32()
}

// Uint64 returns the scalar payload of v.
func (v Value) Uint64() uint64 {
	return v.uint64()
}

// ByteArray returns the bytes referenced by v without copying them.
func (v Value) ByteArray() []byte {
	return v.byteArray()
}

// RepetitionLevel returns the repetition level of v.
func (v Value) RepetitionLevel() int {
	return int(v.repetitionLevel)
}

// DefinitionLevel returns the definition level of v.
func (v Value) DefinitionLevel() int {
	return int(v.definitionLevel)
}

// Column returns the column index of v; it is -1 when no index was set.
func (v Value) Column() int {
	return v.column()
}
// Bytes returns the binary representation of v.
//
// Booleans yield one byte, Int32 and Float four little-endian bytes, Int64 and
// Double eight little-endian bytes. ByteArray, FixedLenByteArray and Int96
// values return the referenced bytes without copying them. Any other kind,
// including null, returns nil.
func (v Value) Bytes() []byte {
	switch kind := v.Kind(); kind {
	case ByteArray, FixedLenByteArray, Int96:
		return v.byteArray()
	case Int64, Double:
		var scratch [8]byte
		binary.LittleEndian.PutUint64(scratch[:8], v.uint64())
		return scratch[:8]
	case Int32, Float:
		var scratch [8]byte
		binary.LittleEndian.PutUint32(scratch[:4], v.uint32())
		return scratch[:4]
	case Boolean:
		var scratch [8]byte
		binary.LittleEndian.PutUint32(scratch[:4], v.uint32())
		return scratch[0:1]
	}
	return nil
}
// AppendBytes appends the binary representation of v to b and returns the
// extended slice.
//
// Booleans append one byte, Int32 and Float four little-endian bytes, Int64
// and Double eight little-endian bytes; ByteArray, FixedLenByteArray and Int96
// append the referenced bytes. Any other kind, including null, leaves b
// unchanged.
func (v Value) AppendBytes(b []byte) []byte {
	switch kind := v.Kind(); kind {
	case ByteArray, FixedLenByteArray, Int96:
		return append(b, v.byteArray()...)
	case Int64, Double:
		return binary.LittleEndian.AppendUint64(b, v.uint64())
	case Int32, Float:
		return binary.LittleEndian.AppendUint32(b, v.uint32())
	case Boolean:
		// The first little-endian byte of the payload is its low byte.
		return append(b, v.byte())
	}
	return b
}
// Format implements fmt.Formatter.
//
// Supported verbs:
//
//	%c  the column index
//	%d  the definition level
//	%r  the repetition level
//	%q  the value, quoted
//	%s  the value
//	%v  same as %s; %+v prints column, levels and value; %#v prints Go syntax
//
// With the '+' flag the c, d, r, q and s verbs are prefixed with "C:", "D:",
// "R:", "V:" and "V:" respectively. Other verbs print nothing.
func (v Value) Format(w fmt.State, r rune) {
	// label writes the given prefix when the '+' flag is set.
	label := func(prefix string) {
		if w.Flag('+') {
			io.WriteString(w, prefix)
		}
	}

	switch r {
	case 'c':
		label("C:")
		fmt.Fprint(w, v.column())

	case 'd':
		label("D:")
		fmt.Fprint(w, v.definitionLevel)

	case 'r':
		label("R:")
		fmt.Fprint(w, v.repetitionLevel)

	case 'q':
		label("V:")
		if kind := v.Kind(); kind == ByteArray || kind == FixedLenByteArray {
			fmt.Fprintf(w, "%q", v.byteArray())
		} else {
			fmt.Fprintf(w, `"%s"`, v)
		}

	case 's':
		label("V:")
		switch v.Kind() {
		case ByteArray, FixedLenByteArray:
			w.Write(v.byteArray())
		case Boolean:
			fmt.Fprint(w, v.boolean())
		case Int32:
			fmt.Fprint(w, v.int32())
		case Int64:
			fmt.Fprint(w, v.int64())
		case Int96:
			fmt.Fprint(w, v.int96())
		case Float:
			fmt.Fprint(w, v.float())
		case Double:
			fmt.Fprint(w, v.double())
		default:
			io.WriteString(w, "<null>")
		}

	case 'v':
		if w.Flag('+') {
			fmt.Fprintf(w, "%+[1]c %+[1]d %+[1]r %+[1]s", v)
		} else if w.Flag('#') {
			v.formatGoString(w)
		} else {
			v.Format(w, 's')
		}
	}
}
// formatGoString writes a Go-syntax representation of v to w: the constructor
// call for the value's kind followed by a Level call carrying the repetition
// level, definition level and column index. Values of any other kind,
// including null, are written as "parquet.Value{}" with no Level call.
func (v Value) formatGoString(w fmt.State) {
	io.WriteString(w, "parquet.")

	var (
		layout string
		arg    interface{}
	)
	switch v.Kind() {
	case Boolean:
		layout, arg = "BooleanValue(%t)", v.boolean()
	case Int32:
		layout, arg = "Int32Value(%d)", v.int32()
	case Int64:
		layout, arg = "Int64Value(%d)", v.int64()
	case Int96:
		layout, arg = "Int96Value(%#v)", v.int96()
	case Float:
		layout, arg = "FloatValue(%g)", v.float()
	case Double:
		layout, arg = "DoubleValue(%g)", v.double()
	case ByteArray:
		layout, arg = "ByteArrayValue(%q)", v.byteArray()
	case FixedLenByteArray:
		layout, arg = "FixedLenByteArrayValue(%#v)", v.byteArray()
	default:
		io.WriteString(w, "Value{}")
		return
	}

	fmt.Fprintf(w, layout, arg)
	fmt.Fprintf(w, ".Level(%d,%d,%d)", v.RepetitionLevel(), v.DefinitionLevel(), v.Column())
}
// String returns a textual representation of v.
//
// Booleans are rendered as "true" or "false", integers in base 10, floating
// point numbers in the shortest form that round-trips at their own precision,
// and byte arrays as a string holding a copy of their bytes. Any other kind,
// including null, is rendered as "<null>".
func (v Value) String() string {
	switch v.Kind() {
	case Boolean:
		return strconv.FormatBool(v.boolean())
	case Int32:
		return strconv.FormatInt(int64(v.int32()), 10)
	case Int64:
		return strconv.FormatInt(v.int64(), 10)
	case Int96:
		return v.Int96().String()
	case Float:
		return strconv.FormatFloat(float64(v.float()), 'g', -1, 32)
	case Double:
		// The bit size must be 64 here: with 32, FormatFloat rounds the value
		// as if it were a float32, dropping digits and making String disagree
		// with the %s verb implemented by Format.
		return strconv.FormatFloat(v.double(), 'g', -1, 64)
	case ByteArray, FixedLenByteArray:
		return string(v.byteArray())
	default:
		return "<null>"
	}
}
// GoString returns the Go-syntax representation of v, as produced by the %#v
// verb.
func (v Value) GoString() string {
	goSyntax := fmt.Sprintf("%#v", v)
	return goSyntax
}
// Level returns a copy of v with the given repetition level, definition level
// and column index. The arguments are converted by makeRepetitionLevel,
// makeDefinitionLevel and makeColumnIndex, in that order; the column index is
// stored as its bitwise complement.
func (v Value) Level(repetitionLevel, definitionLevel, columnIndex int) Value {
	v.repetitionLevel, v.definitionLevel, v.columnIndex =
		makeRepetitionLevel(repetitionLevel),
		makeDefinitionLevel(definitionLevel),
		^makeColumnIndex(columnIndex)
	return v
}
// Clone returns a copy of v. For ByteArray and FixedLenByteArray values the
// referenced bytes are copied so the result does not share memory with v;
// values of every other kind are returned as is.
func (v Value) Clone() Value {
	if kind := v.Kind(); kind == ByteArray || kind == FixedLenByteArray {
		owned := copyBytes(v.byteArray())
		v.ptr = unsafe.SliceData(owned)
	}
	return v
}
// makeInt96 decodes three little-endian 32-bit words from the first 12 bytes
// of bits.
func makeInt96(bits []byte) (i96 deprecated.Int96) {
	// Decode the highest word first, matching the order in which the slice
	// expressions are evaluated.
	high := binary.LittleEndian.Uint32(bits[8:12])
	middle := binary.LittleEndian.Uint32(bits[4:8])
	low := binary.LittleEndian.Uint32(bits[0:4])
	return deprecated.Int96{low, middle, high}
}
// parseValue decodes a value of the given kind from its binary form.
//
// Fixed-size kinds require data to have exactly the size of the kind (1 byte
// for Boolean, 4 for Int32 and Float, 8 for Int64 and Double, 12 for Int96)
// and are read as little-endian. ByteArray and FixedLenByteArray values
// reference data without copying it.
//
// An error is returned, together with the null value, when the kind is not
// recognized or data has the wrong length.
func parseValue(kind Kind, data []byte) (val Value, err error) {
	size := len(data)

	switch {
	case kind == ByteArray || kind == FixedLenByteArray:
		val = makeValueBytes(kind, data)
	case kind == Boolean && size == 1:
		val = makeValueBoolean(data[0] != 0)
	case kind == Int32 && size == 4:
		val = makeValueInt32(int32(binary.LittleEndian.Uint32(data)))
	case kind == Int64 && size == 8:
		val = makeValueInt64(int64(binary.LittleEndian.Uint64(data)))
	case kind == Int96 && size == 12:
		val = makeValueInt96(makeInt96(data))
	case kind == Float && size == 4:
		val = makeValueFloat(math.Float32frombits(binary.LittleEndian.Uint32(data)))
	case kind == Double && size == 8:
		val = makeValueDouble(math.Float64frombits(binary.LittleEndian.Uint64(data)))
	}

	// No branch above matched: the value is still the null zero Value.
	if val.isNull() {
		err = fmt.Errorf("cannot decode %s value from input of length %d", kind, size)
	}
	return val, err
}
// copyBytes returns a newly allocated slice holding a copy of b. The result
// is never nil, even when b is nil or empty.
func copyBytes(b []byte) []byte {
	dup := make([]byte, 0, len(b))
	return append(dup, b...)
}
// Equal reports whether v1 and v2 have the same kind and hold the same
// underlying value. Repetition levels, definition levels and column indexes
// are not compared; use DeepEqual for that.
//
// Two null values are equal. Values of an unrecognized kind are never equal.
func Equal(v1, v2 Value) bool {
	if v1.kind != v2.kind {
		return false
	}

	switch v1.Kind() {
	case -1:
		// Both values are null.
		return true
	case ByteArray, FixedLenByteArray:
		return bytes.Equal(v1.byteArray(), v2.byteArray())
	case Boolean:
		return v1.boolean() == v2.boolean()
	case Int32:
		return v1.int32() == v2.int32()
	case Int64:
		return v1.int64() == v2.int64()
	case Int96:
		return v1.int96() == v2.int96()
	case Float:
		return v1.float() == v2.float()
	case Double:
		return v1.double() == v2.double()
	}
	return false
}
// DeepEqual reports whether v1 and v2 are equal according to Equal and also
// share the same repetition level, definition level and column index.
func DeepEqual(v1, v2 Value) bool {
	if !Equal(v1, v2) {
		return false
	}
	if v1.repetitionLevel != v2.repetitionLevel {
		return false
	}
	if v1.definitionLevel != v2.definitionLevel {
		return false
	}
	return v1.columnIndex == v2.columnIndex
}
// Compile-time checks that Value implements fmt.Formatter and fmt.Stringer.
var (
_ fmt .Formatter = Value {}
_ fmt .Stringer = Value {}
)
// clearValues resets every element of values to the null value, dropping any
// data pointers the elements were holding.
func clearValues(values []Value) {
	var null Value
	for i := 0; i < len(values); i++ {
		values[i] = null
	}
}
// BooleanReader is implemented by types that read into a slice of bool.
// NOTE(review): the int result is presumably the number of values read —
// confirm against implementations.
type BooleanReader interface {
ReadBooleans (values []bool ) (int , error )
}
// BooleanWriter is implemented by types that accept a slice of bool.
// NOTE(review): the int result is presumably the number of values written —
// confirm against implementations.
type BooleanWriter interface {
WriteBooleans (values []bool ) (int , error )
}
// Int32Reader is implemented by types that read into a slice of int32.
// NOTE(review): the int result is presumably the number of values read.
type Int32Reader interface {
ReadInt32s (values []int32 ) (int , error )
}
// Int32Writer is implemented by types that accept a slice of int32.
// NOTE(review): the int result is presumably the number of values written.
type Int32Writer interface {
WriteInt32s (values []int32 ) (int , error )
}
// Int64Reader is implemented by types that read into a slice of int64.
// NOTE(review): the int result is presumably the number of values read.
type Int64Reader interface {
ReadInt64s (values []int64 ) (int , error )
}
// Int64Writer is implemented by types that accept a slice of int64.
// NOTE(review): the int result is presumably the number of values written.
type Int64Writer interface {
WriteInt64s (values []int64 ) (int , error )
}
// Int96Reader is implemented by types that read into a slice of
// deprecated.Int96.
// NOTE(review): the int result is presumably the number of values read.
type Int96Reader interface {
ReadInt96s (values []deprecated .Int96 ) (int , error )
}
// Int96Writer is implemented by types that accept a slice of
// deprecated.Int96.
// NOTE(review): the int result is presumably the number of values written.
type Int96Writer interface {
WriteInt96s (values []deprecated .Int96 ) (int , error )
}
// FloatReader is implemented by types that read into a slice of float32.
// NOTE(review): the int result is presumably the number of values read.
type FloatReader interface {
ReadFloats (values []float32 ) (int , error )
}
// FloatWriter is implemented by types that accept a slice of float32.
// NOTE(review): the int result is presumably the number of values written.
type FloatWriter interface {
WriteFloats (values []float32 ) (int , error )
}
// DoubleReader is implemented by types that read into a slice of float64.
// NOTE(review): the int result is presumably the number of values read.
type DoubleReader interface {
ReadDoubles (values []float64 ) (int , error )
}
// DoubleWriter is implemented by types that accept a slice of float64.
// NOTE(review): the int result is presumably the number of values written.
type DoubleWriter interface {
WriteDoubles (values []float64 ) (int , error )
}
// ByteArrayReader is implemented by types that read byte array values into a
// single byte slice.
// NOTE(review): how individual values are delimited within the slice, and
// what the int result counts, is not visible here — confirm against
// implementations.
type ByteArrayReader interface {
ReadByteArrays (values []byte ) (int , error )
}
// ByteArrayWriter is implemented by types that accept byte array values
// passed as a single byte slice.
// NOTE(review): how individual values are delimited within the slice, and
// what the int result counts, is not visible here — confirm against
// implementations.
type ByteArrayWriter interface {
WriteByteArrays (values []byte ) (int , error )
}
// FixedLenByteArrayReader is implemented by types that read fixed-length byte
// array values into a single byte slice.
// NOTE(review): presumably values are laid out back to back at the column's
// fixed size — confirm against implementations.
type FixedLenByteArrayReader interface {
ReadFixedLenByteArrays (values []byte ) (int , error )
}
// FixedLenByteArrayWriter is implemented by types that accept fixed-length
// byte array values passed as a single byte slice.
// NOTE(review): presumably values are laid out back to back at the column's
// fixed size — confirm against implementations.
type FixedLenByteArrayWriter interface {
WriteFixedLenByteArrays (values []byte ) (int , error )
}
// The pages are generated with Golds v0.8.4. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.