package client
import (
"encoding/json"
"fmt"
"time"
"github.com/goodrain/rainbond/mq/api/grpc/pb"
"github.com/sirupsen/logrus"
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/keepalive"
)
// Topic names used when enqueueing tasks on the message queue.
//
// They are declared as variables rather than constants, so other code is able
// to reassign them.
var (
	// BuilderTopic is the "builder" topic name.
	BuilderTopic = "builder"
	// BuilderHealth is the "builder-health" topic name.
	BuilderHealth = "builder-health"
	// WindowsBuilderTopic is the "windows_builder" topic name.
	WindowsBuilderTopic = "windows_builder"
	// WorkerTopic is the "worker" topic name.
	WorkerTopic = "worker"
	// WorkerHealth is the "worker-health" topic name.
	WorkerHealth = "worker-health"
	// SourceScanTopic is the "source-scan" topic name.
	SourceScanTopic = "source-scan"
)
// MQClient is the client-side view of the message queue service.
//
// It exposes every RPC of the generated pb.TaskQueueClient and adds a
// convenience method for enqueueing a TaskStruct plus a shutdown hook.
type MQClient interface {
	pb.TaskQueueClient

	// SendBuilderTopic serializes t and enqueues it on the topic named by
	// t.Topic.
	SendBuilderTopic(t TaskStruct) error

	// Close shuts the client down. The client should not be used afterwards.
	Close()
}
// mqClient is the MQClient implementation backed by a single gRPC connection.
type mqClient struct {
	pb.TaskQueueClient
	// ctx is the parent of the per-request contexts created by the client's
	// own helper methods; cancelling it aborts those requests.
	ctx context.Context
	// cancel cancels ctx.
	cancel context.CancelFunc
	// conn is the underlying gRPC connection. It is retained only so that
	// Close can release it.
	conn *grpc.ClientConn
}

// NewMqClient creates a client for the message queue service at mqAddr.
//
// The connection is unencrypted (grpc.WithInsecure). No blocking dial option
// is passed, so per the grpc-go documentation the connection is established in
// the background and an error here means the dial options or target were
// rejected, not that the server is unreachable.
//
// The caller owns the returned client and must call Close to release the
// connection.
func NewMqClient(mqAddr string) (MQClient, error) {
	ctx, cancel := context.WithCancel(context.Background())

	conn, err := grpc.DialContext(ctx, mqAddr,
		grpc.WithInsecure(),
		// Ping after 10s without activity, even when no RPC is in flight, and
		// treat the connection as dead if the ping is not acknowledged in 3s.
		// NOTE(review): the server's keepalive enforcement policy must permit
		// pings this frequent — confirm against the mq server configuration.
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                10 * time.Second,
			Timeout:             3 * time.Second,
			PermitWithoutStream: true,
		}),
		// Reconnect with exponential backoff from 1s, capped at 10s.
		grpc.WithConnectParams(grpc.ConnectParams{
			Backoff: backoff.Config{
				BaseDelay:  1 * time.Second,
				Multiplier: 1.5,
				Jitter:     0.2,
				MaxDelay:   10 * time.Second,
			},
			MinConnectTimeout: 5 * time.Second,
		}),
	)
	if err != nil {
		cancel()
		return nil, err
	}

	return &mqClient{
		TaskQueueClient: pb.NewTaskQueueClient(conn),
		ctx:             ctx,
		cancel:          cancel,
		conn:            conn,
	}, nil
}

// Close cancels the client's context and closes the underlying gRPC
// connection.
//
// Cancelling the context alone does not tear the connection down once the
// dial has returned, so the connection has to be closed explicitly or it (and
// its keepalive/reconnect machinery) is leaked.
func (m *mqClient) Close() {
	m.cancel()
	// conn is nil only for a client that was not built by NewMqClient.
	if m.conn != nil {
		// Close has no error return to propagate through; the connection is
		// being discarded either way.
		_ = m.conn.Close()
	}
}
// TaskStruct describes a task to be placed on the message queue.
type TaskStruct struct {
	// Topic is the queue topic the task is enqueued on.
	Topic string
	// Arch is copied unchanged into the Arch field of the queued message.
	Arch string
	// TaskType is copied unchanged into the TaskType field of the queued
	// message.
	TaskType string
	// TaskBody is the task payload. It is JSON-encoded before sending, so it
	// must be a value encoding/json can marshal.
	TaskBody interface{}
}
// buildTask converts t into the enqueue request sent to the message queue.
//
// t.TaskBody is JSON-encoded into the message body, the creation time is the
// current time formatted as RFC 3339, and the user is always "rainbond".
//
// If t.TaskBody cannot be marshaled, the failure is logged and the marshal
// error is returned together with an empty, non-nil request; callers must
// check the error before using the request.
func buildTask(t TaskStruct) (*pb.EnqueueRequest, error) {
	taskJSON, err := json.Marshal(t.TaskBody)
	if err != nil {
		// Log the cause: the bare message gave no hint of what went wrong.
		logrus.Errorf("tran task json error: %v", err)
		return &pb.EnqueueRequest{}, err
	}

	return &pb.EnqueueRequest{
		Topic: t.Topic,
		Message: &pb.TaskMessage{
			TaskType:   t.TaskType,
			CreateTime: time.Now().Format(time.RFC3339),
			TaskBody:   taskJSON,
			User:       "rainbond",
			Arch:       t.Arch,
		},
	}, nil
}
// SendBuilderTopic builds an enqueue request from t and sends it to the
// message queue.
//
// Despite the name, the task goes to whatever topic t.Topic names. The request
// context derives from the client's context with a 5 second timeout, so the
// call fails if the queue does not answer in time or the client has been
// closed.
//
// Returned errors wrap the underlying cause, so callers can inspect it with
// errors.Is / errors.As.
func (m *mqClient) SendBuilderTopic(t TaskStruct) error {
	request, err := buildTask(t)
	if err != nil {
		return fmt.Errorf("create task body error %w", err)
	}

	ctx, cancel := context.WithTimeout(m.ctx, 5*time.Second)
	defer cancel()

	if _, err := m.TaskQueueClient.Enqueue(ctx, request); err != nil {
		return fmt.Errorf("send enqueue request error %w", err)
	}
	return nil
}