package handler
import (
"context"
"fmt"
v2 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2"
apisixversioned "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned"
apimodel "github.com/goodrain/rainbond/api/model"
apiutil "github.com/goodrain/rainbond/api/util"
"github.com/goodrain/rainbond/api/util/bcode"
"github.com/goodrain/rainbond/db"
"github.com/goodrain/rainbond/db/model"
"github.com/goodrain/rainbond/mq/client"
"github.com/goodrain/rainbond/pkg/component/k8s"
"github.com/goodrain/rainbond/pkg/component/mq"
"github.com/goodrain/rainbond/util"
"github.com/goodrain/rainbond/worker/appm/controller"
"github.com/jinzhu/gorm"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
k8serror "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"os"
v1 "sigs.k8s.io/gateway-api/apis/v1"
"sigs.k8s.io/gateway-api/apis/v1beta1"
gateway "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned/typed/apis/v1beta1"
"sort"
"strconv"
"strings"
)
type GatewayAction struct {
dbmanager db.Manager
mqclient client.MQClient
gatewayClient *gateway.GatewayV1beta1Client
kubeClient kubernetes.Interface
kubeClientset *kubernetes.Clientset
config *rest.Config
apisixClient *apisixversioned.Clientset
}
func CreateGatewayManager() *GatewayAction {
return &GatewayAction{
dbmanager: db.GetManager(),
mqclient: mq.Default().MqClient,
gatewayClient: k8s.Default().GatewayClient,
kubeClient: k8s.Default().Clientset,
kubeClientset: k8s.Default().Clientset,
config: k8s.Default().RestConfig,
apisixClient: k8s.Default().ApiSixClient,
}
}
func (g *GatewayAction) GetClient() *apisixversioned.Clientset {
return g.apisixClient
}
func (g *GatewayAction) GetK8sClient() kubernetes.Interface {
return g.kubeClient
}
func (g *GatewayAction) CreateCert(namespace, domain string) error {
secretName := strings.Replace(domain, ".", "-", -1)
cert, certKey, err := generateSelfSignedCertificate(domain)
if err != nil {
logrus.Errorf("Error generating self-signed certificate: %v", err)
return err
}
return createK8sSecret(g.kubeClientset, namespace, secretName, cert, certKey)
}
func (g *GatewayAction) BatchGetGatewayHTTPRoute(namespace, appID string) ([]*apimodel.GatewayHTTPRouteConcise, error) {
var httpRoutes []v1beta1.HTTPRoute
if appID != "" {
gatewayHTTPRoutes, err := g.gatewayClient.HTTPRoutes(namespace).List(context.Background(), metav1.ListOptions{LabelSelector: "app_id=" + appID})
if err != nil {
logrus.Errorf("list http route by app_id = %v failure: %v", appID, err)
return nil, err
}
httpRoutes = gatewayHTTPRoutes.Items
} else {
gatewayHTTPRoutes, err := g.gatewayClient.HTTPRoutes(namespace).List(context.Background(), metav1.ListOptions{})
if err != nil {
logrus.Errorf("list http route failure: %v", err)
return nil, err
}
httpRoutes = gatewayHTTPRoutes.Items
}
var HTTPRouteConcise []*apimodel.GatewayHTTPRouteConcise
for _, httpRoute := range httpRoutes {
var gatewayName string
gatewayNamespace := namespace
if httpRoute.Spec.ParentRefs != nil {
gatewayName = string(httpRoute.Spec.ParentRefs[0].Name)
if httpRoute.Spec.ParentRefs[0].Namespace != nil {
gatewayNamespace = string(*httpRoute.Spec.ParentRefs[0].Namespace)
}
}
var hosts []string
if httpRoute.Spec.Hostnames != nil {
for _, hostname := range httpRoute.Spec.Hostnames {
hosts = append(hosts, string(hostname))
}
}
var id string
if httpRoute.Labels != nil {
id = httpRoute.Labels["app_id"]
}
HTTPRouteConcise = append(HTTPRouteConcise, &apimodel.GatewayHTTPRouteConcise{
Name: httpRoute.Name,
Hosts: hosts,
GatewayName: gatewayName,
GatewayNamespace: gatewayNamespace,
AppID: id,
})
}
return HTTPRouteConcise, nil
}
func (g *GatewayAction) AddGatewayCertificate(req *apimodel.GatewayCertificate) error {
secret := &corev1.Secret{
TypeMeta: metav1.TypeMeta{
Kind: apimodel.Secret,
APIVersion: controller.APIVersionSecret,
},
ObjectMeta: metav1.ObjectMeta{
Name: req.Name,
Namespace: req.Namespace,
},
Data: map[string][]byte{
"tls.crt": []byte(req.Certificate),
"tls.key": []byte(req.PrivateKey),
},
Type: corev1.SecretTypeTLS,
}
_, err := g.kubeClient.CoreV1().Secrets(req.Namespace).Create(context.Background(), secret, metav1.CreateOptions{})
if err != nil {
if k8serror.IsAlreadyExists(err) {
_, err = g.kubeClient.CoreV1().Secrets(req.Namespace).Update(context.Background(), secret, metav1.UpdateOptions{})
if err != nil {
logrus.Errorf("update gateway certificate secret failure: %v", err)
return err
}
}
logrus.Errorf("add gateway certificate secret failure: %v", err)
return err
}
return nil
}
func (g *GatewayAction) UpdateGatewayCertificate(req *apimodel.GatewayCertificate) error {
secret, err := g.kubeClient.CoreV1().Secrets(req.Namespace).Get(context.Background(), req.Name, metav1.GetOptions{})
if err != nil {
if k8serror.IsNotFound(err) {
secret, err = g.kubeClient.CoreV1().Secrets(req.Namespace).Create(context.Background(), &corev1.Secret{
TypeMeta: metav1.TypeMeta{
Kind: apimodel.Secret,
APIVersion: controller.APIVersionSecret,
},
ObjectMeta: metav1.ObjectMeta{
Name: req.Name,
Namespace: req.Namespace,
},
Data: map[string][]byte{
"tls.crt": []byte(req.Certificate),
"tls.key": []byte(req.PrivateKey),
},
Type: corev1.SecretTypeTLS,
}, metav1.CreateOptions{})
if err != nil {
logrus.Errorf("get gateway certificate secret, add failure: %v", err)
return err
}
return nil
}
logrus.Errorf("update gateway certificate secret, get failure: %v", err)
return err
}
certificate := make(map[string][]byte)
certificate["tls.crt"] = []byte(req.Certificate)
certificate["tls.key"] = []byte(req.PrivateKey)
secret.Data = certificate
secret, err = g.kubeClient.CoreV1().Secrets(req.Namespace).Update(context.Background(), secret, metav1.UpdateOptions{})
if err != nil {
logrus.Errorf("update gateway certificate secret, update failure: %v", err)
return err
}
hosts, err := apiutil.GetCertificateDomains(secret)
if err != nil {
logrus.Errorf("get certificate domains failure: %v", err)
return err
}
if err := apiutil.CheckDomainConflict(context.Background(), hosts, req.Namespace, req.Name); err != nil {
logrus.Errorf("domain conflict detected: %s", err.Error())
return err
}
oldApisixTls, err := g.apisixClient.ApisixV2().ApisixTlses(req.Namespace).Get(context.Background(), req.Name, metav1.GetOptions{})
if err != nil {
if k8serror.IsNotFound(err) {
_, err = g.apisixClient.ApisixV2().ApisixTlses(req.Namespace).Create(context.Background(), &v2.ApisixTls{
TypeMeta: metav1.TypeMeta{
Kind: apiutil.ApisixTLS,
APIVersion: apiutil.APIVersion,
},
ObjectMeta: metav1.ObjectMeta{
Name: req.Name,
Namespace: req.Namespace,
},
Spec: &v2.ApisixTlsSpec{
IngressClassName: "apisix",
Hosts: hosts,
Secret: v2.ApisixSecret{
Name: req.Name,
Namespace: req.Namespace,
},
},
}, metav1.CreateOptions{})
if err != nil {
logrus.Errorf("update gateway certificate apisix tls, add failure: %v", err)
return err
}
return nil
}
logrus.Errorf("update gateway certificate apisix tls, get failure: %v", err)
return err
}
oldApisixTls.Spec.Hosts = hosts
_, err = g.apisixClient.ApisixV2().ApisixTlses(req.Namespace).Update(context.Background(), oldApisixTls, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
}
func (g *GatewayAction) DeleteGatewayCertificate(name, namespace string) error {
err := g.kubeClient.CoreV1().Secrets(namespace).Delete(context.Background(), name, metav1.DeleteOptions{})
if err != nil && !k8serror.IsNotFound(err) {
logrus.Errorf("delete gateway certificate secret failure: %v", err)
return err
}
err = g.apisixClient.ApisixV2().ApisixTlses(namespace).Delete(context.Background(), name, metav1.DeleteOptions{})
if err != nil && !k8serror.IsNotFound(err) {
logrus.Errorf("delete gateway certificate apisix tls failure: %v", err)
return err
}
return nil
}
func handleGatewayRules(req *apimodel.GatewayHTTPRouteStruct) []v1.HTTPRouteRule {
var rules []v1.HTTPRouteRule
for _, rule := range req.Rules {
var (
backendRefs []v1.HTTPBackendRef
matches []v1.HTTPRouteMatch
filters []v1.HTTPRouteFilter
)
if rule.MatchesRules != nil {
for _, match := range rule.MatchesRules {
var httpRouteMatch v1.HTTPRouteMatch
if path := match.Path; path != nil {
pathType := v1.PathMatchType(path.Type)
value := path.Value
httpRouteMatch.Path = &v1.HTTPPathMatch{
Type: &pathType,
Value: &value,
}
}
if headers := match.Headers; headers != nil {
for _, header := range headers {
headerType := v1.HeaderMatchType(header.Type)
httpRouteMatch.Headers = append(httpRouteMatch.Headers, v1.HTTPHeaderMatch{
Name: v1.HTTPHeaderName(header.Name),
Type: &headerType,
Value: header.Value,
})
}
}
matches = append(matches, httpRouteMatch)
}
}
if rule.BackendRefsRules != nil {
for _, backendRef := range rule.BackendRefsRules {
var group v1.Group
if backendRef.Kind == apimodel.HTTPRoute {
group = v1.GroupName
}
kind := v1.Kind(backendRef.Kind)
namespace := v1.Namespace(backendRef.Namespace)
var port *v1.PortNumber
if backendRef.Port != 0 {
p := v1.PortNumber(backendRef.Port)
port = &p
}
weight := int32(backendRef.Weight)
backendRefs = append(backendRefs, v1.HTTPBackendRef{
BackendRef: v1.BackendRef{
BackendObjectReference: v1.BackendObjectReference{
Group: &group,
Kind: &kind,
Name: v1.ObjectName(backendRef.Name),
Namespace: &namespace,
Port: port,
},
Weight: &weight,
},
})
}
}
if rule.FiltersRules != nil {
for _, filter := range rule.FiltersRules {
var httpRoutefilter v1.HTTPRouteFilter
if filter.RequestHeaderModifier != nil {
var setHTTPHeader []v1.HTTPHeader
var addHTTPHeader []v1.HTTPHeader
if filter.RequestHeaderModifier.Set != nil {
for _, set := range filter.RequestHeaderModifier.Set {
setHTTPHeader = append(setHTTPHeader, v1.HTTPHeader{
Name: v1.HTTPHeaderName(set.Name),
Value: set.Value,
})
}
}
if filter.RequestHeaderModifier.Add != nil {
for _, add := range filter.RequestHeaderModifier.Add {
addHTTPHeader = append(addHTTPHeader, v1.HTTPHeader{
Name: v1.HTTPHeaderName(add.Name),
Value: add.Value,
})
}
}
httpRoutefilter.RequestHeaderModifier = &v1.HTTPHeaderFilter{
Set: setHTTPHeader,
Add: addHTTPHeader,
Remove: filter.RequestHeaderModifier.Remove,
}
}
if filter.RequestRedirect != nil {
scheme := filter.RequestRedirect.Scheme
hostname := v1.PreciseHostname(filter.RequestRedirect.Hostname)
var port *v1.PortNumber
var sc *int
if v1.PortNumber(filter.RequestRedirect.Port) != 0 {
p := v1.PortNumber(filter.RequestRedirect.Port)
port = &p
}
if filter.RequestRedirect.StatusCode != 0 {
s := filter.RequestRedirect.StatusCode
sc = &s
}
httpRoutefilter.RequestRedirect = &v1.HTTPRequestRedirectFilter{
Scheme: &scheme,
Hostname: &hostname,
Port: port,
StatusCode: sc,
}
}
httpRoutefilter.Type = v1.HTTPRouteFilterType(filter.Type)
filters = append(filters, httpRoutefilter)
}
}
rule := v1.HTTPRouteRule{
Matches: matches,
BackendRefs: backendRefs,
Filters: filters,
}
rules = append(rules, rule)
}
return rules
}
func (g *GatewayAction) AddGatewayHTTPRoute(req *apimodel.GatewayHTTPRouteStruct) (*model.K8sResource, error) {
gatewayNamespace := v1.Namespace(req.GatewayNamespace)
var hosts []v1.Hostname
for _, host := range req.Hosts {
hosts = append(hosts, v1.Hostname(host))
}
rules := handleGatewayRules(req)
labels := make(map[string]string)
labels["app_id"] = req.AppID
var sectionName *v1.SectionName
if req.SectionName != "" {
sn := v1.SectionName(req.SectionName)
sectionName = &sn
}
httpRoute, err := g.gatewayClient.HTTPRoutes(req.Namespace).Create(context.Background(), &v1beta1.HTTPRoute{
TypeMeta: metav1.TypeMeta{
Kind: apimodel.HTTPRoute,
APIVersion: controller.APIVersionHTTPRoute,
},
ObjectMeta: metav1.ObjectMeta{
Name: req.Name,
Namespace: req.Namespace,
Labels: labels,
},
Spec: v1.HTTPRouteSpec{
CommonRouteSpec: v1.CommonRouteSpec{
ParentRefs: []v1.ParentReference{{
Name: v1.ObjectName(req.GatewayName),
Namespace: &gatewayNamespace,
SectionName: sectionName,
}},
},
Hostnames: hosts,
Rules: rules,
},
}, metav1.CreateOptions{})
if err != nil {
logrus.Errorf("create gateway http route %v failure: %v", req.Name, err)
return nil, err
}
httpRoute.Kind = apimodel.HTTPRoute
httpRoute.APIVersion = controller.APIVersionHTTPRoute
httpRouteYaml, err := ObjectToJSONORYaml("yaml", &httpRoute)
if err != nil {
logrus.Errorf("create gateway http route object to yaml failure: %v", err)
return nil, err
}
k8sresource := []*model.K8sResource{{
AppID: req.AppID,
Name: req.Name,
Kind: apimodel.HTTPRoute,
Content: httpRouteYaml,
ErrorOverview: "创建成功",
State: apimodel.CreateSuccess,
}}
err = db.GetManager().K8sResourceDao().CreateK8sResource(k8sresource)
if err != nil {
logrus.Errorf("database operation gateway http route create k8s resource failure: %v", err)
return nil, err
}
return k8sresource[0], nil
}
func (g *GatewayAction) GetGatewayHTTPRoute(name, namespace string) (*apimodel.GatewayHTTPRouteStruct, error) {
var req apimodel.GatewayHTTPRouteStruct
route, err := g.gatewayClient.HTTPRoutes(namespace).Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
logrus.Errorf("get gateway route failure: %v", err)
return nil, err
}
var gatewayName, gatewayNamespace, sectionName string
if route.Spec.ParentRefs != nil {
gatewayName = string(route.Spec.ParentRefs[0].Name)
if route.Spec.ParentRefs[0].Namespace != nil {
gatewayNamespace = string(*route.Spec.ParentRefs[0].Namespace)
}
if route.Spec.ParentRefs[0].SectionName != nil {
sectionName = string(*route.Spec.ParentRefs[0].SectionName)
}
}
var hosts []string
if route.Spec.Hostnames != nil {
for _, hostname := range route.Spec.Hostnames {
hosts = append(hosts, string(hostname))
}
}
var rules []*apimodel.Rules
for _, rule := range route.Spec.Rules {
var matchesRules []*apimodel.MatchesRule
var backendRefsRules []*apimodel.BackendRefsRule
var filtersRules []*apimodel.FiltersRule
if rule.Matches != nil {
for _, match := range rule.Matches {
var path apimodel.MatchesRulePath
var headers []*apimodel.MatchesRuleHeader
if match.Headers != nil {
for _, header := range match.Headers {
var ty string
if header.Type != nil {
ty = string(*header.Type)
}
headers = append(headers, &apimodel.MatchesRuleHeader{
Name: string(header.Name),
Type: ty,
Value: header.Value,
})
}
}
if match.Path != nil {
var ty, value string
if match.Path.Type != nil {
ty = string(*match.Path.Type)
}
if match.Path.Value != nil {
value = *match.Path.Value
}
path.Type = ty
path.Value = value
}
matchesRules = append(matchesRules, &apimodel.MatchesRule{
Path: &path,
Headers: headers,
})
}
}
if rule.Filters != nil {
for _, filter := range rule.Filters {
var filterRule apimodel.FiltersRule
filterRule.Type = string(filter.Type)
if filter.RequestHeaderModifier != nil {
var setHTTPHeader []*apimodel.HTTPHeader
var addHTTPHeader []*apimodel.HTTPHeader
var remove []string
if filter.RequestHeaderModifier.Add != nil {
for _, add := range filter.RequestHeaderModifier.Add {
addHTTPHeader = append(addHTTPHeader, &apimodel.HTTPHeader{
Name: string(add.Name),
Value: add.Value,
})
}
}
if filter.RequestHeaderModifier.Set != nil {
for _, set := range filter.RequestHeaderModifier.Set {
setHTTPHeader = append(setHTTPHeader, &apimodel.HTTPHeader{
Name: string(set.Name),
Value: set.Value,
})
}
}
if filter.RequestHeaderModifier.Remove != nil {
for _, re := range filter.RequestHeaderModifier.Remove {
remove = append(remove, re)
}
}
filterRule.RequestHeaderModifier = &apimodel.HTTPHeaderFilter{
Set: setHTTPHeader,
Add: addHTTPHeader,
Remove: remove,
}
}
if filter.RequestRedirect != nil {
var hostname, scheme string
var statusCode, port int
if filter.RequestRedirect.Hostname != nil {
hostname = string(*filter.RequestRedirect.Hostname)
}
if filter.RequestRedirect.Scheme != nil {
scheme = *filter.RequestRedirect.Scheme
}
if filter.RequestRedirect.StatusCode != nil {
statusCode = *filter.RequestRedirect.StatusCode
}
if filter.RequestRedirect.Port != nil {
port = int(*filter.RequestRedirect.Port)
}
filterRule.RequestRedirect = &apimodel.HTTPRequestRedirectFilter{
Scheme: scheme,
Hostname: hostname,
Port: port,
StatusCode: statusCode,
}
}
filtersRules = append(filtersRules, &filterRule)
}
}
if rule.BackendRefs != nil {
for _, backendRef := range rule.BackendRefs {
weight := 100
kind := apimodel.Service
if backendRef.Weight != nil {
weight = int(*backendRef.Weight)
}
if backendRef.Kind != nil {
kind = string(*backendRef.Kind)
}
namespace := namespace
if backendRef.Namespace != nil {
namespace = string(*backendRef.Namespace)
}
var port int
if backendRef.Port != nil {
port = int(*backendRef.Port)
}
backendRefsRules = append(backendRefsRules, &apimodel.BackendRefsRule{
Name: string(backendRef.Name),
Weight: weight,
Kind: kind,
Namespace: namespace,
Port: port,
})
}
}
rules = append(rules, &apimodel.Rules{
MatchesRules: matchesRules,
BackendRefsRules: backendRefsRules,
FiltersRules: filtersRules,
})
}
var id string
if route.Labels != nil {
id = route.Labels["app_id"]
}
req.Hosts = hosts
req.AppID = id
req.GatewayName = gatewayName
req.GatewayNamespace = gatewayNamespace
req.Name = name
req.SectionName = sectionName
req.Namespace = namespace
req.Rules = rules
return &req, nil
}
func (g *GatewayAction) UpdateGatewayHTTPRoute(req *apimodel.GatewayHTTPRouteStruct) (*model.K8sResource, error) {
rules := handleGatewayRules(req)
gatewayNamespace := v1.Namespace(req.GatewayNamespace)
var hosts []v1.Hostname
for _, host := range req.Hosts {
hosts = append(hosts, v1.Hostname(host))
}
httpRoute, err := g.gatewayClient.HTTPRoutes(req.Namespace).Get(context.Background(), req.Name, metav1.GetOptions{})
if err != nil {
logrus.Errorf("update gateway http route get failure: %v", err)
return nil, err
}
var sectionName *v1.SectionName
if req.SectionName != "" {
sn := v1.SectionName(req.SectionName)
sectionName = &sn
}
httpRoute.Spec.Hostnames = hosts
httpRoute.Spec.ParentRefs = []v1.ParentReference{{
Name: v1.ObjectName(req.GatewayName),
Namespace: &gatewayNamespace,
SectionName: sectionName,
}}
httpRoute.Spec.Rules = rules
newHTTPRoute, err := g.gatewayClient.HTTPRoutes(req.Namespace).Update(context.Background(), httpRoute, metav1.UpdateOptions{})
if err != nil {
logrus.Errorf("update gateway http route update failure: %v", err)
return nil, err
}
newHTTPRoute.Kind = apimodel.HTTPRoute
newHTTPRoute.APIVersion = controller.APIVersionHTTPRoute
httpRouteYaml, err := ObjectToJSONORYaml("yaml", &newHTTPRoute)
if err != nil {
logrus.Errorf("update gateway http route object to yaml failure: %v", err)
return nil, err
}
res, err := db.GetManager().K8sResourceDao().GetK8sResourceByName(req.AppID, req.Name, apimodel.HTTPRoute)
res.ErrorOverview = "更新成功"
res.Content = httpRouteYaml
res.State = apimodel.UpdateSuccess
err = db.GetManager().K8sResourceDao().UpdateModel(&res)
if err != nil {
logrus.Errorf("database operation gateway http route update k8s resource failure: %v", err)
return nil, err
}
return &res, nil
}
func (g *GatewayAction) DeleteGatewayHTTPRoute(name, namespace, appID string) error {
err := g.gatewayClient.HTTPRoutes(namespace).Delete(context.Background(), name, metav1.DeleteOptions{})
if err != nil {
logrus.Errorf("delete gateway http route failure: %v", err)
return err
}
err = db.GetManager().K8sResourceDao().DeleteK8sResource(appID, name, apimodel.HTTPRoute)
if err != nil {
logrus.Errorf("database operation gateway http route delete k8s resource failure: %v", err)
return err
}
return nil
}
func (g *GatewayAction) AddHTTPRule(req *apimodel.AddHTTPRuleStruct) error {
return db.GetManager().DB().Transaction(func(tx *gorm.DB) error {
if err := g.CreateHTTPRule(tx, req); err != nil {
return err
}
err := g.SendTaskDeprecated(map[string]interface{}{
"service_id": req.ServiceID,
"action": "add-http-rule",
"limit": map[string]string{"domain": req.Domain},
})
if err != nil {
return fmt.Errorf("send http rule task: %v", err)
}
return nil
})
}
func (g *GatewayAction) CreateHTTPRule(tx *gorm.DB, req *apimodel.AddHTTPRuleStruct) error {
httpRule := &model.HTTPRule{
UUID: req.HTTPRuleID,
ServiceID: req.ServiceID,
ContainerPort: req.ContainerPort,
Domain: req.Domain,
Path: func() string {
if !strings.HasPrefix(req.Path, "/") {
return "/" + req.Path
}
return req.Path
}(),
Header: req.Header,
Cookie: req.Cookie,
Weight: req.Weight,
IP: req.IP,
CertificateID: req.CertificateID,
PathRewrite: req.PathRewrite,
}
if err := db.GetManager().HTTPRuleDaoTransactions(tx).AddModel(httpRule); err != nil {
return fmt.Errorf("create http rule: %v", err)
}
if len(req.Rewrites) > 0 {
for _, rewrite := range req.Rewrites {
r := &model.HTTPRuleRewrite{
UUID: util.NewUUID(),
HTTPRuleID: httpRule.UUID,
Regex: rewrite.Regex,
Replacement: rewrite.Replacement,
Flag: rewrite.Flag,
}
if err := db.GetManager().HTTPRuleRewriteDaoTransactions(tx).AddModel(r); err != nil {
return fmt.Errorf("create http rule rewrite: %v", err)
}
}
}
if strings.Replace(req.CertificateID, " ", "", -1) != "" {
cert := &model.Certificate{
UUID: req.CertificateID,
CertificateName: fmt.Sprintf("cert-%s", util.NewUUID()[0:8]),
Certificate: req.Certificate,
PrivateKey: req.PrivateKey,
}
if err := db.GetManager().CertificateDaoTransactions(tx).AddOrUpdate(cert); err != nil {
return fmt.Errorf("create or update http rule: %v", err)
}
}
for _, ruleExtension := range req.RuleExtensions {
re := &model.RuleExtension{
UUID: util.NewUUID(),
RuleID: httpRule.UUID,
Key: ruleExtension.Key,
Value: ruleExtension.Value,
}
if err := db.GetManager().RuleExtensionDaoTransactions(tx).AddModel(re); err != nil {
return fmt.Errorf("create rule extensions: %v", err)
}
}
return nil
}
func (g *GatewayAction) UpdateHTTPRule(req *apimodel.UpdateHTTPRuleStruct) error {
tx := db.GetManager().Begin()
defer func() {
if r := recover(); r != nil {
logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r)
tx.Rollback()
}
}()
rule, err := g.dbmanager.HTTPRuleDaoTransactions(tx).GetHTTPRuleByID(req.HTTPRuleID)
if err != nil {
tx.Rollback()
return err
}
if rule == nil || rule.UUID == "" {
tx.Rollback()
return fmt.Errorf("HTTPRule dosen't exist based on uuid(%s)", req.HTTPRuleID)
}
if err := db.GetManager().HTTPRuleRewriteDaoTransactions(tx).DeleteByHTTPRuleID(rule.UUID); err != nil {
tx.Rollback()
return err
}
if len(req.Rewrites) > 0 {
for _, rewrite := range req.Rewrites {
r := &model.HTTPRuleRewrite{
UUID: util.NewUUID(),
HTTPRuleID: rule.UUID,
Regex: rewrite.Regex,
Replacement: rewrite.Replacement,
Flag: rewrite.Flag,
}
if err := db.GetManager().HTTPRuleRewriteDaoTransactions(tx).AddModel(r); err != nil {
tx.Rollback()
return err
}
}
}
if strings.Replace(req.CertificateID, " ", "", -1) != "" {
cert := &model.Certificate{
UUID: req.CertificateID,
Certificate: req.Certificate,
PrivateKey: req.PrivateKey,
}
if err := g.dbmanager.CertificateDaoTransactions(tx).AddOrUpdate(cert); err != nil {
tx.Rollback()
return err
}
rule.CertificateID = req.CertificateID
} else {
rule.CertificateID = ""
}
if len(req.RuleExtensions) > 0 {
if err := g.dbmanager.RuleExtensionDaoTransactions(tx).DeleteRuleExtensionByRuleID(rule.UUID); err != nil {
tx.Rollback()
return err
}
for _, ruleExtension := range req.RuleExtensions {
re := &model.RuleExtension{
UUID: util.NewUUID(),
RuleID: rule.UUID,
Key: ruleExtension.Key,
Value: ruleExtension.Value,
}
if err := db.GetManager().RuleExtensionDaoTransactions(tx).AddModel(re); err != nil {
tx.Rollback()
return err
}
}
}
if req.ServiceID != "" {
rule.ServiceID = req.ServiceID
}
if req.ContainerPort != 0 {
rule.ContainerPort = req.ContainerPort
}
if req.Domain != "" {
rule.Domain = req.Domain
}
rule.Path = func() string {
if !strings.HasPrefix(req.Path, "/") {
return "/" + req.Path
}
return req.Path
}()
rule.Header = req.Header
rule.Cookie = req.Cookie
rule.Weight = req.Weight
rule.PathRewrite = req.PathRewrite
if req.IP != "" {
rule.IP = req.IP
}
if err := db.GetManager().HTTPRuleDaoTransactions(tx).UpdateModel(rule); err != nil {
tx.Rollback()
return err
}
if err := tx.Commit().Error; err != nil {
tx.Rollback()
return err
}
if err := g.SendTaskDeprecated(map[string]interface{}{
"service_id": rule.ServiceID,
"action": "update-http-rule",
"limit": map[string]string{"domain": req.Domain},
}); err != nil {
logrus.Errorf("send runtime message about gateway failure %s", err.Error())
}
return nil
}
func (g *GatewayAction) DeleteHTTPRule(req *apimodel.DeleteHTTPRuleStruct) error {
tx := db.GetManager().Begin()
defer func() {
if r := recover(); r != nil {
logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r)
tx.Rollback()
}
}()
httpRule, err := g.dbmanager.HTTPRuleDaoTransactions(tx).GetHTTPRuleByID(req.HTTPRuleID)
if err != nil {
tx.Rollback()
return err
}
svcID := httpRule.ServiceID
if err := g.dbmanager.HTTPRuleDaoTransactions(tx).DeleteHTTPRuleByID(httpRule.UUID); err != nil {
tx.Rollback()
return err
}
if err := db.GetManager().HTTPRuleRewriteDaoTransactions(tx).DeleteByHTTPRuleID(httpRule.UUID); err != nil {
tx.Rollback()
return err
}
if err := g.dbmanager.RuleExtensionDaoTransactions(tx).DeleteRuleExtensionByRuleID(httpRule.UUID); err != nil {
tx.Rollback()
return err
}
if err := tx.Commit().Error; err != nil {
return err
}
if err := g.SendTaskDeprecated(map[string]interface{}{
"service_id": svcID,
"action": "delete-http-rule",
"limit": map[string]string{"domain": httpRule.Domain},
}); err != nil {
logrus.Errorf("send runtime message about gateway failure %s", err.Error())
}
return nil
}
func (g *GatewayAction) DeleteHTTPRuleByServiceIDWithTransaction(sid string, tx *gorm.DB) error {
rules, err := g.dbmanager.HTTPRuleDaoTransactions(tx).ListByServiceID(sid)
if err != nil {
return err
}
for _, rule := range rules {
if err := db.GetManager().HTTPRuleRewriteDaoTransactions(tx).DeleteByHTTPRuleID(rule.UUID); err != nil {
return err
}
if err := g.dbmanager.RuleExtensionDaoTransactions(tx).DeleteRuleExtensionByRuleID(rule.UUID); err != nil {
return err
}
if err := g.dbmanager.GwRuleConfigDaoTransactions(tx).DeleteByRuleID(rule.UUID); err != nil {
return err
}
if err := g.dbmanager.HTTPRuleDaoTransactions(tx).DeleteHTTPRuleByID(rule.UUID); err != nil {
return err
}
}
return nil
}
func (g *GatewayAction) AddCertificate(req *apimodel.AddHTTPRuleStruct, tx *gorm.DB) error {
cert := &model.Certificate{
UUID: req.CertificateID,
CertificateName: fmt.Sprintf("cert-%s", util.NewUUID()[0:8]),
Certificate: req.Certificate,
PrivateKey: req.PrivateKey,
}
return g.dbmanager.CertificateDaoTransactions(tx).AddModel(cert)
}
func (g *GatewayAction) UpdateCertificate(req apimodel.AddHTTPRuleStruct, httpRule *model.HTTPRule,
tx *gorm.DB) error {
cert, err := g.dbmanager.CertificateDaoTransactions(tx).GetCertificateByID(req.CertificateID)
if err != nil {
return err
}
if cert == nil {
return fmt.Errorf("certificate doesn't exist based on certificateID(%s)", req.CertificateID)
}
cert.CertificateName = fmt.Sprintf("cert-%s", util.NewUUID()[0:8])
cert.Certificate = req.Certificate
cert.PrivateKey = req.PrivateKey
return g.dbmanager.CertificateDaoTransactions(tx).UpdateModel(cert)
}
func (g *GatewayAction) AddTCPRule(req *apimodel.AddTCPRuleStruct) error {
return g.dbmanager.DB().Transaction(func(tx *gorm.DB) error {
if err := g.CreateTCPRule(tx, req); err != nil {
return err
}
err := g.SendTaskDeprecated(map[string]interface{}{
"service_id": req.ServiceID,
"action": "add-tcp-rule",
"limit": map[string]string{"tcp-address": fmt.Sprintf("%s:%d", req.IP, req.Port)},
})
if err != nil {
return fmt.Errorf("send tcp rule task: %v", err)
}
return nil
})
}
func (g *GatewayAction) CreateTCPRule(tx *gorm.DB, req *apimodel.AddTCPRuleStruct) error {
tcpRule := &model.TCPRule{
UUID: req.TCPRuleID,
ServiceID: req.ServiceID,
ContainerPort: req.ContainerPort,
IP: req.IP,
Port: req.Port,
}
if err := g.dbmanager.TCPRuleDaoTransactions(tx).AddModel(tcpRule); err != nil {
return err
}
for _, ruleExtension := range req.RuleExtensions {
re := &model.RuleExtension{
UUID: util.NewUUID(),
RuleID: tcpRule.UUID,
Value: ruleExtension.Value,
}
if err := g.dbmanager.RuleExtensionDaoTransactions(tx).AddModel(re); err != nil {
return err
}
}
return nil
}
func (g *GatewayAction) UpdateTCPRule(req *apimodel.UpdateTCPRuleStruct, minPort int) error {
tx := db.GetManager().Begin()
defer func() {
if r := recover(); r != nil {
logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r)
tx.Rollback()
}
}()
tcpRule, err := g.dbmanager.TCPRuleDaoTransactions(tx).GetTCPRuleByID(req.TCPRuleID)
if err != nil {
tx.Rollback()
return err
}
if len(req.RuleExtensions) > 0 {
if err := g.dbmanager.RuleExtensionDaoTransactions(tx).DeleteRuleExtensionByRuleID(tcpRule.UUID); err != nil {
logrus.Debugf("TCP rule id: %s;error delete rule extension: %v", tcpRule.UUID, err)
tx.Rollback()
return err
}
for _, ruleExtension := range req.RuleExtensions {
re := &model.RuleExtension{
UUID: util.NewUUID(),
RuleID: tcpRule.UUID,
Value: ruleExtension.Value,
}
if err := g.dbmanager.RuleExtensionDaoTransactions(tx).AddModel(re); err != nil {
tx.Rollback()
logrus.Debugf("TCP rule id: %s;error add rule extension: %v", tcpRule.UUID, err)
return err
}
}
}
if req.ContainerPort != 0 {
tcpRule.ContainerPort = req.ContainerPort
}
if req.IP != "" {
tcpRule.IP = req.IP
}
tcpRule.Port = req.Port
if req.ServiceID != "" {
tcpRule.ServiceID = req.ServiceID
}
if err := g.dbmanager.TCPRuleDaoTransactions(tx).UpdateModel(tcpRule); err != nil {
logrus.Debugf("TCP rule id: %s;error updating tcp rule: %v", tcpRule.UUID, err)
tx.Rollback()
return err
}
if err := tx.Commit().Error; err != nil {
tx.Rollback()
logrus.Debugf("TCP rule id: %s;error end transaction %v", tcpRule.UUID, err)
return err
}
if err := g.SendTaskDeprecated(map[string]interface{}{
"service_id": tcpRule.ServiceID,
"action": "update-tcp-rule",
"limit": map[string]string{"tcp-address": fmt.Sprintf("%s:%d", tcpRule.IP, tcpRule.Port)},
}); err != nil {
logrus.Errorf("send runtime message about gateway failure %s", err.Error())
}
return nil
}
func (g *GatewayAction) DeleteTCPRule(req *apimodel.DeleteTCPRuleStruct) error {
tx := db.GetManager().Begin()
defer func() {
if r := recover(); r != nil {
logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r)
tx.Rollback()
}
}()
tcpRule, err := db.GetManager().TCPRuleDaoTransactions(tx).GetTCPRuleByID(req.TCPRuleID)
if err != nil {
tx.Rollback()
return err
}
if err := db.GetManager().RuleExtensionDaoTransactions(tx).DeleteRuleExtensionByRuleID(tcpRule.UUID); err != nil {
tx.Rollback()
return err
}
if err := db.GetManager().TCPRuleDaoTransactions(tx).DeleteByID(tcpRule.UUID); err != nil {
tx.Rollback()
return err
}
err = db.GetManager().TenantServiceLBMappingPortDaoTransactions(tx).DELServiceLBMappingPortByServiceIDAndPort(
tcpRule.ServiceID, tcpRule.Port)
if err != nil {
tx.Rollback()
return err
}
if err := tx.Commit().Error; err != nil {
tx.Rollback()
return err
}
if err := g.SendTaskDeprecated(map[string]interface{}{
"service_id": tcpRule.ServiceID,
"action": "delete-tcp-rule",
"limit": map[string]string{"tcp-address": fmt.Sprintf("%s:%d", tcpRule.IP, tcpRule.Port)},
}); err != nil {
logrus.Errorf("send runtime message about gateway failure %s", err.Error())
}
return nil
}
func (g *GatewayAction) DeleteTCPRuleByServiceIDWithTransaction(sid string, tx *gorm.DB) error {
rules, err := db.GetManager().TCPRuleDaoTransactions(tx).GetTCPRuleByServiceID(sid)
if err != nil {
return err
}
for _, rule := range rules {
if err := db.GetManager().RuleExtensionDaoTransactions(tx).DeleteRuleExtensionByRuleID(rule.UUID); err != nil {
return err
}
if err := db.GetManager().TCPRuleDaoTransactions(tx).DeleteByID(rule.UUID); err != nil {
return err
}
}
return nil
}
func (g *GatewayAction) AddRuleExtensions(ruleID string, ruleExtensions []*apimodel.RuleExtensionStruct,
tx *gorm.DB) error {
for _, ruleExtension := range ruleExtensions {
re := &model.RuleExtension{
UUID: util.NewUUID(),
RuleID: ruleID,
Value: ruleExtension.Value,
}
err := g.dbmanager.RuleExtensionDaoTransactions(tx).AddModel(re)
if err != nil {
return err
}
}
return nil
}
func (g *GatewayAction) GetAvailablePort(ip string, lock bool) (int, error) {
roles, err := g.dbmanager.TCPRuleDao().GetUsedPortsByIP(ip)
if err != nil {
return 0, err
}
var ports []int
for _, p := range roles {
ports = append(ports, p.Port)
}
port := selectAvailablePort(ports)
if port != 0 {
return port, nil
}
return 0, fmt.Errorf("no more lb port can be use with ip %s", ip)
}
func selectAvailablePort(used []int) int {
maxPort, _ := strconv.Atoi(os.Getenv("MAX_LB_PORT"))
minPort, _ := strconv.Atoi(os.Getenv("MIN_LB_PORT"))
if minPort == 0 {
minPort = 30000
}
if maxPort == 0 {
maxPort = 65535
}
if len(used) == 0 {
return minPort
}
sort.Ints(used)
selectPort := minPort
for _, p := range used {
if p < minPort {
continue
}
if p == selectPort {
selectPort = selectPort + 1
continue
}
if p > selectPort {
if selectPort <= maxPort {
return selectPort
}
break
}
}
if selectPort <= maxPort {
return selectPort
}
return 0
}
func reassignConflictingTCPRulePorts(existingRules, incomingRules []*model.TCPRule) error {
usedPorts := make(map[int]struct{}, len(existingRules)+len(incomingRules))
for _, rule := range existingRules {
if rule == nil || rule.Port <= 0 {
continue
}
usedPorts[rule.Port] = struct{}{}
}
for _, rule := range incomingRules {
if rule == nil {
continue
}
if _, exists := usedPorts[rule.Port]; rule.Port <= 0 || exists {
originalPort := rule.Port
nextPort := selectAvailablePortFromSet(usedPorts)
if nextPort == 0 {
return fmt.Errorf("no more lb port can be used for service %s", rule.ServiceID)
}
rule.Port = nextPort
if originalPort > 0 {
logrus.Infof("tcp rule port %d for service %s is already used, reassign to %d", originalPort, rule.ServiceID, nextPort)
}
}
usedPorts[rule.Port] = struct{}{}
}
return nil
}
func selectAvailablePortFromSet(usedPorts map[int]struct{}) int {
used := make([]int, 0, len(usedPorts))
for port := range usedPorts {
used = append(used, port)
}
return selectAvailablePort(used)
}
func (g *GatewayAction) TCPIPPortExists(host string, port int) bool {
roles, _ := db.GetManager().TCPRuleDao().GetUsedPortsByIP(host)
for _, role := range roles {
if role.Port == port {
return true
}
}
return false
}
func (g *GatewayAction) SendTaskDeprecated(in map[string]interface{}) error {
sid := in["service_id"].(string)
service, err := db.GetManager().TenantServiceDao().GetServiceByID(sid)
if err != nil {
return fmt.Errorf("unexpected error occurred while getting Service by ServiceID(%s): %v", sid, err)
}
body := make(map[string]interface{})
body["deploy_version"] = service.DeployVersion
for k, v := range in {
body[k] = v
}
err = g.mqclient.SendBuilderTopic(client.TaskStruct{
Topic: client.WorkerTopic,
TaskType: "apply_rule",
TaskBody: body,
})
if err != nil {
return fmt.Errorf("unexpected error occurred while sending task: %v", err)
}
return nil
}
func (g *GatewayAction) SendTask(task *ComponentIngressTask) error {
err := g.mqclient.SendBuilderTopic(client.TaskStruct{
Topic: client.WorkerTopic,
TaskType: "apply_rule",
TaskBody: task,
})
if err != nil {
return errors.WithMessage(err, "send gateway task")
}
return nil
}
func (g *GatewayAction) RuleConfig(req *apimodel.RuleConfigReq) error {
var configs []*model.GwRuleConfig
configs = append(configs, &model.GwRuleConfig{
RuleID: req.RuleID,
Key: "proxy-connect-timeout",
Value: strconv.Itoa(req.Body.ProxyConnectTimeout),
})
configs = append(configs, &model.GwRuleConfig{
RuleID: req.RuleID,
Key: "proxy-send-timeout",
Value: strconv.Itoa(req.Body.ProxySendTimeout),
})
configs = append(configs, &model.GwRuleConfig{
RuleID: req.RuleID,
Key: "proxy-read-timeout",
Value: strconv.Itoa(req.Body.ProxyReadTimeout),
})
configs = append(configs, &model.GwRuleConfig{
RuleID: req.RuleID,
Key: "proxy-body-size",
Value: strconv.Itoa(req.Body.ProxyBodySize),
})
configs = append(configs, &model.GwRuleConfig{
RuleID: req.RuleID,
Key: "proxy-buffer-size",
Value: strconv.Itoa(req.Body.ProxyBufferSize) + "k",
})
configs = append(configs, &model.GwRuleConfig{
RuleID: req.RuleID,
Key: "proxy-buffer-numbers",
Value: strconv.Itoa(req.Body.ProxyBufferNumbers),
})
configs = append(configs, &model.GwRuleConfig{
RuleID: req.RuleID,
Key: "proxy-buffering",
Value: req.Body.ProxyBuffering,
})
setheaders := make(map[string]string)
for _, item := range req.Body.SetHeaders {
if strings.TrimSpace(item.Key) == "" {
continue
}
if strings.TrimSpace(item.Value) == "" {
item.Value = "empty"
}
setheaders["set-header-"+item.Key] = item.Value
}
for k, v := range setheaders {
configs = append(configs, &model.GwRuleConfig{
RuleID: req.RuleID,
Key: k,
Value: v,
})
}
responseHeaders := make(map[string]string)
for _, item := range req.Body.ResponseHeaders {
if strings.TrimSpace(item.Key) == "" {
continue
}
if strings.TrimSpace(item.Value) == "" {
item.Value = "empty"
}
responseHeaders["resp-header-"+item.Key] = item.Value
}
for k, v := range responseHeaders {
configs = append(configs, &model.GwRuleConfig{
RuleID: req.RuleID,
Key: k,
Value: v,
})
}
rule, err := g.dbmanager.HTTPRuleDao().GetHTTPRuleByID(req.RuleID)
if err != nil {
return err
}
tx := db.GetManager().Begin()
defer func() {
if r := recover(); r != nil {
logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r)
tx.Rollback()
}
}()
if err := g.dbmanager.GwRuleConfigDaoTransactions(tx).DeleteByRuleID(req.RuleID); err != nil {
tx.Rollback()
return err
}
for _, cfg := range configs {
if err := g.dbmanager.GwRuleConfigDaoTransactions(tx).AddModel(cfg); err != nil {
tx.Rollback()
return err
}
}
if err := tx.Commit().Error; err != nil {
tx.Rollback()
return err
}
if err := g.SendTaskDeprecated(map[string]interface{}{
"service_id": req.ServiceID,
"action": "update-rule-config",
"event_id": req.EventID,
"limit": map[string]string{"domain": rule.Domain},
}); err != nil {
logrus.Errorf("send runtime message about gateway failure %s", err.Error())
}
return nil
}
func (g *GatewayAction) UpdCertificate(req *apimodel.UpdCertificateReq) error {
cert, err := db.GetManager().CertificateDao().GetCertificateByID(req.CertificateID)
if err != nil {
msg := "retrieve certificate: %v"
return fmt.Errorf(msg, err)
}
if cert == nil {
cert = &model.Certificate{
UUID: req.CertificateID,
CertificateName: req.CertificateName,
Certificate: req.Certificate,
PrivateKey: req.PrivateKey,
}
if err := db.GetManager().CertificateDao().AddModel(cert); err != nil {
msg := "update cert error :%s"
return fmt.Errorf(msg, err.Error())
}
return nil
}
cert.CertificateName = req.CertificateName
cert.Certificate = req.Certificate
cert.PrivateKey = req.PrivateKey
if err := db.GetManager().CertificateDao().UpdateModel(cert); err != nil {
msg := "update certificate: %v"
return fmt.Errorf(msg, err)
}
rules, err := g.ListHTTPRulesByCertID(req.CertificateID)
if err != nil {
msg := "certificate id: %s; list http rules: %v"
return fmt.Errorf(msg, req.CertificateID, err)
}
for _, rule := range rules {
eventID := util.NewUUID()
if err := g.SendTaskDeprecated(map[string]interface{}{
"service_id": rule.ServiceID,
"action": "update-rule-config",
"event_id": eventID,
"limit": map[string]string{"domain": rule.Domain},
}); err != nil {
logrus.Warningf("send runtime message about gateway failure %v", err)
}
}
return nil
}
func (g *GatewayAction) ListHTTPRulesByCertID(certID string) ([]*model.HTTPRule, error) {
return db.GetManager().HTTPRuleDao().ListByCertID(certID)
}
type IPAndAvailablePort struct {
IP string `json:"ip"`
AvailablePort int `json:"available_port"`
}
func (g *GatewayAction) GetGatewayIPs() []IPAndAvailablePort {
defaultAvailablePort, _ := g.GetAvailablePort("0.0.0.0", false)
defaultIps := []IPAndAvailablePort{{
IP: "0.0.0.0",
AvailablePort: defaultAvailablePort,
}}
res, err := db.GetManager().KeyValueDao().WithPrefix("/rainbond/gateway/ips")
if err != nil || len(res) == 0 {
return defaultIps
}
var gatewayIps = make([]string, 0)
for _, v := range res {
gatewayIps = append(gatewayIps, v.V)
}
sort.Strings(gatewayIps)
for _, v := range gatewayIps {
availablePort, _ := g.GetAvailablePort(v, false)
defaultIps = append(defaultIps, IPAndAvailablePort{
IP: v,
AvailablePort: availablePort,
})
}
return defaultIps
}
func (g *GatewayAction) DeleteIngressRulesByComponentPort(tx *gorm.DB, componentID string, port int) error {
httpRuleIDs, err := g.listHTTPRuleIDs(componentID, port)
if err != nil {
return err
}
if err := db.GetManager().GwRuleConfigDaoTransactions(tx).DeleteByRuleIDs(httpRuleIDs); err != nil {
return err
}
if err := db.GetManager().RuleExtensionDaoTransactions(tx).DeleteByRuleIDs(httpRuleIDs); err != nil {
return err
}
if err := db.GetManager().HTTPRuleDaoTransactions(tx).DeleteByComponentPort(componentID, port); err != nil {
if !errors.Is(err, bcode.ErrIngressHTTPRuleNotFound) {
return err
}
}
if err := db.GetManager().TCPRuleDaoTransactions(tx).DeleteByComponentPort(componentID, port); err != nil {
if !errors.Is(err, bcode.ErrIngressTCPRuleNotFound) {
return err
}
}
return nil
}
func (g *GatewayAction) listHTTPRuleIDs(componentID string, port int) ([]string, error) {
httpRules, err := db.GetManager().HTTPRuleDao().ListByComponentPort(componentID, port)
if err != nil {
return nil, err
}
var ruleIDs []string
for _, rule := range httpRules {
ruleIDs = append(ruleIDs, rule.UUID)
}
return ruleIDs, nil
}
func (g *GatewayAction) SyncHTTPRules(tx *gorm.DB, components []*apimodel.Component) error {
var (
componentIDs []string
httpRules []*model.HTTPRule
ruleExtensions []*model.RuleExtension
httpRuleRewrites []*model.HTTPRuleRewrite
)
for _, component := range components {
if len(component.HTTPRules) == 0 {
continue
}
componentIDs = append(componentIDs, component.ComponentBase.ComponentID)
for _, httpRule := range component.HTTPRules {
httpRules = append(httpRules, httpRule.DbModel(component.ComponentBase.ComponentID))
for _, rewrite := range httpRule.Rewrites {
httpRuleRewrites = append(httpRuleRewrites, &model.HTTPRuleRewrite{
UUID: util.NewUUID(),
HTTPRuleID: httpRule.HTTPRuleID,
Regex: rewrite.Regex,
Replacement: rewrite.Replacement,
Flag: rewrite.Flag,
})
}
for _, ext := range httpRule.RuleExtensions {
ruleExtensions = append(ruleExtensions, &model.RuleExtension{
UUID: util.NewUUID(),
RuleID: httpRule.HTTPRuleID,
Key: ext.Key,
Value: ext.Value,
})
}
}
}
if err := g.syncHTTPRuleRewrites(tx, httpRules, httpRuleRewrites); err != nil {
return err
}
if err := g.syncRuleExtensions(tx, httpRules, ruleExtensions); err != nil {
return err
}
if err := db.GetManager().HTTPRuleDaoTransactions(tx).DeleteByComponentIDs(componentIDs); err != nil {
return err
}
return db.GetManager().HTTPRuleDaoTransactions(tx).CreateOrUpdateHTTPRuleInBatch(httpRules)
}
func (g *GatewayAction) syncHTTPRuleRewrites(tx *gorm.DB, httpRules []*model.HTTPRule, rewrites []*model.HTTPRuleRewrite) error {
var ruleIDs []string
for _, hr := range httpRules {
ruleIDs = append(ruleIDs, hr.UUID)
}
if err := db.GetManager().HTTPRuleRewriteDaoTransactions(tx).DeleteByHTTPRuleIDs(ruleIDs); err != nil {
return err
}
return db.GetManager().HTTPRuleRewriteDaoTransactions(tx).CreateOrUpdateHTTPRuleRewriteInBatch(rewrites)
}
func (g *GatewayAction) syncRuleExtensions(tx *gorm.DB, httpRules []*model.HTTPRule, exts []*model.RuleExtension) error {
var ruleIDs []string
for _, hr := range httpRules {
ruleIDs = append(ruleIDs, hr.UUID)
}
if err := db.GetManager().RuleExtensionDaoTransactions(tx).DeleteByRuleIDs(ruleIDs); err != nil {
return err
}
return db.GetManager().RuleExtensionDaoTransactions(tx).CreateOrUpdateRuleExtensionsInBatch(exts)
}
func (g *GatewayAction) SyncTCPRules(tx *gorm.DB, components []*apimodel.Component) error {
var (
componentIDs []string
tcpRules []*model.TCPRule
)
for _, component := range components {
if len(component.TCPRules) == 0 {
continue
}
componentIDs = append(componentIDs, component.ComponentBase.ComponentID)
for _, tcpRule := range component.TCPRules {
tcpRules = append(tcpRules, tcpRule.DbModel(component.ComponentBase.ComponentID))
}
}
if len(tcpRules) == 0 {
return nil
}
if err := db.GetManager().TCPRuleDaoTransactions(tx).DeleteByComponentIDs(componentIDs); err != nil {
return err
}
existingTCPRules, err := db.GetManager().TCPRuleDaoTransactions(tx).GetUsedPortsByIP("0.0.0.0")
if err != nil {
return err
}
if err := reassignConflictingTCPRulePorts(existingTCPRules, tcpRules); err != nil {
return err
}
return db.GetManager().TCPRuleDaoTransactions(tx).CreateOrUpdateTCPRuleInBatch(tcpRules)
}
func (g *GatewayAction) SyncRuleConfigs(tx *gorm.DB, components []*apimodel.Component) error {
var configs []*model.GwRuleConfig
var componentIDs []string
for _, component := range components {
componentIDs = append(componentIDs, component.ComponentBase.ComponentID)
if len(component.HTTPRuleConfigs) == 0 {
continue
}
for _, httpRuleConfig := range component.HTTPRuleConfigs {
configs = append(configs, httpRuleConfig.DbModel()...)
}
}
rules, err := db.GetManager().HTTPRuleDao().ListByComponentIDs(componentIDs)
if err != nil {
return err
}
var ruleIDs []string
for _, rule := range rules {
ruleIDs = append(ruleIDs, rule.UUID)
}
if err := db.GetManager().GwRuleConfigDaoTransactions(tx).DeleteByRuleIDs(ruleIDs); err != nil {
return err
}
return db.GetManager().GwRuleConfigDaoTransactions(tx).CreateOrUpdateGwRuleConfigsInBatch(configs)
}