package store
import (
"github.com/goodrain/rainbond/api/eventlog/db"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
)
// MessageStore is the contract shared by every store that NewStore can build
// (the "handle", "read", "docker_log" and "newmonitor" stores). Because it
// contains the unexported method stop, it can only be implemented by types
// declared inside this package.
type MessageStore interface {
// InsertMessage hands a single event log message to the store.
InsertMessage(*db.EventLogMessage)
// InsertGarbageMessage hands zero or more messages to the store.
// NOTE(review): "garbage" presumably means messages that could not be
// handled normally; confirm against the implementations.
InsertGarbageMessage(...*db.EventLogMessage)
// GetHistoryMessage returns stored messages for eventID as strings.
// NOTE(review): length presumably limits how many entries are returned;
// confirm against the implementations.
GetHistoryMessage(eventID string, length int) []string
// SubChan returns a channel of messages for the given event and subscriber.
SubChan(eventID, subID string) chan *db.EventLogMessage
// RealseSubChan is presumably the counterpart of SubChan that releases the
// subscriber's channel; confirm against the implementations. The name is a
// misspelling of "Release" but is part of the interface, so it is kept.
RealseSubChan(eventID, subID string)
// GetMonitorData returns the store's monitoring data.
GetMonitorData() *db.MonitorData
// Run starts the store.
// NOTE(review): whether it blocks is implementation-specific; verify.
Run()
// Gc triggers the store's clean-up pass; the details are
// implementation-specific.
Gc()
// stop shuts the store down. It is unexported, so only code in this package
// can call it.
stop()
// Scrape sends the store's Prometheus metrics to ch. namespace, exporter and
// from are presumably used to name/label the metrics; verify.
Scrape(ch chan<- prometheus.Metric, namespace, exporter, from string) error
}
// NewStore builds the MessageStore implementation selected by storeType.
//
// Supported values of storeType are:
//   - "handle":     a *handleMessageStore; its garbage-message goroutine and
//     handleEventCoreSize (2) barrel-event goroutines are started before it
//     is returned.
//   - "read":       a *readMessageStore.
//   - "docker_log": a *dockerLogStore.
//   - "newmonitor": a *newMonitorMessageStore.
//
// Every store receives its own cancellable context derived from
// context.Background(), together with the matching cancel function.
// manager is dereferenced unconditionally for the supported types, so it must
// not be nil.
//
// For any other storeType NewStore returns nil; callers must check for that.
func NewStore(storeType string, manager *storeManager) MessageStore {
	ctx, cancel := context.WithCancel(context.Background())

	switch storeType {
	case "handle":
		handle := &handleMessageStore{
			barrels:             make(map[string]*EventBarrel, 100),
			conf:                manager.conf,
			log:                 manager.log.WithField("module", "HandleMessageStore"),
			garbageGC:           make(chan int),
			ctx:                 ctx,
			cancel:              cancel,
			barrelEvent:         make(chan []string, 100),
			dbPlugin:            manager.dbPlugin,
			handleEventCoreSize: 2,
			stopGarbage:         make(chan struct{}),
			manager:             manager,
		}
		// Barrels created by the pool report on the store's own barrelEvent
		// channel, so the pool can only be wired up once handle exists.
		handle.pool = &sync.Pool{
			New: func() interface{} {
				return &EventBarrel{
					barrel:            make([]*db.EventLogMessage, 0),
					persistenceBarrel: make([]*db.EventLogMessage, 0),
					barrelEvent:       handle.barrelEvent,
					cacheNumber:       manager.conf.PeerEventMaxCacheLogNumber,
					maxNumber:         manager.conf.PeerEventMaxLogNumber,
				}
			},
		}
		go handle.handleGarbageMessage()
		for i := 0; i < handle.handleEventCoreSize; i++ {
			go handle.handleBarrelEvent()
		}
		return handle

	case "read":
		read := &readMessageStore{
			barrels:   make(map[string]*readEventBarrel, 100),
			conf:      manager.conf,
			log:       manager.log.WithField("module", "SubMessageStore"),
			ctx:       ctx,
			cancel:    cancel,
			fileStore: manager.messageFileStore,
		}
		read.pool = &sync.Pool{
			New: func() interface{} {
				return &readEventBarrel{
					subSocketChan: make(map[string]chan *db.EventLogMessage),
					fileStore:     manager.messageFileStore,
				}
			},
		}
		return read

	case "docker_log":
		docker := &dockerLogStore{
			barrels:     make(map[string]*dockerLogEventBarrel, 100),
			conf:        manager.conf,
			log:         manager.log.WithField("module", "DockerLogStore"),
			ctx:         ctx,
			cancel:      cancel,
			filePlugin:  manager.filePlugin,
			fileStore:   manager.messageFileStore,
			barrelEvent: make(chan []string, 100),
		}
		docker.pool = &sync.Pool{
			New: func() interface{} {
				return &dockerLogEventBarrel{
					subSocketChan: make(map[string]chan *db.EventLogMessage),
					cacheSize:     manager.conf.PeerDockerMaxCacheLogNumber,
					// Stamped when the pool allocates the barrel, not when it
					// is handed out by Get.
					persistenceTime: time.Now(),
					fileStore:       manager.messageFileStore,
				}
			},
		}
		return docker

	case "newmonitor":
		return &newMonitorMessageStore{
			barrels: make(map[string]*CacheMonitorMessageList, 100),
			conf:    manager.conf,
			log:     manager.log.WithField("module", "NewMonitorMessageStore"),
			ctx:     ctx,
			cancel:  cancel,
		}

	default:
		// No store takes ownership of the context on this path, so release it
		// here; otherwise the cancel func would be lost and the context could
		// never be cancelled.
		cancel()
		return nil
	}
}