package s3fs
import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"path/filepath"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
	log "github.com/sirupsen/logrus"
)
// DirectoryMarkerMode selects whether, and with what payload, a marker object
// (a key ending in "/") is written when a directory is created.
type DirectoryMarkerMode string
// Recognized directory marker modes. The empty string is not listed here; it
// is mapped to DirectoryMarkerModeEmpty by normalizeDirectoryMarkerMode.
const (
// DirectoryMarkerModeNone means no marker object is written.
DirectoryMarkerModeNone DirectoryMarkerMode = "none"
// DirectoryMarkerModeEmpty means the marker object has a zero-byte body.
DirectoryMarkerModeEmpty DirectoryMarkerMode = "empty"
// DirectoryMarkerModeNonEmpty means the marker object body is
// nonEmptyDirectoryMarkerPayload.
DirectoryMarkerModeNonEmpty DirectoryMarkerMode = "nonempty"
)
// S3Client wraps an AWS SDK v2 S3 client bound to a single bucket and an
// optional key prefix. Instances are built by NewS3Client.
type S3Client struct {
// client is the underlying SDK client used for every request.
client *s3.Client
// bucket is the bucket name sent with every request.
bucket string
// region is the region the client was configured with.
region string
// prefix is prepended to every path by buildKey; NewS3Client stores it with
// leading and trailing slashes trimmed.
prefix string
// directoryMarkerMode controls CreateDirectory's marker object behavior.
directoryMarkerMode DirectoryMarkerMode
}
// S3Config carries the settings NewS3Client uses to build an S3Client.
type S3Config struct {
// Region is the AWS region passed to the SDK configuration loader.
Region string
// Bucket is the bucket all operations target.
Bucket string
// AccessKeyID and SecretAccessKey, when both are non-empty, are used as
// static credentials; otherwise the SDK's default configuration loading is
// used without an explicit credentials provider.
AccessKeyID string
SecretAccessKey string
// Endpoint, when non-empty, overrides the service base endpoint.
Endpoint string
// Prefix is an optional key prefix under which all paths are placed.
Prefix string
// DisableSSL is not read by NewS3Client in this file.
// NOTE(review): confirm whether another part of the package consumes it.
DisableSSL bool
// UsePathStyle requests path-style addressing; see NewS3Client for how it
// is applied.
UsePathStyle bool
// DirectoryMarkerMode selects the directory marker behavior; the empty
// value is normalized to DirectoryMarkerModeEmpty.
DirectoryMarkerMode DirectoryMarkerMode
}
// nonEmptyDirectoryMarkerPayload is the one-byte body (a single newline)
// written to directory marker objects in DirectoryMarkerModeNonEmpty.
var nonEmptyDirectoryMarkerPayload = []byte("\n")
// normalizeDirectoryMarkerMode substitutes the default mode,
// DirectoryMarkerModeEmpty, for an unset (empty string) mode. Any other value,
// recognized or not, is returned unchanged.
func normalizeDirectoryMarkerMode(mode DirectoryMarkerMode) DirectoryMarkerMode {
	if mode != "" {
		return mode
	}
	return DirectoryMarkerModeEmpty
}
// isValidDirectoryMarkerMode reports whether mode is exactly one of the three
// declared modes. The empty string is not considered valid here; callers that
// want the default applied must normalize first.
func isValidDirectoryMarkerMode(mode DirectoryMarkerMode) bool {
	return mode == DirectoryMarkerModeNone ||
		mode == DirectoryMarkerModeEmpty ||
		mode == DirectoryMarkerModeNonEmpty
}
// directoryMarkerPayload returns the body to store in a directory marker
// object for mode, and whether a marker should be written at all.
//
// After normalization: DirectoryMarkerModeNone yields (nil, false),
// DirectoryMarkerModeNonEmpty yields the shared nonEmptyDirectoryMarkerPayload
// slice, and every other value yields a fresh empty slice.
func directoryMarkerPayload(mode DirectoryMarkerMode) ([]byte, bool) {
	effective := normalizeDirectoryMarkerMode(mode)
	if effective == DirectoryMarkerModeNone {
		return nil, false
	}
	if effective == DirectoryMarkerModeNonEmpty {
		return nonEmptyDirectoryMarkerPayload, true
	}
	return []byte{}, true
}
// NewS3Client builds an S3Client from cfg and confirms the bucket is reachable
// by issuing a HeadBucket request.
//
// Static credentials are used only when both AccessKeyID and SecretAccessKey
// are set. A non-empty Endpoint overrides the service base endpoint.
// UsePathStyle is honored whether or not a custom endpoint is configured.
// The stored prefix has leading and trailing slashes removed, and an unset
// DirectoryMarkerMode is normalized to the default.
//
// It returns an error if the AWS configuration cannot be loaded or the
// HeadBucket probe fails.
func NewS3Client(cfg S3Config) (*S3Client, error) {
	ctx := context.Background()

	loadOpts := []func(*config.LoadOptions) error{
		config.WithRegion(cfg.Region),
	}
	if cfg.AccessKeyID != "" && cfg.SecretAccessKey != "" {
		loadOpts = append(loadOpts, config.WithCredentialsProvider(
			credentials.NewStaticCredentialsProvider(cfg.AccessKeyID, cfg.SecretAccessKey, ""),
		))
	}

	awsCfg, err := config.LoadDefaultConfig(ctx, loadOpts...)
	if err != nil {
		return nil, fmt.Errorf("failed to load AWS config: %w", err)
	}

	client := s3.NewFromConfig(awsCfg, func(o *s3.Options) {
		if cfg.Endpoint != "" {
			o.BaseEndpoint = aws.String(cfg.Endpoint)
		}
		// Path-style addressing is independent of the endpoint override, so it
		// must not be dropped when Endpoint is empty.
		if cfg.UsePathStyle {
			o.UsePathStyle = true
		}
	})

	// Fail fast on a missing bucket or bad credentials.
	if _, err := client.HeadBucket(ctx, &s3.HeadBucketInput{
		Bucket: aws.String(cfg.Bucket),
	}); err != nil {
		return nil, fmt.Errorf("failed to access bucket %s: %w", cfg.Bucket, err)
	}
	log.Infof("[s3fs] Connected to S3 bucket: %s (region: %s)", cfg.Bucket, cfg.Region)

	return &S3Client{
		client:              client,
		bucket:              cfg.Bucket,
		region:              cfg.Region,
		prefix:              strings.Trim(cfg.Prefix, "/"),
		directoryMarkerMode: normalizeDirectoryMarkerMode(cfg.DirectoryMarkerMode),
	}, nil
}
// shouldEnforceParentDirectoryExistence reports whether the client's
// normalized directory marker mode is anything other than
// DirectoryMarkerModeNone.
func (c *S3Client) shouldEnforceParentDirectoryExistence() bool {
	mode := normalizeDirectoryMarkerMode(c.directoryMarkerMode)
	return mode != DirectoryMarkerModeNone
}
// buildKey maps a filesystem-style path to an object key: one leading "/" is
// stripped from path and the client's prefix, if any, is prepended with a "/"
// separator. An empty path under a non-empty prefix yields the bare prefix.
func (c *S3Client) buildKey(path string) string {
	rel := strings.TrimPrefix(path, "/")
	switch {
	case c.prefix == "":
		return rel
	case rel == "":
		return c.prefix
	default:
		return c.prefix + "/" + rel
	}
}
// GetObject downloads the object at path and returns its entire body in
// memory. Errors from the request or from reading the body are wrapped.
func (c *S3Client) GetObject(ctx context.Context, path string) ([]byte, error) {
	objectKey := c.buildKey(path)
	input := &s3.GetObjectInput{
		Bucket: aws.String(c.bucket),
		Key:    aws.String(objectKey),
	}

	resp, err := c.client.GetObject(ctx, input)
	if err != nil {
		return nil, fmt.Errorf("failed to get object %s: %w", objectKey, err)
	}
	defer resp.Body.Close()

	body, readErr := io.ReadAll(resp.Body)
	if readErr != nil {
		return nil, fmt.Errorf("failed to read object body: %w", readErr)
	}
	return body, nil
}
// GetObjectStream requests the object at path and hands back its body as a
// stream. The caller owns the returned reader and must close it.
func (c *S3Client) GetObjectStream(ctx context.Context, path string) (io.ReadCloser, error) {
	objectKey := c.buildKey(path)
	input := &s3.GetObjectInput{
		Bucket: aws.String(c.bucket),
		Key:    aws.String(objectKey),
	}

	resp, err := c.client.GetObject(ctx, input)
	if err != nil {
		return nil, fmt.Errorf("failed to get object %s: %w", objectKey, err)
	}
	return resp.Body, nil
}
// GetObjectRange reads part of the object at path.
//
// offset is the zero-based position of the first byte to read and must not be
// negative. A negative size reads from offset to the end of the object; a
// positive size reads at most size bytes. A size of zero returns an empty
// slice without contacting the service, because "bytes=N-(N-1)" is not a valid
// range and a server may answer it with the whole object.
//
// Errors from the request or from reading the body are wrapped.
func (c *S3Client) GetObjectRange(ctx context.Context, path string, offset, size int64) ([]byte, error) {
	key := c.buildKey(path)
	if offset < 0 {
		return nil, fmt.Errorf("failed to get object range %s: negative offset %d", key, offset)
	}
	if size == 0 {
		return []byte{}, nil
	}

	var rangeHeader string
	if size < 0 {
		// Open-ended range: from offset to the end of the object.
		rangeHeader = fmt.Sprintf("bytes=%d-", offset)
	} else {
		// HTTP byte ranges are inclusive on both ends.
		rangeHeader = fmt.Sprintf("bytes=%d-%d", offset, offset+size-1)
	}

	result, err := c.client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(c.bucket),
		Key:    aws.String(key),
		Range:  aws.String(rangeHeader),
	})
	if err != nil {
		return nil, fmt.Errorf("failed to get object range %s: %w", key, err)
	}
	defer result.Body.Close()

	data, err := io.ReadAll(result.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read object body: %w", err)
	}
	return data, nil
}
// PutObject uploads data as the full content of the object at path, replacing
// any existing object under the same key.
func (c *S3Client) PutObject(ctx context.Context, path string, data []byte) error {
	objectKey := c.buildKey(path)
	input := &s3.PutObjectInput{
		Bucket: aws.String(c.bucket),
		Key:    aws.String(objectKey),
		Body:   bytes.NewReader(data),
	}
	if _, err := c.client.PutObject(ctx, input); err != nil {
		return fmt.Errorf("failed to put object %s: %w", objectKey, err)
	}
	return nil
}
// DeleteObject issues a delete request for the single object at path and
// wraps any error returned by the service call.
func (c *S3Client) DeleteObject(ctx context.Context, path string) error {
	objectKey := c.buildKey(path)
	input := &s3.DeleteObjectInput{
		Bucket: aws.String(c.bucket),
		Key:    aws.String(objectKey),
	}
	if _, err := c.client.DeleteObject(ctx, input); err != nil {
		return fmt.Errorf("failed to delete object %s: %w", objectKey, err)
	}
	return nil
}
// HeadObject fetches the metadata of the object at path. The SDK error is
// returned unwrapped so that callers can inspect it directly.
func (c *S3Client) HeadObject(ctx context.Context, path string) (*s3.HeadObjectOutput, error) {
	input := &s3.HeadObjectInput{
		Bucket: aws.String(c.bucket),
		Key:    aws.String(c.buildKey(path)),
	}
	head, err := c.client.HeadObject(ctx, input)
	if err == nil {
		return head, nil
	}
	return nil, err
}
// S3Object describes one entry produced by the listing methods in this file.
type S3Object struct {
// Key is the entry's key relative to the listed prefix.
Key string
// Size is the object size in bytes; ListObjects reports 0 for directories.
Size int64
// LastModified is the object's modification time. For directory entries
// derived from common prefixes, ListObjects fills in the time of the call.
LastModified time.Time
// IsDir is true when the entry represents a directory.
IsDir bool
}
// ListObjects returns the immediate children of the directory at path, using
// "/" as the listing delimiter.
//
// Common prefixes become directory entries (IsDir true, Size 0, LastModified
// set to the current time). Regular objects become file entries; the object
// whose key equals the listing prefix itself and any key whose relative name
// ends in "/" are skipped. Keys are reported relative to the listing prefix.
// Within each page, directory entries precede file entries.
func (c *S3Client) ListObjects(ctx context.Context, path string) ([]S3Object, error) {
	listPrefix := c.buildKey(path)
	if listPrefix != "" && !strings.HasSuffix(listPrefix, "/") {
		listPrefix += "/"
	}

	input := &s3.ListObjectsV2Input{
		Bucket:    aws.String(c.bucket),
		Prefix:    aws.String(listPrefix),
		Delimiter: aws.String("/"),
	}
	pages := s3.NewListObjectsV2Paginator(c.client, input)

	var entries []S3Object
	for pages.HasMorePages() {
		page, err := pages.NextPage(ctx)
		if err != nil {
			return nil, fmt.Errorf("failed to list objects: %w", err)
		}

		// Sub-directories are reported as common prefixes.
		for _, cp := range page.CommonPrefixes {
			if cp.Prefix != nil {
				name := strings.TrimSuffix(strings.TrimPrefix(*cp.Prefix, listPrefix), "/")
				entries = append(entries, S3Object{
					Key:          name,
					Size:         0,
					LastModified: time.Now(),
					IsDir:        true,
				})
			}
		}

		// Plain objects directly under the prefix.
		for _, item := range page.Contents {
			if item.Key == nil || *item.Key == listPrefix {
				continue
			}
			name := strings.TrimPrefix(*item.Key, listPrefix)
			if !strings.HasSuffix(name, "/") {
				entries = append(entries, S3Object{
					Key:          name,
					Size:         aws.ToInt64(item.Size),
					LastModified: aws.ToTime(item.LastModified),
					IsDir:        false,
				})
			}
		}
	}
	return entries, nil
}
// CreateDirectory writes a directory marker object (a key ending in "/") for
// path, with the payload chosen by the client's directory marker mode. When
// the mode disables markers it does nothing and returns nil.
func (c *S3Client) CreateDirectory(ctx context.Context, path string) error {
	body, wantMarker := directoryMarkerPayload(c.directoryMarkerMode)
	if !wantMarker {
		return nil
	}

	markerKey := c.buildKey(path)
	if !strings.HasSuffix(markerKey, "/") {
		markerKey += "/"
	}

	input := &s3.PutObjectInput{
		Bucket: aws.String(c.bucket),
		Key:    aws.String(markerKey),
		Body:   bytes.NewReader(body),
	}
	if _, err := c.client.PutObject(ctx, input); err != nil {
		return fmt.Errorf("failed to create directory %s: %w", markerKey, err)
	}
	return nil
}
// DeleteDirectory removes every object whose key starts with the directory
// prefix for path (the built key with a trailing "/"), including any marker
// object.
//
// All matching keys are listed first and then deleted in batches of at most
// 1000 keys per request. A batch request can succeed as a whole while
// individual keys fail; such per-key failures are reported as an error instead
// of being silently ignored.
func (c *S3Client) DeleteDirectory(ctx context.Context, path string) error {
	prefix := c.buildKey(path)
	if !strings.HasSuffix(prefix, "/") {
		prefix += "/"
	}

	var objectsToDelete []types.ObjectIdentifier
	paginator := s3.NewListObjectsV2Paginator(c.client, &s3.ListObjectsV2Input{
		Bucket: aws.String(c.bucket),
		Prefix: aws.String(prefix),
	})
	for paginator.HasMorePages() {
		page, err := paginator.NextPage(ctx)
		if err != nil {
			return fmt.Errorf("failed to list objects for deletion: %w", err)
		}
		for _, obj := range page.Contents {
			// An identifier without a key cannot be deleted.
			if obj.Key == nil {
				continue
			}
			objectsToDelete = append(objectsToDelete, types.ObjectIdentifier{
				Key: obj.Key,
			})
		}
	}

	const batchSize = 1000
	for start := 0; start < len(objectsToDelete); start += batchSize {
		end := start + batchSize
		if end > len(objectsToDelete) {
			end = len(objectsToDelete)
		}
		out, err := c.client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
			Bucket: aws.String(c.bucket),
			Delete: &types.Delete{
				Objects: objectsToDelete[start:end],
			},
		})
		if err != nil {
			return fmt.Errorf("failed to delete objects: %w", err)
		}
		// The request itself succeeded, but individual keys may still have
		// failed; surface the first one along with the failure count.
		if out != nil && len(out.Errors) > 0 {
			first := out.Errors[0]
			return fmt.Errorf("failed to delete objects: %d key(s) not deleted, first: %s: %s (%s)",
				len(out.Errors), aws.ToString(first.Key), aws.ToString(first.Message), aws.ToString(first.Code))
		}
	}
	return nil
}
// ObjectExists reports whether an object exists at path, based on HeadObject.
//
// A "not found" answer is recognized structurally: either an error in the
// chain exposes the API error code "NotFound" or "NoSuchKey", or it exposes an
// HTTP status code of 404. Matching on the error text is avoided because
// unrelated errors can contain those substrings (for example inside a request
// ID). Any other error is returned to the caller together with false.
func (c *S3Client) ObjectExists(ctx context.Context, path string) (bool, error) {
	_, err := c.HeadObject(ctx, path)
	if err == nil {
		return true, nil
	}

	// Modeled and generic API errors expose their code via ErrorCode.
	var coded interface{ ErrorCode() string }
	if errors.As(err, &coded) {
		switch coded.ErrorCode() {
		case "NotFound", "NoSuchKey":
			return false, nil
		}
	}

	// HEAD responses carry no body, so some services only signal a missing
	// object through the HTTP status.
	var statused interface{ HTTPStatusCode() int }
	if errors.As(err, &statused) && statused.HTTPStatusCode() == 404 {
		return false, nil
	}

	return false, err
}
// DirectoryExists reports whether path refers to a directory. It first looks
// for a marker object (the key with a trailing "/"); if that HeadObject call
// fails for any reason, it falls back to listing a single entry under the
// directory prefix and treats any object or common prefix as evidence that
// the directory exists. Only an error from the listing call is returned.
func (c *S3Client) DirectoryExists(ctx context.Context, path string) (bool, error) {
	markerKey := c.buildKey(path)
	if !strings.HasSuffix(markerKey, "/") {
		markerKey += "/"
	}

	headInput := &s3.HeadObjectInput{
		Bucket: aws.String(c.bucket),
		Key:    aws.String(markerKey),
	}
	if _, headErr := c.client.HeadObject(ctx, headInput); headErr == nil {
		return true, nil
	}

	// No marker object was confirmed: infer the directory from its contents.
	listing, err := c.client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
		Bucket:    aws.String(c.bucket),
		Prefix:    aws.String(markerKey),
		MaxKeys:   aws.Int32(1),
		Delimiter: aws.String("/"),
	})
	if err != nil {
		return false, err
	}
	if len(listing.Contents) > 0 {
		return true, nil
	}
	return len(listing.CommonPrefixes) > 0, nil
}
// CopyObject copies the object at srcPath to dstPath within the client's
// bucket using a server-side copy.
//
// The copy source ("bucket/key") is percent-encoded before it is sent, since
// the service expects that value to be URL-encoded; without encoding, keys
// containing spaces, "+", "%" or non-ASCII bytes fail or resolve to the wrong
// object. Errors from the service call are wrapped.
func (c *S3Client) CopyObject(ctx context.Context, srcPath, dstPath string) error {
	srcKey := c.buildKey(srcPath)
	dstKey := c.buildKey(dstPath)
	_, err := c.client.CopyObject(ctx, &s3.CopyObjectInput{
		Bucket:     aws.String(c.bucket),
		CopySource: aws.String(escapeCopySource(c.bucket + "/" + srcKey)),
		Key:        aws.String(dstKey),
	})
	if err != nil {
		return fmt.Errorf("failed to copy object %s -> %s: %w", srcKey, dstKey, err)
	}
	return nil
}

// escapeCopySource percent-encodes every byte of source except RFC 3986
// unreserved characters (letters, digits, "-", "_", ".", "~") and the "/"
// separator, so a source made only of those characters is returned unchanged.
func escapeCopySource(source string) string {
	const upperHex = "0123456789ABCDEF"
	var b strings.Builder
	b.Grow(len(source))
	for i := 0; i < len(source); i++ {
		ch := source[i]
		switch {
		case 'a' <= ch && ch <= 'z', 'A' <= ch && ch <= 'Z', '0' <= ch && ch <= '9',
			ch == '-', ch == '_', ch == '.', ch == '~', ch == '/':
			b.WriteByte(ch)
		default:
			b.WriteByte('%')
			b.WriteByte(upperHex[ch>>4])
			b.WriteByte(upperHex[ch&0x0f])
		}
	}
	return b.String()
}
// ListAllObjects returns every object stored under the directory at path,
// at any depth (no delimiter is used). Keys are reported relative to the
// listing prefix, and an entry is flagged as a directory when its relative
// key ends in "/".
func (c *S3Client) ListAllObjects(ctx context.Context, path string) ([]S3Object, error) {
	listPrefix := c.buildKey(path)
	if listPrefix != "" && !strings.HasSuffix(listPrefix, "/") {
		listPrefix += "/"
	}

	input := &s3.ListObjectsV2Input{
		Bucket: aws.String(c.bucket),
		Prefix: aws.String(listPrefix),
	}
	pages := s3.NewListObjectsV2Paginator(c.client, input)

	var entries []S3Object
	for pages.HasMorePages() {
		page, err := pages.NextPage(ctx)
		if err != nil {
			return nil, fmt.Errorf("failed to list all objects: %w", err)
		}
		for _, item := range page.Contents {
			if item.Key != nil {
				name := strings.TrimPrefix(*item.Key, listPrefix)
				entries = append(entries, S3Object{
					Key:          name,
					Size:         aws.ToInt64(item.Size),
					LastModified: aws.ToTime(item.LastModified),
					IsDir:        strings.HasSuffix(name, "/"),
				})
			}
		}
	}
	return entries, nil
}
// getParentPath returns the parent directory of a slash-separated path.
//
// The empty path, "/", and any path without a directory component (such as
// "file.txt") yield "". The result always uses "/" as the separator, whatever
// the host operating system, because these paths name S3 keys rather than
// local files.
func getParentPath(path string) string {
	if path == "" || path == "/" {
		return ""
	}
	// filepath.Dir emits OS-specific separators; convert back to slashes so
	// the result is still usable as a key path on Windows.
	parent := filepath.ToSlash(filepath.Dir(path))
	if parent == "." {
		return ""
	}
	return parent
}