package op
import (
"context"
"crypto/rand"
"encoding/hex"
"sync"
"time"
"github.com/bestruirui/octopus/internal/db"
"github.com/bestruirui/octopus/internal/model"
"github.com/bestruirui/octopus/internal/utils/log"
"github.com/bestruirui/octopus/internal/utils/snowflake"
)
const (
	// relayLogMaxSize is the cache length at which RelayLogAdd triggers a
	// flush to the database when log persistence is enabled. It is also the
	// initial capacity of the cache.
	relayLogMaxSize = 20
	// relayLogMaxSizeNoDB is the cache length at which the cache is trimmed
	// to its newest half when log persistence is disabled.
	relayLogMaxSizeNoDB = 100
)

var (
	// relayLogCache buffers relay logs in memory, oldest first.
	relayLogCache = make([]model.RelayLog, 0, relayLogMaxSize)
	// relayLogCacheLock guards relayLogCache.
	relayLogCacheLock sync.Mutex
	// relayLogFlushLock serializes database flushes of the cache.
	relayLogFlushLock sync.Mutex

	// relayLogSubscribers is the set of channels receiving live relay logs.
	relayLogSubscribers = make(map[chan model.RelayLog]struct{})
	// relayLogSubscribersLock guards relayLogSubscribers.
	relayLogSubscribersLock sync.RWMutex

	// relayLogStreamTokens is the set of currently valid stream tokens.
	relayLogStreamTokens = make(map[string]struct{})
	// relayLogStreamTokensLock guards relayLogStreamTokens.
	relayLogStreamTokensLock sync.RWMutex
)
// RelayLogStreamTokenCreate generates 32 random bytes, hex-encodes them into a
// token, registers the token in the in-memory token set and returns it.
// It returns an error if the random source cannot be read.
func RelayLogStreamTokenCreate() (string, error) {
	var raw [32]byte
	if _, err := rand.Read(raw[:]); err != nil {
		return "", err
	}
	token := hex.EncodeToString(raw[:])

	relayLogStreamTokensLock.Lock()
	defer relayLogStreamTokensLock.Unlock()
	relayLogStreamTokens[token] = struct{}{}
	return token, nil
}
// RelayLogStreamTokenVerify reports whether token is currently present in the
// in-memory token set.
func RelayLogStreamTokenVerify(token string) bool {
	relayLogStreamTokensLock.RLock()
	defer relayLogStreamTokensLock.RUnlock()

	_, known := relayLogStreamTokens[token]
	return known
}
// RelayLogStreamTokenRevoke removes token from the in-memory token set.
// Revoking a token that is not registered is a no-op.
func RelayLogStreamTokenRevoke(token string) {
	relayLogStreamTokensLock.Lock()
	defer relayLogStreamTokensLock.Unlock()

	delete(relayLogStreamTokens, token)
}
// RelayLogSubscribe registers a new subscriber and returns the buffered
// channel on which it will receive relay logs.
func RelayLogSubscribe() chan model.RelayLog {
	// Per-subscriber buffer size; entries are dropped by the notifier when
	// the buffer is full.
	const subscriberBuffer = 10
	sub := make(chan model.RelayLog, subscriberBuffer)

	relayLogSubscribersLock.Lock()
	defer relayLogSubscribersLock.Unlock()
	relayLogSubscribers[sub] = struct{}{}
	return sub
}
// RelayLogUnsubscribe removes ch from the subscriber set and closes it.
//
// The channel is closed only if it was still registered. This makes the call
// idempotent: unsubscribing twice, or passing a nil or never-subscribed
// channel, is a no-op instead of a "close of closed channel" / "close of nil
// channel" panic.
func RelayLogUnsubscribe(ch chan model.RelayLog) {
	relayLogSubscribersLock.Lock()
	_, registered := relayLogSubscribers[ch]
	delete(relayLogSubscribers, ch)
	relayLogSubscribersLock.Unlock()

	if !registered {
		return
	}
	// Safe to close outside the lock: notifySubscribers only sends to
	// channels it finds in the set while holding the read lock, and ch has
	// already been removed under the write lock.
	close(ch)
}
// notifySubscribers offers relayLog to every registered subscriber channel.
// Sends are non-blocking: a subscriber whose buffer is full simply misses
// this entry.
func notifySubscribers(relayLog model.RelayLog) {
	// offer performs a single non-blocking send.
	offer := func(sub chan model.RelayLog) {
		select {
		case sub <- relayLog:
		default:
		}
	}

	relayLogSubscribersLock.RLock()
	defer relayLogSubscribersLock.RUnlock()
	for sub := range relayLogSubscribers {
		offer(sub)
	}
}
// relayLogFlushToDB writes a snapshot of the cache to the database and, on
// success, drops the written entries from the front of the cache.
//
// Flushes are serialized by relayLogFlushLock. The cache lock is released
// while the database write runs, so new entries may be appended concurrently.
// On a database error the cache is left untouched and the error is returned.
func relayLogFlushToDB(ctx context.Context) error {
	relayLogFlushLock.Lock()
	defer relayLogFlushLock.Unlock()

	// Snapshot the cache so the database write happens without the cache lock.
	relayLogCacheLock.Lock()
	pending := make([]model.RelayLog, len(relayLogCache))
	copy(pending, relayLogCache)
	relayLogCacheLock.Unlock()

	flushed := len(pending)
	if flushed == 0 {
		return nil
	}

	if err := db.GetDB().WithContext(ctx).Create(&pending).Error; err != nil {
		return err
	}

	relayLogCacheLock.Lock()
	defer relayLogCacheLock.Unlock()

	// Drop the flushed prefix. If the cache shrank in the meantime, empty it.
	rest := relayLogCache[:0]
	if len(relayLogCache) >= flushed {
		rest = relayLogCache[flushed:]
	}
	if len(rest) == 0 {
		// Start over with a fresh backing array of the default capacity.
		rest = make([]model.RelayLog, 0, relayLogMaxSize)
	}
	relayLogCache = rest
	return nil
}
// RelayLogAdd assigns relayLog a fresh snowflake ID, notifies subscribers
// asynchronously and appends the entry to the in-memory cache.
//
// When persistence is enabled and the cache reaches relayLogMaxSize entries,
// the cache is flushed to the database and any flush error is returned.
// When persistence is disabled and the cache reaches relayLogMaxSizeNoDB
// entries, only the newest half is kept.
// An error reading the persistence setting is returned before anything is
// recorded.
func RelayLogAdd(ctx context.Context, relayLog model.RelayLog) error {
	persist, err := SettingGetBool(model.SettingKeyRelayLogKeepEnabled)
	if err != nil {
		return err
	}

	limit := relayLogMaxSizeNoDB
	if persist {
		limit = relayLogMaxSize
	}

	relayLog.ID = snowflake.GenerateID()
	go notifySubscribers(relayLog)

	relayLogCacheLock.Lock()
	relayLogCache = append(relayLogCache, relayLog)
	full := len(relayLogCache) >= limit
	if full && !persist {
		// No database to flush to: keep only the newest half of the limit.
		keep := limit / 2
		if total := len(relayLogCache); total > keep {
			trimmed := make([]model.RelayLog, keep, limit)
			copy(trimmed, relayLogCache[total-keep:])
			relayLogCache = trimmed
		}
	}
	relayLogCacheLock.Unlock()

	// The flush takes the cache lock itself, so it must run after the unlock.
	if full && persist {
		return relayLogFlushToDB(ctx)
	}
	return nil
}
// RelayLogSaveDBTask performs cache maintenance and logs its duration at
// debug level.
//
// With persistence enabled it flushes the cache to the database and then runs
// the retention cleanup. With persistence disabled it trims the cache to its
// newest half once it holds more than relayLogMaxSizeNoDB entries.
func RelayLogSaveDBTask(ctx context.Context) error {
	log.Debugf("relay log save db task started")
	begin := time.Now()
	defer func() {
		log.Debugf("relay log save db task finished, save time: %s", time.Since(begin))
	}()

	persist, err := SettingGetBool(model.SettingKeyRelayLogKeepEnabled)
	if err != nil {
		return err
	}

	if !persist {
		relayLogCacheLock.Lock()
		defer relayLogCacheLock.Unlock()
		if total := len(relayLogCache); total > relayLogMaxSizeNoDB {
			keep := relayLogMaxSizeNoDB / 2
			trimmed := make([]model.RelayLog, keep, relayLogMaxSizeNoDB)
			copy(trimmed, relayLogCache[total-keep:])
			relayLogCache = trimmed
		}
		return nil
	}

	if err := relayLogFlushToDB(ctx); err != nil {
		return err
	}
	return relayLogCleanup(ctx)
}
// relayLogCleanup deletes persisted relay logs whose "time" column is older
// than the configured keep period, which is interpreted as a number of
// 24-hour days. A keep period of zero or less disables cleanup.
func relayLogCleanup(ctx context.Context) error {
	days, err := SettingGetInt(model.SettingKeyRelayLogKeepPeriod)
	if err != nil {
		return err
	}
	if days <= 0 {
		return nil
	}

	retention := time.Duration(days) * 24 * time.Hour
	cutoff := time.Now().Add(-retention).Unix()

	tx := db.GetDB().WithContext(ctx).Where("time < ?", cutoff).Delete(&model.RelayLog{})
	return tx.Error
}
// RelayLogList returns one page of relay logs, newest first.
//
// Entries still in the in-memory cache come first (newest to oldest). When
// persistence is enabled, the rest of the page is filled from the database
// ordered by id descending.
//
// The time filter is applied only when both startTime and endTime are
// non-nil; it is inclusive on both ends. A page below 1 is treated as page 1
// and a negative pageSize as 0, so invalid pagination input yields a valid
// (possibly empty) page instead of a slice-bounds panic.
//
// It returns an error if the persistence setting cannot be read or the
// database query fails.
func RelayLogList(ctx context.Context, startTime, endTime *int, page, pageSize int) ([]model.RelayLog, error) {
	enabled, err := SettingGetBool(model.SettingKeyRelayLogKeepEnabled)
	if err != nil {
		return nil, err
	}

	// Clamp pagination input: a negative offset or end index would otherwise
	// be used directly as slice bounds below.
	if page < 1 {
		page = 1
	}
	if pageSize < 0 {
		pageSize = 0
	}

	hasTimeFilter := startTime != nil && endTime != nil

	// Collect matching cache entries newest-first. The cache is stored oldest
	// first, so walk it backwards.
	relayLogCacheLock.Lock()
	cachedLogs := make([]model.RelayLog, 0, len(relayLogCache))
	for i := len(relayLogCache) - 1; i >= 0; i-- {
		entry := relayLogCache[i]
		if hasTimeFilter && (entry.Time < int64(*startTime) || entry.Time > int64(*endTime)) {
			continue
		}
		cachedLogs = append(cachedLogs, entry)
	}
	relayLogCacheLock.Unlock()

	cacheCount := len(cachedLogs)
	offset := (page - 1) * pageSize

	var result []model.RelayLog
	if offset < cacheCount {
		cacheEnd := offset + pageSize
		if cacheEnd > cacheCount {
			cacheEnd = cacheCount
		}
		result = append(result, cachedLogs[offset:cacheEnd]...)
	}

	if !enabled {
		return result, nil
	}
	remaining := pageSize - len(result)
	if remaining <= 0 {
		return result, nil
	}

	// The database continues where the cache ends: skip only the part of the
	// offset that the cache did not already cover.
	dbOffset := 0
	if offset > cacheCount {
		dbOffset = offset - cacheCount
	}

	query := db.GetDB().WithContext(ctx)
	if hasTimeFilter {
		query = query.Where("time >= ? AND time <= ?", *startTime, *endTime)
	}

	var dbLogs []model.RelayLog
	if err := query.Order("id DESC").Offset(dbOffset).Limit(remaining).Find(&dbLogs).Error; err != nil {
		return nil, err
	}
	return append(result, dbLogs...), nil
}
func RelayLogClear(ctx context.Context) error {
relayLogCacheLock.Lock()
relayLogCache = make([]model.RelayLog, 0, relayLogMaxSize)
relayLogCacheLock.Unlock()
return db.GetDB().WithContext(ctx).Where("1 = 1").Delete(&model.RelayLog{}).Error
}