package table

import (
	
	
	
	
	
	
	
	
	

	
	
	
	
	
)

// snapshotWriter stages changes to a table (data files added through Append and
// data files removed through DeleteDataFile) and, on Close, uploads the manifests,
// the manifest list and a new metadata file before committing the next version.
type snapshotWriter struct {
	// commit is invoked by Close with version+1 after the new metadata file is uploaded.
	commit     func(ctx context.Context, v int) error
	// version is the table version this writer builds on; Close writes and commits version+1.
	version    int
	// snapshotID identifies the new snapshot; it is chosen randomly by the constructor and
	// also stamped on the manifests and the manifest list file name written by Close.
	snapshotID int64
	// bucket is the object store that data, manifest and metadata files are uploaded to.
	bucket     objstore.Bucket
	// table is the table being written to.
	table      Table
	// options holds the writer configuration set through WriterOption values.
	options    writerOptions

	// schemaChanged is set once an appended file's schema differed from the previous one
	// (only possible with merge schema enabled); it prevents appending to an existing manifest.
	schemaChanged bool
	// schema starts as the table's current schema and is replaced by each appended file's schema.
	schema        *iceberg.Schema
	// spec starts as the table's partition spec and is rebuilt when the schema changes.
	spec          iceberg.PartitionSpec

	// entries are the new entries that have been added to the table during the write operation.
	entries []iceberg.ManifestEntry

	// modifiedManifests is the set of manifests that need to be written due to modification during write operations.
	// Keys are manifest file paths; values are the entries the rewritten manifest keeps
	// (an empty slice means every data file of that manifest was removed).
	modifiedManifests map[string][]iceberg.ManifestEntry
}

func ( func( context.Context,  int) error,  int,  objstore.Bucket,  Table,  ...WriterOption) snapshotWriter {
	 := snapshotWriter{
		commit:     ,
		version:    ,
		snapshotID: rand.Int63(),
		bucket:     ,
		table:      ,
		options: writerOptions{
			logger: log.NewNopLogger(),
		},
		schema:            .Metadata().CurrentSchema(),
		spec:              .Metadata().PartitionSpec(),
		modifiedManifests: map[string][]iceberg.ManifestEntry{},
	}

	for ,  := range  {
		(&.options)
	}

	return 
}

func ( *snapshotWriter) () string {
	return filepath.Join(.table.Location(), metadataDirName)
}

func ( *snapshotWriter) () string {
	return filepath.Join(.table.Location(), dataDirName)
}

// Append a new new Parquet data file to the table. Append does not write into partitions but instead just adds the data file directly to the table.
// If the table is partitioned, the upper and lower bounds of the entries data file as well as the manifest will still be populated from the Parquet file.
func ( *snapshotWriter) ( context.Context,  io.Reader) error {
	 := &bytes.Buffer{}
	 := io.TeeReader(, ) // Read file into memory while uploading

	// TODO(thor): We may want to pass in the filename as an option.
	 := filepath.Join(.dataDir(), fmt.Sprintf("%s%s", generateULID(), parquetFileExt))
	if  := .bucket.Upload(, , );  != nil {
		return 
	}

	// Create manifest entry
	, ,  := iceberg.ManifestEntryV1FromParquet(, int64(.Len()), .schema, bytes.NewReader(.Bytes()))
	if  != nil {
		return 
	}

	// If merge schema is disabled; ensure that the schema isn't changing.
	if .schema != nil && !.schema.Equals() {
		if !.options.mergeSchema {
			return fmt.Errorf("unexpected schema mismatch: %v != %v", .schema, )
		}

		.schemaChanged = true
	}

	.schema = 

	// Update the partition spec if the schema has changed so that the spec ID's match the new field IDs
	if  := .table.Metadata().PartitionSpec(); .schemaChanged && !.IsUnpartitioned() {
		 := make([]iceberg.PartitionField, 0, .NumFields())
		for  := 0;  < .NumFields(); ++ {
			 := .Field()
			,  := .schema.FindFieldByName(.Name)
			if ! {
				return fmt.Errorf("partition field %s not found in schema", .Name)
			}

			 = append(, iceberg.PartitionField{
				SourceID:  .ID,
				Name:      .Name,
				Transform: .Transform,
			})
		}
		.spec = iceberg.NewPartitionSpec(...)
	}

	.entries = append(.entries, )
	return nil
}

// Close will write out all the metadata files to the bucket and commit the snapshot to the table.
// Close is an atomic operation and will either succeed or fail and none of the changes made to the table will be committed.
func ( *snapshotWriter) ( context.Context) error {
	 := .entries
	 := .table.CurrentSnapshot()
	var  []iceberg.ManifestFile
	if  != nil {
		var  error
		,  = .Manifests(.bucket)
		if  != nil {
			return 
		}
	}

	 := false
	// If the schema hasn't changed we can append to the previous manifest file if it's not too large and we aren't in fast append mode.
	// Otherweise we always create a new manifest file.
	if !.schemaChanged && len() != 0 && !.options.fastAppendMode && .options.manifestSizeBytes > 0 {
		// Check the size of the previous manifest file
		 := [len()-1]
		if .Length() < int64(.options.manifestSizeBytes) { // Append to the latest manifest

			// If the manifest was modified use the modified manifest instead of reading the entries
			if ,  := .modifiedManifests[.FilePath()];  {
				 = append(, .entries...)
			} else {
				, ,  := .FetchEntries(.bucket, false)
				if  != nil {
					return 
				}
				 = append(, .entries...)
			}
			 = true
			.modifiedManifests[.FilePath()] = 
		}
	}

	// Create new manifests from the modified manifests
	for ,  := range .modifiedManifests {
		if len() == 0 { // All the data files were removed from the manifest
			// Remove the manifest from the previous manifests
			for ,  := range  {
				if .FilePath() ==  {
					 = append([:], [+1:]...)
					break
				}
			}
			continue
		}

		// Write the modified manifest
		,  := .createNewManifestFile(, )
		if  != nil {
			return 
		}

		// Replace the previous manifest with the modified manifest
		for ,  := range  {
			if .FilePath() ==  {
				[] = 
				break
			}
		}
	}

	// New entries were not appended to an existing manifest; create a new one
	if ! && len() > 0 {
		,  := .createNewManifestFile(, )
		if  != nil {
			return 
		}

		 = append(, )
	}

	// Upload the manifest list
	 := fmt.Sprintf("snap-%v-%s%s", .snapshotID, generateULID(), manifestFileExt)
	 := filepath.Join(.metadataDir(), )
	if ,  := .uploadManifest(, , func( context.Context,  io.Writer) error {
		return iceberg.WriteManifestListV1(, )
	});  != nil {
		return 
	}

	// Create snapshot data
	 := Snapshot{
		SnapshotID:   .snapshotID,
		TimestampMs:  time.Now().UnixMilli(),
		ManifestList: ,
		Summary: &Summary{
			Operation: OpAppend,
		},
	}
	, , ,  := .addSnapshot(, .table, , .schema)
	if  != nil {
		return 
	}

	// Write the new metadata file
	if  := .createNewMetadataFile(, );  != nil {
		return 
	}

	// Commit the snapshot to the table
	if  := .commit(, .version+1);  != nil {
		return 
	}

	// Cleanup stale snapshot files and metadata files (if configured)
	.cleanup(, , , )
	return nil
}

// createNewMetadataFile marshals the metadata and uploads it to the bucket under the name v{version}.metadata.json
func ( *snapshotWriter) ( context.Context,  Metadata) error {
	 := filepath.Join(.metadataDir(), fmt.Sprintf("v%v.metadata%s", .version+1, metadataFileExt))
	,  := json.Marshal()
	if  != nil {
		return 
	}

	if  := .bucket.Upload(, , bytes.NewReader());  != nil {
		return 
	}

	return nil
}

// createNewManifestFile creates a new manifest file from the given manifest entries and uploads it to the bucket.
func ( *snapshotWriter) ( context.Context,  []iceberg.ManifestEntry) (iceberg.ManifestFile, error) {
	 := fmt.Sprintf("%s%s", generateULID(), manifestFileExt)
	 := filepath.Join(.metadataDir(), )

	// Write the manifest file
	,  := .uploadManifest(, , func( context.Context,  io.Writer) error {
		return iceberg.WriteManifestV1(, .schema, )
	})
	if  != nil {
		return nil, 
	}

	 := int64(0)
	for ,  := range .entries {
		 += .DataFile().Count()
	}

	// Create manifest list
	 := iceberg.NewManifestV1Builder(, int64(), 0, .snapshotID).
		AddedRows().
		AddedFiles(int32(len(.entries))).
		ExistingFiles(int32(len() - len(.entries)))

	// Add partition information if the table is partitioned
	if !.spec.IsUnpartitioned() {
		.Partitions(summarizeFields(.spec, .schema, ))
	}

	return .Build(), nil
}

func ( *snapshotWriter) ( context.Context,  []string,  []Snapshot,  Metadata) {
	// Delete stale metadata files
	if .options.metadataDeleteAfterCommit {
		for ,  := range  {
			if  := .bucket.Delete(, );  != nil {
				level.Error(.options.logger).Log("msg", "failed to delete old file", "file", , "err", )
			}
		}
	}

	if len() == 0 {
		return
	}

	// Cleanup snapshots; First find if any manifests are no longer reachable by the expired snapshots and remove those.
	// Then remove the snapshot files.

	// Find all the reachable manifest files from snapshots
	 := map[string]struct{}{}
	for ,  := range .Snapshots() {
		,  := .Manifests(.bucket)
		if  != nil {
			level.Error(.options.logger).Log("msg", "failed to fetch manifests", "snapshot", .SnapshotID, "file", .ManifestList, "err", )
			continue
		}

		for ,  := range  {
			[.FilePath()] = struct{}{}
		}
	}

	 := map[string]struct{}{}
	for ,  := range  {
		,  := .Manifests(.bucket)
		if  != nil {
			level.Error(.options.logger).Log("msg", "failed to fetch manifests", "snapshot", .SnapshotID, "file", .ManifestList, "err", )
			continue
		}

		for ,  := range  {
			[.FilePath()] = struct{}{}
		}
	}

	// Remove stale manifest files
	for  := range  {
		if ,  := [];  {
			continue
		}

		if  := .bucket.Delete(, );  != nil {
			level.Error(.options.logger).Log("msg", "failed to delete stale manifest", "file", , "err", )
		}
	}

	// Delete stale snapshot files
	for ,  := range  {
		if  := .bucket.Delete(, .ManifestList);  != nil {
			level.Error(.options.logger).Log("msg", "failed to delete stale snapshot", "snapshot", .SnapshotID, "file", .ManifestList, "err", )
		}
	}
}

func ( *snapshotWriter) ( context.Context,  Table,  Snapshot,  *iceberg.Schema) (Metadata, []string, []Snapshot, error) {
	 := CloneMetadataV1(.Metadata())
	 := time.Now().UnixMilli()

	if !.Metadata().CurrentSchema().Equals() {
		// need to only update the schema ID if it has changed
		.ID = .CurrentSchema().ID + 1
	} else {
		.ID = .CurrentSchema().ID
	}

	// Expire old snapshots if requested
	 := .Snapshots()
	 := []Snapshot{}
	if .options.expireSnapshotsOlderThan != 0 {
		 = []Snapshot{}
		for ,  := range .Snapshots() {
			if time.Since(time.UnixMilli(.TimestampMs)) <= .options.expireSnapshotsOlderThan {
				 = append(, )
			} else {
				 = append(, )
			}
		}
	}

	 := .GetMetadataLog()
	 := []string{}
	if .options.metadataPreviousVersionsMax > 0 {
		 = append(, MetadataLogEntry{
			MetadataFile: .table.MetadataLocation(),
			TimestampMs:  ,
		})

		// Truncate up to the maximum number of previous versions
		if len() > .options.metadataPreviousVersionsMax {
			 := len() - .options.metadataPreviousVersionsMax
			for  := 0;  < ; ++ {
				 = append(, [].MetadataFile)
			}
			 = [len()-.options.metadataPreviousVersionsMax:]
		}
	}

	return .
		WithSchema().
		WithSchemas(nil). // Only retain a single schema
		WithLastUpdatedMs().
		WithCurrentSnapshotID(.SnapshotID).
		WithSnapshots(append(, )).
		WithPartitionSpecs([]iceberg.PartitionSpec{.spec}). // Only retain a single partition spec
		WithMetadataLog().
		Build(), , , nil
}

// uploadManifest uploads a manifest to the iceberg table. It's a wrapper around the bucket upload which requires a io.Reader and the manifest write functions which requires a io.Writer.
func ( *snapshotWriter) ( context.Context,  string,  func( context.Context,  io.Writer) error) (int, error) {
	,  := io.Pipe()
	 := &accountingWriter{w: }

	,  := errgroup.WithContext()
	.Go(func() error {
		defer .Close()
		return (, )
	})

	.Go(func() error {
		return .bucket.Upload(, , )
	})

	return .n, .Wait()
}

// accountingWriter wraps an io.WriteCloser and keeps a running total of the bytes
// the wrapped writer accepted; uploadManifest reports that total as the length.
type accountingWriter struct {
	w io.WriteCloser
	n int // bytes accepted by w so far
}

// Write forwards p to the wrapped writer and adds the byte count it reports to the
// total, including the partial count of a failed write.
func (a *accountingWriter) Write(p []byte) (int, error) {
	written, err := a.w.Write(p)
	a.n += written
	return written, err
}

// Close closes the wrapped writer.
func (a *accountingWriter) Close() error {
	return a.w.Close()
}

// summarizeFields returns the field summaries for the given partition spec and manifest entries.
func summarizeFields( iceberg.PartitionSpec,  *iceberg.Schema,  []iceberg.ManifestEntry) []iceberg.FieldSummary {
	 := []iceberg.FieldSummary{}

	for  := 0;  < .NumFields(); ++ {
		 := .Field()
		 := .Field(.SourceID).Type

		// Find the entry with the lower/upper bounds for the field
		,  := []byte{}, []byte{}
		for ,  := range  {
			 := .DataFile().UpperBoundValues()[.SourceID]
			 := .DataFile().LowerBoundValues()[.SourceID]

			if len() == 0 || compare(, , ) < 0 {
				 = 
			}

			if len() == 0 || compare(, , ) < 0 {
				 = 
			}
		}

		 = append(, iceberg.FieldSummary{
			LowerBound: &,
			UpperBound: &,
		})
	}

	return 
}

func compare(,  []byte,  iceberg.Type) int {
	switch .Type() {
	case "boolean":
		return bytes.Compare(, )
	case "int":
		 := int32(binary.LittleEndian.Uint32())
		 := int32(binary.LittleEndian.Uint32())
		switch {
		case  < :
			return -1
		case  > :
			return 1
		default:
			return 0
		}
	case "float":
		 := float32(binary.LittleEndian.Uint32())
		 := float32(binary.LittleEndian.Uint32())
		switch {
		case  < :
			return -1
		case  > :
			return 1
		default:
			return 0
		}
	case "long":
		 := int64(binary.LittleEndian.Uint64())
		 := int64(binary.LittleEndian.Uint64())
		switch {
		case  < :
			return -1
		case  > :
			return 1
		default:
			return 0
		}
	case "double":
		 := float64(binary.LittleEndian.Uint64())
		 := float64(binary.LittleEndian.Uint64())
		switch {
		case  < :
			return -1
		case  > :
			return 1
		default:
			return 0
		}
	case "string":
		return bytes.Compare(, )
	case "binary":
		return bytes.Compare(, )
	default:
		panic(fmt.Sprintf("unsupported type %v", ))
	}
}

func ( *snapshotWriter) ( context.Context,  func( iceberg.DataFile) bool) error {
	 := .table.CurrentSnapshot()
	,  := .Manifests(.bucket)
	if  != nil {
		return fmt.Errorf("error reading manifest list: %w", )
	}

	for ,  := range  {
		, ,  := .FetchEntries(.bucket, false)
		if  != nil {
			return fmt.Errorf("fetch entries %s: %w", .FilePath(), )
		}

		 := make([]iceberg.ManifestEntry, 0, len())
		for ,  := range  {
			if !(.DataFile()) {
				 = append(, )
			}
		}

		// Manifest was modified
		if len() != len() {
			.modifiedManifests[.FilePath()] = 
		}
	}
	return nil
}