package store
import (
"encoding/json"
"errors"
"github.com/goodrain/rainbond/api/eventlog/db"
"sync"
"time"
"github.com/sirupsen/logrus"
)
// EventBarrel buffers the log messages of a single event and signals, through
// barrelEvent, when they should be persisted or when the event reaches a
// notable step.
type EventBarrel struct {
// eventID identifies the event; it is included in every barrelEvent notice
// and cleared by empty.
eventID string
// barrel holds messages accepted by insert that have not yet been moved to
// persistenceBarrel.
barrel []*db.EventLogMessage
// persistenceBarrel holds messages moved out of barrel by persistence or
// gcPersistence. Presumably drained by the persistence consumer, which is
// not visible in this file section.
persistenceBarrel []*db.EventLogMessage
// needPersistence is set by persistence/gcPersistence and cleared by empty.
needPersistence bool
// persistencelock is held by persistence while messages are moved.
persistencelock sync.Mutex
// barrelEvent receives notices of the form {"persistence", eventID},
// {"callback", eventID, status, message} and {"code-version", eventID, message}.
barrelEvent chan []string
// maxNumber is the limit insert compares size against before accepting a message.
maxNumber int64
// cacheNumber is the barrel length at which insert triggers persistence.
cacheNumber int
// size counts messages accepted by insert since the last empty.
size int64
// isCallback is reset to false by empty; no other use is visible here.
isCallback bool
// updateTime is the time of the last accepted insert.
updateTime time.Time
}
// insert adds m to the barrel.
//
// It returns an error once size has exceeded maxNumber. Messages whose Step
// is "progress" are silently dropped. For every accepted message the size
// counter is bumped, analysis is run on it, a persistence pass is triggered
// when the barrel has reached cacheNumber entries, and updateTime is refreshed.
//
// NOTE(review): the limit check is strict (size > maxNumber), so the barrel
// accepts one message more than maxNumber before rejecting — confirm intended.
func (e *EventBarrel) insert(m *db.EventLogMessage) error {
	switch {
	case e.size > e.maxNumber:
		return errors.New("received message number more than peer event max message number")
	case m.Step == "progress":
		return nil
	}

	e.size++
	e.barrel = append(e.barrel, m)
	e.analysis(m)

	// analysis may already have flushed the barrel; only flush again when the
	// cache threshold is still reached.
	if e.cacheNumber <= len(e.barrel) {
		e.persistence()
	}
	e.updateTime = time.Now()
	return nil
}
// empty resets the barrel so it can be reused: the event ID, counters and
// flags are cleared and both message slices are truncated to length zero
// (their backing arrays are kept). maxNumber, cacheNumber, barrelEvent and
// updateTime are left untouched.
func (e *EventBarrel) empty() {
	e.eventID = ""
	e.size = 0
	e.isCallback = false
	e.needPersistence = false
	e.persistenceBarrel = e.persistenceBarrel[:0]
	e.barrel = e.barrel[:0]
}
// analysis inspects the Step of a newly inserted message and emits the
// matching notice on barrelEvent:
//
//   - "last" or "callback": flush via persistence, then send
//     {"callback", eventID, status, message}
//   - "code-version": send {"code-version", eventID, message}
//
// Unlike the notice sent by persistence, these sends block until the channel
// accepts them.
func (e *EventBarrel) analysis(newMessage *db.EventLogMessage) {
	switch newMessage.Step {
	case "last", "callback":
		e.persistence()
		e.barrelEvent <- []string{"callback", e.eventID, newMessage.Status, newMessage.Message}
	case "code-version":
		e.barrelEvent <- []string{"code-version", e.eventID, newMessage.Message}
	}
}
// persistence moves every buffered message from barrel to persistenceBarrel
// under persistencelock, marks the barrel as needing persistence, and makes a
// best-effort attempt to announce {"persistence", eventID} on barrelEvent.
// If the channel cannot accept the notice immediately it is dropped and a
// debug line is logged.
func (e *EventBarrel) persistence() {
	e.persistencelock.Lock()
	defer e.persistencelock.Unlock()

	e.persistenceBarrel = append(e.persistenceBarrel, e.barrel...)
	e.barrel = e.barrel[:0]
	e.needPersistence = true

	notice := []string{"persistence", e.eventID}
	select {
	case e.barrelEvent <- notice:
	default:
		logrus.Debug("event message log persistence delay")
	}
}
// gcPersistence moves all buffered messages to persistenceBarrel, drops the
// barrel slice entirely (sets it to nil) and flags the barrel as needing
// persistence. No notice is sent on barrelEvent.
//
// NOTE(review): unlike persistence, this method does not take
// persistencelock itself — presumably the caller holds it; confirm.
func (e *EventBarrel) gcPersistence() {
	e.persistenceBarrel = append(e.persistenceBarrel, e.barrel...)
	e.barrel = nil
	e.needPersistence = true
}
// readEventBarrel fans event log messages out to subscribers and, when a file
// store is configured, appends them to it and replays recent history to new
// subscribers.
type readEventBarrel struct {
// subSocketChan maps a subscription ID to that subscriber's channel.
subSocketChan map[string]chan *db.EventLogMessage
// subLock guards subSocketChan.
subLock sync.Mutex
// updateTime is refreshed on every insertMessage call.
updateTime time.Time
// eventID is the key passed to fileStore.Append and fileStore.ReadLast.
eventID string
// fileStore is optional; when nil, messages are neither stored nor replayed.
fileStore FileStore
}
// rebuildMessageContent regenerates m.Content as a JSON object built from the
// message's EventID, Step, Status, Message, Level and Time fields. If
// marshalling fails, m.Content is left unchanged.
func rebuildMessageContent(m *db.EventLogMessage) {
	content, err := json.Marshal(map[string]string{
		"event_id": m.EventID,
		"step":     m.Step,
		"status":   m.Status,
		"message":  m.Message,
		"level":    m.Level,
		"time":     m.Time,
	})
	if err != nil {
		return
	}
	m.Content = content
}
// empty closes every subscriber channel and replaces the subscription map
// with a fresh, empty one. eventID, fileStore and updateTime are not touched.
func (r *readEventBarrel) empty() {
	r.subLock.Lock()
	defer r.subLock.Unlock()

	for subID := range r.subSocketChan {
		close(r.subSocketChan[subID])
	}
	r.subSocketChan = make(map[string]chan *db.EventLogMessage)
}
// insertMessage refreshes updateTime, appends a non-nil message to the file
// store (when one is configured) and then offers the message to every
// subscriber. Delivery is non-blocking: a subscriber whose channel is full
// simply misses the message. A store failure is logged and does not prevent
// delivery.
func (r *readEventBarrel) insertMessage(message *db.EventLogMessage) {
	r.updateTime = time.Now()

	if message != nil && r.fileStore != nil {
		err := r.fileStore.Append(r.eventID, message)
		if err != nil {
			logrus.Errorf("Failed to append message to file for event %s: %v", r.eventID, err)
		}
	}

	r.subLock.Lock()
	defer r.subLock.Unlock()
	for _, sub := range r.subSocketChan {
		select {
		case sub <- message:
		default:
			// Subscriber is not keeping up; drop rather than block.
		}
	}
}
// pushCashMessage registers ch as the subscriber channel for subID and then,
// when a file store and event ID are available, replays up to the last 1000
// stored messages into ch.
//
// Each history message is given up to 5 seconds to be accepted; on timeout
// the replay is abandoned. Messages read back without Content have it rebuilt
// first. Nil history entries are skipped.
//
// The replay runs without holding subLock, so delSubChan or empty may close
// ch concurrently. A send on a closed channel panics; that panic is recovered
// here so a subscriber that disconnects mid-replay cannot crash the process.
func (r *readEventBarrel) pushCashMessage(ch chan *db.EventLogMessage, subID string) {
	const (
		historyLimit = 1000
		sendTimeout  = 5 * time.Second
	)

	r.subLock.Lock()
	r.subSocketChan[subID] = ch
	r.subLock.Unlock()

	if r.fileStore == nil || r.eventID == "" {
		return
	}
	messages, err := r.fileStore.ReadLast(r.eventID, historyLimit)
	if err != nil {
		logrus.Errorf("Failed to read history for event %s: %v", r.eventID, err)
		return
	}

	// Installed only around the replay loop so that failures in registration
	// or in the store itself are not masked.
	defer func() {
		if rec := recover(); rec != nil {
			logrus.Warnf("Stopped pushing history for event %s to subscriber %s: %v", r.eventID, subID, rec)
		}
	}()

	for _, m := range messages {
		if m == nil {
			continue
		}
		if m.Content == nil {
			rebuildMessageContent(m)
		}

		// Fast path: the buffer usually has room, so avoid allocating a
		// timer for every message.
		select {
		case ch <- m:
			continue
		default:
		}

		timer := time.NewTimer(sendTimeout)
		select {
		case ch <- m:
			timer.Stop()
		case <-timer.C:
			logrus.Warnf("Timeout pushing history for event %s", r.eventID)
			return
		}
	}
}
// addSubChan returns the channel already registered for subID, or creates a
// new one with a buffer of 1024 messages.
//
// For a new channel, registration and history replay are performed by
// pushCashMessage in a separate goroutine, so the returned channel may not
// yet be present in subSocketChan when this method returns.
func (r *readEventBarrel) addSubChan(subID string) chan *db.EventLogMessage {
	r.subLock.Lock()
	existing, found := r.subSocketChan[subID]
	r.subLock.Unlock()
	if found {
		return existing
	}

	fresh := make(chan *db.EventLogMessage, 1024)
	go r.pushCashMessage(fresh, subID)
	return fresh
}
// delSubChan closes and unregisters the channel for subID. It does nothing
// when no channel is registered under that ID.
func (r *readEventBarrel) delSubChan(subID string) {
	r.subLock.Lock()
	defer r.subLock.Unlock()

	ch, ok := r.subSocketChan[subID]
	if !ok {
		return
	}
	close(ch)
	delete(r.subSocketChan, subID)
}
// dockerLogEventBarrel fans docker log messages out to subscribers and, when
// a file store is configured, appends them to it and replays recent history
// to new subscribers.
type dockerLogEventBarrel struct {
// name is the key passed to fileStore; when empty it is taken from the
// EventID of the next inserted message. Cleared by empty.
name string
// subSocketChan maps a subscription ID to that subscriber's channel.
subSocketChan map[string]chan *db.EventLogMessage
// subLock guards subSocketChan.
subLock sync.Mutex
// updateTime is refreshed on every insertMessage call.
updateTime time.Time
// size counts insertMessage calls since the last empty.
size int
// cacheSize is not referenced by any method visible in this file section.
cacheSize int64
// persistencelock is held by persistence while persistenceTime is updated.
persistencelock sync.Mutex
// persistenceTime is the time of the last persistence call.
persistenceTime time.Time
// fileStore is optional; when nil, messages are neither stored nor replayed.
fileStore FileStore
}
// empty closes every subscriber channel, installs a fresh subscription map,
// and clears the barrel's name and message counter so it can be reused.
func (r *dockerLogEventBarrel) empty() {
	r.subLock.Lock()
	defer r.subLock.Unlock()

	for subID := range r.subSocketChan {
		close(r.subSocketChan[subID])
	}
	r.subSocketChan = make(map[string]chan *db.EventLogMessage)
	r.name = ""
	r.size = 0
}
// insertMessage records one docker log message and offers it to every
// subscriber.
//
// If the barrel has no name yet, it adopts the EventID of the first non-nil
// message. updateTime is refreshed and size is incremented on every call,
// including calls with a nil message. A non-nil message is appended to the
// file store when one is configured; a store failure is logged and does not
// prevent delivery. Delivery is non-blocking: a subscriber whose channel is
// full misses the message.
func (r *dockerLogEventBarrel) insertMessage(message *db.EventLogMessage) {
	// The file-store branch below already tolerates a nil message, so the
	// naming step must not dereference one either.
	if r.name == "" && message != nil {
		r.name = message.EventID
	}
	r.updateTime = time.Now()
	r.size++

	if r.fileStore != nil && message != nil {
		if err := r.fileStore.Append(r.name, message); err != nil {
			logrus.Errorf("Failed to append docker log for %s: %v", r.name, err)
		}
	}

	r.subLock.Lock()
	defer r.subLock.Unlock()
	for _, v := range r.subSocketChan {
		select {
		case v <- message:
		default:
			// Subscriber is not keeping up; drop rather than block.
		}
	}
}
// pushCashMessage registers ch as the subscriber channel for subID and then,
// when a file store and barrel name are available, replays up to the last
// 1000 stored log messages into ch.
//
// Each history message is given up to 5 seconds to be accepted; on timeout
// the replay is abandoned. Messages read back without Content have it rebuilt
// first. Nil history entries are skipped.
//
// The replay runs without holding subLock, so delSubChan or empty may close
// ch concurrently. A send on a closed channel panics; that panic is recovered
// here so a subscriber that disconnects mid-replay cannot crash the process.
func (r *dockerLogEventBarrel) pushCashMessage(ch chan *db.EventLogMessage, subID string) {
	const (
		historyLimit = 1000
		sendTimeout  = 5 * time.Second
	)

	r.subLock.Lock()
	r.subSocketChan[subID] = ch
	r.subLock.Unlock()

	if r.fileStore == nil || r.name == "" {
		return
	}
	messages, err := r.fileStore.ReadLast(r.name, historyLimit)
	if err != nil {
		logrus.Errorf("Failed to read docker log history for %s: %v", r.name, err)
		return
	}

	// Installed only around the replay loop so that failures in registration
	// or in the store itself are not masked.
	defer func() {
		if rec := recover(); rec != nil {
			logrus.Warnf("Stopped pushing docker log history for %s to subscriber %s: %v", r.name, subID, rec)
		}
	}()

	for _, m := range messages {
		if m == nil {
			continue
		}
		if m.Content == nil {
			rebuildMessageContent(m)
		}

		// Fast path: the buffer usually has room, so avoid allocating a
		// timer for every message.
		select {
		case ch <- m:
			continue
		default:
		}

		timer := time.NewTimer(sendTimeout)
		select {
		case ch <- m:
			timer.Stop()
		case <-timer.C:
			logrus.Warnf("Timeout pushing docker log history for %s", r.name)
			return
		}
	}
}
// addSubChan returns the channel already registered for subID, or creates a
// new one with a buffer of 100 messages.
//
// For a new channel, registration and history replay are performed by
// pushCashMessage in a separate goroutine, so the returned channel may not
// yet be present in subSocketChan when this method returns.
func (r *dockerLogEventBarrel) addSubChan(subID string) chan *db.EventLogMessage {
	r.subLock.Lock()
	existing, found := r.subSocketChan[subID]
	r.subLock.Unlock()
	if found {
		return existing
	}

	fresh := make(chan *db.EventLogMessage, 100)
	go r.pushCashMessage(fresh, subID)
	return fresh
}
// delSubChan closes and unregisters the channel for subID. It does nothing
// when no channel is registered under that ID.
func (r *dockerLogEventBarrel) delSubChan(subID string) {
	r.subLock.Lock()
	defer r.subLock.Unlock()

	ch, ok := r.subSocketChan[subID]
	if !ok {
		return
	}
	close(ch)
	delete(r.subSocketChan, subID)
}
// persistence stamps persistenceTime with the current time while holding
// persistencelock. It moves no data.
func (r *dockerLogEventBarrel) persistence() {
	r.persistencelock.Lock()
	r.persistenceTime = time.Now()
	r.persistencelock.Unlock()
}
// gcPersistence is intentionally a no-op for docker log barrels: the method
// body is empty. Presumably it exists so this type exposes the same method
// set as the other barrels — confirm against callers.
func (r *dockerLogEventBarrel) gcPersistence() {
}
// GetSubChanLength reports how many subscriber channels are currently
// registered, read under subLock.
func (r *dockerLogEventBarrel) GetSubChanLength() int {
	r.subLock.Lock()
	count := len(r.subSocketChan)
	r.subLock.Unlock()
	return count
}
// monitorMessageBarrel fans monitor messages out to subscribers. It has no
// file store, so nothing is persisted or replayed.
type monitorMessageBarrel struct {
// barrel is truncated by empty; no method visible in this file section
// appends to it.
barrel []*db.EventLogMessage
// subSocketChan maps a subscription ID to that subscriber's channel.
subSocketChan map[string]chan *db.EventLogMessage
// subLock guards subSocketChan (and barrel inside empty).
subLock sync.Mutex
// updateTime is refreshed on every insertMessage call.
updateTime time.Time
}
// empty truncates the message barrel, closes every subscriber channel and
// installs a fresh, empty subscription map.
func (r *monitorMessageBarrel) empty() {
	r.subLock.Lock()
	defer r.subLock.Unlock()

	// Slicing a nil slice to [:0] yields nil again, so no nil check is needed.
	r.barrel = r.barrel[:0]

	for subID := range r.subSocketChan {
		close(r.subSocketChan[subID])
	}
	r.subSocketChan = make(map[string]chan *db.EventLogMessage)
}
// insertMessage refreshes updateTime and offers the message to every
// subscriber without blocking; a subscriber whose channel is full misses it.
// The message is not stored in barrel.
func (r *monitorMessageBarrel) insertMessage(message *db.EventLogMessage) {
	r.updateTime = time.Now()

	r.subLock.Lock()
	defer r.subLock.Unlock()
	for _, sub := range r.subSocketChan {
		select {
		case sub <- message:
		default:
			// Subscriber is not keeping up; drop rather than block.
		}
	}
}
// pushCashMessage registers ch as the subscriber channel for subID under
// subLock. Unlike the other barrels' pushCashMessage, it replays no history.
func (r *monitorMessageBarrel) pushCashMessage(ch chan *db.EventLogMessage, subID string) {
r.subLock.Lock()
defer r.subLock.Unlock()
r.subSocketChan[subID] = ch
}
// addSubChan returns the channel already registered for subID, or creates,
// registers and returns a new one with a buffer of 10 messages.
//
// Registration happens here, under the same lock as the lookup, rather than
// in a background goroutine. That guarantees the channel is in subSocketChan
// by the time the caller receives it, so an immediate delSubChan finds and
// closes it, and two concurrent calls with the same subID cannot each create
// a channel.
func (r *monitorMessageBarrel) addSubChan(subID string) chan *db.EventLogMessage {
	r.subLock.Lock()
	defer r.subLock.Unlock()

	if sub, ok := r.subSocketChan[subID]; ok {
		return sub
	}
	// Writing to a nil map panics; tolerate a barrel whose map was never
	// initialised.
	if r.subSocketChan == nil {
		r.subSocketChan = make(map[string]chan *db.EventLogMessage)
	}
	ch := make(chan *db.EventLogMessage, 10)
	r.subSocketChan[subID] = ch
	return ch
}
// delSubChan closes and unregisters the channel for subID. It does nothing
// when no channel is registered under that ID.
func (r *monitorMessageBarrel) delSubChan(subID string) {
	r.subLock.Lock()
	defer r.subLock.Unlock()

	ch, ok := r.subSocketChan[subID]
	if !ok {
		return
	}
	close(ch)
	delete(r.subSocketChan, subID)
}