package db
import (
"bufio"
"encoding/json"
"fmt"
eventutil "github.com/goodrain/rainbond/api/eventlog/util"
"github.com/goodrain/rainbond/pkg/component/storage"
"io"
"os"
"path"
"strconv"
"strings"
"sync"
"time"
"github.com/goodrain/rainbond/util"
"github.com/sirupsen/logrus"
)
// EventFilePlugin stores event log messages as files below HomePath, uploads
// written files through the storage client in background goroutines, and runs
// a background task that removes old local log files.
type EventFilePlugin struct {
// HomePath is the base directory below which the event log files live.
HomePath string
// uploadPool is used as a semaphore: an upload goroutine sends into it before
// uploading and receives from it when done, bounding concurrent uploads.
uploadPool chan struct{}
// uploadWg counts in-flight upload goroutines so Close can wait for them.
uploadWg sync.WaitGroup
// shutdownOnce ensures the shutdown sequence in Close runs only once.
shutdownOnce sync.Once
// cleanupStopCh is closed by Close to tell the cleanup task to stop.
cleanupStopCh chan struct{}
// cleanupDone is closed by the cleanup task when it returns.
cleanupDone chan struct{}
}
// maxConcurrentUploads is the buffer size of the uploadPool channel created in
// NewEventFilePlugin, i.e. the number of uploads that may run at the same time.
const maxConcurrentUploads = 10
// NewEventFilePlugin builds an EventFilePlugin rooted at homePath, allocates
// its upload semaphore and cleanup signalling channels, and launches the
// background cleanup task in its own goroutine before returning the plugin.
func NewEventFilePlugin(homePath string) *EventFilePlugin {
	p := new(EventFilePlugin)
	p.HomePath = homePath
	p.uploadPool = make(chan struct{}, maxConcurrentUploads)
	p.cleanupStopCh = make(chan struct{})
	p.cleanupDone = make(chan struct{})

	// The cleanup goroutine runs until Close closes cleanupStopCh.
	go p.startCleanupTask()
	return p
}
// SaveMessage appends the given events to the log file named after the first
// non-nil event's EventID and then schedules an asynchronous upload of that
// file.
//
// Every non-nil event is written as one line of the form
// "<level flag> <unix seconds> <message>\n". When an event's time cannot be
// parsed (GetTimeUnix returns 0), the timestamp of the previous line in the
// same batch is reused. Nil entries are skipped; a batch with no non-nil
// entry is a no-op.
//
// It returns an error when the log directory or file cannot be prepared or
// when writing the batch fails; in those cases no upload is scheduled.
func (m *EventFilePlugin) SaveMessage(events []*EventLogMessage) error {
	// The file name comes from the first usable event. Indexing events[0]
	// directly would panic when the batch starts with a nil entry, which the
	// write loop below explicitly tolerates.
	var first *EventLogMessage
	for _, e := range events {
		if e != nil {
			first = e
			break
		}
	}
	if first == nil {
		return nil
	}
	logrus.Debugf("init event file plugin save message")
	filePath := eventutil.EventLogFilePath(m.HomePath)
	if err := util.CheckAndCreateDir(filePath); err != nil {
		return err
	}
	filename := eventutil.EventLogFileName(filePath, first.EventID)
	writeFile, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0755)
	if err != nil {
		return err
	}
	defer writeFile.Close()

	// Buffer the batch so it reaches the file in a few large writes.
	// bufio.Writer keeps the first write error and reports it from Flush, so
	// the individual calls below do not need their own checks.
	w := bufio.NewWriter(writeFile)
	var lastTime int64
	for _, e := range events {
		if e == nil {
			continue
		}
		w.Write(GetLevelFlag(e.Level))
		if logtime := GetTimeUnix(e.Time); logtime != 0 {
			lastTime = logtime
		}
		fmt.Fprintf(w, "%d ", lastTime)
		w.WriteString(e.Message)
		w.WriteByte('\n')
	}
	if err := w.Flush(); err != nil {
		return fmt.Errorf("write event log file %s: %w", filename, err)
	}
	m.asyncUploadWithRetry(filename)
	return nil
}
// asyncUploadWithRetry uploads filename to storage in a background goroutine.
//
// The goroutine is registered in uploadWg before it starts so Close can wait
// for it, and it holds a slot of uploadPool while it runs so that at most
// cap(uploadPool) uploads are active at once. The upload is attempted up to
// three times, sleeping 1s and then 2s between attempts; a final failure is
// only logged.
func (m *EventFilePlugin) asyncUploadWithRetry(filename string) {
	m.uploadWg.Add(1)
	go func() {
		defer m.uploadWg.Done()
		m.uploadPool <- struct{}{}
		defer func() { <-m.uploadPool }()

		// Same guard as readStorageOrLocalFile: without a storage client the
		// call below would dereference nil and panic inside this goroutine.
		if storage.Default() == nil || storage.Default().StorageCli == nil {
			logrus.Warnf("Storage client is not available, skip uploading log file %s", filename)
			return
		}

		maxRetries := 3
		retryDelay := time.Second
		for attempt := 1; attempt <= maxRetries; attempt++ {
			err := storage.Default().StorageCli.UploadFileToFile(filename, filename, nil)
			if err == nil {
				logrus.Debugf("Successfully uploaded log file to storage: %s", filename)
				return
			}
			if attempt == maxRetries {
				logrus.Errorf("Failed to upload log file %s after %d attempts: %v",
					filename, maxRetries, err)
				return
			}
			logrus.Warnf("Failed to upload log file %s (attempt %d/%d): %v, retrying in %v...",
				filename, attempt, maxRetries, err, retryDelay)
			time.Sleep(retryDelay)
			// Exponential backoff between attempts.
			retryDelay *= 2
		}
	}()
}
// MessageData is a single event log entry as returned by GetMessages.
type MessageData struct {
	// Message is the log text.
	Message string `json:"message"`
	// Time is the entry time as text.
	Time string `json:"time"`
	// Unixtime is the entry time in seconds since the Unix epoch.
	Unixtime int64 `json:"utime"`
}

// MessageDataList is a list of log entries that implements sort.Interface,
// ordering entries by ascending Unixtime.
type MessageDataList []MessageData

// Len returns the number of entries.
func (a MessageDataList) Len() int { return len(a) }

// Swap exchanges the entries at positions i and j.
func (a MessageDataList) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// Less reports whether entry i is strictly older than entry j. The comparison
// must be strict: with "<=" two entries carrying the same timestamp would each
// be reported as sorting before the other, which violates the sort.Interface
// contract and defeats sort.Stable.
func (a MessageDataList) Less(i, j int) bool { return a[i].Unixtime < a[j].Unixtime }
// GetMessages returns the log entries of event eventID that pass the level
// filter, as a MessageDataList.
//
// The structured "<eventID>.jsonl" log is tried first; if it yields at least
// one entry that result is returned. Otherwise the legacy "<eventID>.log" file
// is read (storage first, then the local copy below HomePath). Each legacy
// line has the form "<flag> <unix seconds> <message>".
//
// A missing legacy file is not an error: an empty list is returned. Any other
// failure to open the file is returned wrapped. A read error in the middle of
// the file is logged and the entries collected so far are returned.
//
// length limits the result; 0 means no limit.
// NOTE(review): the limit is checked after appending with a strict ">", so up
// to length+1 entries are returned. Left unchanged because callers may rely
// on it — confirm before tightening.
func (m *EventFilePlugin) GetMessages(eventID, level string, length int) (interface{}, error) {
	if messages, err := m.getJSONLinesMessages(eventID, level, length); err != nil {
		logrus.Debugf("read jsonl event log error for %s: %v", eventID, err)
	} else if len(messages) > 0 {
		return messages, nil
	}
	var message MessageDataList
	s3Path := path.Join("grdata", "logs", "eventlog", eventID+".log")
	fileReader, err := m.readStorageOrLocalFile(s3Path, path.Join(m.HomePath, "eventlog", eventID+".log"))
	if err != nil {
		if os.IsNotExist(err) {
			logrus.Warnf("Event log file not found in storage or local: %s", eventID)
			return message, nil
		}
		return nil, fmt.Errorf("failed to read event log from both storage and local: %w", err)
	}
	defer fileReader.Close()

	reader := bufio.NewReader(fileReader)
	// ReadString returns whole lines regardless of their length. ReadLine
	// would hand back lines longer than the reader's buffer in fragments,
	// truncating the message and mis-parsing the remainder as new lines.
	for eof := false; !eof; {
		raw, err := reader.ReadString('\n')
		if err != nil {
			if err != io.EOF {
				logrus.Error("read event log file error:", err.Error())
				break
			}
			// A final line without a trailing newline still has to be parsed.
			eof = true
		}
		line := strings.TrimSuffix(strings.TrimSuffix(raw, "\n"), "\r")
		if len(line) <= 2 || !CheckLevel(line[:1], level) {
			continue
		}
		info := strings.SplitN(line, " ", 3)
		if len(info) != 3 {
			continue
		}
		// A malformed timestamp deliberately falls back to 0 so the message
		// itself is still returned.
		unix, _ := strconv.ParseInt(info[1], 10, 64)
		message = append(message, MessageData{
			Message:  info[2],
			Unixtime: unix,
			Time:     time.Unix(unix, 0).Format(time.RFC3339),
		})
		if length != 0 && len(message) > length {
			break
		}
	}
	return message, nil
}
// getJSONLinesMessages reads the structured "<eventID>.jsonl" log (storage
// first, then the local copy below HomePath) and returns the entries that
// pass the level filter.
//
// Each non-empty line is decoded as one JSON EventLogMessage; lines that are
// not valid JSON are skipped with a debug log. When the entry's time can be
// parsed by GetTimeUnix it is re-formatted as RFC 3339, otherwise the original
// time string is passed through and Unixtime is 0.
//
// It returns an error when the file cannot be opened or a read fails; on a
// read failure the entries collected so far are returned alongside the error.
//
// length limits the result; 0 means no limit.
// NOTE(review): the limit is checked after appending with a strict ">", so up
// to length+1 entries are returned. Left unchanged because callers may rely
// on it — confirm before tightening.
func (m *EventFilePlugin) getJSONLinesMessages(eventID, level string, length int) (MessageDataList, error) {
	fileReader, err := m.readStorageOrLocalFile(
		path.Join("grdata", "logs", "eventlog", eventID+".jsonl"),
		path.Join(m.HomePath, "eventlog", eventID+".jsonl"))
	if err != nil {
		return nil, err
	}
	defer fileReader.Close()

	var messages MessageDataList
	reader := bufio.NewReader(fileReader)
	// ReadString returns whole lines regardless of their length. ReadLine
	// would split lines longer than the reader's buffer into fragments, none
	// of which is valid JSON, so long entries would silently disappear.
	for eof := false; !eof; {
		raw, err := reader.ReadString('\n')
		if err != nil {
			if err != io.EOF {
				return messages, err
			}
			// A final line without a trailing newline still has to be parsed.
			eof = true
		}
		line := strings.TrimSuffix(strings.TrimSuffix(raw, "\n"), "\r")
		if len(line) == 0 {
			continue
		}
		var eventMessage EventLogMessage
		if err := json.Unmarshal([]byte(line), &eventMessage); err != nil {
			logrus.Debugf("skip invalid jsonl line for event %s: %v", eventID, err)
			continue
		}
		if !checkStructuredLevel(eventMessage.Level, level) {
			continue
		}
		unix := GetTimeUnix(eventMessage.Time)
		eventTime := eventMessage.Time
		if unix != 0 {
			eventTime = time.Unix(unix, 0).Format(time.RFC3339)
		}
		messages = append(messages, MessageData{
			Message:  eventMessage.Message,
			Time:     eventTime,
			Unixtime: unix,
		})
		if length != 0 && len(messages) > length {
			break
		}
	}
	return messages, nil
}
// readStorageOrLocalFile opens an event log for reading. When a storage
// client is available, storagePath is tried first; if that read fails (or no
// client is available) the file at localPath is opened instead. The error of
// the local open is returned when both sources fail. The caller must close
// the returned reader.
func (m *EventFilePlugin) readStorageOrLocalFile(storagePath, localPath string) (storage.ReadCloser, error) {
	storageReady := storage.Default() != nil && storage.Default().StorageCli != nil
	if storageReady {
		remote, readErr := storage.Default().StorageCli.ReadFile(storagePath)
		if readErr == nil {
			return remote, nil
		}
		logrus.Debugf("failed to read event log from storage, trying local path %s: %v", localPath, readErr)
	}

	local, openErr := os.Open(localPath)
	if openErr != nil {
		// Return an untyped nil so the interface result is really nil.
		return nil, openErr
	}
	return local, nil
}
// checkStructuredLevel reports whether an entry logged at logLevel should be
// shown for a query at queryLevel.
//
//   - "error" entries are always shown.
//   - "debug" entries are shown only when queryLevel is "debug".
//   - "info" entries and entries with any other level are shown unless
//     queryLevel is "error".
func checkStructuredLevel(logLevel, queryLevel string) bool {
	if logLevel == "error" {
		return true
	}
	if logLevel == "debug" {
		return queryLevel == "debug"
	}
	// "info" and unrecognised levels share the same rule.
	return queryLevel != "error"
}
// CheckLevel reports whether a legacy log line carrying the given level flag
// should be shown for a query at level.
//
// Flag "0" lines are always shown, flag "1" lines are shown unless level is
// "error", and flag "2" lines are shown only when level is "debug". Any other
// flag is never shown. The flags correspond to the values written by
// GetLevelFlag.
func CheckLevel(flag, level string) bool {
	switch flag {
	case "0":
		return true
	case "1":
		return level != "error"
	case "2":
		return level == "debug"
	default:
		return false
	}
}
func GetTimeUnix(timeStr string) int64 {
var timeLayout string
if strings.Contains(timeStr, ".") {
timeLayout = "2006-01-02T15:04:05"
} else {
timeLayout = "2006-01-02T15:04:05+08:00"
}
loc, _ := time.LoadLocation("Local")
utime, err := time.ParseInLocation(timeLayout, timeStr, loc)
if err != nil {
logrus.Errorf("Parse log time error %s", err.Error())
return 0
}
return utime.Unix()
}
// GetLevelFlag returns the line prefix written to the legacy log file for the
// given level: "1 " for "info", "2 " for "debug", and "0 " for "error" as well
// as for any unrecognised level. A new slice is returned on every call.
func GetLevelFlag(level string) []byte {
	flag := "0 " // "error" and anything unrecognised
	switch level {
	case "info":
		flag = "1 "
	case "debug":
		flag = "2 "
	}
	return []byte(flag)
}
// startCleanupTask runs the local log cleanup loop until cleanupStopCh is
// closed, and closes cleanupDone when it returns.
//
// The retention period defaults to 3 days and can be overridden with a
// positive integer in the EVENT_LOG_RETENTION_DAYS environment variable.
// Cleanup runs once immediately, once at the next 02:00 local time, and then
// every 24 hours after that run.
func (m *EventFilePlugin) startCleanupTask() {
	defer close(m.cleanupDone)

	retentionDays := 3
	if envDays := os.Getenv("EVENT_LOG_RETENTION_DAYS"); envDays != "" {
		if days, err := strconv.Atoi(envDays); err == nil && days > 0 {
			retentionDays = days
		}
	}
	logrus.Infof("Event log local cleanup task started, retention days: %d", retentionDays)
	m.cleanupOldLogs(retentionDays)

	// Wait for the next 02:00: today if it is still ahead, otherwise tomorrow
	// (time.Date normalises a day value past the end of the month).
	now := time.Now()
	next2AM := time.Date(now.Year(), now.Month(), now.Day()+1, 2, 0, 0, 0, now.Location())
	if now.Hour() < 2 {
		next2AM = time.Date(now.Year(), now.Month(), now.Day(), 2, 0, 0, 0, now.Location())
	}
	firstTimer := time.NewTimer(next2AM.Sub(now))
	defer firstTimer.Stop()
	select {
	case <-firstTimer.C:
		m.cleanupOldLogs(retentionDays)
	case <-m.cleanupStopCh:
		logrus.Info("Event log cleanup task stopped before first run")
		return
	}

	// Start the daily ticker only after the first scheduled run so later runs
	// stay anchored to 02:00. A ticker created at startup would fire relative
	// to the process start time and could trigger a second cleanup right
	// after the first one.
	ticker := time.NewTicker(24 * time.Hour)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			m.cleanupOldLogs(retentionDays)
		case <-m.cleanupStopCh:
			logrus.Info("Event log cleanup task stopped")
			return
		}
	}
}
// cleanupOldLogs deletes "*.log" files directly inside "<HomePath>/eventlog"
// whose modification time is more than retentionDays days in the past.
// Sub-directories and files with other suffixes are left alone. Failures are
// logged and counted, never returned; a summary is logged at the end.
func (m *EventFilePlugin) cleanupOldLogs(retentionDays int) {
	logDir := path.Join(m.HomePath, "eventlog")
	if _, statErr := os.Stat(logDir); os.IsNotExist(statErr) {
		logrus.Debugf("Event log directory does not exist: %s", logDir)
		return
	}

	logrus.Infof("Starting cleanup of event logs older than %d days in %s", retentionDays, logDir)
	cutoff := time.Now().AddDate(0, 0, -retentionDays)

	entries, readErr := os.ReadDir(logDir)
	if readErr != nil {
		logrus.Errorf("Failed to read event log directory %s: %v", logDir, readErr)
		return
	}

	var (
		removed    int
		freedBytes int64
		failures   int
	)
	for _, entry := range entries {
		name := entry.Name()
		if entry.IsDir() || !strings.HasSuffix(name, ".log") {
			continue
		}
		fullPath := path.Join(logDir, name)

		info, infoErr := entry.Info()
		if infoErr != nil {
			logrus.Warnf("Failed to get file info for %s: %v", fullPath, infoErr)
			failures++
			continue
		}
		// Files modified at or after the cutoff are still within retention.
		if !info.ModTime().Before(cutoff) {
			continue
		}

		if rmErr := os.Remove(fullPath); rmErr != nil {
			logrus.Errorf("Failed to delete old log file %s: %v", fullPath, rmErr)
			failures++
			continue
		}
		removed++
		freedBytes += info.Size()
		logrus.Debugf("Deleted old log file: %s (age: %v, size: %d bytes)",
			name,
			time.Since(info.ModTime()).Round(time.Hour),
			info.Size())
	}

	if removed == 0 && failures == 0 {
		logrus.Debugf("Event log cleanup completed: no old files to delete")
		return
	}
	logrus.Infof("Event log cleanup completed: deleted %d files (%.2f MB), %d errors",
		removed,
		float64(freedBytes)/(1024*1024),
		failures)
}
// Close shuts the plugin down: it waits for all in-flight uploads to finish,
// then signals the cleanup task to stop and waits until it has exited. The
// shutdown sequence runs only on the first call; later calls return
// immediately. Close always returns nil.
func (m *EventFilePlugin) Close() error {
	shutdown := func() {
		logrus.Info("Waiting for all log file uploads to complete...")
		m.uploadWg.Wait()
		logrus.Info("All log file uploads completed")

		// Closing the stop channel wakes the cleanup goroutine, which closes
		// cleanupDone on its way out.
		close(m.cleanupStopCh)
		<-m.cleanupDone
		logrus.Info("Event log cleanup task stopped")
	}
	m.shutdownOnce.Do(shutdown)
	return nil
}