package catalog

import (
	
	
	
	
	
	
	

	
	
	
	
)

var (
	ErrorTableNotFound = fmt.Errorf("table not found")
)

const (
	hdfsTableMetadataDir = "metadata"
	hdfsVersionHintFile  = "version-hint.text"

	namespaceSeparator = "\x1F"
)

func hdfsMetadataFileName( int) string {
	return fmt.Sprintf("v%d.metadata.json", )
}

type hdfs struct {
	bucket objstore.Bucket
}

func ( string,  objstore.Bucket) Catalog {
	return &hdfs{bucket: NewIcebucket(, )}
}

func ( *hdfs) () CatalogType {
	return Hadoop
}

func ( *hdfs) ( context.Context,  table.Identifier) ([]table.Identifier, error) {
	if len() != 1 {
		return nil, fmt.Errorf("hdfs catalog only supports listing tables in a single namespace")
	}

	 := [0]
	 := []table.Identifier{}
	.bucket.Iter(, , func( string) error {
		 = append(, table.Identifier{filepath.Base()})
		return nil
	})

	return , nil
}

func ( *hdfs) ( context.Context,  table.Identifier) error {
	, ,  := splitIdentForPath()
	if  != nil {
		return 
	}

	// Drop the table's directory.
	return .bucket.Delete(, filepath.Join(, ))
}

func ( *hdfs) ( context.Context, ,  table.Identifier) (table.Table, error) {
	return nil, fmt.Errorf("hdfs catalog does not support renaming tables")
}

func ( *hdfs) ( context.Context,  table.Identifier) ([]table.Identifier, error) {
	 := []table.Identifier{}
	return , .bucket.Iter(, filepath.Join(...), func( string) error {
		 = append(, table.Identifier{})
		return nil
	})
}

func ( *hdfs) ( context.Context,  table.Identifier,  iceberg.Properties) error {
	return fmt.Errorf("hdfs catalog does not support creating namespaces")
}

func ( *hdfs) ( context.Context,  table.Identifier) error {
	return fmt.Errorf("hdfs catalog does not support dropping namespaces")
}

func ( *hdfs) ( context.Context,  table.Identifier) (iceberg.Properties, error) {
	return nil, fmt.Errorf("hdfs catalog does not support loading namespace properties")
}

func ( *hdfs) ( context.Context,  table.Identifier,  []string,  iceberg.Properties) (PropertiesUpdateSummary, error) {
	return PropertiesUpdateSummary{}, fmt.Errorf("hdfs catalog does not support updating namespace properties")
}

func ( *hdfs) ( context.Context,  table.Identifier,  iceberg.Properties) (table.Table, error) {
	, ,  := splitIdentForPath()
	if  != nil {
		return nil, 
	}

	// Load the latest version of the table.
	,  := .loadLatestTable(, , , )
	if  != nil {
		return nil, 
	}

	return , nil
}

func ( *hdfs) ( context.Context,  string,  *iceberg.Schema,  iceberg.Properties,  ...TableOption) (table.Table, error) {
	 := &tableOptions{
		partitionSpec: iceberg.NewPartitionSpec(),
	}
	for ,  := range  {
		()
	}

	// TODO: upload the metadata file to the bucket?
	 := table.NewMetadataV1Builder(
		,
		,
		time.Now().UnixMilli(),
		.NumFields(),
	).
		WithTableUUID(uuid.New()).
		WithCurrentSchemaID(.ID).
		WithPartitionSpecs([]iceberg.PartitionSpec{.partitionSpec}).
		WithDefaultSpecID(.partitionSpec.ID()).
		Build()

	return table.NewHDFSTable(0, table.Identifier{}, , filepath.Join(, hdfsTableMetadataDir, hdfsMetadataFileName(0)), .bucket), nil
}

func ( *hdfs) ( context.Context,  table.Identifier, ,  string) (table.Table, error) {
	,  := getTableVersion(, .bucket, , )
	if  != nil {
		return nil, 
	}

	,  := getTableMetadata(, .bucket, , , )
	if  != nil {
		return nil, 
	}

	return table.NewHDFSTable(, , , filepath.Join(, , hdfsTableMetadataDir, hdfsMetadataFileName()), .bucket), nil
}

// getTableMetadata returns the metadata of the table at the specified version.
func getTableMetadata( context.Context,  objstore.Bucket, ,  string,  int) (table.Metadata, error) {
	,  := .Get(, filepath.Join(, , hdfsTableMetadataDir, hdfsMetadataFileName()))
	if  != nil {
		return nil, fmt.Errorf("failed to get metadata file: %w", )
	}
	defer .Close()

	,  := table.ParseMetadata()
	if  != nil {
		return nil, fmt.Errorf("failed to parse metadata: %w", )
	}

	return , nil
}

// getTableVersion returns the latest version of the table.
// FIXME: this could fallback to a version file scan instead of returning an error
func getTableVersion( context.Context,  objstore.Bucket, ,  string) (int, error) {
	,  := .Get(, filepath.Join(, , hdfsTableMetadataDir, hdfsVersionHintFile))
	if  != nil {
		if .IsObjNotFoundErr() { // Table does not exist.
			return -1, ErrorTableNotFound
		}
		return -1, fmt.Errorf("failed to get version hint file: %w", )
	}
	defer .Close()

	 := bufio.NewScanner()
	.Scan()
	 := .Text()

	,  := strconv.Atoi()
	if  != nil {
		return -1, fmt.Errorf("failed to parse version hint: %w", )
	}

	return , nil
}

func splitIdentForPath( table.Identifier) (string, string, error) {
	if len() < 1 {
		return "", "", fmt.Errorf("%w: missing namespace or invalid identifier %v",
			ErrNoSuchTable, strings.Join(, "."))
	}

	return strings.Join(NamespaceFromIdent(), namespaceSeparator), TableNameFromIdent(), nil
}