package utils
import (
"context"
"encoding/json"
"fmt"
"regexp"
"sort"
"strings"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/remotecommand"
"gitcode.com/openFuyao/e2e-auto-test/e2e/framework/k8s"
)
// containerStateText renders a container state as a compact one-line string
// for diagnostic output. Waiting, Running and Terminated are checked in that
// order and the first non-nil one is reported; "unknown" is returned when
// none is set.
func containerStateText(state corev1.ContainerState) string {
	switch {
	case state.Waiting != nil:
		w := state.Waiting
		return fmt.Sprintf("waiting(reason=%s message=%q)", w.Reason, w.Message)
	case state.Running != nil:
		started := state.Running.StartedAt.Time.Format(time.RFC3339)
		return fmt.Sprintf("running(startedAt=%s)", started)
	case state.Terminated != nil:
		t := state.Terminated
		return fmt.Sprintf("terminated(reason=%s exitCode=%d message=%q)", t.Reason, t.ExitCode, t.Message)
	default:
		return "unknown"
	}
}
// eventTimestamp returns the most specific time recorded on an event: its
// EventTime when non-zero, otherwise LastTimestamp when non-zero, otherwise
// the object's CreationTimestamp (which may itself be the zero time).
func eventTimestamp(evt corev1.Event) time.Time {
	for _, candidate := range []time.Time{evt.EventTime.Time, evt.LastTimestamp.Time} {
		if !candidate.IsZero() {
			return candidate
		}
	}
	return evt.CreationTimestamp.Time
}
// printRecentPodEvents prints, oldest first, the newest maxItems events whose
// involved object is the Pod namespace/podName. A non-positive maxItems
// prints every event found. This is a best-effort debug helper: list
// failures and empty results are printed to stdout, never returned.
func printRecentPodEvents(c *k8s.K8SClient, ctx context.Context, namespace, podName string, maxItems int) {
	selector := fmt.Sprintf("involvedObject.kind=Pod,involvedObject.name=%s", podName)
	list, err := c.Clientset.CoreV1().Events(namespace).List(ctx, metav1.ListOptions{FieldSelector: selector})
	switch {
	case err != nil:
		fmt.Printf("[WaitDebug] failed to list events for pod %s/%s: %v\n", namespace, podName, err)
		return
	case len(list.Items) == 0:
		fmt.Printf("[WaitDebug] no events found for pod %s/%s\n", namespace, podName)
		return
	}

	items := list.Items
	sort.Slice(items, func(a, b int) bool {
		return eventTimestamp(items[a]).Before(eventTimestamp(items[b]))
	})
	// After an ascending sort the newest events are at the tail.
	if maxItems > 0 && len(items) > maxItems {
		items = items[len(items)-maxItems:]
	}
	for _, evt := range items {
		when := eventTimestamp(evt).Format(time.RFC3339)
		fmt.Printf("[WaitDebug] pod event %s/%s time=%s type=%s reason=%s message=%s\n", namespace, podName, when, evt.Type, evt.Reason, evt.Message)
	}
}
// printPodTimeoutDiagnostics prints a snapshot of a pod that did not reach
// expectedPhase within timeout. The snapshot covers phase, reason, message,
// node, scheduler, every pod condition, and each init and app container
// status. The 8 most recent pod events follow. If the pod cannot be fetched,
// only the error and the events are printed. All output goes to stdout.
func printPodTimeoutDiagnostics(c *k8s.K8SClient, ctx context.Context, name, namespace, expectedPhase string, timeout time.Duration) {
	const recentEvents = 8
	fmt.Printf("[WaitPodCondition] timeout waiting for pod %s/%s to reach phase=%s within %v\n", namespace, name, expectedPhase, timeout)
	// Events are printed on every path, including when the pod Get fails.
	defer printRecentPodEvents(c, ctx, namespace, name, recentEvents)

	pod, err := c.Clientset.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		fmt.Printf("[WaitPodCondition] failed to get pod %s/%s: %v\n", namespace, name, err)
		return
	}

	status := pod.Status
	fmt.Printf("[WaitPodCondition] current pod phase=%s reason=%s message=%q node=%s scheduler=%s\n",
		status.Phase, status.Reason, status.Message, pod.Spec.NodeName, pod.Spec.SchedulerName)
	for _, cond := range status.Conditions {
		fmt.Printf("[WaitPodCondition] condition type=%s status=%s reason=%s message=%q\n",
			cond.Type, cond.Status, cond.Reason, cond.Message)
	}

	groups := []struct {
		label    string
		statuses []corev1.ContainerStatus
	}{
		{"init-container", status.InitContainerStatuses},
		{"container", status.ContainerStatuses},
	}
	for _, g := range groups {
		for _, cs := range g.statuses {
			fmt.Printf("[WaitPodCondition] %s=%s ready=%t restarts=%d state=%s\n",
				g.label, cs.Name, cs.Ready, cs.RestartCount, containerStateText(cs.State))
		}
	}
}
// printDaemonSetTimeoutDiagnostics prints the status counters of a DaemonSet
// after a wait timed out. For each of its pods it then prints phase, node,
// container states and the 3 most recent events. All output goes to stdout;
// errors are printed, not returned.
//
// Pods are found with an equality selector built from the DaemonSet's
// spec.selector.matchLabels. When no matchLabels are present it falls back to
// the "app=<name>" label. matchExpressions are not translated, so the listing
// may include extra pods when a DaemonSet relies on them.
func printDaemonSetTimeoutDiagnostics(c *k8s.K8SClient, ctx context.Context, name, namespace, waitMode string, timeout time.Duration) {
	fmt.Printf("[WaitDaemonSet] timeout waiting for daemonset %s/%s mode=%s within %v\n", namespace, name, waitMode, timeout)
	gvr := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "daemonsets"}
	ds, err := c.DynamicClient.Resource(gvr).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		fmt.Printf("[WaitDaemonSet] failed to get daemonset %s/%s: %v\n", namespace, name, err)
		return
	}

	// Missing or malformed status fields read as 0.
	statusInt := func(field string) int64 {
		v, _, _ := unstructured.NestedInt64(ds.Object, "status", field)
		return v
	}
	fmt.Printf(
		"[WaitDaemonSet] status desired=%d ready=%d available=%d updated=%d misscheduled=%d\n",
		statusInt("desiredNumberScheduled"),
		statusInt("numberReady"),
		statusInt("numberAvailable"),
		statusInt("updatedNumberScheduled"),
		statusInt("numberMisscheduled"),
	)

	// Build "k1=v1,k2=v2" from matchLabels (sorted keys for stable output).
	podSelector := func() string {
		matchLabels, found, selErr := unstructured.NestedStringMap(ds.Object, "spec", "selector", "matchLabels")
		if selErr != nil || !found || len(matchLabels) == 0 {
			return fmt.Sprintf("app=%s", name)
		}
		keys := make([]string, 0, len(matchLabels))
		for k := range matchLabels {
			keys = append(keys, k)
		}
		sort.Strings(keys)
		parts := make([]string, 0, len(keys))
		for _, k := range keys {
			parts = append(parts, k+"="+matchLabels[k])
		}
		return strings.Join(parts, ",")
	}()

	pods, err := c.Clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: podSelector})
	if err != nil {
		fmt.Printf("[WaitDaemonSet] failed to list daemonset pods by label %s: %v\n", podSelector, err)
		return
	}
	if len(pods.Items) == 0 {
		fmt.Printf("[WaitDaemonSet] no pods found by label %s\n", podSelector)
		return
	}
	for _, pod := range pods.Items {
		fmt.Printf(
			"[WaitDaemonSet] pod=%s phase=%s reason=%s message=%q node=%s\n",
			pod.Name,
			pod.Status.Phase,
			pod.Status.Reason,
			pod.Status.Message,
			pod.Spec.NodeName,
		)
		for _, cs := range pod.Status.ContainerStatuses {
			fmt.Printf(
				"[WaitDaemonSet] pod=%s container=%s ready=%t restarts=%d state=%s\n",
				pod.Name,
				cs.Name,
				cs.Ready,
				cs.RestartCount,
				containerStateText(cs.State),
			)
		}
		printRecentPodEvents(c, ctx, namespace, pod.Name, 3)
	}
}
// StressConfig describes the stress-ng workload that StartStressProcess
// launches. The CPU stressor is added only when both CPUCores and CPULoad are
// positive. The VM stressor is added only when both VM and VMBytes are
// positive.
type StressConfig struct {
	// CPUCores is the CPU worker count (--cpu); CPULoad is the per-worker load (--cpu-load).
	CPUCores, CPULoad int
	// VM is the VM worker count (--vm); VMBytes is rendered as "N%" for --vm-bytes.
	VM, VMBytes int
}
// DecodePodFromManifest decodes a single-Pod YAML/JSON manifest and prepares
// it for creation. It renames the pod to name and forces the "default"
// namespace. Non-blank cpuRequest/cpuLimit values (surrounding whitespace
// trimmed) overwrite the CPU request/limit of the first container; blank
// values leave the manifest's settings untouched.
//
// An error is returned when the manifest does not decode, is not a Pod, or
// declares no containers. NOTE(review): quantities go through
// mustParseQuantity (defined elsewhere), which presumably panics on malformed
// input. Callers should pass valid quantities.
func DecodePodFromManifest(manifest string, name string, cpuRequest string, cpuLimit string) (*corev1.Pod, error) {
	decoder := scheme.Codecs.UniversalDeserializer()
	obj, _, err := decoder.Decode([]byte(manifest), nil, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to decode manifest: %w", err)
	}
	pod, ok := obj.(*corev1.Pod)
	if !ok {
		return nil, fmt.Errorf("manifest is not a Pod")
	}
	// Guard the Containers[0] access below; previously this panicked.
	if len(pod.Spec.Containers) == 0 {
		return nil, fmt.Errorf("manifest pod has no containers")
	}
	pod.Name = name
	pod.Namespace = "default"

	cpuRequest = strings.TrimSpace(cpuRequest)
	cpuLimit = strings.TrimSpace(cpuLimit)
	ctr := &pod.Spec.Containers[0]
	if cpuRequest != "" {
		if ctr.Resources.Requests == nil {
			ctr.Resources.Requests = corev1.ResourceList{}
		}
		ctr.Resources.Requests[corev1.ResourceCPU] = mustParseQuantity(cpuRequest)
	}
	if cpuLimit != "" {
		if ctr.Resources.Limits == nil {
			ctr.Resources.Limits = corev1.ResourceList{}
		}
		ctr.Resources.Limits[corev1.ResourceCPU] = mustParseQuantity(cpuLimit)
	}
	return pod, nil
}
// isTransientWebhookCreateError reports whether err looks like a temporary
// admission-webhook outage that is worth retrying. The lower-cased error text
// must contain "failed calling webhook" plus at least one
// connectivity/timeout signal ("connection refused", "no endpoints available
// for service", "i/o timeout", "context deadline exceeded", "service
// unavailable", "timeout" or "eof"). A nil error is never transient.
func isTransientWebhookCreateError(err error) bool {
	if err == nil {
		return false
	}
	text := strings.ToLower(err.Error())
	if !strings.Contains(text, "failed calling webhook") {
		return false
	}
	return strings.Contains(text, "connection refused") ||
		strings.Contains(text, "no endpoints available for service") ||
		strings.Contains(text, "i/o timeout") ||
		strings.Contains(text, "context deadline exceeded") ||
		strings.Contains(text, "service unavailable") ||
		strings.Contains(text, "timeout") ||
		strings.Contains(text, "eof")
}
// InstallPod creates pod and returns the object stored by the API server.
//
// If a pod with the same namespace/name already exists, that existing object
// is fetched and returned instead. Create failures that look like transient
// admission-webhook outages (see isTransientWebhookCreateError) are retried,
// up to 4 attempts in total, waiting 2s, 4s and 6s between attempts. A
// cancelled ctx aborts the wait early. Each attempt sends a deep copy, so
// the caller's pod is never mutated.
func InstallPod(c *k8s.K8SClient, ctx context.Context, pod *corev1.Pod) (*corev1.Pod, error) {
	if pod == nil {
		return nil, fmt.Errorf("pod is nil")
	}
	const maxAttempts = 4
	pods := c.Clientset.CoreV1().Pods(pod.Namespace)
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		createdPod, err := pods.Create(ctx, pod.DeepCopy(), metav1.CreateOptions{})
		if err == nil {
			return createdPod, nil
		}
		if errors.IsAlreadyExists(err) {
			existing, getErr := pods.Get(ctx, pod.Name, metav1.GetOptions{})
			if getErr != nil {
				return nil, fmt.Errorf("pod %s/%s already exists but get failed: %w", pod.Namespace, pod.Name, getErr)
			}
			return existing, nil
		}
		lastErr = err
		if !isTransientWebhookCreateError(err) || attempt == maxAttempts {
			break
		}
		// Linear backoff that still honours ctx cancellation (time.Sleep would not).
		timer := time.NewTimer(time.Duration(2*attempt) * time.Second)
		select {
		case <-ctx.Done():
			timer.Stop()
			return nil, fmt.Errorf("create pod %s/%s failed: %w (retry aborted: %v)", pod.Namespace, pod.Name, lastErr, ctx.Err())
		case <-timer.C:
		}
	}
	// lastErr is always set here: the loop only exits after recording an error.
	return nil, fmt.Errorf("create pod %s/%s failed: %w", pod.Namespace, pod.Name, lastErr)
}
// InstallBEPod creates the pod described by bePodManifest under the fixed
// name "be-pod", applying the optional CPU request/limit overrides.
func InstallBEPod(c *k8s.K8SClient, ctx context.Context, cpuRequest string, cpuLimit string) (*corev1.Pod, error) {
	const defaultName = "be-pod"
	return InstallBEPodNamed(c, ctx, defaultName, cpuRequest, cpuLimit)
}
// InstallBEPodNamed decodes bePodManifest as podName (namespace "default")
// with the given CPU overrides and creates it via InstallPod.
func InstallBEPodNamed(c *k8s.K8SClient, ctx context.Context, podName string, cpuRequest, cpuLimit string) (*corev1.Pod, error) {
	pod, decodeErr := DecodePodFromManifest(bePodManifest, podName, cpuRequest, cpuLimit)
	if decodeErr != nil {
		return nil, fmt.Errorf("failed to decode manifest: %w", decodeErr)
	}
	return InstallPod(c, ctx, pod)
}
// InstallLSPod creates the pod described by lsPodManifest under the fixed
// name "ls-pod", applying the optional CPU request/limit overrides.
func InstallLSPod(c *k8s.K8SClient, ctx context.Context, cpuRequest string, cpuLimit string) (*corev1.Pod, error) {
	const defaultName = "ls-pod"
	return InstallLSPodNamed(c, ctx, defaultName, cpuRequest, cpuLimit)
}
// InstallLSPodNamed decodes lsPodManifest as podName (namespace "default")
// with the given CPU overrides and creates it via InstallPod.
func InstallLSPodNamed(c *k8s.K8SClient, ctx context.Context, podName string, cpuRequest, cpuLimit string) (*corev1.Pod, error) {
	pod, decodeErr := DecodePodFromManifest(lsPodManifest, podName, cpuRequest, cpuLimit)
	if decodeErr != nil {
		return nil, fmt.Errorf("failed to decode manifest: %w", decodeErr)
	}
	return InstallPod(c, ctx, pod)
}
// InstallHLSPod creates the pod described by hlsPodManifest under the fixed
// name "hls-pod", applying the optional CPU request/limit overrides.
func InstallHLSPod(c *k8s.K8SClient, ctx context.Context, cpuRequest string, cpuLimit string) (*corev1.Pod, error) {
	const defaultName = "hls-pod"
	return InstallHLSPodNamed(c, ctx, defaultName, cpuRequest, cpuLimit)
}
// InstallHLSPodNamed decodes hlsPodManifest as podName (namespace "default")
// with the given CPU overrides and creates it via InstallPod.
func InstallHLSPodNamed(c *k8s.K8SClient, ctx context.Context, podName string, cpuRequest, cpuLimit string) (*corev1.Pod, error) {
	pod, decodeErr := DecodePodFromManifest(hlsPodManifest, podName, cpuRequest, cpuLimit)
	if decodeErr != nil {
		return nil, fmt.Errorf("failed to decode manifest: %w", decodeErr)
	}
	return InstallPod(c, ctx, pod)
}
// PodExists reports whether a Get for namespace/name succeeds. Any error,
// not only NotFound, is reported as false.
func PodExists(c *k8s.K8SClient, ctx context.Context, name, namespace string) bool {
	if _, getErr := c.Clientset.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{}); getErr != nil {
		return false
	}
	return true
}
// WaitPodCondition polls the pod about once per second until its
// status.phase equals condition (e.g. "Running"), returning true. Get errors
// are treated as "not yet". When timeout elapses it prints diagnostics via
// printPodTimeoutDiagnostics and returns false.
func WaitPodCondition(c *k8s.K8SClient, ctx context.Context, name, namespace, condition string, timeout time.Duration) bool {
	pods := c.Clientset.CoreV1().Pods(namespace)
	for deadline := time.Now().Add(timeout); time.Now().Before(deadline); time.Sleep(1 * time.Second) {
		pod, err := pods.Get(ctx, name, metav1.GetOptions{})
		if err == nil && string(pod.Status.Phase) == condition {
			return true
		}
	}
	printPodTimeoutDiagnostics(c, ctx, name, namespace, condition, timeout)
	return false
}
// WaitPodDelete polls about once per second until the API server reports
// namespace/name as NotFound, returning true. It returns false when timeout
// elapses or ctx is cancelled first. Other Get errors (network failures,
// RBAC denials, a cancelled ctx) are not proof that the pod is gone, so they
// keep the wait going.
func WaitPodDelete(c *k8s.K8SClient, ctx context.Context, name, namespace string, timeout time.Duration) bool {
	pods := c.Clientset.CoreV1().Pods(namespace)
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		_, err := pods.Get(ctx, name, metav1.GetOptions{})
		if errors.IsNotFound(err) {
			return true
		}
		select {
		case <-ctx.Done():
			return false
		case <-time.After(1 * time.Second):
		}
	}
	return false
}
// WaitAllPodsRunningOrSucceeded polls the pods in namespace every interval
// (1s when interval <= 0). It returns nil once at least one pod exists and
// every pod is in phase Running or Succeeded; an empty namespace keeps it
// waiting.
//
// On timeout, the error describes the most recent observation: the last List
// error, the pods still not ready, or that no pods were found. If ctx is
// cancelled while waiting, ctx.Err() is returned wrapped.
func WaitAllPodsRunningOrSucceeded(c *k8s.K8SClient, ctx context.Context, namespace string, timeout, interval time.Duration) error {
	if interval <= 0 {
		interval = time.Second // avoid a hot loop against the API server
	}
	deadline := time.Now().Add(timeout)
	var (
		lastNotReady []string
		lastErr      error
	)
	for time.Now().Before(deadline) {
		list, err := c.Clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
		lastErr = err
		if err == nil {
			notReady := make([]string, 0)
			for _, p := range list.Items {
				ph := p.Status.Phase
				if ph != corev1.PodRunning && ph != corev1.PodSucceeded {
					notReady = append(notReady, fmt.Sprintf("%s(%s)", p.Name, ph))
				}
			}
			if len(list.Items) > 0 && len(notReady) == 0 {
				return nil
			}
			lastNotReady = notReady
		}
		select {
		case <-ctx.Done():
			return fmt.Errorf("namespace %q: waiting for pods Running/Succeeded: %w", namespace, ctx.Err())
		case <-time.After(interval):
		}
	}

	msg := fmt.Sprintf("namespace %q: not all pods Running/Succeeded within %v", namespace, timeout)
	if lastErr != nil {
		return fmt.Errorf("%s; last list error: %w", msg, lastErr)
	}
	if len(lastNotReady) == 0 {
		return fmt.Errorf("%s; no pods found", msg)
	}
	return fmt.Errorf("%s; not ready: %v", msg, lastNotReady)
}
// DeletePod deletes namespace/name and, when timeout > 0, waits for it to
// disappear using WaitPodDelete. An empty name and an already-missing pod are
// both no-ops that return nil. Other delete errors are returned unchanged.
func DeletePod(c *k8s.K8SClient, ctx context.Context, name, namespace string, timeout time.Duration) error {
	if name == "" {
		return nil
	}
	err := c.Clientset.CoreV1().Pods(namespace).Delete(ctx, name, metav1.DeleteOptions{})
	switch {
	case errors.IsNotFound(err):
		return nil
	case err != nil:
		return err
	case timeout <= 0:
		return nil
	case WaitPodDelete(c, ctx, name, namespace, timeout):
		return nil
	default:
		return fmt.Errorf("timeout waiting for pod %s/%s to be deleted", namespace, name)
	}
}
// WaitForVolcanoRestart polls the "app=volcano-scheduler" pods in
// volcano-system about once per second. It returns true once at least one
// pod is not terminating and every non-terminating pod is in phase Running,
// and false when timeout elapses first.
//
// Terminating pods (non-nil DeletionTimestamp) are ignored, so a pod that is
// being replaced does not count as "restarted". List errors are treated as
// "not yet".
func WaitForVolcanoRestart(c *k8s.K8SClient, ctx context.Context, timeout time.Duration) bool {
	liveAndRunning := func(pods []corev1.Pod) bool {
		live := 0
		for i := range pods {
			if pods[i].DeletionTimestamp != nil {
				continue
			}
			if pods[i].Status.Phase != corev1.PodRunning {
				return false
			}
			live++
		}
		return live > 0
	}

	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		pods, err := c.Clientset.CoreV1().Pods("volcano-system").List(ctx, metav1.ListOptions{
			LabelSelector: "app=volcano-scheduler",
		})
		if err == nil && liveAndRunning(pods.Items) {
			return true
		}
		time.Sleep(1 * time.Second)
	}
	return false
}
// ListPriorityClasses lists all scheduling.k8s.io/v1 PriorityClasses
// (cluster-scoped) through the dynamic client.
func ListPriorityClasses(c *k8s.K8SClient, ctx context.Context) (*unstructured.UnstructuredList, error) {
	priorityClasses := c.DynamicClient.Resource(schema.GroupVersionResource{
		Group:    "scheduling.k8s.io",
		Version:  "v1",
		Resource: "priorityclasses",
	})
	return priorityClasses.List(ctx, metav1.ListOptions{})
}
// GetRubikLog returns the full log of the "colocation-rubik" container in the
// first pod labelled "name=colocation-rubik" in openfuyao-colocation. It
// returns an error when the list fails, no such pod exists, or the log cannot
// be fetched.
func GetRubikLog(c *k8s.K8SClient, ctx context.Context) (string, error) {
	const (
		namespace = "openfuyao-colocation"
		selector  = "name=colocation-rubik"
	)
	pods, err := c.Clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
		LabelSelector: selector,
	})
	if err != nil {
		return "", fmt.Errorf("failed to find rubik pod: %w", err)
	}
	// Handled separately: wrapping a nil err with %w would print "%!w(<nil>)".
	if len(pods.Items) == 0 {
		return "", fmt.Errorf("failed to find rubik pod: no pods in %s match %s", namespace, selector)
	}
	log, err := c.Clientset.CoreV1().Pods(namespace).GetLogs(
		pods.Items[0].Name,
		&corev1.PodLogOptions{Container: "colocation-rubik"},
	).Do(ctx).Raw()
	if err != nil {
		return "", fmt.Errorf("failed to get rubik log: %w", err)
	}
	return string(log), nil
}
// Location of the colocation ConfigMap that the colocation helpers read and
// update.
const (
	ColocationConfigNamespace = "openfuyao-colocation"
	ColocationConfigName      = "colocation-config"
)

// Data keys of the colocation ConfigMap whose values are JSON option documents.
const (
	ColocationVolcanoSchedulerOptsKey = "volcano-scheduler-options"
	ColocationRubikOptsKey            = "rubik-options"
)
// GetColocationConfigMap fetches the ConfigMap
// ColocationConfigNamespace/ColocationConfigName.
func GetColocationConfigMap(c *k8s.K8SClient, ctx context.Context) (*corev1.ConfigMap, error) {
	configMaps := c.Clientset.CoreV1().ConfigMaps(ColocationConfigNamespace)
	return configMaps.Get(ctx, ColocationConfigName, metav1.GetOptions{})
}
// PatchColocationVolcanoSchedulerUsage rewrites the "usagePlugin" entry of
// the volcano-scheduler-options JSON in the colocation ConfigMap. Every other
// top-level key is preserved.
//
// When enabled, the plugin and its usageThreshold are enabled with the given
// cpu/memory percentages. When disabled, both are switched off and no
// thresholds are written. The ConfigMap is saved with a plain Update (no
// conflict retry).
func PatchColocationVolcanoSchedulerUsage(c *k8s.K8SClient, ctx context.Context, enabled bool, cpuPercent, memPercent int) error {
	cm, err := GetColocationConfigMap(c, ctx)
	if err != nil {
		return err
	}
	raw := ""
	if cm.Data != nil {
		raw = strings.TrimSpace(cm.Data[ColocationVolcanoSchedulerOptsKey])
	}
	root := map[string]interface{}{}
	if raw != "" {
		if err := json.Unmarshal([]byte(raw), &root); err != nil {
			return fmt.Errorf("parse volcano-scheduler-options: %w", err)
		}
		// A stored literal "null" decodes to a nil map; reset it so the write below cannot panic.
		if root == nil {
			root = map[string]interface{}{}
		}
	}
	if enabled {
		root["usagePlugin"] = map[string]interface{}{
			"enable": true,
			"usageThreshold": map[string]interface{}{
				"enable": true,
				"cpu":    cpuPercent,
				"memory": memPercent,
			},
		}
	} else {
		root["usagePlugin"] = map[string]interface{}{
			"enable": false,
			"usageThreshold": map[string]interface{}{
				"enable": false,
			},
		}
	}
	out, err := json.Marshal(root)
	if err != nil {
		return fmt.Errorf("marshal volcano-scheduler-options: %w", err)
	}
	if cm.Data == nil {
		cm.Data = map[string]string{}
	}
	cm.Data[ColocationVolcanoSchedulerOptsKey] = string(out)
	if _, err = c.Clientset.CoreV1().ConfigMaps(cm.Namespace).Update(ctx, cm, metav1.UpdateOptions{}); err != nil {
		return fmt.Errorf("update colocation-config volcano-scheduler-options: %w", err)
	}
	return nil
}
// UpdateColocationVolcanoSchedulerOptions replaces the whole
// volcano-scheduler-options value in the colocation ConfigMap with
// volcanoOptsJSON (stored verbatim, not validated) and saves it with Update.
func UpdateColocationVolcanoSchedulerOptions(c *k8s.K8SClient, ctx context.Context, volcanoOptsJSON string) error {
	cm, err := GetColocationConfigMap(c, ctx)
	if err != nil {
		return fmt.Errorf("get colocation-config: %w", err)
	}
	data := cm.Data
	if data == nil {
		data = make(map[string]string, 1)
		cm.Data = data
	}
	data[ColocationVolcanoSchedulerOptsKey] = volcanoOptsJSON
	if _, updateErr := c.Clientset.CoreV1().ConfigMaps(cm.Namespace).Update(ctx, cm, metav1.UpdateOptions{}); updateErr != nil {
		return fmt.Errorf("update colocation-config volcano-scheduler-options: %w", updateErr)
	}
	return nil
}
// UpdateColocationRubikOptions replaces the whole rubik-options value in the
// colocation ConfigMap with rubikOptsJSON (stored verbatim, not validated)
// and saves it with Update.
func UpdateColocationRubikOptions(c *k8s.K8SClient, ctx context.Context, rubikOptsJSON string) error {
	cm, err := GetColocationConfigMap(c, ctx)
	if err != nil {
		return fmt.Errorf("get colocation-config: %w", err)
	}
	data := cm.Data
	if data == nil {
		data = make(map[string]string, 1)
		cm.Data = data
	}
	data[ColocationRubikOptsKey] = rubikOptsJSON
	if _, updateErr := c.Clientset.CoreV1().ConfigMaps(cm.Namespace).Update(ctx, cm, metav1.UpdateOptions{}); updateErr != nil {
		return fmt.Errorf("update colocation-config rubik-options: %w", updateErr)
	}
	return nil
}
// PatchColocationRubikEviction rewrites the "eviction" entry of the
// rubik-options JSON in the colocation ConfigMap. Every other top-level key is
// preserved.
//
// When enabled, eviction is switched on with the given cpuevict and
// memoryevict thresholds. When disabled, it is switched off with empty
// cpuevict/memoryevict objects. The ConfigMap is saved with a plain Update
// (no conflict retry).
func PatchColocationRubikEviction(c *k8s.K8SClient, ctx context.Context, enabled bool, cpuEvictThreshold, memEvictThreshold int) error {
	cm, err := GetColocationConfigMap(c, ctx)
	if err != nil {
		return err
	}
	raw := ""
	if cm.Data != nil {
		raw = strings.TrimSpace(cm.Data[ColocationRubikOptsKey])
	}
	root := map[string]interface{}{}
	if raw != "" {
		if err := json.Unmarshal([]byte(raw), &root); err != nil {
			return fmt.Errorf("parse rubik-options: %w", err)
		}
		// A stored literal "null" decodes to a nil map; reset it so the write below cannot panic.
		if root == nil {
			root = map[string]interface{}{}
		}
	}
	if enabled {
		root["eviction"] = map[string]interface{}{
			"enable":      true,
			"cpuevict":    map[string]interface{}{"threshold": cpuEvictThreshold},
			"memoryevict": map[string]interface{}{"threshold": memEvictThreshold},
		}
	} else {
		root["eviction"] = map[string]interface{}{
			"enable":      false,
			"cpuevict":    map[string]interface{}{},
			"memoryevict": map[string]interface{}{},
		}
	}
	out, err := json.Marshal(root)
	if err != nil {
		return fmt.Errorf("marshal rubik-options: %w", err)
	}
	// Data may be nil on a ConfigMap with no data; writing to a nil map panics.
	if cm.Data == nil {
		cm.Data = map[string]string{}
	}
	cm.Data[ColocationRubikOptsKey] = string(out)
	if _, err = c.Clientset.CoreV1().ConfigMaps(cm.Namespace).Update(ctx, cm, metav1.UpdateOptions{}); err != nil {
		return fmt.Errorf("update colocation-config rubik-options: %w", err)
	}
	return nil
}
// UpdateVolcanoConfigMap replaces the "volcano-scheduler.conf" entry of
// volcano-system/volcano-scheduler-configmap with configData and saves it with
// Update. Errors from both the Get and the Update are wrapped with context.
func UpdateVolcanoConfigMap(c *k8s.K8SClient, ctx context.Context, configData string) error {
	configMaps := c.Clientset.CoreV1().ConfigMaps("volcano-system")
	cm, err := configMaps.Get(ctx, "volcano-scheduler-configmap", metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("failed to get volcano configmap: %w", err)
	}
	// Data may be nil on a ConfigMap with no data; writing to a nil map panics.
	if cm.Data == nil {
		cm.Data = map[string]string{}
	}
	cm.Data["volcano-scheduler.conf"] = configData
	if _, err = configMaps.Update(ctx, cm, metav1.UpdateOptions{}); err != nil {
		return fmt.Errorf("failed to update volcano configmap: %w", err)
	}
	return nil
}
// GetVolcanoConfigMap fetches volcano-system/volcano-scheduler-configmap.
func GetVolcanoConfigMap(c *k8s.K8SClient, ctx context.Context) (*corev1.ConfigMap, error) {
	const (
		namespace = "volcano-system"
		cmName    = "volcano-scheduler-configmap"
	)
	return c.Clientset.CoreV1().ConfigMaps(namespace).Get(ctx, cmName, metav1.GetOptions{})
}
// ExecInPod runs command in containerName of namespace/podName through the
// pods/exec subresource over SPDY. No stdin, stdout, stderr or TTY are
// attached, so the command's output is not captured. Errors from building
// the executor or from the stream are returned. NOTE(review): presumably a
// non-zero exit of the remote command also surfaces as an error; confirm
// against the remotecommand version in use.
func ExecInPod(c *k8s.K8SClient, ctx context.Context, namespace, podName, containerName string, command []string) error {
	execOpts := &corev1.PodExecOptions{
		Container: containerName,
		Command:   command,
	}
	req := c.Clientset.CoreV1().RESTClient().
		Post().
		Namespace(namespace).
		Resource("pods").
		Name(podName).
		SubResource("exec").
		VersionedParams(execOpts, scheme.ParameterCodec)

	executor, err := remotecommand.NewSPDYExecutor(c.RestConfig, "POST", req.URL())
	if err != nil {
		return err
	}
	// The zero value means: no streams and no TTY.
	return executor.StreamWithContext(ctx, remotecommand.StreamOptions{})
}
// StopStressProcess runs "pkill -9 stress-ng || true" in the "stress-ng"
// container of every "app=stress-ng" pod in the default namespace. Only the
// list error is returned. Per-pod exec failures are ignored on purpose,
// because stopping is best effort.
func StopStressProcess(c *k8s.K8SClient, ctx context.Context) error {
	const namespace = "default"
	list, err := c.Clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
		LabelSelector: "app=stress-ng",
	})
	if err != nil {
		return err
	}
	killCmd := []string{"sh", "-c", "pkill -9 stress-ng || true"}
	for i := range list.Items {
		_ = ExecInPod(c, ctx, namespace, list.Items[i].Name, "stress-ng", killCmd)
	}
	return nil
}
// InstallStressPod creates the pod described by stressPodManifest, in the
// namespace the manifest declares, and waits up to 10 seconds for it to reach
// phase Running.
//
// With hlsResources set, the pod is first turned into an HLS workload: the
// "volcano.sh/preemptable" label and annotation are removed, the
// "openfuyao.com/qos-level" annotation is set to "HLS", and the first
// container gets 1 CPU / 8Gi memory as both requests and limits.
//
// If the pod does not start in time, it is deleted (best effort) and an error
// is returned.
func InstallStressPod(c *k8s.K8SClient, ctx context.Context, hlsResources bool) (*corev1.Pod, error) {
	const startTimeout = 10 * time.Second
	decoder := scheme.Codecs.UniversalDeserializer()
	obj, _, err := decoder.Decode([]byte(stressPodManifest), nil, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to decode manifest: %w", err)
	}
	pod, ok := obj.(*corev1.Pod)
	if !ok {
		return nil, fmt.Errorf("manifest is not a Pod")
	}
	if hlsResources {
		if len(pod.Spec.Containers) == 0 {
			return nil, fmt.Errorf("stress pod manifest has no containers")
		}
		// delete on a nil map is a no-op, but assigning into one panics.
		delete(pod.Labels, "volcano.sh/preemptable")
		delete(pod.Annotations, "volcano.sh/preemptable")
		if pod.Annotations == nil {
			pod.Annotations = map[string]string{}
		}
		pod.Annotations["openfuyao.com/qos-level"] = "HLS"
		pod.Spec.Containers[0].Resources = corev1.ResourceRequirements{
			Requests: corev1.ResourceList{
				corev1.ResourceCPU:    mustParseQuantity("1"),
				corev1.ResourceMemory: mustParseQuantity("8Gi"),
			},
			Limits: corev1.ResourceList{
				corev1.ResourceCPU:    mustParseQuantity("1"),
				corev1.ResourceMemory: mustParseQuantity("8Gi"),
			},
		}
	}
	createdPod, err := c.Clientset.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to create stress pod: %w", err)
	}
	if !WaitPodCondition(c, ctx, createdPod.Name, createdPod.Namespace, "Running", startTimeout) {
		// Best-effort cleanup: the start failure is the error worth reporting.
		_ = c.Clientset.CoreV1().Pods(createdPod.Namespace).Delete(ctx, createdPod.Name, metav1.DeleteOptions{})
		return nil, fmt.Errorf("stress pod %s/%s failed to start within %v", createdPod.Namespace, createdPod.Name, startTimeout)
	}
	return createdPod, nil
}
// StartStressProcess builds a stress-ng command line from config and runs it
// in the "stress-ng" container of namespace/name via ExecInPod.
//
// CPU flags (--cpu, --cpu-method pi, --cpu-load) are added only when CPUCores
// and CPULoad are both positive. VM flags (--vm, --vm-bytes N%) are added only
// when VM and VMBytes are both positive. Output goes to /dev/null and the
// process is backgrounded with "&", so the shell does not wait for it.
func StartStressProcess(c *k8s.K8SClient, ctx context.Context, name, namespace string, config StressConfig) error {
	args := []string{"stress-ng"}
	if config.CPUCores > 0 && config.CPULoad > 0 {
		args = append(args,
			"--cpu", fmt.Sprint(config.CPUCores),
			"--cpu-method", "pi",
			"--cpu-load", fmt.Sprint(config.CPULoad),
		)
	}
	if config.VM > 0 && config.VMBytes > 0 {
		args = append(args,
			"--vm", fmt.Sprint(config.VM),
			"--vm-bytes", fmt.Sprint(config.VMBytes)+"%",
		)
	}
	shellCmd := strings.Join(args, " ") + " > /dev/null 2>&1 &"
	return ExecInPod(c, ctx, namespace, name, "stress-ng", []string{"sh", "-c", shellCmd})
}
// CheckNodeAllocatableCPU reports whether the cluster as a whole has at least
// minMilliCPU millicores of allocatable CPU not yet claimed by the requests of
// scheduled, non-terminal pods.
//
// Allocatable is summed over every listed node, including cordoned or tainted
// ones. The result says nothing about whether a single node can fit
// minMilliCPU. Each pod contributes its effective request: max(sum of
// app-container requests, largest init-container request) plus the pod's CPU
// overhead.
//
// NOTE(review): restartable (sidecar) init containers are treated like
// ordinary init containers, which can under-count them.
func CheckNodeAllocatableCPU(c *k8s.K8SClient, ctx context.Context, minMilliCPU int64) (bool, error) {
	nodes, err := c.Clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		return false, fmt.Errorf("list nodes: %w", err)
	}
	var totalAllocatable, totalRequested int64
	for _, node := range nodes.Items {
		totalAllocatable += node.Status.Allocatable.Cpu().MilliValue()
	}
	pods, err := c.Clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{
		FieldSelector: "status.phase!=Succeeded,status.phase!=Failed",
	})
	if err != nil {
		return false, fmt.Errorf("list pods: %w", err)
	}

	cpuMilli := func(list corev1.ResourceList) int64 {
		q, ok := list[corev1.ResourceCPU]
		if !ok {
			return 0
		}
		return q.MilliValue()
	}
	for i := range pods.Items {
		spec := &pods.Items[i].Spec
		if spec.NodeName == "" {
			continue // unscheduled pods do not hold node capacity yet
		}
		var appSum, initMax int64
		for _, ctr := range spec.Containers {
			appSum += cpuMilli(ctr.Resources.Requests)
		}
		// Init containers run one at a time, so only the largest one matters.
		for _, ctr := range spec.InitContainers {
			if v := cpuMilli(ctr.Resources.Requests); v > initMax {
				initMax = v
			}
		}
		effective := appSum
		if initMax > effective {
			effective = initMax
		}
		totalRequested += effective + cpuMilli(spec.Overhead)
	}
	return totalAllocatable-totalRequested >= minMilliCPU, nil
}
// CheckVolcanoExists reports whether any pod in volcano-system has
// "volcano" in its name. A list error yields false.
func CheckVolcanoExists(c *k8s.K8SClient, ctx context.Context) bool {
	pods, err := c.Clientset.CoreV1().Pods("volcano-system").List(ctx, metav1.ListOptions{})
	if err != nil {
		return false
	}
	found := false
	for i := 0; i < len(pods.Items) && !found; i++ {
		found = strings.Contains(pods.Items[i].Name, "volcano")
	}
	return found
}
// CheckVolcanoNotExists reports whether no pod in any namespace has
// "volcano" in its name. A list error yields false, because absence cannot
// be confirmed.
func CheckVolcanoNotExists(c *k8s.K8SClient, ctx context.Context) bool {
	allPods, err := c.Clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
	if err != nil {
		return false
	}
	for i := range allPods.Items {
		if strings.Contains(allPods.Items[i].Name, "volcano") {
			return false
		}
	}
	return true
}
// GetRubikLogPattern reports whether the rubik container log (see
// GetRubikLog) contains a match for the regular expression pattern. Errors
// from fetching the log or compiling the pattern are returned unchanged.
func GetRubikLogPattern(c *k8s.K8SClient, ctx context.Context, pattern string) (bool, error) {
	logText, err := GetRubikLog(c, ctx)
	if err != nil {
		return false, err
	}
	return regexp.MatchString(pattern, logText)
}
// WaitVolcanoWebhookReady polls every 2 seconds until the volcano admission
// webhook looks servable. That requires every "app=volcano-admission" pod in
// volcano-system (at least one) to be Running with all containers Ready, and
// at least one candidate Service to have an EndpointSlice endpoint that has
// addresses and is not explicitly marked not-ready.
//
// Candidate Services are "volcano-admission-service" plus any Service
// labelled "app=volcano-admission". On success it sleeps a further 5 seconds
// as a grace period, then returns nil. It returns an error when timeout
// elapses first. API errors are treated as "not ready yet".
func WaitVolcanoWebhookReady(c *k8s.K8SClient, ctx context.Context, timeout time.Duration) error {
	const (
		namespace = "volcano-system"
		selector  = "app=volcano-admission"
	)

	podsReady := func(pods []corev1.Pod) bool {
		if len(pods) == 0 {
			return false
		}
		for _, pod := range pods {
			if pod.Status.Phase != corev1.PodRunning {
				return false
			}
			for _, cs := range pod.Status.ContainerStatuses {
				if !cs.Ready {
					return false
				}
			}
		}
		return true
	}

	// The default service name comes first, then any labelled Services not already present.
	candidateServices := func() []string {
		names := []string{"volcano-admission-service"}
		services, err := c.Clientset.CoreV1().Services(namespace).List(ctx, metav1.ListOptions{
			LabelSelector: selector,
		})
		if err != nil {
			return names
		}
		seen := map[string]bool{names[0]: true}
		for _, svc := range services.Items {
			if !seen[svc.Name] {
				seen[svc.Name] = true
				names = append(names, svc.Name)
			}
		}
		return names
	}

	// A nil Ready condition is accepted; only an explicit false is rejected.
	serviceHasReadyEndpoint := func(serviceName string) bool {
		sliceList, err := c.Clientset.DiscoveryV1().EndpointSlices(namespace).List(ctx, metav1.ListOptions{
			LabelSelector: fmt.Sprintf("kubernetes.io/service-name=%s", serviceName),
		})
		if err != nil {
			return false
		}
		for _, es := range sliceList.Items {
			for _, ep := range es.Endpoints {
				if len(ep.Addresses) > 0 && (ep.Conditions.Ready == nil || *ep.Conditions.Ready) {
					return true
				}
			}
		}
		return false
	}

	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		pods, err := c.Clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
			LabelSelector: selector,
		})
		if err == nil && podsReady(pods.Items) {
			for _, serviceName := range candidateServices() {
				if serviceHasReadyEndpoint(serviceName) {
					time.Sleep(5 * time.Second)
					return nil
				}
			}
		}
		time.Sleep(2 * time.Second)
	}
	return fmt.Errorf("volcano admission webhook not ready within %v", timeout)
}
// WaitVolcanoSchedulerReady polls every 2 seconds until every
// "app=volcano-scheduler" pod in volcano-system (at least one) is Running
// with all containers Ready, then returns nil. It returns an error when
// timeout elapses first. List errors are treated as "not ready yet".
func WaitVolcanoSchedulerReady(c *k8s.K8SClient, ctx context.Context, timeout time.Duration) error {
	allReady := func(pods []corev1.Pod) bool {
		if len(pods) == 0 {
			return false
		}
		for _, pod := range pods {
			if pod.Status.Phase != corev1.PodRunning {
				return false
			}
			for _, cs := range pod.Status.ContainerStatuses {
				if !cs.Ready {
					return false
				}
			}
		}
		return true
	}

	for deadline := time.Now().Add(timeout); time.Now().Before(deadline); time.Sleep(2 * time.Second) {
		pods, err := c.Clientset.CoreV1().Pods("volcano-system").List(ctx, metav1.ListOptions{
			LabelSelector: "app=volcano-scheduler",
		})
		if err == nil && allReady(pods.Items) {
			return nil
		}
	}
	return fmt.Errorf("volcano scheduler not ready within %v", timeout)
}
// EnsureStressPodDeleted deletes namespace/name (a missing pod is fine), then
// polls every 2 seconds until a Get reports NotFound. It returns an error if
// the delete fails for any other reason, or if the pod still exists when
// timeout elapses.
func EnsureStressPodDeleted(c *k8s.K8SClient, ctx context.Context, name, namespace string, timeout time.Duration) error {
	pods := c.Clientset.CoreV1().Pods(namespace)
	if delErr := pods.Delete(ctx, name, metav1.DeleteOptions{}); delErr != nil && !errors.IsNotFound(delErr) {
		return fmt.Errorf("failed to delete pod %s/%s: %w", namespace, name, delErr)
	}
	for deadline := time.Now().Add(timeout); time.Now().Before(deadline); time.Sleep(2 * time.Second) {
		if _, getErr := pods.Get(ctx, name, metav1.GetOptions{}); errors.IsNotFound(getErr) {
			return nil
		}
	}
	return fmt.Errorf("pod %s/%s still exists after %v", namespace, name, timeout)
}
// ColocationNUMAAffinityOptsKey is the colocation ConfigMap data key that
// holds the NUMA-affinity options JSON.
const ColocationNUMAAffinityOptsKey = "numa-affinity-options"

// NUMAPluginPodPrefix is the pod-name prefix used to find the NUMA affinity
// plugin pods.
const NUMAPluginPodPrefix = "colocation-overquota-agent"
// GetNUMAPluginLogs concatenates the logs of every pod in
// ColocationConfigNamespace whose name starts with NUMAPluginPodPrefix. Each
// pod's log is followed by a newline. When tailLines > 0, only that many
// trailing lines are requested per pod.
//
// Pods whose logs cannot be fetched are skipped. However, if matching pods
// exist and none of their logs could be fetched, an error listing each
// per-pod failure is returned instead of a silent empty string. It also
// errors when the list fails or no pod matches the prefix.
func GetNUMAPluginLogs(c *k8s.K8SClient, ctx context.Context, tailLines int64) (string, error) {
	podsClient := c.Clientset.CoreV1().Pods(ColocationConfigNamespace)
	pods, err := podsClient.List(ctx, metav1.ListOptions{})
	if err != nil {
		return "", fmt.Errorf("failed to list pods in namespace %s: %w", ColocationConfigNamespace, err)
	}
	var (
		allLogs   strings.Builder
		matched   int
		fetchErrs []string
	)
	for _, pod := range pods.Items {
		if !strings.HasPrefix(pod.Name, NUMAPluginPodPrefix) {
			continue
		}
		matched++
		opts := &corev1.PodLogOptions{}
		if tailLines > 0 {
			opts.TailLines = &tailLines
		}
		log, err := podsClient.GetLogs(pod.Name, opts).Do(ctx).Raw()
		if err != nil {
			fetchErrs = append(fetchErrs, fmt.Sprintf("%s: %v", pod.Name, err))
			continue
		}
		allLogs.WriteString(string(log))
		allLogs.WriteString("\n")
	}
	if matched == 0 {
		return "", fmt.Errorf("no NUMA plugin pod found with prefix %s", NUMAPluginPodPrefix)
	}
	if len(fetchErrs) == matched {
		return "", fmt.Errorf("failed to get logs from any NUMA plugin pod: %s", strings.Join(fetchErrs, "; "))
	}
	return allLogs.String(), nil
}
// CheckNUMALogPattern reports whether the NUMA plugin logs (see
// GetNUMAPluginLogs, limited by tailLines) contain a match for the regular
// expression pattern. Log-fetch errors are returned unchanged. An invalid
// pattern is reported with the pattern quoted.
func CheckNUMALogPattern(c *k8s.K8SClient, ctx context.Context, pattern string, tailLines int64) (bool, error) {
	logs, err := GetNUMAPluginLogs(c, ctx, tailLines)
	if err != nil {
		return false, err
	}
	re, compileErr := regexp.Compile(pattern)
	if compileErr != nil {
		return false, fmt.Errorf("invalid pattern %q: %w", pattern, compileErr)
	}
	return re.MatchString(logs), nil
}
// PatchColocationNUMAAffinityEnable sets the top-level "enable" field of the
// numa-affinity-options JSON in the colocation ConfigMap. All other keys are
// preserved, and an empty value starts from an empty object. The ConfigMap is
// saved with a plain Update (no conflict retry).
func PatchColocationNUMAAffinityEnable(c *k8s.K8SClient, ctx context.Context, enabled bool) error {
	cm, err := GetColocationConfigMap(c, ctx)
	if err != nil {
		return err
	}
	raw := ""
	if cm.Data != nil {
		raw = strings.TrimSpace(cm.Data[ColocationNUMAAffinityOptsKey])
	}
	root := map[string]interface{}{}
	if raw != "" {
		if err := json.Unmarshal([]byte(raw), &root); err != nil {
			return fmt.Errorf("parse numa-affinity-options: %w", err)
		}
		// A stored literal "null" decodes to a nil map; reset it so the write below cannot panic.
		if root == nil {
			root = map[string]interface{}{}
		}
	}
	root["enable"] = enabled
	out, err := json.Marshal(root)
	if err != nil {
		return fmt.Errorf("marshal numa-affinity-options: %w", err)
	}
	if cm.Data == nil {
		cm.Data = map[string]string{}
	}
	cm.Data[ColocationNUMAAffinityOptsKey] = string(out)
	if _, err = c.Clientset.CoreV1().ConfigMaps(cm.Namespace).Update(ctx, cm, metav1.UpdateOptions{}); err != nil {
		return fmt.Errorf("update colocation-config numa-affinity-options: %w", err)
	}
	return nil
}
// GetColocationNUMAAffinityOptions returns the raw numa-affinity-options value
// from the colocation ConfigMap, or "" when the key (or all data) is absent.
func GetColocationNUMAAffinityOptions(c *k8s.K8SClient, ctx context.Context) (string, error) {
	cm, err := GetColocationConfigMap(c, ctx)
	if err != nil {
		return "", err
	}
	// Indexing a nil map yields the zero value, so no separate nil check is needed.
	return cm.Data[ColocationNUMAAffinityOptsKey], nil
}
// UpdateColocationNUMAAffinityOptions replaces the whole numa-affinity-options
// value in the colocation ConfigMap with numaOptsJSON (stored verbatim, not
// validated) and saves it with Update.
func UpdateColocationNUMAAffinityOptions(c *k8s.K8SClient, ctx context.Context, numaOptsJSON string) error {
	cm, err := GetColocationConfigMap(c, ctx)
	if err != nil {
		return fmt.Errorf("get colocation-config: %w", err)
	}
	data := cm.Data
	if data == nil {
		data = make(map[string]string, 1)
		cm.Data = data
	}
	data[ColocationNUMAAffinityOptsKey] = numaOptsJSON
	if _, updateErr := c.Clientset.CoreV1().ConfigMaps(cm.Namespace).Update(ctx, cm, metav1.UpdateOptions{}); updateErr != nil {
		return fmt.Errorf("update colocation-config numa-affinity-options: %w", updateErr)
	}
	return nil
}
// WaitNUMAPluginReady polls every 2 seconds until at least one pod in
// ColocationConfigNamespace whose name starts with NUMAPluginPodPrefix exists,
// and every such pod is Running with all containers Ready. It then returns
// nil. It returns an error when timeout elapses first. List errors are
// treated as "not ready yet".
func WaitNUMAPluginReady(c *k8s.K8SClient, ctx context.Context, timeout time.Duration) error {
	pluginReady := func(pods []corev1.Pod) bool {
		found := false
		for _, pod := range pods {
			if !strings.HasPrefix(pod.Name, NUMAPluginPodPrefix) {
				continue
			}
			found = true
			if pod.Status.Phase != corev1.PodRunning {
				return false
			}
			for _, cs := range pod.Status.ContainerStatuses {
				if !cs.Ready {
					return false
				}
			}
		}
		return found
	}

	for deadline := time.Now().Add(timeout); time.Now().Before(deadline); time.Sleep(2 * time.Second) {
		pods, err := c.Clientset.CoreV1().Pods(ColocationConfigNamespace).List(ctx, metav1.ListOptions{})
		if err == nil && pluginReady(pods.Items) {
			return nil
		}
	}
	return fmt.Errorf("NUMA affinity plugin not ready within %v", timeout)
}
// InstallNoQoSPod creates pod name in ColocationConfigNamespace with no
// "openfuyao.com/qos-level" annotation. It is scheduled by volcano and runs
// one "nginx" container (image docker.io/library/nginx:latest, command
// "sleep 3600"). The container requests 100m CPU / 128Mi memory with limits
// of 200m / 256Mi. The API server's copy of the pod is returned.
func InstallNoQoSPod(c *k8s.K8SClient, ctx context.Context, name string) (*corev1.Pod, error) {
	container := corev1.Container{
		Name:    "nginx",
		Image:   "docker.io/library/nginx:latest",
		Command: []string{"sleep", "3600"},
	}
	container.Resources.Requests = corev1.ResourceList{
		corev1.ResourceCPU:    mustParseQuantity("100m"),
		corev1.ResourceMemory: mustParseQuantity("128Mi"),
	}
	container.Resources.Limits = corev1.ResourceList{
		corev1.ResourceCPU:    mustParseQuantity("200m"),
		corev1.ResourceMemory: mustParseQuantity("256Mi"),
	}

	pod := &corev1.Pod{}
	pod.Name = name
	pod.Namespace = ColocationConfigNamespace
	pod.Spec.SchedulerName = "volcano"
	pod.Spec.Containers = []corev1.Container{container}
	return c.Clientset.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
}
// InstallHLSPodInNamespace creates pod namespace/name annotated with
// "openfuyao.com/qos-level: HLS". It is scheduled by volcano and runs one
// "nginx" container (image docker.io/library/nginx:latest, pull policy
// IfNotPresent, command "sleep 3600"). Requests equal limits: 1 CPU and
// 256Mi memory. The API server's copy of the pod is returned.
func InstallHLSPodInNamespace(c *k8s.K8SClient, ctx context.Context, name, namespace string) (*corev1.Pod, error) {
	resources := corev1.ResourceList{
		corev1.ResourceCPU:    mustParseQuantity("1"),
		corev1.ResourceMemory: mustParseQuantity("256Mi"),
	}
	container := corev1.Container{
		Name:            "nginx",
		Image:           "docker.io/library/nginx:latest",
		ImagePullPolicy: corev1.PullIfNotPresent,
		Command:         []string{"sleep", "3600"},
		Resources: corev1.ResourceRequirements{
			Requests: resources,
			Limits:   resources.DeepCopy(),
		},
	}

	pod := &corev1.Pod{}
	pod.Name = name
	pod.Namespace = namespace
	pod.Annotations = map[string]string{"openfuyao.com/qos-level": "HLS"}
	pod.Spec.SchedulerName = "volcano"
	pod.Spec.Containers = []corev1.Container{container}
	return c.Clientset.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
}
// InstallLSPodInNamespace creates pod namespace/name annotated with
// "openfuyao.com/qos-level: LS". It is scheduled by volcano and runs one
// "nginx" container (image docker.io/library/nginx:latest, command
// "sleep 3600"). The container requests 100m CPU / 128Mi memory with limits
// of 500m / 512Mi. The API server's copy of the pod is returned.
func InstallLSPodInNamespace(c *k8s.K8SClient, ctx context.Context, name, namespace string) (*corev1.Pod, error) {
	container := corev1.Container{
		Name:    "nginx",
		Image:   "docker.io/library/nginx:latest",
		Command: []string{"sleep", "3600"},
	}
	container.Resources.Requests = corev1.ResourceList{
		corev1.ResourceCPU:    mustParseQuantity("100m"),
		corev1.ResourceMemory: mustParseQuantity("128Mi"),
	}
	container.Resources.Limits = corev1.ResourceList{
		corev1.ResourceCPU:    mustParseQuantity("500m"),
		corev1.ResourceMemory: mustParseQuantity("512Mi"),
	}

	pod := &corev1.Pod{}
	pod.Name = name
	pod.Namespace = namespace
	pod.Annotations = map[string]string{"openfuyao.com/qos-level": "LS"}
	pod.Spec.SchedulerName = "volcano"
	pod.Spec.Containers = []corev1.Container{container}
	return c.Clientset.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
}
func InstallBEPodInNamespace(c *k8s.K8SClient, ctx context.Context, name, namespace string) (*corev1.Pod, error) {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: map[string]string{
"volcano.sh/preemptable": "true",
},
Annotations: map[string]string{
"openfuyao.com/qos-level": "BE",
"volcano.sh/preemptable": "true",
},
},
Spec: corev1.PodSpec{
SchedulerName: "volcano",
Containers: []corev1.Container{
{
Name: "nginx",
Image: "docker.io/library/nginx:latest",
Command: []string{"sleep", "3600"},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: mustParseQuantity("100m"),
corev1.ResourceMemory: mustParseQuantity("128Mi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: mustParseQuantity("200m"),
corev1.ResourceMemory: mustParseQuantity("256Mi"),
},
},
},
},
},
}
return c.Clientset.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
}
// InstallHLSDeployment creates an apps/v1 Deployment called name in namespace
// through the dynamic client.
//
// The Deployment selects and labels its Pods with app=<name>. The Pod template
// is annotated openfuyao.com/qos-level=HLS and requests the "volcano"
// scheduler. It runs a single "nginx" container executing `sleep 3600` with
// imagePullPolicy IfNotPresent. Requests equal limits: 1 CPU and 256Mi memory.
//
// Only the Create error is returned; the function does not wait for the
// replicas to become ready.
func InstallHLSDeployment(c *k8s.K8SClient, ctx context.Context, name, namespace string, replicas int32) error {
	deploy := &unstructured.Unstructured{
		Object: map[string]interface{}{
			"apiVersion": "apps/v1",
			"kind":       "Deployment",
			"metadata": map[string]interface{}{
				"name":      name,
				"namespace": namespace,
			},
			"spec": map[string]interface{}{
				// Unstructured content must contain only JSON-compatible values
				// (int64 rather than int32, []interface{} rather than []string).
				// Other types make DeepCopy panic and make the
				// unstructured.Nested* accessors return type errors.
				"replicas": int64(replicas),
				"selector": map[string]interface{}{
					"matchLabels": map[string]interface{}{
						"app": name,
					},
				},
				"template": map[string]interface{}{
					"metadata": map[string]interface{}{
						"labels": map[string]interface{}{
							"app": name,
						},
						"annotations": map[string]interface{}{
							"openfuyao.com/qos-level": "HLS",
						},
					},
					"spec": map[string]interface{}{
						"schedulerName": "volcano",
						"containers": []interface{}{
							map[string]interface{}{
								"name":            "nginx",
								"image":           "docker.io/library/nginx:latest",
								"imagePullPolicy": "IfNotPresent",
								"command":         []interface{}{"sleep", "3600"},
								"resources": map[string]interface{}{
									"requests": map[string]interface{}{
										"cpu":    "1",
										"memory": "256Mi",
									},
									"limits": map[string]interface{}{
										"cpu":    "1",
										"memory": "256Mi",
									},
								},
							},
						},
					},
				},
			},
		},
	}
	gvr := schema.GroupVersionResource{
		Group:    "apps",
		Version:  "v1",
		Resource: "deployments",
	}
	_, err := c.DynamicClient.Resource(gvr).Namespace(namespace).Create(ctx, deploy, metav1.CreateOptions{})
	return err
}
// InstallLSDeployment creates an apps/v1 Deployment called name in namespace
// through the dynamic client.
//
// The Deployment selects and labels its Pods with app=<name>. The Pod template
// is annotated openfuyao.com/qos-level=LS and requests the "volcano"
// scheduler. It runs a single "nginx" container executing `sleep 3600`. The
// container requests 100m CPU / 128Mi memory and is limited to
// 500m CPU / 512Mi memory.
//
// Only the Create error is returned; the function does not wait for the
// replicas to become ready.
func InstallLSDeployment(c *k8s.K8SClient, ctx context.Context, name, namespace string, replicas int32) error {
	deploy := &unstructured.Unstructured{
		Object: map[string]interface{}{
			"apiVersion": "apps/v1",
			"kind":       "Deployment",
			"metadata": map[string]interface{}{
				"name":      name,
				"namespace": namespace,
			},
			"spec": map[string]interface{}{
				// Unstructured content must contain only JSON-compatible values
				// (int64 rather than int32, []interface{} rather than []string).
				// Other types make DeepCopy panic and make the
				// unstructured.Nested* accessors return type errors.
				"replicas": int64(replicas),
				"selector": map[string]interface{}{
					"matchLabels": map[string]interface{}{
						"app": name,
					},
				},
				"template": map[string]interface{}{
					"metadata": map[string]interface{}{
						"labels": map[string]interface{}{
							"app": name,
						},
						"annotations": map[string]interface{}{
							"openfuyao.com/qos-level": "LS",
						},
					},
					"spec": map[string]interface{}{
						"schedulerName": "volcano",
						"containers": []interface{}{
							map[string]interface{}{
								"name":    "nginx",
								"image":   "docker.io/library/nginx:latest",
								"command": []interface{}{"sleep", "3600"},
								"resources": map[string]interface{}{
									"requests": map[string]interface{}{
										"cpu":    "100m",
										"memory": "128Mi",
									},
									"limits": map[string]interface{}{
										"cpu":    "500m",
										"memory": "512Mi",
									},
								},
							},
						},
					},
				},
			},
		},
	}
	gvr := schema.GroupVersionResource{
		Group:    "apps",
		Version:  "v1",
		Resource: "deployments",
	}
	_, err := c.DynamicClient.Resource(gvr).Namespace(namespace).Create(ctx, deploy, metav1.CreateOptions{})
	return err
}
// DeleteDeployment deletes the apps/v1 Deployment called name from namespace
// through the dynamic client, passing empty DeleteOptions. The API error, if
// any (including NotFound), is returned unchanged.
func DeleteDeployment(c *k8s.K8SClient, ctx context.Context, name, namespace string) error {
	deployments := schema.GroupVersion{Group: "apps", Version: "v1"}.WithResource("deployments")
	client := c.DynamicClient.Resource(deployments).Namespace(namespace)
	return client.Delete(ctx, name, metav1.DeleteOptions{})
}
func WaitDeploymentReady(c *k8s.K8SClient, ctx context.Context, name, namespace string, timeout time.Duration) error {
deadline := time.Now().Add(timeout)
gvr := schema.GroupVersionResource{
Group: "apps",
Version: "v1",
Resource: "deployments",
}
for time.Now().Before(deadline) {
deploy, err := c.DynamicClient.Resource(gvr).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
if err == nil {
replicas, _, _ := unstructured.NestedInt64(deploy.Object, "spec", "replicas")
readyReplicas, _, _ := unstructured.NestedInt64(deploy.Object, "status", "readyReplicas")
if readyReplicas == replicas && replicas > 0 {
return nil
}
}
time.Sleep(2 * time.Second)
}
return fmt.Errorf("deployment %s not ready within %v", name, timeout)
}
// InstallLSStatefulSet creates an apps/v1 StatefulSet called name in namespace
// through the dynamic client.
//
// spec.serviceName is set to name; no Service is created here. Pods are
// selected and labelled with app=<name>. The Pod template is annotated
// openfuyao.com/qos-level=LS and requests the "volcano" scheduler. It runs a
// single "nginx" container executing `sleep 3600`. The container requests
// 100m CPU / 128Mi memory and is limited to 500m CPU / 512Mi memory.
//
// Only the Create error is returned.
func InstallLSStatefulSet(c *k8s.K8SClient, ctx context.Context, name, namespace string, replicas int32) error {
	sts := &unstructured.Unstructured{
		Object: map[string]interface{}{
			"apiVersion": "apps/v1",
			"kind":       "StatefulSet",
			"metadata": map[string]interface{}{
				"name":      name,
				"namespace": namespace,
			},
			"spec": map[string]interface{}{
				// Unstructured content must contain only JSON-compatible values
				// (int64 rather than int32, []interface{} rather than []string).
				// Other types make DeepCopy panic and make the
				// unstructured.Nested* accessors return type errors.
				"replicas":    int64(replicas),
				"serviceName": name,
				"selector": map[string]interface{}{
					"matchLabels": map[string]interface{}{
						"app": name,
					},
				},
				"template": map[string]interface{}{
					"metadata": map[string]interface{}{
						"labels": map[string]interface{}{
							"app": name,
						},
						"annotations": map[string]interface{}{
							"openfuyao.com/qos-level": "LS",
						},
					},
					"spec": map[string]interface{}{
						"schedulerName": "volcano",
						"containers": []interface{}{
							map[string]interface{}{
								"name":    "nginx",
								"image":   "docker.io/library/nginx:latest",
								"command": []interface{}{"sleep", "3600"},
								"resources": map[string]interface{}{
									"requests": map[string]interface{}{
										"cpu":    "100m",
										"memory": "128Mi",
									},
									"limits": map[string]interface{}{
										"cpu":    "500m",
										"memory": "512Mi",
									},
								},
							},
						},
					},
				},
			},
		},
	}
	gvr := schema.GroupVersionResource{
		Group:    "apps",
		Version:  "v1",
		Resource: "statefulsets",
	}
	_, err := c.DynamicClient.Resource(gvr).Namespace(namespace).Create(ctx, sts, metav1.CreateOptions{})
	return err
}
// DeleteStatefulSet deletes the apps/v1 StatefulSet called name from namespace
// through the dynamic client, passing empty DeleteOptions. The API error, if
// any (including NotFound), is returned unchanged.
func DeleteStatefulSet(c *k8s.K8SClient, ctx context.Context, name, namespace string) error {
	statefulSets := schema.GroupVersion{Group: "apps", Version: "v1"}.WithResource("statefulsets")
	client := c.DynamicClient.Resource(statefulSets).Namespace(namespace)
	return client.Delete(ctx, name, metav1.DeleteOptions{})
}
func WaitStatefulSetReady(c *k8s.K8SClient, ctx context.Context, name, namespace string, timeout time.Duration) error {
deadline := time.Now().Add(timeout)
gvr := schema.GroupVersionResource{
Group: "apps",
Version: "v1",
Resource: "statefulsets",
}
for time.Now().Before(deadline) {
sts, err := c.DynamicClient.Resource(gvr).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
if err == nil {
replicas, _, _ := unstructured.NestedInt64(sts.Object, "spec", "replicas")
readyReplicas, _, _ := unstructured.NestedInt64(sts.Object, "status", "readyReplicas")
if readyReplicas == replicas && replicas > 0 {
return nil
}
}
time.Sleep(2 * time.Second)
}
return fmt.Errorf("statefulset %s not ready within %v", name, timeout)
}
// InstallLSJob creates a batch/v1 Job called name in namespace through the
// dynamic client.
//
// The Pod template is annotated openfuyao.com/qos-level=LS, requests the
// "volcano" scheduler and uses restartPolicy Never. It runs a single "job"
// container from busybox executing `sleep 60`. The container requests
// 100m CPU / 128Mi memory and is limited to 500m CPU / 512Mi memory.
//
// Only the Create error is returned.
func InstallLSJob(c *k8s.K8SClient, ctx context.Context, name, namespace string) error {
	job := &unstructured.Unstructured{
		Object: map[string]interface{}{
			"apiVersion": "batch/v1",
			"kind":       "Job",
			"metadata": map[string]interface{}{
				"name":      name,
				"namespace": namespace,
			},
			"spec": map[string]interface{}{
				"template": map[string]interface{}{
					"metadata": map[string]interface{}{
						"annotations": map[string]interface{}{
							"openfuyao.com/qos-level": "LS",
						},
					},
					"spec": map[string]interface{}{
						"schedulerName": "volcano",
						"restartPolicy": "Never",
						"containers": []interface{}{
							map[string]interface{}{
								"name":  "job",
								"image": "docker.io/library/busybox:latest",
								// []interface{} (not []string) keeps the Unstructured
								// content JSON-compatible, so DeepCopy does not panic.
								"command": []interface{}{"sleep", "60"},
								"resources": map[string]interface{}{
									"requests": map[string]interface{}{
										"cpu":    "100m",
										"memory": "128Mi",
									},
									"limits": map[string]interface{}{
										"cpu":    "500m",
										"memory": "512Mi",
									},
								},
							},
						},
					},
				},
			},
		},
	}
	gvr := schema.GroupVersionResource{
		Group:    "batch",
		Version:  "v1",
		Resource: "jobs",
	}
	_, err := c.DynamicClient.Resource(gvr).Namespace(namespace).Create(ctx, job, metav1.CreateOptions{})
	return err
}
// DeleteJob deletes the batch/v1 Job called name from namespace through the
// dynamic client. It requests foreground cascading deletion, so the Job's
// dependents are removed before the Job object itself goes away. The API
// error, if any, is returned unchanged.
func DeleteJob(c *k8s.K8SClient, ctx context.Context, name, namespace string) error {
	foreground := metav1.DeletePropagationForeground
	opts := metav1.DeleteOptions{PropagationPolicy: &foreground}
	jobs := schema.GroupVersion{Group: "batch", Version: "v1"}.WithResource("jobs")
	return c.DynamicClient.Resource(jobs).Namespace(namespace).Delete(ctx, name, opts)
}
func WaitJobPodRunning(c *k8s.K8SClient, ctx context.Context, jobName, namespace string, timeout time.Duration) error {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
pods, err := c.Clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("job-name=%s", jobName),
})
if err == nil && len(pods.Items) > 0 {
for _, pod := range pods.Items {
if pod.Status.Phase == corev1.PodRunning || pod.Status.Phase == corev1.PodSucceeded {
return nil
}
}
}
time.Sleep(2 * time.Second)
}
return fmt.Errorf("job %s pod not running within %v", jobName, timeout)
}
// InstallLSDaemonSet creates an apps/v1 DaemonSet called name in namespace
// through the dynamic client.
//
// Pods are selected and labelled with app=<name>. The Pod template is
// annotated openfuyao.com/qos-level=LS and requests the "volcano" scheduler.
// It tolerates every taint through a single operator=Exists toleration. It runs
// one "nginx" container executing `sleep 3600`. The container requests
// 100m CPU / 128Mi memory and is limited to 500m CPU / 512Mi memory.
//
// Only the Create error is returned.
func InstallLSDaemonSet(c *k8s.K8SClient, ctx context.Context, name, namespace string) error {
	ds := &unstructured.Unstructured{
		Object: map[string]interface{}{
			"apiVersion": "apps/v1",
			"kind":       "DaemonSet",
			"metadata": map[string]interface{}{
				"name":      name,
				"namespace": namespace,
			},
			"spec": map[string]interface{}{
				"selector": map[string]interface{}{
					"matchLabels": map[string]interface{}{
						"app": name,
					},
				},
				"template": map[string]interface{}{
					"metadata": map[string]interface{}{
						"labels": map[string]interface{}{
							"app": name,
						},
						"annotations": map[string]interface{}{
							"openfuyao.com/qos-level": "LS",
						},
					},
					"spec": map[string]interface{}{
						"schedulerName": "volcano",
						"tolerations": []interface{}{
							map[string]interface{}{
								"operator": "Exists",
							},
						},
						"containers": []interface{}{
							map[string]interface{}{
								"name":  "nginx",
								"image": "docker.io/library/nginx:latest",
								// []interface{} (not []string) keeps the Unstructured
								// content JSON-compatible, so DeepCopy does not panic.
								"command": []interface{}{"sleep", "3600"},
								"resources": map[string]interface{}{
									"requests": map[string]interface{}{
										"cpu":    "100m",
										"memory": "128Mi",
									},
									"limits": map[string]interface{}{
										"cpu":    "500m",
										"memory": "512Mi",
									},
								},
							},
						},
					},
				},
			},
		},
	}
	gvr := schema.GroupVersionResource{
		Group:    "apps",
		Version:  "v1",
		Resource: "daemonsets",
	}
	_, err := c.DynamicClient.Resource(gvr).Namespace(namespace).Create(ctx, ds, metav1.CreateOptions{})
	return err
}
// DeleteDaemonSet deletes the apps/v1 DaemonSet called name from namespace
// through the dynamic client, passing empty DeleteOptions. The API error, if
// any (including NotFound), is returned unchanged.
func DeleteDaemonSet(c *k8s.K8SClient, ctx context.Context, name, namespace string) error {
	daemonSets := schema.GroupVersion{Group: "apps", Version: "v1"}.WithResource("daemonsets")
	client := c.DynamicClient.Resource(daemonSets).Namespace(namespace)
	return client.Delete(ctx, name, metav1.DeleteOptions{})
}
func WaitDaemonSetReady(c *k8s.K8SClient, ctx context.Context, name, namespace string, timeout time.Duration) error {
deadline := time.Now().Add(timeout)
gvr := schema.GroupVersionResource{
Group: "apps",
Version: "v1",
Resource: "daemonsets",
}
for time.Now().Before(deadline) {
ds, err := c.DynamicClient.Resource(gvr).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
if err == nil {
desiredNumberScheduled, _, _ := unstructured.NestedInt64(ds.Object, "status", "desiredNumberScheduled")
numberReady, _, _ := unstructured.NestedInt64(ds.Object, "status", "numberReady")
if numberReady == desiredNumberScheduled && desiredNumberScheduled > 0 {
return nil
}
}
time.Sleep(2 * time.Second)
}
printDaemonSetTimeoutDiagnostics(c, ctx, name, namespace, "all-ready", timeout)
return fmt.Errorf("daemonset %s not ready within %v", name, timeout)
}
// WaitDaemonSetAtLeastOneReady polls the apps/v1 DaemonSet namespace/name
// every two seconds. It stops when status.desiredNumberScheduled,
// status.numberReady and status.numberAvailable are all positive, when timeout
// elapses, or when ctx is cancelled.
//
// The counters from the most recent successful Get are included in the
// returned error. On timeout, printDaemonSetTimeoutDiagnostics is invoked with
// mode "at-least-one-ready". If the final Get failed, its error is wrapped as
// well. On cancellation the wrapped ctx.Err() is returned immediately and
// diagnostics are skipped, because they would run against the
// already-cancelled ctx.
func WaitDaemonSetAtLeastOneReady(c *k8s.K8SClient, ctx context.Context, name, namespace string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	gvr := schema.GroupVersionResource{
		Group:    "apps",
		Version:  "v1",
		Resource: "daemonsets",
	}
	var lastDesired, lastReady, lastAvailable int64
	var lastErr error
	for time.Now().Before(deadline) {
		ds, err := c.DynamicClient.Resource(gvr).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
		lastErr = err
		if err == nil {
			desiredNumberScheduled, _, _ := unstructured.NestedInt64(ds.Object, "status", "desiredNumberScheduled")
			numberReady, _, _ := unstructured.NestedInt64(ds.Object, "status", "numberReady")
			numberAvailable, _, _ := unstructured.NestedInt64(ds.Object, "status", "numberAvailable")
			lastDesired, lastReady, lastAvailable = desiredNumberScheduled, numberReady, numberAvailable
			if desiredNumberScheduled > 0 && numberReady > 0 && numberAvailable > 0 {
				return nil
			}
		}
		select {
		case <-ctx.Done():
			return fmt.Errorf(
				"waiting for daemonset %s (desired=%d ready=%d available=%d): %w",
				name,
				lastDesired,
				lastReady,
				lastAvailable,
				ctx.Err(),
			)
		case <-time.After(2 * time.Second):
		}
	}
	printDaemonSetTimeoutDiagnostics(c, ctx, name, namespace, "at-least-one-ready", timeout)
	if lastErr != nil {
		return fmt.Errorf(
			"daemonset %s has no ready replicas within %v (desired=%d ready=%d available=%d): last error: %w",
			name,
			timeout,
			lastDesired,
			lastReady,
			lastAvailable,
			lastErr,
		)
	}
	return fmt.Errorf(
		"daemonset %s has no ready replicas within %v (desired=%d ready=%d available=%d)",
		name,
		timeout,
		lastDesired,
		lastReady,
		lastAvailable,
	)
}
// InstallLSCronJob creates a batch/v1 CronJob called name in namespace through
// the dynamic client. Its schedule is "* * * * *", i.e. every minute.
//
// The Job Pod template is annotated openfuyao.com/qos-level=LS, requests the
// "volcano" scheduler and uses restartPolicy Never. It runs a single "cronjob"
// container from busybox executing `sleep 30`. The container requests
// 100m CPU / 128Mi memory and is limited to 500m CPU / 512Mi memory.
//
// Only the Create error is returned.
func InstallLSCronJob(c *k8s.K8SClient, ctx context.Context, name, namespace string) error {
	cronJob := &unstructured.Unstructured{
		Object: map[string]interface{}{
			"apiVersion": "batch/v1",
			"kind":       "CronJob",
			"metadata": map[string]interface{}{
				"name":      name,
				"namespace": namespace,
			},
			"spec": map[string]interface{}{
				"schedule": "* * * * *",
				"jobTemplate": map[string]interface{}{
					"spec": map[string]interface{}{
						"template": map[string]interface{}{
							"metadata": map[string]interface{}{
								"annotations": map[string]interface{}{
									"openfuyao.com/qos-level": "LS",
								},
							},
							"spec": map[string]interface{}{
								"schedulerName": "volcano",
								"restartPolicy": "Never",
								"containers": []interface{}{
									map[string]interface{}{
										"name":  "cronjob",
										"image": "docker.io/library/busybox:latest",
										// []interface{} (not []string) keeps the Unstructured
										// content JSON-compatible, so DeepCopy does not panic.
										"command": []interface{}{"sleep", "30"},
										"resources": map[string]interface{}{
											"requests": map[string]interface{}{
												"cpu":    "100m",
												"memory": "128Mi",
											},
											"limits": map[string]interface{}{
												"cpu":    "500m",
												"memory": "512Mi",
											},
										},
									},
								},
							},
						},
					},
				},
			},
		},
	}
	gvr := schema.GroupVersionResource{
		Group:    "batch",
		Version:  "v1",
		Resource: "cronjobs",
	}
	_, err := c.DynamicClient.Resource(gvr).Namespace(namespace).Create(ctx, cronJob, metav1.CreateOptions{})
	return err
}
// DeleteCronJob deletes the batch/v1 CronJob called name from namespace
// through the dynamic client. It requests foreground cascading deletion, so the
// CronJob's dependents are removed before the CronJob object itself goes away.
// The API error, if any, is returned unchanged.
func DeleteCronJob(c *k8s.K8SClient, ctx context.Context, name, namespace string) error {
	foreground := metav1.DeletePropagationForeground
	opts := metav1.DeleteOptions{PropagationPolicy: &foreground}
	cronJobs := schema.GroupVersion{Group: "batch", Version: "v1"}.WithResource("cronjobs")
	return c.DynamicClient.Resource(cronJobs).Namespace(namespace).Delete(ctx, name, opts)
}
func WaitCronJobPodRunning(c *k8s.K8SClient, ctx context.Context, cronJobName, namespace string, timeout time.Duration) error {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
gvr := schema.GroupVersionResource{
Group: "batch",
Version: "v1",
Resource: "jobs",
}
jobs, err := c.DynamicClient.Resource(gvr).Namespace(namespace).List(ctx, metav1.ListOptions{})
if err == nil {
for _, job := range jobs.Items {
ownerRefs, _, _ := unstructured.NestedSlice(job.Object, "metadata", "ownerReferences")
for _, ref := range ownerRefs {
refMap, ok := ref.(map[string]interface{})
if !ok {
continue
}
if refMap["kind"] == "CronJob" && refMap["name"] == cronJobName {
jobName := job.GetName()
pods, err := c.Clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("job-name=%s", jobName),
})
if err == nil && len(pods.Items) > 0 {
for _, pod := range pods.Items {
if pod.Status.Phase == corev1.PodRunning || pod.Status.Phase == corev1.PodSucceeded {
return nil
}
}
}
}
}
}
}
time.Sleep(5 * time.Second)
}
return fmt.Errorf("cronjob %s pod not running within %v", cronJobName, timeout)
}
// InstallNoQoSDeployment creates an apps/v1 Deployment called name in namespace
// through the dynamic client. Unlike the HLS/LS variants, it sets no
// openfuyao.com/qos-level annotation.
//
// Pods are selected and labelled with app=<name> and request the "volcano"
// scheduler. They run a single "nginx" container executing `sleep 3600`. The
// container requests 100m CPU / 128Mi memory and is limited to
// 200m CPU / 256Mi memory.
//
// Only the Create error is returned.
func InstallNoQoSDeployment(c *k8s.K8SClient, ctx context.Context, name, namespace string, replicas int32) error {
	deploy := &unstructured.Unstructured{
		Object: map[string]interface{}{
			"apiVersion": "apps/v1",
			"kind":       "Deployment",
			"metadata": map[string]interface{}{
				"name":      name,
				"namespace": namespace,
			},
			"spec": map[string]interface{}{
				// Unstructured content must contain only JSON-compatible values
				// (int64 rather than int32, []interface{} rather than []string).
				// Other types make DeepCopy panic and make the
				// unstructured.Nested* accessors return type errors.
				"replicas": int64(replicas),
				"selector": map[string]interface{}{
					"matchLabels": map[string]interface{}{
						"app": name,
					},
				},
				"template": map[string]interface{}{
					"metadata": map[string]interface{}{
						"labels": map[string]interface{}{
							"app": name,
						},
					},
					"spec": map[string]interface{}{
						"schedulerName": "volcano",
						"containers": []interface{}{
							map[string]interface{}{
								"name":    "nginx",
								"image":   "docker.io/library/nginx:latest",
								"command": []interface{}{"sleep", "3600"},
								"resources": map[string]interface{}{
									"requests": map[string]interface{}{
										"cpu":    "100m",
										"memory": "128Mi",
									},
									"limits": map[string]interface{}{
										"cpu":    "200m",
										"memory": "256Mi",
									},
								},
							},
						},
					},
				},
			},
		},
	}
	gvr := schema.GroupVersionResource{
		Group:    "apps",
		Version:  "v1",
		Resource: "deployments",
	}
	_, err := c.DynamicClient.Resource(gvr).Namespace(namespace).Create(ctx, deploy, metav1.CreateOptions{})
	return err
}
// InstallNoQoSStatefulSet creates an apps/v1 StatefulSet called name in
// namespace through the dynamic client. Unlike InstallLSStatefulSet, it sets no
// openfuyao.com/qos-level annotation.
//
// spec.serviceName is set to name; no Service is created here. Pods are
// selected and labelled with app=<name> and request the "volcano" scheduler.
// They run a single "nginx" container executing `sleep 3600`. The container
// requests 100m CPU / 128Mi memory and is limited to 200m CPU / 256Mi memory.
//
// Only the Create error is returned.
func InstallNoQoSStatefulSet(c *k8s.K8SClient, ctx context.Context, name, namespace string, replicas int32) error {
	sts := &unstructured.Unstructured{
		Object: map[string]interface{}{
			"apiVersion": "apps/v1",
			"kind":       "StatefulSet",
			"metadata": map[string]interface{}{
				"name":      name,
				"namespace": namespace,
			},
			"spec": map[string]interface{}{
				// Unstructured content must contain only JSON-compatible values
				// (int64 rather than int32, []interface{} rather than []string).
				// Other types make DeepCopy panic and make the
				// unstructured.Nested* accessors return type errors.
				"replicas":    int64(replicas),
				"serviceName": name,
				"selector": map[string]interface{}{
					"matchLabels": map[string]interface{}{
						"app": name,
					},
				},
				"template": map[string]interface{}{
					"metadata": map[string]interface{}{
						"labels": map[string]interface{}{
							"app": name,
						},
					},
					"spec": map[string]interface{}{
						"schedulerName": "volcano",
						"containers": []interface{}{
							map[string]interface{}{
								"name":    "nginx",
								"image":   "docker.io/library/nginx:latest",
								"command": []interface{}{"sleep", "3600"},
								"resources": map[string]interface{}{
									"requests": map[string]interface{}{
										"cpu":    "100m",
										"memory": "128Mi",
									},
									"limits": map[string]interface{}{
										"cpu":    "200m",
										"memory": "256Mi",
									},
								},
							},
						},
					},
				},
			},
		},
	}
	gvr := schema.GroupVersionResource{
		Group:    "apps",
		Version:  "v1",
		Resource: "statefulsets",
	}
	_, err := c.DynamicClient.Resource(gvr).Namespace(namespace).Create(ctx, sts, metav1.CreateOptions{})
	return err
}
// InstallNoQoSJob creates a batch/v1 Job called name in namespace through the
// dynamic client. Unlike InstallLSJob, it sets no openfuyao.com/qos-level
// annotation.
//
// The Pod template requests the "volcano" scheduler and uses restartPolicy
// Never. It runs a single "job" container from busybox executing `sleep 60`.
// The container requests 100m CPU / 128Mi memory and is limited to
// 200m CPU / 256Mi memory.
//
// Only the Create error is returned.
func InstallNoQoSJob(c *k8s.K8SClient, ctx context.Context, name, namespace string) error {
	job := &unstructured.Unstructured{
		Object: map[string]interface{}{
			"apiVersion": "batch/v1",
			"kind":       "Job",
			"metadata": map[string]interface{}{
				"name":      name,
				"namespace": namespace,
			},
			"spec": map[string]interface{}{
				"template": map[string]interface{}{
					"spec": map[string]interface{}{
						"schedulerName": "volcano",
						"restartPolicy": "Never",
						"containers": []interface{}{
							map[string]interface{}{
								"name":  "job",
								"image": "docker.io/library/busybox:latest",
								// []interface{} (not []string) keeps the Unstructured
								// content JSON-compatible, so DeepCopy does not panic.
								"command": []interface{}{"sleep", "60"},
								"resources": map[string]interface{}{
									"requests": map[string]interface{}{
										"cpu":    "100m",
										"memory": "128Mi",
									},
									"limits": map[string]interface{}{
										"cpu":    "200m",
										"memory": "256Mi",
									},
								},
							},
						},
					},
				},
			},
		},
	}
	gvr := schema.GroupVersionResource{
		Group:    "batch",
		Version:  "v1",
		Resource: "jobs",
	}
	_, err := c.DynamicClient.Resource(gvr).Namespace(namespace).Create(ctx, job, metav1.CreateOptions{})
	return err
}