package storage
import (
"bytes"
"fmt"
"io"
"mime/multipart"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/goodrain/rainbond/event"
"github.com/goodrain/rainbond/util/zip"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
)
type S3Storage struct {
s3Client *s3.S3
bucket string
}
func (s3s *S3Storage) Test() {
}
func (s3s *S3Storage) ReadDir(dirName string) ([]string, error) {
bucketName, prefix, err := s3s.ParseDirPath(dirName, false)
if err != nil {
return nil, err
}
var matchedKeys []string
result, err := s3s.s3Client.ListObjectsV2(&s3.ListObjectsV2Input{
Bucket: aws.String(bucketName),
Prefix: aws.String(prefix),
})
for _, item := range result.Contents {
if *item.Key == prefix {
continue
}
syPath := strings.Split(*item.Key, prefix)[1]
sPath := strings.Split(syPath, "/")
nextPath := sPath[0]
matchedKeys = append(matchedKeys, nextPath)
}
if err != nil {
return nil, fmt.Errorf("failed to list objects: %w", err)
}
return matchedKeys, nil
}
func (s3s *S3Storage) MkdirAll(dirPath string) error {
bucketName, parsePath, err := s3s.ParseDirPath(dirPath, false)
if err != nil {
return fmt.Errorf("failed to parse directory path: %w", err)
}
exists, err := s3s.CheckDirExists(bucketName, parsePath)
if err != nil {
return fmt.Errorf("failed to check directory existence: %w", err)
}
if !exists {
_, err = s3s.s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(parsePath),
Body: nil,
})
if err != nil {
return fmt.Errorf("failed to create directory: %w", err)
}
}
return nil
}
func (s3s *S3Storage) CheckDirExists(bucketName, dirPath string) (bool, error) {
resp, err := s3s.s3Client.ListObjectsV2(&s3.ListObjectsV2Input{
Bucket: aws.String(bucketName),
Prefix: aws.String(dirPath),
MaxKeys: aws.Int64(1),
})
if err != nil {
return false, err
}
return len(resp.Contents) > 0, nil
}
func (s3s *S3Storage) ClearDirectory(bucketName, dirPath string) error {
objectsToDelete := &s3.Delete{
Objects: []*s3.ObjectIdentifier{},
Quiet: aws.Bool(true),
}
err := s3s.s3Client.ListObjectsV2Pages(&s3.ListObjectsV2Input{
Bucket: aws.String(bucketName),
Prefix: aws.String(dirPath),
}, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
for _, obj := range page.Contents {
objectsToDelete.Objects = append(objectsToDelete.Objects, &s3.ObjectIdentifier{
Key: aws.String(*obj.Key),
})
}
return !lastPage
})
if err != nil {
return fmt.Errorf("failed to list objects for deletion: %w", err)
}
if len(objectsToDelete.Objects) == 0 {
return nil
}
_, err = s3s.s3Client.DeleteObjects(&s3.DeleteObjectsInput{
Bucket: aws.String(bucketName),
Delete: objectsToDelete,
})
if err != nil {
return fmt.Errorf("failed to delete objects: %w", err)
}
return nil
}
func (s3s *S3Storage) ServeFile(w http.ResponseWriter, r *http.Request, filePath string) {
bucketName, key, err := s3s.ParseDirPath(filePath, true)
output, err := s3s.s3Client.GetObject(&s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
})
if err != nil {
http.Error(w, fmt.Sprintf("failed to get object from S3: %v", err), http.StatusInternalServerError)
return
}
defer output.Body.Close()
w.Header().Set("Content-Type", *output.ContentType)
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", key))
if _, err := io.Copy(w, output.Body); err != nil {
http.Error(w, fmt.Sprintf("failed to write response: %v", err), http.StatusInternalServerError)
return
}
}
func (s3s *S3Storage) ensureBucketExists(bucketName string) error {
_, err := s3s.s3Client.HeadBucket(&s3.HeadBucketInput{
Bucket: aws.String(bucketName),
})
bucketExists := true
if err != nil {
_, err = s3s.s3Client.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String(bucketName),
})
if err != nil {
return fmt.Errorf("failed to create bucket: %w", err)
}
bucketExists = false
logrus.Infof("Created new bucket: %s", bucketName)
}
if err := s3s.ensureBucketLifecycle(bucketName, bucketExists); err != nil {
logrus.Warnf("Failed to configure bucket lifecycle for %s: %v", bucketName, err)
}
return nil
}
func (s3s *S3Storage) ensureBucketLifecycle(bucketName string, bucketExists bool) error {
logrus.Infof("[Lifecycle] ensureBucketLifecycle called for bucket: %s, bucketExists: %v", bucketName, bucketExists)
if bucketExists {
logrus.Infof("[Lifecycle] Bucket exists, checking existing lifecycle configuration for: %s", bucketName)
existingConfig, err := s3s.s3Client.GetBucketLifecycleConfiguration(&s3.GetBucketLifecycleConfigurationInput{
Bucket: aws.String(bucketName),
})
if err != nil {
logrus.Warnf("[Lifecycle] Failed to get existing lifecycle config for %s: %v (will attempt to create new one)", bucketName, err)
} else if existingConfig != nil && len(existingConfig.Rules) > 0 {
logrus.Infof("[Lifecycle] Found existing lifecycle rules for %s: %d rules", bucketName, len(existingConfig.Rules))
hasOurRules := false
for _, rule := range existingConfig.Rules {
if rule.ID != nil {
logrus.Debugf("[Lifecycle] Checking existing rule: %s", *rule.ID)
if *rule.ID == "delete-chunks-1d" ||
*rule.ID == "delete-restore-1d" ||
*rule.ID == "delete-temp-events-1d" ||
*rule.ID == "delete-app-import-1d" ||
*rule.ID == "delete-app-export-7d" ||
*rule.ID == "delete-build-tenant-7d" ||
*rule.ID == "abort-incomplete-multipart-1d" {
hasOurRules = true
logrus.Infof("[Lifecycle] Found our rule: %s, skipping lifecycle configuration", *rule.ID)
break
}
}
}
if hasOurRules {
logrus.Infof("[Lifecycle] Bucket %s already has our lifecycle policy configured, skipping", bucketName)
return nil
}
logrus.Infof("[Lifecycle] No matching rules found, will create new lifecycle policy")
} else {
logrus.Infof("[Lifecycle] No existing lifecycle rules found for %s", bucketName)
}
} else {
logrus.Infof("[Lifecycle] Bucket is new, will create lifecycle policy")
}
logrus.Infof("[Lifecycle] Creating lifecycle configuration input for bucket: %s", bucketName)
testSimpleRule := false
if testSimpleRule {
logrus.Info("[Lifecycle] Testing with single simple rule first...")
input := &s3.PutBucketLifecycleConfigurationInput{
Bucket: aws.String(bucketName),
LifecycleConfiguration: &s3.BucketLifecycleConfiguration{
Rules: []*s3.LifecycleRule{
{
ID: aws.String("test-rule"),
Status: aws.String("Enabled"),
Filter: &s3.LifecycleRuleFilter{
Prefix: aws.String("test/"),
},
Expiration: &s3.LifecycleExpiration{
Days: aws.Int64(1),
},
},
},
},
}
resp, err := s3s.s3Client.PutBucketLifecycleConfiguration(input)
if err != nil {
logrus.Errorf("[Lifecycle] Simple rule test failed: %v", err)
return fmt.Errorf("simple rule test failed: %w", err)
}
logrus.Infof("[Lifecycle] Simple rule test succeeded: %+v", resp)
return nil
}
input := &s3.PutBucketLifecycleConfigurationInput{
Bucket: aws.String(bucketName),
LifecycleConfiguration: &s3.BucketLifecycleConfiguration{
Rules: []*s3.LifecycleRule{
{
ID: aws.String("delete-chunks-1d"),
Status: aws.String("Enabled"),
Filter: &s3.LifecycleRuleFilter{
Prefix: aws.String("package_build/temp/chunks/"),
},
Expiration: &s3.LifecycleExpiration{
Days: aws.Int64(1),
},
},
{
ID: aws.String("delete-restore-1d"),
Status: aws.String("Enabled"),
Filter: &s3.LifecycleRuleFilter{
Prefix: aws.String("restore/"),
},
Expiration: &s3.LifecycleExpiration{
Days: aws.Int64(1),
},
},
{
ID: aws.String("delete-temp-events-1d"),
Status: aws.String("Enabled"),
Filter: &s3.LifecycleRuleFilter{
Prefix: aws.String("package_build/temp/events/"),
},
Expiration: &s3.LifecycleExpiration{
Days: aws.Int64(1),
},
},
{
ID: aws.String("delete-app-import-1d"),
Status: aws.String("Enabled"),
Filter: &s3.LifecycleRuleFilter{
Prefix: aws.String("app/import/"),
},
Expiration: &s3.LifecycleExpiration{
Days: aws.Int64(1),
},
},
{
ID: aws.String("delete-app-export-7d"),
Status: aws.String("Enabled"),
Filter: &s3.LifecycleRuleFilter{
Prefix: aws.String("app/"),
},
Expiration: &s3.LifecycleExpiration{
Days: aws.Int64(7),
},
},
{
ID: aws.String("delete-build-tenant-7d"),
Status: aws.String("Enabled"),
Filter: &s3.LifecycleRuleFilter{
Prefix: aws.String("build/tenant/"),
},
Expiration: &s3.LifecycleExpiration{
Days: aws.Int64(7),
},
},
{
ID: aws.String("abort-incomplete-multipart-1d"),
Status: aws.String("Enabled"),
Filter: &s3.LifecycleRuleFilter{
},
AbortIncompleteMultipartUpload: &s3.AbortIncompleteMultipartUpload{
DaysAfterInitiation: aws.Int64(1),
},
Expiration: &s3.LifecycleExpiration{
Days: aws.Int64(365),
},
},
},
},
}
logrus.Infof("[Lifecycle] Prepared %d lifecycle rules, calling PutBucketLifecycleConfiguration for bucket: %s",
len(input.LifecycleConfiguration.Rules), bucketName)
for i, rule := range input.LifecycleConfiguration.Rules {
prefix := ""
if rule.Filter != nil && rule.Filter.Prefix != nil {
prefix = *rule.Filter.Prefix
}
logrus.Debugf("[Lifecycle] Rule %d: ID=%s, Status=%s, Prefix=%s",
i+1, *rule.ID, *rule.Status, prefix)
}
resp, err := s3s.s3Client.PutBucketLifecycleConfiguration(input)
if err != nil {
logrus.Errorf("[Lifecycle] PutBucketLifecycleConfiguration failed for bucket %s: %v", bucketName, err)
return fmt.Errorf("failed to configure bucket lifecycle: %w", err)
}
logrus.Infof("[Lifecycle] PutBucketLifecycleConfiguration response: %+v", resp)
logrus.Infof("[Lifecycle] Successfully configured lifecycle policy for bucket: %s", bucketName)
return nil
}
func (s3s *S3Storage) ParseDirPath(dirPath string, isFile bool) (string, string, error) {
parts := strings.Split(dirPath, "/")
if len(parts) < 2 {
return "", "", fmt.Errorf("dirPath is invalid, must include bucket name and path")
}
var bucketName string
var key string
keyIndex := 1
for i, p := range parts {
if p != "" {
bucketName = p
keyIndex = i
break
}
}
if len(bucketName) == 2 {
bucketName = "gr" + bucketName
}
if err := s3s.ensureBucketExists(bucketName); err != nil {
return "", "", err
}
key = strings.Join(parts[keyIndex+1:], "/")
if !isFile {
key += "/"
}
return bucketName, key, nil
}
func (s3s *S3Storage) Unzip(archive, target string, currentDirectory bool) error {
bucketName, key, err := s3s.ParseDirPath(archive, true)
zipFile, err := os.CreateTemp("", "archive-*.zip")
if err != nil {
return fmt.Errorf("error creating temp file: %v", err)
}
defer os.Remove(zipFile.Name())
defer zipFile.Close()
obj, err := s3s.s3Client.GetObject(&s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
})
if err != nil {
return fmt.Errorf("error downloading file from S3: %v", err)
}
defer obj.Body.Close()
if _, err := io.Copy(zipFile, obj.Body); err != nil {
return fmt.Errorf("error writing to temp file: %v", err)
}
reader, err := zip.OpenReader(zipFile.Name())
if err != nil {
return fmt.Errorf("error opening archive: %v", err)
}
defer reader.Close()
if err := os.MkdirAll(target, 0755); err != nil {
return err
}
for _, file := range reader.File {
if err := extractFile(file, target, currentDirectory); err != nil {
return err
}
}
return nil
}
func (s3s *S3Storage) SaveFile(fileName string, reader multipart.File) error {
bucketName, key, err := s3s.ParseDirPath(fileName, true)
if err != nil {
logrus.Errorf("Failed to parse file path: %s", err.Error())
return err
}
_, err = s3s.s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
Body: reader,
})
if err != nil {
logrus.Errorf("Failed to upload file: %s", err.Error())
return err
}
return nil
}
func (s3s *S3Storage) UploadFileToFile(src, dst string, logger event.Logger) error {
srcFile, err := os.OpenFile(src, os.O_RDONLY, 0644)
if err != nil {
if logger != nil {
logger.Error("打开源文件失败", map[string]string{"step": "share"})
}
logrus.Errorf("open file %s error: %v", src, err)
return err
}
defer srcFile.Close()
srcStat, err := srcFile.Stat()
if err != nil {
if logger != nil {
logger.Error("获取源文件信息失败", map[string]string{"step": "share"})
}
return err
}
bucket, key, err := s3s.ParseDirPath(dst, true)
if err != nil {
if logger != nil {
logger.Error("解析目标路径失败", map[string]string{"step": "share"})
}
return err
}
return s3s.S3CopyWithProgress(srcFile, bucket, key, srcStat.Size(), logger)
}
func (s3s *S3Storage) S3CopyWithProgress(srcFile io.Reader, bucket, key string, allSize int64, logger event.Logger) error {
progressID := uuid.New().String()[0:7]
var written int64
resp, err := s3s.s3Client.CreateMultipartUpload(&s3.CreateMultipartUploadInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
if err != nil {
if logger != nil {
logger.Error("初始化分块上传失败", map[string]string{"step": "share"})
}
logrus.Errorf("初始化分块上传失败: %v", err)
return err
}
chunkSize := int64(5 * 1024 * 1024)
buffer := make([]byte, chunkSize)
var partNum int64 = 1
var completedParts []*s3.CompletedPart
for {
n, readErr := io.ReadFull(srcFile, buffer)
if n <= 0 {
break
}
partInput := &s3.UploadPartInput{
Body: bytes.NewReader(buffer[:n]),
Bucket: aws.String(bucket),
Key: aws.String(key),
PartNumber: aws.Int64(partNum),
UploadId: resp.UploadId,
ContentLength: aws.Int64(int64(n)),
}
partResp, uploadErr := s3s.s3Client.UploadPart(partInput)
if uploadErr != nil {
abortInput := &s3.AbortMultipartUploadInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
UploadId: resp.UploadId,
}
_, _ = s3s.s3Client.AbortMultipartUpload(abortInput)
if logger != nil {
logger.Error("上传分块失败", map[string]string{"step": "share"})
}
logrus.Errorf("上传分块失败: %v", uploadErr)
return uploadErr
}
completedPart := &s3.CompletedPart{
ETag: partResp.ETag,
PartNumber: aws.Int64(partNum),
}
completedParts = append(completedParts, completedPart)
written += int64(n)
partNum++
if logger != nil {
progress := "["
i := int((float64(written) / float64(allSize)) * 50)
if i == 0 {
i = 1
}
for j := 0; j < i; j++ {
progress += "="
}
progress += ">"
for len(progress) < 50 {
progress += " "
}
progress += fmt.Sprintf("] %d MB/%d MB", int(written/1024/1024), int(allSize/1024/1024))
message := fmt.Sprintf(`{"progress":"%s","progressDetail":{"current":%d,"total":%d},"id":"%s"}`, progress, written, allSize, progressID)
logger.Debug(message, map[string]string{"step": "progress"})
}
if readErr == io.EOF || readErr == io.ErrUnexpectedEOF {
break
}
if readErr != nil {
abortInput := &s3.AbortMultipartUploadInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
UploadId: resp.UploadId,
}
_, _ = s3s.s3Client.AbortMultipartUpload(abortInput)
logrus.Errorf("读取源文件失败: %v", readErr)
return readErr
}
}
completeInput := &s3.CompleteMultipartUploadInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
UploadId: resp.UploadId,
MultipartUpload: &s3.CompletedMultipartUpload{
Parts: completedParts,
},
}
_, err = s3s.s3Client.CompleteMultipartUpload(completeInput)
if err != nil {
if logger != nil {
logger.Error("完成分块上传失败", map[string]string{"step": "share"})
}
logrus.Errorf("完成分块上传失败: %v", err)
return err
}
if written < allSize {
logrus.Warnf("文件上传不完整: 已上传 %d 字节,预期 %d 字节", written, allSize)
return io.ErrShortWrite
}
return nil
}
func extractFile(zipFile *zip.File, target string, currentDirectory bool) error {
run := func() error {
path := filepath.Join(target, zipFile.Name)
if currentDirectory {
p := strings.Split(zipFile.Name, "/")[1:]
path = filepath.Join(target, strings.Join(p, "/"))
}
if zipFile.FileInfo().IsDir() {
return os.MkdirAll(path, zipFile.Mode())
}
fileReader, err := zipFile.Open()
if err != nil {
return fmt.Errorf("error opening file in zip: %v", err)
}
defer fileReader.Close()
targetFile, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, zipFile.Mode())
if err != nil {
return fmt.Errorf("error opening target file: %v", err)
}
defer targetFile.Close()
if _, err := io.Copy(targetFile, fileReader); err != nil {
return fmt.Errorf("error copying file: %v", err)
}
if zipFile.Comment != "" && strings.Contains(zipFile.Comment, "/") {
guid := strings.Split(zipFile.Comment, "/")
if len(guid) == 2 {
uid, _ := strconv.Atoi(guid[0])
gid, _ := strconv.Atoi(guid[1])
if err := os.Chown(path, uid, gid); err != nil {
return fmt.Errorf("error changing owner: %v", err)
}
}
}
return nil
}
return run()
}
func (s3s *S3Storage) DownloadDirToDir(srcDir, dstDir string) error {
bucketName, prefix, err := s3s.ParseDirPath(srcDir, false)
if err != nil {
return fmt.Errorf("解析源路径失败: %v", err)
}
logrus.Infof("[S3下载] 开始下载, srcDir: %s, dstDir: %s, bucket: %s, prefix: %s", srcDir, dstDir, bucketName, prefix)
if err := os.MkdirAll(dstDir, 0755); err != nil {
return fmt.Errorf("无法创建目录 %s: %v", dstDir, err)
}
listInput := &s3.ListObjectsV2Input{
Bucket: aws.String(bucketName),
Prefix: aws.String(prefix),
}
result, err := s3s.s3Client.ListObjectsV2(listInput)
if err != nil {
logrus.Errorf("[S3下载] 列出S3对象失败: %v", err)
return fmt.Errorf("无法列出 S3 目录 %s: %v", srcDir, err)
}
logrus.Infof("[S3下载] S3返回对象数: %d", len(result.Contents))
for i, item := range result.Contents {
logrus.Infof("[S3下载] 对象[%d]: Key=%s, Size=%d", i, *item.Key, *item.Size)
}
downloadCount := 0
for _, item := range result.Contents {
if *item.Key == prefix {
logrus.Infof("[S3下载] 跳过目录自身: %s", prefix)
continue
}
key := *item.Key
dstFilePath := fmt.Sprintf("%s/%s", dstDir, filepath.Base(key))
logrus.Infof("[S3下载] 正在下载文件: %s -> %s", key, dstFilePath)
input := &s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
}
result, err := s3s.s3Client.GetObject(input)
if err != nil {
return fmt.Errorf("无法从 S3 下载文件 %s: %v", key, err)
}
dstFile, err := os.OpenFile(dstFilePath, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
result.Body.Close()
return fmt.Errorf("无法打开文件 %s: %v", dstFilePath, err)
}
written, err := io.Copy(dstFile, result.Body)
if err != nil {
dstFile.Close()
result.Body.Close()
logrus.Errorf("[S3下载] 写入文件失败: %s, error: %v", dstFilePath, err)
return fmt.Errorf("无法写入文件 %s: %v", dstFilePath, err)
}
dstFile.Close()
result.Body.Close()
downloadCount++
logrus.Infof("[S3下载] 文件下载成功: %s, 大小: %d bytes", dstFilePath, written)
}
logrus.Infof("[S3下载] 下载完成, 总共下载了 %d 个文件到 %s", downloadCount, dstDir)
return nil
}
func (s3s *S3Storage) DownloadFileToDir(srcFile, dstDir string) error {
bucketName, key, err := s3s.ParseDirPath(srcFile, true)
if err != nil {
return fmt.Errorf("解析源路径失败: %v", err)
}
if err := os.MkdirAll(dstDir, 0755); err != nil {
return fmt.Errorf("无法创建目录 %s: %v", dstDir, err)
}
dstFilePath := fmt.Sprintf("%s/%s", dstDir, filepath.Base(key))
input := &s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
}
result, err := s3s.s3Client.GetObject(input)
if err != nil {
return fmt.Errorf("无法从 S3 下载文件 %s: %v", key, err)
}
defer result.Body.Close()
dstFile, err := os.OpenFile(dstFilePath, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("无法打开文件 %s: %v", dstFilePath, err)
}
defer dstFile.Close()
if _, err := io.Copy(dstFile, result.Body); err != nil {
return fmt.Errorf("无法写入文件 %s: %v", dstFilePath, err)
}
return nil
}
func (s3s *S3Storage) GetChunkDir(sessionID string) string {
return fmt.Sprintf("package_build/temp/chunks/%s", sessionID)
}
func (s3s *S3Storage) SaveChunk(sessionID string, chunkIndex int, reader multipart.File) (string, error) {
bucketName := "grdata"
if err := s3s.ensureBucketExists(bucketName); err != nil {
return "", err
}
key := fmt.Sprintf("%s/chunk_%d", s3s.GetChunkDir(sessionID), chunkIndex)
content, err := io.ReadAll(reader)
if err != nil {
logrus.Errorf("Failed to read chunk data: %v", err)
return "", err
}
_, err = s3s.s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
Body: bytes.NewReader(content),
})
if err != nil {
logrus.Errorf("Failed to upload chunk to S3: %v", err)
return "", err
}
logrus.Debugf("Saved chunk %d to S3, size: %d bytes, key: %s", chunkIndex, len(content), key)
return key, nil
}
func (s3s *S3Storage) ChunkExists(sessionID string, chunkIndex int) bool {
bucketName := "grdata"
key := fmt.Sprintf("%s/chunk_%d", s3s.GetChunkDir(sessionID), chunkIndex)
_, err := s3s.s3Client.HeadObject(&s3.HeadObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
})
return err == nil
}
func (s3s *S3Storage) MergeChunks(sessionID string, outputPath string, totalChunks int) error {
bucketName := "grdata"
chunkKeyPrefix := s3s.GetChunkDir(sessionID)
if strings.HasPrefix(outputPath, "/grdata/") || strings.HasPrefix(outputPath, "grdata/") {
return s3s.mergeChunksToS3(bucketName, chunkKeyPrefix, outputPath, totalChunks)
}
return s3s.mergeChunksToLocal(bucketName, chunkKeyPrefix, outputPath, totalChunks)
}
func (s3s *S3Storage) mergeChunksToLocal(bucketName, chunkKeyPrefix, outputPath string, totalChunks int) error {
outputDir := filepath.Dir(outputPath)
if err := os.MkdirAll(outputDir, 0755); err != nil {
return fmt.Errorf("failed to create output directory: %v", err)
}
outputFile, err := os.OpenFile(outputPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return fmt.Errorf("failed to create output file: %v", err)
}
defer outputFile.Close()
var totalWritten int64
for i := 0; i < totalChunks; i++ {
key := fmt.Sprintf("%s/chunk_%d", chunkKeyPrefix, i)
result, err := s3s.s3Client.GetObject(&s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
})
if err != nil {
return fmt.Errorf("failed to download chunk %d from S3: %v", i, err)
}
written, err := io.Copy(outputFile, result.Body)
result.Body.Close()
if err != nil {
return fmt.Errorf("failed to merge chunk %d: %v", i, err)
}
totalWritten += written
logrus.Debugf("Merged chunk %d from S3, size: %d bytes", i, written)
}
logrus.Infof("Successfully merged %d chunks from S3 to %s, total size: %d bytes", totalChunks, outputPath, totalWritten)
return nil
}
func (s3s *S3Storage) mergeChunksToS3(bucketName, chunkKeyPrefix, outputPath string, totalChunks int) error {
tempFile, err := os.CreateTemp("", "s3-merge-*")
if err != nil {
return fmt.Errorf("failed to create temp file: %v", err)
}
defer os.Remove(tempFile.Name())
defer tempFile.Close()
for i := 0; i < totalChunks; i++ {
key := fmt.Sprintf("%s/chunk_%d", chunkKeyPrefix, i)
result, err := s3s.s3Client.GetObject(&s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
})
if err != nil {
return fmt.Errorf("failed to download chunk %d: %v", i, err)
}
_, err = io.Copy(tempFile, result.Body)
result.Body.Close()
if err != nil {
return fmt.Errorf("failed to write chunk %d to temp file: %v", i, err)
}
}
tempFile.Seek(0, 0)
_, outputKey, err := s3s.ParseDirPath(outputPath, true)
if err != nil {
return err
}
_, err = s3s.s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(outputKey),
Body: tempFile,
})
if err != nil {
return fmt.Errorf("failed to upload merged file to S3: %v", err)
}
logrus.Infof("Successfully merged %d chunks to S3: %s", totalChunks, outputKey)
return nil
}
func (s3s *S3Storage) CleanupChunks(sessionID string) error {
bucketName := "grdata"
prefix := s3s.GetChunkDir(sessionID)
result, err := s3s.s3Client.ListObjectsV2(&s3.ListObjectsV2Input{
Bucket: aws.String(bucketName),
Prefix: aws.String(prefix),
})
if err != nil {
logrus.Errorf("Failed to list chunks in S3: %v", err)
return err
}
if len(result.Contents) == 0 {
return nil
}
objects := make([]*s3.ObjectIdentifier, 0, len(result.Contents))
for _, obj := range result.Contents {
objects = append(objects, &s3.ObjectIdentifier{Key: obj.Key})
}
_, err = s3s.s3Client.DeleteObjects(&s3.DeleteObjectsInput{
Bucket: aws.String(bucketName),
Delete: &s3.Delete{
Objects: objects,
Quiet: aws.Bool(true),
},
})
if err != nil {
logrus.Errorf("Failed to delete chunks from S3: %v", err)
return err
}
logrus.Debugf("Cleaned up chunks for session: %s from S3", sessionID)
return nil
}
func (s3s *S3Storage) ReadFile(filePath string) (ReadCloser, error) {
bucketName, key, err := s3s.ParseDirPath(filePath, true)
if err != nil {
return nil, fmt.Errorf("failed to parse file path: %w", err)
}
result, err := s3s.s3Client.GetObject(&s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
})
if err != nil {
return nil, fmt.Errorf("failed to get object from S3: %w", err)
}
return result.Body, nil
}
func (s3s *S3Storage) InitBucketLifecycle() error {
bucketName := "grdata"
logrus.Infof("[Lifecycle Init] Starting to initialize bucket lifecycle policy for: %s", bucketName)
logrus.Infof("[Lifecycle Init] Checking if bucket exists: %s", bucketName)
headResp, err := s3s.s3Client.HeadBucket(&s3.HeadBucketInput{
Bucket: aws.String(bucketName),
})
bucketExists := true
if err != nil {
logrus.Warnf("[Lifecycle Init] Bucket %s does not exist or error checking: %v", bucketName, err)
logrus.Infof("[Lifecycle Init] Attempting to create bucket: %s", bucketName)
createResp, err := s3s.s3Client.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String(bucketName),
})
if err != nil {
logrus.Errorf("[Lifecycle Init] Failed to create bucket %s: %v", bucketName, err)
return fmt.Errorf("failed to create bucket %s: %w", bucketName, err)
}
bucketExists = false
logrus.Infof("[Lifecycle Init] Successfully created new bucket: %s, response: %+v", bucketName, createResp)
} else {
logrus.Infof("[Lifecycle Init] Bucket %s already exists, response: %+v", bucketName, headResp)
}
logrus.Infof("[Lifecycle Init] Configuring lifecycle policy for bucket: %s (bucketExists=%v)", bucketName, bucketExists)
if err := s3s.ensureBucketLifecycle(bucketName, bucketExists); err != nil {
logrus.Errorf("[Lifecycle Init] Failed to configure lifecycle for bucket %s: %v", bucketName, err)
return fmt.Errorf("failed to configure lifecycle for bucket %s: %w", bucketName, err)
}
logrus.Infof("[Lifecycle Init] Successfully initialized bucket lifecycle policy for: %s", bucketName)
return nil
}