package parquet
import (
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/parquet/compress"
format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet"
)
// Default values used to seed WriterProperties and ColumnProperties before
// any WriterProperty option is applied.
const (
// DefaultBufSize is a default buffer size. It is not referenced in the
// visible part of this file. NOTE(review): presumably a byte count — confirm.
DefaultBufSize int64 = 4096 * 4
// DefaultDataPageSize is the initial data page size (pageSize) of a
// WriterProperties.
DefaultDataPageSize int64 = 1024 * 1024
// DefaultDictionaryEnabled is the initial DictionaryEnabled column setting.
DefaultDictionaryEnabled = true
// DefaultDictionaryPageSizeLimit is the initial dictionary page size limit;
// it is defined to be equal to DefaultDataPageSize.
DefaultDictionaryPageSizeLimit = DefaultDataPageSize
// DefaultWriteBatchSize is the initial write batch size.
DefaultWriteBatchSize int64 = 1024
// DefaultMaxRowGroupLen is the initial maximum row group length.
DefaultMaxRowGroupLen int64 = 64 * 1024 * 1024
// DefaultStatsEnabled is the initial StatsEnabled column setting.
DefaultStatsEnabled = true
// DefaultMaxStatsSize is the initial MaxStatsSize column setting.
DefaultMaxStatsSize int64 = 4096
// DefaultPageIndexEnabled is the initial PageIndexEnabled column setting.
DefaultPageIndexEnabled = false
// DefaultCreatedBy is the initial "created by" string; it embeds
// arrow.PkgVersion.
DefaultCreatedBy = "parquet-go version " + arrow .PkgVersion
// DefaultRootName is the initial name of the root schema node.
DefaultRootName = "schema"
// DefaultMaxBloomFilterBytes is the initial maximum bloom filter size.
DefaultMaxBloomFilterBytes = 1024 * 1024
// DefaultBloomFilterEnabled is the initial BloomFilterEnabled column setting.
DefaultBloomFilterEnabled = false
// DefaultBloomFilterFPP is the initial BloomFilterFPP column setting.
// NOTE(review): presumably a false-positive probability — confirm.
DefaultBloomFilterFPP = 0.01
// DefaultAdaptiveBloomFilterEnabled is the initial
// AdaptiveBloomFilterEnabled column setting.
DefaultAdaptiveBloomFilterEnabled = false
// DefaultBloomFilterCandidates is the initial BloomFilterCandidates column
// setting.
DefaultBloomFilterCandidates = 5
)
// ColumnProperties groups the settings that can vary per column. A
// WriterProperties holds one default instance plus optional per-path
// overrides that start out as copies of that default.
type ColumnProperties struct {
// Encoding is the column encoding. The WithEncoding options reject the
// dictionary encodings for this field.
Encoding Encoding
// Codec is the compression codec.
Codec compress .Compression
// DictionaryEnabled toggles dictionary encoding.
DictionaryEnabled bool
// StatsEnabled toggles statistics.
StatsEnabled bool
// PageIndexEnabled toggles the page index.
PageIndexEnabled bool
// MaxStatsSize is the maximum statistics size.
// NOTE(review): presumably in bytes — confirm.
MaxStatsSize int64
// CompressionLevel is the level passed along with Codec.
CompressionLevel int
// BloomFilterEnabled toggles the bloom filter.
BloomFilterEnabled bool
// BloomFilterFPP is the bloom filter FPP value.
// NOTE(review): presumably the target false-positive probability — confirm.
BloomFilterFPP float64
// AdaptiveBloomFilterEnabled toggles the adaptive bloom filter.
AdaptiveBloomFilterEnabled bool
// BloomFilterCandidates is the bloom filter candidate count.
BloomFilterCandidates int
// BloomFilterNDV is the bloom filter NDV value; DefaultColumnProperties
// leaves it at zero. NOTE(review): presumably the expected number of
// distinct values — confirm.
BloomFilterNDV int64
}
// DefaultColumnProperties returns a ColumnProperties populated from the
// package defaults: plain encoding, no compression, and the Default*
// constants for the remaining fields. BloomFilterNDV is left at its zero
// value.
func DefaultColumnProperties() ColumnProperties {
	var props ColumnProperties

	props.Encoding = Encodings.Plain
	props.Codec = compress.Codecs.Uncompressed
	props.CompressionLevel = compress.DefaultCompressionLevel

	props.DictionaryEnabled = DefaultDictionaryEnabled
	props.StatsEnabled = DefaultStatsEnabled
	props.PageIndexEnabled = DefaultPageIndexEnabled
	props.MaxStatsSize = DefaultMaxStatsSize

	props.BloomFilterEnabled = DefaultBloomFilterEnabled
	props.BloomFilterFPP = DefaultBloomFilterFPP
	props.AdaptiveBloomFilterEnabled = DefaultAdaptiveBloomFilterEnabled
	props.BloomFilterCandidates = DefaultBloomFilterCandidates

	return props
}
// SortingColumn is an alias for format.SortingColumn from the internal
// gen-go/parquet package, re-exported so callers can name the type.
type SortingColumn = format .SortingColumn
// writerPropConfig is the scratch state that WriterProperty options mutate.
// File-wide and default-column settings are written straight into wr, while
// per-column overrides are collected in maps keyed by column path string and
// merged into wr.columnProps by NewWriterProperties.
type writerPropConfig struct {
// wr is the WriterProperties being built.
wr *WriterProperties
// The maps below hold per-column overrides, keyed by column path string.
encodings map [string ]Encoding
codecs map [string ]compress .Compression
compressLevel map [string ]int
dictEnabled map [string ]bool
statsEnabled map [string ]bool
indexEnabled map [string ]bool
bloomFilterNDVs map [string ]int64
bloomFilterFPPs map [string ]float64
bloomFilterEnabled map [string ]bool
adaptiveBloomFilterEnabled map [string ]bool
numBloomFilterCandidates map [string ]int
}
// WriterProperty is a functional option for NewWriterProperties; each option
// is invoked with the configuration being built and mutates it in place.
type WriterProperty func (*writerPropConfig )
// WithAllocator returns a WriterProperty that sets the memory allocator
// stored in the writer properties (see WriterProperties.Allocator).
func WithAllocator(mem memory.Allocator) WriterProperty {
	return func(c *writerPropConfig) {
		c.wr.mem = mem
	}
}
// WithDictionaryDefault returns a WriterProperty that sets DictionaryEnabled
// on the default column properties.
func WithDictionaryDefault(dict bool) WriterProperty {
	return func(c *writerPropConfig) {
		def := &c.wr.defColumnProps
		def.DictionaryEnabled = dict
	}
}
// WithDictionaryFor returns a WriterProperty that records a DictionaryEnabled
// override for the column keyed by path. NewWriterProperties merges the
// override into that column's properties.
func WithDictionaryFor(path string, dict bool) WriterProperty {
	return func(c *writerPropConfig) {
		c.dictEnabled[path] = dict
	}
}
// WithDictionaryPath is WithDictionaryFor with the column identified by a
// ColumnPath; the key used is path.String().
func WithDictionaryPath(path ColumnPath, dict bool) WriterProperty {
	key := path.String()
	return WithDictionaryFor(key, dict)
}
// WithDictionaryPageSizeLimit returns a WriterProperty that sets the
// dictionary page size limit (see WriterProperties.DictionaryPageSizeLimit).
func WithDictionaryPageSizeLimit(limit int64) WriterProperty {
	return func(c *writerPropConfig) {
		c.wr.dictPagesize = limit
	}
}
// WithBatchSize returns a WriterProperty that sets the write batch size
// (see WriterProperties.WriteBatchSize).
func WithBatchSize(batch int64) WriterProperty {
	return func(c *writerPropConfig) {
		c.wr.batchSize = batch
	}
}
// WithMaxRowGroupLength returns a WriterProperty that sets the maximum row
// group length (see WriterProperties.MaxRowGroupLength).
func WithMaxRowGroupLength(nrows int64) WriterProperty {
	return func(c *writerPropConfig) {
		c.wr.maxRowGroupLen = nrows
	}
}
// WithDataPageSize returns a WriterProperty that sets the data page size
// (see WriterProperties.DataPageSize).
func WithDataPageSize(pgsize int64) WriterProperty {
	return func(c *writerPropConfig) {
		c.wr.pageSize = pgsize
	}
}
// WithDataPageVersion returns a WriterProperty that sets the data page
// version (see WriterProperties.DataPageVersion).
func WithDataPageVersion(version DataPageVersion) WriterProperty {
	return func(c *writerPropConfig) {
		c.wr.dataPageVersion = version
	}
}
// WithVersion returns a WriterProperty that sets the parquet version
// (see WriterProperties.Version). The version also determines the results of
// DictionaryIndexEncoding and DictionaryPageEncoding.
func WithVersion(version Version) WriterProperty {
	return func(c *writerPropConfig) {
		c.wr.parquetVersion = version
	}
}
// WithCreatedBy returns a WriterProperty that replaces the "created by"
// string (see WriterProperties.CreatedBy).
func WithCreatedBy(createdby string) WriterProperty {
	return func(c *writerPropConfig) {
		c.wr.createdBy = createdby
	}
}
// WithRootName returns a WriterProperty that sets the root name
// (see WriterProperties.RootName).
func WithRootName(name string) WriterProperty {
	return func(c *writerPropConfig) {
		c.wr.rootName = name
	}
}
// WithRootRepetition returns a WriterProperty that sets the root repetition
// (see WriterProperties.RootRepetition).
func WithRootRepetition(repetition Repetition) WriterProperty {
	return func(c *writerPropConfig) {
		c.wr.rootRepetition = repetition
	}
}
// WithEncoding returns a WriterProperty that sets the Encoding of the default
// column properties.
//
// Dictionary encodings (Encodings.PlainDict, Encodings.RLEDict) are rejected:
// the returned option panics when it is applied, not when WithEncoding is
// called.
func WithEncoding(encoding Encoding) WriterProperty {
	return func(c *writerPropConfig) {
		switch encoding {
		case Encodings.PlainDict, Encodings.RLEDict:
			panic("parquet: can't use dictionary encoding as fallback encoding")
		}
		c.wr.defColumnProps.Encoding = encoding
	}
}
// WithEncodingFor returns a WriterProperty that records an Encoding override
// for the column keyed by path.
//
// Dictionary encodings (Encodings.PlainDict, Encodings.RLEDict) are rejected:
// the returned option panics when it is applied, not when WithEncodingFor is
// called.
func WithEncodingFor(path string, encoding Encoding) WriterProperty {
	return func(c *writerPropConfig) {
		switch encoding {
		case Encodings.PlainDict, Encodings.RLEDict:
			panic("parquet: can't use dictionary encoding as fallback encoding")
		}
		c.encodings[path] = encoding
	}
}
// WithEncodingPath is WithEncodingFor with the column identified by a
// ColumnPath; the key used is path.String().
func WithEncodingPath(path ColumnPath, encoding Encoding) WriterProperty {
	key := path.String()
	return WithEncodingFor(key, encoding)
}
// WithCompression returns a WriterProperty that sets the Codec of the default
// column properties.
func WithCompression(codec compress.Compression) WriterProperty {
	return func(c *writerPropConfig) {
		def := &c.wr.defColumnProps
		def.Codec = codec
	}
}
// WithCompressionFor returns a WriterProperty that records a Codec override
// for the column keyed by path.
func WithCompressionFor(path string, codec compress.Compression) WriterProperty {
	return func(c *writerPropConfig) {
		c.codecs[path] = codec
	}
}
// WithCompressionPath is WithCompressionFor with the column identified by a
// ColumnPath; the key used is path.String().
func WithCompressionPath(path ColumnPath, codec compress.Compression) WriterProperty {
	key := path.String()
	return WithCompressionFor(key, codec)
}
// WithMaxStatsSize returns a WriterProperty that sets MaxStatsSize on the
// default column properties.
func WithMaxStatsSize(maxStatsSize int64) WriterProperty {
	return func(c *writerPropConfig) {
		def := &c.wr.defColumnProps
		def.MaxStatsSize = maxStatsSize
	}
}
// WithCompressionLevel returns a WriterProperty that sets CompressionLevel on
// the default column properties.
func WithCompressionLevel(level int) WriterProperty {
	return func(c *writerPropConfig) {
		def := &c.wr.defColumnProps
		def.CompressionLevel = level
	}
}
// WithCompressionLevelFor returns a WriterProperty that records a
// CompressionLevel override for the column keyed by path.
func WithCompressionLevelFor(path string, level int) WriterProperty {
	return func(c *writerPropConfig) {
		c.compressLevel[path] = level
	}
}
// WithCompressionLevelPath is WithCompressionLevelFor with the column
// identified by a ColumnPath; the key used is path.String().
func WithCompressionLevelPath(path ColumnPath, level int) WriterProperty {
	key := path.String()
	return WithCompressionLevelFor(key, level)
}
// WithStats returns a WriterProperty that sets StatsEnabled on the default
// column properties.
func WithStats(enabled bool) WriterProperty {
	return func(c *writerPropConfig) {
		def := &c.wr.defColumnProps
		def.StatsEnabled = enabled
	}
}
// WithStatsFor returns a WriterProperty that records a StatsEnabled override
// for the column keyed by path.
func WithStatsFor(path string, enabled bool) WriterProperty {
	return func(c *writerPropConfig) {
		c.statsEnabled[path] = enabled
	}
}
// WithStatsPath is WithStatsFor with the column identified by a ColumnPath;
// the key used is path.String().
func WithStatsPath(path ColumnPath, enabled bool) WriterProperty {
	key := path.String()
	return WithStatsFor(key, enabled)
}
// WithEncryptionProperties returns a WriterProperty that stores props as the
// file encryption properties (see WriterProperties.FileEncryptionProperties).
// The pointer is stored as given, without copying.
func WithEncryptionProperties(props *FileEncryptionProperties) WriterProperty {
	return func(c *writerPropConfig) {
		c.wr.encryptionProps = props
	}
}
// WithStoreDecimalAsInteger returns a WriterProperty that sets the
// store-decimal-as-integer flag (see WriterProperties.StoreDecimalAsInteger).
func WithStoreDecimalAsInteger(enabled bool) WriterProperty {
	return func(c *writerPropConfig) {
		c.wr.storeDecimalAsInt = enabled
	}
}
// WithSortingColumns returns a WriterProperty that sets the sorting columns
// (see WriterProperties.SortingColumns). The slice is stored as given,
// without copying, so it shares its backing array with the caller's slice.
func WithSortingColumns(cols []SortingColumn) WriterProperty {
	return func(c *writerPropConfig) {
		c.wr.sortingCols = cols
	}
}
// WithPageIndexEnabled returns a WriterProperty that sets PageIndexEnabled on
// the default column properties.
func WithPageIndexEnabled(enabled bool) WriterProperty {
	return func(c *writerPropConfig) {
		def := &c.wr.defColumnProps
		def.PageIndexEnabled = enabled
	}
}
// WithPageIndexEnabledFor returns a WriterProperty that records a
// PageIndexEnabled override for the column keyed by path.
func WithPageIndexEnabledFor(path string, enabled bool) WriterProperty {
	return func(c *writerPropConfig) {
		c.indexEnabled[path] = enabled
	}
}
// WithPageIndexEnabledPath is WithPageIndexEnabledFor with the column
// identified by a ColumnPath; the key used is path.String().
func WithPageIndexEnabledPath(path ColumnPath, enabled bool) WriterProperty {
	key := path.String()
	return WithPageIndexEnabledFor(key, enabled)
}
// WithMaxBloomFilterBytes returns a WriterProperty that sets the maximum
// bloom filter size (see WriterProperties.MaxBloomFilterBytes).
func WithMaxBloomFilterBytes(nbytes int64) WriterProperty {
	return func(c *writerPropConfig) {
		c.wr.maxBloomFilterBytes = nbytes
	}
}
// WithBloomFilterEnabled returns a WriterProperty that sets
// BloomFilterEnabled on the default column properties.
func WithBloomFilterEnabled(enabled bool) WriterProperty {
	return func(c *writerPropConfig) {
		def := &c.wr.defColumnProps
		def.BloomFilterEnabled = enabled
	}
}
// WithBloomFilterEnabledFor returns a WriterProperty that records a
// BloomFilterEnabled override for the column keyed by path.
func WithBloomFilterEnabledFor(path string, enabled bool) WriterProperty {
	return func(c *writerPropConfig) {
		c.bloomFilterEnabled[path] = enabled
	}
}
// WithBloomFilterEnabledPath is WithBloomFilterEnabledFor with the column
// identified by a ColumnPath; the key used is path.String().
func WithBloomFilterEnabledPath(path ColumnPath, enabled bool) WriterProperty {
	key := path.String()
	return WithBloomFilterEnabledFor(key, enabled)
}
// WithBloomFilterFPP returns a WriterProperty that sets BloomFilterFPP on the
// default column properties. The value is stored without validation.
func WithBloomFilterFPP(fpp float64) WriterProperty {
	return func(c *writerPropConfig) {
		def := &c.wr.defColumnProps
		def.BloomFilterFPP = fpp
	}
}
// WithBloomFilterFPPFor returns a WriterProperty that records a
// BloomFilterFPP override for the column keyed by path.
func WithBloomFilterFPPFor(path string, fpp float64) WriterProperty {
	return func(c *writerPropConfig) {
		c.bloomFilterFPPs[path] = fpp
	}
}
// WithBloomFilterFPPPath is WithBloomFilterFPPFor with the column identified
// by a ColumnPath; the key used is path.String().
func WithBloomFilterFPPPath(path ColumnPath, fpp float64) WriterProperty {
	key := path.String()
	return WithBloomFilterFPPFor(key, fpp)
}
// WithAdaptiveBloomFilterEnabled returns a WriterProperty that sets
// AdaptiveBloomFilterEnabled on the default column properties.
func WithAdaptiveBloomFilterEnabled(enabled bool) WriterProperty {
	return func(c *writerPropConfig) {
		def := &c.wr.defColumnProps
		def.AdaptiveBloomFilterEnabled = enabled
	}
}
// WithAdaptiveBloomFilterEnabledFor returns a WriterProperty that records an
// AdaptiveBloomFilterEnabled override for the column keyed by path.
func WithAdaptiveBloomFilterEnabledFor(path string, enabled bool) WriterProperty {
	return func(c *writerPropConfig) {
		c.adaptiveBloomFilterEnabled[path] = enabled
	}
}
// WithAdaptiveBloomFilterEnabledPath is WithAdaptiveBloomFilterEnabledFor
// with the column identified by a ColumnPath; the key used is path.String().
func WithAdaptiveBloomFilterEnabledPath(path ColumnPath, enabled bool) WriterProperty {
	key := path.String()
	return WithAdaptiveBloomFilterEnabledFor(key, enabled)
}
// WithBloomFilterCandidates returns a WriterProperty that sets
// BloomFilterCandidates on the default column properties.
func WithBloomFilterCandidates(candidates int) WriterProperty {
	return func(c *writerPropConfig) {
		def := &c.wr.defColumnProps
		def.BloomFilterCandidates = candidates
	}
}
// WithBloomFilterCandidatesFor returns a WriterProperty that records a
// BloomFilterCandidates override for the column keyed by path.
func WithBloomFilterCandidatesFor(path string, candidates int) WriterProperty {
	return func(c *writerPropConfig) {
		c.numBloomFilterCandidates[path] = candidates
	}
}
// WithBloomFilterCandidatesPath is WithBloomFilterCandidatesFor with the
// column identified by a ColumnPath; the key used is path.String().
func WithBloomFilterCandidatesPath(path ColumnPath, candidates int) WriterProperty {
	key := path.String()
	return WithBloomFilterCandidatesFor(key, candidates)
}
// WithBloomFilterNDV returns a WriterProperty that sets BloomFilterNDV on the
// default column properties.
func WithBloomFilterNDV(ndv int64) WriterProperty {
	return func(c *writerPropConfig) {
		def := &c.wr.defColumnProps
		def.BloomFilterNDV = ndv
	}
}
// WithBloomFilterNDVFor returns a WriterProperty that records a
// BloomFilterNDV override for the column keyed by path.
func WithBloomFilterNDVFor(path string, ndv int64) WriterProperty {
	return func(c *writerPropConfig) {
		c.bloomFilterNDVs[path] = ndv
	}
}
// WithBloomFilterNDVPath is WithBloomFilterNDVFor with the column identified
// by a ColumnPath; the key used is path.String().
func WithBloomFilterNDVPath(path ColumnPath, ndv int64) WriterProperty {
	key := path.String()
	return WithBloomFilterNDVFor(key, ndv)
}
// WriterProperties holds the writer configuration produced by
// NewWriterProperties. All fields are unexported; values are read through the
// accessor methods on *WriterProperties.
type WriterProperties struct {
// mem is the memory allocator returned by Allocator.
mem memory .Allocator
// dictPagesize is the dictionary page size limit.
dictPagesize int64
// batchSize is the write batch size.
batchSize int64
// maxRowGroupLen is the maximum row group length.
maxRowGroupLen int64
// pageSize is the data page size.
pageSize int64
// parquetVersion selects the dictionary encodings reported by
// DictionaryIndexEncoding and DictionaryPageEncoding.
parquetVersion Version
// createdBy is the "created by" string.
createdBy string
// dataPageVersion is the data page version.
dataPageVersion DataPageVersion
// rootName and rootRepetition describe the root schema node.
rootName string
rootRepetition Repetition
// storeDecimalAsInt is the flag returned by StoreDecimalAsInteger.
storeDecimalAsInt bool
// maxBloomFilterBytes is the maximum bloom filter size.
maxBloomFilterBytes int64
// defColumnProps applies to every column without an entry in columnProps.
defColumnProps ColumnProperties
// columnProps holds per-column properties keyed by column path string;
// each entry starts as a copy of defColumnProps.
columnProps map [string ]*ColumnProperties
// encryptionProps is nil unless set via WithEncryptionProperties.
encryptionProps *FileEncryptionProperties
// sortingCols is the slice returned by SortingColumns.
sortingCols []SortingColumn
}
// defaultWriterProperties returns a freshly allocated WriterProperties seeded
// with the package defaults. columnProps and encryptionProps are left nil;
// sortingCols is an empty, non-nil slice.
func defaultWriterProperties() *WriterProperties {
	wp := new(WriterProperties)

	wp.mem = memory.DefaultAllocator
	wp.createdBy = DefaultCreatedBy

	// Size and length limits.
	wp.dictPagesize = DefaultDictionaryPageSizeLimit
	wp.batchSize = DefaultWriteBatchSize
	wp.maxRowGroupLen = DefaultMaxRowGroupLen
	wp.pageSize = DefaultDataPageSize
	wp.maxBloomFilterBytes = DefaultMaxBloomFilterBytes

	// Format versions.
	wp.parquetVersion = V2_LATEST
	wp.dataPageVersion = DataPageV1

	// Root schema node.
	wp.rootName = DefaultRootName
	wp.rootRepetition = Repetitions.Repeated

	wp.defColumnProps = DefaultColumnProperties()
	wp.sortingCols = []SortingColumn{}

	return wp
}
// NewWriterProperties builds a WriterProperties by starting from
// defaultWriterProperties and applying opts in the order given.
//
// Once every option has run, the per-column overrides collected by the
// *For / *Path options are merged into the per-column property map. The first
// override seen for a column path creates that column's ColumnProperties as a
// copy of the final default column properties; each override then replaces a
// single field of that copy.
func NewWriterProperties(opts ...WriterProperty) *WriterProperties {
	cfg := &writerPropConfig{
		wr:                         defaultWriterProperties(),
		encodings:                  map[string]Encoding{},
		codecs:                     map[string]compress.Compression{},
		compressLevel:              map[string]int{},
		dictEnabled:                map[string]bool{},
		statsEnabled:               map[string]bool{},
		indexEnabled:               map[string]bool{},
		bloomFilterNDVs:            map[string]int64{},
		bloomFilterFPPs:            map[string]float64{},
		bloomFilterEnabled:         map[string]bool{},
		adaptiveBloomFilterEnabled: map[string]bool{},
		numBloomFilterCandidates:   map[string]int{},
	}
	for i := range opts {
		opts[i](cfg)
	}

	perColumn := make(map[string]*ColumnProperties)
	cfg.wr.columnProps = perColumn

	// columnFor returns the properties for path, creating them from the
	// defaults the first time the path is seen.
	columnFor := func(path string) *ColumnProperties {
		existing, found := perColumn[path]
		if found {
			return existing
		}
		seeded := cfg.wr.defColumnProps // copy, so overrides don't touch the defaults
		perColumn[path] = &seeded
		return &seeded
	}

	for path, enc := range cfg.encodings {
		columnFor(path).Encoding = enc
	}
	for path, codec := range cfg.codecs {
		columnFor(path).Codec = codec
	}
	for path, level := range cfg.compressLevel {
		columnFor(path).CompressionLevel = level
	}
	for path, on := range cfg.dictEnabled {
		columnFor(path).DictionaryEnabled = on
	}
	for path, on := range cfg.statsEnabled {
		columnFor(path).StatsEnabled = on
	}
	for path, on := range cfg.indexEnabled {
		columnFor(path).PageIndexEnabled = on
	}
	for path, on := range cfg.bloomFilterEnabled {
		columnFor(path).BloomFilterEnabled = on
	}
	for path, fpp := range cfg.bloomFilterFPPs {
		columnFor(path).BloomFilterFPP = fpp
	}
	for path, ndv := range cfg.bloomFilterNDVs {
		columnFor(path).BloomFilterNDV = ndv
	}
	for path, on := range cfg.adaptiveBloomFilterEnabled {
		columnFor(path).AdaptiveBloomFilterEnabled = on
	}
	for path, count := range cfg.numBloomFilterCandidates {
		columnFor(path).BloomFilterCandidates = count
	}

	return cfg.wr
}
// FileEncryptionProperties returns the stored file encryption properties,
// which are nil unless WithEncryptionProperties supplied a value.
func (w *WriterProperties) FileEncryptionProperties() *FileEncryptionProperties {
	props := w.encryptionProps
	return props
}
// Allocator returns the configured memory allocator.
func (w *WriterProperties) Allocator() memory.Allocator {
	return w.mem
}
// CreatedBy returns the configured "created by" string.
func (w *WriterProperties) CreatedBy() string {
	return w.createdBy
}
// RootName returns the configured root name.
func (w *WriterProperties) RootName() string {
	return w.rootName
}
// RootRepetition returns the configured root repetition.
func (w *WriterProperties) RootRepetition() Repetition {
	return w.rootRepetition
}
// WriteBatchSize returns the configured write batch size.
func (w *WriterProperties) WriteBatchSize() int64 {
	return w.batchSize
}
// DataPageSize returns the configured data page size.
func (w *WriterProperties) DataPageSize() int64 {
	return w.pageSize
}
// DictionaryPageSizeLimit returns the configured dictionary page size limit.
func (w *WriterProperties) DictionaryPageSizeLimit() int64 {
	return w.dictPagesize
}
// Version returns the configured parquet version.
func (w *WriterProperties) Version() Version {
	return w.parquetVersion
}
// DataPageVersion returns the configured data page version.
func (w *WriterProperties) DataPageVersion() DataPageVersion {
	return w.dataPageVersion
}
// MaxRowGroupLength returns the configured maximum row group length.
func (w *WriterProperties) MaxRowGroupLength() int64 {
	return w.maxRowGroupLen
}
// SortingColumns returns the configured sorting columns. The internal slice
// is returned directly, not a copy.
func (w *WriterProperties) SortingColumns() []SortingColumn {
	return w.sortingCols
}
// Compression returns the Codec of the default column properties.
func (w *WriterProperties) Compression() compress.Compression {
	return w.defColumnProps.Codec
}
// CompressionFor returns the Codec for the column keyed by path, falling back
// to the default column properties when the path has no per-column entry.
func (w *WriterProperties) CompressionFor(path string) compress.Compression {
	override, found := w.columnProps[path]
	if !found {
		return w.defColumnProps.Codec
	}
	return override.Codec
}
// CompressionPath is CompressionFor with the column identified by a
// ColumnPath; the key used is path.String().
func (w *WriterProperties) CompressionPath(path ColumnPath) compress.Compression {
	key := path.String()
	return w.CompressionFor(key)
}
// CompressionLevel returns the CompressionLevel of the default column
// properties.
func (w *WriterProperties) CompressionLevel() int {
	return w.defColumnProps.CompressionLevel
}
// CompressionLevelFor returns the CompressionLevel for the column keyed by
// path, falling back to the default column properties when the path has no
// per-column entry.
func (w *WriterProperties) CompressionLevelFor(path string) int {
	override, found := w.columnProps[path]
	if !found {
		return w.defColumnProps.CompressionLevel
	}
	return override.CompressionLevel
}
// CompressionLevelPath is CompressionLevelFor with the column identified by a
// ColumnPath; the key used is path.String().
func (w *WriterProperties) CompressionLevelPath(path ColumnPath) int {
	key := path.String()
	return w.CompressionLevelFor(key)
}
// Encoding returns the Encoding of the default column properties.
func (w *WriterProperties) Encoding() Encoding {
	return w.defColumnProps.Encoding
}
// EncodingFor returns the Encoding for the column keyed by path, falling back
// to the default column properties when the path has no per-column entry.
func (w *WriterProperties) EncodingFor(path string) Encoding {
	override, found := w.columnProps[path]
	if !found {
		return w.defColumnProps.Encoding
	}
	return override.Encoding
}
// EncodingPath is EncodingFor with the column identified by a ColumnPath; the
// key used is path.String().
func (w *WriterProperties) EncodingPath(path ColumnPath) Encoding {
	key := path.String()
	return w.EncodingFor(key)
}
// DictionaryIndexEncoding returns Encodings.PlainDict when the configured
// parquet version is V1_0 and Encodings.RLEDict for every other version.
func (w *WriterProperties) DictionaryIndexEncoding() Encoding {
	switch w.parquetVersion {
	case V1_0:
		return Encodings.PlainDict
	default:
		return Encodings.RLEDict
	}
}
// DictionaryPageEncoding returns Encodings.PlainDict when the configured
// parquet version is V1_0 and Encodings.Plain for every other version.
func (w *WriterProperties) DictionaryPageEncoding() Encoding {
	switch w.parquetVersion {
	case V1_0:
		return Encodings.PlainDict
	default:
		return Encodings.Plain
	}
}
// DictionaryEnabled returns DictionaryEnabled from the default column
// properties.
func (w *WriterProperties) DictionaryEnabled() bool {
	return w.defColumnProps.DictionaryEnabled
}
// DictionaryEnabledFor returns DictionaryEnabled for the column keyed by
// path, falling back to the default column properties when the path has no
// per-column entry.
func (w *WriterProperties) DictionaryEnabledFor(path string) bool {
	override, found := w.columnProps[path]
	if !found {
		return w.defColumnProps.DictionaryEnabled
	}
	return override.DictionaryEnabled
}
// DictionaryEnabledPath is DictionaryEnabledFor with the column identified by
// a ColumnPath; the key used is path.String().
func (w *WriterProperties) DictionaryEnabledPath(path ColumnPath) bool {
	key := path.String()
	return w.DictionaryEnabledFor(key)
}
// StatisticsEnabled returns StatsEnabled from the default column properties.
func (w *WriterProperties) StatisticsEnabled() bool {
	return w.defColumnProps.StatsEnabled
}
// StatisticsEnabledFor returns StatsEnabled for the column keyed by path,
// falling back to the default column properties when the path has no
// per-column entry.
func (w *WriterProperties) StatisticsEnabledFor(path string) bool {
	override, found := w.columnProps[path]
	if !found {
		return w.defColumnProps.StatsEnabled
	}
	return override.StatsEnabled
}
// StatisticsEnabledPath is StatisticsEnabledFor with the column identified by
// a ColumnPath; the key used is path.String().
func (w *WriterProperties) StatisticsEnabledPath(path ColumnPath) bool {
	key := path.String()
	return w.StatisticsEnabledFor(key)
}
// PageIndexEnabled returns PageIndexEnabled from the default column
// properties.
func (w *WriterProperties) PageIndexEnabled() bool {
	return w.defColumnProps.PageIndexEnabled
}
// PageIndexEnabledFor returns PageIndexEnabled for the column keyed by path,
// falling back to the default column properties when the path has no
// per-column entry.
func (w *WriterProperties) PageIndexEnabledFor(path string) bool {
	override, found := w.columnProps[path]
	if !found {
		return w.defColumnProps.PageIndexEnabled
	}
	return override.PageIndexEnabled
}
// PageIndexEnabledPath is PageIndexEnabledFor with the column identified by a
// ColumnPath; the key used is path.String().
func (w *WriterProperties) PageIndexEnabledPath(path ColumnPath) bool {
	key := path.String()
	return w.PageIndexEnabledFor(key)
}
// MaxStatsSize returns MaxStatsSize from the default column properties.
func (w *WriterProperties) MaxStatsSize() int64 {
	return w.defColumnProps.MaxStatsSize
}
// MaxStatsSizeFor returns MaxStatsSize for the column keyed by path, falling
// back to the default column properties when the path has no per-column
// entry.
func (w *WriterProperties) MaxStatsSizeFor(path string) int64 {
	override, found := w.columnProps[path]
	if !found {
		return w.defColumnProps.MaxStatsSize
	}
	return override.MaxStatsSize
}
// MaxStatsSizePath is MaxStatsSizeFor with the column identified by a
// ColumnPath; the key used is path.String().
func (w *WriterProperties) MaxStatsSizePath(path ColumnPath) int64 {
	key := path.String()
	return w.MaxStatsSizeFor(key)
}
// ColumnEncryptionProperties returns the column encryption properties for
// path by delegating to the file encryption properties. It returns nil when
// no file encryption properties are configured.
func (w *WriterProperties) ColumnEncryptionProperties(path string) *ColumnEncryptionProperties {
	if w.encryptionProps == nil {
		return nil
	}
	return w.encryptionProps.ColumnEncryptionProperties(path)
}
// StoreDecimalAsInteger returns the store-decimal-as-integer flag, which is
// false unless set via WithStoreDecimalAsInteger.
func (w *WriterProperties) StoreDecimalAsInteger() bool {
	enabled := w.storeDecimalAsInt
	return enabled
}
// MaxBloomFilterBytes returns the configured maximum bloom filter size.
func (w *WriterProperties) MaxBloomFilterBytes() int64 {
	limit := w.maxBloomFilterBytes
	return limit
}
// BloomFilterEnabled returns BloomFilterEnabled from the default column
// properties.
func (w *WriterProperties) BloomFilterEnabled() bool {
	def := &w.defColumnProps
	return def.BloomFilterEnabled
}
// BloomFilterEnabledFor returns BloomFilterEnabled for the column keyed by
// path, falling back to the default column properties when the path has no
// per-column entry.
func (w *WriterProperties) BloomFilterEnabledFor(path string) bool {
	override, found := w.columnProps[path]
	if !found {
		return w.defColumnProps.BloomFilterEnabled
	}
	return override.BloomFilterEnabled
}
// BloomFilterEnabledPath is BloomFilterEnabledFor with the column identified
// by a ColumnPath; the key used is path.String().
func (w *WriterProperties) BloomFilterEnabledPath(path ColumnPath) bool {
	key := path.String()
	return w.BloomFilterEnabledFor(key)
}
// BloomFilterFPP returns BloomFilterFPP from the default column properties.
func (w *WriterProperties) BloomFilterFPP() float64 {
	def := &w.defColumnProps
	return def.BloomFilterFPP
}
// BloomFilterFPPFor returns BloomFilterFPP for the column keyed by path,
// falling back to the default column properties when the path has no
// per-column entry.
func (w *WriterProperties) BloomFilterFPPFor(path string) float64 {
	override, found := w.columnProps[path]
	if !found {
		return w.defColumnProps.BloomFilterFPP
	}
	return override.BloomFilterFPP
}
// BloomFilterFPPPath is BloomFilterFPPFor with the column identified by a
// ColumnPath; the key used is path.String().
func (w *WriterProperties) BloomFilterFPPPath(path ColumnPath) float64 {
	key := path.String()
	return w.BloomFilterFPPFor(key)
}
// AdaptiveBloomFilterEnabled returns AdaptiveBloomFilterEnabled from the
// default column properties.
func (w *WriterProperties) AdaptiveBloomFilterEnabled() bool {
	def := &w.defColumnProps
	return def.AdaptiveBloomFilterEnabled
}
// AdaptiveBloomFilterEnabledFor returns AdaptiveBloomFilterEnabled for the
// column keyed by path, falling back to the default column properties when
// the path has no per-column entry.
func (w *WriterProperties) AdaptiveBloomFilterEnabledFor(path string) bool {
	override, found := w.columnProps[path]
	if !found {
		return w.defColumnProps.AdaptiveBloomFilterEnabled
	}
	return override.AdaptiveBloomFilterEnabled
}
// AdaptiveBloomFilterEnabledPath is AdaptiveBloomFilterEnabledFor with the
// column identified by a ColumnPath; the key used is path.String().
func (w *WriterProperties) AdaptiveBloomFilterEnabledPath(path ColumnPath) bool {
	key := path.String()
	return w.AdaptiveBloomFilterEnabledFor(key)
}
// BloomFilterCandidates returns BloomFilterCandidates from the default column
// properties.
func (w *WriterProperties) BloomFilterCandidates() int {
	def := &w.defColumnProps
	return def.BloomFilterCandidates
}
// BloomFilterCandidatesFor returns BloomFilterCandidates for the column keyed
// by path, falling back to the default column properties when the path has no
// per-column entry.
func (w *WriterProperties) BloomFilterCandidatesFor(path string) int {
	override, found := w.columnProps[path]
	if !found {
		return w.defColumnProps.BloomFilterCandidates
	}
	return override.BloomFilterCandidates
}
// BloomFilterCandidatesPath is BloomFilterCandidatesFor with the column
// identified by a ColumnPath; the key used is path.String().
func (w *WriterProperties) BloomFilterCandidatesPath(path ColumnPath) int {
	key := path.String()
	return w.BloomFilterCandidatesFor(key)
}
// BloomFilterNDV returns BloomFilterNDV from the default column properties.
func (w *WriterProperties) BloomFilterNDV() int64 {
	def := &w.defColumnProps
	return def.BloomFilterNDV
}
// BloomFilterNDVFor returns BloomFilterNDV for the column keyed by path,
// falling back to the default column properties when the path has no
// per-column entry.
func (w *WriterProperties) BloomFilterNDVFor(path string) int64 {
	override, found := w.columnProps[path]
	if !found {
		return w.defColumnProps.BloomFilterNDV
	}
	return override.BloomFilterNDV
}
// BloomFilterNDVPath is BloomFilterNDVFor with the column identified by a
// ColumnPath; the key used is path.String().
func (w *WriterProperties) BloomFilterNDVPath(path ColumnPath) int64 {
	key := path.String()
	return w.BloomFilterNDVFor(key)
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news about Golds.