package filesystem
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"github.com/efficientgo/core/errcapture"
"github.com/pkg/errors"
"gopkg.in/yaml.v2"
"github.com/thanos-io/objstore"
)
// Config is the YAML-decodable configuration for a filesystem bucket.
type Config struct {
// Directory is the directory that backs the bucket. NewBucketFromConfig
// rejects a configuration in which this field is empty.
Directory string `yaml:"directory"`
}
// Bucket stores and serves objects as regular files beneath a single
// directory on the local filesystem.
type Bucket struct {
// rootDir is the directory every object name is joined onto. NewBucket sets
// it to the absolute form of the path it was given.
rootDir string
}
// NewBucketFromConfig decodes conf as a YAML-encoded Config and builds a
// Bucket rooted at the configured directory.
//
// It returns the YAML decoding error unchanged when conf cannot be parsed, and
// an error when the decoded configuration has no directory set.
func NewBucketFromConfig(conf []byte) (*Bucket, error) {
	cfg := Config{}
	err := yaml.Unmarshal(conf, &cfg)
	if err != nil {
		return nil, err
	}

	if len(cfg.Directory) == 0 {
		return nil, errors.New("missing directory for filesystem bucket")
	}

	return NewBucket(cfg.Directory)
}
// NewBucket returns a Bucket whose objects live under rootDir.
//
// rootDir is converted to an absolute path with filepath.Abs before it is
// stored; an error from that conversion is returned unchanged.
func NewBucket(rootDir string) (*Bucket, error) {
	resolved, err := filepath.Abs(rootDir)
	if err != nil {
		return nil, err
	}

	bkt := &Bucket{rootDir: resolved}
	return bkt, nil
}
// Iter calls f once for every entry found directly under dir, where dir is
// interpreted relative to the bucket root.
//
// Names passed to f are relative to the bucket root and use forward slashes.
// A non-empty subdirectory is reported with a trailing objstore.DirDelim,
// unless the recursive option is set, in which case Iter descends into it and
// reports its contents instead. Empty subdirectories are skipped entirely.
//
// If dir does not exist or is not a directory, Iter returns nil without
// calling f. Iteration stops at the first error returned by f, by the
// filesystem, or by ctx once it has been cancelled or has expired.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
	// Give up immediately if the caller already has. Every recursive call
	// passes through this check too, so deep walks stop promptly.
	if err := ctx.Err(); err != nil {
		return err
	}

	params := objstore.ApplyIterOptions(options...)
	absDir := filepath.Join(b.rootDir, dir)

	info, err := os.Stat(absDir)
	if err != nil {
		if os.IsNotExist(err) {
			// A missing directory is treated as an empty listing.
			return nil
		}
		return errors.Wrapf(err, "stat %s", absDir)
	}
	if !info.IsDir() {
		return nil
	}

	files, err := os.ReadDir(absDir)
	if err != nil {
		return errors.Wrapf(err, "read dir %s", absDir)
	}

	for _, file := range files {
		// Re-check between entries so that a large flat directory does not
		// keep invoking f after cancellation.
		if err := ctx.Err(); err != nil {
			return err
		}

		name := filepath.Join(dir, file.Name())

		if file.IsDir() {
			empty, err := isDirEmpty(filepath.Join(absDir, file.Name()))
			if err != nil {
				return err
			}
			if empty {
				// Empty directories hold no objects, so they are not listed.
				continue
			}

			name += objstore.DirDelim

			if params.Recursive {
				// Report the directory's contents rather than the directory.
				if err := b.Iter(ctx, name, f, options...); err != nil {
					return err
				}
				continue
			}
		}

		// Normalise OS-specific separators before handing the name out.
		name = filepath.ToSlash(name)
		if err := f(name); err != nil {
			return err
		}
	}
	return nil
}
// Get returns a reader over the entire content of the object called name.
// It delegates to GetRange with offset 0 and length -1, the length value for
// which GetRange applies no read limit.
func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
	const (
		fromStart int64 = 0
		noLimit   int64 = -1
	)
	return b.GetRange(ctx, name, fromStart, noLimit)
}
// rangeReaderCloser pairs a reader with the file that must be closed once the
// reader is no longer needed.
type rangeReaderCloser struct {
	// Reader provides the Read method exposed by this type.
	io.Reader

	// f is the file that Close closes.
	f *os.File
}

// Close closes the wrapped file and returns the result of that call.
func (r *rangeReaderCloser) Close() error {
	closeErr := r.f.Close()
	return closeErr
}
// Attributes reports the size and last-modification time of the file that
// backs the object called name.
//
// A failure to stat the file is returned wrapped with the path that was
// examined.
func (b *Bucket) Attributes(_ context.Context, name string) (objstore.ObjectAttributes, error) {
	target := filepath.Join(b.rootDir, name)

	info, err := os.Stat(target)
	if err != nil {
		return objstore.ObjectAttributes{}, errors.Wrapf(err, "stat %s", target)
	}

	var attrs objstore.ObjectAttributes
	attrs.Size = info.Size()
	attrs.LastModified = info.ModTime()
	return attrs, nil
}
// GetRange opens the object called name and returns a reader positioned at
// byte offset off.
//
// When length is -1 the reader runs to the end of the file; otherwise it
// yields at most length bytes. The caller must close the returned reader to
// release the underlying file.
//
// An error is returned when name is empty, or when the file cannot be
// stat'ed, opened, or positioned at off.
func (b *Bucket) GetRange(_ context.Context, name string, off, length int64) (io.ReadCloser, error) {
	if name == "" {
		return nil, errors.New("object name is empty")
	}

	file := filepath.Join(b.rootDir, name)
	if _, err := os.Stat(file); err != nil {
		return nil, errors.Wrapf(err, "stat %s", file)
	}

	f, err := os.OpenFile(filepath.Clean(file), os.O_RDONLY, 0600)
	if err != nil {
		return nil, err
	}

	if off > 0 {
		if _, err := f.Seek(off, io.SeekStart); err != nil {
			// The caller never receives f on this path, so it has to be
			// closed here or the descriptor leaks. The close error is
			// deliberately dropped: the seek failure is the one to report.
			_ = f.Close()
			return nil, errors.Wrapf(err, "seek %v", off)
		}
	}

	if length == -1 {
		return f, nil
	}

	// Cap the number of readable bytes while still closing f on Close.
	return &rangeReaderCloser{Reader: io.LimitReader(f, length), f: f}, nil
}
// Exists reports whether the object called name is present in the bucket.
//
// A path that does not exist, or that exists but is a directory, yields
// false with a nil error. Any other stat failure is returned wrapped with the
// path that was examined.
func (b *Bucket) Exists(_ context.Context, name string) (bool, error) {
	target := filepath.Join(b.rootDir, name)

	info, err := os.Stat(target)
	switch {
	case err == nil:
		return !info.IsDir(), nil
	case os.IsNotExist(err):
		return false, nil
	default:
		return false, errors.Wrapf(err, "stat %s", target)
	}
}
// Upload writes everything read from r into the file that backs the object
// called name, creating missing parent directories first and truncating any
// existing file.
//
// The result is named so that the deferred errcapture.Do call can fold an
// error from closing the file into the returned error.
func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) (err error) {
	dst := filepath.Join(b.rootDir, name)

	parent := filepath.Dir(dst)
	if mkErr := os.MkdirAll(parent, os.ModePerm); mkErr != nil {
		return mkErr
	}

	out, createErr := os.Create(dst)
	if createErr != nil {
		return createErr
	}
	defer errcapture.Do(&err, out.Close, "close")

	_, copyErr := io.Copy(out, r)
	if copyErr != nil {
		return errors.Wrapf(copyErr, "copy to %s", dst)
	}
	return nil
}
// isDirEmpty reports whether the directory at name contains no entries.
//
// A directory that does not exist is reported as empty. The results are named
// so that the deferred errcapture.Do call can fold an error from closing the
// directory handle into the returned error.
func isDirEmpty(name string) (ok bool, err error) {
	dir, openErr := os.Open(filepath.Clean(name))
	switch {
	case os.IsNotExist(openErr):
		return true, nil
	case openErr != nil:
		return false, openErr
	}
	defer errcapture.Do(&err, dir.Close, "close dir")

	// Asking for a single entry is enough: io.EOF means there are none.
	_, readErr := dir.Readdir(1)
	if readErr == io.EOF || os.IsNotExist(readErr) {
		return true, nil
	}
	return false, readErr
}
// Delete removes the object called name and then prunes every parent
// directory left empty by the removal, walking upwards and stopping at the
// first parent that still has entries. The bucket root itself is never
// removed.
func (b *Bucket) Delete(_ context.Context, name string) error {
	for cur := filepath.Join(b.rootDir, name); cur != b.rootDir; {
		if err := os.RemoveAll(cur); err != nil {
			return errors.Wrapf(err, "rm %s", cur)
		}

		parent := filepath.Dir(cur)
		parentEmpty, err := isDirEmpty(parent)
		if err != nil {
			return err
		}
		if !parentEmpty {
			// Something else still lives here; nothing more to prune.
			return nil
		}

		// The parent is now empty, so it is the next candidate for removal.
		cur = parent
	}
	return nil
}
// IsObjNotFoundErr reports whether the cause of err, as extracted by
// errors.Cause, satisfies os.IsNotExist.
func (b *Bucket) IsObjNotFoundErr(err error) bool {
	root := errors.Cause(err)
	return os.IsNotExist(root)
}
// Close does nothing and always returns nil.
func (b *Bucket) Close() error {
	return nil
}
// Name returns a label for the bucket made of the "fs: " prefix followed by
// the bucket's root directory.
func (b *Bucket) Name() string {
	label := fmt.Sprintf("fs: %s", b.rootDir)
	return label
}