// Copyright (C) 2014-2018 Goodrain Co., Ltd.
// RAINBOND, Application Management Platform

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package handler

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"strconv"
	"strings"
	"time"

	apisixversioned "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned"
	"github.com/goodrain/rainbond/builder/sources/registry"
	"github.com/goodrain/rainbond/config/configs"
	"github.com/goodrain/rainbond/pkg/component/grpc"
	"github.com/goodrain/rainbond/pkg/component/hubregistry"
	"github.com/goodrain/rainbond/pkg/component/k8s"
	"github.com/goodrain/rainbond/pkg/component/mq"
	"github.com/goodrain/rainbond/pkg/component/prom"
	"github.com/goodrain/rainbond/util/constants"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/remotecommand"
	v1 "kubevirt.io/api/core/v1"
	"kubevirt.io/client-go/kubecli"

	"github.com/goodrain/rainbond/api/client/prometheus"
	apimodel "github.com/goodrain/rainbond/api/model"
	"github.com/goodrain/rainbond/api/util"
	"github.com/goodrain/rainbond/api/util/bcode"
	"github.com/goodrain/rainbond/builder/parser"
	"github.com/goodrain/rainbond/db"
	dberr "github.com/goodrain/rainbond/db/errors"
	dbmodel "github.com/goodrain/rainbond/db/model"
	"github.com/goodrain/rainbond/event"
	gclient "github.com/goodrain/rainbond/mq/client"
	"github.com/goodrain/rainbond/pkg/generated/clientset/versioned"
	core_util "github.com/goodrain/rainbond/util"
	"github.com/goodrain/rainbond/worker/client"
	"github.com/goodrain/rainbond/worker/discover/model"
	"github.com/goodrain/rainbond/worker/server"
	"github.com/goodrain/rainbond/worker/server/pb"
	"github.com/google/uuid"
	"github.com/jinzhu/gorm"
	"github.com/pkg/errors"
	"github.com/pquerna/ffjson/ffjson"
	"github.com/sirupsen/logrus"
	corev1 "k8s.io/api/core/v1"
	networkingv1 "k8s.io/api/networking/v1"
	rbacv1 "k8s.io/api/rbac/v1"
	k8sErrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apiserver/pkg/util/flushwriter"
	"k8s.io/client-go/kubernetes"
)

// ErrServiceNotClosed -
var ErrServiceNotClosed = errors.New("Service has not been closed")

// ServiceAction service act
type ServiceAction struct {
	MQClient       gclient.MQClient
	statusCli      *client.AppRuntimeSyncClient
	prometheusCli  prometheus.Interface
	rainbondClient versioned.Interface
	kubeClient     kubernetes.Interface
	kubevirtClient kubecli.KubevirtClient
	dbmanager      db.Manager
	registryCli    *registry.Registry
	config         *rest.Config
	apisixClient   *apisixversioned.Clientset
}

type dCfg struct {
	Type     string   `json:"type"`
	Servers  []string `json:"servers"`
	Key      string   `json:"key"`
	Username string   `json:"username"`
	Password string   `json:"password"`
}

// CreateManager create Manger
func CreateManager() *ServiceAction {
	return &ServiceAction{
		MQClient:       mq.Default().MqClient,
		statusCli:      grpc.Default().StatusClient,
		prometheusCli:  prom.Default().PrometheusCli,
		rainbondClient: k8s.Default().RainbondClient,
		apisixClient:   k8s.Default().ApiSixClient,
		kubeClient:     k8s.Default().Clientset,
		kubevirtClient: k8s.Default().KubevirtCli,
		dbmanager:      db.GetManager(),
		registryCli:    hubregistry.Default().RegistryCli,
		config:         k8s.Default().RestConfig,
	}
}

// ServiceBuild service build
func (s *ServiceAction) ServiceBuild(tenantID, serviceID string, r *apimodel.BuildServiceStruct) error {
	eventID := r.Body.EventID
	logger := event.GetManager().GetLogger(eventID)
	defer event.CloseManager()

	// 检查 chaos 服务健康状态
	if err := s.checkChaosHealth(); err != nil {
		logger.Error("chaos service is not ready: "+err.Error(), map[string]string{"step": "health-check", "status": "failure"})
		return errors.New("build service is not ready, please try again later")
	}

	service, err := db.GetManager().TenantServiceDao().GetServiceByID(serviceID)
	db.GetManager().TenantServiceDao().UpdateModel(service)
	if err != nil {
		return err
	}
	if r.Body.Kind == "" {
		r.Body.Kind = "source"
	}
	switch r.Body.Kind {
	case "build_from_image":
		if err := s.buildFromImage(r, service); err != nil {
			logger.Error("The image build application task failed to send: "+err.Error(), map[string]string{"step": "callback", "status": "failure"})
			return err
		}
		logger.Info("The mirror build application task successed to send ", map[string]string{"step": "image-service", "status": "starting"})
		return nil
	case "build_from_source_code":
		if err := s.buildFromSourceCode(r, service); err != nil {
			logger.Error("The source code build application task failed to send "+err.Error(), map[string]string{"step": "callback", "status": "failure"})
			return err
		}
		logger.Info("The source code build application task successed to send ", map[string]string{"step": "source-service", "status": "starting"})
		return nil
	case "build_from_market_image":
		if err := s.buildFromImage(r, service); err != nil {
			logger.Error("The cloud image build application task failed to send "+err.Error(), map[string]string{"step": "callback", "status": "failure"})
			return err
		}
		logger.Info("The cloud image build application task successed to send ", map[string]string{"step": "image-service", "status": "starting"})
		return nil
	case "build_from_market_slug":
		if err := s.buildFromMarketSlug(r, service); err != nil {
			logger.Error("The cloud slug build application task failed to send "+err.Error(), map[string]string{"step": "callback", "status": "failure"})
			return err
		}
		logger.Info("The cloud slug build application task successed to send ", map[string]string{"step": "image-service", "status": "starting"})
		return nil
	default:
		return fmt.Errorf("unexpect kind")
	}
}
func (s *ServiceAction) buildFromMarketSlug(r *apimodel.BuildServiceStruct, service *dbmodel.TenantServices) error {
	body := make(map[string]interface{})
	if r.Body.Operator == "" {
		body["operator"] = "define"
	} else {
		body["operator"] = r.Body.Operator
	}
	body["deploy_version"] = r.Body.DeployVersion
	body["event_id"] = r.Body.EventID
	body["action"] = r.Body.Action
	body["tenant_name"] = r.Body.TenantName
	body["tenant_id"] = service.TenantID
	body["service_id"] = service.ServiceID
	body["service_alias"] = r.Body.ServiceAlias
	body["slug_info"] = r.Body.SlugInfo

	topic := gclient.BuilderTopic
	if s.isWindowsService(service.ServiceID) {
		topic = gclient.WindowsBuilderTopic
	}
	return s.MQClient.SendBuilderTopic(gclient.TaskStruct{
		Topic:    topic,
		TaskType: "build_from_market_slug",
		TaskBody: body,
	})
}

func (s *ServiceAction) buildFromImage(r *apimodel.BuildServiceStruct, service *dbmodel.TenantServices) error {
	dependIds, err := db.GetManager().TenantServiceRelationDao().GetTenantServiceRelations(service.ServiceID)
	if err != nil {
		return err
	}
	body := make(map[string]interface{})
	if r.Body.Operator == "" {
		body["operator"] = "define"
	} else {
		body["operator"] = r.Body.Operator
	}
	body["image"] = r.Body.ImageURL
	body["service_id"] = service.ServiceID
	body["deploy_version"] = r.Body.DeployVersion
	body["namespace"] = service.Namespace
	body["operator"] = r.Body.Operator
	body["event_id"] = r.Body.EventID
	body["tenant_name"] = r.Body.TenantName
	body["service_alias"] = r.Body.ServiceAlias
	body["action"] = r.Body.Action
	body["dep_sids"] = dependIds
	body["code_from"] = "image_manual"
	if r.Body.User != "" && r.Body.Password != "" {
		body["user"] = r.Body.User
		body["password"] = r.Body.Password
	}
	topic := gclient.BuilderTopic
	if s.isWindowsService(service.ServiceID) {
		topic = gclient.WindowsBuilderTopic
	}
	return s.MQClient.SendBuilderTopic(gclient.TaskStruct{
		Topic:    topic,
		TaskType: "build_from_image",
		TaskBody: body,
	})
}

func (s *ServiceAction) buildFromSourceCode(r *apimodel.BuildServiceStruct, service *dbmodel.TenantServices) error {
	logrus.Debugf("build_from_source_code")
	if r.Body.RepoURL == "" || r.Body.Branch == "" || r.Body.DeployVersion == "" || r.Body.EventID == "" {
		return fmt.Errorf("args error")
	}
	body := make(map[string]interface{})
	if r.Body.Operator == "" {
		body["operator"] = "define"
	} else {
		body["operator"] = r.Body.Operator
	}
	body["tenant_id"] = service.TenantID
	body["service_id"] = service.ServiceID
	body["repo_url"] = r.Body.RepoURL
	body["action"] = r.Body.Action
	body["lang"] = r.Body.Lang
	body["runtime"] = r.Body.Runtime
	body["deploy_version"] = r.Body.DeployVersion
	body["event_id"] = r.Body.EventID
	body["envs"] = r.Body.ENVS
	body["tenant_name"] = r.Body.TenantName
	body["branch"] = r.Body.Branch
	body["server_type"] = r.Body.ServerType
	body["service_alias"] = r.Body.ServiceAlias
	if r.Body.User != "" && r.Body.Password != "" {
		body["user"] = r.Body.User
		body["password"] = r.Body.Password
	}
	body["expire"] = 180
	topic := gclient.BuilderTopic
	if s.isWindowsService(service.ServiceID) {
		topic = gclient.WindowsBuilderTopic
	}
	return s.MQClient.SendBuilderTopic(gclient.TaskStruct{
		Topic:    topic,
		TaskType: "build_from_source_code",
		TaskBody: body,
	})
}

func (s *ServiceAction) isWindowsService(serviceID string) bool {
	label, err := db.GetManager().TenantServiceLabelDao().GetLabelByNodeSelectorKey(serviceID, "windows")
	if label == nil || err != nil {
		return false
	}
	return true
}

// AddLabel add labels
func (s *ServiceAction) AddLabel(l *apimodel.LabelsStruct, serviceID string) error {

	tx := db.GetManager().Begin()
	defer func() {
		if r := recover(); r != nil {
			logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r)
			tx.Rollback()
		}
	}()
	//V5.2: do not support service type label
	for _, label := range l.Labels {
		labelModel := dbmodel.TenantServiceLable{
			ServiceID:  serviceID,
			LabelKey:   label.LabelKey,
			LabelValue: label.LabelValue,
		}
		if err := db.GetManager().TenantServiceLabelDaoTransactions(tx).AddModel(&labelModel); err != nil {
			tx.Rollback()
			return err
		}
	}
	if err := tx.Commit().Error; err != nil {
		tx.Rollback()
		return err
	}
	return nil
}

// UpdateLabel updates labels
func (s *ServiceAction) UpdateLabel(l *apimodel.LabelsStruct, serviceID string) error {
	tx := db.GetManager().Begin()
	defer func() {
		if r := recover(); r != nil {
			logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r)
			tx.Rollback()
		}
	}()
	for _, label := range l.Labels {
		// delete old labels
		err := db.GetManager().TenantServiceLabelDaoTransactions(tx).
			DelTenantServiceLabelsByServiceIDKey(serviceID, label.LabelKey)
		if err != nil {
			logrus.Errorf("error deleting old labels: %v", err)
			tx.Rollback()
			return err
		}
		// V5.2 do not support service type label
		// add new labels
		labelModel := dbmodel.TenantServiceLable{
			ServiceID:  serviceID,
			LabelKey:   label.LabelKey,
			LabelValue: label.LabelValue,
		}
		if err := db.GetManager().TenantServiceLabelDaoTransactions(tx).AddModel(&labelModel); err != nil {
			logrus.Errorf("error adding new labels: %v", err)
			tx.Rollback()
			return err

		}
	}
	if err := tx.Commit().Error; err != nil {
		tx.Rollback()
		return err
	}
	return nil
}

// DeleteLabel deletes label
func (s *ServiceAction) DeleteLabel(l *apimodel.LabelsStruct, serviceID string) error {
	tx := db.GetManager().Begin()
	defer func() {
		if r := recover(); r != nil {
			logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r)
			tx.Rollback()
		}
	}()
	for _, label := range l.Labels {
		err := db.GetManager().TenantServiceLabelDaoTransactions(tx).
			DelTenantServiceLabelsByServiceIDKeyValue(serviceID, label.LabelKey, label.LabelValue)
		if err != nil {
			logrus.Errorf("error deleting label: %v", err)
			tx.Rollback()
			return err
		}
	}
	if err := tx.Commit().Error; err != nil {
		tx.Rollback()
		return err
	}
	return nil
}

// StartStopService start service
func (s *ServiceAction) StartStopService(sss *apimodel.StartStopStruct) error {
	services, err := db.GetManager().TenantServiceDao().GetServiceByID(sss.ServiceID)
	if err != nil {
		logrus.Errorf("get service by id error, %v", err)
		return err
	}
	TaskBody := model.StopTaskBody{
		TenantID:      sss.TenantID,
		ServiceID:     sss.ServiceID,
		DeployVersion: services.DeployVersion,
		EventID:       sss.EventID,
	}
	err = s.MQClient.SendBuilderTopic(gclient.TaskStruct{
		TaskType: sss.TaskType,
		TaskBody: TaskBody,
		Topic:    gclient.WorkerTopic,
	})
	if err != nil {
		logrus.Errorf("equque mq error, %v", err)
		return err
	}
	logrus.Debugf("equeue mq startstop task success")
	return nil
}

// PauseUNPauseService -
func (s *ServiceAction) PauseUNPauseService(serviceID string, pauseORunpause string) error {
	vmis, err := s.kubevirtClient.VirtualMachineInstance("").List(context.Background(), &metav1.ListOptions{LabelSelector: "service_id=" + serviceID})
	if err != nil {
		return err
	}
	if vmis.Items != nil && len(vmis.Items) > 0 {
		vm := vmis.Items[0]
		if pauseORunpause == "pause" {
			err = s.kubevirtClient.VirtualMachineInstance(vm.Namespace).Pause(context.Background(), vm.Name, &v1.PauseOptions{})
		} else if pauseORunpause == "unpause" {
			err = s.kubevirtClient.VirtualMachineInstance(vm.Namespace).Unpause(context.Background(), vm.Name, &v1.UnpauseOptions{})
		}
		if err != nil {
			return err
		}
	} else {
		return fmt.Errorf("service id is %v vm is not exist", serviceID)
	}
	return nil
}

// ServiceVertical vertical service
func (s *ServiceAction) ServiceVertical(ctx context.Context, vs *model.VerticalScalingTaskBody) error {
	service, err := db.GetManager().TenantServiceDao().GetServiceByID(vs.ServiceID)
	if err != nil {
		logrus.Errorf("get service by id %s error, %s", vs.ServiceID, err)
		db.GetManager().ServiceEventDao().SetEventStatus(ctx, dbmodel.EventStatusFailure)
		return err
	}
	oldMemory := service.ContainerMemory
	oldCPU := service.ContainerCPU
	oldGPU := service.ContainerGPU
	var rollback = func() {
		service.ContainerMemory = oldMemory
		service.ContainerCPU = oldCPU
		service.ContainerGPU = oldGPU
		_ = db.GetManager().TenantServiceDao().UpdateModel(service)
	}
	if vs.ContainerCPU != nil {
		service.ContainerCPU = *vs.ContainerCPU
	}
	if vs.ContainerMemory != nil {
		service.ContainerMemory = *vs.ContainerMemory
	}
	if vs.ContainerGPU != nil {
		service.ContainerGPU = *vs.ContainerGPU
	}
	if service.ContainerMemory == oldMemory && service.ContainerCPU == oldCPU && service.ContainerGPU == oldGPU {
		db.GetManager().ServiceEventDao().SetEventStatus(ctx, dbmodel.EventStatusSuccess)
		return nil
	}
	err = db.GetManager().TenantServiceDao().UpdateModel(service)
	if err != nil {
		db.GetManager().ServiceEventDao().SetEventStatus(ctx, dbmodel.EventStatusFailure)
		logrus.Errorf("update service memory and cpu failure. %v", err)
		return fmt.Errorf("vertical service faliure:%s", err.Error())
	}
	err = s.MQClient.SendBuilderTopic(gclient.TaskStruct{
		TaskType: "vertical_scaling",
		TaskBody: vs,
		Topic:    gclient.WorkerTopic,
	})
	if err != nil {
		// roll back service
		rollback()
		logrus.Errorf("equque mq error, %v", err)
		db.GetManager().ServiceEventDao().SetEventStatus(ctx, dbmodel.EventStatusFailure)
		return err
	}
	logrus.Debugf("equeue mq vertical task success")
	return nil
}

// ServiceHorizontal Service Horizontal
func (s *ServiceAction) ServiceHorizontal(hs *model.HorizontalScalingTaskBody) error {
	service, err := db.GetManager().TenantServiceDao().GetServiceByID(hs.ServiceID)
	if err != nil {
		logrus.Errorf("get service by id %s error, %s", hs.ServiceID, err)
		return err
	}

	// for rollback database
	oldReplicas := service.Replicas
	pods, err := s.statusCli.GetServicePods(service.ServiceID)
	if err != nil {
		logrus.Errorf("get service pods error: %v", err)
		return fmt.Errorf("horizontal service faliure:%s", err.Error())
	}
	if int32(len(pods.NewPods)) == hs.Replicas {
		return bcode.ErrHorizontalDueToNoChange
	}

	service.Replicas = int(hs.Replicas)
	err = db.GetManager().TenantServiceDao().UpdateModel(service)
	if err != nil {
		logrus.Errorf("updtae service replicas failure. %v", err)
		return fmt.Errorf("horizontal service faliure:%s", err.Error())
	}

	var rollback = func() {
		service.Replicas = oldReplicas
		_ = db.GetManager().TenantServiceDao().UpdateModel(service)
	}

	err = s.MQClient.SendBuilderTopic(gclient.TaskStruct{
		TaskType: "horizontal_scaling",
		TaskBody: hs,
		Topic:    gclient.WorkerTopic,
	})
	if err != nil {
		// roll back service
		rollback()
		logrus.Errorf("equque mq error, %v", err)
		return err
	}

	// if send task success, return nil
	logrus.Debugf("enqueue mq horizontal task success")
	return nil
}

// ServiceUpgrade service upgrade
func (s *ServiceAction) ServiceUpgrade(ru *model.RollingUpgradeTaskBody) error {
	services, err := db.GetManager().TenantServiceDao().GetServiceByID(ru.ServiceID)
	if err != nil {
		logrus.Errorf("get service by id %s error %s", ru.ServiceID, err.Error())
		return err
	}
	version, err := db.GetManager().VersionInfoDao().GetVersionByDeployVersion(ru.NewDeployVersion, ru.ServiceID)
	if err != nil {
		logrus.Errorf("get service version by id %s version %s error, %s", ru.ServiceID, ru.NewDeployVersion, err.Error())
		return err
	}
	oldDeployVersion := services.DeployVersion
	var rollback = func() {
		services.DeployVersion = oldDeployVersion
		_ = db.GetManager().TenantServiceDao().UpdateModel(services)
	}
	if version.FinalStatus != "success" {
		logrus.Warnf("deploy version %s is not build success,can not change deploy version in this upgrade event", ru.NewDeployVersion)
	} else {
		services.DeployVersion = ru.NewDeployVersion
		err = db.GetManager().TenantServiceDao().UpdateModel(services)
		if err != nil {
			logrus.Errorf("update service deploy version error. %v", err)
			return fmt.Errorf("horizontal service faliure:%s", err.Error())
		}
	}
	err = s.MQClient.SendBuilderTopic(gclient.TaskStruct{
		TaskBody: ru,
		TaskType: "rolling_upgrade",
		Topic:    gclient.WorkerTopic,
	})
	if err != nil {
		// roll back service deploy version
		rollback()
		logrus.Errorf("equque upgrade message error, %v", err)
		return err
	}
	return nil
}

// ServiceCreate create service
func (s *ServiceAction) ServiceCreate(sc *apimodel.ServiceStruct) error {
	jsonSC, err := ffjson.Marshal(sc)
	if err != nil {
		logrus.Errorf("trans service struct to json failed. %v", err)
		return err
	}
	var ts dbmodel.TenantServices
	if err := ffjson.Unmarshal(jsonSC, &ts); err != nil {
		logrus.Errorf("trans json to tenant service error, %v", err)
		return err
	}
	if ts.ServiceName == "" {
		ts.ServiceName = ts.ServiceAlias
	}
	if ts.ContainerCPU < 0 {
		ts.ContainerCPU = 0
	}
	if ts.ContainerMemory < 0 {
		ts.ContainerMemory = 0
	}
	if ts.ContainerGPU < 0 {
		ts.ContainerGPU = 0
	}
	if ts.K8sComponentName != "" {
		if db.GetManager().TenantServiceDao().IsK8sComponentNameDuplicate(ts.AppID, ts.ServiceID, ts.K8sComponentName) {
			return bcode.ErrK8sComponentNameExists
		}
	}
	ts.UpdateTime = time.Now()
	var (
		ports         = sc.PortsInfo
		envs          = sc.EnvsInfo
		volumns       = sc.VolumesInfo
		dependVolumes = sc.DepVolumesInfo
		dependIds     = sc.DependIDs
		probes        = sc.ComponentProbes
		monitors      = sc.ComponentMonitors
		httpRules     = sc.HTTPRules
		tcpRules      = sc.TCPRules
	)
	ts.AppID = sc.AppID
	ts.DeployVersion = ""
	tx := db.GetManager().Begin()
	defer func() {
		if r := recover(); r != nil {
			logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r)
			tx.Rollback()
		}
	}()
	//create app
	if err := db.GetManager().TenantServiceDaoTransactions(tx).AddModel(&ts); err != nil {
		logrus.Errorf("add service error, %v", err)
		tx.Rollback()
		return err
	}
	//set app envs
	if len(envs) > 0 {
		var batchEnvs []*dbmodel.TenantServiceEnvVar
		for _, env := range envs {
			env := env
			env.ServiceID = ts.ServiceID
			env.TenantID = ts.TenantID
			batchEnvs = append(batchEnvs, &env)
		}
		if err := db.GetManager().TenantServiceEnvVarDaoTransactions(tx).CreateOrUpdateEnvsInBatch(batchEnvs); err != nil {
			logrus.Errorf("batch add env error, %v", err)
			tx.Rollback()
			return err
		}
	}
	//set app port
	if len(ports) > 0 {
		var batchPorts []*dbmodel.TenantServicesPort
		for _, port := range ports {
			port := port
			port.ServiceID = ts.ServiceID
			port.TenantID = ts.TenantID
			batchPorts = append(batchPorts, &port)
		}
		if err := db.GetManager().TenantServicesPortDaoTransactions(tx).CreateOrUpdatePortsInBatch(batchPorts); err != nil {
			logrus.Errorf("batch add port error, %v", err)
			tx.Rollback()
			return err
		}
	}
	//set app volumns
	if len(volumns) > 0 {
		localPath := os.Getenv("LOCAL_DATA_PATH")
		sharePath := os.Getenv("SHARE_DATA_PATH")
		if localPath == "" {
			localPath = "/grlocaldata"
		}
		if sharePath == "" {
			sharePath = "/grdata"
		}

		for _, volumn := range volumns {
			v := dbmodel.TenantServiceVolume{
				ServiceID:      ts.ServiceID,
				Category:       volumn.Category,
				VolumeType:     volumn.VolumeType,
				VolumeName:     volumn.VolumeName,
				HostPath:       volumn.HostPath,
				VolumePath:     volumn.VolumePath,
				IsReadOnly:     volumn.IsReadOnly,
				VolumeCapacity: volumn.VolumeCapacity,
				// AccessMode 读写模式(Important! A volume can only be mounted using one access mode at a time, even if it supports many. For example, a GCEPersistentDisk can be mounted as ReadWriteOnce by a single node or ReadOnlyMany by many nodes, but not at the same time. #https://kubernetes.io/docs/concepts/storage/persistent-volumes/#access-modes)
				AccessMode: volumn.AccessMode,
				// SharePolicy 共享模式
				SharePolicy: volumn.SharePolicy,
				// BackupPolicy 备份策略
				BackupPolicy: volumn.BackupPolicy,
				// ReclaimPolicy 回收策略
				ReclaimPolicy: volumn.ReclaimPolicy,
				// AllowExpansion 是否支持扩展
				AllowExpansion: volumn.AllowExpansion,
				// VolumeProviderName 使用的存储驱动别名
				VolumeProviderName: volumn.VolumeProviderName,
				// Mode 配置文件权限模式
				Mode: volumn.Mode,
			}
			v.ServiceID = ts.ServiceID
			if volumn.VolumeType == "" {
				v.VolumeType = dbmodel.ShareFileVolumeType.String()
			}
			if volumn.HostPath == "" {
				//step 1 设置主机目录
				switch volumn.VolumeType {
				//共享文件存储
				case dbmodel.ShareFileVolumeType.String():
					v.HostPath = fmt.Sprintf("%s/tenant/%s/service/%s%s", sharePath, sc.TenantID, ts.ServiceID, volumn.VolumePath)
				//本地文件存储
				case dbmodel.LocalVolumeType.String():
					if !dbmodel.ServiceType(sc.ExtendMethod).IsState() {
						tx.Rollback()
						return util.CreateAPIHandleError(400, fmt.Errorf("local volume type only support state component"))
					}
					v.HostPath = fmt.Sprintf("%s/tenant/%s/service/%s%s", localPath, sc.TenantID, ts.ServiceID, volumn.VolumePath)
				case dbmodel.ConfigFileVolumeType.String(), dbmodel.MemoryFSVolumeType.String():
					logrus.Debug("simple volume type : ", volumn.VolumeType)
				default:
					if !dbmodel.ServiceType(sc.ExtendMethod).IsState() {
						tx.Rollback()
						return util.CreateAPIHandleError(400, fmt.Errorf("custom volume type only support state component"))
					}
				}
			}
			if volumn.VolumeName == "" {
				v.VolumeName = uuid.New().String()
			}
			if err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).AddModel(&v); err != nil {
				logrus.Errorf("add volumn %v error, %v", volumn.HostPath, err)
				tx.Rollback()
				return err
			}
			if volumn.FileContent != "" {
				cf := &dbmodel.TenantServiceConfigFile{
					ServiceID:   sc.ServiceID,
					VolumeName:  volumn.VolumeName,
					FileContent: volumn.FileContent,
				}
				if err := db.GetManager().TenantServiceConfigFileDaoTransactions(tx).AddModel(cf); err != nil {
					tx.Rollback()
					return util.CreateAPIHandleErrorFromDBError("error creating config file", err)
				}
			}
		}
	}
	//set app dependVolumes
	if len(dependVolumes) > 0 {
		for _, depVolume := range dependVolumes {
			depVolume.ServiceID = ts.ServiceID
			depVolume.TenantID = ts.TenantID
			volume, err := db.GetManager().TenantServiceVolumeDao().GetVolumeByServiceIDAndName(depVolume.DependServiceID, depVolume.VolumeName)
			if err != nil {
				tx.Rollback()
				return fmt.Errorf("find volume %s error %s", depVolume.VolumeName, err.Error())
			}
			depVolume.VolumeType = volume.VolumeType
			depVolume.HostPath = volume.HostPath
			if err := db.GetManager().TenantServiceMountRelationDaoTransactions(tx).AddModel(&depVolume); err != nil {
				tx.Rollback()
				return fmt.Errorf("add dep volume %s error %s", depVolume.VolumeName, err.Error())
			}
		}
	}
	//set app depends
	if len(dependIds) > 0 {
		for _, id := range dependIds {
			if err := db.GetManager().TenantServiceRelationDaoTransactions(tx).AddModel(&id); err != nil {
				logrus.Errorf("add depend_id %v error, %v", id.DependServiceID, err)
				tx.Rollback()
				return err
			}
		}
	}
	//set app label
	if sc.OSType == "windows" {
		if err := db.GetManager().TenantServiceLabelDaoTransactions(tx).AddModel(&dbmodel.TenantServiceLable{
			ServiceID:  ts.ServiceID,
			LabelKey:   dbmodel.LabelKeyNodeSelector,
			LabelValue: sc.OSType,
		}); err != nil {
			logrus.Errorf("add label %s=%s  %v error, %v", dbmodel.LabelKeyNodeSelector, sc.OSType, ts.ServiceID, err)
			tx.Rollback()
			return err
		}
	}
	// sc.Endpoints can't be nil
	// sc.Endpoints.Discovery or sc.Endpoints.Static can't be nil
	if sc.Kind == dbmodel.ServiceKindThirdParty.String() { // TODO: validate request data
		if sc.Endpoints == nil {
			tx.Rollback()
			return fmt.Errorf("endpoints can not be empty for third-party service")
		}
		if sc.Endpoints.Kubernetes != nil {
			c := &dbmodel.ThirdPartySvcDiscoveryCfg{
				ServiceID:   sc.ServiceID,
				Type:        string(dbmodel.DiscorveryTypeKubernetes),
				Namespace:   sc.Endpoints.Kubernetes.Namespace,
				ServiceName: sc.Endpoints.Kubernetes.ServiceName,
			}
			if err := db.GetManager().ThirdPartySvcDiscoveryCfgDaoTransactions(tx).
				AddModel(c); err != nil {
				logrus.Errorf("error saving discover center configuration: %v", err)
				tx.Rollback()
				return err
			}
		}
		if sc.Endpoints.Static != nil {
			for _, o := range sc.Endpoints.Static {
				ep := &dbmodel.Endpoint{
					ServiceID: sc.ServiceID,
					UUID:      core_util.NewUUID(),
				}
				address := o
				port := 0
				prefix := ""
				if strings.HasPrefix(address, "https://") {
					address = strings.Split(address, "https://")[1]
					prefix = "https://"
				}
				if strings.HasPrefix(address, "http://") {
					address = strings.Split(address, "http://")[1]
					prefix = "http://"
				}
				if strings.Contains(address, ":") {
					addressL := strings.Split(address, ":")
					address = addressL[0]
					port, _ = strconv.Atoi(addressL[1])
				}
				ep.IP = prefix + address
				ep.Port = port

				logrus.Debugf("add new endpoint: %v", ep)

				if err := db.GetManager().EndpointsDaoTransactions(tx).AddModel(ep); err != nil {
					tx.Rollback()
					logrus.Errorf("error saving o endpoint: %v", err)
					return err
				}
			}
		}
	}
	if len(probes) > 0 {
		for _, pb := range probes {
			probe := s.convertProbeModel(&pb, ts.ServiceID)
			if err := db.GetManager().ServiceProbeDaoTransactions(tx).AddModel(probe); err != nil {
				logrus.Errorf("add probe %v error, %v", probe.ProbeID, err)
				tx.Rollback()
				return err
			}
		}
	}
	if len(monitors) > 0 {
		for _, m := range monitors {
			monitor := dbmodel.TenantServiceMonitor{
				Name:            m.Name,
				TenantID:        ts.TenantID,
				ServiceID:       ts.ServiceID,
				ServiceShowName: m.ServiceShowName,
				Port:            m.Port,
				Path:            m.Path,
				Interval:        m.Interval,
			}
			if err := db.GetManager().TenantServiceMonitorDaoTransactions(tx).AddModel(&monitor); err != nil {
				logrus.Errorf("add monitor %v error, %v", monitor.Name, err)
				tx.Rollback()
				return err
			}
		}
	}
	if len(httpRules) > 0 {
		for _, httpRule := range httpRules {
			if err := GetGatewayHandler().CreateHTTPRule(tx, &httpRule); err != nil {
				logrus.Errorf("add service http rule error %v", err)
				tx.Rollback()
				return err
			}
		}
	}
	if len(tcpRules) > 0 {
		for _, tcpRule := range tcpRules {
			if GetGatewayHandler().TCPIPPortExists(tcpRule.IP, tcpRule.Port) {
				logrus.Debugf("tcp rule %v:%v exists", tcpRule.IP, tcpRule.Port)
				continue
			}
			if err := GetGatewayHandler().CreateTCPRule(tx, &tcpRule); err != nil {
				logrus.Errorf("add service tcp rule error %v", err)
				tx.Rollback()
				return err
			}
		}
	}
	labelModel := dbmodel.TenantServiceLable{
		ServiceID:  ts.ServiceID,
		LabelKey:   dbmodel.LabelKeyServiceType,
		LabelValue: core_util.StatelessServiceType,
	}
	if ts.IsState() {
		labelModel.LabelValue = core_util.StatefulServiceType
	}
	if err := db.GetManager().TenantServiceLabelDaoTransactions(tx).AddModel(&labelModel); err != nil {
		tx.Rollback()
		return err
	}

	// TODO: create default probe for third-party service.
	if err := tx.Commit().Error; err != nil {
		tx.Rollback()
		return err
	}
	logrus.Debugf("create a new app %s success", ts.ServiceAlias)

	return nil
}

func (s *ServiceAction) convertProbeModel(req *apimodel.ServiceProbe, serviceID string) *dbmodel.TenantServiceProbe {
	return &dbmodel.TenantServiceProbe{
		ServiceID:          serviceID,
		Cmd:                req.Cmd,
		FailureThreshold:   req.FailureThreshold,
		HTTPHeader:         req.HTTPHeader,
		InitialDelaySecond: req.InitialDelaySecond,
		IsUsed:             &req.IsUsed,
		Mode:               req.Mode,
		Path:               req.Path,
		PeriodSecond:       req.PeriodSecond,
		Port:               req.Port,
		ProbeID:            req.ProbeID,
		Scheme:             req.Scheme,
		SuccessThreshold:   req.SuccessThreshold,
		TimeoutSecond:      req.TimeoutSecond,
		FailureAction:      req.FailureAction,
	}
}

// ServiceUpdate update service
func (s *ServiceAction) ServiceUpdate(sc map[string]interface{}) error {
	ts, err := db.GetManager().TenantServiceDao().GetServiceByID(sc["service_id"].(string))
	if err != nil {
		return err
	}
	if memory, ok := sc["container_memory"].(int); ok && memory >= 0 {
		ts.ContainerMemory = memory
	}
	if cpu, ok := sc["container_cpu"].(int); ok && cpu >= 0 {
		ts.ContainerCPU = cpu
	}
	if gpu, ok := sc["container_gpu"].(int); ok {
		ts.ContainerCPU = gpu
	}
	if name, ok := sc["service_name"].(string); ok && name != "" {
		ts.ServiceName = name
	}
	if appID, ok := sc["app_id"].(string); ok && appID != "" {
		ts.AppID = appID
	}
	if k8sComponentName, ok := sc["k8s_component_name"].(string); ok && k8sComponentName != "" {
		if db.GetManager().TenantServiceDao().IsK8sComponentNameDuplicate(ts.AppID, ts.ServiceID, k8sComponentName) {
			return bcode.ErrK8sComponentNameExists
		}
		ts.K8sComponentName = k8sComponentName
	}
	if sc["extend_method"] != nil {
		extendMethod := sc["extend_method"].(string)
		ts.ExtendMethod = extendMethod
		// if component replicas is more than 1, so can't change service type to singleton
		if ts.Replicas > 1 && ts.IsSingleton() {
			err := fmt.Errorf("service[%s] replicas > 1, can't change service typ to stateless_singleton", ts.ServiceAlias)
			return err
		}
		volumes, err := db.GetManager().TenantServiceVolumeDao().GetTenantServiceVolumesByServiceID(ts.ServiceID)
		if err != nil {
			return err
		}
		for _, vo := range volumes {
			if vo.VolumeType == dbmodel.ShareFileVolumeType.String() || vo.VolumeType == dbmodel.MemoryFSVolumeType.String() {
				continue
			}
			if vo.VolumeType == dbmodel.LocalVolumeType.String() && !ts.IsState() {
				err := fmt.Errorf("service[%s] has local volume type, can't change type to stateless", ts.ServiceAlias)
				return err
			}
			// if component use volume, what it accessMode is rwo, can't change volume type to stateless
			if vo.AccessMode == "RWO" && !ts.IsState() {
				err := fmt.Errorf("service[%s] volume[%s] access_mode is RWO, can't change type to stateless", ts.ServiceAlias, vo.VolumeName)
				return err
			}
		}
		ts.ExtendMethod = extendMethod
		ts.ServiceType = extendMethod
	}
	if js, ok := sc["job_strategy"].(string); ok {
		ts.JobStrategy = js
	}
	//update component
	if err := db.GetManager().TenantServiceDao().UpdateModel(ts); err != nil {
		logrus.Errorf("update service error, %v", err)
		return err
	}
	return nil
}

// LanguageSet language set
func (s *ServiceAction) LanguageSet(langS *apimodel.LanguageSet) error {
	logrus.Debugf("service id is %s, language is %s", langS.ServiceID, langS.Language)
	services, err := db.GetManager().TenantServiceDao().GetServiceByID(langS.ServiceID)
	if err != nil {
		logrus.Errorf("get service by id error, %v, %v", services, err)
		return err
	}
	if langS.Language == "java" {
		services.ContainerMemory = 512
		if err := db.GetManager().TenantServiceDao().UpdateModel(services); err != nil {
			logrus.Errorf("update tenant service error %v", err)
			return err
		}
	}
	return nil
}

// GetService get service(s)
func (s *ServiceAction) GetService(tenantID string) ([]*dbmodel.TenantServices, error) {
	services, err := db.GetManager().TenantServiceDao().GetServicesAllInfoByTenantID(tenantID)
	if err != nil {
		logrus.Errorf("get service by id error, %v, %v", services, err)
		return nil, err
	}
	var serviceIDs []string
	for _, s := range services {
		serviceIDs = append(serviceIDs, s.ServiceID)
	}
	status := s.statusCli.GetStatuss(strings.Join(serviceIDs, ","))
	for _, s := range services {
		if status, ok := status[s.ServiceID]; ok {
			s.CurStatus = status
		}
	}
	return services, nil
}

// GetServicesByAppID get service(s) by appID
func (s *ServiceAction) GetServicesByAppID(appID string, page, pageSize int) (*apimodel.ListServiceResponse, error) {
	var resp apimodel.ListServiceResponse
	services, total, err := db.GetManager().TenantServiceDao().GetServicesInfoByAppID(appID, page, pageSize)
	if err != nil {
		logrus.Errorf("get service by application id error, %v, %v", services, err)
		return nil, err
	}
	var serviceIDs []string
	for _, s := range services {
		serviceIDs = append(serviceIDs, s.ServiceID)
	}
	status := s.statusCli.GetStatuss(strings.Join(serviceIDs, ","))
	for _, s := range services {
		if status, ok := status[s.ServiceID]; ok {
			s.CurStatus = status
		}
	}
	if services != nil {
		resp.Services = services
	} else {
		resp.Services = make([]*dbmodel.TenantServices, 0)
	}

	resp.Page = page
	resp.Total = total
	resp.PageSize = pageSize
	return &resp, nil
}

// GetPagedTenantRes get pagedTenantServiceRes(s)
func (s *ServiceAction) GetPagedTenantRes(offset, len int) ([]*apimodel.TenantResource, int, error) {
	allstatus := s.statusCli.GetAllStatus()
	var serviceIDs []string
	for k, v := range allstatus {
		if !s.statusCli.IsClosedStatus(v) {
			serviceIDs = append(serviceIDs, k)
		}
	}
	services, count, err := db.GetManager().TenantServiceDao().GetPagedTenantService(offset, len, serviceIDs)
	if err != nil {
		logrus.Errorf("get service by id error, %v, %v", services, err)
		return nil, count, err
	}
	var result []*apimodel.TenantResource
	for _, v := range services {
		var res apimodel.TenantResource
		res.UUID, _ = v["tenant"].(string)
		res.Name, _ = v["tenant_name"].(string)
		res.EID, _ = v["eid"].(string)
		res.AllocatedCPU, _ = v["capcpu"].(int)
		res.AllocatedMEM, _ = v["capmem"].(int)
		res.UsedCPU, _ = v["usecpu"].(int)
		res.UsedMEM, _ = v["usemem"].(int)
		result = append(result, &res)
	}
	return result, count, nil
}

// GetTenantRes get pagedTenantServiceRes(s)
func (s *ServiceAction) GetTenantRes(uuid string) (*apimodel.TenantResource, error) {
	if logrus.IsLevelEnabled(logrus.DebugLevel) {
		defer core_util.Elapsed("[ServiceAction] get tenant resource")()
	}

	tenant, err := db.GetManager().TenantDao().GetTenantByUUID(uuid)
	if err != nil {
		logrus.Errorf("get tenant %s info failure %v", uuid, err.Error())
		return nil, err
	}
	services, err := db.GetManager().TenantServiceDao().GetServicesByTenantID(uuid)
	if err != nil {
		logrus.Errorf("get service by id error, %v, %v", services, err.Error())
		return nil, err
	}
	var serviceIDs string
	var AllocatedCPU, AllocatedMEM int
	for _, ser := range services {
		if serviceIDs == "" {
			serviceIDs += ser.ServiceID
		} else {
			serviceIDs += "," + ser.ServiceID
		}
		AllocatedCPU += ser.ContainerCPU * ser.Replicas
		AllocatedMEM += ser.ContainerMemory * ser.Replicas
	}
	tenantResUesd, err := s.statusCli.GetTenantResource(uuid)
	if err != nil {
		logrus.Errorf("get tenant %s resource failure %s", uuid, err.Error())
	}
	disks := GetServicesDiskDeprecated(strings.Split(serviceIDs, ","), s.prometheusCli)
	var value float64
	for _, v := range disks {
		value += v
	}
	var res apimodel.TenantResource
	res.UUID = uuid
	res.Name = tenant.Name
	res.EID = tenant.EID
	res.AllocatedCPU = AllocatedCPU
	res.AllocatedMEM = AllocatedMEM
	if tenantResUesd != nil {
		res.UsedCPU = int(tenantResUesd.CpuRequest)
		res.UsedMEM = int(tenantResUesd.MemoryRequest)
	}
	res.UsedDisk = value
	return &res, nil
}

// GetServicesDiskDeprecated -
func GetServicesDiskDeprecated(ids []string, prometheusCli prometheus.Interface) map[string]float64 {
	if logrus.IsLevelEnabled(logrus.DebugLevel) {
		defer core_util.Elapsed("[GetServicesDiskDeprecated] get tenant resource")()
	}

	result := make(map[string]float64)
	//query disk used in prometheus
	query := fmt.Sprintf(`max(app_resource_appfs{service_id=~"%s"}) by(service_id)`, strings.Join(ids, "|"))
	metric := prometheusCli.GetMetric(query, time.Now())
	for _, re := range metric.MetricData.MetricValues {
		var serviceID = re.Metadata["service_id"]
		if re.Sample != nil {
			result[serviceID] = re.Sample.Value()
		}
	}
	return result
}

// CodeCheck code check
func (s *ServiceAction) CodeCheck(c *apimodel.CheckCodeStruct) error {
	err := s.MQClient.SendBuilderTopic(gclient.TaskStruct{
		TaskType: "code_check",
		TaskBody: c.Body,
		Topic:    gclient.BuilderTopic,
	})
	if err != nil {
		logrus.Errorf("equque mq error, %v", err)
		return err
	}
	return nil
}

// ServiceDepend service depend
func (s *ServiceAction) ServiceDepend(action string, ds *apimodel.DependService) error {
	switch action {
	case "add":
		tsr := &dbmodel.TenantServiceRelation{
			TenantID:          ds.TenantID,
			ServiceID:         ds.ServiceID,
			DependServiceID:   ds.DepServiceID,
			DependServiceType: ds.DepServiceType,
			DependOrder:       1,
		}
		if err := db.GetManager().TenantServiceRelationDao().AddModel(tsr); err != nil {
			logrus.Errorf("add depend error, %v", err)
			if err == dberr.ErrRecordAlreadyExist {
				return nil
			}
			return err
		}
	case "delete":
		logrus.Debugf("serviceid is %v, depid is %v", ds.ServiceID, ds.DepServiceID)
		if err := db.GetManager().TenantServiceRelationDao().DeleteRelationByDepID(ds.ServiceID, ds.DepServiceID); err != nil {
			logrus.Errorf("delete depend error, %v", err)
			return err
		}
	}
	return nil
}

// EnvAttr env attr
func (s *ServiceAction) EnvAttr(action string, at *dbmodel.TenantServiceEnvVar, oldAttrNames ...string) error {
	switch action {
	case "add":
		if err := db.GetManager().TenantServiceEnvVarDao().AddModel(at); err != nil {
			logrus.Errorf("add env %v error, %v", at.AttrName, err)
			return err
		}
	case "delete":
		if err := db.GetManager().TenantServiceEnvVarDao().DeleteModel(at.ServiceID, at.AttrName); err != nil {
			logrus.Errorf("delete env %v error, %v", at.AttrName, err)
			return err
		}
	case "update":
		oldAttrName := at.AttrName
		if len(oldAttrNames) > 0 && oldAttrNames[0] != "" {
			oldAttrName = oldAttrNames[0]
		}
		if err := db.GetManager().TenantServiceEnvVarDao().UpdateModelByAttrName(at, oldAttrName); err != nil {
			logrus.Errorf("update env %v error,%v", at.AttrName, err)
			return err
		}
	}
	return nil
}

// CreatePorts -
func (s *ServiceAction) CreatePorts(tenantID, serviceID string, vps *apimodel.ServicePorts) error {
	tx := db.GetManager().Begin()
	defer func() {
		if r := recover(); r != nil {
			logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r)
			tx.Rollback()
		}
	}()

	for _, vp := range vps.Port {
		// make sure K8sServiceName is unique
		if vp.K8sServiceName != "" {
			port, err := db.GetManager().TenantServicesPortDao().GetByTenantAndName(tenantID, vp.K8sServiceName)
			if err != nil && err != gorm.ErrRecordNotFound {
				tx.Rollback()
				return err
			}
			if port != nil && port.ServiceID != serviceID {
				tx.Rollback()
				return bcode.ErrK8sServiceNameExists
			}
		}

		var vpD dbmodel.TenantServicesPort
		vpD.ServiceID = serviceID
		vpD.TenantID = tenantID
		vpD.IsInnerService = &vp.IsInnerService
		vpD.IsOuterService = &vp.IsOuterService
		vpD.ContainerPort = vp.ContainerPort
		vpD.MappingPort = vp.MappingPort
		vpD.Protocol = vp.Protocol
		vpD.PortAlias = vp.PortAlias
		vpD.K8sServiceName = vp.K8sServiceName
		if err := db.GetManager().TenantServicesPortDaoTransactions(tx).AddModel(&vpD); err != nil {
			tx.Rollback()
			return err
		}
	}

	if err := tx.Commit().Error; err != nil {
		tx.Rollback()
		return err
	}

	return nil
}

func (s *ServiceAction) deletePorts(componentID string, tenantID string, ports *apimodel.ServicePorts) error {
	component, err := db.GetManager().TenantServiceDao().GetServiceByID(componentID)
	if err != nil {
		return err
	}
	tenant, err := db.GetManager().TenantDao().GetTenantByUUID(tenantID)
	if err != nil {
		return err
	}
	return db.GetManager().DB().Transaction(func(tx *gorm.DB) error {
		for _, port := range ports.Port {
			err = k8s.Default().ApiSixClient.ApisixV2().ApisixRoutes(tenant.Namespace).DeleteCollection(context.Background(), metav1.DeleteOptions{}, metav1.ListOptions{
				LabelSelector: "component_sort=" + component.ServiceAlias + ",port=" + strconv.Itoa(port.ContainerPort),
			})
			if err != nil {
				return err
			}
			if err := db.GetManager().TenantServicesPortDaoTransactions(tx).DeleteModel(componentID, port.ContainerPort); err != nil {
				return err
			}

			// delete related ingress rules
			if err := GetGatewayHandler().DeleteIngressRulesByComponentPort(tx, componentID, port.ContainerPort); err != nil {
				return err
			}
		}

		return nil
	})
}

// SyncComponentPorts -
func (s *ServiceAction) SyncComponentPorts(tx *gorm.DB, app *dbmodel.Application, components []*apimodel.Component) error {
	var (
		componentIDs []string
		ports        []*dbmodel.TenantServicesPort
	)
	for _, component := range components {
		if component.Ports == nil {
			continue
		}
		componentIDs = append(componentIDs, component.ComponentBase.ComponentID)
		for _, port := range component.Ports {
			ports = append(ports, port.DbModel(app.TenantID, component.ComponentBase.ComponentID))
		}
	}
	if err := db.GetManager().TenantServicesPortDaoTransactions(tx).DeleteByComponentIDs(componentIDs); err != nil {
		return err
	}
	return db.GetManager().TenantServicesPortDaoTransactions(tx).CreateOrUpdatePortsInBatch(ports)
}

// PortVar port var
func (s *ServiceAction) PortVar(action, tenantID, serviceID string, vps *apimodel.ServicePorts, oldPort int) error {
	crt, err := db.GetManager().TenantServicePluginRelationDao().CheckSomeModelPluginByServiceID(
		serviceID,
		dbmodel.InBoundNetPlugin,
	)
	if err != nil {
		return err
	}
	switch action {
	case "delete":
		return s.deletePorts(serviceID, tenantID, vps)
	case "update":
		tx := db.GetManager().Begin()
		defer func() {
			if r := recover(); r != nil {
				logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r)
				tx.Rollback()
			}
		}()
		for _, vp := range vps.Port {
			//port更新单个请求
			oldPort = vp.ContainerPort
			vpD, err := db.GetManager().TenantServicesPortDao().GetPort(serviceID, oldPort)
			if err != nil {
				tx.Rollback()
				return err
			}
			// make sure K8sServiceName is unique
			if vp.K8sServiceName != "" {
				port, err := db.GetManager().TenantServicesPortDao().GetByTenantAndName(tenantID, vp.K8sServiceName)
				if err != nil && err != gorm.ErrRecordNotFound {
					tx.Rollback()
					return err
				}
				if port != nil && vpD.K8sServiceName != vp.K8sServiceName && port.ServiceID != serviceID {
					tx.Rollback()
					return bcode.ErrK8sServiceNameExists
				}
			}

			vpD.ServiceID = serviceID
			vpD.TenantID = tenantID
			vpD.IsInnerService = &vp.IsInnerService
			vpD.IsOuterService = &vp.IsOuterService
			vpD.ContainerPort = vp.ContainerPort
			vpD.MappingPort = vp.MappingPort
			vpD.Protocol = vp.Protocol
			vpD.PortAlias = vp.PortAlias
			vpD.K8sServiceName = vp.K8sServiceName
			if err := db.GetManager().TenantServicesPortDao().UpdateModel(vpD); err != nil {
				logrus.Errorf("update port var error, %v", err)
				tx.Rollback()
				return err
			}
			if crt {
				pluginPort, err := db.GetManager().TenantServicesStreamPluginPortDao().GetPluginMappingPortByServiceIDAndContainerPort(
					serviceID,
					dbmodel.InBoundNetPlugin,
					oldPort,
				)
				goon := true
				if err != nil {
					if strings.Contains(err.Error(), "record not found") {
						goon = false
					} else {
						logrus.Errorf("get plugin mapping port error:(%s)", err)
						tx.Rollback()
						return err
					}
				}
				if goon {
					pluginPort.ContainerPort = vp.ContainerPort
					if err := db.GetManager().TenantServicesStreamPluginPortDao().UpdateModel(pluginPort); err != nil {
						logrus.Errorf("update plugin mapping port error:(%s)", err)
						tx.Rollback()
						return err
					}
				}
			}
		}
		if err := tx.Commit().Error; err != nil {
			tx.Rollback()
			logrus.Debugf("commit update port error, %v", err)
			return err
		}
	}
	return nil
}

// PortOuter 端口对外服务操作
func (s *ServiceAction) PortOuter(tenantName, serviceID string, containerPort int,
	servicePort *apimodel.ServicePortInnerOrOuter) (*dbmodel.TenantServiceLBMappingPort, string, error) {
	p, err := db.GetManager().TenantServicesPortDao().GetPort(serviceID, containerPort)
	if err != nil {
		return nil, "", fmt.Errorf("find service port error:%s", err.Error())
	}
	_, err = db.GetManager().TenantServiceDao().GetServiceByID(serviceID)
	if err != nil {
		return nil, "", fmt.Errorf("find service error:%s", err.Error())
	}
	hasUpStream, err := db.GetManager().TenantServicePluginRelationDao().CheckSomeModelPluginByServiceID(
		serviceID,
		dbmodel.InBoundNetPlugin,
	)
	if err != nil {
		return nil, "", fmt.Errorf("get plugin relations error: %s", err.Error())
	}
	//if stream 创建vs端口
	vsPort := &dbmodel.TenantServiceLBMappingPort{}
	switch servicePort.Body.Operation {
	case "close":
		if *p.IsOuterService { //如果端口已经开了对外
			falsev := false
			p.IsOuterService = &falsev
			tx := db.GetManager().Begin()
			defer func() {
				if r := recover(); r != nil {
					logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r)
					tx.Rollback()
				}
			}()
			if err = db.GetManager().TenantServicesPortDaoTransactions(tx).UpdateModel(p); err != nil {
				tx.Rollback()
				return nil, "", err
			}

			if hasUpStream {
				pluginPort, err := db.GetManager().TenantServicesStreamPluginPortDao().GetPluginMappingPortByServiceIDAndContainerPort(
					serviceID,
					dbmodel.InBoundNetPlugin,
					containerPort,
				)
				if err != nil {
					if err.Error() == gorm.ErrRecordNotFound.Error() {
						logrus.Debugf("outer, plugin port (%d) is not exist, do not need delete", containerPort)
						goto OUTERCLOSEPASS
					}
					tx.Rollback()
					return nil, "", fmt.Errorf("outer, get plugin mapping port error:(%s)", err)
				}
				if *p.IsInnerService {
					//发现内网未关闭则不删除该映射
					logrus.Debugf("outer, close outer, but plugin inner port (%d) is exist, do not need delete", containerPort)
					goto OUTERCLOSEPASS
				}
				if err := db.GetManager().TenantServicesStreamPluginPortDaoTransactions(tx).DeletePluginMappingPortByContainerPort(
					serviceID,
					dbmodel.InBoundNetPlugin,
					containerPort,
				); err != nil {
					tx.Rollback()
					return nil, "", fmt.Errorf("outer, delete plugin mapping port %d error:(%s)", containerPort, err)
				}
				logrus.Debugf(fmt.Sprintf("outer, delete plugin port %d->%d", containerPort, pluginPort.PluginPort))
			OUTERCLOSEPASS:
			}
			if err := tx.Commit().Error; err != nil {
				tx.Rollback()
				return nil, "", err
			}
		} else {
			return nil, "", nil
		}

	case "open":
		truev := true
		p.IsOuterService = &truev
		tx := db.GetManager().Begin()
		defer func() {
			if r := recover(); r != nil {
				logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r)
				tx.Rollback()
			}
		}()
		if err = db.GetManager().TenantServicesPortDaoTransactions(tx).UpdateModel(p); err != nil {
			tx.Rollback()
			return nil, "", err
		}
		if hasUpStream {
			pluginPort, err := db.GetManager().TenantServicesStreamPluginPortDao().GetPluginMappingPortByServiceIDAndContainerPort(
				serviceID,
				dbmodel.InBoundNetPlugin,
				containerPort,
			)
			var pPort int
			if err != nil {
				if err.Error() == gorm.ErrRecordNotFound.Error() {
					ppPort, err := db.GetManager().TenantServicesStreamPluginPortDaoTransactions(tx).SetPluginMappingPort(
						p.TenantID,
						serviceID,
						dbmodel.InBoundNetPlugin,
						containerPort,
					)
					if err != nil {
						tx.Rollback()
						logrus.Errorf("outer, set plugin mapping port error:(%s)", err)
						return nil, "", fmt.Errorf("outer, set plugin mapping port error:(%s)", err)
					}
					pPort = ppPort
					goto OUTEROPENPASS
				}
				tx.Rollback()
				return nil, "", fmt.Errorf("outer, in setting plugin mapping port, get plugin mapping port error:(%s)", err)
			}
			logrus.Debugf("outer, plugin mapping port is already exist, %d->%d", pluginPort.ContainerPort, pluginPort.PluginPort)
		OUTEROPENPASS:
			logrus.Debugf("outer, set plugin mapping port %d->%d", containerPort, pPort)
		}
		if err := tx.Commit().Error; err != nil {
			tx.Rollback()
			return nil, "", err
		}
	}
	return vsPort, p.Protocol, nil
}

// PortInner 端口对内服务操作
// TODO: send task to worker
func (s *ServiceAction) PortInner(tenantName, serviceID, operation string, port int) error {
	p, err := db.GetManager().TenantServicesPortDao().GetPort(serviceID, port)
	if err != nil {
		return err
	}
	_, err = db.GetManager().TenantServiceDao().GetServiceByID(serviceID)
	if err != nil {
		return fmt.Errorf("get service error:%s", err.Error())
	}
	hasUpStream, err := db.GetManager().TenantServicePluginRelationDao().CheckSomeModelPluginByServiceID(
		serviceID,
		dbmodel.InBoundNetPlugin,
	)
	if err != nil {
		return fmt.Errorf("get plugin relations error: %s", err.Error())
	}
	tx := db.GetManager().Begin()
	defer func() {
		if r := recover(); r != nil {
			logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r)
			tx.Rollback()
		}
	}()
	switch operation {
	case "close":
		if *p.IsInnerService { //如果端口已经开了对内
			falsev := false
			p.IsInnerService = &falsev
			if err = db.GetManager().TenantServicesPortDaoTransactions(tx).UpdateModel(p); err != nil {
				tx.Rollback()
				return fmt.Errorf("update service port error: %s", err.Error())
			}
			if hasUpStream {
				pluginPort, err := db.GetManager().TenantServicesStreamPluginPortDao().GetPluginMappingPortByServiceIDAndContainerPort(
					serviceID,
					dbmodel.InBoundNetPlugin,
					port,
				)
				if err != nil {
					if err.Error() == gorm.ErrRecordNotFound.Error() {
						logrus.Debugf("inner, plugin port (%d) is not exist, do not need delete", port)
						goto INNERCLOSEPASS
					}
					tx.Rollback()
					return fmt.Errorf("inner, get plugin mapping port error:(%s)", err)
				}
				if *p.IsOuterService {
					logrus.Debugf("inner, close inner, but plugin outerport (%d) is exist, do not need delete", port)
					goto INNERCLOSEPASS
				}
				if err := db.GetManager().TenantServicesStreamPluginPortDaoTransactions(tx).DeletePluginMappingPortByContainerPort(
					serviceID,
					dbmodel.InBoundNetPlugin,
					port,
				); err != nil {
					tx.Rollback()
					return fmt.Errorf("inner, delete plugin mapping port %d error:(%s)", port, err)
				}
				logrus.Debugf(fmt.Sprintf("inner, delete plugin port %d->%d", port, pluginPort.PluginPort))
			INNERCLOSEPASS:
			}
		} else {
			tx.Rollback()
			return fmt.Errorf("already close")
		}
	case "open":
		if *p.IsInnerService {
			tx.Rollback()
			return fmt.Errorf("already open")
		}
		truv := true
		p.IsInnerService = &truv
		if err = db.GetManager().TenantServicesPortDaoTransactions(tx).UpdateModel(p); err != nil {
			tx.Rollback()
			return err
		}
		if hasUpStream {
			pluginPort, err := db.GetManager().TenantServicesStreamPluginPortDao().GetPluginMappingPortByServiceIDAndContainerPort(
				serviceID,
				dbmodel.InBoundNetPlugin,
				port,
			)
			var pPort int
			if err != nil {
				if err.Error() == gorm.ErrRecordNotFound.Error() {
					ppPort, err := db.GetManager().TenantServicesStreamPluginPortDaoTransactions(tx).SetPluginMappingPort(
						p.TenantID,
						serviceID,
						dbmodel.InBoundNetPlugin,
						port,
					)
					if err != nil {
						tx.Rollback()
						logrus.Errorf("inner, set plugin mapping port error:(%s)", err)
						return fmt.Errorf("inner, set plugin mapping port error:(%s)", err)
					}
					pPort = ppPort
					goto INNEROPENPASS
				}
				tx.Rollback()
				return fmt.Errorf("inner, in setting plugin mapping port, get plugin mapping port error:(%s)", err)
			}
			logrus.Debugf("inner, plugin mapping port is already exist, %d->%d", pluginPort.ContainerPort, pluginPort.PluginPort)
		INNEROPENPASS:
			logrus.Debugf("inner, set plugin mapping port %d->%d", port, pPort)
		}
	}
	if err := tx.Commit().Error; err != nil {
		tx.Rollback()
		return err
	}
	return nil
}

// VolumnVar var volumn
func (s *ServiceAction) VolumnVar(tsv *dbmodel.TenantServiceVolume, tenantID, fileContent, action string) *util.APIHandleError {
	localPath := os.Getenv("LOCAL_DATA_PATH")
	sharePath := os.Getenv("SHARE_DATA_PATH")
	if localPath == "" {
		localPath = "/grlocaldata"
	}
	if sharePath == "" {
		sharePath = "/grdata"
	}

	switch action {
	case "add":
		if tsv.HostPath == "" {
			//step 1 设置主机目录
			switch tsv.VolumeType {
			//共享文件存储
			case dbmodel.ShareFileVolumeType.String():
				tsv.HostPath = fmt.Sprintf("%s/tenant/%s/service/%s%s", sharePath, tenantID, tsv.ServiceID, tsv.VolumePath)
			//本地文件存储
			case dbmodel.LocalVolumeType.String():
				serviceInfo, err := db.GetManager().TenantServiceDao().GetServiceTypeByID(tsv.ServiceID)
				if err != nil {
					return util.CreateAPIHandleErrorFromDBError("service type", err)
				}
				// local volume just only support state component
				if serviceInfo == nil || !serviceInfo.IsState() {
					return util.CreateAPIHandleError(400, fmt.Errorf("应用类型为'无状态'.不支持本地存储"))
				}
				tsv.HostPath = fmt.Sprintf("%s/tenant/%s/service/%s%s", localPath, tenantID, tsv.ServiceID, tsv.VolumePath)
			case dbmodel.VMVolumeType.String():
				tsv.HostPath = fmt.Sprintf("%s/tenant/%s/service/%s%s", sharePath, tenantID, tsv.ServiceID, tsv.VolumePath)
			}
		}
		util.SetVolumeDefaultValue(tsv)
		// begin transaction
		tx := db.GetManager().Begin()
		defer func() {
			if r := recover(); r != nil {
				logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r)
				tx.Rollback()
			}
		}()
		if err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).AddModel(tsv); err != nil {
			tx.Rollback()
			return util.CreateAPIHandleErrorFromDBError("add volume", err)
		}
		if fileContent != "" {
			cf := &dbmodel.TenantServiceConfigFile{
				ServiceID:   tsv.ServiceID,
				VolumeName:  tsv.VolumeName,
				FileContent: fileContent,
			}
			if err := db.GetManager().TenantServiceConfigFileDaoTransactions(tx).AddModel(cf); err != nil {
				tx.Rollback()
				return util.CreateAPIHandleErrorFromDBError("error creating config file", err)
			}
		}
		// end transaction
		if err := tx.Commit().Error; err != nil {
			tx.Rollback()
			return util.CreateAPIHandleErrorFromDBError("error ending transaction", err)
		}
	case "delete":
		// begin transaction
		tx := db.GetManager().Begin()
		defer func() {
			if r := recover(); r != nil {
				logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r)
				tx.Rollback()
			}
		}()
		if tsv.VolumeName != "" {
			volume, err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).GetVolumeByServiceIDAndName(tsv.ServiceID, tsv.VolumeName)
			if err != nil {
				tx.Rollback()
				return util.CreateAPIHandleErrorFromDBError("find volume", err)
			}

			if err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).DeleteModel(tsv.ServiceID, tsv.VolumeName); err != nil && err.Error() != gorm.ErrRecordNotFound.Error() {
				tx.Rollback()
				return util.CreateAPIHandleErrorFromDBError("delete volume", err)
			}

			err = s.MQClient.SendBuilderTopic(gclient.TaskStruct{
				Topic:    gclient.WorkerTopic,
				TaskType: "volume_gc",
				TaskBody: map[string]interface{}{
					"tenant_id":   tenantID,
					"service_id":  volume.ServiceID,
					"volume_id":   volume.ID,
					"volume_path": volume.VolumePath,
				},
			})
			if err != nil {
				logrus.Errorf("send 'volume_gc' task: %v", err)
				tx.Rollback()
				return util.CreateAPIHandleErrorFromDBError("send 'volume_gc' task", err)
			}
		} else {
			if err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).DeleteByServiceIDAndVolumePath(tsv.ServiceID, tsv.VolumePath); err != nil && err.Error() != gorm.ErrRecordNotFound.Error() {
				tx.Rollback()
				return util.CreateAPIHandleErrorFromDBError("delete volume", err)
			}
		}
		if err := db.GetManager().TenantServiceConfigFileDaoTransactions(tx).DelByVolumeID(tsv.ServiceID, tsv.VolumeName); err != nil {
			tx.Rollback()
			return util.CreateAPIHandleErrorFromDBError("error deleting config files", err)
		}
		// end transaction
		if err := tx.Commit().Error; err != nil {
			tx.Rollback()
			return util.CreateAPIHandleErrorFromDBError("error ending transaction", err)
		}
	}
	return nil
}

// UpdVolume updates service volume.
func (s *ServiceAction) UpdVolume(sid string, req *apimodel.UpdVolumeReq) error {
	tx := db.GetManager().Begin()
	defer func() {
		if r := recover(); r != nil {
			logrus.Errorf("Unexpected panic occurred, rollback transaction: %v", r)
			tx.Rollback()
		}
	}()
	v, err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).GetVolumeByServiceIDAndName(sid, req.VolumeName)
	if err != nil {
		tx.Rollback()
		return err
	}
	v.VolumePath = req.VolumePath
	if req.VolumeCapacity != nil {
		v.VolumeCapacity = *req.VolumeCapacity
	}
	v.Mode = req.Mode
	if err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).UpdateModel(v); err != nil {
		tx.Rollback()
		return err
	}
	if req.VolumeType == "config-file" {
		configfile, err := db.GetManager().TenantServiceConfigFileDaoTransactions(tx).GetByVolumeName(sid, req.VolumeName)
		if err != nil {
			tx.Rollback()
			return err
		}
		configfile.FileContent = req.FileContent
		if err := db.GetManager().TenantServiceConfigFileDaoTransactions(tx).UpdateModel(configfile); err != nil {
			tx.Rollback()
			return err
		}
	}
	tx.Commit()
	return nil
}

// GetVolumes 获取应用全部存储
func (s *ServiceAction) GetVolumes(serviceID string) ([]*apimodel.VolumeWithStatusStruct, *util.APIHandleError) {
	volumeWithStatusList := make([]*apimodel.VolumeWithStatusStruct, 0)
	vs, err := db.GetManager().TenantServiceVolumeDao().GetTenantServiceVolumesByServiceID(serviceID)
	if err != nil && err.Error() != gorm.ErrRecordNotFound.Error() {
		return nil, util.CreateAPIHandleErrorFromDBError("get volumes", err)
	}

	volumeStatusList, err := s.statusCli.GetAppVolumeStatus(serviceID)
	if err != nil {
		logrus.Warnf("get volume status error: %s", err.Error())
	}
	volumeStatus := make(map[string]pb.ServiceVolumeStatus)
	if volumeStatusList != nil && volumeStatusList.GetStatus() != nil {
		volumeStatus = volumeStatusList.GetStatus()
	}
	isMountedShareVolume := false
	mountStatus := pb.ServiceVolumeStatus_NOT_READY.String()
	for _, volume := range vs {
		vws := &apimodel.VolumeWithStatusStruct{
			ServiceID:          volume.ServiceID,
			Category:           volume.Category,
			VolumeType:         volume.VolumeType,
			VolumeName:         volume.VolumeName,
			HostPath:           volume.HostPath,
			VolumePath:         volume.VolumePath,
			IsReadOnly:         volume.IsReadOnly,
			VolumeCapacity:     volume.VolumeCapacity,
			AccessMode:         volume.AccessMode,
			SharePolicy:        volume.SharePolicy,
			BackupPolicy:       volume.BackupPolicy,
			ReclaimPolicy:      volume.ReclaimPolicy,
			AllowExpansion:     volume.AllowExpansion,
			VolumeProviderName: volume.VolumeProviderName,
		}
		volumeID := strconv.FormatInt(int64(volume.ID), 10)
		if phrase, ok := volumeStatus[volumeID]; ok {
			vws.Status = phrase.String()
			if os.Getenv("ENABLE_SUBPATH") == "true" && vws.VolumeType == "share-file" && strings.HasPrefix(vws.HostPath, "/grdata") {
				isMountedShareVolume = true
				mountStatus = vws.Status
			}
		} else {
			vws.Status = pb.ServiceVolumeStatus_NOT_READY.String()
			if isMountedShareVolume && strings.HasPrefix(vws.HostPath, "/grdata") {
				vws.Status = mountStatus
			}
		}
		volumeWithStatusList = append(volumeWithStatusList, vws)
	}

	return volumeWithStatusList, nil
}

// VolumeDependency VolumeDependency
func (s *ServiceAction) VolumeDependency(tsr *dbmodel.TenantServiceMountRelation, action string) *util.APIHandleError {
	switch action {
	case "add":
		if tsr.VolumeName != "" {
			vm, err := db.GetManager().TenantServiceVolumeDao().GetVolumeByServiceIDAndName(tsr.DependServiceID, tsr.VolumeName)
			if err != nil {
				return util.CreateAPIHandleErrorFromDBError("get volume", err)
			}
			tsr.HostPath = vm.HostPath
			if err := db.GetManager().TenantServiceMountRelationDao().AddModel(tsr); err != nil {
				return util.CreateAPIHandleErrorFromDBError("add volume mount relation", err)
			}
		} else {
			if tsr.HostPath == "" {
				return util.CreateAPIHandleError(400, fmt.Errorf("host path can not be empty when create volume dependency in api v2"))
			}
			if err := db.GetManager().TenantServiceMountRelationDao().AddModel(tsr); err != nil {
				return util.CreateAPIHandleErrorFromDBError("add volume mount relation", err)
			}
		}
	case "delete":
		if tsr.VolumeName != "" {
			if err := db.GetManager().TenantServiceMountRelationDao().DElTenantServiceMountRelationByServiceAndName(tsr.ServiceID, tsr.VolumeName); err != nil {
				return util.CreateAPIHandleErrorFromDBError("delete mount relation", err)
			}
		} else {
			if err := db.GetManager().TenantServiceMountRelationDao().DElTenantServiceMountRelationByDepService(tsr.ServiceID, tsr.DependServiceID); err != nil {
				return util.CreateAPIHandleErrorFromDBError("delete mount relation", err)
			}
		}
	}
	return nil
}

// GetDepVolumes 获取依赖存储
func (s *ServiceAction) GetDepVolumes(serviceID string) ([]*dbmodel.TenantServiceMountRelation, *util.APIHandleError) {
	dbManager := db.GetManager()
	mounts, err := dbManager.TenantServiceMountRelationDao().GetTenantServiceMountRelationsByService(serviceID)
	if err != nil {
		return nil, util.CreateAPIHandleErrorFromDBError("get dep volume", err)
	}
	return mounts, nil
}

// ServiceProbe ServiceProbe
func (s *ServiceAction) ServiceProbe(tsp *dbmodel.TenantServiceProbe, action string) error {
	switch action {
	case "add":
		if err := db.GetManager().ServiceProbeDao().AddModel(tsp); err != nil {
			return err
		}
	case "update":
		if err := db.GetManager().ServiceProbeDao().UpdateModel(tsp); err != nil {
			return err
		}
	case "delete":
		if err := db.GetManager().ServiceProbeDao().DeleteModel(tsp.ServiceID, tsp.ProbeID); err != nil {
			return err
		}
	}
	return nil
}

// RollBack RollBack
func (s *ServiceAction) RollBack(rs *apimodel.RollbackStruct) error {
	service, err := db.GetManager().TenantServiceDao().GetServiceByID(rs.ServiceID)
	if err != nil {
		return err
	}
	oldDeployVersion := service.DeployVersion
	if service.DeployVersion == rs.DeployVersion {
		return fmt.Errorf("current version is %v, don't need rollback", rs.DeployVersion)
	}
	service.DeployVersion = rs.DeployVersion
	if err := db.GetManager().TenantServiceDao().UpdateModel(service); err != nil {
		return err
	}
	//发送重启消息到MQ
	startStopStruct := &apimodel.StartStopStruct{
		TenantID:  rs.TenantID,
		ServiceID: rs.ServiceID,
		EventID:   rs.EventID,
		TaskType:  "rolling_upgrade",
	}
	if err := GetServiceManager().StartStopService(startStopStruct); err != nil {
		// rollback
		service.DeployVersion = oldDeployVersion
		if err := db.GetManager().TenantServiceDao().UpdateModel(service); err != nil {
			logrus.Warningf("error deploy version rollback: %v", err)
		}
		return err
	}
	return nil
}

// GetStatus GetStatus
func (s *ServiceAction) GetStatus(serviceID string) (*apimodel.StatusList, error) {
	services, errS := db.GetManager().TenantServiceDao().GetServiceByID(serviceID)
	if errS != nil {
		return nil, errS
	}
	sl := &apimodel.StatusList{
		TenantID:      services.TenantID,
		ServiceID:     serviceID,
		ServiceAlias:  services.ServiceAlias,
		DeployVersion: services.DeployVersion,
		Replicas:      services.Replicas,
		ContainerMem:  services.ContainerMemory,
		ContainerCPU:  services.ContainerCPU,
		CurStatus:     services.CurStatus,
		StatusCN:      TransStatus(services.CurStatus),
	}
	// kubeblocks_component 的 workload 由 KubeBlocks 管理,Rainbond 不读取其运行状态
	if services.IsKubeBlocksComponent() {
		return sl, nil
	}
	status := s.statusCli.GetStatus(serviceID)
	if status != "" {
		sl.CurStatus = status
		sl.StatusCN = TransStatus(status)
	}
	di, err := s.statusCli.GetServiceDeployInfo(serviceID)
	if err != nil {
		logrus.Warningf("service id: %s; failed to get deploy info: %v", serviceID, err)
	} else {
		sl.StartTime = di.GetStartTime()
	}
	return sl, nil
}

// GetServicesStatus  获取一组应用状态,若 serviceIDs为空,获取租户所有应用状态
func (s *ServiceAction) GetServicesStatus(tenantID string, serviceIDs []string) []map[string]interface{} {
	if len(serviceIDs) == 0 {
		services, _ := db.GetManager().TenantServiceDao().GetServicesByTenantID(tenantID)
		for _, s := range services {
			serviceIDs = append(serviceIDs, s.ServiceID)
		}
	}
	if len(serviceIDs) == 0 {
		return []map[string]interface{}{}
	}
	statusList := s.statusCli.GetStatuss(strings.Join(serviceIDs, ","))
	var info = make([]map[string]interface{}, 0)
	for k, v := range statusList {
		serviceInfo := map[string]interface{}{"service_id": k, "status": v, "status_cn": TransStatus(v), "used_mem": 0}
		info = append(info, serviceInfo)
	}
	return info
}

// GetEnterpriseServicesStatus get services
func (s *ServiceAction) GetEnterpriseServicesStatus(enterpriseID string) (map[string]string, *util.APIHandleError) {
	var tenantIDs []string
	tenants, err := db.GetManager().EnterpriseDao().GetEnterpriseTenants(enterpriseID)
	if err != nil {
		logrus.Errorf("list tenant failed: %s", err.Error())
		return nil, util.CreateAPIHandleErrorFromDBError(fmt.Sprintf("enterprise[%s] get tenant failed", enterpriseID), err)
	}
	if len(tenants) == 0 {
		return nil, util.CreateAPIHandleErrorf(400, "enterprise[%s] has not tenants", enterpriseID)
	}
	for _, tenant := range tenants {
		tenantIDs = append(tenantIDs, tenant.UUID)
	}
	services, err := db.GetManager().TenantServiceDao().GetServicesByTenantIDs(tenantIDs)
	if err != nil {
		logrus.Errorf("list tenants service failed: %s", err.Error())
		return nil, util.CreateAPIHandleErrorf(500, "get enterprise[%s] service failed: %s", enterpriseID, err.Error())
	}
	var serviceIDs []string
	for _, svc := range services {
		serviceIDs = append(serviceIDs, svc.ServiceID)
	}
	statusList := s.statusCli.GetStatuss(strings.Join(serviceIDs, ","))
	return statusList, nil
}

// CreateTenant create tenant
func (s *ServiceAction) CreateTenant(t *dbmodel.Tenants, bindExisting bool) error {
	tenant, _ := db.GetManager().TenantDao().GetTenantIDByName(t.Name)
	if tenant != nil {
		return fmt.Errorf("tenant name %s is exist", t.Name)
	}
	labels := map[string]string{
		constants.ResourceManagedByLabel: constants.Rainbond,
	}
	return db.GetManager().DB().Transaction(func(tx *gorm.DB) error {
		if err := db.GetManager().TenantDaoTransactions(tx).AddModel(t); err != nil {
			if !strings.HasSuffix(err.Error(), "is exist") {
				return err
			}
		}
		if t.Namespace == "default" {
			ns, err := s.kubeClient.CoreV1().Namespaces().Get(context.Background(), t.Namespace, metav1.GetOptions{})
			if err != nil && !k8sErrors.IsNotFound(err) {
				return err
			}
			if err == nil {
				ns.Labels = labels
				_, err = s.kubeClient.CoreV1().Namespaces().Update(context.Background(), ns, metav1.UpdateOptions{})
				return err
			}
		}
		if _, err := s.kubeClient.CoreV1().Namespaces().Create(context.Background(), &corev1.Namespace{
			ObjectMeta: metav1.ObjectMeta{
				Name:   t.Namespace,
				Labels: labels,
			},
		}, metav1.CreateOptions{}); err != nil {
			if k8sErrors.IsAlreadyExists(err) {
				if bindExisting {
					// Bind mode: patch managed-by label onto existing namespace
					// Intentionally skip NetworkPolicy and RBAC blocks below
					ns, getErr := s.kubeClient.CoreV1().Namespaces().Get(context.Background(), t.Namespace, metav1.GetOptions{})
					if getErr != nil {
						return getErr
					}
					if ns.Labels == nil {
						ns.Labels = make(map[string]string)
					}
					ns.Labels[constants.ResourceManagedByLabel] = constants.Rainbond
					_, updateErr := s.kubeClient.CoreV1().Namespaces().Update(context.Background(), ns, metav1.UpdateOptions{})
					return updateErr
				}
				return bcode.ErrNamespaceExists
			}
			return err
		}

		if os.Getenv("USE_SAAS") == "true" {
			// 创建简化版 NetworkPolicy
			networkPolicy := &networkingv1.NetworkPolicy{
				ObjectMeta: metav1.ObjectMeta{
					Name:      t.Namespace,
					Namespace: t.Namespace, // 使用创建的命名空间
				},
				Spec: networkingv1.NetworkPolicySpec{
					PodSelector: metav1.LabelSelector{}, // 选择所有Pod
					Ingress: []networkingv1.NetworkPolicyIngressRule{
						{
							From: []networkingv1.NetworkPolicyPeer{
								{
									PodSelector: &metav1.LabelSelector{}, // 允许所有Pod的流量
								},
							},
						},
					},
					PolicyTypes: []networkingv1.PolicyType{
						networkingv1.PolicyTypeIngress,
					},
				},
			}

			// 创建 NetworkPolicy
			if _, err := s.kubeClient.NetworkingV1().NetworkPolicies(t.Namespace).Create(context.Background(), networkPolicy, metav1.CreateOptions{}); err != nil {
				return err
			}
		}

		// Create RBAC resources for rbd-plugins namespace
		if t.Namespace == "rbd-plugins" {
			if err := s.createPluginTeamRBAC(t.Namespace); err != nil {
				return err
			}
		}

		return nil
	})
}

// createPluginTeamRBAC creates ClusterRole and ClusterRoleBinding for rbd-plugins namespace
func (s *ServiceAction) createPluginTeamRBAC(namespace string) error {
	ctx := context.Background()

	// Create ClusterRole
	clusterRole := &rbacv1.ClusterRole{
		ObjectMeta: metav1.ObjectMeta{
			Name: "default-cluster-admin",
			Labels: map[string]string{
				"created-by": "rainbond",
			},
		},
		Rules: []rbacv1.PolicyRule{
			{
				APIGroups: []string{"*"},
				Resources: []string{"*"},
				Verbs:     []string{"*"},
			},
		},
	}

	_, err := s.kubeClient.RbacV1().ClusterRoles().Create(ctx, clusterRole, metav1.CreateOptions{})
	if err != nil && !k8sErrors.IsAlreadyExists(err) {
		return fmt.Errorf("failed to create ClusterRole: %v", err)
	}

	// Create ClusterRoleBinding
	clusterRoleBinding := &rbacv1.ClusterRoleBinding{
		ObjectMeta: metav1.ObjectMeta{
			Name: "default-cluster-admin-binding",
			Labels: map[string]string{
				"created-by": "rainbond",
			},
		},
		RoleRef: rbacv1.RoleRef{
			APIGroup: "rbac.authorization.k8s.io",
			Kind:     "ClusterRole",
			Name:     "default-cluster-admin",
		},
		Subjects: []rbacv1.Subject{
			{
				Kind:      "ServiceAccount",
				Name:      "default",
				Namespace: namespace,
			},
		},
	}

	_, err = s.kubeClient.RbacV1().ClusterRoleBindings().Create(ctx, clusterRoleBinding, metav1.CreateOptions{})
	if err != nil && !k8sErrors.IsAlreadyExists(err) {
		return fmt.Errorf("failed to create ClusterRoleBinding: %v", err)
	}

	logrus.Infof("Successfully created RBAC resources for namespace %s", namespace)
	return nil
}

// CreateTenandIDAndName create tenant_id and tenant_name
func (s *ServiceAction) CreateTenandIDAndName(eid string) (string, string, error) {
	id := uuid.New().String()
	uid := strings.Replace(id, "-", "", -1)
	name := strings.Split(id, "-")[0]
	logrus.Debugf("uuid is %v, name is %v", uid, name)
	return uid, name, nil
}

// K8sPodInfos -
type K8sPodInfos struct {
	NewPods []*K8sPodInfo `json:"new_pods"`
	OldPods []*K8sPodInfo `json:"old_pods"`
}

// K8sPodInfo for api
type K8sPodInfo struct {
	PodName   string                       `json:"pod_name"`
	PodIP     string                       `json:"pod_ip"`
	PodStatus string                       `json:"pod_status"`
	ServiceID string                       `json:"service_id"`
	Container map[string]map[string]string `json:"container"`
}

// GetPods get pods
func (s *ServiceAction) GetPods(serviceID string) (*K8sPodInfos, error) {
	// kubeblocks_component 的 workload 由 KubeBlocks 管理,Rainbond 不查询其 Pods
	if svc, _ := db.GetManager().TenantServiceDao().GetServiceByID(serviceID); svc != nil && svc.IsKubeBlocksComponent() {
		return &K8sPodInfos{NewPods: []*K8sPodInfo{}, OldPods: []*K8sPodInfo{}}, nil
	}
	pods, err := s.statusCli.GetServicePods(serviceID)
	if err != nil && !strings.Contains(err.Error(), server.ErrAppServiceNotFound.Error()) &&
		!strings.Contains(err.Error(), server.ErrPodNotFound.Error()) {
		logrus.Error("GetPodByService Error:", err)
		return nil, err
	}
	if pods == nil {
		return nil, nil
	}
	convpod := func(pods []*pb.ServiceAppPod) []*K8sPodInfo {
		var podsInfoList []*K8sPodInfo
		var podNames []string
		for _, v := range pods {
			var podInfo K8sPodInfo
			podInfo.PodName = v.PodName
			podInfo.PodIP = v.PodIp
			podInfo.PodStatus = v.PodStatus
			podInfo.ServiceID = serviceID
			containerInfos := make(map[string]map[string]string, 10)
			for _, container := range v.Containers {
				containerInfos[container.ContainerName] = map[string]string{
					"memory_limit": fmt.Sprintf("%d", container.MemoryLimit),
					"memory_usage": "0",
				}
			}
			podInfo.Container = containerInfos
			podNames = append(podNames, v.PodName)
			podsInfoList = append(podsInfoList, &podInfo)
		}
		containerMemInfo, _ := s.GetPodContainerMemory(podNames)
		for _, c := range podsInfoList {
			for k := range c.Container {
				if info, exist := containerMemInfo[c.PodName][k]; exist {
					c.Container[k]["memory_usage"] = info
				}
			}
		}
		return podsInfoList
	}
	newpods := convpod(pods.NewPods)
	oldpods := convpod(pods.OldPods)
	return &K8sPodInfos{
		NewPods: newpods,
		OldPods: oldpods,
	}, nil
}

// GetMultiServicePods get pods
func (s *ServiceAction) GetMultiServicePods(serviceIDs []string) (*K8sPodInfos, error) {
	mpods, err := s.statusCli.GetMultiServicePods(serviceIDs)
	if err != nil && !strings.Contains(err.Error(), server.ErrAppServiceNotFound.Error()) &&
		!strings.Contains(err.Error(), server.ErrPodNotFound.Error()) {
		logrus.Error("GetPodByService Error:", err)
		return nil, err
	}
	if mpods == nil {
		return nil, nil
	}
	convpod := func(serviceID string, pods []*pb.ServiceAppPod) []*K8sPodInfo {
		var podsInfoList []*K8sPodInfo
		for _, v := range pods {
			var podInfo K8sPodInfo
			podInfo.PodName = v.PodName
			podInfo.PodIP = v.PodIp
			podInfo.PodStatus = v.PodStatus
			podInfo.ServiceID = serviceID
			podsInfoList = append(podsInfoList, &podInfo)
		}
		return podsInfoList
	}
	var re K8sPodInfos
	for serviceID, pods := range mpods.ServicePods {
		if pods != nil {
			re.NewPods = append(re.NewPods, convpod(serviceID, pods.NewPods)...)
			re.OldPods = append(re.OldPods, convpod(serviceID, pods.OldPods)...)
		}
	}
	return &re, nil
}

// GetComponentPodNums get pods
func (s *ServiceAction) GetComponentPodNums(ctx context.Context, componentIDs []string) (map[string]int32, error) {
	if logrus.IsLevelEnabled(logrus.DebugLevel) {
		defer core_util.Elapsed(fmt.Sprintf("[AppRuntimeSyncClient] [GetComponentPodNums] component nums: %d", len(componentIDs)))()
	}

	podNums, err := s.statusCli.GetComponentPodNums(ctx, componentIDs)
	if err != nil {
		return nil, errors.Wrap(err, "get component nums")
	}

	return podNums, nil
}

// GetPodContainerMemory Use Prometheus to query memory resources
func (s *ServiceAction) GetPodContainerMemory(podNames []string) (map[string]map[string]string, error) {
	memoryUsageMap := make(map[string]map[string]string, 10)
	queryName := strings.Join(podNames, "|")
	query := fmt.Sprintf(`container_memory_rss{pod=~"%s"}`, queryName)
	metric := s.prometheusCli.GetMetric(query, time.Now())

	for _, re := range metric.MetricData.MetricValues {
		var containerName = re.Metadata["container"]
		var podName = re.Metadata["pod"]
		var valuesBytes string
		if re.Sample != nil {
			valuesBytes = fmt.Sprintf("%d", int(re.Sample.Value()))
		}
		if _, ok := memoryUsageMap[podName]; ok {
			memoryUsageMap[podName][containerName] = valuesBytes
		} else {
			memoryUsageMap[podName] = map[string]string{
				containerName: valuesBytes,
			}
		}
	}
	return memoryUsageMap, nil
}

// TransServieToDelete trans service info to delete table
func (s *ServiceAction) TransServieToDelete(ctx context.Context, tenantID, serviceID string) error {
	component, err := db.GetManager().TenantServiceDao().GetServiceByID(serviceID)
	if err != nil && gorm.ErrRecordNotFound == err {
		logrus.Infof("service[%s] of tenant[%s] do not exist, ignore it", serviceID, tenantID)
		return nil
	}

	body, err := s.gcTaskBody(tenantID, serviceID, component.ServiceAlias)
	if err != nil {
		return fmt.Errorf("GC task body: %v", err)
	}

	// let rbd-chaos remove related persistent data
	logrus.Info("let rbd-chaos remove related persistent data")
	topic := gclient.WorkerTopic
	if err := s.MQClient.SendBuilderTopic(gclient.TaskStruct{
		Topic:    topic,
		TaskType: "service_gc",
		TaskBody: body,
	}); err != nil {
		logrus.Warningf("send gc task: %v", err)
	}

	if err := s.delServiceMetadata(ctx, serviceID); err != nil {
		return fmt.Errorf("delete service-related metadata: %v", err)
	}

	return nil
}

// isServiceClosed checks if the service has been closed according to the serviceID.
func (s *ServiceAction) isServiceClosed(serviceID string) error {
	service, err := db.GetManager().TenantServiceDao().GetServiceByID(serviceID)
	if err != nil {
		return err
	}
	status := s.statusCli.GetStatus(serviceID)
	if service.Kind != dbmodel.ServiceKindThirdParty.String() {
		if !s.statusCli.IsClosedStatus(status) {
			return ErrServiceNotClosed
		}
	}
	return nil
}

func (s *ServiceAction) deleteComponent(tx *gorm.DB, service *dbmodel.TenantServices) error {
	delService := service.ChangeDelete()
	delService.ID = 0
	if err := db.GetManager().TenantServiceDeleteDaoTransactions(tx).AddModel(delService); err != nil {
		return err
	}
	var deleteServicePropertyFunc = []func(serviceID string) error{
		db.GetManager().CodeCheckResultDaoTransactions(tx).DeleteByServiceID,
		db.GetManager().TenantServiceEnvVarDaoTransactions(tx).DELServiceEnvsByServiceID,
		db.GetManager().TenantPluginVersionConfigDaoTransactions(tx).DeletePluginConfigByServiceID,
		db.GetManager().TenantServicePluginRelationDaoTransactions(tx).DeleteALLRelationByServiceID,
		db.GetManager().TenantServicesStreamPluginPortDaoTransactions(tx).DeleteAllPluginMappingPortByServiceID,
		db.GetManager().TenantServiceDaoTransactions(tx).DeleteServiceByServiceID,
		db.GetManager().TenantServicesPortDaoTransactions(tx).DELPortsByServiceID,
		db.GetManager().TenantServiceRelationDaoTransactions(tx).DELRelationsByServiceID,
		db.GetManager().TenantServiceMountRelationDaoTransactions(tx).DELTenantServiceMountRelationByServiceID,
		db.GetManager().TenantServiceVolumeDaoTransactions(tx).DeleteTenantServiceVolumesByServiceID,
		db.GetManager().TenantServiceConfigFileDaoTransactions(tx).DelByServiceID,
		db.GetManager().EndpointsDaoTransactions(tx).DeleteByServiceID,
		db.GetManager().ThirdPartySvcDiscoveryCfgDaoTransactions(tx).DeleteByServiceID,
		db.GetManager().TenantServiceLabelDaoTransactions(tx).DeleteLabelByServiceID,
		db.GetManager().VersionInfoDaoTransactions(tx).DeleteVersionByServiceID,
		db.GetManager().TenantPluginVersionENVDaoTransactions(tx).DeleteEnvByServiceID,
		db.GetManager().ServiceProbeDaoTransactions(tx).DELServiceProbesByServiceID,
		db.GetManager().ServiceEventDaoTransactions(tx).DelEventByServiceID,
		db.GetManager().TenantServiceMonitorDaoTransactions(tx).DeleteServiceMonitorByServiceID,
		db.GetManager().AppConfigGroupServiceDaoTransactions(tx).DeleteEffectiveServiceByServiceID,
	}
	if err := GetGatewayHandler().DeleteTCPRuleByServiceIDWithTransaction(service.ServiceID, tx); err != nil {
		return err
	}
	if err := GetGatewayHandler().DeleteHTTPRuleByServiceIDWithTransaction(service.ServiceID, tx); err != nil {
		return err
	}
	for _, del := range deleteServicePropertyFunc {
		if err := del(service.ServiceID); err != nil {
			if err != gorm.ErrRecordNotFound {
				return err
			}
		}
	}
	return nil
}

// delServiceMetadata deletes service-related metadata in the database.
func (s *ServiceAction) delServiceMetadata(ctx context.Context, serviceID string) error {
	service, err := db.GetManager().TenantServiceDao().GetServiceByID(serviceID)
	if err != nil {
		return err
	}
	if db.GetManager().DB().Dialect().GetName() == "sqlite3" {
		if err := s.deleteThirdComponent(ctx, service); err != nil {
			return err
		}
		return s.deleteComponent(db.GetManager().DB(), service)
	}
	logrus.Infof("delete service %s %s", serviceID, service.ServiceAlias)
	return db.GetManager().DB().Transaction(func(tx *gorm.DB) error {
		if err := s.deleteThirdComponent(ctx, service); err != nil {
			return err
		}
		return s.deleteComponent(tx, service)
	})
}

func (s *ServiceAction) deleteThirdComponent(ctx context.Context, component *dbmodel.TenantServices) error {
	if component.Kind != "third_party" {
		return nil
	}
	tenant, err := db.GetManager().TenantDao().GetTenantByUUID(component.TenantID)
	if err != nil {
		return err
	}
	thirdPartySvcDiscoveryCfg, err := db.GetManager().ThirdPartySvcDiscoveryCfgDao().GetByServiceID(component.ServiceID)
	if err != nil {
		return err
	}
	if thirdPartySvcDiscoveryCfg == nil {
		return nil
	}
	if thirdPartySvcDiscoveryCfg.Type != string(dbmodel.DiscorveryTypeKubernetes) {
		return nil
	}

	newCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
	defer cancel()

	err = s.rainbondClient.RainbondV1alpha1().ThirdComponents(tenant.Namespace).Delete(newCtx, component.ServiceID, metav1.DeleteOptions{})
	if err != nil && !k8sErrors.IsNotFound(err) {
		return err
	}
	return nil
}

func (s *ServiceAction) gcTaskBody(tenantID, serviceID, serviceAlias string) (map[string]interface{}, error) {
	events, err := db.GetManager().ServiceEventDao().ListByTargetID(serviceID)
	if err != nil {
		logrus.Errorf("list events based on serviceID: %v", err)
	}
	var eventIDs []string
	for _, event := range events {
		eventIDs = append(eventIDs, event.EventID)
	}

	return map[string]interface{}{
		"tenant_id":     tenantID,
		"service_id":    serviceID,
		"event_ids":     eventIDs,
		"service_alias": serviceAlias,
	}, nil
}

// GetServiceDeployInfo get service deploy info
func (s *ServiceAction) GetServiceDeployInfo(tenantID, serviceID string) (*pb.DeployInfo, *util.APIHandleError) {
	info, err := s.statusCli.GetServiceDeployInfo(serviceID)
	if err != nil {
		return nil, util.CreateAPIHandleError(500, err)
	}
	return info, nil
}

// ListVersionInfo lists version info
func (s *ServiceAction) ListVersionInfo(serviceID string) (*apimodel.BuildListRespVO, error) {
	versionInfos, err := db.GetManager().VersionInfoDao().GetAllVersionByServiceID(serviceID)
	if err != nil && err != gorm.ErrRecordNotFound {
		logrus.Errorf("error getting all version by service id: %v", err)
		return nil, fmt.Errorf("error getting all version by service id: %v", err)
	}
	svc, err := db.GetManager().TenantServiceDao().GetServiceByID(serviceID)
	if err != nil {
		logrus.Errorf("error getting service by uuid: %v", err)
		return nil, fmt.Errorf("error getting service by uuid: %v", err)
	}
	b, err := json.Marshal(versionInfos)
	if err != nil {
		return nil, fmt.Errorf("error marshaling version infos: %v", err)
	}
	var bversions []*apimodel.BuildVersion
	if err := json.Unmarshal(b, &bversions); err != nil {
		return nil, fmt.Errorf("error unmarshaling version infos: %v", err)
	}
	for idx := range bversions {
		bv := bversions[idx]
		if bv.Kind == "build_from_image" || bv.Kind == "build_from_market_image" {
			image := parser.ParseImageName(bv.RepoURL)
			bv.ImageDomain = image.GetDomain()
			bv.ImageRepo = image.GetRepostory()
			bv.ImageTag = image.GetTag()
		}
	}
	result := &apimodel.BuildListRespVO{
		DeployVersion: svc.DeployVersion,
		List:          bversions,
	}
	return result, nil
}

// EventBuildVersion -
func (s *ServiceAction) EventBuildVersion(serviceID, buildVersion string) (*apimodel.BuildListRespVO, error) {
	versionInfo, err := db.GetManager().VersionInfoDao().GetVersionByDeployVersion(buildVersion, serviceID)
	if err != nil && err != gorm.ErrRecordNotFound {
		logrus.Errorf("error getting all version by service id: %v", err)
		return nil, fmt.Errorf("error getting all version by service id: %v", err)
	}
	b, err := json.Marshal(versionInfo)
	if err != nil {
		return nil, fmt.Errorf("error marshaling version infos: %v", err)
	}
	var bversion *apimodel.BuildVersion
	if err := json.Unmarshal(b, &bversion); err != nil {
		return nil, fmt.Errorf("error unmarshaling version infos: %v", err)
	}

	result := &apimodel.BuildListRespVO{
		DeployVersion: buildVersion,
		List:          bversion,
	}
	return result, nil
}

// AddAutoscalerRule -
func (s *ServiceAction) AddAutoscalerRule(req *apimodel.AutoscalerRuleReq) error {
	tx := db.GetManager().Begin()
	defer db.GetManager().EnsureEndTransactionFunc()

	r := &dbmodel.TenantServiceAutoscalerRules{
		RuleID:      req.RuleID,
		ServiceID:   req.ServiceID,
		Enable:      req.Enable,
		XPAType:     req.XPAType,
		MinReplicas: req.MinReplicas,
		MaxReplicas: req.MaxReplicas,
	}
	if err := db.GetManager().TenantServceAutoscalerRulesDaoTransactions(tx).AddModel(r); err != nil {
		tx.Rollback()
		return err
	}

	for _, metric := range req.Metrics {
		m := &dbmodel.TenantServiceAutoscalerRuleMetrics{
			RuleID:            req.RuleID,
			MetricsType:       metric.MetricsType,
			MetricsName:       metric.MetricsName,
			MetricTargetType:  metric.MetricTargetType,
			MetricTargetValue: metric.MetricTargetValue,
		}
		if err := db.GetManager().TenantServceAutoscalerRuleMetricsDaoTransactions(tx).AddModel(m); err != nil {
			tx.Rollback()
			return err
		}
	}

	taskbody := map[string]interface{}{
		"service_id": r.ServiceID,
		"rule_id":    r.RuleID,
	}
	if err := s.MQClient.SendBuilderTopic(gclient.TaskStruct{
		TaskType: "refreshhpa",
		TaskBody: taskbody,
		Topic:    gclient.WorkerTopic,
	}); err != nil {
		logrus.Errorf("send 'refreshhpa' task: %v", err)
		return err
	}
	logrus.Infof("rule id: %s; successfully send 'refreshhpa' task.", r.RuleID)

	return tx.Commit().Error
}

// UpdAutoscalerRule -
func (s *ServiceAction) UpdAutoscalerRule(req *apimodel.AutoscalerRuleReq) error {
	rule, err := db.GetManager().TenantServceAutoscalerRulesDao().GetByRuleID(req.RuleID)
	if err != nil {
		return err
	}

	rule.Enable = req.Enable
	rule.XPAType = req.XPAType
	rule.MinReplicas = req.MinReplicas
	rule.MaxReplicas = req.MaxReplicas

	tx := db.GetManager().Begin()
	defer db.GetManager().EnsureEndTransactionFunc()

	if err := db.GetManager().TenantServceAutoscalerRulesDaoTransactions(tx).UpdateModel(rule); err != nil {
		tx.Rollback()
		return err
	}

	// delete metrics
	if err := db.GetManager().TenantServceAutoscalerRuleMetricsDaoTransactions(tx).DeleteByRuleID(req.RuleID); err != nil {
		tx.Rollback()
		return err
	}

	for _, metric := range req.Metrics {
		m := &dbmodel.TenantServiceAutoscalerRuleMetrics{
			RuleID:            req.RuleID,
			MetricsType:       metric.MetricsType,
			MetricsName:       metric.MetricsName,
			MetricTargetType:  metric.MetricTargetType,
			MetricTargetValue: metric.MetricTargetValue,
		}
		if err := db.GetManager().TenantServceAutoscalerRuleMetricsDaoTransactions(tx).AddModel(m); err != nil {
			tx.Rollback()
			return err
		}
	}

	taskbody := map[string]interface{}{
		"service_id": rule.ServiceID,
		"rule_id":    rule.RuleID,
	}
	if err := s.MQClient.SendBuilderTopic(gclient.TaskStruct{
		TaskType: "refreshhpa",
		TaskBody: taskbody,
		Topic:    gclient.WorkerTopic,
	}); err != nil {
		logrus.Errorf("send 'refreshhpa' task: %v", err)
		return err
	}
	logrus.Infof("rule id: %s; successfully send 'refreshhpa' task.", rule.RuleID)

	return tx.Commit().Error
}

// ListScalingRecords -
func (s *ServiceAction) ListScalingRecords(serviceID string, page, pageSize int) ([]*dbmodel.TenantServiceScalingRecords, int, error) {
	records, err := db.GetManager().TenantServiceScalingRecordsDao().ListByServiceID(serviceID, (page-1)*pageSize, pageSize)
	if err != nil {
		return nil, 0, err
	}

	count, err := db.GetManager().TenantServiceScalingRecordsDao().CountByServiceID(serviceID)
	if err != nil {
		return nil, 0, err
	}

	return records, count, nil
}

// SyncComponentBase -
func (s *ServiceAction) SyncComponentBase(tx *gorm.DB, app *dbmodel.Application, components []*apimodel.Component) error {
	var (
		componentIDs []string
		dbComponents []*dbmodel.TenantServices
	)
	for _, component := range components {
		componentIDs = append(componentIDs, component.ComponentBase.ComponentID)
	}
	oldComponents, err := db.GetManager().TenantServiceDao().GetServiceByIDs(componentIDs)
	if err != nil {
		return err
	}
	existComponents := make(map[string]*dbmodel.TenantServices)
	for _, oc := range oldComponents {
		existComponents[oc.ServiceID] = oc
	}
	for _, component := range components {
		var deployVersion string
		if oldComponent, ok := existComponents[component.ComponentBase.ComponentID]; ok {
			deployVersion = oldComponent.DeployVersion
		}
		dbComponents = append(dbComponents, component.ComponentBase.DbModel(app.TenantID, app.AppID, deployVersion))
	}
	if err := db.GetManager().TenantServiceDaoTransactions(tx).DeleteByComponentIDs(app.TenantID, app.AppID, componentIDs); err != nil {
		return err
	}
	return db.GetManager().TenantServiceDaoTransactions(tx).CreateOrUpdateComponentsInBatch(dbComponents)
}

// SyncComponentRelations -
func (s *ServiceAction) SyncComponentRelations(tx *gorm.DB, app *dbmodel.Application, components []*apimodel.Component) error {
	var (
		componentIDs []string
		relations    []*dbmodel.TenantServiceRelation
	)
	for _, component := range components {
		if component.Relations == nil {
			continue
		}
		componentIDs = append(componentIDs, component.ComponentBase.ComponentID)
		for _, relation := range component.Relations {
			relations = append(relations, relation.DbModel(app.TenantID, component.ComponentBase.ComponentID))
		}
	}
	if err := db.GetManager().TenantServiceRelationDaoTransactions(tx).DeleteByComponentIDs(componentIDs); err != nil {
		return err
	}
	return db.GetManager().TenantServiceRelationDaoTransactions(tx).CreateOrUpdateRelationsInBatch(relations)
}

// SyncComponentEnvs -
func (s *ServiceAction) SyncComponentEnvs(tx *gorm.DB, app *dbmodel.Application, components []*apimodel.Component) error {
	var (
		componentIDs []string
		envs         []*dbmodel.TenantServiceEnvVar
	)
	for _, component := range components {
		if component.Envs == nil {
			continue
		}
		componentIDs = append(componentIDs, component.ComponentBase.ComponentID)
		for _, env := range component.Envs {
			envs = append(envs, env.DbModel(app.TenantID, component.ComponentBase.ComponentID))
		}
	}
	if err := db.GetManager().TenantServiceEnvVarDaoTransactions(tx).DeleteByComponentIDs(componentIDs); err != nil {
		return err
	}
	return db.GetManager().TenantServiceEnvVarDaoTransactions(tx).CreateOrUpdateEnvsInBatch(envs)
}

// SyncComponentVolumeRels -
func (s *ServiceAction) SyncComponentVolumeRels(tx *gorm.DB, app *dbmodel.Application, components []*apimodel.Component) error {
	var (
		componentIDs []string
		volRels      []*dbmodel.TenantServiceMountRelation
	)
	// Get the storage of all components under the application
	appComponents, err := db.GetManager().TenantServiceDao().ListByAppID(app.AppID)
	if err != nil {
		return err
	}
	var appComponentIDs []string
	for _, ac := range appComponents {
		appComponentIDs = append(appComponentIDs, ac.ServiceID)
	}
	existVolume, err := s.getExistVolumes(appComponentIDs)
	if err != nil {
		return err
	}
	// Get the storage that needs to be newly created
	for _, component := range components {
		componentID := component.ComponentBase.ComponentID
		if component.Volumes == nil {
			continue
		}
		for _, vol := range component.Volumes {
			if _, ok := existVolume[vol.Key(componentID)]; !ok {
				existVolume[vol.Key(componentID)] = vol.DbModel(componentID)
			}
		}
	}

	for _, component := range components {
		if component.VolumeRelations == nil {
			continue
		}
		componentIDs = append(componentIDs, component.ComponentBase.ComponentID)
		//The hostpath attribute should not be recorded in the mount relationship table,
		//and should be processed when the worker takes effect
		for _, volumeRelation := range component.VolumeRelations {
			if vol, ok := existVolume[volumeRelation.Key()]; ok {
				volRels = append(volRels, volumeRelation.DbModel(app.TenantID, component.ComponentBase.ComponentID, vol.HostPath, vol.VolumeType))
			}
		}
	}
	if err := db.GetManager().TenantServiceMountRelationDaoTransactions(tx).DeleteByComponentIDs(componentIDs); err != nil {
		return err
	}
	return db.GetManager().TenantServiceMountRelationDaoTransactions(tx).CreateOrUpdateVolumeRelsInBatch(volRels)
}

// SyncComponentVolumes -
func (s *ServiceAction) SyncComponentVolumes(tx *gorm.DB, components []*apimodel.Component) error {
	var (
		componentIDs []string
		volumes      []*dbmodel.TenantServiceVolume
	)
	for _, component := range components {
		if component.Volumes == nil {
			continue
		}
		componentIDs = append(componentIDs, component.ComponentBase.ComponentID)
		for _, volume := range component.Volumes {
			volumes = append(volumes, volume.DbModel(component.ComponentBase.ComponentID))
		}
	}
	existVolumes, err := s.getExistVolumes(componentIDs)
	if err != nil {
		return err
	}
	deleteVolumeIDs := s.getDeleteVolumeIDs(existVolumes, volumes)
	createOrUpdates := s.getCreateOrUpdateVolumes(existVolumes, volumes)
	if err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).DeleteByVolumeIDs(deleteVolumeIDs); err != nil {
		return err
	}
	return db.GetManager().TenantServiceVolumeDaoTransactions(tx).CreateOrUpdateVolumesInBatch(createOrUpdates)
}

func (s *ServiceAction) getExistVolumes(componentIDs []string) (existVolumes map[string]*dbmodel.TenantServiceVolume, err error) {
	existVolumes = make(map[string]*dbmodel.TenantServiceVolume)
	volumes, err := db.GetManager().TenantServiceVolumeDao().ListVolumesByComponentIDs(componentIDs)
	if err != nil {
		return nil, err
	}
	for _, volume := range volumes {
		existVolumes[volume.Key()] = volume
	}
	return existVolumes, nil
}

func (s *ServiceAction) getCreateOrUpdateVolumes(existVolumes map[string]*dbmodel.TenantServiceVolume, incomeVolumes []*dbmodel.TenantServiceVolume) (volumes []*dbmodel.TenantServiceVolume) {
	for _, incomeVolume := range incomeVolumes {
		if _, ok := existVolumes[incomeVolume.Key()]; ok {
			incomeVolume.ID = existVolumes[incomeVolume.Key()].ID
		}
		volumes = append(volumes, incomeVolume)
	}
	return volumes
}

func (s *ServiceAction) getDeleteVolumeIDs(existVolumes map[string]*dbmodel.TenantServiceVolume, incomeVolumes []*dbmodel.TenantServiceVolume) (deleteVolumeIDs []uint) {
	newVolumes := make(map[string]struct{})
	for _, volume := range incomeVolumes {
		newVolumes[volume.Key()] = struct{}{}
	}
	for existKey, existVolume := range existVolumes {
		if _, ok := newVolumes[existKey]; !ok {
			deleteVolumeIDs = append(deleteVolumeIDs, existVolume.ID)
		}
	}
	return deleteVolumeIDs
}

// SyncComponentConfigFiles -
func (s *ServiceAction) SyncComponentConfigFiles(tx *gorm.DB, components []*apimodel.Component) error {
	var (
		componentIDs []string
		configFiles  []*dbmodel.TenantServiceConfigFile
	)
	for _, component := range components {
		if component.ConfigFiles == nil {
			continue
		}
		componentIDs = append(componentIDs, component.ComponentBase.ComponentID)
		for _, configFile := range component.ConfigFiles {
			configFiles = append(configFiles, configFile.DbModel(component.ComponentBase.ComponentID))
		}
	}
	if err := db.GetManager().TenantServiceConfigFileDaoTransactions(tx).DeleteByComponentIDs(componentIDs); err != nil {
		return err
	}
	return db.GetManager().TenantServiceConfigFileDaoTransactions(tx).CreateOrUpdateConfigFilesInBatch(configFiles)
}

// SyncComponentProbes -
func (s *ServiceAction) SyncComponentProbes(tx *gorm.DB, components []*apimodel.Component) error {
	var (
		componentIDs []string
		probes       []*dbmodel.TenantServiceProbe
	)
	for _, component := range components {
		componentIDs = append(componentIDs, component.ComponentBase.ComponentID)
		modes := make(map[string]struct{})
		for _, probe := range component.Probes {
			_, ok := modes[probe.Mode]
			if ok {
				continue
			}
			probes = append(probes, probe.DbModel(component.ComponentBase.ComponentID))
			modes[probe.Mode] = struct{}{}
		}
	}
	if err := db.GetManager().ServiceProbeDaoTransactions(tx).DeleteByComponentIDs(componentIDs); err != nil {
		return err
	}
	return db.GetManager().ServiceProbeDaoTransactions(tx).CreateOrUpdateProbesInBatch(probes)
}

// SyncComponentLabels -
func (s *ServiceAction) SyncComponentLabels(tx *gorm.DB, components []*apimodel.Component) error {
	var (
		componentIDs []string
		labels       []*dbmodel.TenantServiceLable
	)
	for _, component := range components {
		if component.Labels == nil {
			continue
		}
		componentIDs = append(componentIDs, component.ComponentBase.ComponentID)
		for _, label := range component.Labels {
			labels = append(labels, label.DbModel(component.ComponentBase.ComponentID))
		}
	}
	if err := db.GetManager().TenantServiceLabelDaoTransactions(tx).DeleteByComponentIDs(componentIDs); err != nil {
		return err
	}
	return db.GetManager().TenantServiceLabelDaoTransactions(tx).CreateOrUpdateLabelsInBatch(labels)
}

// SyncComponentPlugins -
func (s *ServiceAction) SyncComponentPlugins(tx *gorm.DB, app *dbmodel.Application, components []*apimodel.Component) error {
	var (
		componentIDs           []string
		portConfigComponentIDs []string
		envComponentIDs        []string
		pluginRelations        []*dbmodel.TenantServicePluginRelation
		pluginVersionEnvs      []*dbmodel.TenantPluginVersionEnv
		pluginVersionConfigs   []*dbmodel.TenantPluginVersionDiscoverConfig
		pluginStreamPorts      []*dbmodel.TenantServicesStreamPluginPort
	)
	for _, component := range components {
		if component.Plugins == nil {
			continue
		}
		componentIDs = append(componentIDs, component.ComponentBase.ComponentID)
		for _, plugin := range component.Plugins {
			pluginRelations = append(pluginRelations, plugin.DbModel(component.ComponentBase.ComponentID))
			if plugin.ConfigEnvs.NormalEnvs != nil {
				envComponentIDs = append(envComponentIDs, component.ComponentBase.ComponentID)
				for _, versionEnv := range plugin.ConfigEnvs.NormalEnvs {
					pluginVersionEnvs = append(pluginVersionEnvs, versionEnv.DbModel(component.ComponentBase.ComponentID, plugin.PluginID))
				}
			}

			if configs := plugin.ConfigEnvs.ComplexEnvs; configs != nil {
				portConfigComponentIDs = append(portConfigComponentIDs, component.ComponentBase.ComponentID)
				if configs.BasePorts != nil && checkPluginHaveInbound(plugin.PluginModel) {
					psPorts := s.handlePluginMappingPort(app.TenantID, component.ComponentBase.ComponentID, plugin.PluginModel, configs.BasePorts)
					pluginStreamPorts = append(pluginStreamPorts, psPorts...)
				}
				config, err := ffjson.Marshal(configs)
				if err != nil {
					return err
				}
				pluginVersionConfigs = append(pluginVersionConfigs, &dbmodel.TenantPluginVersionDiscoverConfig{
					PluginID:  plugin.PluginID,
					ServiceID: component.ComponentBase.ComponentID,
					ConfigStr: string(config),
				})
			}
		}
	}

	if err := db.GetManager().TenantServicesStreamPluginPortDaoTransactions(tx).DeleteByComponentIDs(portConfigComponentIDs); err != nil {
		return err
	}
	if err := db.GetManager().TenantPluginVersionConfigDaoTransactions(tx).DeleteByComponentIDs(portConfigComponentIDs); err != nil {
		return err
	}
	if err := db.GetManager().TenantServicePluginRelationDaoTransactions(tx).DeleteByComponentIDs(componentIDs); err != nil {
		return err
	}
	if err := db.GetManager().TenantPluginVersionENVDaoTransactions(tx).DeleteByComponentIDs(envComponentIDs); err != nil {
		return err
	}

	if err := db.GetManager().TenantServicePluginRelationDaoTransactions(tx).CreateOrUpdatePluginRelsInBatch(pluginRelations); err != nil {
		return err
	}
	if err := db.GetManager().TenantPluginVersionENVDaoTransactions(tx).CreateOrUpdatePluginVersionEnvsInBatch(pluginVersionEnvs); err != nil {
		return err
	}
	if err := db.GetManager().TenantServicesStreamPluginPortDaoTransactions(tx).CreateOrUpdateStreamPluginPortsInBatch(pluginStreamPorts); err != nil {
		return err
	}
	return db.GetManager().TenantPluginVersionConfigDaoTransactions(tx).CreateOrUpdatePluginVersionConfigsInBatch(pluginVersionConfigs)
}

// handlePluginMappingPort -
func (s *ServiceAction) handlePluginMappingPort(tenantID, componentID, pluginModel string, ports []*apimodel.BasePort) []*dbmodel.TenantServicesStreamPluginPort {
	existPorts := make(map[int]struct{})
	for _, port := range ports {
		existPorts[port.Port] = struct{}{}
	}

	minPort := 65301
	var newPorts []*dbmodel.TenantServicesStreamPluginPort
	for _, port := range ports {
		newPort := &dbmodel.TenantServicesStreamPluginPort{
			TenantID:      tenantID,
			ServiceID:     componentID,
			PluginModel:   pluginModel,
			ContainerPort: port.Port,
		}
		if _, ok := existPorts[minPort]; ok {
			minPort = minPort + 1
		}
		newPluginPort := minPort
		if _, ok := existPorts[newPluginPort]; ok {
			minPort = minPort + 1
			newPluginPort = minPort
		}

		existPorts[newPluginPort] = struct{}{}
		port.ListenPort = newPluginPort
		newPort.PluginPort = newPluginPort
		newPorts = append(newPorts, newPort)
	}
	return newPorts
}

// SyncComponentScaleRules -
func (s *ServiceAction) SyncComponentScaleRules(tx *gorm.DB, components []*apimodel.Component) error {
	var (
		componentIDs         []string
		autoScaleRuleIDs     []string
		autoScaleRules       []*dbmodel.TenantServiceAutoscalerRules
		autoScaleRuleMetrics []*dbmodel.TenantServiceAutoscalerRuleMetrics
	)
	for _, component := range components {
		componentIDs = append(componentIDs, component.ComponentBase.ComponentID)
		autoScaleRuleIDs = append(autoScaleRuleIDs, component.AutoScaleRule.RuleID)
		autoScaleRules = append(autoScaleRules, component.AutoScaleRule.DbModel(component.ComponentBase.ComponentID))

		for _, metric := range component.AutoScaleRule.RuleMetrics {
			autoScaleRuleMetrics = append(autoScaleRuleMetrics, metric.DbModel(component.AutoScaleRule.RuleID))
		}
	}
	if err := db.GetManager().TenantServceAutoscalerRulesDaoTransactions(tx).DeleteByComponentIDs(componentIDs); err != nil {
		return err
	}
	if err := db.GetManager().TenantServceAutoscalerRuleMetricsDaoTransactions(tx).DeleteByRuleIDs(autoScaleRuleIDs); err != nil {
		return err
	}
	if err := db.GetManager().TenantServceAutoscalerRulesDaoTransactions(tx).CreateOrUpdateScaleRulesInBatch(autoScaleRules); err != nil {
		return err
	}
	return db.GetManager().TenantServceAutoscalerRuleMetricsDaoTransactions(tx).CreateOrUpdateScaleRuleMetricsInBatch(autoScaleRuleMetrics)
}

// SyncComponentEndpoints -
func (s *ServiceAction) SyncComponentEndpoints(tx *gorm.DB, components []*apimodel.Component) error {
	var (
		componentIDs               []string
		thirdPartySvcDiscoveryCfgs []*dbmodel.ThirdPartySvcDiscoveryCfg
	)
	for _, component := range components {
		if component.Endpoint == nil {
			continue
		}
		componentIDs = append(componentIDs, component.ComponentBase.ComponentID)
		if component.Endpoint.Kubernetes != nil {
			thirdPartySvcDiscoveryCfgs = append(thirdPartySvcDiscoveryCfgs, component.Endpoint.DbModel(component.ComponentBase.ComponentID))
		}
	}

	if err := db.GetManager().ThirdPartySvcDiscoveryCfgDaoTransactions(tx).DeleteByComponentIDs(componentIDs); err != nil {
		return err
	}
	return db.GetManager().ThirdPartySvcDiscoveryCfgDaoTransactions(tx).CreateOrUpdate3rdSvcDiscoveryCfgInBatch(thirdPartySvcDiscoveryCfgs)
}

// SyncComponentK8sAttributes -
func (s *ServiceAction) SyncComponentK8sAttributes(tx *gorm.DB, app *dbmodel.Application, components []*apimodel.Component) error {
	var (
		componentIDs  []string
		k8sAttributes []*dbmodel.ComponentK8sAttributes
	)
	for _, component := range components {
		if component.ComponentK8sAttributes == nil || len(component.ComponentK8sAttributes) == 0 {
			continue
		}
		componentIDs = append(componentIDs, component.ComponentBase.ComponentID)
		for _, k8sAttribute := range component.ComponentK8sAttributes {
			k8sAttributes = append(k8sAttributes, k8sAttribute.DbModel(app.TenantID, component.ComponentBase.ComponentID))
		}
	}

	if err := db.GetManager().ComponentK8sAttributeDaoTransactions(tx).DeleteByComponentIDs(componentIDs); err != nil {
		return err
	}
	return db.GetManager().ComponentK8sAttributeDaoTransactions(tx).CreateOrUpdateAttributesInBatch(k8sAttributes)
}

// Log returns the logs reader for a container in a pod, a pod or a component.
func (s *ServiceAction) Log(w http.ResponseWriter, r *http.Request, component *dbmodel.TenantServices, podName, containerName string, follow bool) error {
	// If podName and containerName is missing, return the logs reader for the component
	// If containerName is missing, return the logs reader for the pod.
	if podName == "" || containerName == "" {
		// Only support return the logs reader for a container now.
		return errors.WithStack(bcode.NewBadRequest("the field 'podName' and 'containerName' is required"))
	}
	tenant, err := db.GetManager().TenantDao().GetTenantByUUID(component.TenantID)
	if err != nil {
		return fmt.Errorf("get tenant info failure %s", err.Error())
	}
	request := s.kubeClient.CoreV1().Pods(tenant.Namespace).GetLogs(podName, &corev1.PodLogOptions{
		Container: containerName,
		Follow:    follow,
	})

	out, err := request.Stream(context.TODO())
	if err != nil {
		if k8sErrors.IsNotFound(err) {
			return errors.Wrap(bcode.ErrPodNotFound, "get pod log")
		}
		return errors.Wrap(err, "get stream from request")
	}
	defer out.Close()

	w.Header().Set("Transfer-Encoding", "chunked")
	w.WriteHeader(http.StatusOK)

	// Flush headers, if possible
	if flusher, ok := w.(http.Flusher); ok {
		flusher.Flush()
	}

	writer := flushwriter.Wrap(w)

	_, err = io.Copy(writer, out)
	if err != nil {
		if strings.HasSuffix(err.Error(), "write: broken pipe") {
			return nil
		}
		logrus.Warningf("write stream to response: %v", err)
	}
	return nil
}

// TransStatus trans service status
func TransStatus(eStatus string) string {
	switch eStatus {
	case "starting":
		return "启动中"
	case "waiting":
		return "等待运行"
	case "abnormal":
		return "运行异常"
	case "upgrade":
		return "升级中"
	case "closed":
		return "已关闭"
	case "stopping":
		return "关闭中"
	case "checking":
		return "检测中"
	case "unusual":
		return "运行异常"
	case "running":
		return "运行中"
	case "failure":
		return "未知"
	case "undeploy":
		return "未部署"
	case "deployed":
		return "已部署"
	case "succeeded":
		return "已完成"
	case "paused":
		return "挂起"
	}
	return ""
}

func (s *ServiceAction) FileManageInfo(serviceID, podName, tarPath, namespace string) ([]apimodel.FileInfo, error) {
	var fileInfos []apimodel.FileInfo

	// 获取服务信息
	service, err := db.GetManager().TenantServiceDao().GetServiceByID(serviceID)
	if err != nil {
		return nil, err
	}
	containerName := service.K8sComponentName

	// 执行 shell 命令 `ls -p -1`,列出指定路径下的文件和目录
	output, err := s.executeCommand(podName, namespace, containerName, []string{"ls", "-p", "-1", tarPath})
	if err != nil {
		return nil, err
	}

	// 按行解析命令输出
	files := strings.Split(output, "\n")
	for _, file := range files {
		file = strings.TrimSpace(file) // 去除行首尾空格
		if len(file) == 0 {
			continue // 跳过空行
		}

		// 判断是否为目录(以 "/" 结尾)
		if strings.HasSuffix(file, "/") {
			fileInfos = append(fileInfos, apimodel.FileInfo{
				Title:  strings.TrimSuffix(file, "/"), // 去掉末尾的 "/"
				IsLeaf: true,                          // 目录
			})
		} else {
			fileInfos = append(fileInfos, apimodel.FileInfo{
				Title:  file,  // 文件名
				IsLeaf: false, // 文件
			})
		}
	}

	return fileInfos, nil
}

func (s *ServiceAction) executeCommand(podName, namespace, containerName string, command []string) (string, error) {
	req := s.kubeClient.CoreV1().RESTClient().Post().
		Resource("pods").
		Name(podName).
		Namespace(namespace).
		SubResource("exec")
	req.VersionedParams(&corev1.PodExecOptions{
		Container: containerName,
		Command:   command,
		Stdin:     false,
		Stdout:    true,
		Stderr:    true,
		TTY:       false,
	}, scheme.ParameterCodec)

	executor, err := remotecommand.NewSPDYExecutor(s.config, "POST", req.URL())
	if err != nil {
		return "", err
	}

	var stdout, stderr bytes.Buffer
	err = executor.Stream(remotecommand.StreamOptions{
		Stdout: &stdout,
		Stderr: &stderr,
	})
	if err != nil {
		return "", err
	}
	// 返回输出结果
	return strings.TrimSpace(stdout.String()), nil
}

// checkChaosHealth 检查 chaos 服务健康状态,如果服务未就绪则重试
func (s *ServiceAction) checkChaosHealth() error {
	builderAPI := configs.Default().APIConfig.BuilderAPI
	if len(builderAPI) == 0 {
		logrus.Warn("builder API is not configured, skip health check")
		return nil
	}

	// 使用第一个 builder API 地址
	builderAddr := builderAPI[0]
	healthURL := fmt.Sprintf("http://%s/v2/builder/health", builderAddr)

	// 重试配置:最多重试 5 次,每次等待 2 秒
	maxRetries := 5
	retryInterval := 2 * time.Second

	for i := 0; i < maxRetries; i++ {
		if i > 0 {
			logrus.Infof("chaos service not ready, retrying (%d/%d)...", i, maxRetries)
			time.Sleep(retryInterval)
		}

		client := &http.Client{
			Timeout: 5 * time.Second,
		}
		resp, err := client.Get(healthURL)
		if err != nil {
			logrus.Warnf("failed to check chaos health: %v", err)
			continue
		}
		defer resp.Body.Close()

		if resp.StatusCode == 200 {
			logrus.Info("chaos service is ready")
			return nil
		} else if resp.StatusCode == 503 {
			// 服务正在启动中
			body, _ := io.ReadAll(resp.Body)
			logrus.Warnf("chaos service is starting: %s", string(body))
			continue
		} else {
			// 其他错误
			body, _ := io.ReadAll(resp.Body)
			logrus.Warnf("chaos health check failed with status %d: %s", resp.StatusCode, string(body))
			continue
		}
	}

	return errors.New("chaos service is not ready after multiple retries, please try again later")
}