package objstore
import (
"bytes"
"context"
"io"
"io/fs"
"os"
"path"
"path/filepath"
"strings"
"sync"
"time"
"github.com/efficientgo/core/logerrcapture"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/sync/errgroup"
)
// Operation names used as the "operation" label value on all bucket metrics.
const (
	OpIter       = "iter"
	OpGet        = "get"
	OpGetRange   = "get_range"
	OpExists     = "exists"
	OpUpload     = "upload"
	OpDelete     = "delete"
	OpAttributes = "attributes"
)
// Bucket provides read and write access to an object storage bucket.
type Bucket interface {
	io.Closer
	BucketReader

	// Upload writes the contents of the reader as an object with the given name.
	Upload(ctx context.Context, name string, r io.Reader) error

	// Delete removes the object with the given name.
	Delete(ctx context.Context, name string) error

	// Name returns the name of the bucket.
	Name() string
}
// InstrumentedBucket is a Bucket that can derive variants which treat
// certain operation failures as expected (not counted as failures).
type InstrumentedBucket interface {
	Bucket

	// WithExpectedErrs returns a Bucket that considers errors matching the
	// given predicate as expected.
	WithExpectedErrs(IsOpFailureExpectedFunc) Bucket

	// ReaderWithExpectedErrs is like WithExpectedErrs but returns only the
	// read side of the bucket.
	ReaderWithExpectedErrs(IsOpFailureExpectedFunc) BucketReader
}
// BucketReader provides read access to an object storage bucket.
type BucketReader interface {
	// Iter calls f for each entry in the given directory. Options such as
	// WithRecursiveIter adjust the traversal.
	Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error

	// Get returns a reader for the contents of the object with the given name.
	Get(ctx context.Context, name string) (io.ReadCloser, error)

	// GetRange returns a reader for the sub-range [off, off+length) of the object.
	GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error)

	// Exists reports whether the object with the given name exists.
	Exists(ctx context.Context, name string) (bool, error)

	// IsObjNotFoundErr reports whether err signals a missing object.
	IsObjNotFoundErr(err error) bool

	// IsAccessDeniedErr reports whether err signals denied access.
	IsAccessDeniedErr(err error) bool

	// Attributes returns metadata (size, last modification time) of the object.
	Attributes(ctx context.Context, name string) (ObjectAttributes, error)
}
// InstrumentedBucketReader is a BucketReader that can derive a variant which
// treats errors matching a predicate as expected.
type InstrumentedBucketReader interface {
	BucketReader

	// ReaderWithExpectedErrs returns a BucketReader that considers errors
	// matching the given predicate as expected.
	ReaderWithExpectedErrs(IsOpFailureExpectedFunc) BucketReader
}
// IterOption configures the behavior of an Iter call.
type IterOption func(params *IterParams)

// WithRecursiveIter is an IterOption that makes Iter descend into
// sub-directories instead of listing only the top level.
func WithRecursiveIter(params *IterParams) {
	params.Recursive = true
}

// IterParams holds the resolved settings for an Iter call.
type IterParams struct {
	// Recursive, when true, makes Iter visit entries in sub-directories too.
	Recursive bool
}
// ApplyIterOptions resolves the given options into a concrete IterParams,
// starting from the zero value.
func ApplyIterOptions(options ...IterOption) IterParams {
	var params IterParams
	for _, apply := range options {
		apply(&params)
	}
	return params
}
// DownloadOption configures the behavior of DownloadDir.
type DownloadOption func(params *downloadParams)

// downloadParams holds the resolved settings for a DownloadDir call.
type downloadParams struct {
	// concurrency bounds how many objects are fetched in parallel.
	concurrency int
	// ignoredPaths lists paths (relative to the original source prefix) that
	// are skipped during download.
	ignoredPaths []string
}

// WithDownloadIgnoredPaths returns a DownloadOption that skips downloading
// the given paths.
func WithDownloadIgnoredPaths(ignoredPaths ...string) DownloadOption {
	return func(params *downloadParams) {
		params.ignoredPaths = ignoredPaths
	}
}

// WithFetchConcurrency returns a DownloadOption that sets how many objects
// may be downloaded in parallel.
func WithFetchConcurrency(concurrency int) DownloadOption {
	return func(params *downloadParams) {
		params.concurrency = concurrency
	}
}
// applyDownloadOptions resolves the given options into a downloadParams,
// defaulting to sequential (concurrency 1) downloads.
func applyDownloadOptions(options ...DownloadOption) downloadParams {
	params := downloadParams{concurrency: 1}
	for _, apply := range options {
		apply(&params)
	}
	return params
}
// UploadOption configures the behavior of UploadDir.
type UploadOption func(params *uploadParams)

// uploadParams holds the resolved settings for an UploadDir call.
type uploadParams struct {
	// concurrency bounds how many files are uploaded in parallel.
	concurrency int
}

// WithUploadConcurrency returns an UploadOption that sets how many files may
// be uploaded in parallel.
func WithUploadConcurrency(concurrency int) UploadOption {
	return func(params *uploadParams) {
		params.concurrency = concurrency
	}
}
// applyUploadOptions resolves the given options into an uploadParams,
// defaulting to sequential (concurrency 1) uploads.
func applyUploadOptions(options ...UploadOption) uploadParams {
	params := uploadParams{concurrency: 1}
	for _, apply := range options {
		apply(&params)
	}
	return params
}
// ObjectAttributes holds metadata about a stored object.
type ObjectAttributes struct {
	// Size is the object size in bytes.
	Size int64 `json:"size"`

	// LastModified is the time the object was last modified.
	LastModified time.Time `json:"last_modified"`
}
// TryToGetSize returns the number of bytes r will yield, for the reader
// types this package knows how to measure (*os.File, *bytes.Buffer,
// *bytes.Reader, *strings.Reader, and anything implementing ObjectSizer).
// It returns an error for any other reader type.
func TryToGetSize(r io.Reader) (int64, error) {
	// NOTE: case order matters for types that could match more than one case;
	// ObjectSizer is intentionally checked last.
	switch v := r.(type) {
	case *os.File:
		info, err := v.Stat()
		if err != nil {
			return 0, errors.Wrap(err, "os.File.Stat()")
		}
		return info.Size(), nil
	case *bytes.Buffer:
		return int64(v.Len()), nil
	case *bytes.Reader:
		return int64(v.Len()), nil
	case *strings.Reader:
		return v.Size(), nil
	case ObjectSizer:
		return v.ObjectSize()
	default:
		return 0, errors.Errorf("unsupported type of io.Reader: %T", r)
	}
}
// ObjectSizer reports the size of the data an object/reader will produce.
type ObjectSizer interface {
	// ObjectSize returns the size in bytes, or an error if it cannot be
	// determined.
	ObjectSize() (int64, error)
}

// nopCloserWithObjectSize wraps an io.Reader with a no-op Close and an
// ObjectSize derived via TryToGetSize.
type nopCloserWithObjectSize struct{ io.Reader }

func (nopCloserWithObjectSize) Close() error { return nil }

func (n nopCloserWithObjectSize) ObjectSize() (int64, error) { return TryToGetSize(n.Reader) }

// NopCloserWithSize returns a ReadCloser with a no-op Close method wrapping
// the provided Reader r, which also exposes the reader's size when it can be
// determined.
func NopCloserWithSize(r io.Reader) io.ReadCloser {
	return nopCloserWithObjectSize{r}
}
// UploadDir uploads every regular file under srcdir into the bucket under
// the dstdir prefix, preserving the relative directory layout (with
// slash-separated object names). Uploads run concurrently up to the limit
// configured via WithUploadConcurrency (default 1).
//
// It is not fault-tolerant: a failed upload is not retried and files
// uploaded before the failure are left in the bucket.
func UploadDir(ctx context.Context, logger log.Logger, bkt Bucket, srcdir, dstdir string, options ...UploadOption) error {
	// Validate the source before spinning up any concurrency machinery.
	// (Previously the stat error was only checked after the errgroup setup.)
	df, err := os.Stat(srcdir)
	if err != nil {
		return errors.Wrap(err, "stat dir")
	}
	if !df.IsDir() {
		return errors.Errorf("%s is not a directory", srcdir)
	}

	opts := applyUploadOptions(options...)
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(opts.concurrency)

	err = filepath.WalkDir(srcdir, func(src string, d fs.DirEntry, err error) error {
		g.Go(func() error {
			// A non-nil walk error means d may be nil, so check it first.
			if err != nil {
				return err
			}
			if d.IsDir() {
				return nil
			}
			srcRel, err := filepath.Rel(srcdir, src)
			if err != nil {
				return errors.Wrap(err, "getting relative path")
			}
			// Object names always use "/" regardless of the local OS separator.
			dst := path.Join(dstdir, filepath.ToSlash(srcRel))
			return UploadFile(ctx, logger, bkt, src, dst)
		})
		return nil
	})
	// The walk callback always returns nil, so per-file errors surface via Wait.
	if err == nil {
		err = g.Wait()
	}
	return err
}
// UploadFile uploads the local file at src into the bucket under the object
// name dst.
func UploadFile(ctx context.Context, logger log.Logger, bkt Bucket, src, dst string) error {
	f, err := os.Open(filepath.Clean(src))
	if err != nil {
		return errors.Wrapf(err, "open file %s", src)
	}
	defer logerrcapture.Do(logger, f.Close, "close file %s", src)

	if err := bkt.Upload(ctx, dst, f); err != nil {
		return errors.Wrapf(err, "upload file %s as %s", src, dst)
	}
	level.Debug(logger).Log("msg", "uploaded file", "from", src, "dst", dst, "bucket", bkt.Name())
	return nil
}
// DirDelim is the delimiter used to model a directory structure in object
// store buckets.
const DirDelim = "/"
// DownloadFile downloads the object src from the bucket to the local path
// dst. If dst is an existing directory, the object's base name is appended
// to it. On failure, the partially written file is removed.
//
// The named return err is inspected by a deferred cleanup, so all error
// paths must assign to it (they do, via return).
func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src, dst string) (err error) {
	// If dst points at an existing directory, place the file inside it.
	// Note: err here is a fresh, shadowed variable scoped to this if.
	if fi, err := os.Stat(dst); err == nil {
		if fi.IsDir() {
			dst = filepath.Join(dst, filepath.Base(src))
		}
	} else if !os.IsNotExist(err) {
		return err
	}

	rc, err := bkt.Get(ctx, src)
	if err != nil {
		return errors.Wrapf(err, "get file %s", src)
	}
	defer logerrcapture.Do(logger, rc.Close, "close block's file reader")

	f, err := os.Create(dst)
	if err != nil {
		return errors.Wrapf(err, "create file %s", dst)
	}
	// Defers run LIFO: the output file Close below fires first, then this
	// cleanup removes the file when the named return err is non-nil.
	defer func() {
		if err != nil {
			if rerr := os.Remove(dst); rerr != nil {
				level.Warn(logger).Log("msg", "failed to remove partially downloaded file", "file", dst, "err", rerr)
			}
		}
	}()
	defer logerrcapture.Do(logger, f.Close, "close block's output file")

	if _, err = io.Copy(f, rc); err != nil {
		return errors.Wrapf(err, "copy object to file %s", src)
	}
	return nil
}
// DownloadDir downloads every object under the src prefix into the local
// directory dst, recursing into sub-directories. originalSrc is the root
// prefix of the whole download and is used to match entries against paths
// given via WithDownloadIgnoredPaths. Downloads run concurrently up to the
// limit configured via WithFetchConcurrency (default 1).
//
// On any error, everything downloaded so far — and dst itself — is removed
// (best effort; removal failures are only logged).
func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, originalSrc, src, dst string, options ...DownloadOption) error {
	if err := os.MkdirAll(dst, 0750); err != nil {
		return errors.Wrap(err, "create dir")
	}
	opts := applyDownloadOptions(options...)

	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(opts.concurrency)

	// Paths created so far, for cleanup on failure. Guarded by m since
	// multiple download goroutines append concurrently.
	var downloadedFiles []string
	var m sync.Mutex

	err := bkt.Iter(ctx, src, func(name string) error {
		g.Go(func() error {
			// Shadow dst with this entry's local target path.
			dst := filepath.Join(dst, filepath.Base(name))
			// Entries ending in the delimiter are directories: recurse.
			if strings.HasSuffix(name, DirDelim) {
				if err := DownloadDir(ctx, logger, bkt, originalSrc, name, dst, options...); err != nil {
					return err
				}
				m.Lock()
				downloadedFiles = append(downloadedFiles, dst)
				m.Unlock()
				return nil
			}
			// Ignored paths are matched relative to the original root prefix.
			// (originalSrc is already a string; no conversion needed.)
			for _, ignoredPath := range opts.ignoredPaths {
				if ignoredPath == strings.TrimPrefix(name, originalSrc+DirDelim) {
					level.Debug(logger).Log("msg", "not downloading again because a provided path matches this one", "file", name)
					return nil
				}
			}
			if err := DownloadFile(ctx, logger, bkt, name, dst); err != nil {
				return err
			}
			m.Lock()
			downloadedFiles = append(downloadedFiles, dst)
			m.Unlock()
			return nil
		})
		return nil
	})
	// Iter's callback always returns nil, so per-object errors surface via Wait.
	if err == nil {
		err = g.Wait()
	}
	if err != nil {
		// Also remove the target directory itself.
		downloadedFiles = append(downloadedFiles, dst)
		for _, f := range downloadedFiles {
			if rerr := os.RemoveAll(f); rerr != nil {
				level.Warn(logger).Log("msg", "failed to remove file on partial dir download error", "file", f, "err", rerr)
			}
		}
		return err
	}
	return nil
}
// IsOpFailureExpectedFunc reports whether an operation error is expected and
// should therefore not be counted as a failure in metrics.
type IsOpFailureExpectedFunc func(error) bool

// Compile-time check that metricBucket implements InstrumentedBucket.
var _ InstrumentedBucket = &metricBucket{}
// BucketMetrics builds the Metrics set used to instrument a bucket,
// registering all collectors on reg with a constant "bucket" label set to
// name. The default isOpFailureExpected treats no error as expected.
func BucketMetrics(reg prometheus.Registerer, name string) *Metrics {
	return &Metrics{
		isOpFailureExpected: func(err error) bool { return false },
		ops: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name:        "objstore_bucket_operations_total",
			Help:        "Total number of all attempted operations against a bucket.",
			ConstLabels: prometheus.Labels{"bucket": name},
		}, []string{"operation"}),
		opsFailures: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name:        "objstore_bucket_operation_failures_total",
			Help:        "Total number of operations against a bucket that failed, but were not expected to fail in certain way from caller perspective. Those errors have to be investigated.",
			ConstLabels: prometheus.Labels{"bucket": name},
		}, []string{"operation"}),
		opsFetchedBytes: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name:        "objstore_bucket_operation_fetched_bytes_total",
			Help:        "Total number of bytes fetched from bucket, per operation.",
			ConstLabels: prometheus.Labels{"bucket": name},
		}, []string{"operation"}),
		opsTransferredBytes: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
			Name:        "objstore_bucket_operation_transferred_bytes",
			Help:        "Number of bytes transferred from/to bucket per operation.",
			ConstLabels: prometheus.Labels{"bucket": name},
			// Classic buckets: 32KiB up by powers of two, 16 buckets.
			Buckets:                         prometheus.ExponentialBuckets(2<<14, 2, 16),
			NativeHistogramBucketFactor:     2,
			NativeHistogramMaxBucketNumber:  100,
			NativeHistogramMinResetDuration: 1 * time.Hour,
		}, []string{"operation"}),
		opsDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
			Name:        "objstore_bucket_operation_duration_seconds",
			Help:        "Duration of successful operations against the bucket per operation - iter operations include time spent on each callback.",
			ConstLabels: prometheus.Labels{"bucket": name},
			Buckets:     []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
			NativeHistogramBucketFactor:     1.1,
			NativeHistogramMaxBucketNumber:  100,
			NativeHistogramMinResetDuration: 1 * time.Hour,
		}, []string{"operation"}),
		lastSuccessfulUploadTime: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
			Name:        "objstore_bucket_last_successful_upload_time",
			Help:        "Second timestamp of the last successful upload to the bucket.",
			ConstLabels: prometheus.Labels{"bucket": name},
		}),
	}
}
// WrapWithMetrics wraps b so that every operation is instrumented with
// metrics registered on reg under the given bucket name.
func WrapWithMetrics(b Bucket, reg prometheus.Registerer, name string) *metricBucket {
	return wrapWithMetrics(b, BucketMetrics(reg, name))
}
// WrapWith wraps b so that every operation is instrumented with the given,
// pre-built Metrics set.
func WrapWith(b Bucket, metrics *Metrics) *metricBucket {
	return wrapWithMetrics(b, metrics)
}
// wrapWithMetrics builds the metricBucket and eagerly initializes every
// labeled series so all operations export zero-valued metrics immediately.
func wrapWithMetrics(b Bucket, metrics *Metrics) *metricBucket {
	mb := &metricBucket{bkt: b, metrics: metrics}

	allOps := []string{OpIter, OpGet, OpGetRange, OpExists, OpUpload, OpDelete, OpAttributes}
	for _, op := range allOps {
		mb.metrics.ops.WithLabelValues(op)
		mb.metrics.opsFailures.WithLabelValues(op)
		mb.metrics.opsDuration.WithLabelValues(op)
		mb.metrics.opsFetchedBytes.WithLabelValues(op)
	}

	// Transferred bytes are only tracked for the data-carrying operations.
	for _, op := range []string{OpGet, OpGetRange, OpUpload} {
		mb.metrics.opsTransferredBytes.WithLabelValues(op)
	}
	return mb
}
// Metrics holds the Prometheus collectors and the expected-error predicate
// used by metricBucket to instrument bucket operations.
type Metrics struct {
	// ops counts attempted operations, labeled by operation name.
	ops *prometheus.CounterVec
	// opsFailures counts unexpected operation failures.
	opsFailures *prometheus.CounterVec
	// isOpFailureExpected filters errors that should not count as failures.
	isOpFailureExpected IsOpFailureExpectedFunc

	// opsFetchedBytes counts bytes fetched from the bucket per operation.
	opsFetchedBytes *prometheus.CounterVec
	// opsTransferredBytes observes bytes moved from/to the bucket per operation.
	opsTransferredBytes *prometheus.HistogramVec
	// opsDuration observes operation durations in seconds.
	opsDuration *prometheus.HistogramVec
	// lastSuccessfulUploadTime records the timestamp of the last successful upload.
	lastSuccessfulUploadTime prometheus.Gauge
}
// metricBucket wraps a Bucket and records Metrics for every operation.
type metricBucket struct {
	bkt     Bucket
	metrics *Metrics
}
// WithExpectedErrs returns a bucket that shares all of b's metric series but
// treats errors matching fn as expected, so they are not counted as
// operation failures.
func (b *metricBucket) WithExpectedErrs(fn IsOpFailureExpectedFunc) Bucket {
	// Shallow-copy the Metrics (all fields are shared handles) and swap only
	// the expected-error predicate.
	m := *b.metrics
	m.isOpFailureExpected = fn
	return &metricBucket{bkt: b.bkt, metrics: &m}
}
// ReaderWithExpectedErrs is the read-only variant of WithExpectedErrs.
func (b *metricBucket) ReaderWithExpectedErrs(fn IsOpFailureExpectedFunc) BucketReader {
	return b.WithExpectedErrs(fn)
}
// Iter delegates to the wrapped bucket, counting the operation and recording
// its duration. Unlike the other operations, the duration is observed even
// when the call fails, and it includes time spent in each callback f.
func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error {
	const op = OpIter
	b.metrics.ops.WithLabelValues(op).Inc()

	begin := time.Now()
	err := b.bkt.Iter(ctx, dir, f, options...)
	if err != nil && !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
		b.metrics.opsFailures.WithLabelValues(op).Inc()
	}
	b.metrics.opsDuration.WithLabelValues(op).Observe(time.Since(begin).Seconds())
	return err
}
// Attributes delegates to the wrapped bucket, counting the operation,
// counting unexpected failures, and recording the duration of successful
// calls only.
func (b *metricBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) {
	const op = OpAttributes
	b.metrics.ops.WithLabelValues(op).Inc()

	begin := time.Now()
	attrs, err := b.bkt.Attributes(ctx, name)
	if err != nil {
		if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
			b.metrics.opsFailures.WithLabelValues(op).Inc()
		}
		return attrs, err
	}

	b.metrics.opsDuration.WithLabelValues(op).Observe(time.Since(begin).Seconds())
	return attrs, nil
}
// Get delegates to the wrapped bucket. The returned reader is wrapped so
// that duration, fetched bytes, and transferred bytes are recorded when the
// caller finishes reading and closes it.
func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
	const op = OpGet
	b.metrics.ops.WithLabelValues(op).Inc()

	rc, err := b.bkt.Get(ctx, name)
	if err != nil {
		if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
			b.metrics.opsFailures.WithLabelValues(op).Inc()
		}
		return nil, err
	}

	// true: the wrapper owns rc and forwards Close to it.
	return newTimingReader(rc, true, op, b.metrics.opsDuration, b.metrics.opsFailures, b.metrics.isOpFailureExpected, b.metrics.opsFetchedBytes, b.metrics.opsTransferredBytes), nil
}
// GetRange delegates to the wrapped bucket. The returned reader is wrapped
// so that duration, fetched bytes, and transferred bytes are recorded when
// the caller finishes reading and closes it.
func (b *metricBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
	const op = OpGetRange
	b.metrics.ops.WithLabelValues(op).Inc()

	rc, err := b.bkt.GetRange(ctx, name, off, length)
	if err != nil {
		if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
			b.metrics.opsFailures.WithLabelValues(op).Inc()
		}
		return nil, err
	}

	// true: the wrapper owns rc and forwards Close to it.
	return newTimingReader(rc, true, op, b.metrics.opsDuration, b.metrics.opsFailures, b.metrics.isOpFailureExpected, b.metrics.opsFetchedBytes, b.metrics.opsTransferredBytes), nil
}
// Exists delegates to the wrapped bucket, counting the operation, counting
// unexpected failures, and recording the duration of successful calls only.
func (b *metricBucket) Exists(ctx context.Context, name string) (bool, error) {
	const op = OpExists
	b.metrics.ops.WithLabelValues(op).Inc()

	begin := time.Now()
	found, err := b.bkt.Exists(ctx, name)
	if err != nil {
		if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
			b.metrics.opsFailures.WithLabelValues(op).Inc()
		}
		return false, err
	}

	b.metrics.opsDuration.WithLabelValues(op).Observe(time.Since(begin).Seconds())
	return found, nil
}
// Upload delegates to the wrapped bucket, wrapping r so that duration and
// transferred bytes are recorded when the upload completes. On success the
// last-successful-upload gauge is set to the current time.
func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) error {
	const op = OpUpload
	b.metrics.ops.WithLabelValues(op).Inc()

	// false: the caller keeps ownership of r, so Close is not forwarded.
	// fetchedBytes is nil because uploads fetch nothing.
	trc := newTimingReader(r, false, op, b.metrics.opsDuration, b.metrics.opsFailures, b.metrics.isOpFailureExpected, nil, b.metrics.opsTransferredBytes)
	defer trc.Close()

	if err := b.bkt.Upload(ctx, name, trc); err != nil {
		if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
			b.metrics.opsFailures.WithLabelValues(op).Inc()
		}
		return err
	}

	b.metrics.lastSuccessfulUploadTime.SetToCurrentTime()
	return nil
}
// Delete delegates to the wrapped bucket, counting the operation, counting
// unexpected failures, and recording the duration of successful calls only.
func (b *metricBucket) Delete(ctx context.Context, name string) error {
	const op = OpDelete
	b.metrics.ops.WithLabelValues(op).Inc()

	begin := time.Now()
	err := b.bkt.Delete(ctx, name)
	if err != nil {
		if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
			b.metrics.opsFailures.WithLabelValues(op).Inc()
		}
		return err
	}

	b.metrics.opsDuration.WithLabelValues(op).Observe(time.Since(begin).Seconds())
	return nil
}
// IsObjNotFoundErr delegates to the wrapped bucket.
func (b *metricBucket) IsObjNotFoundErr(err error) bool {
	return b.bkt.IsObjNotFoundErr(err)
}

// IsAccessDeniedErr delegates to the wrapped bucket.
func (b *metricBucket) IsAccessDeniedErr(err error) bool {
	return b.bkt.IsAccessDeniedErr(err)
}

// Close delegates to the wrapped bucket.
func (b *metricBucket) Close() error {
	return b.bkt.Close()
}

// Name delegates to the wrapped bucket.
func (b *metricBucket) Name() string {
	return b.bkt.Name()
}
// timingReader wraps an io.Reader and records read metrics (duration, byte
// counts, failures) when the reader is closed.
type timingReader struct {
	io.Reader

	// closeReader controls whether Close is forwarded to the wrapped reader.
	closeReader bool

	// objSize/objSizeErr cache the result of TryToGetSize at wrap time.
	objSize    int64
	objSizeErr error

	// alreadyGotErr is set once a failure has been counted; it also serves
	// as a latch so metrics are recorded at most once on Close.
	alreadyGotErr bool

	// start is when the reader was created; used for the duration metric.
	start time.Time
	// op is the operation label for all metric series.
	op string
	// readBytes accumulates how many bytes have been read so far.
	readBytes int64

	duration          *prometheus.HistogramVec
	failed            *prometheus.CounterVec
	isFailureExpected IsOpFailureExpectedFunc
	// fetchedBytes may be nil (e.g. for uploads, where nothing is fetched).
	fetchedBytes     *prometheus.CounterVec
	transferredBytes *prometheus.HistogramVec
}
// newTimingReader wraps r so that read duration, failures, and byte counts
// are recorded into the given metric vectors when the returned reader is
// closed. closeReader controls whether Close is forwarded to r. The concrete
// return type preserves any io.Seeker / io.ReaderAt capability of r.
func newTimingReader(r io.Reader, closeReader bool, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec, isFailureExpected IsOpFailureExpectedFunc, fetchedBytes *prometheus.CounterVec, transferredBytes *prometheus.HistogramVec) io.ReadCloser {
	// Touch the labeled series so they exist even if never observed.
	dur.WithLabelValues(op)
	failed.WithLabelValues(op)

	size, sizeErr := TryToGetSize(r)
	base := timingReader{
		Reader:            r,
		closeReader:       closeReader,
		objSize:           size,
		objSizeErr:        sizeErr,
		start:             time.Now(),
		op:                op,
		duration:          dur,
		failed:            failed,
		isFailureExpected: isFailureExpected,
		fetchedBytes:      fetchedBytes,
		transferredBytes:  transferredBytes,
	}

	_, seeker := r.(io.Seeker)
	_, readerAt := r.(io.ReaderAt)
	switch {
	case seeker && readerAt:
		return &timingReaderSeekerReaderAt{timingReaderSeeker: timingReaderSeeker{timingReader: base}}
	case seeker:
		return &timingReaderSeeker{timingReader: base}
	default:
		return &base
	}
}
// ObjectSize returns the size determined at wrap time via TryToGetSize.
func (r *timingReader) ObjectSize() (int64, error) {
	return r.objSize, r.objSizeErr
}
// Close records duration and transferred bytes (at most once) and, when
// closeReader is set, forwards Close to the wrapped reader. A Close error is
// counted as a failure unless an error was already counted during reads.
func (r *timingReader) Close() error {
	var closeErr error
	if closer, ok := r.Reader.(io.Closer); r.closeReader && ok {
		closeErr = closer.Close()
		if !r.alreadyGotErr && closeErr != nil {
			r.failed.WithLabelValues(r.op).Inc()
			r.alreadyGotErr = true
		}
	}

	// alreadyGotErr doubles as a "metrics recorded" latch: it is set after
	// observing, so a repeated Close does not observe duration/bytes twice.
	if !r.alreadyGotErr {
		r.duration.WithLabelValues(r.op).Observe(time.Since(r.start).Seconds())
		r.transferredBytes.WithLabelValues(r.op).Observe(float64(r.readBytes))
		r.alreadyGotErr = true
	}

	return closeErr
}
// Read forwards to the wrapped reader, accumulating byte counters and
// counting the first unexpected, non-EOF, non-cancellation error as a
// failure (subsequent errors are not double counted).
func (r *timingReader) Read(b []byte) (n int, err error) {
	n, err = r.Reader.Read(b)
	if r.fetchedBytes != nil {
		r.fetchedBytes.WithLabelValues(r.op).Add(float64(n))
	}
	r.readBytes += int64(n)

	// Guard clauses: nothing to record for success, EOF, or repeat errors.
	if err == nil || err == io.EOF || r.alreadyGotErr {
		return n, err
	}
	if !r.isFailureExpected(err) && !errors.Is(err, context.Canceled) {
		r.failed.WithLabelValues(r.op).Inc()
	}
	r.alreadyGotErr = true
	return n, err
}
// timingReaderSeeker is a timingReader whose wrapped reader also supports
// seeking.
type timingReaderSeeker struct {
	timingReader
}

// Seek forwards to the wrapped reader, which is known to implement io.Seeker.
func (rsc *timingReaderSeeker) Seek(offset int64, whence int) (int64, error) {
	return (rsc.Reader).(io.Seeker).Seek(offset, whence)
}
// timingReaderSeekerReaderAt is a timingReaderSeeker whose wrapped reader
// also supports random-access reads.
type timingReaderSeekerReaderAt struct {
	timingReaderSeeker
}

// ReadAt forwards to the wrapped reader, which is known to implement io.ReaderAt.
func (rsc *timingReaderSeekerReaderAt) ReadAt(p []byte, off int64) (int, error) {
	return (rsc.Reader).(io.ReaderAt).ReadAt(p, off)
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.