package oci
import (
"context"
"fmt"
"io"
"net/http"
"strings"
"testing"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oracle/oci-go-sdk/v65/common"
"github.com/oracle/oci-go-sdk/v65/common/auth"
"github.com/oracle/oci-go-sdk/v65/objectstorage"
"github.com/oracle/oci-go-sdk/v65/objectstorage/transfer"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/thanos-io/objstore"
"gopkg.in/yaml.v2"
)
const DirDelim = "/"
type Provider string
const (
DefaultConfigProvider = Provider("default")
InstancePrincipalConfigProvider = Provider("instance-principal")
RawConfigProvider = Provider("raw")
)
var DefaultConfig = Config{
HTTPConfig: HTTPConfig{
IdleConnTimeout: model.Duration(90 * time.Second),
ResponseHeaderTimeout: model.Duration(2 * time.Minute),
TLSHandshakeTimeout: model.Duration(10 * time.Second),
ExpectContinueTimeout: model.Duration(1 * time.Second),
InsecureSkipVerify: false,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 100,
MaxConnsPerHost: 0,
DisableCompression: false,
ClientTimeout: 90 * time.Second,
},
}
type HTTPConfig struct {
IdleConnTimeout model.Duration `yaml:"idle_conn_timeout"`
ResponseHeaderTimeout model.Duration `yaml:"response_header_timeout"`
InsecureSkipVerify bool `yaml:"insecure_skip_verify"`
TLSHandshakeTimeout model.Duration `yaml:"tls_handshake_timeout"`
ExpectContinueTimeout model.Duration `yaml:"expect_continue_timeout"`
MaxIdleConns int `yaml:"max_idle_conns"`
MaxIdleConnsPerHost int `yaml:"max_idle_conns_per_host"`
MaxConnsPerHost int `yaml:"max_conns_per_host"`
DisableCompression bool `yaml:"disable_compression"`
ClientTimeout time.Duration `yaml:"client_timeout"`
}
type Config struct {
Provider string `yaml:"provider"`
Bucket string `yaml:"bucket"`
Compartment string `yaml:"compartment_ocid"`
Tenancy string `yaml:"tenancy_ocid"`
User string `yaml:"user_ocid"`
Region string `yaml:"region"`
Fingerprint string `yaml:"fingerprint"`
PrivateKey string `yaml:"privatekey"`
Passphrase string `yaml:"passphrase"`
PartSize int64 `yaml:"part_size"`
MaxRequestRetries int `yaml:"max_request_retries"`
RequestRetryInterval int `yaml:"request_retry_interval"`
HTTPConfig HTTPConfig `yaml:"http_config"`
}
type Bucket struct {
logger log.Logger
name string
namespace string
client *objectstorage.ObjectStorageClient
partSize int64
requestMetadata common.RequestMetadata
}
func (b *Bucket) Name() string {
return b.name
}
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
if dir != "" {
dir = strings.TrimSuffix(dir, DirDelim) + DirDelim
}
objectNames, err := listAllObjects(ctx, *b, dir, options...)
if err != nil {
return errors.Wrapf(err, "cannot list objects in directory '%s'", dir)
}
level.Debug(b.logger).Log("NumberOfObjects", len(objectNames))
for _, objectName := range objectNames {
if objectName == "" || objectName == dir {
continue
}
if err := f(objectName); err != nil {
return err
}
}
return nil
}
func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
response, err := getObject(ctx, *b, name, "")
if err != nil {
return nil, err
}
return response.Content, nil
}
func (b *Bucket) GetRange(ctx context.Context, name string, offset, length int64) (io.ReadCloser, error) {
level.Debug(b.logger).Log("msg", "getting object", "name", name, "off", offset, "length", length)
byteRange := ""
if offset >= 0 {
if length > 0 {
byteRange = fmt.Sprintf("bytes=%d-%d", offset, offset+length-1)
} else {
byteRange = fmt.Sprintf("bytes=%d-", offset)
}
} else {
if length > 0 {
byteRange = fmt.Sprintf("bytes=-%d", length)
} else {
return nil, errors.New(fmt.Sprintf("invalid range specified: offset=%d length=%d", offset, length))
}
}
level.Debug(b.logger).Log("byteRange", byteRange)
response, err := getObject(ctx, *b, name, byteRange)
if err != nil {
return nil, err
}
return response.Content, nil
}
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) (err error) {
req := transfer.UploadStreamRequest{
UploadRequest: transfer.UploadRequest{
NamespaceName: common.String(b.namespace),
BucketName: common.String(b.name),
ObjectName: &name,
EnableMultipartChecksumVerification: common.Bool(true),
ObjectStorageClient: b.client,
RequestMetadata: b.requestMetadata,
},
StreamReader: r,
}
if b.partSize > 0 {
req.UploadRequest.PartSize = &b.partSize
}
uploadManager := transfer.NewUploadManager()
_, err = uploadManager.UploadStream(ctx, req)
return err
}
func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
_, err := getObject(ctx, *b, name, "")
if err != nil {
if b.IsObjNotFoundErr(err) {
return false, nil
}
return false, errors.Wrapf(err, "cannot get OCI object '%s'", name)
}
return true, nil
}
func (b *Bucket) Delete(ctx context.Context, name string) (err error) {
request := objectstorage.DeleteObjectRequest{
NamespaceName: &b.namespace,
BucketName: &b.name,
ObjectName: &name,
RequestMetadata: b.requestMetadata,
}
_, err = b.client.DeleteObject(ctx, request)
return err
}
func (b *Bucket) IsObjNotFoundErr(err error) bool {
failure, isServiceError := common.IsServiceError(err)
if isServiceError {
k := failure.GetHTTPStatusCode()
match := k == http.StatusNotFound
level.Debug(b.logger).Log("msg", match)
return failure.GetHTTPStatusCode() == http.StatusNotFound
}
return false
}
func (b *Bucket) ObjectSize(ctx context.Context, name string) (uint64, error) {
response, err := getObject(ctx, *b, name, "")
if err != nil {
return 0, err
}
return uint64(*response.ContentLength), nil
}
func (b *Bucket) Close() error {
return nil
}
func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
response, err := getObject(ctx, *b, name, "")
if err != nil {
return objstore.ObjectAttributes{}, err
}
return objstore.ObjectAttributes{
Size: *response.ContentLength,
LastModified: response.LastModified.Time,
}, nil
}
func (b *Bucket) createBucket(ctx context.Context, compartmentId string) (err error) {
request := objectstorage.CreateBucketRequest{
NamespaceName: &b.namespace,
RequestMetadata: b.requestMetadata,
}
request.CompartmentId = &compartmentId
request.Name = &b.name
request.Metadata = make(map[string]string)
request.PublicAccessType = objectstorage.CreateBucketDetailsPublicAccessTypeNopublicaccess
_, err = b.client.CreateBucket(ctx, request)
return err
}
func (b *Bucket) deleteBucket(ctx context.Context) (err error) {
request := objectstorage.DeleteBucketRequest{
NamespaceName: &b.namespace,
BucketName: &b.name,
RequestMetadata: b.requestMetadata,
}
_, err = b.client.DeleteBucket(ctx, request)
return err
}
func NewBucket(logger log.Logger, ociConfig []byte) (*Bucket, error) {
level.Debug(logger).Log("msg", "creating new oci bucket connection")
var config = DefaultConfig
var configurationProvider common.ConfigurationProvider
var err error
if err := yaml.Unmarshal(ociConfig, &config); err != nil {
return nil, errors.Wrapf(err, "unable to unmarshal the given oci configurations")
}
provider := Provider(strings.ToLower(config.Provider))
level.Info(logger).Log("msg", "creating OCI client", "provider", provider)
switch provider {
case DefaultConfigProvider:
configurationProvider = common.DefaultConfigProvider()
case InstancePrincipalConfigProvider:
configurationProvider, err = auth.InstancePrincipalConfigurationProvider()
if err != nil {
return nil, errors.Wrapf(err, "unable to create OCI instance principal config provider")
}
case RawConfigProvider:
if err := config.validateConfig(); err != nil {
return nil, errors.Wrapf(err, "invalid oci configurations")
}
configurationProvider = common.NewRawConfigurationProvider(config.Tenancy, config.User, config.Region,
config.Fingerprint, config.PrivateKey, &config.Passphrase)
default:
return nil, errors.Wrapf(err, fmt.Sprintf("unsupported OCI provider: %s", provider))
}
client, err := objectstorage.NewObjectStorageClientWithConfigurationProvider(configurationProvider)
if err != nil {
return nil, errors.Wrapf(err, "unable to create ObjectStorage client with the given oci configurations")
}
httpClient := http.Client{
Transport: CustomTransport(config),
Timeout: config.HTTPConfig.ClientTimeout,
}
client.HTTPClient = &httpClient
requestMetadata := getRequestMetadata(config.MaxRequestRetries, config.RequestRetryInterval)
level.Info(logger).Log("msg", "getting namespace, it might take some time")
namespace, err := getNamespace(client, requestMetadata)
if err != nil {
return nil, err
}
level.Debug(logger).Log("msg", fmt.Sprintf("Oracle Cloud Infrastructure tenancy namespace: %s", *namespace))
bkt := Bucket{
logger: logger,
name: config.Bucket,
namespace: *namespace,
client: &client,
partSize: config.PartSize,
requestMetadata: requestMetadata,
}
return &bkt, nil
}
func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) {
config, err := getConfigFromEnv()
if err != nil {
return nil, nil, err
}
ociConfig, err := yaml.Marshal(config)
if err != nil {
return nil, nil, err
}
bkt, err := NewBucket(log.NewNopLogger(), ociConfig)
if err != nil {
return nil, nil, err
}
ctx := context.Background()
bkt.name = objstore.CreateTemporaryTestBucketName(t)
if err := bkt.createBucket(ctx, config.Compartment); err != nil {
t.Errorf("failed to create temporary Oracle Cloud Infrastructure bucket '%s' for testing", bkt.name)
return nil, nil, err
}
t.Logf("created temporary Oracle Cloud Infrastructure bucket '%s' for testing", bkt.name)
return bkt, func() {
objstore.EmptyBucket(t, ctx, bkt)
if err := bkt.deleteBucket(ctx); err != nil {
t.Logf("failed to delete temporary Oracle Cloud Infrastructure bucket %s for testing: %s", bkt.name, err)
}
t.Logf("deleted temporary Oracle Cloud Infrastructure bucket '%s' for testing", bkt.name)
}, nil
}