package parquet
import (
"fmt"
"math"
"runtime/debug"
"strings"
"sync"
"github.com/parquet-go/parquet-go/compress"
)
// ReadMode is an integer enumeration carried by FileConfig.ReadMode and set
// through the FileReadMode option.
// NOTE(review): how each mode affects reading is implemented outside this
// file; confirm against the file reader before relying on the names alone.
type ReadMode int
const (
// ReadModeSync is the zero value of ReadMode and the value of DefaultReadMode.
ReadModeSync ReadMode = iota
// ReadModeAsync is the second enumerator (value 1).
ReadModeAsync
)
// Default values used by the Default*Config constructors of this file.
// NOTE(review): the size constants are presumably expressed in bytes; the
// unit is not established by this file, confirm against their consumers.
const (
// DefaultColumnIndexSizeLimit initializes WriterConfig.ColumnIndexSizeLimit.
DefaultColumnIndexSizeLimit = 16
// DefaultColumnBufferCapacity initializes RowGroupConfig.ColumnBufferCapacity.
DefaultColumnBufferCapacity = 16 * 1024
// DefaultPageBufferSize initializes WriterConfig.PageBufferSize.
DefaultPageBufferSize = 256 * 1024
// DefaultWriteBufferSize initializes WriterConfig.WriteBufferSize.
DefaultWriteBufferSize = 32 * 1024
// DefaultDataPageVersion initializes WriterConfig.DataPageVersion; the
// writer validation accepts 1 or 2.
DefaultDataPageVersion = 2
// DefaultDataPageStatistics initializes WriterConfig.DataPageStatistics.
DefaultDataPageStatistics = false
// DefaultSkipPageIndex initializes FileConfig.SkipPageIndex.
DefaultSkipPageIndex = false
// DefaultSkipBloomFilters initializes FileConfig.SkipBloomFilters.
DefaultSkipBloomFilters = false
// DefaultMaxRowsPerRowGroup initializes WriterConfig.MaxRowsPerRowGroup and
// is also substituted by the MaxRowsPerRowGroup option for values <= 0.
DefaultMaxRowsPerRowGroup = math .MaxInt64
// DefaultReadMode initializes FileConfig.ReadMode.
DefaultReadMode = ReadModeSync
)
const (
// parquetGoModulePath is the module path searched for in the build
// dependencies by defaultCreatedBy; it is also used as the application name
// of the default created-by string.
parquetGoModulePath = "github.com/parquet-go/parquet-go"
)
var (
// defaultCreatedByInfo caches the string returned by defaultCreatedBy.
defaultCreatedByInfo string
// defaultCreatedByOnce ensures defaultCreatedByInfo is computed only once.
defaultCreatedByOnce sync .Once
)
// defaultCreatedBy returns the created-by string used by DefaultWriterConfig.
//
// The value is computed on the first call and cached. It starts as the bare
// module path; when build information is available and lists this module as
// a non-replaced dependency, the version and build components parsed from
// that dependency's version are formatted in with formatCreatedBy.
func defaultCreatedBy() string {
	defaultCreatedByOnce.Do(func() {
		defaultCreatedByInfo = parquetGoModulePath

		info, available := debug.ReadBuildInfo()
		if !available {
			return
		}

		for _, dep := range info.Deps {
			if dep.Replace != nil || dep.Path != parquetGoModulePath {
				continue
			}
			version, _, sha := parseModuleVersion(dep.Version)
			defaultCreatedByInfo = formatCreatedBy(parquetGoModulePath, version, sha)
			// Only the first matching dependency is considered.
			return
		}
	})
	return defaultCreatedByInfo
}
// parseModuleVersion breaks a module version string into up to three
// dash-separated components, returned in order as semver, datetime and
// buildsha. A leading "v" is removed from the first component; components
// that are absent are returned as empty strings, and anything after a third
// dash is discarded.
func parseModuleVersion(version string) (semver, datetime, buildsha string) {
	head, rest := splitModuleVersion(version)
	semver = strings.TrimPrefix(head, "v")
	datetime, rest = splitModuleVersion(rest)
	buildsha, _ = splitModuleVersion(rest)
	return semver, datetime, buildsha
}
// splitModuleVersion cuts s around its first '-'. head receives the text
// before the dash and tail the text after it; when s has no dash, head is s
// and tail is empty.
func splitModuleVersion(s string) (head, tail string) {
	head, tail, _ = strings.Cut(s, "-")
	return head, tail
}
// formatCreatedBy builds the created-by string recorded by the writer from
// an application name, a version and a build identifier, using the layout
//
//	<application> version <version> (build <build>)
//
// which is the convention the parquet format specifies for the created_by
// file metadata field. A space separates the version from the "(build ...)"
// group; the previous implementation omitted it.
func formatCreatedBy(application, version, build string) string {
	return application + " version " + version + " (build " + build + ")"
}
// FileConfig groups the settings that FileOption values configure.
//
// A *FileConfig is itself a FileOption (see ConfigureFile), so a whole
// configuration can be applied on top of another one.
type FileConfig struct {
// SkipPageIndex is set by the SkipPageIndex option; defaults to
// DefaultSkipPageIndex.
SkipPageIndex bool
// SkipBloomFilters is set by the SkipBloomFilters option; defaults to
// DefaultSkipBloomFilters.
SkipBloomFilters bool
// ReadBufferSize is set by the ReadBufferSize option.
ReadBufferSize int
// ReadMode is set by the FileReadMode option; defaults to DefaultReadMode.
ReadMode ReadMode
// Schema is set by the FileSchema option; nil by default.
Schema *Schema
}
// DefaultFileConfig returns a new FileConfig initialized with the package
// defaults. Schema is left nil.
func DefaultFileConfig() *FileConfig {
	config := new(FileConfig)
	config.SkipPageIndex = DefaultSkipPageIndex
	config.SkipBloomFilters = DefaultSkipBloomFilters
	config.ReadBufferSize = defaultReadBufferSize
	config.ReadMode = DefaultReadMode
	return config
}
// NewFileConfig builds a FileConfig by applying options, in order, on top of
// DefaultFileConfig. The configuration is returned together with the result
// of its Validate method, including when validation reports an error.
func NewFileConfig(options ...FileOption) (*FileConfig, error) {
	cfg := DefaultFileConfig()
	cfg.Apply(options...)
	err := cfg.Validate()
	return cfg, err
}
// Apply runs each option against c, in the order given.
func (c *FileConfig) Apply(options ...FileOption) {
	for i := range options {
		options[i].ConfigureFile(c)
	}
}
// ConfigureFile applies c onto config, which lets a *FileConfig be used as a
// FileOption.
//
// The two boolean fields are copied from c unconditionally. ReadBufferSize,
// ReadMode and Schema are taken from c only when non-zero (non-nil for
// Schema); otherwise the value already in config is kept. Because
// ReadModeSync is zero, a c holding ReadModeSync never overrides config's
// read mode.
func (c *FileConfig) ConfigureFile(config *FileConfig) {
	merged := FileConfig{
		SkipPageIndex:    c.SkipPageIndex,
		SkipBloomFilters: c.SkipBloomFilters,
	}
	merged.ReadBufferSize = coalesceInt(c.ReadBufferSize, config.ReadBufferSize)
	merged.ReadMode = ReadMode(coalesceInt(int(c.ReadMode), int(config.ReadMode)))
	merged.Schema = coalesceSchema(c.Schema, config.Schema)
	*config = merged
}
// Validate checks the configuration. No constraint is currently enforced on
// FileConfig, so it always returns nil.
func (c *FileConfig ) Validate () error {
return nil
}
// ReaderConfig groups the settings that ReaderOption values configure.
//
// A *ReaderConfig is itself a ReaderOption (see ConfigureReader).
type ReaderConfig struct {
// Schema is nil by default (see DefaultReaderConfig).
Schema *Schema
}
// DefaultReaderConfig returns a new, zero-valued ReaderConfig.
func DefaultReaderConfig() *ReaderConfig {
	return new(ReaderConfig)
}
// NewReaderConfig builds a ReaderConfig by applying options, in order, on
// top of DefaultReaderConfig. The configuration is returned together with
// the result of its Validate method.
func NewReaderConfig(options ...ReaderOption) (*ReaderConfig, error) {
	cfg := DefaultReaderConfig()
	cfg.Apply(options...)
	err := cfg.Validate()
	return cfg, err
}
// Apply runs each option against c, in the order given.
func (c *ReaderConfig) Apply(options ...ReaderOption) {
	for i := range options {
		options[i].ConfigureReader(c)
	}
}
// ConfigureReader applies c onto config, which lets a *ReaderConfig be used
// as a ReaderOption. c's Schema wins when it is non-nil; otherwise config's
// Schema is kept.
func (c *ReaderConfig) ConfigureReader(config *ReaderConfig) {
	var merged ReaderConfig
	merged.Schema = coalesceSchema(c.Schema, config.Schema)
	*config = merged
}
// Validate checks the configuration. No constraint is currently enforced on
// ReaderConfig, so it always returns nil.
func (c *ReaderConfig ) Validate () error {
return nil
}
// WriterConfig groups the settings that WriterOption values configure.
//
// A *WriterConfig is itself a WriterOption (see ConfigureWriter). Validate
// lists the constraints enforced on the fields.
type WriterConfig struct {
// CreatedBy is set by the CreatedBy option; defaults to defaultCreatedBy().
CreatedBy string
// ColumnPageBuffers is set by the ColumnPageBuffers option; must not be nil.
ColumnPageBuffers BufferPool
// ColumnIndexSizeLimit is set by the ColumnIndexSizeLimit option; must be
// positive.
ColumnIndexSizeLimit int
// PageBufferSize is set by the PageBufferSize option; must be positive.
PageBufferSize int
// WriteBufferSize is set by the WriteBufferSize option; not validated.
WriteBufferSize int
// DataPageVersion is set by the DataPageVersion option; must be 1 or 2.
DataPageVersion int
// DataPageStatistics is set by the DataPageStatistics option.
DataPageStatistics bool
// MaxRowsPerRowGroup is set by the MaxRowsPerRowGroup option.
MaxRowsPerRowGroup int64
// KeyValueMetadata accumulates the pairs added by the KeyValueMetadata
// option; nil until the first pair is added.
KeyValueMetadata map [string ]string
// Schema is nil by default.
Schema *Schema
// BloomFilters is set by the BloomFilters option.
BloomFilters []BloomFilterColumn
// Compression is set by the Compression option.
Compression compress .Codec
// Sorting is configured by the SortingWriterConfig option and is validated
// as part of Validate.
Sorting SortingConfig
// SkipPageBounds accumulates one column path per SkipPageBounds option.
SkipPageBounds [][]string
}
// DefaultWriterConfig returns a new WriterConfig initialized with the
// package defaults: the default created-by string, the package-level column
// and sorting buffer pools, and the Default* constants. Fields without a
// default are left at their zero value.
func DefaultWriterConfig() *WriterConfig {
	config := new(WriterConfig)
	config.CreatedBy = defaultCreatedBy()
	config.ColumnPageBuffers = &defaultColumnBufferPool
	config.ColumnIndexSizeLimit = DefaultColumnIndexSizeLimit
	config.PageBufferSize = DefaultPageBufferSize
	config.WriteBufferSize = DefaultWriteBufferSize
	config.DataPageVersion = DefaultDataPageVersion
	config.DataPageStatistics = DefaultDataPageStatistics
	config.MaxRowsPerRowGroup = DefaultMaxRowsPerRowGroup
	config.Sorting.SortingBuffers = &defaultSortingBufferPool
	return config
}
// NewWriterConfig builds a WriterConfig by applying options, in order, on
// top of DefaultWriterConfig. The configuration is returned together with
// the result of its Validate method, including when validation reports an
// error.
func NewWriterConfig(options ...WriterOption) (*WriterConfig, error) {
	cfg := DefaultWriterConfig()
	cfg.Apply(options...)
	err := cfg.Validate()
	return cfg, err
}
// Apply runs each option against c, in the order given.
func (c *WriterConfig) Apply(options ...WriterOption) {
	for i := range options {
		options[i].ConfigureWriter(c)
	}
}
// ConfigureWriter applies c onto config, which lets a *WriterConfig be used
// as a WriterOption.
//
// For every field, the value from c is used when it is set (non-zero,
// non-empty or non-nil, depending on the type) and the value already in
// config is kept otherwise. Two fields are combined rather than replaced:
// DataPageStatistics is true if it is true in either configuration, and the
// entries of c.KeyValueMetadata are written into config's map (allocating it
// if needed), overwriting entries with the same key.
//
// SkipPageBounds follows the same rule as the other slice fields. It used to
// be left out of the rebuilt configuration, which silently discarded the
// setting from both c and config.
func (c *WriterConfig) ConfigureWriter(config *WriterConfig) {
	keyValueMetadata := config.KeyValueMetadata
	if len(c.KeyValueMetadata) > 0 {
		if keyValueMetadata == nil {
			keyValueMetadata = make(map[string]string, len(c.KeyValueMetadata))
		}
		for k, v := range c.KeyValueMetadata {
			keyValueMetadata[k] = v
		}
	}

	// Same precedence as coalesceBloomFilters: a non-nil slice on c wins.
	skipPageBounds := config.SkipPageBounds
	if c.SkipPageBounds != nil {
		skipPageBounds = c.SkipPageBounds
	}

	*config = WriterConfig{
		CreatedBy:            coalesceString(c.CreatedBy, config.CreatedBy),
		ColumnPageBuffers:    coalesceBufferPool(c.ColumnPageBuffers, config.ColumnPageBuffers),
		ColumnIndexSizeLimit: coalesceInt(c.ColumnIndexSizeLimit, config.ColumnIndexSizeLimit),
		PageBufferSize:       coalesceInt(c.PageBufferSize, config.PageBufferSize),
		WriteBufferSize:      coalesceInt(c.WriteBufferSize, config.WriteBufferSize),
		DataPageVersion:      coalesceInt(c.DataPageVersion, config.DataPageVersion),
		DataPageStatistics:   coalesceBool(c.DataPageStatistics, config.DataPageStatistics),
		MaxRowsPerRowGroup:   coalesceInt64(c.MaxRowsPerRowGroup, config.MaxRowsPerRowGroup),
		KeyValueMetadata:     keyValueMetadata,
		Schema:               coalesceSchema(c.Schema, config.Schema),
		BloomFilters:         coalesceBloomFilters(c.BloomFilters, config.BloomFilters),
		Compression:          coalesceCompression(c.Compression, config.Compression),
		Sorting:              coalesceSortingConfig(c.Sorting, config.Sorting),
		SkipPageBounds:       skipPageBounds,
	}
}
// Validate checks the writer configuration and returns nil when it is
// valid. Otherwise the returned error aggregates every failed check:
// ColumnPageBuffers must not be nil, ColumnIndexSizeLimit and PageBufferSize
// must be positive, DataPageVersion must be 1 or 2, and the Sorting
// configuration must itself be valid.
func (c *WriterConfig) Validate() error {
	const prefix = "parquet.(*WriterConfig)."
	checks := []error{
		validateNotNil(prefix+"ColumnPageBuffers", c.ColumnPageBuffers),
		validatePositiveInt(prefix+"ColumnIndexSizeLimit", c.ColumnIndexSizeLimit),
		validatePositiveInt(prefix+"PageBufferSize", c.PageBufferSize),
		validateOneOfInt(prefix+"DataPageVersion", c.DataPageVersion, 1, 2),
		c.Sorting.Validate(),
	}
	return errorInvalidConfiguration(checks...)
}
// RowGroupConfig groups the settings that RowGroupOption values configure.
//
// A *RowGroupConfig is itself a RowGroupOption (see ConfigureRowGroup).
type RowGroupConfig struct {
// ColumnBufferCapacity is set by the ColumnBufferCapacity option; must be
// positive. Defaults to DefaultColumnBufferCapacity.
ColumnBufferCapacity int
// Schema is nil by default.
Schema *Schema
// Sorting is configured by the SortingRowGroupConfig option and is
// validated as part of Validate.
Sorting SortingConfig
}
// DefaultRowGroupConfig returns a new RowGroupConfig using
// DefaultColumnBufferCapacity and the package-level sorting buffer pool.
// Schema is left nil.
func DefaultRowGroupConfig() *RowGroupConfig {
	config := new(RowGroupConfig)
	config.ColumnBufferCapacity = DefaultColumnBufferCapacity
	config.Sorting.SortingBuffers = &defaultSortingBufferPool
	return config
}
// NewRowGroupConfig builds a RowGroupConfig by applying options, in order,
// on top of DefaultRowGroupConfig. The configuration is returned together
// with the result of its Validate method.
func NewRowGroupConfig(options ...RowGroupOption) (*RowGroupConfig, error) {
	cfg := DefaultRowGroupConfig()
	cfg.Apply(options...)
	err := cfg.Validate()
	return cfg, err
}
// Validate checks the row group configuration and returns nil when it is
// valid. Otherwise the returned error aggregates every failed check:
// ColumnBufferCapacity must be positive and the Sorting configuration must
// itself be valid.
func (c *RowGroupConfig) Validate() error {
	const prefix = "parquet.(*RowGroupConfig)."
	checks := []error{
		validatePositiveInt(prefix+"ColumnBufferCapacity", c.ColumnBufferCapacity),
		c.Sorting.Validate(),
	}
	return errorInvalidConfiguration(checks...)
}
// Apply runs each option against c, in the order given.
func (c *RowGroupConfig) Apply(options ...RowGroupOption) {
	for i := range options {
		options[i].ConfigureRowGroup(c)
	}
}
// ConfigureRowGroup applies c onto config, which lets a *RowGroupConfig be
// used as a RowGroupOption. Each field of c is used when it is set, and the
// value already in config is kept otherwise; Sorting is merged with
// coalesceSortingConfig.
func (c *RowGroupConfig) ConfigureRowGroup(config *RowGroupConfig) {
	var merged RowGroupConfig
	merged.ColumnBufferCapacity = coalesceInt(c.ColumnBufferCapacity, config.ColumnBufferCapacity)
	merged.Schema = coalesceSchema(c.Schema, config.Schema)
	merged.Sorting = coalesceSortingConfig(c.Sorting, config.Sorting)
	*config = merged
}
// SortingConfig groups the settings that SortingOption values configure. It
// is embedded as the Sorting field of WriterConfig and RowGroupConfig.
//
// A *SortingConfig is itself a SortingOption (see ConfigureSorting).
type SortingConfig struct {
// SortingBuffers is set by the SortingBuffers option; must not be nil.
SortingBuffers BufferPool
// SortingColumns is set by the SortingColumns option.
SortingColumns []SortingColumn
// DropDuplicatedRows is set by the DropDuplicatedRows option.
DropDuplicatedRows bool
}
// DefaultSortingConfig returns a new SortingConfig that uses the
// package-level sorting buffer pool and has no sorting columns.
func DefaultSortingConfig() *SortingConfig {
	config := new(SortingConfig)
	config.SortingBuffers = &defaultSortingBufferPool
	return config
}
// NewSortingConfig builds a SortingConfig by applying options, in order, on
// top of DefaultSortingConfig. The configuration is returned together with
// the result of its Validate method.
func NewSortingConfig(options ...SortingOption) (*SortingConfig, error) {
	cfg := DefaultSortingConfig()
	cfg.Apply(options...)
	err := cfg.Validate()
	return cfg, err
}
// Validate checks the sorting configuration and returns nil when it is
// valid. The only constraint is that SortingBuffers must not be nil.
func (c *SortingConfig) Validate() error {
	const prefix = "parquet.(*SortingConfig)."
	buffersCheck := validateNotNil(prefix+"SortingBuffers", c.SortingBuffers)
	return errorInvalidConfiguration(buffersCheck)
}
// Apply runs each option against c, in the order given.
func (c *SortingConfig) Apply(options ...SortingOption) {
	for i := range options {
		options[i].ConfigureSorting(c)
	}
}
// ConfigureSorting applies c onto config, which lets a *SortingConfig be
// used as a SortingOption. The merge is done by coalesceSortingConfig with c
// taking precedence.
func (c *SortingConfig) ConfigureSorting(config *SortingConfig) {
	merged := coalesceSortingConfig(*c, *config)
	*config = merged
}
// FileOption is implemented by values that can modify a FileConfig.
// FileConfig.Apply calls ConfigureFile on each option it receives.
type FileOption interface {
ConfigureFile (*FileConfig )
}
// ReaderOption is implemented by values that can modify a ReaderConfig.
// ReaderConfig.Apply calls ConfigureReader on each option it receives.
type ReaderOption interface {
ConfigureReader (*ReaderConfig )
}
// WriterOption is implemented by values that can modify a WriterConfig.
// WriterConfig.Apply calls ConfigureWriter on each option it receives.
type WriterOption interface {
ConfigureWriter (*WriterConfig )
}
// RowGroupOption is implemented by values that can modify a RowGroupConfig.
// RowGroupConfig.Apply calls ConfigureRowGroup on each option it receives.
type RowGroupOption interface {
ConfigureRowGroup (*RowGroupConfig )
}
// SortingOption is implemented by values that can modify a SortingConfig.
// SortingConfig.Apply calls ConfigureSorting on each option it receives.
type SortingOption interface {
ConfigureSorting (*SortingConfig )
}
// SkipPageIndex returns a FileOption that sets FileConfig.SkipPageIndex to
// skip.
func SkipPageIndex(skip bool) FileOption {
	return fileOption(func(cfg *FileConfig) {
		cfg.SkipPageIndex = skip
	})
}
// SkipBloomFilters returns a FileOption that sets FileConfig.SkipBloomFilters
// to skip.
func SkipBloomFilters(skip bool) FileOption {
	return fileOption(func(cfg *FileConfig) {
		cfg.SkipBloomFilters = skip
	})
}
// FileReadMode returns a FileOption that sets FileConfig.ReadMode to mode.
func FileReadMode(mode ReadMode) FileOption {
	return fileOption(func(cfg *FileConfig) {
		cfg.ReadMode = mode
	})
}
// ReadBufferSize returns a FileOption that sets FileConfig.ReadBufferSize to
// size.
func ReadBufferSize(size int) FileOption {
	return fileOption(func(cfg *FileConfig) {
		cfg.ReadBufferSize = size
	})
}
// FileSchema returns a FileOption that sets FileConfig.Schema to schema.
func FileSchema(schema *Schema) FileOption {
	return fileOption(func(cfg *FileConfig) {
		cfg.Schema = schema
	})
}
// PageBufferSize returns a WriterOption that sets WriterConfig.PageBufferSize
// to size. WriterConfig.Validate rejects values that are not positive.
func PageBufferSize(size int) WriterOption {
	return writerOption(func(cfg *WriterConfig) {
		cfg.PageBufferSize = size
	})
}
// WriteBufferSize returns a WriterOption that sets
// WriterConfig.WriteBufferSize to size.
func WriteBufferSize(size int) WriterOption {
	return writerOption(func(cfg *WriterConfig) {
		cfg.WriteBufferSize = size
	})
}
// MaxRowsPerRowGroup returns a WriterOption that sets
// WriterConfig.MaxRowsPerRowGroup to numRows. Values that are zero or
// negative are replaced by DefaultMaxRowsPerRowGroup.
func MaxRowsPerRowGroup(numRows int64) WriterOption {
	limit := numRows
	if limit <= 0 {
		limit = DefaultMaxRowsPerRowGroup
	}
	return writerOption(func(cfg *WriterConfig) {
		cfg.MaxRowsPerRowGroup = limit
	})
}
// CreatedBy returns a WriterOption that sets WriterConfig.CreatedBy to the
// string produced by formatCreatedBy for the given application name, version
// and build identifier. The string is formatted once, when the option is
// created.
func CreatedBy(application, version, build string) WriterOption {
	formatted := formatCreatedBy(application, version, build)
	return writerOption(func(cfg *WriterConfig) {
		cfg.CreatedBy = formatted
	})
}
// ColumnPageBuffers returns a WriterOption that sets
// WriterConfig.ColumnPageBuffers to buffers. WriterConfig.Validate rejects a
// nil value.
func ColumnPageBuffers(buffers BufferPool) WriterOption {
	return writerOption(func(cfg *WriterConfig) {
		cfg.ColumnPageBuffers = buffers
	})
}
// ColumnIndexSizeLimit returns a WriterOption that sets
// WriterConfig.ColumnIndexSizeLimit to sizeLimit. WriterConfig.Validate
// rejects values that are not positive.
func ColumnIndexSizeLimit(sizeLimit int) WriterOption {
	return writerOption(func(cfg *WriterConfig) {
		cfg.ColumnIndexSizeLimit = sizeLimit
	})
}
// DataPageVersion returns a WriterOption that sets
// WriterConfig.DataPageVersion to version. WriterConfig.Validate accepts
// only 1 and 2.
func DataPageVersion(version int) WriterOption {
	return writerOption(func(cfg *WriterConfig) {
		cfg.DataPageVersion = version
	})
}
// DataPageStatistics returns a WriterOption that sets
// WriterConfig.DataPageStatistics to enabled.
func DataPageStatistics(enabled bool) WriterOption {
	return writerOption(func(cfg *WriterConfig) {
		cfg.DataPageStatistics = enabled
	})
}
// KeyValueMetadata returns a WriterOption that records the pair key/value in
// WriterConfig.KeyValueMetadata. The map is created on first use; a pair
// whose key is already present replaces the previous value.
func KeyValueMetadata(key, value string) WriterOption {
	return writerOption(func(cfg *WriterConfig) {
		if cfg.KeyValueMetadata != nil {
			cfg.KeyValueMetadata[key] = value
			return
		}
		cfg.KeyValueMetadata = map[string]string{key: value}
	})
}
// BloomFilters returns a WriterOption that sets WriterConfig.BloomFilters to
// a copy of filters. The copy is taken when the option is created, so later
// changes to the caller's slice do not affect the option.
func BloomFilters(filters ...BloomFilterColumn) WriterOption {
	owned := append([]BloomFilterColumn{}, filters...)
	return writerOption(func(cfg *WriterConfig) {
		cfg.BloomFilters = owned
	})
}
// Compression returns a WriterOption that sets WriterConfig.Compression to
// codec.
func Compression(codec compress.Codec) WriterOption {
	return writerOption(func(cfg *WriterConfig) {
		cfg.Compression = codec
	})
}
// SortingWriterConfig returns a WriterOption that applies the given sorting
// options to WriterConfig.Sorting. The option list is copied when the option
// is created.
func SortingWriterConfig(options ...SortingOption) WriterOption {
	owned := append([]SortingOption{}, options...)
	return writerOption(func(cfg *WriterConfig) {
		cfg.Sorting.Apply(owned...)
	})
}
// SkipPageBounds returns a WriterOption that appends the column path made of
// the given elements to WriterConfig.SkipPageBounds. Each use of the option
// adds one path; existing entries are kept.
//
// The path is copied when the option is created, like BloomFilters and
// SortingColumns do for their arguments, so that a caller who passes a slice
// with `path...` and modifies it afterwards cannot change the configuration.
func SkipPageBounds(path ...string) WriterOption {
	// Appending to a nil slice keeps an empty path nil, as it was before the
	// copy was introduced.
	path = append([]string(nil), path...)
	return writerOption(func(config *WriterConfig) {
		config.SkipPageBounds = append(config.SkipPageBounds, path)
	})
}
// ColumnBufferCapacity returns a RowGroupOption that sets
// RowGroupConfig.ColumnBufferCapacity to size. RowGroupConfig.Validate
// rejects values that are not positive.
func ColumnBufferCapacity(size int) RowGroupOption {
	return rowGroupOption(func(cfg *RowGroupConfig) {
		cfg.ColumnBufferCapacity = size
	})
}
// SortingRowGroupConfig returns a RowGroupOption that applies the given
// sorting options to RowGroupConfig.Sorting. The option list is copied when
// the option is created.
func SortingRowGroupConfig(options ...SortingOption) RowGroupOption {
	owned := append([]SortingOption{}, options...)
	return rowGroupOption(func(cfg *RowGroupConfig) {
		cfg.Sorting.Apply(owned...)
	})
}
// SortingColumns returns a SortingOption that sets
// SortingConfig.SortingColumns to a copy of columns. The copy is taken when
// the option is created.
func SortingColumns(columns ...SortingColumn) SortingOption {
	owned := append([]SortingColumn{}, columns...)
	return sortingOption(func(cfg *SortingConfig) {
		cfg.SortingColumns = owned
	})
}
// SortingBuffers returns a SortingOption that sets
// SortingConfig.SortingBuffers to buffers. SortingConfig.Validate rejects a
// nil value.
func SortingBuffers(buffers BufferPool) SortingOption {
	return sortingOption(func(cfg *SortingConfig) {
		cfg.SortingBuffers = buffers
	})
}
// DropDuplicatedRows returns a SortingOption that sets
// SortingConfig.DropDuplicatedRows to drop.
func DropDuplicatedRows(drop bool) SortingOption {
	return sortingOption(func(cfg *SortingConfig) {
		cfg.DropDuplicatedRows = drop
	})
}
// fileOption adapts a plain function to the FileOption interface.
type fileOption func (*FileConfig )
// ConfigureFile calls the wrapped function with config.
func (opt fileOption ) ConfigureFile (config *FileConfig ) { opt (config ) }
// readerOption adapts a plain function to the ReaderOption interface.
type readerOption func (*ReaderConfig )
// ConfigureReader calls the wrapped function with config.
func (opt readerOption ) ConfigureReader (config *ReaderConfig ) { opt (config ) }
// writerOption adapts a plain function to the WriterOption interface.
type writerOption func (*WriterConfig )
// ConfigureWriter calls the wrapped function with config.
func (opt writerOption ) ConfigureWriter (config *WriterConfig ) { opt (config ) }
// rowGroupOption adapts a plain function to the RowGroupOption interface.
type rowGroupOption func (*RowGroupConfig )
// ConfigureRowGroup calls the wrapped function with config.
func (opt rowGroupOption ) ConfigureRowGroup (config *RowGroupConfig ) { opt (config ) }
// sortingOption adapts a plain function to the SortingOption interface.
type sortingOption func (*SortingConfig )
// ConfigureSorting calls the wrapped function with config.
func (opt sortingOption ) ConfigureSorting (config *SortingConfig ) { opt (config ) }
// coalesceBool returns true when either argument is true. Unlike the other
// coalesce helpers, a false first argument cannot override a true second
// one.
func coalesceBool(i1, i2 bool) bool {
	if i1 {
		return true
	}
	return i2
}
// coalesceInt returns i1 unless it is zero, in which case it returns i2.
func coalesceInt(i1, i2 int) int {
	if i1 == 0 {
		return i2
	}
	return i1
}
// coalesceInt64 returns i1 unless it is zero, in which case it returns i2.
func coalesceInt64(i1, i2 int64) int64 {
	if i1 == 0 {
		return i2
	}
	return i1
}
// coalesceString returns s1 unless it is empty, in which case it returns s2.
func coalesceString(s1, s2 string) string {
	if s1 == "" {
		return s2
	}
	return s1
}
// coalesceBytes returns b1 unless it is nil, in which case it returns b2. An
// empty but non-nil b1 is returned as is.
func coalesceBytes(b1, b2 []byte) []byte {
	if b1 == nil {
		return b2
	}
	return b1
}
// coalesceBufferPool returns p1 unless it is nil, in which case it returns
// p2.
func coalesceBufferPool(p1, p2 BufferPool) BufferPool {
	if p1 == nil {
		return p2
	}
	return p1
}
// coalesceSchema returns s1 unless it is nil, in which case it returns s2.
func coalesceSchema(s1, s2 *Schema) *Schema {
	if s1 == nil {
		return s2
	}
	return s1
}
// coalesceSortingColumns returns s1 unless it is nil, in which case it
// returns s2. An empty but non-nil s1 is returned as is.
func coalesceSortingColumns(s1, s2 []SortingColumn) []SortingColumn {
	if s1 == nil {
		return s2
	}
	return s1
}
// coalesceSortingConfig merges two sorting configurations with c1 taking
// precedence: SortingBuffers and SortingColumns come from c1 when non-nil
// and from c2 otherwise. DropDuplicatedRows is always taken from c1, so the
// value held by c2 is not carried over.
func coalesceSortingConfig(c1, c2 SortingConfig) SortingConfig {
	var merged SortingConfig
	merged.SortingBuffers = coalesceBufferPool(c1.SortingBuffers, c2.SortingBuffers)
	merged.SortingColumns = coalesceSortingColumns(c1.SortingColumns, c2.SortingColumns)
	merged.DropDuplicatedRows = c1.DropDuplicatedRows
	return merged
}
// coalesceBloomFilters returns f1 unless it is nil, in which case it returns
// f2. An empty but non-nil f1 is returned as is.
func coalesceBloomFilters(f1, f2 []BloomFilterColumn) []BloomFilterColumn {
	if f1 == nil {
		return f2
	}
	return f1
}
// coalesceCompression returns c1 unless it is nil, in which case it returns
// c2.
func coalesceCompression(c1, c2 compress.Codec) compress.Codec {
	if c1 == nil {
		return c2
	}
	return c1
}
// validatePositiveInt returns nil when optionValue is greater than zero, and
// an invalid-option-value error naming optionName otherwise.
func validatePositiveInt(optionName string, optionValue int) error {
	if optionValue <= 0 {
		return errorInvalidOptionValue(optionName, optionValue)
	}
	return nil
}
// validatePositiveInt64 returns nil when optionValue is greater than zero,
// and an invalid-option-value error naming optionName otherwise.
func validatePositiveInt64(optionName string, optionValue int64) error {
	if optionValue <= 0 {
		return errorInvalidOptionValue(optionName, optionValue)
	}
	return nil
}
// validateOneOfInt returns nil when optionValue equals one of
// supportedValues, and an invalid-option-value error naming optionName
// otherwise (including when no supported value is given).
func validateOneOfInt(optionName string, optionValue int, supportedValues ...int) error {
	for i := range supportedValues {
		if supportedValues[i] == optionValue {
			return nil
		}
	}
	return errorInvalidOptionValue(optionName, optionValue)
}
// validateNotNil returns nil when optionValue is not the nil interface
// value, and an invalid-option-value error naming optionName otherwise. The
// comparison is a plain interface comparison, so an interface that wraps a
// nil pointer is accepted.
func validateNotNil(optionName string, optionValue interface{}) error {
	if optionValue == nil {
		return errorInvalidOptionValue(optionName, optionValue)
	}
	return nil
}
// errorInvalidOptionValue builds the error reported for a rejected option.
// The message has the form "invalid option value: <name>: <value>", with the
// value rendered using the %v verb.
func errorInvalidOptionValue(optionName string, optionValue interface{}) error {
	const format = "invalid option value: %s: %v"
	return fmt.Errorf(format, optionName, optionValue)
}
// errorInvalidConfiguration gathers the non-nil errors among reasons into a
// single *invalidConfiguration error, preserving their order. It returns a
// nil error (not a nil pointer wrapped in an interface) when every reason is
// nil or when no reason is given.
func errorInvalidConfiguration(reasons ...error) error {
	var failures []error
	for _, reason := range reasons {
		if reason != nil {
			failures = append(failures, reason)
		}
	}
	if len(failures) == 0 {
		return nil
	}
	return &invalidConfiguration{reasons: failures}
}

// invalidConfiguration is the error type returned by the Validate methods
// when one or more checks fail.
type invalidConfiguration struct {
	// reasons holds the individual failures, in the order they were reported.
	reasons []error
}

// Error returns the messages of all reasons, one per line, without a
// trailing newline.
func (err *invalidConfiguration) Error() string {
	messages := make([]string, len(err.reasons))
	for i, reason := range err.reasons {
		messages[i] = reason.Error()
	}
	return strings.Join(messages, "\n")
}
// Compile-time checks that each configuration type can be used as an option
// of its own kind.
var (
_ FileOption = (*FileConfig )(nil )
_ ReaderOption = (*ReaderConfig )(nil )
_ WriterOption = (*WriterConfig )(nil )
_ RowGroupOption = (*RowGroupConfig )(nil )
_ SortingOption = (*SortingConfig )(nil )
)
// The pages are generated with Golds v0.8.4 (GOOS=linux GOARCH=amd64).
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.