package bos
import (
"context"
"fmt"
"io"
"math"
"math/rand"
"net/http"
"os"
"strings"
"testing"
"time"
"github.com/baidubce/bce-sdk-go/bce"
"github.com/baidubce/bce-sdk-go/services/bos"
"github.com/baidubce/bce-sdk-go/services/bos/api"
"github.com/go-kit/log"
"github.com/pkg/errors"
"gopkg.in/yaml.v2"
"github.com/thanos-io/objstore"
)
const partSize = 1024 * 1024 * 128
type Bucket struct {
logger log.Logger
client *bos.Client
name string
}
type Config struct {
Bucket string `yaml:"bucket"`
Endpoint string `yaml:"endpoint"`
AccessKey string `yaml:"access_key"`
SecretKey string `yaml:"secret_key"`
}
func (conf *Config) validate() error {
if conf.Bucket == "" ||
conf.Endpoint == "" ||
conf.AccessKey == "" ||
conf.SecretKey == "" {
return errors.New("insufficient BOS configuration information")
}
return nil
}
func parseConfig(conf []byte) (Config, error) {
config := Config{}
if err := yaml.Unmarshal(conf, &config); err != nil {
return Config{}, err
}
return config, nil
}
func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error) {
if logger == nil {
logger = log.NewNopLogger()
}
config, err := parseConfig(conf)
if err != nil {
return nil, errors.Wrap(err, "parsing BOS configuration")
}
return NewBucketWithConfig(logger, config, component)
}
func NewBucketWithConfig(logger log.Logger, config Config, component string) (*Bucket, error) {
if err := config.validate(); err != nil {
return nil, errors.Wrap(err, "validating BOS configuration")
}
client, err := bos.NewClient(config.AccessKey, config.SecretKey, config.Endpoint)
if err != nil {
return nil, errors.Wrap(err, "creating BOS client")
}
client.Config.UserAgent = fmt.Sprintf("thanos-%s", component)
bkt := &Bucket{
logger: logger,
client: client,
name: config.Bucket,
}
return bkt, nil
}
func (b *Bucket) Name() string {
return b.name
}
func (b *Bucket) Delete(_ context.Context, name string) error {
return b.client.DeleteObject(b.name, name)
}
func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error {
size, err := objstore.TryToGetSize(r)
if err != nil {
return errors.Wrapf(err, "getting size of %s", name)
}
partNums, lastSlice := int(math.Floor(float64(size)/partSize)), size%partSize
if partNums == 0 {
body, err := bce.NewBodyFromSizedReader(r, lastSlice)
if err != nil {
return errors.Wrapf(err, "failed to create SizedReader for %s", name)
}
if _, err := b.client.PutObject(b.name, name, body, nil); err != nil {
return errors.Wrapf(err, "failed to upload %s", name)
}
return nil
}
result, err := b.client.BasicInitiateMultipartUpload(b.name, name)
if err != nil {
return errors.Wrapf(err, "failed to initiate MultipartUpload for %s", name)
}
uploadEveryPart := func(partSize int64, part int, uploadId string) (string, error) {
body, err := bce.NewBodyFromSizedReader(r, partSize)
if err != nil {
return "", err
}
etag, err := b.client.UploadPart(b.name, name, uploadId, part, body, nil)
if err != nil {
if err := b.client.AbortMultipartUpload(b.name, name, uploadId); err != nil {
return etag, err
}
return etag, err
}
return etag, nil
}
var parts []api.UploadInfoType
for part := 1; part <= partNums; part++ {
etag, err := uploadEveryPart(partSize, part, result.UploadId)
if err != nil {
return errors.Wrapf(err, "failed to upload part %d for %s", part, name)
}
parts = append(parts, api.UploadInfoType{PartNumber: part, ETag: etag})
}
if lastSlice != 0 {
etag, err := uploadEveryPart(lastSlice, partNums+1, result.UploadId)
if err != nil {
return errors.Wrapf(err, "failed to upload the last part for %s", name)
}
parts = append(parts, api.UploadInfoType{PartNumber: partNums + 1, ETag: etag})
}
if _, err := b.client.CompleteMultipartUploadFromStruct(b.name, name, result.UploadId, &api.CompleteMultipartUploadArgs{Parts: parts}); err != nil {
return errors.Wrapf(err, "failed to set %s upload completed", name)
}
return nil
}
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt ...objstore.IterOption) error {
if dir != "" {
dir = strings.TrimSuffix(dir, objstore.DirDelim) + objstore.DirDelim
}
delimiter := objstore.DirDelim
if objstore.ApplyIterOptions(opt...).Recursive {
delimiter = ""
}
var marker string
for {
if err := ctx.Err(); err != nil {
return err
}
objects, err := b.client.ListObjects(b.name, &api.ListObjectsArgs{
Delimiter: delimiter,
Marker: marker,
MaxKeys: 1000,
Prefix: dir,
})
if err != nil {
return err
}
marker = objects.NextMarker
for _, object := range objects.Contents {
if err := f(object.Key); err != nil {
return err
}
}
for _, object := range objects.CommonPrefixes {
if err := f(object.Prefix); err != nil {
return err
}
}
if !objects.IsTruncated {
break
}
}
return nil
}
func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
return b.getRange(ctx, b.name, name, 0, -1)
}
func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
return b.getRange(ctx, b.name, name, off, length)
}
func (b *Bucket) Exists(_ context.Context, name string) (bool, error) {
_, err := b.client.GetObjectMeta(b.name, name)
if err != nil {
if b.IsObjNotFoundErr(err) {
return false, nil
}
return false, errors.Wrapf(err, "getting object metadata of %s", name)
}
return true, nil
}
func (b *Bucket) Close() error {
return nil
}
func (b *Bucket) ObjectSize(_ context.Context, name string) (uint64, error) {
objMeta, err := b.client.GetObjectMeta(b.name, name)
if err != nil {
return 0, err
}
return uint64(objMeta.ContentLength), nil
}
func (b *Bucket) Attributes(_ context.Context, name string) (objstore.ObjectAttributes, error) {
objMeta, err := b.client.GetObjectMeta(b.name, name)
if err != nil {
return objstore.ObjectAttributes{}, errors.Wrapf(err, "gettting objectmeta of %s", name)
}
lastModified, err := time.Parse(time.RFC1123, objMeta.LastModified)
if err != nil {
return objstore.ObjectAttributes{}, err
}
return objstore.ObjectAttributes{
Size: objMeta.ContentLength,
LastModified: lastModified,
}, nil
}
func (b *Bucket) IsObjNotFoundErr(err error) bool {
switch bosErr := errors.Cause(err).(type) {
case *bce.BceServiceError:
if bosErr.StatusCode == http.StatusNotFound || bosErr.Code == "NoSuchKey" {
return true
}
}
return false
}
func (b *Bucket) getRange(_ context.Context, bucketName, objectKey string, off, length int64) (io.ReadCloser, error) {
if len(objectKey) == 0 {
return nil, errors.Errorf("given object name should not empty")
}
ranges := []int64{off}
if length != -1 {
ranges = append(ranges, off+length-1)
}
obj, err := b.client.GetObject(bucketName, objectKey, map[string]string{}, ranges...)
if err != nil {
return nil, err
}
return obj.Body, nil
}
func configFromEnv() Config {
c := Config{
Bucket: os.Getenv("BOS_BUCKET"),
Endpoint: os.Getenv("BOS_ENDPOINT"),
AccessKey: os.Getenv("BOS_ACCESS_KEY"),
SecretKey: os.Getenv("BOS_SECRET_KEY"),
}
return c
}
func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) {
c := configFromEnv()
if err := validateForTest(c); err != nil {
return nil, nil, err
}
if c.Bucket != "" {
if os.Getenv("THANOS_ALLOW_EXISTING_BUCKET_USE") == "" {
return nil, nil, errors.New("BOS_BUCKET is defined. Normally this tests will create temporary bucket " +
"and delete it after test. Unset BOS_BUCKET env variable to use default logic. If you really want to run " +
"tests against provided (NOT USED!) bucket, set THANOS_ALLOW_EXISTING_BUCKET_USE=true. WARNING: That bucket " +
"needs to be manually cleared. This means that it is only useful to run one test in a time. This is due " +
"to safety (accidentally pointing prod bucket for test) as well as BOS not being fully strong consistent.")
}
bc, err := yaml.Marshal(c)
if err != nil {
return nil, nil, err
}
b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test")
if err != nil {
return nil, nil, err
}
if err := b.Iter(context.Background(), "", func(f string) error {
return errors.Errorf("bucket %s is not empty", c.Bucket)
}); err != nil {
return nil, nil, errors.Wrapf(err, "checking bucket %s", c.Bucket)
}
t.Log("WARNING. Reusing", c.Bucket, "BOS bucket for BOS tests. Manual cleanup afterwards is required")
return b, func() {}, nil
}
src := rand.NewSource(time.Now().UnixNano())
tmpBucketName := strings.Replace(fmt.Sprintf("test_%x", src.Int63()), "_", "-", -1)
if len(tmpBucketName) >= 31 {
tmpBucketName = tmpBucketName[:31]
}
c.Bucket = tmpBucketName
bc, err := yaml.Marshal(c)
if err != nil {
return nil, nil, err
}
b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test")
if err != nil {
return nil, nil, err
}
if _, err := b.client.PutBucket(b.name); err != nil {
return nil, nil, err
}
t.Log("created temporary BOS bucket for BOS tests with name", tmpBucketName)
return b, func() {
objstore.EmptyBucket(t, context.Background(), b)
if err := b.client.DeleteBucket(b.name); err != nil {
t.Logf("deleting bucket %s failed: %s", tmpBucketName, err)
}
}, nil
}
func validateForTest(conf Config) error {
if conf.Endpoint == "" ||
conf.AccessKey == "" ||
conf.SecretKey == "" {
return errors.New("insufficient BOS configuration information")
}
return nil
}