package utils
import (
"encoding/json"
"fmt"
"os"
"strconv"
"strings"
"time"
"gitcode.com/openFuyao/e2e-auto-test/e2e/framework/executor"
)
// FunctionalConfig gathers every setting the NUMA functional tests need; it is
// normally populated from environment variables by LoadFunctionalConfig.
type FunctionalConfig struct {
	// Remote host reached over SSH and the credentials used to log in.
	SSHHost              string
	SSHPort              int
	SSHUser, SSHPassword string

	// Volcano installation: namespace, scheduler ConfigMap and scheduler name.
	VolcanoNamespace, VolcanoConfigMapName, VolcanoSchedulerName string

	// Node-side kubelet config file and the systemd units restarted by tests.
	KubeletConfigPath, KubeletServiceName, ContainerdServiceName string

	// Business workload: namespace, Deployment name and a single key=value selector.
	BizNamespace, BizDeploymentName, BizLabelSelector string

	// AutoRestore is read from NUMA_FUNCTIONAL_AUTO_RESTORE (default true);
	// presumably it tells tests to undo their node changes afterwards — confirm with callers.
	AutoRestore bool

	// Optional Helm release settings; empty when the matching variables are unset.
	HelmReleaseName, HelmNamespace, HelmURL, HelmVersion string
}
// PodSnapshot records the identity and health of one pod at a single moment,
// so that two snapshots can be compared to detect restarts or re-creation.
type PodSnapshot struct {
	Name, UID, Phase string
	// Ready mirrors the pod's "Ready" condition being "True".
	Ready bool
	// RestartCount is the sum of restartCount across all container statuses.
	RestartCount int64
}
// LoadFunctionalConfig builds a FunctionalConfig from environment variables.
//
// SSH settings come from the first non-empty of FUNCTIONAL_TEST_*, SYSTEM_TEST_*
// and COMPONENTS_TEST_* (IP, USER, PASSWORD, PORT). IP, USER and PASSWORD are
// mandatory; PORT defaults to 22 and must lie in 1-65535.
//
// NUMA_FUNCTIONAL_* variables override the Volcano, kubelet, containerd and
// business-workload settings (each falls back to the default shown below). The
// Helm fields have no default and stay empty when unset. The business label
// selector is NUMA_FUNCTIONAL_BIZ_LABEL_SELECTOR, or otherwise
// <NUMA_FUNCTIONAL_BIZ_LABEL_KEY|app>=<NUMA_FUNCTIONAL_BIZ_LABEL_VALUE|numa-functional-biz>.
// AutoRestore comes from NUMA_FUNCTIONAL_AUTO_RESTORE and defaults to true.
func LoadFunctionalConfig() (*FunctionalConfig, error) {
	host := firstNonEmpty(
		os.Getenv("FUNCTIONAL_TEST_IP"),
		os.Getenv("SYSTEM_TEST_IP"),
		os.Getenv("COMPONENTS_TEST_IP"),
	)
	user := firstNonEmpty(
		os.Getenv("FUNCTIONAL_TEST_USER"),
		os.Getenv("SYSTEM_TEST_USER"),
		os.Getenv("COMPONENTS_TEST_USER"),
	)
	password := firstNonEmpty(
		os.Getenv("FUNCTIONAL_TEST_PASSWORD"),
		os.Getenv("SYSTEM_TEST_PASSWORD"),
		os.Getenv("COMPONENTS_TEST_PASSWORD"),
	)
	portRaw := firstNonEmpty(
		os.Getenv("FUNCTIONAL_TEST_PORT"),
		os.Getenv("SYSTEM_TEST_PORT"),
		os.Getenv("COMPONENTS_TEST_PORT"),
		"22",
	)
	if host == "" || user == "" || password == "" {
		return nil, fmt.Errorf("FUNCTIONAL_TEST_IP/USER/PASSWORD (or SYSTEM_TEST_*/COMPONENTS_TEST_*) must be configured")
	}
	port, err := strconv.Atoi(portRaw)
	if err != nil {
		return nil, fmt.Errorf("invalid ssh port %q: %w", portRaw, err)
	}
	// Atoi happily accepts 0, negatives and values above 65535; reject them here
	// instead of letting the SSH dial fail later with a less obvious error.
	if port < 1 || port > 65535 {
		return nil, fmt.Errorf("invalid ssh port %q: must be between 1 and 65535", portRaw)
	}
	autoRestore, err := parseBoolWithDefault(os.Getenv("NUMA_FUNCTIONAL_AUTO_RESTORE"), true)
	if err != nil {
		return nil, err
	}
	selector := strings.TrimSpace(os.Getenv("NUMA_FUNCTIONAL_BIZ_LABEL_SELECTOR"))
	if selector == "" {
		labelKey := firstNonEmpty(os.Getenv("NUMA_FUNCTIONAL_BIZ_LABEL_KEY"), "app")
		labelValue := firstNonEmpty(os.Getenv("NUMA_FUNCTIONAL_BIZ_LABEL_VALUE"), "numa-functional-biz")
		selector = fmt.Sprintf("%s=%s", labelKey, labelValue)
	}
	cfg := &FunctionalConfig{
		SSHHost:               host,
		SSHPort:               port,
		SSHUser:               user,
		SSHPassword:           password,
		VolcanoNamespace:      firstNonEmpty(os.Getenv("NUMA_FUNCTIONAL_VOLCANO_NAMESPACE"), "volcano-system"),
		VolcanoConfigMapName:  firstNonEmpty(os.Getenv("NUMA_FUNCTIONAL_VOLCANO_CONFIGMAP"), "volcano-scheduler-configmap"),
		VolcanoSchedulerName:  firstNonEmpty(os.Getenv("NUMA_FUNCTIONAL_SCHEDULER_NAME"), "volcano"),
		KubeletConfigPath:     firstNonEmpty(os.Getenv("NUMA_FUNCTIONAL_KUBELET_CONFIG_PATH"), "/var/lib/kubelet/config.yaml"),
		KubeletServiceName:    firstNonEmpty(os.Getenv("NUMA_FUNCTIONAL_KUBELET_SERVICE"), "kubelet"),
		ContainerdServiceName: firstNonEmpty(os.Getenv("NUMA_FUNCTIONAL_CONTAINERD_SERVICE"), "containerd"),
		BizNamespace:          firstNonEmpty(os.Getenv("NUMA_FUNCTIONAL_BIZ_NAMESPACE"), "default"),
		BizDeploymentName:     firstNonEmpty(os.Getenv("NUMA_FUNCTIONAL_BIZ_DEPLOYMENT_NAME"), "numa-functional-biz"),
		BizLabelSelector:      selector,
		AutoRestore:           autoRestore,
		HelmReleaseName:       strings.TrimSpace(os.Getenv("NUMA_FUNCTIONAL_HELM_RELEASE")),
		HelmNamespace:         strings.TrimSpace(os.Getenv("NUMA_FUNCTIONAL_HELM_NAMESPACE")),
		HelmURL:               strings.TrimSpace(os.Getenv("NUMA_FUNCTIONAL_HELM_URL")),
		HelmVersion:           strings.TrimSpace(os.Getenv("NUMA_FUNCTIONAL_HELM_VERSION")),
	}
	return cfg, nil
}
// ExecStdout runs cmd on the remote host through sshClient and returns its
// stdout with surrounding whitespace trimmed.
//
// When execution fails, the returned error names the command and includes the
// trimmed stderr, or stdout when stderr is empty, and wraps the executor error.
// A nil client or a nil result from the executor is reported as an error rather
// than causing a nil-pointer panic.
func ExecStdout(sshClient *executor.SSHExecutor, cmd string) (string, error) {
	if sshClient == nil {
		return "", fmt.Errorf("run command failed: %s, error: ssh client is nil", cmd)
	}
	res, err := sshClient.Exec(cmd)
	if err != nil {
		detail := ""
		if res != nil {
			detail = strings.TrimSpace(res.Stderr)
			if detail == "" {
				// Some tools report failures on stdout; surface that instead of nothing.
				detail = strings.TrimSpace(res.Stdout)
			}
		}
		return "", fmt.Errorf("run command failed: %s, stderr: %s, error: %w", cmd, detail, err)
	}
	if res == nil {
		return "", fmt.Errorf("run command failed: %s, error: executor returned no result", cmd)
	}
	return strings.TrimSpace(res.Stdout), nil
}
// NormalizeOutput cleans command output before comparison. It trims surrounding
// whitespace, then any run of single quotes at either end, then any run of
// double quotes, then whitespace again.
//
// Single quotes are stripped before double quotes. So a value such as "'x'"
// (single quotes inside double quotes) keeps its inner single quotes.
func NormalizeOutput(raw string) string {
	withoutSpace := strings.TrimSpace(raw)
	withoutSingle := strings.Trim(withoutSpace, "'")
	withoutDouble := strings.Trim(withoutSingle, `"`)
	return strings.TrimSpace(withoutDouble)
}
// RestartSystemdServiceAndWaitActive runs `systemctl restart` for serviceName on
// the remote host. It then polls `systemctl is-active` every 2 seconds, for up
// to 60 seconds, until the unit reports "active".
//
// serviceName is cleaned with NormalizeOutput first, and an empty name is
// rejected. A failing restart command is returned as-is. If the unit never
// becomes active, the error includes the last observed state.
func RestartSystemdServiceAndWaitActive(sshClient *executor.SSHExecutor, serviceName string) error {
	const (
		activeTimeout = 60 * time.Second
		pollInterval  = 2 * time.Second
	)
	service := NormalizeOutput(serviceName)
	if service == "" {
		return fmt.Errorf("service name is empty")
	}
	quoted := ShellQuote(service)
	if _, err := ExecStdout(sshClient, fmt.Sprintf("systemctl restart %s", quoted)); err != nil {
		return err
	}
	// A single is-active immediately after restart can observe a transient
	// state such as "activating"; keep polling until the deadline instead.
	deadline := time.Now().Add(activeTimeout)
	lastState := ""
	for {
		status, err := ExecStdout(sshClient, fmt.Sprintf("systemctl is-active %s", quoted))
		if err == nil {
			lastState = NormalizeOutput(status)
			if lastState == "active" {
				return nil
			}
		} else {
			// is-active exits non-zero for any state other than "active", which
			// presumably surfaces as an executor error; keep its text for reporting.
			lastState = err.Error()
		}
		if !time.Now().Before(deadline) {
			return fmt.Errorf("service %s is not active after restart (waited %s, last state: %s)", service, activeTimeout, lastState)
		}
		time.Sleep(pollInterval)
	}
}
// EnsureNamespace creates namespace on the remote cluster if it is missing. It
// renders the Namespace with a client-side dry run and pipes it into
// `kubectl apply`, so it succeeds whether or not the namespace already exists.
func EnsureNamespace(sshClient *executor.SSHExecutor, namespace string) error {
	quoted := ShellQuote(namespace)
	_, err := ExecStdout(sshClient, "kubectl create namespace "+quoted+" --dry-run=client -o yaml | kubectl apply -f -")
	return err
}
// EnsureBusinessWorkload makes sure the business Deployment described by cfg
// exists. It first ensures cfg.BizNamespace exists. If the Deployment is absent,
// it applies one built by BuildBusinessDeploymentYAML, labelled with the single
// key=value pair parsed from cfg.BizLabelSelector.
//
// It returns true when this call created the Deployment, and false when the
// Deployment was already present.
func EnsureBusinessWorkload(sshClient *executor.SSHExecutor, cfg *FunctionalConfig) (bool, error) {
	if cfg == nil {
		return false, fmt.Errorf("functional config is nil")
	}
	if err := EnsureNamespace(sshClient, cfg.BizNamespace); err != nil {
		return false, err
	}
	// --ignore-not-found prints nothing (exit 0) for a missing Deployment but
	// still fails on API/RBAC errors. Piping through `wc -l` would take wc's
	// exit status and make such failures look like "not found".
	getCmd := fmt.Sprintf("kubectl get deployment %s -n %s --ignore-not-found -o name",
		ShellQuote(cfg.BizDeploymentName), ShellQuote(cfg.BizNamespace))
	existing, err := ExecStdout(sshClient, getCmd)
	if err != nil {
		return false, fmt.Errorf("check deployment %s/%s failed: %w", cfg.BizNamespace, cfg.BizDeploymentName, err)
	}
	if existing != "" {
		return false, nil
	}
	labelKey, labelValue, err := ParseSingleLabelSelector(cfg.BizLabelSelector)
	if err != nil {
		return false, err
	}
	manifest := BuildBusinessDeploymentYAML(cfg.BizNamespace, cfg.BizDeploymentName, labelKey, labelValue)
	// The quoted heredoc delimiter ('EOF') stops the remote shell from expanding
	// $ or backticks inside the manifest.
	applyCmd := fmt.Sprintf("cat <<'EOF' | kubectl apply -f -\n%s\nEOF", manifest)
	if _, err := ExecStdout(sshClient, applyCmd); err != nil {
		return false, err
	}
	return true, nil
}
// DeleteBusinessWorkload removes the business Deployment (ignoring "not found"),
// then waits up to 60 seconds for pods matching cfg.BizLabelSelector to go away.
//
// This is best-effort cleanup: errors from both commands are deliberately
// discarded so that teardown never fails a test.
func DeleteBusinessWorkload(sshClient *executor.SSHExecutor, cfg *FunctionalConfig) {
	ns := ShellQuote(cfg.BizNamespace)
	cleanup := []string{
		"kubectl delete deployment " + ShellQuote(cfg.BizDeploymentName) + " -n " + ns + " --ignore-not-found=true",
		"kubectl wait --for=delete pod -l " + ShellQuote(cfg.BizLabelSelector) + " -n " + ns + " --timeout=60s 2>/dev/null",
	}
	for _, step := range cleanup {
		_, _ = ExecStdout(sshClient, step)
	}
}
// GetRunningPodSnapshots lists the pods in namespace and returns a PodSnapshot
// for every pod whose phase is Running and that is not being deleted. When
// labelSelector is blank, no selector is applied.
//
// Pods with a deletionTimestamp are skipped silently. Pods in other phases are
// excluded, but are named in the error returned when no Running pod is found.
// Each snapshot's RestartCount is the sum over all container statuses.
func GetRunningPodSnapshots(sshClient *executor.SSHExecutor, namespace string, labelSelector string) ([]PodSnapshot, error) {
	var selectorArg string
	if strings.TrimSpace(labelSelector) != "" {
		selectorArg = "-l " + ShellQuote(labelSelector)
	}
	rawJSON, err := ExecStdout(sshClient, fmt.Sprintf("kubectl get pods -n %s %s -o json", ShellQuote(namespace), selectorArg))
	if err != nil {
		return nil, err
	}

	type containerStatus struct {
		RestartCount int64 `json:"restartCount"`
	}
	type podCondition struct {
		Type   string `json:"type"`
		Status string `json:"status"`
	}
	var decoded struct {
		Items []struct {
			Metadata struct {
				Name              string  `json:"name"`
				UID               string  `json:"uid"`
				DeletionTimestamp *string `json:"deletionTimestamp"`
			} `json:"metadata"`
			Status struct {
				Phase             string            `json:"phase"`
				ContainerStatuses []containerStatus `json:"containerStatuses"`
				Conditions        []podCondition    `json:"conditions"`
			} `json:"status"`
		} `json:"items"`
	}
	if err := json.Unmarshal([]byte(rawJSON), &decoded); err != nil {
		return nil, fmt.Errorf("parse pod json failed: %w", err)
	}

	snapshots := make([]PodSnapshot, 0, len(decoded.Items))
	var skipped []string
	for _, pod := range decoded.Items {
		switch {
		case pod.Metadata.DeletionTimestamp != nil:
			// Terminating pod: neither reported nor counted.
		case pod.Status.Phase != "Running":
			skipped = append(skipped, fmt.Sprintf("%s(phase=%s)", pod.Metadata.Name, pod.Status.Phase))
		default:
			snap := PodSnapshot{
				Name:  pod.Metadata.Name,
				UID:   pod.Metadata.UID,
				Phase: pod.Status.Phase,
			}
			for _, cond := range pod.Status.Conditions {
				if cond.Type == "Ready" && strings.EqualFold(cond.Status, "True") {
					snap.Ready = true
					break
				}
			}
			for _, cs := range pod.Status.ContainerStatuses {
				snap.RestartCount += cs.RestartCount
			}
			snapshots = append(snapshots, snap)
		}
	}

	if len(snapshots) > 0 {
		return snapshots, nil
	}
	if len(skipped) > 0 {
		return nil, fmt.Errorf("no running pods found in namespace=%s selector=%s, non-running pods: %s", namespace, labelSelector, strings.Join(skipped, ", "))
	}
	return nil, fmt.Errorf("no pods found in namespace=%s selector=%s", namespace, labelSelector)
}
// CollectPodDiagnostics gathers a human-readable report for debugging failed
// tests. The report holds `kubectl get pods` output for namespace (filtered by
// labelSelector when it is not blank) and the last 30 events sorted by
// lastTimestamp.
//
// It never returns an error. A command failure is embedded in the report
// instead. Sections are separated by a blank line.
func CollectPodDiagnostics(sshClient *executor.SSHExecutor, namespace string, labelSelector string) string {
	nsArg := ShellQuote(namespace)
	podsCmd := "kubectl get pods -n " + nsArg
	if strings.TrimSpace(labelSelector) != "" {
		podsCmd += " -l " + ShellQuote(labelSelector)
	}
	eventsCmd := "kubectl get events -n " + nsArg + " --sort-by=.lastTimestamp 2>/dev/null | tail -n 30"

	sections := make([]string, 0, 2)
	out, err := ExecStdout(sshClient, podsCmd)
	switch {
	case err != nil:
		sections = append(sections, fmt.Sprintf("kubectl get pods failed: %v", err))
	default:
		sections = append(sections, "kubectl get pods:\n"+out)
	}
	out, err = ExecStdout(sshClient, eventsCmd)
	switch {
	case err != nil:
		sections = append(sections, fmt.Sprintf("kubectl get events failed: %v", err))
	default:
		sections = append(sections, "recent events (tail 30):\n"+out)
	}
	return strings.Join(sections, "\n\n")
}
// WaitForVolcanoComponentsReady polls namespace every 5 seconds until two
// conditions hold:
//   - each core Volcano component (pod names containing volcano-scheduler,
//     volcano-controllers or volcano-admission) has at least one pod that is
//     Running, has Ready=True, and is not being deleted;
//   - the volcano-admission-service Endpoints list at least one ready address.
//
// It returns nil as soon as both hold. Otherwise, once timeout elapses, it
// returns an error that wraps the last failed check.
func WaitForVolcanoComponentsReady(sshClient *executor.SSHExecutor, namespace string, timeout time.Duration) error {
	const pollInterval = 5 * time.Second
	coreKeywords := []string{"volcano-scheduler", "volcano-controllers", "volcano-admission"}
	quotedNamespace := ShellQuote(namespace)

	// checkCorePods verifies every core component has a live, ready pod.
	checkCorePods := func() error {
		rawJSON, err := ExecStdout(sshClient, fmt.Sprintf("kubectl get pods -n %s -o json 2>/dev/null", quotedNamespace))
		if err != nil {
			return fmt.Errorf("query volcano pods failed: %w", err)
		}
		var podList struct {
			Items []struct {
				Metadata struct {
					Name              string  `json:"name"`
					DeletionTimestamp *string `json:"deletionTimestamp"`
				} `json:"metadata"`
				Status struct {
					Phase      string `json:"phase"`
					Conditions []struct {
						Type   string `json:"type"`
						Status string `json:"status"`
					} `json:"conditions"`
				} `json:"status"`
			} `json:"items"`
		}
		if err := json.Unmarshal([]byte(rawJSON), &podList); err != nil {
			return fmt.Errorf("parse volcano pod json failed: %w", err)
		}
		readyCore := make(map[string]bool, len(coreKeywords))
		for _, item := range podList.Items {
			// A terminating pod from a previous rollout can still report Running;
			// counting it would let the wait pass before its replacement is up.
			if item.Metadata.DeletionTimestamp != nil || item.Status.Phase != "Running" {
				continue
			}
			ready := false
			for _, cond := range item.Status.Conditions {
				if cond.Type == "Ready" && strings.EqualFold(cond.Status, "True") {
					ready = true
					break
				}
			}
			if !ready {
				continue
			}
			for _, kw := range coreKeywords {
				if strings.Contains(item.Metadata.Name, kw) {
					readyCore[kw] = true
				}
			}
		}
		var missing []string
		for _, kw := range coreKeywords {
			if !readyCore[kw] {
				missing = append(missing, kw)
			}
		}
		if len(missing) > 0 {
			return fmt.Errorf("volcano core pods not all running and ready yet, missing: %s", strings.Join(missing, ", "))
		}
		return nil
	}

	// checkAdmissionEndpoints verifies volcano-admission-service has a ready address.
	checkAdmissionEndpoints := func() error {
		epJSON, err := ExecStdout(sshClient, fmt.Sprintf("kubectl get endpoints volcano-admission-service -n %s -o json 2>/dev/null", quotedNamespace))
		if err != nil {
			return fmt.Errorf("query volcano-admission endpoints failed: %w", err)
		}
		var ep struct {
			Subsets []struct {
				Addresses []struct {
					IP string `json:"ip"`
				} `json:"addresses"`
			} `json:"subsets"`
		}
		if err := json.Unmarshal([]byte(epJSON), &ep); err != nil {
			return fmt.Errorf("parse volcano-admission endpoints json failed: %w", err)
		}
		for _, subset := range ep.Subsets {
			if len(subset.Addresses) > 0 {
				return nil
			}
		}
		return fmt.Errorf("volcano-admission-service has no ready endpoints yet")
	}

	deadline := time.Now().Add(timeout)
	var lastErr error
	for time.Now().Before(deadline) {
		lastErr = checkCorePods()
		if lastErr == nil {
			lastErr = checkAdmissionEndpoints()
		}
		if lastErr == nil {
			return nil
		}
		time.Sleep(pollInterval)
	}
	if lastErr == nil {
		// Only reachable when timeout <= 0 and no attempt was made.
		return fmt.Errorf("volcano components not ready within %s", timeout)
	}
	return fmt.Errorf("volcano components not ready within %s: %w", timeout, lastErr)
}
// AssertPodSnapshotsStable checks that every pod in before is still present in
// current under the same UID, is Running and Ready, and has an unchanged total
// restart count. It returns an error for the first pod that violates any of
// these. Pods that appear only in current are ignored.
func AssertPodSnapshotsStable(before []PodSnapshot, current []PodSnapshot) error {
	byUID := make(map[string]PodSnapshot, len(current))
	for i := range current {
		byUID[current[i].UID] = current[i]
	}
	for i := range before {
		prev := before[i]
		cur, found := byUID[prev.UID]
		switch {
		case !found:
			return fmt.Errorf("pod was recreated or disappeared: name=%s uid=%s", prev.Name, prev.UID)
		case !cur.Ready || cur.Phase != "Running":
			return fmt.Errorf("pod is not running and ready: name=%s phase=%s ready=%v", cur.Name, cur.Phase, cur.Ready)
		case cur.RestartCount != prev.RestartCount:
			return fmt.Errorf("pod restart count changed: name=%s before=%d after=%d", cur.Name, prev.RestartCount, cur.RestartCount)
		}
	}
	return nil
}
// BackupFile copies filePath on the remote host to a new file under /tmp and
// returns the backup's path. The backup name embeds the current UnixNano
// timestamp and filePath with '/', '\' and ':' replaced by '_'.
func BackupFile(sshClient *executor.SSHExecutor, filePath string) (string, error) {
	flattened := strings.NewReplacer("/", "_", "\\", "_", ":", "_").Replace(filePath)
	target := "/tmp/numa-functional-" + strconv.FormatInt(time.Now().UnixNano(), 10) + "-" + flattened + ".bak"
	if _, err := ExecStdout(sshClient, "cp "+ShellQuote(filePath)+" "+ShellQuote(target)); err != nil {
		return "", err
	}
	return target, nil
}
// RestoreFile copies backupPath over targetPath on the remote host with `cp`.
func RestoreFile(sshClient *executor.SSHExecutor, backupPath string, targetPath string) error {
	copyCmd := strings.Join([]string{"cp", ShellQuote(backupPath), ShellQuote(targetPath)}, " ")
	if _, err := ExecStdout(sshClient, copyCmd); err != nil {
		return err
	}
	return nil
}
// RemoveFile deletes filePath on the remote host with `rm -f`. It is
// best-effort: any error is deliberately discarded.
func RemoveFile(sshClient *executor.SSHExecutor, filePath string) {
	removeCmd := "rm -f " + ShellQuote(filePath)
	_, _ = ExecStdout(sshClient, removeCmd)
}
// EnsurePreferClosestNumaNodesFalse rewrites any `prefer-closest-numa-nodes: true`
// line in the kubelet config file at kubeletConfigPath on the remote host so
// that the value becomes false.
//
// Both the bare value (true) and the double-quoted value ("true") are matched,
// and the original quoting is kept. The double-quoted form is presumably what a
// valid config contains, since topologyManagerPolicyOptions is a string map —
// confirm against the deployed kubelet config.
//
// It returns true only when a matching line was found and rewritten.
func EnsurePreferClosestNumaNodesFalse(sshClient *executor.SSHExecutor, kubeletConfigPath string) (bool, error) {
	quotedPath := ShellQuote(kubeletConfigPath)
	// Raw strings keep the sed back-references (\1..\4) literal. Groups 2 and 3
	// capture an optional surrounding double quote so it is written back unchanged.
	script := fmt.Sprintf(
		`if grep -Eq '^[[:space:]]*prefer-closest-numa-nodes:[[:space:]]*"?true"?([[:space:]]|$)' %s; then `+
			`sed -i -E 's/^([[:space:]]*prefer-closest-numa-nodes:[[:space:]]*)("?)true("?)([[:space:]]|$)/\1\2false\3\4/' %s; `+
			`echo changed; else echo unchanged; fi`,
		quotedPath,
		quotedPath,
	)
	out, err := ExecStdout(sshClient, script)
	if err != nil {
		return false, err
	}
	// Compare the final marker line exactly. "unchanged" contains "changed" as a
	// substring, so a Contains check would report a modification every time.
	lines := strings.Split(out, "\n")
	marker := strings.TrimSpace(lines[len(lines)-1])
	return marker == "changed", nil
}
// ParseSingleLabelSelector splits an equality selector of the form key=value
// (or the equivalent key==value) into its trimmed key and value.
//
// It returns an error in these cases:
//   - the selector contains no '=';
//   - it uses other selector syntax: ',' for multiple requirements, or '!' for
//     != and negated existence checks;
//   - the value contains another '=';
//   - the key or value is empty after trimming.
func ParseSingleLabelSelector(selector string) (string, string, error) {
	sep := strings.Index(selector, "=")
	if sep < 0 || strings.ContainsAny(selector, ",!") {
		return "", "", fmt.Errorf("selector should be single key=value, got: %s", selector)
	}
	rawKey := selector[:sep]
	// "key==value" is the double-equals spelling of the same equality requirement.
	rawValue := strings.TrimPrefix(selector[sep+1:], "=")
	if strings.Contains(rawValue, "=") {
		return "", "", fmt.Errorf("selector should be single key=value, got: %s", selector)
	}
	key := strings.TrimSpace(rawKey)
	value := strings.TrimSpace(rawValue)
	if key == "" || value == "" {
		return "", "", fmt.Errorf("selector key/value should not be empty: %s", selector)
	}
	return key, value, nil
}
// BuildBusinessDeploymentYAML renders a single-replica apps/v1 Deployment
// manifest named deploymentName in namespace. Its selector and pod template
// both carry the label labelKey=labelValue. The pod runs one alpine container
// (named "pause") that sleeps, with CPU/memory requests equal to its limits
// (100m / 64Mi).
//
// The name, namespace and label value are emitted as double-quoted YAML
// scalars. This keeps values such as "1", "true" or "on" as strings instead of
// numbers or booleans, which the Kubernetes decoder rejects for these string
// fields. fmt's %q escapes are a subset of YAML double-quoted escapes, so the
// output stays valid YAML.
func BuildBusinessDeploymentYAML(namespace string, deploymentName string, labelKey string, labelValue string) string {
	return fmt.Sprintf(`apiVersion: apps/v1
kind: Deployment
metadata:
  name: %q
  namespace: %q
spec:
  replicas: 1
  selector:
    matchLabels:
      %s: %q
  template:
    metadata:
      labels:
        %s: %q
    spec:
      containers:
        - name: pause
          image: alpine:latest
          imagePullPolicy: IfNotPresent
          command: ["sleep", "360000"]
          resources:
            limits:
              cpu: "100m"
              memory: "64Mi"
            requests:
              cpu: "100m"
              memory: "64Mi"`, deploymentName, namespace, labelKey, labelValue, labelKey, labelValue)
}
// ShellQuote wraps text in single quotes for a POSIX shell. Each embedded single
// quote is rewritten as '"'"' (close the quote, emit a double-quoted quote,
// reopen), so the shell receives text verbatim with no expansion.
func ShellQuote(text string) string {
	return "'" + strings.ReplaceAll(text, "'", `'"'"'`) + "'"
}
// GenerateCaseNamespace derives a unique namespace name from caseID. It
// lower-cases caseID, replaces every character outside [a-z0-9-] with '-',
// drops leading '-', and appends "-<unix seconds>".
//
// The prefix is truncated so that the whole name fits the 63-character RFC 1123
// label limit that Kubernetes enforces for namespaces. A prefix that ends up
// empty falls back to "case". For IDs made only of letters, digits, '_', '.'
// and '-' (without a leading '_', '.' or '-'), the result matches the previous
// behaviour.
func GenerateCaseNamespace(caseID string) string {
	const maxNamespaceLen = 63
	suffix := fmt.Sprintf("-%d", time.Now().Unix())

	var b strings.Builder
	for _, r := range strings.ToLower(caseID) {
		if (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || r == '-' {
			b.WriteRune(r)
			continue
		}
		// Covers '_' and '.' as before, plus any other character
		// (spaces, '/', non-ASCII, ...) that a namespace name cannot contain.
		b.WriteByte('-')
	}
	prefix := b.String()
	// prefix is pure ASCII here, so byte slicing cannot split a rune.
	if limit := maxNamespaceLen - len(suffix); len(prefix) > limit {
		prefix = prefix[:limit]
	}
	// A label must start with an alphanumeric; the numeric suffix already
	// guarantees an alphanumeric end.
	prefix = strings.TrimLeft(prefix, "-")
	if prefix == "" {
		prefix = "case"
	}
	return prefix + suffix
}
// BuildVolcanoDeploymentYAML renders a single-replica apps/v1 Deployment
// manifest named deploymentName in namespace. Its selector and pod template use
// the label app=podLabel. Pods carry the annotation
// volcano.sh/numa-topology-policy: single-numa-node and request schedulerName
// as their scheduler. They run one alpine container (named "pause") that
// sleeps, with CPU/memory requests equal to its limits (100m / 64Mi).
//
// The name, namespace, label value and scheduler name are emitted as
// double-quoted YAML scalars. This keeps values such as "1" or "true" as
// strings rather than numbers or booleans, which the Kubernetes decoder rejects
// for these string fields. fmt's %q escapes are a subset of YAML's.
func BuildVolcanoDeploymentYAML(namespace string, deploymentName string, podLabel string, schedulerName string) string {
	return fmt.Sprintf(`apiVersion: apps/v1
kind: Deployment
metadata:
  name: %q
  namespace: %q
spec:
  replicas: 1
  selector:
    matchLabels:
      app: %q
  template:
    metadata:
      labels:
        app: %q
      annotations:
        volcano.sh/numa-topology-policy: single-numa-node
    spec:
      schedulerName: %q
      containers:
        - name: pause
          image: alpine:latest
          imagePullPolicy: IfNotPresent
          command: ["sleep", "360000"]
          resources:
            limits:
              cpu: "100m"
              memory: "64Mi"
            requests:
              cpu: "100m"
              memory: "64Mi"`, deploymentName, namespace, podLabel, podLabel, schedulerName)
}
// firstNonEmpty returns the first argument that is non-blank after trimming
// whitespace, already trimmed. It returns "" when every argument is blank or
// none is given.
func firstNonEmpty(values ...string) string {
	for i := range values {
		if candidate := strings.TrimSpace(values[i]); len(candidate) > 0 {
			return candidate
		}
	}
	return ""
}
// parseBoolWithDefault parses raw, after trimming whitespace, with
// strconv.ParseBool. A blank raw yields defaultValue. An unparsable value yields
// false and an error that quotes the untrimmed input.
func parseBoolWithDefault(raw string, defaultValue bool) (bool, error) {
	value := strings.TrimSpace(raw)
	if len(value) == 0 {
		return defaultValue, nil
	}
	result, parseErr := strconv.ParseBool(value)
	if parseErr == nil {
		return result, nil
	}
	return false, fmt.Errorf("invalid bool value %q: %w", raw, parseErr)
}