// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package objstore

import (
	"bytes"
	"context"
	"io"
	"io/fs"
	"os"
	"path"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/efficientgo/core/errors"
	"github.com/efficientgo/core/logerrcapture"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"golang.org/x/sync/errgroup"
)

const (
	OpIter       = "iter"
	OpGet        = "get"
	OpGetRange   = "get_range"
	OpExists     = "exists"
	OpUpload     = "upload"
	OpDelete     = "delete"
	OpAttributes = "attributes"
)

// Bucket provides read and write access to an object storage bucket.
// NOTE: We assume strong consistency for write-read flow.
type Bucket interface {
	io.Closer
	BucketReader

	// Upload the contents of the reader as an object into the bucket.
	// Upload should be idempotent.
	Upload(ctx context.Context, name string, r io.Reader) error

	// Delete removes the object with the given name.
	// If the object does not exist at the time of deletion, Delete should return an error.
	Delete(ctx context.Context, name string) error

	// Name returns the bucket name for the provider.
	Name() string
}
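
// A minimal write-path sketch (hedged): `bkt` stands for any concrete Bucket
// implementation obtained from a provider-specific constructor; the object
// name and payload are illustrative only.
//
//	if err := bkt.Upload(ctx, "dir/obj", strings.NewReader("payload")); err != nil {
//		return err
//	}
//	defer func() { _ = bkt.Delete(ctx, "dir/obj") }()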

// InstrumentedBucket is a Bucket with optional instrumentation control on reader.
type InstrumentedBucket interface {
	Bucket

	// WithExpectedErrs allows specifying a filter that marks certain errors as expected, so they will not increment
	// the objstore_bucket_operation_failures_total metric.
	WithExpectedErrs(IsOpFailureExpectedFunc) Bucket

	// ReaderWithExpectedErrs allows specifying a filter that marks certain errors as expected, so they will not increment
	// the objstore_bucket_operation_failures_total metric.
	// TODO(bwplotka): Remove this when moved to Go 1.14 and replace with InstrumentedBucketReader.
	ReaderWithExpectedErrs(IsOpFailureExpectedFunc) BucketReader
}

// BucketReader provides read access to an object storage bucket.
type BucketReader interface {
	// Iter calls f for each entry in the given directory (not recursive by default). The argument to f is the full
	// object name including the prefix of the inspected directory.
	// Entries are passed to the function in sorted order.
	Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error

	// Get returns a reader for the given object name.
	Get(ctx context.Context, name string) (io.ReadCloser, error)

	// GetRange returns a new range reader for the given object name and range.
	GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error)

	// Exists checks if the given object exists in the bucket.
	Exists(ctx context.Context, name string) (bool, error)

	// IsObjNotFoundErr returns true if the error means that the object is not found. Relevant to Get operations.
	IsObjNotFoundErr(err error) bool

	// IsAccessDeniedErr returns true if access to the object is denied.
	IsAccessDeniedErr(err error) bool

	// Attributes returns information about the specified object.
	Attributes(ctx context.Context, name string) (ObjectAttributes, error)
}
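
// A hedged sketch of the read path: Get streams a whole object, while GetRange
// reads only a byte range. `bkt` is any BucketReader; the object name and range
// are illustrative.
//
//	rc, err := bkt.GetRange(ctx, "dir/obj", 0, 1024) // First KiB of the object.
//	if err != nil {
//		if bkt.IsObjNotFoundErr(err) {
//			// Handle a missing object separately from other failures.
//		}
//		return err
//	}
//	defer rc.Close()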

// InstrumentedBucketReader is a BucketReader with optional instrumentation control.
type InstrumentedBucketReader interface {
	BucketReader

	// ReaderWithExpectedErrs allows specifying a filter that marks certain errors as expected, so they will not increment
	// the objstore_bucket_operation_failures_total metric.
	ReaderWithExpectedErrs(IsOpFailureExpectedFunc) BucketReader
}

// IterOption configures the provided params.
type IterOption func(params *IterParams)

// WithRecursiveIter is an option that can be applied to Iter() to recursively list objects
// in the bucket.
func WithRecursiveIter(params *IterParams) {
	params.Recursive = true
}

// IterParams holds the Iter() parameters and is used by objstore client implementations.
type IterParams struct {
	Recursive bool
}

// ApplyIterOptions applies the given options on top of the default IterParams.
func ApplyIterOptions(options ...IterOption) IterParams {
	out := IterParams{}
	for _, opt := range options {
		opt(&out)
	}
	return out
}
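
// A hedged sketch of how a provider implementation might consume iter options;
// providerBucket and its delimiter handling are illustrative, not prescriptive.
//
//	func (b *providerBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error {
//		params := ApplyIterOptions(options...)
//		delimiter := DirDelim
//		if params.Recursive {
//			delimiter = "" // An empty delimiter makes the listing descend into "subdirectories".
//		}
//		// ... list objects under dir using delimiter and pass each entry to f.
//		return nil
//	}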

// DownloadOption configures the provided params.
type DownloadOption func(params *downloadParams)

// downloadParams holds the DownloadDir() parameters and is used by objstore client implementations.
type downloadParams struct {
	concurrency  int
	ignoredPaths []string
}

// WithDownloadIgnoredPaths is an option to set the paths that should not be downloaded.
func WithDownloadIgnoredPaths(ignoredPaths ...string) DownloadOption {
	return func(params *downloadParams) {
		params.ignoredPaths = ignoredPaths
	}
}

// WithFetchConcurrency is an option to set the concurrency of the download operation.
func WithFetchConcurrency(concurrency int) DownloadOption {
	return func(params *downloadParams) {
		params.concurrency = concurrency
	}
}

func applyDownloadOptions(options ...DownloadOption) downloadParams {
	out := downloadParams{
		concurrency: 1,
	}
	for _, opt := range options {
		opt(&out)
	}
	return out
}
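
// A hedged usage sketch combining the download options with DownloadDir (defined
// below); the paths and concurrency value are illustrative.
//
//	err := DownloadDir(ctx, logger, bkt, "block", "block", "/tmp/block",
//		WithFetchConcurrency(4),
//		WithDownloadIgnoredPaths("chunks/000001"),
//	)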

// UploadOption configures the provided params.
type UploadOption func(params *uploadParams)

// uploadParams holds the UploadDir() parameters and is used by objstore client implementations.
type uploadParams struct {
	concurrency int
}

// WithUploadConcurrency is an option to set the concurrency of the upload operation.
func WithUploadConcurrency(concurrency int) UploadOption {
	return func(params *uploadParams) {
		params.concurrency = concurrency
	}
}

func applyUploadOptions(options ...UploadOption) uploadParams {
	out := uploadParams{
		concurrency: 1,
	}
	for _, opt := range options {
		opt(&out)
	}
	return out
}
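
// A hedged usage sketch for UploadDir (defined below) with bounded concurrency;
// the directory names are illustrative.
//
//	err := UploadDir(ctx, logger, bkt, "/tmp/block", "block", WithUploadConcurrency(4))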

// ObjectAttributes holds the metadata of an object.
type ObjectAttributes struct {
	// Size is the object size in bytes.
	Size int64 `json:"size"`

	// LastModified is the timestamp the object was last modified.
	LastModified time.Time `json:"last_modified"`
}

// TryToGetSize tries to get the upfront size of the data in the reader.
// Some implementations may return only the size of the unread data in the reader, so it's best to call this method
// before doing any reading.
//
// TODO(https://github.com/thanos-io/thanos/issues/678): Remove guessing length when minio provider will support multipart upload without this.
func TryToGetSize(r io.Reader) (int64, error) {
	switch f := r.(type) {
	case *os.File:
		fileInfo, err := f.Stat()
		if err != nil {
			return 0, errors.Wrap(err, "os.File.Stat()")
		}
		return fileInfo.Size(), nil
	case *bytes.Buffer:
		return int64(f.Len()), nil
	case *bytes.Reader:
		// Returns the length of unread data only.
		return int64(f.Len()), nil
	case *strings.Reader:
		return f.Size(), nil
	case ObjectSizer:
		return f.ObjectSize()
	}
	return 0, errors.Errorf("unsupported type of io.Reader: %T", r)
}

// ObjectSizer can return the size of an object.
type ObjectSizer interface {
	// ObjectSize returns the size of the object in bytes, or error if it is not available.
	ObjectSize() (int64, error)
}
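
// A hedged sketch of a custom reader that reports its size up front, so that
// TryToGetSize (and providers relying on it) can avoid buffering; sizedReader
// is illustrative.
//
//	type sizedReader struct {
//		io.Reader
//		size int64
//	}
//
//	func (s sizedReader) ObjectSize() (int64, error) { return s.size, nil }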

type nopCloserWithObjectSize struct{ io.Reader }

func (nopCloserWithObjectSize) Close() error                 { return nil }
func (n nopCloserWithObjectSize) ObjectSize() (int64, error) { return TryToGetSize(n.Reader) }

// NopCloserWithSize returns a ReadCloser with a no-op Close method wrapping
// the provided Reader r. The returned ReadCloser also implements the ObjectSize method.
func NopCloserWithSize(r io.Reader) io.ReadCloser {
	return nopCloserWithObjectSize{r}
}
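
// A hedged usage sketch: wrapping an in-memory reader so callers that expect an
// io.ReadCloser with a known size can consume it directly.
//
//	rc := NopCloserWithSize(bytes.NewReader(payload))
//	defer rc.Close() // No-op, but satisfies the io.ReadCloser contract.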

// UploadDir uploads all files in srcdir to the bucket under a top-level directory
// named dstdir. It is the caller's responsibility to clean up partial uploads in case of failure.
func UploadDir(ctx context.Context, logger log.Logger, bkt Bucket, srcdir, dstdir string, options ...UploadOption) error {
	df, err := os.Stat(srcdir)
	if err != nil {
		return errors.Wrap(err, "stat dir")
	}
	if !df.IsDir() {
		return errors.Errorf("%s is not a directory", srcdir)
	}
	opts := applyUploadOptions(options...)

	// The derived Context is canceled the first time a function passed to Go returns a non-nil error or the first
	// time Wait returns, whichever occurs first.
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(opts.concurrency)

	err = filepath.WalkDir(srcdir, func(src string, d fs.DirEntry, err error) error {
		g.Go(func() error {
			if err != nil {
				return err
			}
			if d.IsDir() {
				return nil
			}
			rel, err := filepath.Rel(srcdir, src)
			if err != nil {
				return errors.Wrap(err, "getting relative path")
			}

			dst := path.Join(dstdir, filepath.ToSlash(rel))
			return UploadFile(ctx, logger, bkt, src, dst)
		})

		return nil
	})

	if err == nil {
		err = g.Wait()
	}

	return err
}

// UploadFile uploads the file with the given name to the bucket.
// It is the caller's responsibility to clean up a partial upload in case of failure.
func UploadFile(ctx context.Context, logger log.Logger, bkt Bucket, src, dst string) error {
	r, err := os.Open(filepath.Clean(src))
	if err != nil {
		return errors.Wrapf(err, "open file %s", src)
	}
	defer logerrcapture.Do(logger, r.Close, "close file %s", src)

	if err := bkt.Upload(ctx, dst, r); err != nil {
		return errors.Wrapf(err, "upload file %s as %s", src, dst)
	}
	level.Debug(logger).Log("msg", "uploaded file", "from", src, "dst", dst, "bucket", bkt.Name())
	return nil
}

// DirDelim is the delimiter used to model a directory structure in an object store bucket.
const DirDelim = "/"

// DownloadFile downloads the src file from the bucket to dst. If dst is an existing
// directory, a file with the same name as the source is created in dst.
// If the destination file already exists, it will be overwritten.
func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src, dst string) (err error) {
	if fi, err := os.Stat(dst); err == nil {
		if fi.IsDir() {
			dst = filepath.Join(dst, filepath.Base(src))
		}
	} else if !os.IsNotExist(err) {
		return err
	}

	rc, err := bkt.Get(ctx, src)
	if err != nil {
		return errors.Wrapf(err, "get file %s", src)
	}
	defer logerrcapture.Do(logger, rc.Close, "close block's file reader")

	f, err := os.Create(dst)
	if err != nil {
		return errors.Wrapf(err, "create file %s", dst)
	}
	defer func() {
		if err != nil {
			if rerr := os.Remove(dst); rerr != nil {
				level.Warn(logger).Log("msg", "failed to remove partially downloaded file", "file", dst, "err", rerr)
			}
		}
	}()
	defer logerrcapture.Do(logger, f.Close, "close block's output file")

	if _, err = io.Copy(f, rc); err != nil {
		return errors.Wrapf(err, "copy object to file %s", src)
	}
	return nil
}

// DownloadDir downloads all objects found in the given bucket directory into the local directory.
func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, originalSrc, src, dst string, options ...DownloadOption) error {
	if err := os.MkdirAll(dst, 0750); err != nil {
		return errors.Wrap(err, "create dir")
	}
	opts := applyDownloadOptions(options...)

	// The derived Context is canceled the first time a function passed to Go returns a non-nil error or the first
	// time Wait returns, whichever occurs first.
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(opts.concurrency)

	var downloadedFiles []string
	var m sync.Mutex

	err := bkt.Iter(ctx, src, func(name string) error {
		g.Go(func() error {
			dst := filepath.Join(dst, filepath.Base(name))
			if strings.HasSuffix(name, DirDelim) {
				if err := DownloadDir(ctx, logger, bkt, originalSrc, name, dst, options...); err != nil {
					return err
				}
				m.Lock()
				downloadedFiles = append(downloadedFiles, dst)
				m.Unlock()
				return nil
			}
			for _, ignoredPath := range opts.ignoredPaths {
				if ignoredPath == strings.TrimPrefix(name, originalSrc+DirDelim) {
					level.Debug(logger).Log("msg", "not downloading again because a provided path matches this one", "file", name)
					return nil
				}
			}
			if err := DownloadFile(ctx, logger, bkt, name, dst); err != nil {
				return err
			}

			m.Lock()
			downloadedFiles = append(downloadedFiles, dst)
			m.Unlock()
			return nil
		})
		return nil
	})

	if err == nil {
		err = g.Wait()
	}

	if err != nil {
		downloadedFiles = append(downloadedFiles, dst) // Last, clean up the root dst directory.
		// Best-effort cleanup if the download failed.
		for _, f := range downloadedFiles {
			if rerr := os.RemoveAll(f); rerr != nil {
				level.Warn(logger).Log("msg", "failed to remove file on partial dir download error", "file", f, "err", rerr)
			}
		}
		return err
	}

	return nil
}

// IsOpFailureExpectedFunc allows marking certain errors as expected, so they will not increment the objstore_bucket_operation_failures_total metric.
type IsOpFailureExpectedFunc func(error) bool
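
// A hedged sketch: treating "object not found" as expected so that probing for
// optional objects does not inflate the failure metric. `instrBkt` is any
// InstrumentedBucket; the object name is illustrative.
//
//	rbkt := instrBkt.ReaderWithExpectedErrs(instrBkt.IsObjNotFoundErr)
//	_, err := rbkt.Get(ctx, "optional/meta.json") // A not-found error here is not counted as a failure.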

var _ InstrumentedBucket = &metricBucket{}

// BucketMetrics creates the Metrics for a bucket with the given name, registered with reg.
func BucketMetrics(reg prometheus.Registerer, name string) *Metrics {
	return &Metrics{
		isOpFailureExpected: func(err error) bool { return false },
		ops: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name:        "objstore_bucket_operations_total",
			Help:        "Total number of all attempted operations against a bucket.",
			ConstLabels: prometheus.Labels{"bucket": name},
		}, []string{"operation"}),

		opsFailures: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name:        "objstore_bucket_operation_failures_total",
			Help:        "Total number of operations against a bucket that failed, but were not expected to fail from the caller's perspective. Those errors have to be investigated.",
			ConstLabels: prometheus.Labels{"bucket": name},
		}, []string{"operation"}),

		opsFetchedBytes: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name:        "objstore_bucket_operation_fetched_bytes_total",
			Help:        "Total number of bytes fetched from bucket, per operation.",
			ConstLabels: prometheus.Labels{"bucket": name},
		}, []string{"operation"}),

		opsTransferredBytes: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
			Name:        "objstore_bucket_operation_transferred_bytes",
			Help:        "Number of bytes transferred from/to bucket per operation.",
			ConstLabels: prometheus.Labels{"bucket": name},
			Buckets:     prometheus.ExponentialBuckets(2<<14, 2, 16), // 32KiB, 64KiB, ... 1GiB
			// Use factor=2 for native histograms, which gives similar buckets as the original exponential buckets.
			NativeHistogramBucketFactor:     2,
			NativeHistogramMaxBucketNumber:  100,
			NativeHistogramMinResetDuration: 1 * time.Hour,
		}, []string{"operation"}),

		opsDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
			Name:        "objstore_bucket_operation_duration_seconds",
			Help:        "Duration of successful operations against the bucket per operation - iter operations include time spent on each callback.",
			ConstLabels: prometheus.Labels{"bucket": name},
			Buckets:     []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
			// Use the recommended defaults for native histograms with 10% growth factor.
			NativeHistogramBucketFactor:     1.1,
			NativeHistogramMaxBucketNumber:  100,
			NativeHistogramMinResetDuration: 1 * time.Hour,
		}, []string{"operation"}),

		lastSuccessfulUploadTime: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
			Name:        "objstore_bucket_last_successful_upload_time",
			Help:        "Unix timestamp (in seconds) of the last successful upload to the bucket.",
			ConstLabels: prometheus.Labels{"bucket": name},
		}),
	}
}

// WrapWithMetrics takes a bucket and registers metrics with the given registry for
// operations run against the bucket.
func WrapWithMetrics(b Bucket, reg prometheus.Registerer, name string) *metricBucket {
	metrics := BucketMetrics(reg, name)
	return wrapWithMetrics(b, metrics)
}
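
// A hedged usage sketch: instrumenting an existing bucket with a Prometheus
// registry; `reg` and the bucket name are illustrative.
//
//	reg := prometheus.NewRegistry()
//	instrBkt := WrapWithMetrics(bkt, reg, "example-bucket")
//	// All operations on instrBkt now update the objstore_bucket_* metrics.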

// WrapWith takes a `bucket` and `metrics` and returns an instrumented bucket.
// Similar to WrapWithMetrics, but `metrics` are passed separately as an argument.
func WrapWith(b Bucket, metrics *Metrics) *metricBucket {
	return wrapWithMetrics(b, metrics)
}

func wrapWithMetrics(b Bucket, metrics *Metrics) *metricBucket {
	bkt := &metricBucket{
		bkt:     b,
		metrics: metrics,
	}

	for _, op := range []string{
		OpIter,
		OpGet,
		OpGetRange,
		OpExists,
		OpUpload,
		OpDelete,
		OpAttributes,
	} {
		bkt.metrics.ops.WithLabelValues(op)
		bkt.metrics.opsFailures.WithLabelValues(op)
		bkt.metrics.opsDuration.WithLabelValues(op)
		bkt.metrics.opsFetchedBytes.WithLabelValues(op)
	}

	// Transferred bytes are only relevant for Get, GetRange and Upload operations.
	for _, op := range []string{
		OpGet,
		OpGetRange,
		OpUpload,
	} {
		bkt.metrics.opsTransferredBytes.WithLabelValues(op)
	}
	return bkt
}

// Metrics holds the Prometheus metrics tracked for bucket operations.
type Metrics struct {
	ops                 *prometheus.CounterVec
	opsFailures         *prometheus.CounterVec
	isOpFailureExpected IsOpFailureExpectedFunc

	opsFetchedBytes          *prometheus.CounterVec
	opsTransferredBytes      *prometheus.HistogramVec
	opsDuration              *prometheus.HistogramVec
	lastSuccessfulUploadTime prometheus.Gauge
}

type metricBucket struct {
	bkt     Bucket
	metrics *Metrics
}

func (b *metricBucket) WithExpectedErrs(fn IsOpFailureExpectedFunc) Bucket {
	return &metricBucket{
		bkt: b.bkt,
		metrics: &Metrics{
			ops:                      b.metrics.ops,
			opsFailures:              b.metrics.opsFailures,
			opsFetchedBytes:          b.metrics.opsFetchedBytes,
			opsTransferredBytes:      b.metrics.opsTransferredBytes,
			isOpFailureExpected:      fn,
			opsDuration:              b.metrics.opsDuration,
			lastSuccessfulUploadTime: b.metrics.lastSuccessfulUploadTime,
		},
	}
}

func (b *metricBucket) ReaderWithExpectedErrs(fn IsOpFailureExpectedFunc) BucketReader {
	return b.WithExpectedErrs(fn)
}

func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error {
	const op = OpIter
	b.metrics.ops.WithLabelValues(op).Inc()

	start := time.Now()
	err := b.bkt.Iter(ctx, dir, f, options...)
	if err != nil {
		if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
			b.metrics.opsFailures.WithLabelValues(op).Inc()
		}
	}
	b.metrics.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds())
	return err
}

func (b *metricBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) {
	const op = OpAttributes
	b.metrics.ops.WithLabelValues(op).Inc()

	start := time.Now()
	attrs, err := b.bkt.Attributes(ctx, name)
	if err != nil {
		if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
			b.metrics.opsFailures.WithLabelValues(op).Inc()
		}
		return attrs, err
	}
	b.metrics.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds())
	return attrs, nil
}

func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
	const op = OpGet
	b.metrics.ops.WithLabelValues(op).Inc()

	rc, err := b.bkt.Get(ctx, name)
	if err != nil {
		if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
			b.metrics.opsFailures.WithLabelValues(op).Inc()
		}
		return nil, err
	}
	return newTimingReader(
		rc,
		true,
		op,
		b.metrics.opsDuration,
		b.metrics.opsFailures,
		b.metrics.isOpFailureExpected,
		b.metrics.opsFetchedBytes,
		b.metrics.opsTransferredBytes,
	), nil
}

func (b *metricBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
	const op = OpGetRange
	b.metrics.ops.WithLabelValues(op).Inc()

	rc, err := b.bkt.GetRange(ctx, name, off, length)
	if err != nil {
		if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
			b.metrics.opsFailures.WithLabelValues(op).Inc()
		}
		return nil, err
	}
	return newTimingReader(
		rc,
		true,
		op,
		b.metrics.opsDuration,
		b.metrics.opsFailures,
		b.metrics.isOpFailureExpected,
		b.metrics.opsFetchedBytes,
		b.metrics.opsTransferredBytes,
	), nil
}

func (b *metricBucket) Exists(ctx context.Context, name string) (bool, error) {
	const op = OpExists
	b.metrics.ops.WithLabelValues(op).Inc()

	start := time.Now()
	ok, err := b.bkt.Exists(ctx, name)
	if err != nil {
		if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
			b.metrics.opsFailures.WithLabelValues(op).Inc()
		}
		return false, err
	}
	b.metrics.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds())
	return ok, nil
}

func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) error {
	const op = OpUpload
	b.metrics.ops.WithLabelValues(op).Inc()

	trc := newTimingReader(
		r,
		false,
		op,
		b.metrics.opsDuration,
		b.metrics.opsFailures,
		b.metrics.isOpFailureExpected,
		nil,
		b.metrics.opsTransferredBytes,
	)
	defer trc.Close()
	err := b.bkt.Upload(ctx, name, trc)
	if err != nil {
		if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
			b.metrics.opsFailures.WithLabelValues(op).Inc()
		}
		return err
	}
	b.metrics.lastSuccessfulUploadTime.SetToCurrentTime()

	return nil
}

func (b *metricBucket) Delete(ctx context.Context, name string) error {
	const op = OpDelete
	b.metrics.ops.WithLabelValues(op).Inc()

	start := time.Now()
	if err := b.bkt.Delete(ctx, name); err != nil {
		if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
			b.metrics.opsFailures.WithLabelValues(op).Inc()
		}
		return err
	}
	b.metrics.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds())

	return nil
}

func (b *metricBucket) IsObjNotFoundErr(err error) bool {
	return b.bkt.IsObjNotFoundErr(err)
}

func (b *metricBucket) IsAccessDeniedErr(err error) bool {
	return b.bkt.IsAccessDeniedErr(err)
}

func (b *metricBucket) Close() error {
	return b.bkt.Close()
}

func (b *metricBucket) Name() string {
	return b.bkt.Name()
}

type timingReader struct {
	io.Reader

	// closeReader holds whether the wrapped io.Reader should be closed when
	// Close() is called on the timingReader.
	closeReader bool

	objSize    int64
	objSizeErr error

	alreadyGotErr bool

	start             time.Time
	op                string
	readBytes         int64
	duration          *prometheus.HistogramVec
	failed            *prometheus.CounterVec
	isFailureExpected IsOpFailureExpectedFunc
	fetchedBytes      *prometheus.CounterVec
	transferredBytes  *prometheus.HistogramVec
}

func newTimingReader(r io.Reader, closeReader bool, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec, isFailureExpected IsOpFailureExpectedFunc, fetchedBytes *prometheus.CounterVec, transferredBytes *prometheus.HistogramVec) io.ReadCloser {
	// Initialize the metrics with 0.
	dur.WithLabelValues(op)
	failed.WithLabelValues(op)
	objSize, objSizeErr := TryToGetSize(r)

	trc := timingReader{
		Reader:            r,
		closeReader:       closeReader,
		objSize:           objSize,
		objSizeErr:        objSizeErr,
		start:             time.Now(),
		op:                op,
		duration:          dur,
		failed:            failed,
		isFailureExpected: isFailureExpected,
		fetchedBytes:      fetchedBytes,
		transferredBytes:  transferredBytes,
		readBytes:         0,
	}

	_, isSeeker := r.(io.Seeker)
	_, isReaderAt := r.(io.ReaderAt)

	if isSeeker && isReaderAt {
		// The assumption is that in most cases when io.ReaderAt is implemented,
		// io.Seeker is implemented too (e.g. os.File).
		return &timingReaderSeekerReaderAt{timingReaderSeeker: timingReaderSeeker{timingReader: trc}}
	}
	if isSeeker {
		return &timingReaderSeeker{timingReader: trc}
	}

	return &trc
}

func (r *timingReader) ObjectSize() (int64, error) {
	return r.objSize, r.objSizeErr
}

func (r *timingReader) Close() error {
	var closeErr error

	// Close the wrapped reader if it implements io.Closer, but only if we've been asked to close it.
	if closer, ok := r.Reader.(io.Closer); r.closeReader && ok {
		closeErr = closer.Close()

		if !r.alreadyGotErr && closeErr != nil {
			r.failed.WithLabelValues(r.op).Inc()
			r.alreadyGotErr = true
		}
	}

	// Track duration and transferred bytes only if no error occurred.
	if !r.alreadyGotErr {
		r.duration.WithLabelValues(r.op).Observe(time.Since(r.start).Seconds())
		r.transferredBytes.WithLabelValues(r.op).Observe(float64(r.readBytes))

		// Prevent tracking the metrics again in case Close() gets called another time.
		r.alreadyGotErr = true
	}

	return closeErr
}

func (r *timingReader) Read(b []byte) (n int, err error) {
	n, err = r.Reader.Read(b)
	if r.fetchedBytes != nil {
		r.fetchedBytes.WithLabelValues(r.op).Add(float64(n))
	}

	r.readBytes += int64(n)
	// Report the failure metric just once.
	if !r.alreadyGotErr && err != nil && err != io.EOF {
		if !r.isFailureExpected(err) && !errors.Is(err, context.Canceled) {
			r.failed.WithLabelValues(r.op).Inc()
		}
		r.alreadyGotErr = true
	}
	return n, err
}

type timingReaderSeeker struct {
	timingReader
}

func (rsc *timingReaderSeeker) Seek(offset int64, whence int) (int64, error) {
	return (rsc.Reader).(io.Seeker).Seek(offset, whence)
}

type timingReaderSeekerReaderAt struct {
	timingReaderSeeker
}

func (rsc *timingReaderSeekerReaderAt) ReadAt(p []byte, off int64) (int, error) {
	return (rsc.Reader).(io.ReaderAt).ReadAt(p, off)
}