package storage

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/oklog/ulid"
	"github.com/parquet-go/parquet-go"
	"github.com/polarsignals/iceberg-go"
	"github.com/polarsignals/iceberg-go/catalog"
	"github.com/polarsignals/iceberg-go/table"
	"github.com/thanos-io/objstore"

	"github.com/polarsignals/frostdb/dynparquet"
	"github.com/polarsignals/frostdb/query/expr"
	"github.com/polarsignals/frostdb/query/logicalplan"
)

/*
	Iceberg is an Apache Iceberg backed DataSink/DataSource.

	The Iceberg layout is as follows:
	<warehouse>/<database>/<table>/<table_version_number>.metadata.json  // Metadata file
	<warehouse>/<database>/<table>/data/<ulid>.parquet                   // Data files
	<warehouse>/<database>/<table>/metadata/snap.<ulid>.avro             // Manifest list file (snapshot)
	<warehouse>/<database>/<table>/metadata/<ulid>.avro                  // Manifest file
	<warehouse>/<database>/<table>/metadata/version-hint.text            // Version hint file (if the HDFS catalog is used)

	On Upload a new snapshot is created and the data file is added to a manifest (or a new manifest is created, depending on settings).
	This manifest is then added to the existing manifest list, and a new version of the metadata file is created.

	Once the new metadata file is written, the version hint file is updated with the latest version number of the table.
	This version-hint file is used to determine the latest version of the table (HDFS catalog only).

	On Scan the latest snapshot is loaded and the manifest list is read.
	If the table is partitioned, manifests that cannot match the given filter are pruned using their partition data.
	The remaining manifests are read, and their data files are pruned using the given filter against the min/max column bounds recorded for each data file.

	The remaining data files are then read, and the filter is applied to each row group in the data file.
*/
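
// The sketch below shows roughly how this type is wired up as a DataSink/DataSource.
// It is illustrative only: newCatalog and newBucket are hypothetical placeholders for
// whatever catalog.Catalog and objstore.Bucket implementations are in use.
//
//	ctlg := newCatalog()              // any catalog.Catalog implementation
//	bkt := newBucket()                // any objstore.Bucket pointing at the warehouse
//	berg, err := NewIceberg("s3://my-warehouse", ctlg, bkt,
//		WithMaintenanceSchedule(time.Hour),
//		WithDataFileExpiry(24*time.Hour),
//	)
//	if err != nil {
//		// handle error
//	}
//	defer berg.Close()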

const (
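	// DefaultOrphanedFileAge is the default age after which orphaned files
	// (files no longer referenced by any table metadata) are removed during maintenance.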
	DefaultOrphanedFileAge = 24 * time.Hour
)

var defaultWriterOptions = []table.WriterOption{
	table.WithManifestSizeBytes(8 * 1024 * 1024), // 8MiB manifest size
	table.WithMergeSchema(),
	table.WithExpireSnapshotsOlderThan(6 * time.Hour), // 6 hours of snapshots
	table.WithMetadataDeleteAfterCommit(),
	table.WithMetadataPreviousVersionsMax(3), // Keep 3 previous versions of the metadata
}

// Iceberg is an Apache Iceberg backed DataSink/DataSource.
type Iceberg struct {
	catalog   catalog.Catalog
	bucketURI string // bucketURI is the URI of the bucket, e.g. gs://<bucket-name>, s3://<bucket-name>, etc.
	bucket    objstore.Bucket
	logger    log.Logger

	// configuration options
	partitionSpec       iceberg.PartitionSpec
	maxDataFileAge      time.Duration
	orphanedFileAge     time.Duration
	maintenanceSchedule time.Duration

	// maintenance goroutine lifecycle controls
	maintenanceCtx  context.Context
	maintenanceDone context.CancelFunc
	maintenanceWg   sync.WaitGroup
}

// IcebergOption is a function that configures an Iceberg DataSink/DataSource.
type IcebergOption func(*Iceberg)

// NewIceberg creates a new Iceberg DataSink/DataSource.
// You must provide the URI of the warehouse and the objstore.Bucket that points to that warehouse.
func NewIceberg(bucketURI string, ctlg catalog.Catalog, bucket objstore.Bucket, options ...IcebergOption) (*Iceberg, error) {
	berg := &Iceberg{
		catalog:         ctlg,
		bucketURI:       bucketURI,
		bucket:          catalog.NewIcebucket(bucketURI, bucket),
		orphanedFileAge: DefaultOrphanedFileAge,
		logger:          log.NewNopLogger(),
	}

	for _, opt := range options {
		opt(berg)
	}

	// Start a maintenance goroutine if a schedule is set.
	if berg.maintenanceSchedule > 0 {
		berg.maintenanceCtx, berg.maintenanceDone = context.WithCancel(context.Background())
		berg.maintenanceWg.Add(1)
		go func(ctx context.Context) {
			defer berg.maintenanceWg.Done()
			ticker := time.NewTicker(berg.maintenanceSchedule)
			defer ticker.Stop()
			for {
				select {
				case <-ticker.C:
					if err := berg.Maintenance(ctx); err != nil {
						level.Error(berg.logger).Log("msg", "iceberg maintenance failure", "err", err)
					}
				case <-ctx.Done():
					return
				}
			}
		}(berg.maintenanceCtx)
	}

	return berg, nil
}

// Close stops the maintenance goroutine, if one was started, and waits for it to exit.
func (i *Iceberg) Close() error {
	if i.maintenanceDone != nil {
		i.maintenanceDone()
		i.maintenanceWg.Wait()
	}
	return nil
}

// Maintenance walks every table in the warehouse, expires data files older than
// the configured maximum age (if any), and deletes orphaned files.
func (i *Iceberg) Maintenance(ctx context.Context) error {
	namespaces, err := i.catalog.ListNamespaces(ctx, []string{i.bucketURI})
	if err != nil {
		return err
	}

	for _, namespace := range namespaces {
		tables, err := i.catalog.ListTables(ctx, []string{filepath.Join(append([]string{i.bucketURI}, namespace...)...)}) // FIXME: this is clunky
		if err != nil {
			return err
		}

		for _, tbl := range tables {
			tableLocation := filepath.Join(i.bucketURI, namespace[0], tbl[0]) // FIXME: this is clunky; Iceberg should just return the fully qualified path
			t, err := i.catalog.LoadTable(ctx, []string{tableLocation}, iceberg.Properties{})
			if err != nil {
				return err
			}

			if i.maxDataFileAge > 0 {
				w, err := t.SnapshotWriter(defaultWriterOptions...)
				if err != nil {
					return err
				}

				// Delete all data files in the table that are older than the max age.
				if err := w.DeleteDataFile(ctx, func(d iceberg.DataFile) bool {
					id, err := ulid.Parse(strings.TrimSuffix(filepath.Base(d.FilePath()), ".parquet"))
					if err != nil {
						level.Error(i.logger).Log("msg", "failed to parse ulid", "err", err)
						return false
					}

					return time.Since(ulid.Time(id.Time())) > i.maxDataFileAge
				}); err != nil {
					return err
				}

				if err := w.Close(ctx); err != nil {
					return err
				}

				// Reload the table we just modified.
				t, err = i.catalog.LoadTable(ctx, []string{tableLocation}, iceberg.Properties{})
				if err != nil {
					return err
				}
			}

			// Delete orphaned files that are older than the max orphaned file age.
			if err := table.DeleteOrphanFiles(ctx, t, i.orphanedFileAge); err != nil {
				return err
			}
		}
	}

	return nil
}
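
// Maintenance can also be driven manually when no schedule is configured, for
// example from an operator-triggered task. Illustrative sketch only; berg is an
// *Iceberg created with NewIceberg and logger is any log.Logger:
//
//	if err := berg.Maintenance(context.Background()); err != nil {
//		level.Error(logger).Log("msg", "iceberg maintenance failed", "err", err)
//	}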

// WithIcebergPartitionSpec sets the partition spec for the Iceberg table. This is useful for pruning manifests during scans.
// Note that at this time the Iceberg storage engine does not write data in a partitioned fashion, so this is only useful for setting the upper/lower bounds
// of columns in the manifest data.
func WithIcebergPartitionSpec(spec iceberg.PartitionSpec) IcebergOption {
	return func(i *Iceberg) {
		i.partitionSpec = spec
	}
}

// WithDataFileExpiry sets a maximum age for data files. Data files older than the max age will be deleted from the table periodically according to the maintenance schedule.
func WithDataFileExpiry(maxAge time.Duration) IcebergOption {
	return func(i *Iceberg) {
		i.maxDataFileAge = maxAge
	}
}

// WithMaintenanceSchedule sets the schedule for the maintenance of the Iceberg table.
// This spawns a goroutine that will periodically expire data files (if WithDataFileExpiry is set)
// and delete orphaned files from the table.
func WithMaintenanceSchedule(schedule time.Duration) IcebergOption {
	return func(i *Iceberg) {
		i.maintenanceSchedule = schedule
	}
}

// WithIcebergLogger sets the logger used by the Iceberg DataSink/DataSource.
func WithIcebergLogger(logger log.Logger) IcebergOption {
	return func(i *Iceberg) {
		i.logger = logger
	}
}

func (i *Iceberg) String() string {
	return "Iceberg"
}

// Scan will load the latest Iceberg table. It will filter out any manifests that do not contain useful data.
// Then it will read the manifests that may contain useful data and filter out the data files that do not contain useful data.
// Finally it is left with a set of data files that may contain useful data; it reads those data files and applies the filter to each row group in the data file.
func (i *Iceberg) Scan(ctx context.Context, prefix string, _ *dynparquet.Schema, filter logicalplan.Expr, _ uint64, callback func(context.Context, any) error) error {
	t, err := i.catalog.LoadTable(ctx, []string{i.bucketURI, prefix}, iceberg.Properties{})
	if err != nil {
		if errors.Is(err, catalog.ErrorTableNotFound) {
			return nil
		}
		return fmt.Errorf("failed to load table: %w", err)
	}

	// Get the latest snapshot
	snapshot := t.CurrentSnapshot()
	list, err := snapshot.Manifests(i.bucket)
	if err != nil {
		return fmt.Errorf("error reading manifest list: %w", err)
	}

	fltr, err := expr.BooleanExpr(filter)
	if err != nil {
		return err
	}

	for _, manifest := range list {
		ok, err := manifestMayContainUsefulData(t.Metadata().PartitionSpec(), t.Schema(), manifest, fltr)
		if err != nil {
			return fmt.Errorf("failed to filter manifest: %w", err)
		}
		if !ok {
			continue
		}

		schema, entries, err := manifest.FetchEntries(i.bucket, false)
		if err != nil {
			return fmt.Errorf("fetch entries %s: %w", manifest.FilePath(), err)
		}

		for _, e := range entries {
			ok, err := manifestEntryMayContainUsefulData(icebergSchemaToParquetSchema(schema), e, fltr)
			if err != nil {
				return fmt.Errorf("failed to filter entry: %w", err)
			}
			if !ok {
				continue
			}

			// TODO(thor): data files can be processed in parallel
			bkt := NewBucketReaderAt(i.bucket)
			r, err := bkt.GetReaderAt(ctx, e.DataFile().FilePath())
			if err != nil {
				return err
			}

			file, err := parquet.OpenFile(
				r,
				e.DataFile().FileSizeBytes(),
				parquet.FileReadMode(parquet.ReadModeAsync),
			)
			if err != nil {
				return fmt.Errorf("failed to open file %s: %w", e.DataFile().FilePath(), err)
			}

			// Get a reader from the file bytes
			buf, err := dynparquet.NewSerializedBuffer(file)
			if err != nil {
				return err
			}

			for rg := 0; rg < buf.NumRowGroups(); rg++ {
				rowGroup := buf.DynamicRowGroup(rg)
				mayContainUsefulData, err := fltr.Eval(rowGroup, false)
				if err != nil {
					return err
				}
				if mayContainUsefulData {
					if err := callback(ctx, rowGroup); err != nil {
						return err
					}
				}
			}
		}
	}

	return nil
}
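
// A minimal sketch of how Scan is typically consumed: the callback receives each
// matching row group (assumed here to satisfy dynparquet.DynamicRowGroup) and can
// merge or convert it as needed. berg, ctx, and filter are placeholders:
//
//	err := berg.Scan(ctx, "<database>/<table>", nil, filter, 0, func(ctx context.Context, v any) error {
//		rg := v.(dynparquet.DynamicRowGroup)
//		_ = rg // process the row group
//		return nil
//	})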

// Prefixes lists all the tables found in the warehouse for the given database (prefix).
func (i *Iceberg) Prefixes(ctx context.Context, prefix string) ([]string, error) {
	tables, err := i.catalog.ListTables(ctx, []string{filepath.Join(i.bucketURI, prefix)})
	if err != nil {
		return nil, err
	}

	tableNames := make([]string, 0, len(tables))
	for _, tbl := range tables {
		tableNames = append(tableNames, filepath.Join(tbl...))
	}
	return tableNames, nil
}

// Upload a parquet file into the Iceberg table.
func (i *Iceberg) Upload(ctx context.Context, name string, r io.Reader) error {
	tableLocation := filepath.Join(i.bucketURI, filepath.Dir(filepath.Dir(name)))
	t, err := i.catalog.LoadTable(ctx, []string{tableLocation}, iceberg.Properties{})
	if err != nil {
		if !errors.Is(err, catalog.ErrorTableNotFound) {
			return err
		}

		// Table doesn't exist, create it.
		t, err = i.catalog.CreateTable(ctx, tableLocation, iceberg.NewSchema(0), iceberg.Properties{},
			catalog.WithPartitionSpec(i.partitionSpec),
		)
		if err != nil {
			return fmt.Errorf("failed to create table: %w", err)
		}
	}

	w, err := t.SnapshotWriter(defaultWriterOptions...)
	if err != nil {
		return err
	}

	if err := w.Append(ctx, r); err != nil {
		return err
	}

	return w.Close(ctx)
}
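
// Upload derives the table location by stripping the last two path elements from
// the object name, so names are expected to follow the layout documented above,
// e.g. "<database>/<table>/data/<ulid>.parquet". Illustrative sketch only; berg,
// ctx, and f (an open Parquet file) are placeholders:
//
//	var f io.Reader
//	err := berg.Upload(ctx, "mydb/mytable/data/<ulid>.parquet", f)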

// Delete is a no-op for the Iceberg DataSink.
// NOTE: Deletes are used in DataSinks when an upload fails for any reason. Because an Iceberg table is not updated
// until a full upload is successful, there is no risk of partial data being left in the table, or a corrupted file being read.
func (i *Iceberg) Delete(ctx context.Context, name string) error {
	return nil
}

// icebergTypeToParquetNode maps a primitive Iceberg type to the corresponding parquet-go node.
func icebergTypeToParquetNode(t iceberg.Type) parquet.Node {
	switch t.Type() {
	case "long":
		return parquet.Int(64)
	case "binary":
		return parquet.String()
	case "boolean":
		return parquet.Leaf(parquet.BooleanType)
	case "int":
		return parquet.Int(32)
	case "float":
		return parquet.Leaf(parquet.FloatType)
	case "double":
		return parquet.Leaf(parquet.DoubleType)
	case "string":
		return parquet.String()
	default:
		panic(fmt.Sprintf("unsupported type: %s", t.Type()))
	}
}

// icebergSchemaToParquetSchema converts an Iceberg schema into a flat parquet-go schema.
func icebergSchemaToParquetSchema(schema *iceberg.Schema) *parquet.Schema {
	g := parquet.Group{}
	for _, f := range schema.Fields() {
		g[f.Name] = icebergTypeToParquetNode(f.Type)
	}
	return parquet.NewSchema("iceberg", g)
}
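
// For example, an Iceberg schema with fields (id: long, name: string) maps to a flat
// parquet group equivalent to:
//
//	parquet.NewSchema("iceberg", parquet.Group{
//		"id":   parquet.Int(64),
//		"name": parquet.String(),
//	})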

// manifestMayContainUsefulData reports whether a manifest may contain data matching the filter,
// based on the partition summaries recorded in the manifest.
func manifestMayContainUsefulData(partition iceberg.PartitionSpec, schema *iceberg.Schema, manifest iceberg.ManifestFile, filter expr.TrueNegativeFilter) (bool, error) {
	if partition.IsUnpartitioned() {
		return true, nil
	}
	// Ignore missing columns as the partition spec only contains the columns that are partitioned.
	return filter.Eval(manifestToParticulate(partition, schema, manifest), true)
}

// manifestEntryMayContainUsefulData reports whether a data file may contain data matching the filter,
// based on the column statistics recorded for the data file.
func manifestEntryMayContainUsefulData(schema *parquet.Schema, entry iceberg.ManifestEntry, filter expr.TrueNegativeFilter) (bool, error) {
	return filter.Eval(dataFileToParticulate(schema, entry.DataFile()), false)
}

func manifestToParticulate(spec iceberg.PartitionSpec, schema *iceberg.Schema, m iceberg.ManifestFile) expr.Particulate {
	// Convert the partition spec to a parquet schema.
	g := parquet.Group{}
	columnChunks := make([]parquet.ColumnChunk, 0, spec.NumFields())
	for i := 0; i < spec.NumFields(); i++ {
		field := spec.Field(i)
		summary := m.Partitions()[i]
		node := icebergTypeToParquetNode(schema.Field(field.SourceID).Type)
		g[field.Name] = node
		columnChunks = append(columnChunks, &virtualColumnChunk{
			pType:       node.Type(),
			nulls:       0, // TODO future optimization?
			column:      i,
			lowerBounds: *summary.LowerBound,
			upperBounds: *summary.UpperBound,
			numValues:   1, // m.ExistingRows() + m.AddedRows() // TODO: future optimization?
		})
	}

	return &manifestParticulate{
		schema:       parquet.NewSchema("iceberg-partition", g),
		columnChunks: columnChunks,
	}
}

// manifestParticulate adapts a manifest's partition field summaries to the
// expr.Particulate interface so the scan filter can prune whole manifests.
type manifestParticulate struct {
	columnChunks []parquet.ColumnChunk
	schema       *parquet.Schema
}

func (m *manifestParticulate) Schema() *parquet.Schema { return m.schema }

func (m *manifestParticulate) ColumnChunks() []parquet.ColumnChunk { return m.columnChunks }

func dataFileToParticulate(schema *parquet.Schema, d iceberg.DataFile) expr.Particulate {
	return &dataFileParticulate{
		schema: schema,
		data:   d,
	}
}

// dataFileParticulate adapts an Iceberg data file's column statistics to the
// expr.Particulate interface so the scan filter can prune data files without reading them.
type dataFileParticulate struct {
	schema *parquet.Schema
	data   iceberg.DataFile
}

func (v *dataFileParticulate) Schema() *parquet.Schema {
	return v.schema
}

func (v *dataFileParticulate) ColumnChunks() []parquet.ColumnChunk {
	virtualColumnChunks := make([]parquet.ColumnChunk, 0, len(v.schema.Fields()))
	for i := range v.schema.Fields() {
		virtualColumnChunks = append(virtualColumnChunks, &virtualColumnChunk{
			pType:       v.schema.Fields()[i].Type(),
			nulls:       v.data.NullValueCounts()[i],
			column:      i,
			lowerBounds: v.data.LowerBoundValues()[i],
			upperBounds: v.data.UpperBoundValues()[i],
			numValues:   v.data.Count(),
		})
	}
	return virtualColumnChunks
}

// virtualColumnChunk is a parquet.ColumnChunk backed by Iceberg metadata rather than an
// actual Parquet column. Only what the filter needs is implemented: Pages and BloomFilter
// return nil, and the column index exposes the recorded bounds and null counts.
type virtualColumnChunk struct {
	pType       parquet.Type
	column      int
	numValues   int64
	nulls       int64
	lowerBounds []byte
	upperBounds []byte
}

func (v *virtualColumnChunk) Type() parquet.Type   { return nil }
func (v *virtualColumnChunk) Column() int          { return v.column }
func (v *virtualColumnChunk) Pages() parquet.Pages { return nil }
func (v *virtualColumnChunk) ColumnIndex() (parquet.ColumnIndex, error) {
	return &virtualColumnIndex{
		pType:       v.pType,
		nulls:       v.nulls,
		lowerBounds: v.lowerBounds,
		upperBounds: v.upperBounds,
	}, nil
}
func (v *virtualColumnChunk) OffsetIndex() (parquet.OffsetIndex, error) { return nil, nil }
func (v *virtualColumnChunk) BloomFilter() parquet.BloomFilter          { return nil }
func (v *virtualColumnChunk) NumValues() int64                          { return v.numValues }

// virtualColumnIndex is a parquet.ColumnIndex synthesized from Iceberg lower/upper bounds;
// it reports a single page whose min/max values are the recorded bounds.
type virtualColumnIndex struct {
	lowerBounds []byte
	upperBounds []byte
	nulls       int64
	pType       parquet.Type
}

func (v *virtualColumnIndex) NumPages() int       { return 1 }
func (v *virtualColumnIndex) NullCount(int) int64 { return v.nulls }
func (v *virtualColumnIndex) NullPage(int) bool   { return false }
func (v *virtualColumnIndex) MinValue(int) parquet.Value {
	switch v.pType.Kind() {
	case parquet.Int64:
		lower := binary.LittleEndian.Uint64(v.lowerBounds)
		return parquet.Int64Value(int64(lower))
	case parquet.ByteArray:
		return parquet.ByteArrayValue(v.lowerBounds)
	default:
		return parquet.ByteArrayValue(v.lowerBounds)
	}
}

func (v *virtualColumnIndex) MaxValue(int) parquet.Value {
	switch v.pType.Kind() {
	case parquet.Int64:
		upper := binary.LittleEndian.Uint64(v.upperBounds)
		return parquet.Int64Value(int64(upper))
	case parquet.ByteArray:
		return parquet.ByteArrayValue(v.upperBounds)
	default:
		return parquet.ByteArrayValue(v.upperBounds)
	}
}

func (v *virtualColumnIndex) IsAscending() bool  { return true }
func (v *virtualColumnIndex) IsDescending() bool { return false }
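
// For example, an Iceberg lower bound for an int64 column is its 8-byte little-endian
// encoding, and it is surfaced to the filter as a parquet value (illustrative sketch):
//
//	b := make([]byte, 8)
//	binary.LittleEndian.PutUint64(b, uint64(42))
//	idx := &virtualColumnIndex{pType: parquet.Int64Type, lowerBounds: b}
//	_ = idx.MinValue(0) // parquet.Int64Value(42)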