package controller
import (
"fmt"
"mime/multipart"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/goodrain/rainbond/db"
"github.com/goodrain/rainbond/db/model"
"github.com/goodrain/rainbond/pkg/component/storage"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
)
const (
DefaultChunkSize = 5 * 1024 * 1024
MinChunkSize = 1 * 1024 * 1024
MaxChunkSize = 20 * 1024 * 1024
SessionExpiryHours = 24
MaxConcurrentUploads = 10
)
type ChunkUploadManager struct {
sessionCache map[string]*model.UploadSession
cacheMutex sync.RWMutex
uploadSemaphore chan struct{}
cleanupTicker *time.Ticker
}
var chunkUploadManager *ChunkUploadManager
var chunkUploadOnce sync.Once
func GetChunkUploadManager() *ChunkUploadManager {
chunkUploadOnce.Do(func() {
chunkUploadManager = &ChunkUploadManager{
sessionCache: make(map[string]*model.UploadSession),
uploadSemaphore: make(chan struct{}, MaxConcurrentUploads),
cleanupTicker: time.NewTicker(1 * time.Hour),
}
go chunkUploadManager.startCleanupWorker()
})
return chunkUploadManager
}
func (m *ChunkUploadManager) InitUploadSession(eventID, fileName string, fileSize int64, fileMD5 string, chunkSize int) (*model.UploadSession, error) {
if chunkSize == 0 {
chunkSize = DefaultChunkSize
}
if chunkSize < MinChunkSize || chunkSize > MaxChunkSize {
return nil, fmt.Errorf("chunk size must be between %d and %d", MinChunkSize, MaxChunkSize)
}
existingSession, err := db.GetManager().UploadSessionDao().GetByEventIDAndFileName(eventID, fileName)
if err != nil {
return nil, fmt.Errorf("failed to check existing session: %v", err)
}
if existingSession != nil && existingSession.Status == "uploading" {
logrus.Infof("Resume existing upload session: %s", existingSession.ID)
m.cacheSession(existingSession)
return existingSession, nil
}
sessionID := uuid.New().String()
totalChunks := int((fileSize + int64(chunkSize) - 1) / int64(chunkSize))
session := &model.UploadSession{
ID: sessionID,
EventID: eventID,
FileName: fileName,
FileSize: fileSize,
FileMD5: fileMD5,
ChunkSize: chunkSize,
TotalChunks: totalChunks,
UploadedChunks: "",
Status: "uploading",
StoragePath: fmt.Sprintf("/grdata/package_build/temp/events/%s/%s", eventID, fileName),
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
ExpiresAt: time.Now().Add(SessionExpiryHours * time.Hour),
}
if err := db.GetManager().UploadSessionDao().AddModel(session); err != nil {
return nil, fmt.Errorf("failed to create session: %v", err)
}
m.cacheSession(session)
logrus.Infof("Created new upload session: %s, total chunks: %d", sessionID, totalChunks)
return session, nil
}
func (m *ChunkUploadManager) SaveChunk(sessionID string, chunkIndex int, reader multipart.File) error {
session, err := m.getSession(sessionID)
if err != nil {
return err
}
if session.Status != "uploading" {
return fmt.Errorf("session status is %s, cannot upload", session.Status)
}
if chunkIndex < 0 || chunkIndex >= session.TotalChunks {
return fmt.Errorf("invalid chunk index: %d, total chunks: %d", chunkIndex, session.TotalChunks)
}
if storage.Default().StorageCli.ChunkExists(sessionID, chunkIndex) {
logrus.Debugf("Chunk %d already exists for session %s, skipping", chunkIndex, sessionID)
return nil
}
m.uploadSemaphore <- struct{}{}
defer func() { <-m.uploadSemaphore }()
_, err = storage.Default().StorageCli.SaveChunk(sessionID, chunkIndex, reader)
if err != nil {
return fmt.Errorf("failed to save chunk: %v", err)
}
if err := m.updateUploadedChunks(session, chunkIndex); err != nil {
return err
}
logrus.Infof("Saved chunk %d/%d for session %s", chunkIndex+1, session.TotalChunks, sessionID)
return nil
}
func (m *ChunkUploadManager) CompleteUpload(sessionID string) (string, error) {
session, err := m.getSession(sessionID)
if err != nil {
return "", err
}
uploadedChunks := m.parseUploadedChunks(session.UploadedChunks)
if len(uploadedChunks) != session.TotalChunks {
missingChunks := m.getMissingChunks(uploadedChunks, session.TotalChunks)
return "", fmt.Errorf("not all chunks uploaded, missing: %v", missingChunks)
}
logrus.Infof("Merging %d chunks for session %s", session.TotalChunks, sessionID)
err = storage.Default().StorageCli.MergeChunks(sessionID, session.StoragePath, session.TotalChunks)
if err != nil {
session.Status = "failed"
db.GetManager().UploadSessionDao().UpdateModel(session)
return "", fmt.Errorf("failed to merge chunks: %v", err)
}
if err := storage.Default().StorageCli.CleanupChunks(sessionID); err != nil {
logrus.Warnf("Failed to cleanup chunks for session %s: %v", sessionID, err)
}
session.Status = "completed"
session.UpdatedAt = time.Now()
if err := db.GetManager().UploadSessionDao().UpdateModel(session); err != nil {
logrus.Errorf("Failed to update session status: %v", err)
}
m.removeFromCache(sessionID)
logrus.Infof("Upload completed for session %s, file: %s", sessionID, session.StoragePath)
return session.StoragePath, nil
}
func (m *ChunkUploadManager) GetUploadStatus(sessionID string) (*UploadStatusResponse, error) {
session, err := m.getSession(sessionID)
if err != nil {
return nil, err
}
uploadedChunks := m.parseUploadedChunks(session.UploadedChunks)
missingChunks := m.getMissingChunks(uploadedChunks, session.TotalChunks)
progress := float64(len(uploadedChunks)) / float64(session.TotalChunks) * 100
return &UploadStatusResponse{
SessionID: session.ID,
FileName: session.FileName,
FileSize: session.FileSize,
ChunkSize: session.ChunkSize,
TotalChunks: session.TotalChunks,
UploadedChunks: uploadedChunks,
MissingChunks: missingChunks,
Progress: progress,
Status: session.Status,
CreatedAt: session.CreatedAt,
UpdatedAt: session.UpdatedAt,
}, nil
}
func (m *ChunkUploadManager) CancelUpload(sessionID string) error {
_, err := m.getSession(sessionID)
if err != nil {
return err
}
if err := storage.Default().StorageCli.CleanupChunks(sessionID); err != nil {
logrus.Warnf("Failed to cleanup chunks: %v", err)
}
if err := db.GetManager().UploadSessionDao().DeleteByID(sessionID); err != nil {
return fmt.Errorf("failed to delete session: %v", err)
}
m.removeFromCache(sessionID)
logrus.Infof("Cancelled upload session: %s", sessionID)
return nil
}
func (m *ChunkUploadManager) getSession(sessionID string) (*model.UploadSession, error) {
m.cacheMutex.RLock()
if session, ok := m.sessionCache[sessionID]; ok {
m.cacheMutex.RUnlock()
return session, nil
}
m.cacheMutex.RUnlock()
session, err := db.GetManager().UploadSessionDao().GetByID(sessionID)
if err != nil {
return nil, fmt.Errorf("session not found: %v", err)
}
m.cacheSession(session)
return session, nil
}
func (m *ChunkUploadManager) cacheSession(session *model.UploadSession) {
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()
m.sessionCache[session.ID] = session
}
func (m *ChunkUploadManager) removeFromCache(sessionID string) {
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()
delete(m.sessionCache, sessionID)
}
func (m *ChunkUploadManager) updateUploadedChunks(session *model.UploadSession, chunkIndex int) error {
uploadedChunks := m.parseUploadedChunks(session.UploadedChunks)
for _, idx := range uploadedChunks {
if idx == chunkIndex {
return nil
}
}
uploadedChunks = append(uploadedChunks, chunkIndex)
sort.Ints(uploadedChunks)
strChunks := make([]string, len(uploadedChunks))
for i, idx := range uploadedChunks {
strChunks[i] = strconv.Itoa(idx)
}
session.UploadedChunks = strings.Join(strChunks, ",")
session.UpdatedAt = time.Now()
if err := db.GetManager().UploadSessionDao().UpdateModel(session); err != nil {
return fmt.Errorf("failed to update session: %v", err)
}
return nil
}
func (m *ChunkUploadManager) parseUploadedChunks(uploadedChunksStr string) []int {
if uploadedChunksStr == "" {
return []int{}
}
parts := strings.Split(uploadedChunksStr, ",")
chunks := make([]int, 0, len(parts))
for _, part := range parts {
if idx, err := strconv.Atoi(strings.TrimSpace(part)); err == nil {
chunks = append(chunks, idx)
}
}
return chunks
}
func (m *ChunkUploadManager) getMissingChunks(uploadedChunks []int, totalChunks int) []int {
uploaded := make(map[int]bool)
for _, idx := range uploadedChunks {
uploaded[idx] = true
}
missing := make([]int, 0)
for i := 0; i < totalChunks; i++ {
if !uploaded[i] {
missing = append(missing, i)
}
}
return missing
}
func (m *ChunkUploadManager) startCleanupWorker() {
for range m.cleanupTicker.C {
if err := m.cleanExpiredSessions(); err != nil {
logrus.Errorf("Failed to clean expired sessions: %v", err)
}
}
}
func (m *ChunkUploadManager) cleanExpiredSessions() error {
if err := db.GetManager().UploadSessionDao().CleanExpiredSessions(); err != nil {
return err
}
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()
now := time.Now()
for id, session := range m.sessionCache {
if session.ExpiresAt.Before(now) {
storage.Default().StorageCli.CleanupChunks(id)
delete(m.sessionCache, id)
}
}
logrus.Debug("Cleaned up expired upload sessions")
return nil
}
type UploadStatusResponse struct {
SessionID string `json:"session_id"`
FileName string `json:"file_name"`
FileSize int64 `json:"file_size"`
ChunkSize int `json:"chunk_size"`
TotalChunks int `json:"total_chunks"`
UploadedChunks []int `json:"uploaded_chunks"`
MissingChunks []int `json:"missing_chunks"`
Progress float64 `json:"progress"`
Status string `json:"status"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}