package iceberg
import (
"context"
"encoding/json"
"fmt"
"io"
"sync"
"github.com/hamba/avro/v2"
"github.com/hamba/avro/v2/ocf"
"github.com/thanos-io/objstore"
)
// ManifestContent is the kind of files a manifest tracks. For V2 manifest
// lists it is decoded from the record's "content" field; V1 manifest-list
// entries always report ManifestContentData.
type ManifestContent int32
const (
// ManifestContentData (0) marks a manifest of data files.
ManifestContentData ManifestContent = 0
// ManifestContentDeletes (1) marks a manifest of delete files.
ManifestContentDeletes ManifestContent = 1
)
// FieldSummary is one element of a manifest-list record's "partitions"
// array (the PartitionList field of manifestFileV1 / manifestFileV2).
// Field order and avro tags are consumed by reflection during decoding.
type FieldSummary struct {
// ContainsNull is decoded from "contains_null".
ContainsNull bool `avro:"contains_null"`
// ContainsNaN is decoded from "contains_nan"; nil when the value is null.
ContainsNaN *bool `avro:"contains_nan"`
// LowerBound is decoded from "lower_bound"; nil when the value is null.
// NOTE(review): presumably Iceberg's single-value binary encoding — confirm.
LowerBound *[]byte `avro:"lower_bound"`
// UpperBound is decoded from "upper_bound"; nil when the value is null.
UpperBound *[]byte `avro:"upper_bound"`
}
// ManifestV1Builder incrementally configures a format-version 1
// manifest-list entry. Obtain one from NewManifestV1Builder, chain the
// setters, then call Build.
type ManifestV1Builder struct {
// m is the entry being configured; Build hands out this same pointer.
m *manifestFileV1
}
// NewManifestV1Builder starts a format-version 1 manifest-list entry for the
// manifest stored at path. The optional counters and partition summaries
// stay unset (nil) until the matching builder method is called.
func NewManifestV1Builder(path string, length int64, partitionSpecID int32, addedSnapshotID int64) *ManifestV1Builder {
	file := &manifestFileV1{
		Path:            path,
		Len:             length,
		SpecID:          partitionSpecID,
		AddedSnapshotID: addedSnapshotID,
	}
	return &ManifestV1Builder{m: file}
}
// AddedFiles sets the added_data_files_count column.
func (b *ManifestV1Builder) AddedFiles(cnt int32) *ManifestV1Builder {
	n := cnt
	b.m.AddedFilesCount = &n
	return b
}

// ExistingFiles sets the existing_data_files_count column.
func (b *ManifestV1Builder) ExistingFiles(cnt int32) *ManifestV1Builder {
	n := cnt
	b.m.ExistingFilesCount = &n
	return b
}

// DeletedFiles sets the deleted_data_files_count column.
func (b *ManifestV1Builder) DeletedFiles(cnt int32) *ManifestV1Builder {
	n := cnt
	b.m.DeletedFilesCount = &n
	return b
}

// AddedRows sets the added_rows_count column.
func (b *ManifestV1Builder) AddedRows(cnt int64) *ManifestV1Builder {
	n := cnt
	b.m.AddedRowsCount = &n
	return b
}

// ExistingRows sets the existing_rows_count column.
func (b *ManifestV1Builder) ExistingRows(cnt int64) *ManifestV1Builder {
	n := cnt
	b.m.ExistingRowsCount = &n
	return b
}

// DeletedRows sets the deleted_rows_count column.
func (b *ManifestV1Builder) DeletedRows(cnt int64) *ManifestV1Builder {
	n := cnt
	b.m.DeletedRowsCount = &n
	return b
}

// Partitions sets the per-field partition summaries. The slice header is
// stored, not its elements, so it still shares the caller's backing array.
func (b *ManifestV1Builder) Partitions(p []FieldSummary) *ManifestV1Builder {
	summaries := p
	b.m.PartitionList = &summaries
	return b
}

// KeyMetadata sets key_metadata; the slice is stored without copying.
func (b *ManifestV1Builder) KeyMetadata(km []byte) *ManifestV1Builder {
	b.m.Key = km
	return b
}

// Build returns the configured entry. The builder keeps pointing at the same
// value, so setters called after Build also change the returned ManifestFile.
func (b *ManifestV1Builder) Build() ManifestFile {
	var file ManifestFile = b.m
	return file
}
// fallbackManifestFileV1 is the decode target ReadManifestList uses for V1
// manifest lists whose added_snapshot_id column is an Avro union. The outer
// pointer-typed AddedSnapshotID shadows the embedded int64 field with the same
// avro tag. toManifest copies the decoded value into the embedded
// manifestFileV1.
type fallbackManifestFileV1 struct {
manifestFileV1
AddedSnapshotID *int64 `avro:"added_snapshot_id"`
}
// toManifest converts a decoded fallback record into a plain *manifestFileV1
// by copying the nullable added_snapshot_id into the embedded non-pointer
// field. The column is a union precisely because it may be null. In that
// case the embedded AddedSnapshotID stays 0 instead of the conversion
// panicking on a nil dereference. The returned pointer aliases f's storage.
func (f *fallbackManifestFileV1) toManifest() *manifestFileV1 {
	if f.AddedSnapshotID != nil {
		f.manifestFileV1.AddedSnapshotID = *f.AddedSnapshotID
	}
	return &f.manifestFileV1
}
// manifestFileV1 is one record of a format-version 1 manifest list. The
// pointer fields are decoded from nullable columns. Their accessor methods
// substitute 0 when nil, except HasAddedFiles/HasExistingFiles, which treat
// an unknown count as "may have files" (true).
type manifestFileV1 struct {
Path string `avro:"manifest_path"`
Len int64 `avro:"manifest_length"`
SpecID int32 `avro:"partition_spec_id"`
AddedSnapshotID int64 `avro:"added_snapshot_id"`
AddedFilesCount *int32 `avro:"added_data_files_count"`
ExistingFilesCount *int32 `avro:"existing_data_files_count"`
DeletedFilesCount *int32 `avro:"deleted_data_files_count"`
AddedRowsCount *int64 `avro:"added_rows_count"`
ExistingRowsCount *int64 `avro:"existing_rows_count"`
DeletedRowsCount *int64 `avro:"deleted_rows_count"`
PartitionList *[]FieldSummary `avro:"partitions"`
Key []byte `avro:"key_metadata"`
}
// Version reports the manifest-list format version, always 1.
func (*manifestFileV1) Version() int {
	return 1
}

// FilePath returns the decoded manifest_path.
func (m *manifestFileV1) FilePath() string {
	return m.Path
}

// Length returns the decoded manifest_length.
func (m *manifestFileV1) Length() int64 {
	return m.Len
}

// PartitionSpecID returns the decoded partition_spec_id.
func (m *manifestFileV1) PartitionSpecID() int32 {
	return m.SpecID
}

// ManifestContent always reports ManifestContentData: manifestFileV1 has no
// content column.
func (m *manifestFileV1) ManifestContent() ManifestContent { return ManifestContentData }

// SnapshotID returns the decoded added_snapshot_id.
func (m *manifestFileV1) SnapshotID() int64 { return m.AddedSnapshotID }

// AddedDataFiles returns added_data_files_count, or 0 when it was null.
func (m *manifestFileV1) AddedDataFiles() int32 {
	var n int32
	if m.AddedFilesCount != nil {
		n = *m.AddedFilesCount
	}
	return n
}

// ExistingDataFiles returns existing_data_files_count, or 0 when it was null.
func (m *manifestFileV1) ExistingDataFiles() int32 {
	var n int32
	if m.ExistingFilesCount != nil {
		n = *m.ExistingFilesCount
	}
	return n
}

// DeletedDataFiles returns deleted_data_files_count, or 0 when it was null.
func (m *manifestFileV1) DeletedDataFiles() int32 {
	var n int32
	if m.DeletedFilesCount != nil {
		n = *m.DeletedFilesCount
	}
	return n
}

// AddedRows returns added_rows_count, or 0 when it was null.
func (m *manifestFileV1) AddedRows() int64 {
	var n int64
	if m.AddedRowsCount != nil {
		n = *m.AddedRowsCount
	}
	return n
}

// ExistingRows returns existing_rows_count, or 0 when it was null.
func (m *manifestFileV1) ExistingRows() int64 {
	var n int64
	if m.ExistingRowsCount != nil {
		n = *m.ExistingRowsCount
	}
	return n
}

// DeletedRows returns deleted_rows_count, or 0 when it was null.
func (m *manifestFileV1) DeletedRows() int64 {
	var n int64
	if m.DeletedRowsCount != nil {
		n = *m.DeletedRowsCount
	}
	return n
}

// HasAddedFiles reports whether the manifest may contain added files. An
// unknown (null) count is conservatively treated as true.
func (m *manifestFileV1) HasAddedFiles() bool {
	if c := m.AddedFilesCount; c != nil {
		return *c > 0
	}
	return true
}

// HasExistingFiles reports whether the manifest may contain existing files.
// An unknown (null) count is conservatively treated as true.
func (m *manifestFileV1) HasExistingFiles() bool {
	if c := m.ExistingFilesCount; c != nil {
		return *c > 0
	}
	return true
}

// SequenceNum is always 0: manifestFileV1 carries no sequence number.
func (m *manifestFileV1) SequenceNum() int64 { return 0 }

// MinSequenceNum is always 0: manifestFileV1 carries no sequence number.
func (m *manifestFileV1) MinSequenceNum() int64 { return 0 }

// KeyMetadata returns the decoded key_metadata bytes (possibly nil).
func (m *manifestFileV1) KeyMetadata() []byte { return m.Key }

// Partitions returns the partition summaries, or nil when the column was null.
func (m *manifestFileV1) Partitions() []FieldSummary {
	if list := m.PartitionList; list != nil {
		return *list
	}
	return nil
}

// FetchEntries downloads this manifest from bucket and decodes its entries,
// optionally dropping those with DELETED status.
func (m *manifestFileV1) FetchEntries(bucket objstore.Bucket, discardDeleted bool) ([]ManifestEntry, *Schema, error) {
	entries, schema, err := fetchManifestEntries(m, bucket, discardDeleted)
	return entries, schema, err
}
// ManifestV2Builder incrementally configures a format-version 2
// manifest-list entry. Obtain one from NewManifestV2Builder, chain the
// setters, then call Build.
type ManifestV2Builder struct {
// m is the entry being configured; Build hands out this same pointer.
m *manifestFileV2
}
// NewManifestV2Builder starts a format-version 2 manifest-list entry for the
// manifest stored at path with the given content type. Counts and sequence
// numbers start at zero until set through the builder.
func NewManifestV2Builder(path string, length int64, partitionSpecID int32, content ManifestContent, addedSnapshotID int64) *ManifestV2Builder {
	b := new(ManifestV2Builder)
	b.m = &manifestFileV2{
		Path:            path,
		Len:             length,
		SpecID:          partitionSpecID,
		Content:         content,
		AddedSnapshotID: addedSnapshotID,
	}
	return b
}
// SequenceNum sets the sequence_number and min_sequence_number columns.
func (b *ManifestV2Builder) SequenceNum(num, minSeqNum int64) *ManifestV2Builder {
	b.m.SeqNumber = num
	b.m.MinSeqNumber = minSeqNum
	return b
}

// AddedFiles sets the added_files_count column.
func (b *ManifestV2Builder) AddedFiles(cnt int32) *ManifestV2Builder {
	file := b.m
	file.AddedFilesCount = cnt
	return b
}

// ExistingFiles sets the existing_files_count column.
func (b *ManifestV2Builder) ExistingFiles(cnt int32) *ManifestV2Builder {
	file := b.m
	file.ExistingFilesCount = cnt
	return b
}

// DeletedFiles sets the deleted_files_count column.
func (b *ManifestV2Builder) DeletedFiles(cnt int32) *ManifestV2Builder {
	file := b.m
	file.DeletedFilesCount = cnt
	return b
}

// AddedRows sets the added_rows_count column.
func (b *ManifestV2Builder) AddedRows(cnt int64) *ManifestV2Builder {
	file := b.m
	file.AddedRowsCount = cnt
	return b
}

// ExistingRows sets the existing_rows_count column.
func (b *ManifestV2Builder) ExistingRows(cnt int64) *ManifestV2Builder {
	file := b.m
	file.ExistingRowsCount = cnt
	return b
}

// DeletedRows sets the deleted_rows_count column.
func (b *ManifestV2Builder) DeletedRows(cnt int64) *ManifestV2Builder {
	file := b.m
	file.DeletedRowsCount = cnt
	return b
}

// Partitions sets the per-field partition summaries. The slice header is
// stored, not its elements, so it still shares the caller's backing array.
func (b *ManifestV2Builder) Partitions(p []FieldSummary) *ManifestV2Builder {
	summaries := p
	b.m.PartitionList = &summaries
	return b
}

// KeyMetadata sets key_metadata; the slice is stored without copying.
func (b *ManifestV2Builder) KeyMetadata(km []byte) *ManifestV2Builder {
	b.m.Key = km
	return b
}

// Build returns the configured entry. The builder keeps pointing at the same
// value, so setters called after Build also change the returned ManifestFile.
func (b *ManifestV2Builder) Build() ManifestFile {
	var file ManifestFile = b.m
	return file
}
// manifestFileV2 is one record of a format-version 2 manifest list. Unlike
// manifestFileV1, the counters and sequence numbers are non-pointer fields.
// Only the partition summaries are nullable.
type manifestFileV2 struct {
Path string `avro:"manifest_path"`
Len int64 `avro:"manifest_length"`
SpecID int32 `avro:"partition_spec_id"`
Content ManifestContent `avro:"content"`
SeqNumber int64 `avro:"sequence_number"`
MinSeqNumber int64 `avro:"min_sequence_number"`
AddedSnapshotID int64 `avro:"added_snapshot_id"`
AddedFilesCount int32 `avro:"added_files_count"`
ExistingFilesCount int32 `avro:"existing_files_count"`
DeletedFilesCount int32 `avro:"deleted_files_count"`
AddedRowsCount int64 `avro:"added_rows_count"`
ExistingRowsCount int64 `avro:"existing_rows_count"`
DeletedRowsCount int64 `avro:"deleted_rows_count"`
PartitionList *[]FieldSummary `avro:"partitions"`
Key []byte `avro:"key_metadata"`
}
// Version reports the manifest-list format version, always 2.
func (*manifestFileV2) Version() int {
	return 2
}

// FilePath returns the decoded manifest_path.
func (m *manifestFileV2) FilePath() string {
	return m.Path
}

// Length returns the decoded manifest_length.
func (m *manifestFileV2) Length() int64 {
	return m.Len
}

// PartitionSpecID returns the decoded partition_spec_id.
func (m *manifestFileV2) PartitionSpecID() int32 {
	return m.SpecID
}

// ManifestContent returns the decoded content column.
func (m *manifestFileV2) ManifestContent() ManifestContent {
	return m.Content
}

// SnapshotID returns the decoded added_snapshot_id.
func (m *manifestFileV2) SnapshotID() int64 { return m.AddedSnapshotID }

// The counters below are plain (non-pointer) fields, so no nil handling is
// needed.

// AddedDataFiles returns added_files_count.
func (m *manifestFileV2) AddedDataFiles() int32 { return m.AddedFilesCount }

// ExistingDataFiles returns existing_files_count.
func (m *manifestFileV2) ExistingDataFiles() int32 { return m.ExistingFilesCount }

// DeletedDataFiles returns deleted_files_count.
func (m *manifestFileV2) DeletedDataFiles() int32 { return m.DeletedFilesCount }

// AddedRows returns added_rows_count.
func (m *manifestFileV2) AddedRows() int64 { return m.AddedRowsCount }

// ExistingRows returns existing_rows_count.
func (m *manifestFileV2) ExistingRows() int64 { return m.ExistingRowsCount }

// DeletedRows returns deleted_rows_count.
func (m *manifestFileV2) DeletedRows() int64 { return m.DeletedRowsCount }

// SequenceNum returns the decoded sequence_number.
func (m *manifestFileV2) SequenceNum() int64 { return m.SeqNumber }

// MinSequenceNum returns the decoded min_sequence_number.
func (m *manifestFileV2) MinSequenceNum() int64 { return m.MinSeqNumber }

// KeyMetadata returns the decoded key_metadata bytes (possibly nil).
func (m *manifestFileV2) KeyMetadata() []byte { return m.Key }

// Partitions returns the partition summaries, or nil when the column was null.
func (m *manifestFileV2) Partitions() []FieldSummary {
	if list := m.PartitionList; list != nil {
		return *list
	}
	return nil
}

// HasAddedFiles reports whether added_files_count is positive.
func (m *manifestFileV2) HasAddedFiles() bool {
	return 0 < m.AddedFilesCount
}

// HasExistingFiles reports whether existing_files_count is positive.
func (m *manifestFileV2) HasExistingFiles() bool {
	return 0 < m.ExistingFilesCount
}

// FetchEntries downloads this manifest from bucket and decodes its entries,
// optionally dropping those with DELETED status.
func (m *manifestFileV2) FetchEntries(bucket objstore.Bucket, discardDeleted bool) ([]ManifestEntry, *Schema, error) {
	entries, schema, err := fetchManifestEntries(m, bucket, discardDeleted)
	return entries, schema, err
}
// fetchManifestEntries downloads the manifest referenced by m from bucket,
// decodes every entry in it, and returns them together with the table schema
// stored under the "schema" key of the file's Avro metadata.
//
// The entry layout is chosen from the "format-version" metadata key. "2"
// selects manifestEntryV2; anything else is decoded as V1. Some V1 files
// declare snapshot_id as an Avro union. Those are decoded through
// fallbackManifestEntryV1 and converted back to *manifestEntryV1.
//
// When discardDeleted is true, entries with status EntryStatusDELETED are
// dropped. Every kept entry is passed to inheritSeqNum(m) before it is
// returned.
//
// Errors from the bucket, the OCF decoder, and schema parsing are returned
// unwrapped. Callers may rely on helpers such as objstore's IsObjNotFoundErr,
// which do not necessarily look through fmt.Errorf wrapping. A non-record
// Avro schema is reported as an error instead of panicking.
func fetchManifestEntries(m ManifestFile, bucket objstore.Bucket, discardDeleted bool) ([]ManifestEntry, *Schema, error) {
	// NOTE(review): the ManifestFile interface offers no way to pass a caller
	// context, so this download cannot be cancelled.
	rc, err := bucket.Get(context.TODO(), m.FilePath())
	if err != nil {
		return nil, nil, err
	}
	defer rc.Close()

	dec, err := ocf.NewDecoder(rc)
	if err != nil {
		return nil, nil, err
	}

	metadata := dec.Metadata()
	schema := &Schema{}
	if err := json.Unmarshal(metadata["schema"], schema); err != nil {
		return nil, nil, fmt.Errorf("failed to unmarshal schema: %w", err)
	}

	isVer1 := string(metadata["format-version"]) != "2"
	isFallback := false
	if isVer1 {
		sc, err := avro.ParseBytes(metadata["avro.schema"])
		if err != nil {
			return nil, nil, err
		}
		rec, ok := sc.(*avro.RecordSchema)
		if !ok {
			return nil, nil, fmt.Errorf("manifest %q: avro schema is %s, expected record", m.FilePath(), sc.Type())
		}
		for _, field := range rec.Fields() {
			if field.Name() == "snapshot_id" {
				// A nullable snapshot_id must be decoded into the pointer-typed
				// fallback struct.
				isFallback = field.Type().Type() == avro.Union
				break
			}
		}
	}

	results := make([]ManifestEntry, 0)
	for dec.HasNext() {
		var tmp ManifestEntry
		switch {
		case !isVer1:
			tmp = &manifestEntryV2{Data: &dataFile{}}
		case isFallback:
			tmp = &fallbackManifestEntryV1{manifestEntryV1: manifestEntryV1{Data: &dataFile{}}}
		default:
			tmp = &manifestEntryV1{Data: &dataFile{}}
		}
		if err := dec.Decode(tmp); err != nil {
			return nil, nil, err
		}
		if fb, ok := tmp.(*fallbackManifestEntryV1); ok {
			tmp = fb.toEntry()
		}
		if discardDeleted && tmp.Status() == EntryStatusDELETED {
			continue
		}
		tmp.inheritSeqNum(m)
		results = append(results, tmp)
	}
	return results, schema, dec.Error()
}
// ManifestFile is one entry of a manifest list, i.e. a reference to a single
// manifest file plus its summary counters. It is implemented by
// *manifestFileV1 and *manifestFileV2.
type ManifestFile interface {
// Version returns the manifest-list format version (1 or 2).
Version () int
// FilePath returns the location of the manifest file.
FilePath () string
// Length returns the manifest_length column.
Length () int64
// PartitionSpecID returns the partition_spec_id column.
PartitionSpecID () int32
// ManifestContent reports data vs. deletes; always data for V1.
ManifestContent () ManifestContent
// SnapshotID returns the added_snapshot_id column.
SnapshotID () int64
// AddedDataFiles returns the added files count (0 when unknown in V1).
AddedDataFiles () int32
// ExistingDataFiles returns the existing files count (0 when unknown in V1).
ExistingDataFiles () int32
// DeletedDataFiles returns the deleted files count (0 when unknown in V1).
DeletedDataFiles () int32
// AddedRows returns added_rows_count (0 when unknown in V1).
AddedRows () int64
// ExistingRows returns existing_rows_count (0 when unknown in V1).
ExistingRows () int64
// DeletedRows returns deleted_rows_count (0 when unknown in V1).
DeletedRows () int64
// SequenceNum returns sequence_number; always 0 for V1.
SequenceNum () int64
// MinSequenceNum returns min_sequence_number; always 0 for V1.
MinSequenceNum () int64
// KeyMetadata returns the key_metadata bytes, possibly nil.
KeyMetadata () []byte
// Partitions returns the partition summaries, or nil when absent.
Partitions () []FieldSummary
// HasAddedFiles reports whether added files may exist (true when V1 count is unknown).
HasAddedFiles () bool
// HasExistingFiles reports whether existing files may exist (true when V1 count is unknown).
HasExistingFiles () bool
// FetchEntries downloads the manifest from bucket and decodes its entries,
// dropping DELETED entries when discardDeleted is true.
FetchEntries (bucket objstore .Bucket , discardDeleted bool ) ([]ManifestEntry , *Schema , error )
}
// ReadManifestList decodes an Avro OCF manifest list from in and returns one
// ManifestFile per record.
//
// Records are decoded as *manifestFileV2 when the file's "format-version"
// metadata is "2", and as *manifestFileV1 otherwise. V1 files whose
// added_snapshot_id column is an Avro union are decoded through
// fallbackManifestFileV1 and converted back to *manifestFileV1.
//
// An error is returned when the OCF header or the embedded schema cannot be
// read, when the schema is not an Avro record, or when a record fails to
// decode.
func ReadManifestList(in io.Reader) ([]ManifestFile, error) {
	dec, err := ocf.NewDecoder(in)
	if err != nil {
		return nil, err
	}
	metadata := dec.Metadata()

	sc, err := avro.ParseBytes(metadata["avro.schema"])
	if err != nil {
		return nil, err
	}
	rec, ok := sc.(*avro.RecordSchema)
	if !ok {
		return nil, fmt.Errorf("manifest list avro schema is %s, expected record", sc.Type())
	}

	isVer2 := string(metadata["format-version"]) == "2"

	// Only V1 has a fallback decode type. The union check is therefore gated
	// on the version. Otherwise a V2 list with a union added_snapshot_id
	// would hit a failing *fallbackManifestFileV1 type assertion after
	// decoding into *manifestFileV2.
	var fallbackAddedSnapshot bool
	if !isVer2 {
		for _, field := range rec.Fields() {
			if field.Name() == "added_snapshot_id" {
				fallbackAddedSnapshot = field.Type().Type() == avro.Union
				break
			}
		}
	}

	out := make([]ManifestFile, 0)
	for dec.HasNext() {
		var file ManifestFile
		switch {
		case isVer2:
			file = &manifestFileV2{}
		case fallbackAddedSnapshot:
			file = &fallbackManifestFileV1{}
		default:
			file = &manifestFileV1{}
		}
		if err := dec.Decode(file); err != nil {
			return nil, err
		}
		if fb, ok := file.(*fallbackManifestFileV1); ok {
			file = fb.toManifest()
		}
		out = append(out, file)
	}
	return out, dec.Error()
}
// ManifestEntryStatus is the decoded "status" column of a manifest entry.
type ManifestEntryStatus int8
const (
// EntryStatusEXISTING is status 0.
EntryStatusEXISTING ManifestEntryStatus = 0
// EntryStatusADDED is status 1. V2 entries with this status inherit the
// manifest's sequence number when their own is null (see inheritSeqNum).
EntryStatusADDED ManifestEntryStatus = 1
// EntryStatusDELETED is status 2. FetchEntries can drop these entries.
EntryStatusDELETED ManifestEntryStatus = 2
)
// ManifestEntryContent is the decoded "content" column of a data_file record.
type ManifestEntryContent int8
const (
// EntryContentData (0) is a data file.
EntryContentData ManifestEntryContent = 0
// EntryContentPosDeletes (1) is a position delete file.
EntryContentPosDeletes ManifestEntryContent = 1
// EntryContentEqDeletes (2) is an equality delete file.
EntryContentEqDeletes ManifestEntryContent = 2
)
// FileFormat is the decoded "file_format" column of a data_file record.
type FileFormat string
const (
AvroFile FileFormat = "AVRO"
OrcFile FileFormat = "ORC"
ParquetFile FileFormat = "PARQUET"
)
// colMap is the Avro representation of one map entry. The dataFile metric
// columns (column_sizes, value_counts, lower_bounds, ...) are decoded as
// arrays of these {key, value} records.
type colMap[K, V any] struct {
	Key   K `avro:"key"`
	Value V `avro:"value"`
}

// avroColMapFromMap converts m into a slice of key/value records suitable for
// Avro encoding. A nil map yields nil. The record order follows Go map
// iteration and is therefore unspecified.
func avroColMapFromMap[K comparable, V any](m map[K]V) *[]colMap[K, V] {
	if m == nil {
		return nil
	}
	out := make([]colMap[K, V], 0, len(m))
	for k, v := range m {
		out = append(out, colMap[K, V]{Key: k, Value: v})
	}
	return &out
}

// avroColMapToMap converts decoded key/value records back into a map. A nil
// pointer yields a nil map. A non-nil pointer, even to an empty slice,
// yields a non-nil map. When a key repeats, the last record wins.
func avroColMapToMap[K comparable, V any](c *[]colMap[K, V]) map[K]V {
	if c == nil {
		return nil
	}
	// The record count is known, so size the map once and avoid rehashing.
	out := make(map[K]V, len(*c))
	for _, entry := range *c {
		out[entry.Key] = entry.Value
	}
	return out
}
// dataFile is the decoded "data_file" record of a manifest entry. The metric
// columns are decoded as *[]colMap slices (nil when the column was null).
// initializeMapData lazily converts them into the unexported map fields,
// guarded by initMaps.
type dataFile struct {
Content ManifestEntryContent `avro:"content"`
Path string `avro:"file_path"`
Format FileFormat `avro:"file_format"`
PartitionData map [string ]any `avro:"partition"`
RecordCount int64 `avro:"record_count"`
FileSize int64 `avro:"file_size_in_bytes"`
BlockSizeInBytes int64 `avro:"block_size_in_bytes"`
ColSizes *[]colMap [int , int64 ] `avro:"column_sizes"`
ValCounts *[]colMap [int , int64 ] `avro:"value_counts"`
NullCounts *[]colMap [int , int64 ] `avro:"null_value_counts"`
NaNCounts *[]colMap [int , int64 ] `avro:"nan_value_counts"`
DistinctCounts *[]colMap [int , int64 ] `avro:"distinct_counts"`
LowerBounds *[]colMap [int , []byte ] `avro:"lower_bounds"`
UpperBounds *[]colMap [int , []byte ] `avro:"upper_bounds"`
Key *[]byte `avro:"key_metadata"`
Splits *[]int64 `avro:"split_offsets"`
EqualityIDs *[]int `avro:"equality_ids"`
SortOrder *int `avro:"sort_order_id"`
// Lazily built views of the colMap columns above (see initializeMapData).
colSizeMap map [int ]int64
valCntMap map [int ]int64
nullCntMap map [int ]int64
nanCntMap map [int ]int64
distinctCntMap map [int ]int64
lowerBoundMap map [int ][]byte
upperBoundMap map [int ][]byte
// initMaps ensures the maps above are built only once. Because it holds a
// sync.Once, a dataFile must not be copied after first use.
initMaps sync .Once
}
// initializeMapData converts every decoded colMap column into its map form
// exactly once. Later calls return immediately because of d.initMaps.
func (d *dataFile) initializeMapData() {
	build := func() {
		d.colSizeMap = avroColMapToMap(d.ColSizes)
		d.valCntMap = avroColMapToMap(d.ValCounts)
		d.nullCntMap = avroColMapToMap(d.NullCounts)
		d.nanCntMap = avroColMapToMap(d.NaNCounts)
		d.distinctCntMap = avroColMapToMap(d.DistinctCounts)
		d.lowerBoundMap = avroColMapToMap(d.LowerBounds)
		d.upperBoundMap = avroColMapToMap(d.UpperBounds)
	}
	d.initMaps.Do(build)
}
// ContentType returns the decoded content column.
func (d *dataFile) ContentType() ManifestEntryContent {
	return d.Content
}

// FilePath returns the decoded file_path.
func (d *dataFile) FilePath() string {
	return d.Path
}

// FileFormat returns the decoded file_format.
func (d *dataFile) FileFormat() FileFormat {
	return d.Format
}

// Partition returns the decoded partition record.
func (d *dataFile) Partition() map[string]any {
	return d.PartitionData
}

// Count returns the decoded record_count.
func (d *dataFile) Count() int64 {
	return d.RecordCount
}

// FileSizeBytes returns the decoded file_size_in_bytes.
func (d *dataFile) FileSizeBytes() int64 {
	return d.FileSize
}

// The metric accessors below build their maps on first use through
// initializeMapData and return the cached map (nil when the column was null).

// ColumnSizes returns the column_sizes metric as a map.
func (d *dataFile) ColumnSizes() map[int]int64 {
	d.initializeMapData()
	sizes := d.colSizeMap
	return sizes
}

// ValueCounts returns the value_counts metric as a map.
func (d *dataFile) ValueCounts() map[int]int64 {
	d.initializeMapData()
	counts := d.valCntMap
	return counts
}

// NullValueCounts returns the null_value_counts metric as a map.
func (d *dataFile) NullValueCounts() map[int]int64 {
	d.initializeMapData()
	counts := d.nullCntMap
	return counts
}

// NaNValueCounts returns the nan_value_counts metric as a map.
func (d *dataFile) NaNValueCounts() map[int]int64 {
	d.initializeMapData()
	counts := d.nanCntMap
	return counts
}

// DistinctValueCounts returns the distinct_counts metric as a map.
func (d *dataFile) DistinctValueCounts() map[int]int64 {
	d.initializeMapData()
	counts := d.distinctCntMap
	return counts
}

// LowerBoundValues returns the lower_bounds metric as a map.
func (d *dataFile) LowerBoundValues() map[int][]byte {
	d.initializeMapData()
	bounds := d.lowerBoundMap
	return bounds
}

// UpperBoundValues returns the upper_bounds metric as a map.
func (d *dataFile) UpperBoundValues() map[int][]byte {
	d.initializeMapData()
	bounds := d.upperBoundMap
	return bounds
}

// KeyMetadata returns the key_metadata bytes, or nil when the column was null.
func (d *dataFile) KeyMetadata() []byte {
	if k := d.Key; k != nil {
		return *k
	}
	return nil
}

// SplitOffsets returns split_offsets, or nil when the column was null.
func (d *dataFile) SplitOffsets() []int64 {
	if s := d.Splits; s != nil {
		return *s
	}
	return nil
}
// EqualityFieldIDs returns the decoded equality_ids, or nil when the column
// was null. The previous implementation called itself instead of
// dereferencing the field, which recursed until the stack overflowed.
func (d *dataFile) EqualityFieldIDs() []int {
	if d.EqualityIDs == nil {
		return nil
	}
	return *d.EqualityIDs
}

// SortOrderID returns the decoded sort_order_id pointer (nil when null).
func (d *dataFile) SortOrderID() *int { return d.SortOrder }
// manifestEntryV1 is one decoded entry of a format-version 1 manifest.
type manifestEntryV1 struct {
EntryStatus ManifestEntryStatus `avro:"status"`
Snapshot int64 `avro:"snapshot_id"`
// SeqNum and FileSeqNum carry no avro tag. NOTE(review): presumably they
// are never populated by decoding, and inheritSeqNum is a no-op for V1, so
// they stay nil unless set elsewhere — confirm.
SeqNum *int64
FileSeqNum *int64
Data DataFile `avro:"data_file"`
}
// fallbackManifestEntryV1 is the decode target fetchManifestEntries uses for
// V1 manifests whose snapshot_id column is an Avro union. The pointer-typed
// Snapshot shadows the embedded int64 field with the same avro tag. toEntry
// copies it back into the embedded manifestEntryV1.
type fallbackManifestEntryV1 struct {
manifestEntryV1
Snapshot *int64 `avro:"snapshot_id"`
}
// toEntry converts a decoded fallback entry into a plain *manifestEntryV1 by
// copying the nullable snapshot_id into the embedded non-pointer field. The
// column is a union because it may be null. In that case the embedded
// Snapshot stays 0 instead of the conversion panicking on a nil dereference.
// The returned pointer aliases f's storage.
func (f *fallbackManifestEntryV1) toEntry() *manifestEntryV1 {
	if f.Snapshot != nil {
		f.manifestEntryV1.Snapshot = *f.Snapshot
	}
	return &f.manifestEntryV1
}
// inheritSeqNum does nothing for V1 entries; fields are used as decoded.
func (m *manifestEntryV1) inheritSeqNum(manifest ManifestFile) {}

// Status returns the decoded status column.
func (m *manifestEntryV1) Status() ManifestEntryStatus {
	return m.EntryStatus
}

// SnapshotID returns the decoded snapshot_id.
func (m *manifestEntryV1) SnapshotID() int64 {
	return m.Snapshot
}

// SequenceNum returns SeqNum, or 0 when it is unset.
func (m *manifestEntryV1) SequenceNum() int64 {
	var seq int64
	if m.SeqNum != nil {
		seq = *m.SeqNum
	}
	return seq
}

// FileSequenceNum returns the FileSeqNum pointer as stored (possibly nil).
func (m *manifestEntryV1) FileSequenceNum() *int64 {
	return m.FileSeqNum
}

// DataFile returns the decoded data_file record.
func (m *manifestEntryV1) DataFile() DataFile {
	return m.Data
}
// manifestEntryV2 is one decoded entry of a format-version 2 manifest. The
// snapshot and sequence-number columns are nullable. fetchManifestEntries
// calls inheritSeqNum to fill nil values from the owning manifest.
type manifestEntryV2 struct {
EntryStatus ManifestEntryStatus `avro:"status"`
Snapshot *int64 `avro:"snapshot_id"`
SeqNum *int64 `avro:"sequence_number"`
FileSeqNum *int64 `avro:"file_sequence_number"`
Data DataFile `avro:"data_file"`
}
// inheritSeqNum fills in values the entry left null from the manifest that
// lists it:
//   - a nil snapshot ID takes manifest.SnapshotID();
//   - a nil data or file sequence number takes manifest.SequenceNum(), but
//     only when that number is 0 or the entry's status is EntryStatusADDED.
//
// Each inherited sequence number gets its own variable. The old code pointed
// SeqNum and FileSeqNum at the same variable, so mutating the pointer
// returned by FileSequenceNum silently changed SequenceNum as well.
func (m *manifestEntryV2) inheritSeqNum(manifest ManifestFile) {
	if m.Snapshot == nil {
		snap := manifest.SnapshotID()
		m.Snapshot = &snap
	}

	manifestSequenceNum := manifest.SequenceNum()
	inherit := manifestSequenceNum == 0 || m.EntryStatus == EntryStatusADDED
	if m.SeqNum == nil && inherit {
		seq := manifestSequenceNum
		m.SeqNum = &seq
	}
	if m.FileSeqNum == nil && inherit {
		fileSeq := manifestSequenceNum
		m.FileSeqNum = &fileSeq
	}
}
// Status returns the decoded status column.
func (m *manifestEntryV2) Status() ManifestEntryStatus {
	return m.EntryStatus
}

// SnapshotID returns the snapshot ID, or 0 when it is still nil.
func (m *manifestEntryV2) SnapshotID() int64 {
	var id int64
	if m.Snapshot != nil {
		id = *m.Snapshot
	}
	return id
}

// SequenceNum returns the data sequence number, or 0 when it is still nil.
func (m *manifestEntryV2) SequenceNum() int64 {
	var seq int64
	if m.SeqNum != nil {
		seq = *m.SeqNum
	}
	return seq
}

// FileSequenceNum returns the FileSeqNum pointer as stored (possibly nil).
func (m *manifestEntryV2) FileSequenceNum() *int64 {
	return m.FileSeqNum
}

// DataFile returns the decoded data_file record.
func (m *manifestEntryV2) DataFile() DataFile {
	return m.Data
}
// DataFile describes one file referenced by a manifest entry. It is
// implemented by *dataFile. The metric maps are nil when the corresponding
// column was null.
type DataFile interface {
// ContentType reports data, position deletes, or equality deletes.
ContentType () ManifestEntryContent
// FilePath returns the file location.
FilePath () string
// FileFormat returns the file's storage format.
FileFormat () FileFormat
// Partition returns the decoded partition record.
Partition () map [string ]any
// Count returns the record_count column.
Count () int64
// FileSizeBytes returns the file_size_in_bytes column.
FileSizeBytes () int64
// ColumnSizes returns the column_sizes metric.
ColumnSizes () map [int ]int64
// ValueCounts returns the value_counts metric.
ValueCounts () map [int ]int64
// NullValueCounts returns the null_value_counts metric.
NullValueCounts () map [int ]int64
// NaNValueCounts returns the nan_value_counts metric.
NaNValueCounts () map [int ]int64
// DistinctValueCounts returns the distinct_counts metric.
DistinctValueCounts () map [int ]int64
// LowerBoundValues returns the lower_bounds metric.
LowerBoundValues () map [int ][]byte
// UpperBoundValues returns the upper_bounds metric.
UpperBoundValues () map [int ][]byte
// KeyMetadata returns key_metadata, or nil.
KeyMetadata () []byte
// SplitOffsets returns split_offsets, or nil.
SplitOffsets () []int64
// EqualityFieldIDs returns equality_ids, or nil.
EqualityFieldIDs () []int
// SortOrderID returns sort_order_id, or nil.
SortOrderID () *int
}
// ManifestEntry is one entry of a manifest file, implemented by
// *manifestEntryV1 and *manifestEntryV2.
type ManifestEntry interface {
// Status reports EXISTING, ADDED, or DELETED.
Status () ManifestEntryStatus
// SnapshotID returns the snapshot ID (0 when unknown).
SnapshotID () int64
// SequenceNum returns the data sequence number (0 when unknown).
SequenceNum () int64
// FileSequenceNum returns the file sequence number, or nil when unknown.
FileSequenceNum () *int64
// DataFile returns the referenced file.
DataFile () DataFile
// inheritSeqNum fills unset values from the owning manifest; called by
// fetchManifestEntries for every returned entry.
inheritSeqNum(manifest ManifestFile )
}
// PositionalDeleteSchema is the schema of Iceberg position delete files. Each
// row names a data file (file_path) and the ordinal position (pos) of a
// deleted row in that file. Both columns use the field IDs the Iceberg table
// spec reserves for them.
//
// pos is declared as a 64-bit long, as the Iceberg spec requires. It was
// previously Int32, which does not match position delete files written by
// spec-conforming writers.
var PositionalDeleteSchema = NewSchema(0,
	NestedField{ID: 2147483546, Type: PrimitiveTypes.String, Name: "file_path", Required: true},
	NestedField{ID: 2147483545, Type: PrimitiveTypes.Int64, Name: "pos", Required: true},
)
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.