package server
import (
"context"
"fmt"
"io"
"net"
"time"
"github.com/goodrain/rainbond/api/eventlog/conf"
"github.com/goodrain/rainbond/api/eventlog/entry/grpc/pb"
"github.com/goodrain/rainbond/api/eventlog/store"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/reflection"
)
// EventLogRPCServer is the gRPC endpoint that receives event log messages from
// clients and hands them to the store manager's receive channel.
type EventLogRPCServer struct {
// conf supplies BindIP and BindPort, the address Start listens on.
conf conf.EventLogServerConf
// log is the logger used by the server's methods.
log *logrus.Entry
// cancel cancels context; it is invoked by Stop.
cancel func()
// context is checked by Log between messages to detect shutdown.
context context.Context
// storemanager is the store manager passed to NewServer.
storemanager store.Manager
// messageChan is obtained from storeManager.ReceiveMessageChan(); Log pushes
// each received payload onto it.
messageChan chan []byte
// listenErr receives the error returned by the gRPC server's Serve call.
listenErr chan error
// lis is the TCP listener created by Start.
lis net.Listener
}
// NewServer builds an EventLogRPCServer from the given configuration, logger
// and store manager.
//
// The server owns a fresh cancellable context (cancelled by Stop) and forwards
// received messages to the channel returned by storeManager.ReceiveMessageChan().
// Errors from serving are reported on listenErr. The returned server is not
// listening yet; call Start to bind and serve.
func NewServer(conf conf.EventLogServerConf, log *logrus.Entry, storeManager store.Manager, listenErr chan error) *EventLogRPCServer {
	srv := new(EventLogRPCServer)
	srv.conf = conf
	srv.log = log
	srv.storemanager = storeManager
	srv.listenErr = listenErr
	srv.context, srv.cancel = context.WithCancel(context.Background())
	srv.messageChan = storeManager.ReceiveMessageChan()
	return srv
}
// Start binds the configured TCP address and serves the EventLog gRPC service
// on it, with server reflection enabled.
//
// Start blocks while the gRPC server is serving. If the address cannot be
// bound, the error is logged and returned. If Serve later returns an error, it
// is logged and delivered on the listenErr channel, and Start returns nil.
func (s *EventLogRPCServer) Start() error {
	addr := fmt.Sprintf("%s:%d", s.conf.BindIP, s.conf.BindPort)
	lis, err := net.Listen("tcp", addr)
	if err != nil {
		// Use the server's own logger so the entry carries the same fields
		// as every other message emitted by this type.
		s.log.Errorf("failed to listen: %v", err)
		return err
	}
	s.lis = lis

	// Ping a client after 10s without activity and close the connection if
	// nothing is seen within a further 3s.
	kaServerParams := keepalive.ServerParameters{
		Time:    10 * time.Second,
		Timeout: 3 * time.Second,
	}
	// Accept client keepalive pings as often as every 5s, even when the
	// connection has no active streams.
	kaEnforcementPolicy := keepalive.EnforcementPolicy{
		MinTime:             5 * time.Second,
		PermitWithoutStream: true,
	}
	server := grpc.NewServer(
		grpc.KeepaliveParams(kaServerParams),
		grpc.KeepaliveEnforcementPolicy(kaEnforcementPolicy),
	)
	pb.RegisterEventLogServer(server, s)
	reflection.Register(server)

	s.log.Infof("event message grpc server listen %s", addr)
	if err := server.Serve(lis); err != nil {
		// Formatted explicitly: Error(msg, err.Error()) would join the two
		// strings without any separator.
		s.log.Errorf("event log api grpc listen error: %v", err)
		// NOTE(review): this send blocks until a receiver takes the error
		// unless listenErr has free buffer space; confirm against the caller.
		s.listenErr <- err
	}
	return nil
}
// Stop cancels the server's context. Log handlers observe the cancellation
// between messages and then close their stream with a "server closed" reply.
// Stop itself does not close the listener or stop the gRPC server.
func (s *EventLogRPCServer) Stop() {
	shutdown := s.cancel
	shutdown()
}
// Log handles the client-streaming Log RPC: it reads messages from stream
// until the client finishes, the server is stopped, or receiving fails.
//
// Each received payload is offered to messageChan without blocking; if the
// channel cannot take it immediately the message is dropped. When the client
// ends the stream (io.EOF) a "success" reply is sent and nil is returned
// unless sending the reply fails. When the server context has been cancelled
// a "success" reply with the message "server closed" is sent instead. Any
// other receive error is logged and returned to the gRPC runtime.
func (s *EventLogRPCServer) Log(stream pb.EventLog_LogServer) error {
	for {
		// Shutdown is only noticed between messages: the Recv call below
		// blocks until the client sends something or the stream ends.
		select {
		case <-s.context.Done():
			return stream.SendAndClose(&pb.Reply{Status: "success", Message: "server closed"})
		default:
		}

		msg, err := stream.Recv()
		if err == io.EOF {
			// The client finished sending; this is the normal end of a
			// stream, so it is not reported as an error.
			s.log.Debug("event log stream closed by client")
			return stream.SendAndClose(&pb.Reply{Status: "success"})
		}
		if err != nil {
			s.log.Errorf("receive log error: %v", err)
			return err
		}

		// Best-effort hand-off: never block the stream on a slow consumer.
		select {
		case s.messageChan <- msg.Log:
		default:
		}
	}
}