package utils
import (
"context"
"fmt"
"strings"
"time"
"gitcode.com/openFuyao/e2e-auto-test/e2e/framework/executor"
config "gitcode.com/openFuyao/e2e-auto-test/e2e/installation/bke-config"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
)
const (
// DefaultBootstraoClusterKubeconfig is the kubeconfig path that
// PrepareAndCreateCluster passes to `bke --kubeconfig` when it starts a
// cluster creation.
// NOTE(review): "Bootstrao" looks like a typo for "Bootstrap"; the name is
// exported, so it is left unchanged here — confirm before renaming.
DefaultBootstraoClusterKubeconfig = "/etc/rancher/k3s/k3s.yaml"
)
// BKEClusterGVR identifies the "bkeclusters" resource in API group
// bke.bocloud.com, version v1beta1. The addon helpers in this file use it
// with the dynamic client to read and update BKECluster objects.
var BKEClusterGVR = schema.GroupVersionResource{
Group: "bke.bocloud.com",
Version: "v1beta1",
Resource: "bkeclusters",
}
// BKENodeGVR identifies the "bkenodes" resource in API group
// bke.bocloud.com, version v1beta1.
var BKENodeGVR = schema.GroupVersionResource{
Group: "bke.bocloud.com",
Version: "v1beta1",
Resource: "bkenodes",
}
// ClusterManager drives BKE cluster lifecycle operations. Shell commands
// (bke, kubectl, yq) are run through the SSH executor, while addon edits go
// through the dynamic Kubernetes client.
type ClusterManager struct {
// executor runs shell commands for the bke/kubectl based operations.
executor *executor.SSHExecutor
// dynamicClient is used to get and update BKECluster objects.
dynamicClient dynamic.Interface
// configGenerator generates/uploads cluster configs and cleans them up.
configGenerator *ClusterConfigGenerator
}
// NewClusterManager returns a ClusterManager that uses exec for shell
// commands and dynamicClient for BKECluster access. A ClusterConfigGenerator
// bound to the same executor is created for it.
func NewClusterManager(exec *executor.SSHExecutor, dynamicClient dynamic.Interface) *ClusterManager {
	mgr := new(ClusterManager)
	mgr.executor = exec
	mgr.dynamicClient = dynamicClient
	mgr.configGenerator = NewClusterConfigGenerator(exec)
	return mgr
}
// CreateBinaryConfig applies the kubelet and containerd config manifests with
// a single `kubectl apply`. It returns an error when the command cannot be
// executed or exits with a non-zero code.
func (m *ClusterManager) CreateBinaryConfig(kubeletConfigPath, containerdConfigPath string) error {
	applyCmd := fmt.Sprintf("kubectl apply -f %s -f %s", kubeletConfigPath, containerdConfigPath)
	out, execErr := m.executor.Exec(applyCmd)
	switch {
	case execErr != nil:
		return fmt.Errorf("新增自定义配置命令失败: %w", execErr)
	case out.ExitCode != 0:
		return fmt.Errorf("新增自定义配置失败: %s", out.Stderr)
	}
	return nil
}
// CreateCluster runs `bke cluster create` with the given cluster and node
// config files, without an explicit kubeconfig.
func (m *ClusterManager) CreateCluster(clusterConfigPath, nodeConfigPath string) error {
	var noKubeconfig string
	return m.CreateClusterWithConfig(clusterConfigPath, nodeConfigPath, noKubeconfig)
}
// CreateClusterWithConfig runs `bke cluster create` in the foreground. When
// kubeconfigPath is non-empty it is passed to bke via --kubeconfig. An error
// is returned if the command cannot be executed or exits non-zero.
func (m *ClusterManager) CreateClusterWithConfig(clusterConfigPath, nodeConfigPath, kubeconfigPath string) error {
	var createCmd string
	if kubeconfigPath == "" {
		createCmd = fmt.Sprintf("bke cluster create -f %s -n %s", clusterConfigPath, nodeConfigPath)
	} else {
		createCmd = fmt.Sprintf("bke --kubeconfig %s cluster create -f %s -n %s", kubeconfigPath, clusterConfigPath, nodeConfigPath)
	}

	out, execErr := m.executor.Exec(createCmd)
	switch {
	case execErr != nil:
		return fmt.Errorf("执行创建集群命令失败: %w", execErr)
	case out.ExitCode != 0:
		return fmt.Errorf("创建集群失败: %s", out.Stderr)
	}
	return nil
}
// CreateClusterInBackground starts `bke cluster create` in the background
// without an explicit kubeconfig.
func (m *ClusterManager) CreateClusterInBackground(clusterConfigPath, nodeConfigPath string) error {
	var noKubeconfig string
	return m.CreateClusterInBackgroundWithKubeconfig(clusterConfigPath, nodeConfigPath, noKubeconfig)
}
// CreateClusterInBackgroundWithKubeconfig launches `bke cluster create` with
// nohup in the background and redirects its output to
// /tmp/bke-create-<name>.log, where <name> is derived from the cluster config
// file name. Only the launch itself is checked; the outcome of the creation
// is not awaited.
func (m *ClusterManager) CreateClusterInBackgroundWithKubeconfig(clusterConfigPath, nodeConfigPath, kubeconfigPath string) error {
	logFile := fmt.Sprintf("/tmp/bke-create-%s.log", extractClusterNameFromConfigPath(clusterConfigPath))

	var launchCmd string
	if kubeconfigPath == "" {
		launchCmd = fmt.Sprintf("nohup bke cluster create -f %s -n %s > %s 2>&1 &", clusterConfigPath, nodeConfigPath, logFile)
	} else {
		launchCmd = fmt.Sprintf("nohup bke --kubeconfig %s cluster create -f %s -n %s > %s 2>&1 &", kubeconfigPath, clusterConfigPath, nodeConfigPath, logFile)
	}

	out, execErr := m.executor.Exec(launchCmd)
	switch {
	case execErr != nil:
		return fmt.Errorf("执行后台创建集群命令失败: %w", execErr)
	case out.ExitCode != 0:
		return fmt.Errorf("后台创建集群失败: %s", out.Stderr)
	}
	return nil
}
// CreateClusterWithKubeconfig creates a cluster with plain kubectl instead of
// the bke CLI. It:
//
//  1. reads .metadata.namespace from the cluster config with yq,
//  2. makes sure that namespace exists (create --dry-run | apply),
//  3. applies the node config, then the cluster config.
//
// When kubeconfigPath is non-empty every kubectl call is prefixed with
// KUBECONFIG=<path>. If the namespace step fails, `kubectl cluster-info` is
// run to tell an unusable kubeconfig apart from a real namespace failure.
func (m *ClusterManager) CreateClusterWithKubeconfig(clusterConfigPath, nodeConfigPath, kubeconfigPath string) error {
	kubeconfigArg := ""
	if kubeconfigPath != "" {
		kubeconfigArg = fmt.Sprintf("KUBECONFIG=%s ", kubeconfigPath)
	}

	getNamespaceCmd := fmt.Sprintf("yq eval '.metadata.namespace' %s", clusterConfigPath)
	result, err := m.executor.Exec(getNamespaceCmd)
	if err != nil {
		return fmt.Errorf("failed to read namespace: %w", err)
	}
	if result.ExitCode != 0 {
		return fmt.Errorf("failed to read namespace: %s", result.Stderr)
	}
	namespace := strings.TrimSpace(result.Stdout)
	// yq prints the literal "null" when the path is missing; treat that the
	// same as empty output instead of creating a namespace called "null".
	if namespace == "" || namespace == "null" {
		return fmt.Errorf("unable to get namespace from config file")
	}

	// Used only to diagnose a failed namespace step.
	verifyCmd := fmt.Sprintf("%skubectl cluster-info --request-timeout=5s", kubeconfigArg)

	createNsCmd := fmt.Sprintf("%skubectl create namespace %s --dry-run=client -o yaml | %skubectl apply -f -", kubeconfigArg, namespace, kubeconfigArg)
	result, err = m.executor.Exec(createNsCmd)
	if err != nil {
		verifyResult, verifyErr := m.executor.Exec(verifyCmd)
		if verifyErr != nil || verifyResult.ExitCode != 0 {
			return fmt.Errorf("failed to create namespace (kubeconfig may be invalid): %s, cluster-info check failed: %s", result.Stderr, verifyResult.Stderr)
		}
		return fmt.Errorf("failed to create namespace: %w, stdout: %s, stderr: %s", err, result.Stdout, result.Stderr)
	}
	// An already existing namespace is fine; anything else is a failure.
	if result.ExitCode != 0 && !strings.Contains(result.Stderr, "AlreadyExists") {
		verifyResult, verifyErr := m.executor.Exec(verifyCmd)
		if verifyErr != nil || verifyResult.ExitCode != 0 {
			return fmt.Errorf("failed to create namespace (kubeconfig may be invalid): %s, cluster-info check failed: %s", result.Stderr, verifyResult.Stderr)
		}
		return fmt.Errorf("failed to create namespace: %s, stdout: %s", result.Stderr, result.Stdout)
	}

	// Nodes are applied before the cluster object, as in the original flow.
	applyNodesCmd := fmt.Sprintf("%skubectl apply -f %s", kubeconfigArg, nodeConfigPath)
	result, err = m.executor.Exec(applyNodesCmd)
	if err != nil {
		return fmt.Errorf("failed to apply node config: %w", err)
	}
	if result.ExitCode != 0 {
		return fmt.Errorf("failed to apply node config: %s", result.Stderr)
	}

	applyClusterCmd := fmt.Sprintf("%skubectl apply -f %s", kubeconfigArg, clusterConfigPath)
	result, err = m.executor.Exec(applyClusterCmd)
	if err != nil {
		return fmt.Errorf("failed to apply cluster config: %w", err)
	}
	if result.ExitCode != 0 {
		return fmt.Errorf("failed to apply cluster config: %s", result.Stderr)
	}
	return nil
}
// DeleteCluster deletes the named cluster without an explicit kubeconfig.
func (m *ClusterManager) DeleteCluster(clusterName string) error {
	var noKubeconfig string
	return m.DeleteClusterWithKubeconfig(clusterName, noKubeconfig)
}
// DeleteClusterWithKubeconfig tears down the BC resource "bke-<clusterName>"
// in the namespace of the same name.
//
// Flow:
//  1. Return nil right away if the BC does not exist.
//  2. Poll the cluster status every 30s, for at most 10 minutes, until it
//     reaches one of the states checked below (Healthy+Ready, DeployFailed,
//     Managing, Unhealthy, or InitializationFailed). If the status query
//     fails and the BC is gone, return nil.
//  3. Set both ignore-*-delete annotations to "false" and patch
//     spec.reset=true.
//  4. If the last observed status was Healthy+Ready, stop there; otherwise
//     also delete the BC resource explicitly.
//
// "not found" responses from kubectl are treated as success.
func (m *ClusterManager) DeleteClusterWithKubeconfig(clusterName string, kubeconfigPath string) error {
	// The BC name and its namespace share the same value.
	target := "bke-" + clusterName
	var envPrefix string
	if kubeconfigPath != "" {
		envPrefix = fmt.Sprintf("KUBECONFIG=%s ", kubeconfigPath)
	}
	if !m.ClusterExistsWithKubeconfig(clusterName, kubeconfigPath) {
		return nil
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()
	const pollInterval = 30 * time.Second

	// settled reports whether polling should stop for the observed status.
	settled := func(state, clusterStatus string) bool {
		switch state {
		case "DeployFailed", "Managing", "Unhealthy":
			return true
		case "Healthy":
			if clusterStatus == "Ready" {
				return true
			}
		}
		return clusterStatus == "InitializationFailed"
	}

	var lastState, lastClusterStatus string
waitLoop:
	for {
		_, state, clusterStatus, statusErr := m.GetClusterFullStatusWithKubeconfig(clusterName, kubeconfigPath)
		if statusErr == nil {
			lastState, lastClusterStatus = state, clusterStatus
			if settled(state, clusterStatus) {
				break waitLoop
			}
		} else if !m.ClusterExistsWithKubeconfig(clusterName, kubeconfigPath) {
			return nil
		}

		select {
		case <-ctx.Done():
			// Timed out waiting; proceed with the deletion anyway.
			break waitLoop
		case <-time.After(pollInterval):
		}
	}

	annotateCmd := fmt.Sprintf(`%skubectl annotate bc %s -n %s bke.bocloud.com/ignore-target-cluster-delete="false" --overwrite`,
		envPrefix, target, target)
	out, err := m.executor.Exec(annotateCmd)
	if err != nil {
		return fmt.Errorf("设置删除注解失败: %w", err)
	}
	if out.ExitCode != 0 && !strings.Contains(out.Stderr, "not found") {
		return fmt.Errorf("设置删除注解失败: %s", out.Stderr)
	}

	annotateNamespaceCmd := fmt.Sprintf(`%skubectl annotate bc %s -n %s bke.bocloud.com/ignore-namespace-delete="false" --overwrite`,
		envPrefix, target, target)
	out, err = m.executor.Exec(annotateNamespaceCmd)
	if err != nil {
		return fmt.Errorf("设置命名空间删除注解失败: %w", err)
	}
	if out.ExitCode != 0 && !strings.Contains(out.Stderr, "not found") {
		return fmt.Errorf("设置命名空间删除注解失败: %s", out.Stderr)
	}

	patchCmd := fmt.Sprintf(`%skubectl patch bc %s -n %s --type=merge -p '{"spec":{"reset":true}}'`, envPrefix, target, target)
	out, err = m.executor.Exec(patchCmd)
	if err != nil {
		return fmt.Errorf("触发集群删除失败: %w", err)
	}
	if out.ExitCode != 0 {
		if strings.Contains(out.Stderr, "not found") {
			return nil
		}
		return fmt.Errorf("触发集群删除失败: %s", out.Stderr)
	}

	// For a healthy, ready cluster the reset patch is all that is issued.
	if lastState == "Healthy" && lastClusterStatus == "Ready" {
		return nil
	}

	deleteCmd := fmt.Sprintf(`%skubectl delete bc %s -n %s`, envPrefix, target, target)
	out, err = m.executor.Exec(deleteCmd)
	if err != nil {
		return fmt.Errorf("删除 BC 资源失败: %w", err)
	}
	if out.ExitCode != 0 {
		if strings.Contains(out.Stderr, "not found") {
			return nil
		}
		return fmt.Errorf("删除 BC 资源失败: %s", out.Stderr)
	}
	return nil
}
// DeleteManagementClusterSelfBC deletes the cluster named "cluster" (BC
// "bke-cluster") using the given management-cluster kubeconfig.
func (m *ClusterManager) DeleteManagementClusterSelfBC(mgmtKubeconfigPath string) error {
	err := m.DeleteClusterWithKubeconfig("cluster", mgmtKubeconfigPath)
	return err
}
// ForceResetManagementCluster resets every node in nodes, one after another.
// A failure on one node does not stop the remaining nodes from being
// processed; the error of the last failing node (if any) is returned.
// It fails fast when nodes is empty or localExecutor is nil.
func (m *ClusterManager) ForceResetManagementCluster(nodes []config.NodeInfo, localExecutor *executor.LocalExecutor) error {
	switch {
	case len(nodes) == 0:
		return fmt.Errorf("管理集群节点列表为空")
	case localExecutor == nil:
		return fmt.Errorf("localExecutor 不能为空")
	}

	total := len(nodes)
	var lastErr error
	for idx := range nodes {
		if err := m.resetNode(nodes[idx], localExecutor, idx+1, total); err != nil {
			lastErr = err
		}
	}
	return lastErr
}
// resetNode wipes the BKE installation from a single node.
//
// It first downloads the bke tool on the node; a failure there is returned
// as an error labelled with the node index and IP. It then runs
// `bke reset --all --mount` and removes /bke and /usr/bin/bke. Those cleanup
// steps are best-effort: their results are deliberately ignored and the
// function returns nil once the download has succeeded.
//
// localExecutor is part of the signature but is not used by this function.
func (m *ClusterManager) resetNode(node config.NodeInfo, localExecutor *executor.LocalExecutor, nodeIndex, totalNodes int) error {
	nodeLabel := fmt.Sprintf("节点 %d/%d (%s)", nodeIndex, totalNodes, node.IP)

	downloadCmd := "curl -sfL https://openfuyao.obs.cn-north-4.myhuaweicloud.com/openFuyao/bkeadm/releases/download/latest/download.sh | bash"
	result, err := ExecuteCommandOnNode(node, downloadCmd)
	if err != nil {
		return fmt.Errorf("%s: 下载 bke 工具失败: %w, stderr: %s", nodeLabel, err, result.Stderr)
	}
	if result.ExitCode != 0 {
		return fmt.Errorf("%s: 下载 bke 工具失败 (exit code %d): %s", nodeLabel, result.ExitCode, result.Stderr)
	}

	// Best-effort cleanup, run in this order. Failures are intentionally
	// ignored so that one failing step does not prevent the following ones.
	cleanupCmds := []string{
		"echo \"y\" | bke reset --all --mount",
		"rm -rf /bke",
		"rm -f /usr/bin/bke",
	}
	for _, cleanupCmd := range cleanupCmds {
		_, _ = ExecuteCommandOnNode(node, cleanupCmd)
	}
	return nil
}
// WaitForClusterDeleted polls once a minute until the cluster's BC no longer
// exists. It returns nil once the BC is gone, ctx.Err() if ctx is cancelled
// while waiting, and a timeout error once the deadline derived from timeout
// has passed.
func (m *ClusterManager) WaitForClusterDeleted(ctx context.Context, clusterName string, timeout time.Duration) error {
	const pollInterval = 1 * time.Minute
	deadline := time.Now().Add(timeout)
	for {
		if !time.Now().Before(deadline) {
			return fmt.Errorf("等待集群删除超时")
		}
		if !m.ClusterExists(clusterName) {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(pollInterval):
		}
	}
}
// CleanupBKECluster deletes the BC resource "bke-<clusterName>" with
// `kubectl delete`. With force set, --force --grace-period=0 is appended.
// A BC that does not exist (before or during the delete) counts as success.
func (m *ClusterManager) CleanupBKECluster(clusterName string, force bool) error {
	// The BC name and its namespace share the same value.
	target := "bke-" + clusterName
	if !m.ClusterExists(clusterName) {
		return nil
	}

	deleteCmd := fmt.Sprintf("kubectl delete bc %s -n %s", target, target)
	if force {
		deleteCmd += " --force --grace-period=0"
	}

	out, execErr := m.executor.Exec(deleteCmd)
	if execErr != nil {
		return fmt.Errorf("删除 BC 资源失败: %w", execErr)
	}
	if out.ExitCode == 0 || strings.Contains(out.Stderr, "not found") {
		return nil
	}
	return fmt.Errorf("删除 BC 资源失败: %s", out.Stderr)
}
// GetClusterStatus returns the cluster health state without an explicit
// kubeconfig.
func (m *ClusterManager) GetClusterStatus(clusterName string) (string, error) {
	var noKubeconfig string
	return m.GetClusterStatusWithKubeconfig(clusterName, noKubeconfig)
}
// GetClusterStatusWithKubeconfig reads .status.clusterHealthState of the BC
// "bke-<clusterName>" via kubectl. It returns "Unknown" when the field is
// empty, and an error when the command cannot run or exits non-zero.
func (m *ClusterManager) GetClusterStatusWithKubeconfig(clusterName, kubeconfigPath string) (string, error) {
	// The BC name and its namespace share the same value.
	target := "bke-" + clusterName
	var envPrefix string
	if kubeconfigPath != "" {
		envPrefix = fmt.Sprintf("KUBECONFIG=%s ", kubeconfigPath)
	}

	query := fmt.Sprintf("%skubectl get bc %s -n %s -o jsonpath='{.status.clusterHealthState}'", envPrefix, target, target)
	out, execErr := m.executor.Exec(query)
	if execErr != nil {
		return "", execErr
	}
	if out.ExitCode != 0 {
		return "", fmt.Errorf("获取集群状态失败: %s", out.Stderr)
	}

	// Strip any surrounding single quotes before trimming whitespace.
	unquoted := strings.Trim(out.Stdout, "'")
	if health := strings.TrimSpace(unquoted); health != "" {
		return health, nil
	}
	return "Unknown", nil
}
// GetClusterPhase returns the cluster phase without an explicit kubeconfig.
func (m *ClusterManager) GetClusterPhase(clusterName string) (string, error) {
	var noKubeconfig string
	return m.GetClusterPhaseWithKubeconfig(clusterName, noKubeconfig)
}
// GetClusterPhaseWithKubeconfig reads .status.phase of the BC
// "bke-<clusterName>" via kubectl. It returns "Unknown" when the field is
// empty, and an error when the command cannot run or exits non-zero.
func (m *ClusterManager) GetClusterPhaseWithKubeconfig(clusterName, kubeconfigPath string) (string, error) {
	// The BC name and its namespace share the same value.
	target := "bke-" + clusterName
	var envPrefix string
	if kubeconfigPath != "" {
		envPrefix = fmt.Sprintf("KUBECONFIG=%s ", kubeconfigPath)
	}

	query := fmt.Sprintf("%skubectl get bc %s -n %s -o jsonpath='{.status.phase}'", envPrefix, target, target)
	out, execErr := m.executor.Exec(query)
	if execErr != nil {
		return "", execErr
	}
	if out.ExitCode != 0 {
		return "", fmt.Errorf("获取集群阶段失败: %s", out.Stderr)
	}

	// Strip any surrounding single quotes before trimming whitespace.
	unquoted := strings.Trim(out.Stdout, "'")
	if current := strings.TrimSpace(unquoted); current != "" {
		return current, nil
	}
	return "Unknown", nil
}
// GetClusterFullStatus returns phase, health state and cluster status of the
// named cluster without an explicit kubeconfig.
func (m *ClusterManager) GetClusterFullStatus(clusterName string) (phase, state, clusterStatus string, err error) {
	phase, state, clusterStatus, err = m.GetClusterFullStatusWithKubeconfig(clusterName, "")
	return phase, state, clusterStatus, err
}
// GetClusterFullStatusWithKubeconfig fetches .status.phase,
// .status.clusterHealthState and .status.clusterStatus of the BC
// "bke-<clusterName>" in one kubectl call (custom-columns, no headers).
// Columns printed as "<none>" are returned as empty strings. If the output
// has fewer than three columns, all three values are empty and err is nil.
func (m *ClusterManager) GetClusterFullStatusWithKubeconfig(clusterName, kubeconfigPath string) (phase, state, clusterStatus string, err error) {
	// The BC name and its namespace share the same value.
	target := "bke-" + clusterName
	var envPrefix string
	if kubeconfigPath != "" {
		envPrefix = fmt.Sprintf("KUBECONFIG=%s ", kubeconfigPath)
	}

	query := fmt.Sprintf("%skubectl get bc %s -n %s -o custom-columns=PHASE:.status.phase,STATE:.status.clusterHealthState,STATUS:.status.clusterStatus --no-headers", envPrefix, target, target)
	out, execErr := m.executor.Exec(query)
	if execErr != nil {
		return "", "", "", execErr
	}
	if out.ExitCode != 0 {
		return "", "", "", fmt.Errorf("获取集群状态失败: %s", out.Stderr)
	}

	columns := strings.Fields(strings.TrimSpace(out.Stdout))
	if len(columns) < 3 {
		return "", "", "", nil
	}

	// blankIfNone maps the "<none>" placeholder to an empty string.
	blankIfNone := func(v string) string {
		if v == "<none>" {
			return ""
		}
		return v
	}
	return blankIfNone(columns[0]), blankIfNone(columns[1]), blankIfNone(columns[2]), nil
}
// Name and namespace of the BC resource that
// GetManagementClusterSelfFullStatus queries for the management cluster.
const (
// ManagementClusterSelfBCName is the BC resource name passed to kubectl.
ManagementClusterSelfBCName = "bke-cluster"
// ManagementClusterSelfBCNamespace is the namespace passed to kubectl -n.
ManagementClusterSelfBCNamespace = "bke-cluster"
)
// GetManagementClusterSelfFullStatus fetches phase, health state and cluster
// status of the management cluster's own BC (ManagementClusterSelfBCName in
// ManagementClusterSelfBCNamespace) using the given kubeconfig, which must
// not be empty. Columns printed as "<none>" are returned as empty strings.
// If the output has fewer than three columns, all values are empty and err
// is nil.
func (m *ClusterManager) GetManagementClusterSelfFullStatus(mgmtKubeconfigPath string) (phase, state, clusterStatus string, err error) {
	if mgmtKubeconfigPath == "" {
		return "", "", "", fmt.Errorf("mgmtKubeconfigPath 不能为空")
	}

	envPrefix := fmt.Sprintf("KUBECONFIG=%s ", mgmtKubeconfigPath)
	query := fmt.Sprintf("%skubectl get bc %s -n %s -o custom-columns=PHASE:.status.phase,STATE:.status.clusterHealthState,STATUS:.status.clusterStatus --no-headers",
		envPrefix, ManagementClusterSelfBCName, ManagementClusterSelfBCNamespace)
	out, execErr := m.executor.Exec(query)
	if execErr != nil {
		return "", "", "", execErr
	}
	if out.ExitCode != 0 {
		return "", "", "", fmt.Errorf("获取管理集群自管 BC 状态失败: %s", out.Stderr)
	}

	columns := strings.Fields(strings.TrimSpace(out.Stdout))
	if len(columns) < 3 {
		return "", "", "", nil
	}

	// blankIfNone maps the "<none>" placeholder to an empty string.
	blankIfNone := func(v string) string {
		if v == "<none>" {
			return ""
		}
		return v
	}
	return blankIfNone(columns[0]), blankIfNone(columns[1]), blankIfNone(columns[2]), nil
}
// IsManagementClusterSelfHealthy reports whether the management cluster's own
// BC has health state "Healthy". A failed status query yields (false, err).
func (m *ClusterManager) IsManagementClusterSelfHealthy(mgmtKubeconfigPath string) (bool, error) {
	_, health, _, queryErr := m.GetManagementClusterSelfFullStatus(mgmtKubeconfigPath)
	if queryErr != nil {
		return false, queryErr
	}
	healthy := health == "Healthy"
	return healthy, nil
}
// ClusterExists reports whether the named cluster's BC exists, without an
// explicit kubeconfig.
func (m *ClusterManager) ClusterExists(clusterName string) bool {
	var noKubeconfig string
	return m.ClusterExistsWithKubeconfig(clusterName, noKubeconfig)
}
// ClusterExistsWithKubeconfig reports whether `kubectl get bc` finds the BC
// "bke-<clusterName>". It is true only when the command runs, exits with 0
// and prints non-blank output; any execution error is reported as false.
func (m *ClusterManager) ClusterExistsWithKubeconfig(clusterName, kubeconfigPath string) bool {
	// The BC name and its namespace share the same value.
	target := "bke-" + clusterName
	var envPrefix string
	if kubeconfigPath != "" {
		envPrefix = fmt.Sprintf("KUBECONFIG=%s ", kubeconfigPath)
	}

	probeCmd := fmt.Sprintf("%skubectl get bc %s -n %s --no-headers 2>/dev/null", envPrefix, target, target)
	probe, execErr := m.executor.Exec(probeCmd)
	if execErr != nil || probe.ExitCode != 0 {
		return false
	}
	return strings.TrimSpace(probe.Stdout) != ""
}
// WaitForClusterReady polls every 30 seconds until the cluster's health state
// is "Healthy". Status query errors are treated as "not ready yet". It
// returns ctx.Err() if ctx is cancelled while waiting, and a timeout error
// once the deadline derived from timeout has passed.
func (m *ClusterManager) WaitForClusterReady(ctx context.Context, clusterName string, timeout time.Duration) error {
	const pollInterval = 30 * time.Second
	deadline := time.Now().Add(timeout)
	for {
		if !time.Now().Before(deadline) {
			return fmt.Errorf("等待集群就绪超时")
		}
		if health, queryErr := m.GetClusterStatus(clusterName); queryErr == nil && health == "Healthy" {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(pollInterval):
		}
	}
}
// ScaleOutNode adds node to the named cluster without an explicit kubeconfig.
func (m *ClusterManager) ScaleOutNode(clusterName string, node config.NodeInfo) error {
	var noKubeconfig string
	return m.ScaleOutNodeWithKubeconfig(clusterName, node, noKubeconfig)
}
// ScaleOutNodeWithKubeconfig adds a node to the cluster by generating a
// BKENode manifest, writing it to a temporary file on the executor host via a
// shell heredoc, and applying it with kubectl. When kubeconfigPath is
// non-empty, kubectl is prefixed with KUBECONFIG=<path>.
//
// The temporary file is removed on every exit path (best-effort).
func (m *ClusterManager) ScaleOutNodeWithKubeconfig(clusterName string, node config.NodeInfo, kubeconfigPath string) error {
	kubeconfigArg := ""
	if kubeconfigPath != "" {
		kubeconfigArg = fmt.Sprintf("KUBECONFIG=%s ", kubeconfigPath)
	}

	// Format through Sprintf exactly as the original command construction
	// did, so this does not depend on the generator's concrete return type.
	nodeYAML := fmt.Sprintf("%s", GenerateSingleBKENodeYAML(clusterName, node))
	// The heredoc terminator must sit on a line of its own; without a
	// trailing newline it would be glued to the last YAML line and the shell
	// would never see the end of the heredoc.
	if !strings.HasSuffix(nodeYAML, "\n") {
		nodeYAML += "\n"
	}

	tempPath := fmt.Sprintf("/tmp/bke-scaleout-%s-%s.yaml", clusterName, node.Hostname)
	// Best-effort cleanup; also covers the failure paths below. The result is
	// intentionally ignored because a leftover temp file is harmless.
	defer func() {
		_, _ = m.executor.Exec(fmt.Sprintf("rm -f %s", tempPath))
	}()

	writeCmd := fmt.Sprintf("cat > %s << 'BKENODE_EOF'\n%sBKENODE_EOF", tempPath, nodeYAML)
	result, err := m.executor.Exec(writeCmd)
	if err != nil {
		return fmt.Errorf("写入扩容节点配置失败: %w", err)
	}
	if result.ExitCode != 0 {
		return fmt.Errorf("写入扩容节点配置失败: %s", result.Stderr)
	}

	applyCmd := fmt.Sprintf("%skubectl apply -f %s", kubeconfigArg, tempPath)
	result, err = m.executor.Exec(applyCmd)
	if err != nil {
		return fmt.Errorf("扩容节点失败: %w", err)
	}
	if result.ExitCode != 0 {
		return fmt.Errorf("扩容节点失败: %s", result.Stderr)
	}
	return nil
}
// ScaleInNode removes the node with the given IP from the named cluster
// without an explicit kubeconfig.
func (m *ClusterManager) ScaleInNode(clusterName, nodeIP string) error {
	var noKubeconfig string
	return m.ScaleInNodeWithKubeconfig(clusterName, nodeIP, noKubeconfig)
}
// ScaleInNodeWithKubeconfig removes a node from the cluster. It lists the
// BKENode objects in namespace "bke-<clusterName>" as "name,ip" lines, picks
// the first one whose .spec.ip equals nodeIP, and deletes it with kubectl.
//
// Errors are returned when the list command cannot run or exits non-zero,
// when no BKENode has the requested IP, or when the delete fails.
func (m *ClusterManager) ScaleInNodeWithKubeconfig(clusterName, nodeIP, kubeconfigPath string) error {
	namespace := "bke-" + clusterName
	kubeconfigArg := ""
	if kubeconfigPath != "" {
		kubeconfigArg = fmt.Sprintf("KUBECONFIG=%s ", kubeconfigPath)
	}

	findCmd := fmt.Sprintf(`%skubectl get bkenode -n %s -o jsonpath='{range .items[*]}{.metadata.name},{.spec.ip}{"\n"}{end}'`, kubeconfigArg, namespace)
	result, err := m.executor.Exec(findCmd)
	if err != nil {
		return fmt.Errorf("获取 BKENode 列表失败: %w", err)
	}
	// Without this check a failing kubectl call would be misreported below
	// as "no BKENode with this IP".
	if result.ExitCode != 0 {
		return fmt.Errorf("获取 BKENode 列表失败: %s", result.Stderr)
	}

	nodeName := ""
	for _, line := range strings.Split(strings.Trim(result.Stdout, "'"), "\n") {
		line = strings.TrimSpace(line)
		if line == "" {
			continue
		}
		parts := strings.SplitN(line, ",", 2)
		if len(parts) == 2 && strings.TrimSpace(parts[1]) == nodeIP {
			nodeName = strings.TrimSpace(parts[0])
			break
		}
	}
	if nodeName == "" {
		return fmt.Errorf("在集群中未找到 IP 为 %s 的 BKENode", nodeIP)
	}

	deleteCmd := fmt.Sprintf("%skubectl delete bkenode %s -n %s", kubeconfigArg, nodeName, namespace)
	result, err = m.executor.Exec(deleteCmd)
	if err != nil {
		return fmt.Errorf("删除 BKENode 失败: %w", err)
	}
	if result.ExitCode != 0 {
		return fmt.Errorf("删除 BKENode 失败: %s", result.Stderr)
	}
	return nil
}
// GetClusterList returns the first column of `bke cluster list --no-headers`,
// one entry per non-empty output line. The result is a non-nil (possibly
// empty) slice. Only an execution error is reported; the command's exit code
// is not inspected.
func (m *ClusterManager) GetClusterList() ([]string, error) {
	listCmd := "bke cluster list --no-headers 2>/dev/null | awk '{print $1}'"
	out, execErr := m.executor.Exec(listCmd)
	if execErr != nil {
		return nil, fmt.Errorf("获取集群列表失败: %w", execErr)
	}

	names := []string{}
	for _, name := range splitLines(out.Stdout) {
		if name == "" {
			continue
		}
		names = append(names, name)
	}
	return names, nil
}
// PrepareAndCreateCluster generates and uploads the cluster and node configs
// for cfg, then starts the cluster creation in the background using
// DefaultBootstraoClusterKubeconfig. It returns the two config paths; they
// are also returned when starting the creation fails, and are empty when
// config generation fails.
func (m *ClusterManager) PrepareAndCreateCluster(cfg *config.BKEClusterConfig) (string, string, error) {
	clusterPath, nodePath, genErr := m.configGenerator.GenerateAndUpload(cfg)
	if genErr != nil {
		return "", "", fmt.Errorf("生成集群配置失败: %w", genErr)
	}

	startErr := m.CreateClusterInBackgroundWithKubeconfig(clusterPath, nodePath, DefaultBootstraoClusterKubeconfig)
	if startErr != nil {
		return clusterPath, nodePath, fmt.Errorf("创建集群失败: %w", startErr)
	}
	return clusterPath, nodePath, nil
}
// GetConfigGenerator returns the ClusterConfigGenerator owned by m.
func (m *ClusterManager) GetConfigGenerator() *ClusterConfigGenerator {
	generator := m.configGenerator
	return generator
}
// CleanupConfig delegates to the config generator's CleanupConfig for
// configPath and returns its result.
func (m *ClusterManager) CleanupConfig(configPath string) error {
	generator := m.configGenerator
	return generator.CleanupConfig(configPath)
}
// CleanupPathRecursive delegates to the config generator's
// CleanupPathRecursive for path and returns its result.
func (m *ClusterManager) CleanupPathRecursive(path string) error {
	generator := m.configGenerator
	return generator.CleanupPathRecursive(path)
}
// GetDynamicClient returns the dynamic Kubernetes client held by m.
func (m *ClusterManager) GetDynamicClient() dynamic.Interface {
	client := m.dynamicClient
	return client
}
// AddNewAddons appends newAddon to spec.clusterConfig.addons of a BKECluster
// and writes the object back with Update.
//
// Both clusterName and namespace are prefixed with "bke-" (via ClusterName /
// ClusterNamespace) before the lookup. If an addon with the same name is
// already listed, an error is returned and nothing is changed. Optional addon
// fields (type, releaseName, namespace, block) are only written when set.
func (m *ClusterManager) AddNewAddons(clusterName, namespace string, newAddon config.AddonConfig) error {
	clusterName = ClusterName(clusterName)
	namespace = ClusterNamespace(namespace)

	cluster, err := m.dynamicClient.Resource(BKEClusterGVR).Namespace(namespace).Get(
		context.TODO(),
		clusterName,
		metav1.GetOptions{},
	)
	if err != nil {
		return fmt.Errorf("获取 bkecluster 失败:%w", err)
	}

	current, found, err := unstructured.NestedSlice(cluster.Object, "spec", "clusterConfig", "addons")
	if err != nil {
		return fmt.Errorf("获取 addons 失败:%w", err)
	}
	if !found {
		// No addons field yet: start from an empty list.
		current = make([]interface{}, 0)
	}

	// Refuse to add a duplicate; entries that are not maps are skipped.
	for idx := range current {
		entry, isMap := current[idx].(map[string]interface{})
		if !isMap {
			continue
		}
		existingName, _, _ := unstructured.NestedString(entry, "name")
		if existingName == newAddon.Name {
			return fmt.Errorf("addon %s 已存在,如需更新请使用 PatchBkeClusterAddonVersion", newAddon.Name)
		}
	}

	addonEntry := map[string]interface{}{
		"name":    newAddon.Name,
		"version": newAddon.Version,
	}
	if newAddon.Type != "" {
		addonEntry["type"] = newAddon.Type
	}
	if newAddon.ReleaseName != "" {
		addonEntry["releaseName"] = newAddon.ReleaseName
	}
	if newAddon.Namespace != "" {
		addonEntry["namespace"] = newAddon.Namespace
	}
	if newAddon.Block {
		addonEntry["block"] = newAddon.Block
	}

	current = append(current, addonEntry)
	if setErr := unstructured.SetNestedSlice(cluster.Object, current, "spec", "clusterConfig", "addons"); setErr != nil {
		return fmt.Errorf("设置 addons 失败:%w", setErr)
	}

	_, err = m.dynamicClient.Resource(BKEClusterGVR).Namespace(namespace).Update(
		context.TODO(),
		cluster,
		metav1.UpdateOptions{},
	)
	return err
}
// PatchBkeClusterAddonVersion sets the "version" of the addon named addonName
// in spec.clusterConfig.addons of a BKECluster and writes the object back
// with Update.
//
// Both clusterName and namespace are prefixed with "bke-" (via ClusterName /
// ClusterNamespace) before the lookup. An error is returned when the cluster
// cannot be fetched, when the addons field is unreadable or absent, or when
// the update fails.
func (m *ClusterManager) PatchBkeClusterAddonVersion(clusterName, namespace, addonName, version string) error {
	clusterName = ClusterName(clusterName)
	namespace = ClusterNamespace(namespace)
	unstructuredObj, err := m.dynamicClient.Resource(BKEClusterGVR).Namespace(namespace).Get(
		context.TODO(),
		clusterName,
		metav1.GetOptions{},
	)
	if err != nil {
		return fmt.Errorf("获取 bkecluster 失败:%w", err)
	}

	addons, found, err := unstructured.NestedSlice(unstructuredObj.Object, "spec", "clusterConfig", "addons")
	if err != nil {
		return fmt.Errorf("获取 addons 失败:%w", err)
	}
	if !found {
		// Reported separately: there is no underlying error to wrap here,
		// and wrapping nil with %w would print "%!w(<nil>)".
		return fmt.Errorf("获取 addons 失败:spec.clusterConfig.addons 不存在")
	}

	// Only the first addon with a matching name is changed; entries that are
	// not maps are skipped.
	for i, addon := range addons {
		addonMap, ok := addon.(map[string]interface{})
		if !ok {
			continue
		}
		if name, _, _ := unstructured.NestedString(addonMap, "name"); name == addonName {
			addonMap["version"] = version
			addons[i] = addonMap
			break
		}
	}
	// NOTE(review): when no addon matches addonName the object is still
	// written back unchanged and nil is returned — confirm whether callers
	// would prefer an error in that case.

	if err := unstructured.SetNestedSlice(unstructuredObj.Object, addons, "spec", "clusterConfig", "addons"); err != nil {
		return fmt.Errorf("设置 addons 失败:%w", err)
	}
	_, err = m.dynamicClient.Resource(BKEClusterGVR).Namespace(namespace).Update(
		context.TODO(),
		unstructuredObj,
		metav1.UpdateOptions{},
	)
	return err
}
// RemoveBkeClusterAddonByName removes every addon named addonName from
// spec.clusterConfig.addons of a BKECluster and writes the object back with
// Update.
//
// Both clusterName and namespace are prefixed with "bke-" (via ClusterName /
// ClusterNamespace) before the lookup. If no addon matches, nothing is
// written and nil is returned. Entries that are not maps are kept as-is.
// All returned errors wrap their cause so callers can use errors.Is/As.
func (m *ClusterManager) RemoveBkeClusterAddonByName(clusterName, namespace, addonName string) error {
	clusterName = ClusterName(clusterName)
	namespace = ClusterNamespace(namespace)
	unstructuredObj, err := m.dynamicClient.Resource(BKEClusterGVR).Namespace(namespace).Get(context.TODO(), clusterName, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("获取bkecluster失败: %w", err)
	}

	addons, found, err := unstructured.NestedSlice(unstructuredObj.Object, "spec", "clusterConfig", "addons")
	if err != nil {
		return fmt.Errorf("获取addons失败: %w", err)
	}
	if !found {
		return fmt.Errorf("addons未找到")
	}

	newAddons := make([]interface{}, 0, len(addons))
	for _, item := range addons {
		itemMap, ok := item.(map[string]interface{})
		if !ok {
			newAddons = append(newAddons, item)
			continue
		}
		nameVal, _, _ := unstructured.NestedString(itemMap, "name")
		if nameVal == addonName {
			continue
		}
		newAddons = append(newAddons, item)
	}
	// Nothing was removed: skip the write.
	if len(newAddons) == len(addons) {
		return nil
	}

	if err := unstructured.SetNestedSlice(unstructuredObj.Object, newAddons, "spec", "clusterConfig", "addons"); err != nil {
		return fmt.Errorf("failed to set addons: %w", err)
	}
	if _, err := m.dynamicClient.Resource(BKEClusterGVR).Namespace(namespace).Update(context.TODO(), unstructuredObj, metav1.UpdateOptions{}); err != nil {
		return fmt.Errorf("failed to update BKECluster: %w", err)
	}
	return nil
}
// GetBkeCluster fetches a BKECluster object through the dynamic client. Both
// clusterName and namespace are prefixed with "bke-" (via ClusterName /
// ClusterNamespace) before the lookup.
func (m *ClusterManager) GetBkeCluster(clusterName, namespace string) (*unstructured.Unstructured, error) {
	objectName := ClusterName(clusterName)
	objectNamespace := ClusterNamespace(namespace)
	resource := m.dynamicClient.Resource(BKEClusterGVR).Namespace(objectNamespace)
	return resource.Get(context.TODO(), objectName, metav1.GetOptions{})
}
// splitLines trims leading and trailing whitespace from s and splits the rest
// on "\n". Blank input yields a slice holding a single empty string.
func splitLines(s string) []string {
	trimmed := strings.TrimSpace(s)
	lines := strings.Split(trimmed, "\n")
	return lines
}
// ClusterName returns clusterName with the "bke-" prefix prepended.
func ClusterName(clusterName string) string {
	prefixed := "bke-" + clusterName
	return prefixed
}
// ClusterNamespace returns clusterName with the "bke-" prefix prepended.
func ClusterNamespace(clusterName string) string {
	prefixed := "bke-" + clusterName
	return prefixed
}
// extractClusterNameFromConfigPath derives a cluster name from a config file
// path: it keeps what follows the last "/", then drops a ".yaml" suffix and
// a "bke-cluster-" prefix if present.
func extractClusterNameFromConfigPath(configPath string) string {
	// LastIndex yields -1 when there is no "/", so +1 keeps the whole string.
	fileName := configPath[strings.LastIndex(configPath, "/")+1:]
	withoutExt := strings.TrimSuffix(fileName, ".yaml")
	return strings.TrimPrefix(withoutExt, "bke-cluster-")
}