package table
import (
"encoding/json"
"errors"
"fmt"
"io"
"github.com/polarsignals/iceberg-go"
"github.com/google/uuid"
)
// Metadata is a read-only view of Iceberg table metadata. ParseMetadataBytes
// returns either a *MetadataV1 or a *MetadataV2 behind this interface,
// chosen by the document's format-version.
type Metadata interface {
	// Version returns the format-version.
	Version() int
	// TableUUID returns the table-uuid.
	TableUUID() uuid.UUID
	// Location returns the table location field.
	Location() string
	// LastUpdatedMillis returns last-updated-ms.
	LastUpdatedMillis() int64
	// LastColumnID returns last-column-id.
	LastColumnID() int

	// Schemas returns every recorded schema.
	Schemas() []*iceberg.Schema
	// SchemaID returns current-schema-id.
	SchemaID() int
	// CurrentSchema returns the schema currently in effect.
	CurrentSchema() *iceberg.Schema

	// PartitionSpecs returns every recorded partition spec.
	PartitionSpecs() []iceberg.PartitionSpec
	// PartitionSpec returns the default partition spec.
	PartitionSpec() iceberg.PartitionSpec
	// DefaultPartitionSpec returns default-spec-id.
	DefaultPartitionSpec() int
	// LastPartitionSpecID returns last-partition-id, or nil when absent.
	LastPartitionSpecID() *int

	// Snapshots returns every recorded snapshot.
	Snapshots() []Snapshot
	// SnapshotByID returns the snapshot with the given ID, or nil.
	SnapshotByID(int64) *Snapshot
	// SnapshotID returns current-snapshot-id, or nil when there is none.
	SnapshotID() *int64
	// SnapshotByName resolves a named ref to its snapshot, or nil.
	SnapshotByName(name string) *Snapshot
	// CurrentSnapshot returns the snapshot identified by SnapshotID, or nil.
	CurrentSnapshot() *Snapshot
	// SnapshotRefs returns the refs map keyed by ref name.
	SnapshotRefs() map[string]SnapshotRef

	// SortOrder returns the default sort order.
	SortOrder() SortOrder
	// SortOrders returns every recorded sort order.
	SortOrders() []SortOrder
	// SortOrderID returns default-sort-order-id.
	SortOrderID() int

	// Properties returns the table properties.
	Properties() iceberg.Properties
	// GetSnapshotLog returns the snapshot-log entries.
	GetSnapshotLog() []SnapshotLogEntry
	// GetMetadataLog returns the metadata-log entries.
	GetMetadataLog() []MetadataLogEntry
}
var (
	// ErrInvalidMetadataFormatVersion is returned by ParseMetadataBytes when
	// format-version is missing or is not a version this package handles.
	ErrInvalidMetadataFormatVersion = errors.New("invalid or missing format-version in table metadata")

	// ErrInvalidMetadata is the sentinel wrapped (via %w) by the metadata
	// validation checks; test for it with errors.Is.
	ErrInvalidMetadata = errors.New("invalid metadata")
)
// ParseMetadata reads r to EOF and parses the bytes as JSON table metadata
// (see ParseMetadataBytes). A read error is returned unchanged.
func ParseMetadata(r io.Reader) (Metadata, error) {
	raw, readErr := io.ReadAll(r)
	if readErr == nil {
		return ParseMetadataBytes(raw)
	}
	return nil, readErr
}
// ParseMetadataString parses JSON table metadata held in a string
// (see ParseMetadataBytes).
func ParseMetadataString(s string) (Metadata, error) {
	raw := []byte(s)
	return ParseMetadataBytes(raw)
}
// ParseMetadataBytes decodes JSON table metadata. The document is first
// probed for its format-version, which selects the concrete type:
// 1 → *MetadataV1, 2 → *MetadataV2. Any other value, including a missing
// field (decoded as 0), yields ErrInvalidMetadataFormatVersion.
//
// The full decode runs the selected type's UnmarshalJSON, which normalizes
// and validates the metadata. On any error the returned Metadata is nil.
func ParseMetadataBytes(b []byte) (Metadata, error) {
	ver := struct {
		FormatVersion int `json:"format-version"`
	}{}
	if err := json.Unmarshal(b, &ver); err != nil {
		return nil, err
	}

	var ret Metadata
	switch ver.FormatVersion {
	case 1:
		ret = &MetadataV1{}
	case 2:
		ret = &MetadataV2{}
	default:
		return nil, ErrInvalidMetadataFormatVersion
	}

	// Never hand back a half-decoded / invalid value alongside an error.
	if err := json.Unmarshal(b, ret); err != nil {
		return nil, err
	}
	return ret, nil
}
// commonMetadata holds the table-metadata fields shared by format versions 1
// and 2. MetadataV1 and MetadataV2 embed it; its methods provide the whole
// Metadata interface (MetadataV1 overrides CurrentSchema).
type commonMetadata struct {
	FormatVersion int       `json:"format-version"`
	UUID          uuid.UUID `json:"table-uuid"`
	Loc           string    `json:"location"`
	LastUpdatedMS int64     `json:"last-updated-ms"`
	LastColumnId  int       `json:"last-column-id"`
	// SchemaList must contain an entry whose ID is CurrentSchemaID (checked
	// by checkSchemas).
	SchemaList      []*iceberg.Schema `json:"schemas,omitempty"`
	CurrentSchemaID int               `json:"current-schema-id"`
	// Specs must contain a spec whose ID is DefaultSpecID (checked by
	// checkPartitionSpecs).
	Specs         []iceberg.PartitionSpec `json:"partition-specs"`
	DefaultSpecID int                     `json:"default-spec-id"`
	// LastPartitionID is nil when absent from the document; MetadataV1's
	// preValidate derives it from Specs in that case.
	LastPartitionID *int               `json:"last-partition-id,omitempty"`
	Props           iceberg.Properties `json:"properties"`
	SnapshotList    []Snapshot         `json:"snapshots,omitempty"`
	// CurrentSnapshotID is nil when there is no current snapshot; preValidate
	// also maps a value of -1 to nil.
	CurrentSnapshotID  *int64             `json:"current-snapshot-id,omitempty"`
	SnapshotLog        []SnapshotLogEntry `json:"snapshot-log,omitempty"`
	MetadataLog        []MetadataLogEntry `json:"metadata-log,omitempty"`
	SortOrderList      []SortOrder        `json:"sort-orders,omitempty"`
	DefaultSortOrderID int                `json:"default-sort-order-id,omitempty"`
	// Refs maps ref names to snapshot refs. preValidate makes it non-nil and
	// adds a MainBranch entry for the current snapshot when one is missing.
	Refs map[string]SnapshotRef `json:"refs,omitempty"`
}
// GetSnapshotLog returns the snapshot-log entries.
func (c *commonMetadata) GetSnapshotLog() []SnapshotLogEntry {
	return c.SnapshotLog
}

// GetMetadataLog returns the metadata-log entries.
func (c *commonMetadata) GetMetadataLog() []MetadataLogEntry {
	return c.MetadataLog
}

// SnapshotRefs returns the refs map keyed by ref name.
func (c *commonMetadata) SnapshotRefs() map[string]SnapshotRef {
	return c.Refs
}

// SortOrderID returns default-sort-order-id.
func (c *commonMetadata) SortOrderID() int {
	return c.DefaultSortOrderID
}

// SnapshotID returns current-snapshot-id, or nil when there is none.
func (c *commonMetadata) SnapshotID() *int64 {
	return c.CurrentSnapshotID
}

// SchemaID returns current-schema-id.
func (c *commonMetadata) SchemaID() int {
	return c.CurrentSchemaID
}

// TableUUID returns table-uuid.
func (c *commonMetadata) TableUUID() uuid.UUID {
	return c.UUID
}

// Location returns the table location field.
func (c *commonMetadata) Location() string {
	return c.Loc
}

// LastUpdatedMillis returns last-updated-ms.
func (c *commonMetadata) LastUpdatedMillis() int64 {
	return c.LastUpdatedMS
}

// LastColumnID returns last-column-id.
func (c *commonMetadata) LastColumnID() int {
	return c.LastColumnId
}

// Schemas returns every recorded schema.
func (c *commonMetadata) Schemas() []*iceberg.Schema {
	return c.SchemaList
}
// CurrentSchema returns the schema in SchemaList whose ID equals
// current-schema-id. Nil entries are skipped rather than dereferenced; a JSON
// null element in "schemas" decodes to such a nil pointer. CurrentSchema
// returns nil when no schema matches, which validate rejects at decode time.
func (c *commonMetadata) CurrentSchema() *iceberg.Schema {
	for _, s := range c.SchemaList {
		if s != nil && s.ID == c.CurrentSchemaID {
			return s
		}
	}
	// Library code should not panic on malformed or unvalidated input.
	return nil
}
// PartitionSpecs returns every recorded partition spec.
func (c *commonMetadata) PartitionSpecs() []iceberg.PartitionSpec { return c.Specs }

// DefaultPartitionSpec returns default-spec-id: the ID of the default spec,
// not the spec itself.
func (c *commonMetadata) DefaultPartitionSpec() int { return c.DefaultSpecID }
// PartitionSpec returns the spec whose ID equals default-spec-id. If no spec
// matches, it returns a copy of iceberg.UnpartitionedSpec.
func (c *commonMetadata) PartitionSpec() iceberg.PartitionSpec {
	for i := range c.Specs {
		if spec := c.Specs[i]; spec.ID() == c.DefaultSpecID {
			return spec
		}
	}
	return *iceberg.UnpartitionedSpec
}
// LastPartitionSpecID returns last-partition-id, or nil when it is absent.
func (c *commonMetadata) LastPartitionSpecID() *int {
	return c.LastPartitionID
}

// Snapshots returns every recorded snapshot.
func (c *commonMetadata) Snapshots() []Snapshot {
	return c.SnapshotList
}
// SnapshotByID returns the snapshot whose SnapshotID equals id, or nil when
// there is none. The result points into SnapshotList; it is not a copy.
func (c *commonMetadata) SnapshotByID(id int64) *Snapshot {
	for i, n := 0, len(c.SnapshotList); i < n; i++ {
		if snap := &c.SnapshotList[i]; snap.SnapshotID == id {
			return snap
		}
	}
	return nil
}
// SnapshotByName looks up the ref called name and returns the snapshot it
// points at. It returns nil if the ref or its snapshot does not exist.
func (c *commonMetadata) SnapshotByName(name string) *Snapshot {
	ref, found := c.Refs[name]
	if !found {
		return nil
	}
	return c.SnapshotByID(ref.SnapshotID)
}
// CurrentSnapshot returns the snapshot named by current-snapshot-id. It
// returns nil when there is no current snapshot or no snapshot has that ID.
func (c *commonMetadata) CurrentSnapshot() *Snapshot {
	if id := c.CurrentSnapshotID; id != nil {
		return c.SnapshotByID(*id)
	}
	return nil
}
// SortOrders returns every recorded sort order.
func (c *commonMetadata) SortOrders() []SortOrder {
	return c.SortOrderList
}

// SortOrder returns a copy of the sort order whose OrderID equals
// default-sort-order-id, or UnsortedSortOrder when none matches.
func (c *commonMetadata) SortOrder() SortOrder {
	for i := range c.SortOrderList {
		if c.SortOrderList[i].OrderID == c.DefaultSortOrderID {
			return c.SortOrderList[i]
		}
	}
	return UnsortedSortOrder
}
// Properties returns the table properties map (never nil after decoding,
// since preValidate replaces a missing map with an empty one).
func (c *commonMetadata) Properties() iceberg.Properties { return c.Props }
// preValidate fills in defaults for fields a decoded document may leave
// unset, so validate and the accessors can rely on them:
//   - a current-snapshot-id of -1 is treated as absent (nil);
//   - Refs is made non-nil. When there is a current snapshot but no
//     MainBranch ref, a branch ref pointing at that snapshot is added;
//   - MetadataLog, SnapshotLog and Props are replaced by empty, non-nil
//     values when missing.
func (c *commonMetadata) preValidate() {
	if id := c.CurrentSnapshotID; id != nil && *id == -1 {
		c.CurrentSnapshotID = nil
	}

	if c.Refs == nil {
		c.Refs = make(map[string]SnapshotRef)
	}
	// Runs after the -1 normalization above, so a -1 ID never creates a ref.
	if id := c.CurrentSnapshotID; id != nil {
		if _, hasMain := c.Refs[MainBranch]; !hasMain {
			c.Refs[MainBranch] = SnapshotRef{
				SnapshotID:      *id,
				SnapshotRefType: BranchRef,
			}
		}
	}

	if c.MetadataLog == nil {
		c.MetadataLog = []MetadataLogEntry{}
	}
	if c.SnapshotLog == nil {
		c.SnapshotLog = []SnapshotLogEntry{}
	}
	if c.Props == nil {
		c.Props = iceberg.Properties{}
	}
}
// checkSchemas returns nil when SchemaList contains a schema whose ID equals
// current-schema-id. Otherwise it returns an error wrapping
// ErrInvalidMetadata.
//
// Nil entries are skipped instead of dereferenced. A JSON null element in
// "schemas" decodes to a nil pointer, and untrusted input must produce an
// error rather than a panic.
func (c *commonMetadata) checkSchemas() error {
	for _, s := range c.SchemaList {
		if s != nil && s.ID == c.CurrentSchemaID {
			return nil
		}
	}
	return fmt.Errorf("%w: current-schema-id %d can't be found in any schema",
		ErrInvalidMetadata, c.CurrentSchemaID)
}
// checkPartitionSpecs returns nil when Specs contains a spec whose ID equals
// default-spec-id. Otherwise it returns an error wrapping ErrInvalidMetadata.
func (c *commonMetadata) checkPartitionSpecs() error {
	want := c.DefaultSpecID
	for i := range c.Specs {
		if c.Specs[i].ID() == want {
			return nil
		}
	}
	return fmt.Errorf("%w: default-spec-id %d can't be found",
		ErrInvalidMetadata, want)
}
// checkSortOrders returns nil when default-sort-order-id is
// UnsortedSortOrderID or names an order in SortOrderList. Otherwise it
// returns an error wrapping ErrInvalidMetadata that lists the known orders.
func (c *commonMetadata) checkSortOrders() error {
	want := c.DefaultSortOrderID
	if want == UnsortedSortOrderID {
		return nil
	}
	for i := range c.SortOrderList {
		if c.SortOrderList[i].OrderID == want {
			return nil
		}
	}
	return fmt.Errorf("%w: default-sort-order-id %d can't be found in %+v",
		ErrInvalidMetadata, want, c.SortOrderList)
}
// validate runs after preValidate during decoding and returns the first
// failure among these checks, in order:
//   - the current schema must exist;
//   - the default partition spec must exist;
//   - the default sort order must exist (or be the unsorted order);
//   - last-updated-ms must be non-zero;
//   - last-column-id must be non-zero.
//
// Every failure wraps ErrInvalidMetadata.
func (c *commonMetadata) validate() error {
	checks := []func() error{
		c.checkSchemas,
		c.checkPartitionSpecs,
		c.checkSortOrders,
	}
	for _, check := range checks {
		if err := check(); err != nil {
			return err
		}
	}

	if c.LastUpdatedMS == 0 {
		return fmt.Errorf("%w: missing last-updated-ms", ErrInvalidMetadata)
	}
	if c.LastColumnId == 0 {
		return fmt.Errorf("%w: missing last-column-id", ErrInvalidMetadata)
	}
	return nil
}
// Version returns the format-version recorded in the metadata.
func (c *commonMetadata) Version() int {
	return c.FormatVersion
}
// MetadataV1 is format-version 1 table metadata: the common fields plus the
// v1-only single "schema" and "partition-spec" fields. When "schemas" or
// "partition-specs" are absent, preValidate derives them from these fields.
type MetadataV1 struct {
	commonMetadata
	// Schema is the v1 "schema" field; CurrentSchema returns it when non-nil.
	Schema *iceberg.Schema `json:"schema"`
	// Partition is the v1 "partition-spec" field. preValidate builds the
	// default spec from it when "partition-specs" is absent.
	Partition []iceberg.PartitionField `json:"partition-spec"`
}
// preValidate normalizes a decoded v1 document into the shape shared with
// v2, before validate runs:
//   - a missing partition-spec becomes an empty field list;
//   - when "schemas" is absent, it is derived from the single "schema" field.
//     This happens only if that field is present, so a nil schema is never
//     inserted and validate reports the missing schema as an error;
//   - when "partition-specs" is absent, it is built from partition-spec, and
//     that spec becomes the default;
//   - a missing last-partition-id becomes the largest LastAssignedFieldID()
//     across all specs;
//   - a missing sort-order list becomes []SortOrder{UnsortedSortOrder}.
//
// Finally it applies the version-independent defaults from
// commonMetadata.preValidate.
func (m *MetadataV1) preValidate() {
	if len(m.Partition) == 0 {
		m.Partition = []iceberg.PartitionField{}
	}

	if len(m.SchemaList) == 0 && m.Schema != nil {
		m.SchemaList = []*iceberg.Schema{m.Schema}
	}

	if len(m.Specs) == 0 {
		m.Specs = []iceberg.PartitionSpec{
			iceberg.NewPartitionSpec(m.Partition...)}
		m.DefaultSpecID = m.Specs[0].ID()
	}

	// m.Specs is guaranteed non-empty here, so Specs[0] is safe.
	if m.LastPartitionID == nil {
		id := m.Specs[0].LastAssignedFieldID()
		for _, spec := range m.Specs[1:] {
			if last := spec.LastAssignedFieldID(); last > id {
				id = last
			}
		}
		m.LastPartitionID = &id
	}

	if len(m.SortOrderList) == 0 {
		m.SortOrderList = []SortOrder{UnsortedSortOrder}
	}

	m.commonMetadata.preValidate()
}
// CurrentSchema returns the v1 "schema" field when it is set. Otherwise it
// falls back to the common lookup by current-schema-id.
func (m *MetadataV1) CurrentSchema() *iceberg.Schema {
	if s := m.Schema; s != nil {
		return s
	}
	return m.commonMetadata.CurrentSchema()
}
// UnmarshalJSON decodes v1 metadata, fills defaults with preValidate, and
// returns the result of validate.
func (m *MetadataV1) UnmarshalJSON(b []byte) error {
	// plain has MetadataV1's fields but not its UnmarshalJSON method, so
	// decoding into it does not recurse back into this function.
	type plain MetadataV1
	if err := json.Unmarshal(b, (*plain)(m)); err != nil {
		return err
	}
	m.preValidate()
	return m.validate()
}
// ToV2 converts v1 metadata to v2 metadata with format-version set to 2.
// Only the top-level struct is copied. The slices and maps in the result
// (schemas, specs, snapshots, refs, properties, ...) are shared with m.
// A missing (all-zero) table-uuid is replaced with a freshly generated one.
func (m *MetadataV1) ToV2() MetadataV2 {
	commonOut := m.commonMetadata
	commonOut.FormatVersion = 2
	// uuid.UUID.String() always renders 36 characters, even for the zero
	// UUID, so "missing" must be detected by comparing against uuid.Nil.
	if commonOut.UUID == uuid.Nil {
		commonOut.UUID = uuid.New()
	}
	return MetadataV2{commonMetadata: commonOut}
}
// MetadataV2 is format-version 2 table metadata: the common fields plus
// last-sequence-number.
type MetadataV2 struct {
	LastSequenceNumber int `json:"last-sequence-number"`
	commonMetadata
}
// UnmarshalJSON decodes v2 metadata, fills defaults with preValidate, and
// returns the result of validate.
func (m *MetadataV2) UnmarshalJSON(b []byte) error {
	// plain has MetadataV2's fields but not its UnmarshalJSON method, so
	// decoding into it does not recurse back into this function.
	type plain MetadataV2
	if err := json.Unmarshal(b, (*plain)(m)); err != nil {
		return err
	}
	m.preValidate()
	return m.validate()
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.