/*
* Copyright (c) 2026 Huawei Technologies Co., Ltd.
* openFuyao is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
package warmupjob
import (
"context"
"errors"
"log/slog"
"strings"
"testing"
"time"
warmupv1alpha1 "github.com/openfuyao/weight-dispatcher/api/v1alpha1"
"github.com/openfuyao/weight-dispatcher/pkg/node"
"github.com/openfuyao/weight-dispatcher/pkg/planning/transferplanner"
sharedtypes "github.com/openfuyao/weight-dispatcher/pkg/types"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ktypes "k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)
// TestValidateJobAcceptsNodeAndHuggingFaceSources checks that validateJob
// returns no error for the baseline fixture (a single node source) and for a
// spec whose only source is the external https://huggingface.co endpoint.
func TestValidateJobAcceptsNodeAndHuggingFaceSources(t *testing.T) {
	t.Parallel()

	// validJobSpec already carries a node-backed source.
	withNodeSource := validJobSpec()
	err := validateJob(withNodeSource)
	if err != nil {
		t.Fatalf("expected node source spec to pass: %v", err)
	}

	// Swap the source list for a single external Hugging Face entry.
	huggingFace := warmupv1alpha1.SourceSpec{
		SourceType: "external",
		Endpoint:   "https://huggingface.co",
		Path:       "Qwen/Qwen3-8B",
	}
	withHuggingFace := validJobSpec()
	withHuggingFace.Sources = []warmupv1alpha1.SourceSpec{huggingFace}
	err = validateJob(withHuggingFace)
	if err != nil {
		t.Fatalf("expected huggingface external source to pass: %v", err)
	}
}
// TestValidateJobRejectsInvalidInputs is a table-driven test: each case
// mutates a fresh copy of the valid fixture spec and expects validateJob to
// return an error whose text contains the case's wantErr fragment.
func TestValidateJobRejectsInvalidInputs(t *testing.T) {
	t.Parallel()

	// specMutation turns the valid fixture into one specific invalid spec.
	type specMutation func(*warmupv1alpha1.ModelWarmupJobSpec)

	cases := []struct {
		name    string
		mutate  specMutation
		wantErr string
	}{
		{
			name: "missing artifact",
			mutate: func(spec *warmupv1alpha1.ModelWarmupJobSpec) {
				spec.Artifact.Key = ""
			},
			wantErr: "spec.artifact.key",
		},
		{
			name: "missing target path",
			mutate: func(spec *warmupv1alpha1.ModelWarmupJobSpec) {
				spec.Target.TargetPath = ""
			},
			wantErr: "spec.target.targetPath",
		},
		{
			name: "missing target selector",
			mutate: func(spec *warmupv1alpha1.ModelWarmupJobSpec) {
				spec.Target.NodeNames = nil
			},
			wantErr: "spec.target.nodeNames",
		},
		{
			name: "node source endpoint explicit",
			mutate: func(spec *warmupv1alpha1.ModelWarmupJobSpec) {
				spec.Sources[0].Endpoint = "10.0.0.1"
			},
			wantErr: "must not set endpoint",
		},
		{
			name: "duplicate source",
			mutate: func(spec *warmupv1alpha1.ModelWarmupJobSpec) {
				spec.Sources = append(spec.Sources, spec.Sources[0])
			},
			wantErr: "duplicated source",
		},
		{
			name: "unsupported external only",
			mutate: func(spec *warmupv1alpha1.ModelWarmupJobSpec) {
				onlyExternal := warmupv1alpha1.SourceSpec{SourceType: "external", Endpoint: "https://example.invalid", Path: "/model"}
				spec.Sources = []warmupv1alpha1.SourceSpec{onlyExternal}
			},
			wantErr: "phase one requires",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()

			// Every subtest starts from its own fixture so mutations never leak.
			spec := validJobSpec()
			tc.mutate(&spec)

			err := validateJob(spec)
			if err != nil && strings.Contains(err.Error(), tc.wantErr) {
				return
			}
			t.Fatalf("expected error containing %q, got %v", tc.wantErr, err)
		})
	}
}
// TestNormalizeSourcesResolvesNodeEndpointAndRejectsDuplicates exercises
// Reconciler.normalizeSources in two scenarios:
//
//  1. A node source plus an external source: the node entry is expected to
//     come back with the node fixture's address as its endpoint, and the
//     external entry is expected to keep its original endpoint.
//  2. Two identical external sources: the call is expected to fail with an
//     error mentioning "duplicated normalized source".
func TestNormalizeSourcesResolvesNodeEndpointAndRejectsDuplicates(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	reconciler := newResolverReconciler(t, []corev1.Node{nodeFixture("node-a", "10.0.0.1")})

	got, err := reconciler.normalizeSources(ctx, []warmupv1alpha1.SourceSpec{
		{SourceType: "node", NodeName: "node-a", Path: "/models/qwen"},
		{SourceType: "external", Endpoint: "https://huggingface.co", Path: "Qwen/Qwen3-8B"},
	})
	if err != nil {
		t.Fatalf("normalizeSources returned error: %v", err)
	}
	// Guard the index expressions below so that a short result is reported as
	// an ordinary test failure rather than an index-out-of-range panic.
	if len(got) < 2 {
		t.Fatalf("expected at least 2 normalized sources, got %d: %#v", len(got), got)
	}
	if got[0].Endpoint != "10.0.0.1" {
		t.Fatalf("expected resolved endpoint, got %#v", got[0])
	}
	if got[1].Endpoint != "https://huggingface.co" {
		t.Fatalf("expected external source unchanged, got %#v", got[1])
	}

	_, err = reconciler.normalizeSources(ctx, []warmupv1alpha1.SourceSpec{
		{SourceType: "external", Endpoint: "https://huggingface.co", Path: "Qwen/Qwen3-8B"},
		{SourceType: "external", Endpoint: "https://huggingface.co", Path: "Qwen/Qwen3-8B"},
	})
	if err == nil || !strings.Contains(err.Error(), "duplicated normalized source") {
		t.Fatalf("expected duplicate source error, got %v", err)
	}
}
func TestSelectManifestSourcePrefersNodeThenHuggingFace(t *testing.T) {
t.Parallel()
reconciler := newResolverReconciler(t, []corev1.Node{nodeFixture("node-a", "10.0.0.1")})
source, nodeObj, err := reconciler.selectManifestSource(context.Background(), []warmupv1alpha1.SourceSpec{
{SourceType: "external", Endpoint: "https://huggingface.co", Path: "Qwen/Qwen3-8B"},
{SourceType: "node", NodeName: "node-a", Path: "/models/qwen"},
})
if err != nil {
t.Fatalf("selectManifestSource returned error: %v", err)
}
if source.SourceType != "node" || nodeObj.Name != "node-a" {
t.Fatalf("expected node source first, got source=%#v node=%s", source, nodeObj.Name)
}
source, nodeObj, err = reconciler.selectManifestSource(context.Background(), []warmupv1alpha1.SourceSpec{
{SourceType: "external", Endpoint: "https://huggingface.co", Path: "Qwen/Qwen3-8B"},
})
if err != nil {
t.Fatalf("selectManifestSource returned error for hf source: %v", err)
}
if source.SourceType != "external" || nodeObj.Name != "" {
t.Fatalf("expected external source and empty node, got source=%#v node=%s", source, nodeObj.Name)
}
}
// TestStateHelpersFilterSummarizeAndAggregatePhases covers three helpers:
// filterExistingNodeStates (keyed by node name, expected to drop the state
// whose task ID does not belong to plan "plan-a"), summarize (per-phase
// counters), and aggregatePhase (overall phase derived from a summary).
func TestStateHelpersFilterSummarizeAndAggregatePhases(t *testing.T) {
	t.Parallel()

	nodeStates := []warmupv1alpha1.WarmupNodeState{
		{NodeName: "node-a", TaskID: "plan-a-node-a", Phase: warmupv1alpha1.JobPhaseSucceeded},
		{NodeName: "node-b", TaskID: "old-node-b", Phase: warmupv1alpha1.JobPhaseRunning},
		{NodeName: "node-c", Phase: warmupv1alpha1.JobPhasePending},
		{NodeName: "node-d", TaskID: "plan-a-node-d", Phase: string(sharedtypes.WarmupPhaseCanceled)},
	}

	kept := filterExistingNodeStates(nodeStates, "plan-a")
	if _, found := kept["node-b"]; found {
		t.Fatalf("expected stale task to be filtered out: %#v", kept)
	}
	if _, found := kept["node-a"]; !found {
		t.Fatalf("expected plan-a task to remain: %#v", kept)
	}

	// The canceled node-d is expected to be counted under Failed.
	summary := summarize(nodeStates)
	countsMatch := summary.Total == 4 &&
		summary.Succeeded == 1 &&
		summary.Running == 1 &&
		summary.Pending == 1 &&
		summary.Failed == 1
	if !countsMatch {
		t.Fatalf("unexpected summary: %#v", summary)
	}

	mixed := aggregatePhase(summary)
	if mixed != warmupv1alpha1.JobPhaseRunning {
		t.Fatalf("expected running aggregate phase, got %s", mixed)
	}

	onlyFailed := warmupv1alpha1.WarmupSummary{Total: 1, Failed: 1}
	if got := aggregatePhase(onlyFailed); got != warmupv1alpha1.JobPhaseFailed {
		t.Fatalf("expected failed aggregate phase, got %s", got)
	}

	onlySucceeded := warmupv1alpha1.WarmupSummary{Total: 1, Succeeded: 1}
	if got := aggregatePhase(onlySucceeded); got != warmupv1alpha1.JobPhaseSucceeded {
		t.Fatalf("expected succeeded aggregate phase, got %s", got)
	}
}
// TestApplyTaskStatusCopiesProgressAndTimestamps feeds applyTaskStatus a
// succeeded task status whose timestamps are expressed in a UTC+8 zone and
// checks that the node state receives the task ID, phase, and cache path, and
// that both timestamps are reported in UTC while still denoting the same
// instants as the inputs.
func TestApplyTaskStatusCopiesProgressAndTimestamps(t *testing.T) {
	t.Parallel()

	// A non-UTC zone makes the UTC expectation below meaningful.
	started := time.Date(2026, 5, 1, 1, 2, 3, 0, time.FixedZone("utc+8", 8*60*60))
	finished := started.Add(time.Minute)

	state := warmupv1alpha1.WarmupNodeState{NodeName: "node-a"}
	applyTaskStatus(&state, sharedtypes.TaskStatus{
		TaskID:         "task-a",
		Phase:          sharedtypes.WarmupPhaseSucceeded,
		Message:        "done",
		ProgressBytes:  1024,
		ThroughputMBps: 512,
		TransportPath:  sharedtypes.TransportPathRDMA,
		CachePath:      "/cache/qwen",
		StartedAt:      &started,
		FinishedAt:     &finished,
	})

	if state.TaskID != "task-a" || state.Phase != warmupv1alpha1.JobPhaseSucceeded || state.CachePath != "/cache/qwen" {
		t.Fatalf("task status not copied: %#v", state)
	}
	if state.StartedAt == nil || state.StartedAt.Time.Location() != time.UTC {
		t.Fatalf("expected UTC started time, got %#v", state.StartedAt)
	}
	if state.FinishedAt == nil || state.FinishedAt.Time.Location() != time.UTC {
		t.Fatalf("expected UTC finished time, got %#v", state.FinishedAt)
	}
	// Checking only the location would also accept an unrelated UTC time, so
	// assert that the zone conversion preserved the instants.
	if !state.StartedAt.Time.Equal(started) {
		t.Fatalf("expected started instant %s to be preserved, got %s", started, state.StartedAt.Time)
	}
	if !state.FinishedAt.Time.Equal(finished) {
		t.Fatalf("expected finished instant %s to be preserved, got %s", finished, state.FinishedAt.Time)
	}
}
// TestPathAndManifestHelpers spot-checks the small path and manifest helpers:
// isPathWithin, cleanAgentPath, usesForwardSlashPath, manifestLayoutsMatch,
// manifestLayoutSignature, and manifestChunkSizeBytes.
func TestPathAndManifestHelpers(t *testing.T) {
	t.Parallel()

	// Path helpers.
	nested := isPathWithin("/models/qwen/snapshots/rev", "/models/qwen")
	if !nested {
		t.Fatalf("expected nested unix path to be within root")
	}
	samePath := isPathWithin("/models/qwen", "/models/qwen")
	if samePath {
		t.Fatalf("same path must not be treated as nested")
	}
	if cleaned := cleanAgentPath("/models/qwen/../qwen3"); cleaned != "/models/qwen3" {
		t.Fatalf("unexpected cleanAgentPath result")
	}
	if !usesForwardSlashPath("/models/qwen") || usesForwardSlashPath(`C:\models\qwen`) {
		t.Fatalf("unexpected forward slash path detection")
	}

	// Manifest layout comparison.
	original := sharedtypes.LogicalManifest{
		Files: []sharedtypes.ArtifactFile{
			{RelativePath: "a.bin", SizeBytes: 1, Kind: sharedtypes.ArtifactFileKindAuxiliary, Chunkable: true, Required: true},
		},
	}
	if !manifestLayoutsMatch(original, original) {
		t.Fatalf("expected identical manifests to match")
	}

	// Give the altered manifest its own Files slice so that changing the size
	// does not also change the original.
	altered := original
	altered.Files = make([]sharedtypes.ArtifactFile, len(original.Files))
	copy(altered.Files, original.Files)
	altered.Files[0].SizeBytes = 2
	if manifestLayoutsMatch(original, altered) {
		t.Fatalf("expected changed manifest to mismatch")
	}

	signature := manifestLayoutSignature(original)
	if !strings.Contains(signature, "a.bin|1|") {
		t.Fatalf("unexpected manifest signature %q", signature)
	}

	// An empty manifest is expected to fall back to the spec's policy chunk
	// size (64 MB in the fixture), expressed in bytes.
	chunkBytes := manifestChunkSizeBytes(sharedtypes.LogicalManifest{}, validJobSpec())
	if chunkBytes != 64*1024*1024 {
		t.Fatalf("expected policy chunk size bytes, got %d", chunkBytes)
	}
}
// TestCollectivePeerPlanHelpersPopulateRingEndpoints builds a three-node ring,
// derives peer plans with buildCollectivePeerPlans, applies them with
// applyCollectivePeers, and then expects every intent to list three peers and
// to have non-empty self, previous, and next ring endpoints.
func TestCollectivePeerPlanHelpersPopulateRingEndpoints(t *testing.T) {
	t.Parallel()

	nodeByName := map[string]corev1.Node{
		"node-a": nodeFixture("node-a", "10.0.0.1"),
		"node-b": nodeFixture("node-b", "10.0.0.2"),
		"node-c": nodeFixture("node-c", "10.0.0.3"),
	}
	// Ring order a -> b -> c -> a, with ranks 0..2.
	intents := []transferplanner.WarmupNodeIntent{
		ringIntent("node-a", "node-c", "node-b", 0),
		ringIntent("node-b", "node-a", "node-c", 1),
		ringIntent("node-c", "node-b", "node-a", 2),
	}
	cachePlan := transferplanner.CacheBuildPlan{NodeIntents: intents}

	peerPlans, err := buildCollectivePeerPlans(cachePlan.NodeIntents, nodeByName)
	if err != nil {
		t.Fatalf("buildCollectivePeerPlans returned error: %v", err)
	}
	if len(peerPlans) != 3 {
		t.Fatalf("expected 3 peer plans, got %#v", peerPlans)
	}

	err = applyCollectivePeers(&cachePlan, peerPlans)
	if err != nil {
		t.Fatalf("applyCollectivePeers returned error: %v", err)
	}

	for _, intent := range cachePlan.NodeIntents {
		if peers := intent.Collective.Peers; len(peers) != 3 {
			t.Fatalf("expected every intent to see all peers, got %#v", intent.Collective.Peers)
		}
		ring := intent.Collective.Ring
		populated := ring.SelfEndpoint != "" && ring.PrevEndpoint != "" && ring.NextEndpoint != ""
		if !populated {
			t.Fatalf("ring endpoints not populated: %#v", intent.Collective.Ring)
		}
	}
}
// TestPendingAndExistingStateHandlers walks handlePendingWarmupState and
// reconcileExistingNodeState through five scenarios in order, swapping the
// fake agent between steps:
//
//  1. submit succeeds: state becomes Running with a task ID and start time;
//  2. the pending execution carries an open error: state stays Pending and
//     the message equals the error text;
//  3. submit fails: state stays Pending and one collective completion is
//     recorded on the agent;
//  4. status lookup reports success: the existing state is refreshed;
//  5. status lookup reports HTTP not found: task ID and cache path are
//     cleared and the state returns to Pending.
func TestPendingAndExistingStateHandlers(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	logger := slog.Default()
	nodeObj := nodeFixture("node-a", "10.0.0.1")
	intent := ringIntent("node-a", "node-b", "node-b", 0)
	execution := sharedtypes.WarmupExecutionPlan{
		TaskID:      "task-a",
		ArtifactKey: "artifact-a",
		CollectiveSpec: sharedtypes.CollectiveSpec{
			Mode:      sharedtypes.CollectiveModeRing,
			SessionID: "session-a",
		},
	}
	// blankState yields the untouched per-node state each pending call starts from.
	blankState := func() warmupv1alpha1.WarmupNodeState {
		return warmupv1alpha1.WarmupNodeState{NodeName: "node-a"}
	}

	// 1. Submit succeeds.
	reconciler := &Reconciler{Agent: &fakeStateAgent{submitHandle: sharedtypes.TaskHandle{TaskID: "task-a"}}}
	state := reconciler.handlePendingWarmupState(ctx, logger, nodeObj, intent, pendingExecution{execution: execution}, blankState())
	if state.TaskID != "task-a" || state.Phase != warmupv1alpha1.JobPhaseRunning || state.StartedAt == nil {
		t.Fatalf("pending state not submitted: %#v", state)
	}

	// 2. The collective open step already failed for this execution.
	openErr := errors.New("open failed")
	withOpenErr := pendingExecution{execution: execution, openErr: openErr}
	state = reconciler.handlePendingWarmupState(ctx, logger, nodeObj, intent, withOpenErr, blankState())
	if state.Phase != warmupv1alpha1.JobPhasePending || state.Message != openErr.Error() {
		t.Fatalf("open error not reflected: %#v", state)
	}

	// 3. Submit fails.
	agent := &fakeStateAgent{submitErr: errors.New("submit failed")}
	reconciler.Agent = agent
	state = reconciler.handlePendingWarmupState(ctx, logger, nodeObj, intent, pendingExecution{execution: execution}, blankState())
	if state.Phase != warmupv1alpha1.JobPhasePending || len(agent.completed) != 1 {
		t.Fatalf("submit failure should keep pending and cleanup collective, state=%#v completed=%#v", state, agent.completed)
	}

	// 4. The agent reports the running task as succeeded.
	succeeded := sharedtypes.TaskStatus{TaskID: "task-a", Phase: sharedtypes.WarmupPhaseSucceeded, CachePath: "/cache/a"}
	reconciler.Agent = &fakeStateAgent{status: succeeded}
	running := warmupv1alpha1.WarmupNodeState{NodeName: "node-a", TaskID: "task-a", Phase: warmupv1alpha1.JobPhaseRunning}
	state = reconciler.reconcileExistingNodeState(ctx, logger, nodeObj, intent, running)
	if state.Phase != warmupv1alpha1.JobPhaseSucceeded || state.CachePath != "/cache/a" {
		t.Fatalf("existing state not refreshed: %#v", state)
	}

	// 5. The agent no longer knows the task.
	notFound := &AgentHTTPError{Operation: "get", StatusCode: httpStatusNotFoundForTest}
	reconciler.Agent = &fakeStateAgent{statusErr: notFound}
	runningWithCache := warmupv1alpha1.WarmupNodeState{NodeName: "node-a", TaskID: "task-a", Phase: warmupv1alpha1.JobPhaseRunning, CachePath: "/cache/a"}
	state = reconciler.reconcileExistingNodeState(ctx, logger, nodeObj, intent, runningWithCache)
	if state.TaskID != "" || state.Phase != warmupv1alpha1.JobPhasePending || state.CachePath != "" {
		t.Fatalf("not-found state should be cleared for resubmit: %#v", state)
	}
}
// TestCollectiveSessionOpenAndCompleteHelpers drives openPendingCollectives
// and completeCollectiveSessions against a recording fake agent.
//
// Open phase: of three intents (ring node-a, non-collective node-b, ring
// node-c with no pending execution) exactly one completion tagged
// "pre-open cleanup" and exactly one open request (task-a, CRC enabled,
// timeout 7) are expected.
//
// Complete phase: for a succeeded node-a state a second completion is
// expected, carrying the task ID, the session ID, Success set, and a
// non-empty staging path.
func TestCollectiveSessionOpenAndCompleteHelpers(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	logger := slog.Default()
	nodeA := nodeFixture("node-a", "10.0.0.1")

	ringSpec := sharedtypes.CollectiveSpec{
		Mode:      sharedtypes.CollectiveModeRing,
		SessionID: "session-a",
		Ring:      &sharedtypes.RingPeerPlan{Rank: 0, WorldSize: 2},
	}
	ringExecution := sharedtypes.WarmupExecutionPlan{
		TaskID:            "task-a",
		ArtifactKey:       "artifact-a",
		CollectiveSpec:    ringSpec,
		EnableChunkCRC32C: true,
	}
	plainExecution := sharedtypes.WarmupExecutionPlan{
		TaskID:         "task-b",
		CollectiveSpec: sharedtypes.CollectiveSpec{Mode: sharedtypes.CollectiveModeNone},
	}

	agent := &fakeStateAgent{
		openOK:       true,
		openResponse: sharedtypes.OpenCollectiveResponse{TransportPath: sharedtypes.TransportPathTCPFallback},
	}
	reconciler := &Reconciler{Agent: agent}

	pending := map[string]pendingExecution{
		"node-a": {node: nodeA, execution: ringExecution},
		"node-b": {node: nodeFixture("node-b", "10.0.0.2"), execution: plainExecution},
	}
	intents := []transferplanner.WarmupNodeIntent{
		{TargetNode: "node-a", Collective: ringSpec},
		{TargetNode: "node-b", Collective: sharedtypes.CollectiveSpec{Mode: sharedtypes.CollectiveModeNone}},
		{TargetNode: "node-c", Collective: ringSpec},
	}

	reconciler.openPendingCollectives(ctx, logger, intents, pending, 7)

	if len(agent.completed) != 1 || !strings.Contains(agent.completed[0].Message, "pre-open cleanup") {
		t.Fatalf("expected one pre-open cleanup, got %#v", agent.completed)
	}
	openedOnce := len(agent.opened) == 1
	if !openedOnce || agent.opened[0].TaskID != "task-a" || !agent.opened[0].EnableChunkCRC || agent.opened[0].TimeoutSeconds != 7 {
		t.Fatalf("expected one collective open request, got %#v", agent.opened)
	}

	cachePlan := transferplanner.CacheBuildPlan{
		NodeIntents: []transferplanner.WarmupNodeIntent{
			{TargetNode: "node-a", TargetPath: "/cache/artifact-a", Collective: ringSpec},
		},
	}
	states := []warmupv1alpha1.WarmupNodeState{
		{NodeName: "node-a", TaskID: "task-a", Phase: warmupv1alpha1.JobPhaseSucceeded, Message: "ok"},
	}

	reconciler.completeCollectiveSessions(ctx, logger, []corev1.Node{nodeA}, cachePlan, states)

	// The recorder already holds the pre-open cleanup, so the final
	// completion is the second entry.
	if len(agent.completed) != 2 {
		t.Fatalf("expected completion cleanup after pre-open cleanup, got %#v", agent.completed)
	}
	finalReq := agent.completed[1]
	wellFormed := finalReq.TaskID == "task-a" &&
		finalReq.SessionID == "session-a" &&
		finalReq.Success &&
		finalReq.StagingPath != ""
	if !wellFormed {
		t.Fatalf("unexpected final collective completion request: %#v", finalReq)
	}
}
// TestCollectTargetNamesFindNodeAndResetStalePlanState covers three helpers:
// collectTargetNames (expected to keep the input order of the nodes),
// findNodeByName (hit and miss), and resetStalePlanState (expected to drop
// node states when the plan ID changes from "old-plan" to "new-plan").
func TestCollectTargetNamesFindNodeAndResetStalePlanState(t *testing.T) {
	t.Parallel()

	// node-b is listed before node-a so that input order is distinguishable
	// from sorted order.
	nodes := []corev1.Node{
		nodeFixture("node-b", "10.0.0.2"),
		nodeFixture("node-a", "10.0.0.1"),
	}

	names := collectTargetNames(nodes)
	if joined := strings.Join(names, ","); joined != "node-b,node-a" {
		t.Fatalf("unexpected target names: %#v", names)
	}

	match, found := findNodeByName(nodes, "node-a")
	if !found || match.Name != "node-a" {
		t.Fatalf("expected to find node-a")
	}
	_, found = findNodeByName(nodes, "missing")
	if found {
		t.Fatalf("missing node should not be found")
	}

	staleStatus := warmupv1alpha1.ModelWarmupJobStatus{
		LastPlanID: "old-plan",
		NodeStates: []warmupv1alpha1.WarmupNodeState{
			{NodeName: "node-a", TaskID: "old-plan-node-a"},
		},
	}
	job := &warmupv1alpha1.ModelWarmupJob{Status: staleStatus}

	resetStalePlanState(slog.Default(), job, "new-plan")
	if remaining := len(job.Status.NodeStates); remaining != 0 {
		t.Fatalf("expected stale node states to be cleared: %#v", job.Status.NodeStates)
	}
}
func TestReconcileSubmitsPendingWarmupAndUpdatesStatus(t *testing.T) {
t.Parallel()
manifest := logicalManifestFixture()
cachePlan := transferplanner.CacheBuildPlan{
PlanID: "plan-a",
NodeIntents: []transferplanner.WarmupNodeIntent{{
TargetNode: "node-a",
TargetPath: "/cache/artifact-a",
TargetPlan: sharedtypes.TargetTransferPlan{
TargetNode: "node-a",
TransferMode: sharedtypes.TransferModeSingleSourceDirect,
},
Collective: sharedtypes.CollectiveSpec{Mode: sharedtypes.CollectiveModeNone},
}},
}
execution := sharedtypes.WarmupExecutionPlan{
PlanID: "plan-a",
TaskID: "plan-a-node-a",
ArtifactKey: "artifact-a",
TargetNode: "node-a",
TargetFinalPath: "/cache/artifact-a",
LogicalManifest: manifest,
CollectiveSpec: sharedtypes.CollectiveSpec{Mode: sharedtypes.CollectiveModeNone},
}
job := modelWarmupJobFixture()
agent := &fakeStateAgent{
manifest: manifest,
submitHandle: sharedtypes.TaskHandle{TaskID: "plan-a-node-a", NodeName: "node-a"},
}
reconciler := newObjectReconciler(t, job, []corev1.Node{nodeFixture("node-a", "10.0.0.1")})
reconciler.Dispatcher = fakeDispatcher{cachePlan: cachePlan, execution: execution}
reconciler.Agent = agent
result, err := reconciler.Reconcile(context.Background(), ctrl.Request{NamespacedName: ktypes.NamespacedName{Namespace: "default", Name: "job-a"}})
if err != nil {
t.Fatalf("Reconcile returned error: %v", err)
}
if result.RequeueAfter == 0 {
t.Fatalf("expected running job to requeue")
}
var updated warmupv1alpha1.ModelWarmupJob
if err := reconciler.Get(context.Background(), ktypes.NamespacedName{Namespace: "default", Name: "job-a"}, &updated); err != nil {
t.Fatalf("get updated job: %v", err)
}
if updated.Status.Phase != warmupv1alpha1.JobPhaseRunning || len(updated.Status.NodeStates) != 1 {
t.Fatalf("expected running node state, got %#v", updated.Status)
}
if updated.Status.NodeStates[0].TaskID != "plan-a-node-a" {
t.Fatalf("expected submitted task id, got %#v", updated.Status.NodeStates[0])
}
}
func TestReconcileMarksMissingNamedTargetFailed(t *testing.T) {
t.Parallel()
job := modelWarmupJobFixture()
job.Spec.Target.NodeNames = []string{"missing-node"}
reconciler := newObjectReconciler(t, job, []corev1.Node{nodeFixture("node-a", "10.0.0.1")})
result, err := reconciler.Reconcile(context.Background(), ctrl.Request{NamespacedName: ktypes.NamespacedName{Namespace: "default", Name: "job-a"}})
if err != nil {
t.Fatalf("Reconcile returned error: %v", err)
}
if result.RequeueAfter != 0 {
t.Fatalf("expected terminal failure not to requeue, got %s", result.RequeueAfter)
}
var updated warmupv1alpha1.ModelWarmupJob
if err := reconciler.Get(context.Background(), ktypes.NamespacedName{Namespace: "default", Name: "job-a"}, &updated); err != nil {
t.Fatalf("get updated job: %v", err)
}
if updated.Status.Phase != warmupv1alpha1.JobPhaseFailed {
t.Fatalf("expected missing target to fail job, got %#v", updated.Status)
}
if updated.Status.LastErrorCode != "NoTargetNodes" {
t.Fatalf("expected NoTargetNodes error code, got %q", updated.Status.LastErrorCode)
}
if !strings.Contains(updated.Status.LastErrorMessage, "missing-node") {
t.Fatalf("expected missing node in error message, got %q", updated.Status.LastErrorMessage)
}
}
func TestReconcileRefreshesExistingTaskToSucceeded(t *testing.T) {
t.Parallel()
manifest := logicalManifestFixture()
cachePlan := transferplanner.CacheBuildPlan{
PlanID: "plan-a",
NodeIntents: []transferplanner.WarmupNodeIntent{{
TargetNode: "node-a",
TargetPath: "/cache/artifact-a",
TargetPlan: sharedtypes.TargetTransferPlan{
TargetNode: "node-a",
TransferMode: sharedtypes.TransferModeSingleSourceDirect,
},
Collective: sharedtypes.CollectiveSpec{Mode: sharedtypes.CollectiveModeNone},
}},
}
job := modelWarmupJobFixture()
job.Status.LastPlanID = "plan-a"
job.Status.NodeStates = []warmupv1alpha1.WarmupNodeState{{
NodeName: "node-a",
TaskID: "plan-a-node-a",
Phase: warmupv1alpha1.JobPhaseRunning,
}}
reconciler := newObjectReconciler(t, job, []corev1.Node{nodeFixture("node-a", "10.0.0.1")})
reconciler.Dispatcher = fakeDispatcher{cachePlan: cachePlan}
reconciler.Agent = &fakeStateAgent{
manifest: manifest,
status: sharedtypes.TaskStatus{
TaskID: "plan-a-node-a",
Phase: sharedtypes.WarmupPhaseSucceeded,
ProgressBytes: 2048,
CachePath: "/cache/artifact-a",
},
}
result, err := reconciler.Reconcile(context.Background(), ctrl.Request{NamespacedName: ktypes.NamespacedName{Namespace: "default", Name: "job-a"}})
if err != nil {
t.Fatalf("Reconcile returned error: %v", err)
}
if result.RequeueAfter != 0 {
t.Fatalf("expected terminal job not to requeue, got %s", result.RequeueAfter)
}
var updated warmupv1alpha1.ModelWarmupJob
if err := reconciler.Get(context.Background(), ktypes.NamespacedName{Namespace: "default", Name: "job-a"}, &updated); err != nil {
t.Fatalf("get updated job: %v", err)
}
if updated.Status.Phase != warmupv1alpha1.JobPhaseSucceeded || updated.Status.Summary.Succeeded != 1 {
t.Fatalf("expected succeeded job status, got %#v", updated.Status)
}
}
func TestReconcileValidationAndPlanFailurePaths(t *testing.T) {
t.Parallel()
invalidJob := modelWarmupJobFixture()
invalidJob.Spec.Artifact.Key = ""
reconciler := newObjectReconciler(t, invalidJob, []corev1.Node{nodeFixture("node-a", "10.0.0.1")})
if _, err := reconciler.Reconcile(context.Background(), ctrl.Request{NamespacedName: ktypes.NamespacedName{Namespace: "default", Name: "job-a"}}); err == nil {
t.Fatalf("expected validation failure to be returned")
}
var updated warmupv1alpha1.ModelWarmupJob
if err := reconciler.Get(context.Background(), ktypes.NamespacedName{Namespace: "default", Name: "job-a"}, &updated); err != nil {
t.Fatalf("get validation failure job: %v", err)
}
if updated.Status.Phase != warmupv1alpha1.JobPhaseFailed || updated.Status.LastErrorCode != "ValidationFailed" {
t.Fatalf("unexpected validation status: %#v", updated.Status)
}
planJob := modelWarmupJobFixture()
planJob.Name = "job-plan"
manifest := logicalManifestFixture()
reconciler = newObjectReconciler(t, planJob, []corev1.Node{nodeFixture("node-a", "10.0.0.1")})
reconciler.Agent = &fakeStateAgent{manifest: manifest}
reconciler.Dispatcher = fakeDispatcher{cachePlanErr: errors.New("planner unavailable")}
if _, err := reconciler.Reconcile(context.Background(), ctrl.Request{NamespacedName: ktypes.NamespacedName{Namespace: "default", Name: "job-plan"}}); err == nil {
t.Fatalf("expected plan build failure to be returned")
}
if err := reconciler.Get(context.Background(), ktypes.NamespacedName{Namespace: "default", Name: "job-plan"}, &updated); err != nil {
t.Fatalf("get plan failure job: %v", err)
}
if updated.Status.Phase != warmupv1alpha1.JobPhaseFailed || updated.Status.LastErrorCode != "PlanBuildFailed" {
t.Fatalf("unexpected plan failure status: %#v", updated.Status)
}
}
func TestReconcileSkipsMissingAndObservedTerminalJobs(t *testing.T) {
t.Parallel()
reconciler := newObjectReconciler(t, modelWarmupJobFixture(), []corev1.Node{nodeFixture("node-a", "10.0.0.1")})
result, err := reconciler.Reconcile(context.Background(), ctrl.Request{NamespacedName: ktypes.NamespacedName{Namespace: "default", Name: "missing"}})
if err != nil || result.RequeueAfter != 0 {
t.Fatalf("expected missing job to be ignored, result=%#v err=%v", result, err)
}
doneJob := modelWarmupJobFixture()
doneJob.Name = "job-done"
doneJob.Status.Phase = warmupv1alpha1.JobPhaseSucceeded
doneJob.Status.ObservedGeneration = doneJob.Generation
reconciler = newObjectReconciler(t, doneJob, []corev1.Node{nodeFixture("node-a", "10.0.0.1")})
result, err = reconciler.Reconcile(context.Background(), ctrl.Request{NamespacedName: ktypes.NamespacedName{Namespace: "default", Name: "job-done"}})
if err != nil || result.RequeueAfter != 0 {
t.Fatalf("expected observed terminal job to be ignored, result=%#v err=%v", result, err)
}
}
func TestReconcileMarksEmptySelectorResultFailed(t *testing.T) {
t.Parallel()
job := modelWarmupJobFixture()
job.Spec.Target.NodeNames = nil
job.Spec.Target.NodeSelector = map[string]string{"role": "missing"}
reconciler := newObjectReconciler(t, job, []corev1.Node{nodeFixture("node-a", "10.0.0.1")})
result, err := reconciler.Reconcile(context.Background(), ctrl.Request{NamespacedName: ktypes.NamespacedName{Namespace: "default", Name: "job-a"}})
if err != nil || result.RequeueAfter != 0 {
t.Fatalf("expected empty selector result to be terminal, result=%#v err=%v", result, err)
}
var updated warmupv1alpha1.ModelWarmupJob
if err := reconciler.Get(context.Background(), ktypes.NamespacedName{Namespace: "default", Name: "job-a"}, &updated); err != nil {
t.Fatalf("get updated job: %v", err)
}
if updated.Status.Phase != warmupv1alpha1.JobPhaseFailed || updated.Status.LastErrorCode != "NoTargetNodes" {
t.Fatalf("unexpected empty selector status: %#v", updated.Status)
}
}
// validJobSpec returns a fresh baseline job spec used by the tests: artifact
// "artifact-a", one node source on node-a at /models/qwen, node-a as the only
// named target with target path /cache/artifact-a, and a 64 MB chunk size
// policy. Each call builds new values, so callers may mutate the result.
func validJobSpec() warmupv1alpha1.ModelWarmupJobSpec {
	nodeSource := warmupv1alpha1.SourceSpec{
		SourceType: "node",
		NodeName:   "node-a",
		Path:       "/models/qwen",
	}
	target := warmupv1alpha1.WarmupTargetSpec{
		NodeNames:  []string{"node-a"},
		TargetPath: "/cache/artifact-a",
	}

	var spec warmupv1alpha1.ModelWarmupJobSpec
	spec.Artifact = warmupv1alpha1.ArtifactRefSpec{Key: "artifact-a"}
	spec.Sources = []warmupv1alpha1.SourceSpec{nodeSource}
	spec.Target = target
	spec.Policy = warmupv1alpha1.PolicySpec{ChunkSizeMB: 64}
	return spec
}
// modelWarmupJobFixture returns a new ModelWarmupJob named "job-a" in the
// "default" namespace at generation 1, carrying the spec from validJobSpec.
func modelWarmupJobFixture() *warmupv1alpha1.ModelWarmupJob {
	meta := metav1.ObjectMeta{
		Namespace:  "default",
		Name:       "job-a",
		Generation: 1,
	}
	job := new(warmupv1alpha1.ModelWarmupJob)
	job.ObjectMeta = meta
	job.Spec = validJobSpec()
	return job
}
// logicalManifestFixture returns a manifest for "artifact-a" rooted at
// /models/qwen with a 64 MiB chunk size and a single required 128-byte
// config.json entry.
func logicalManifestFixture() sharedtypes.LogicalManifest {
	configFile := sharedtypes.ArtifactFile{
		RelativePath: "config.json",
		SizeBytes:    128,
		Kind:         sharedtypes.ArtifactFileKindJSON,
		Required:     true,
	}

	var manifest sharedtypes.LogicalManifest
	manifest.ArtifactKey = "artifact-a"
	manifest.RootPath = "/models/qwen"
	manifest.ChunkSizeBytes = 64 * 1024 * 1024
	manifest.Files = []sharedtypes.ArtifactFile{configFile}
	return manifest
}
// newObjectReconciler builds a Reconciler backed by a controller-runtime fake
// client that is seeded with the given job and nodes.
//
// The scheme registers both the core and the warmup API types, and the fake
// client is built with a status subresource for ModelWarmupJob. The returned
// Reconciler has Client, Scheme, Resolver, RequeueAfter (one millisecond) and
// Logger set; tests assign Dispatcher and Agent themselves when needed.
// Scheme registration failures abort the calling test.
func newObjectReconciler(t *testing.T, job *warmupv1alpha1.ModelWarmupJob, nodes []corev1.Node) *Reconciler {
	t.Helper()

	scheme := runtime.NewScheme()
	err := corev1.AddToScheme(scheme)
	if err != nil {
		t.Fatalf("add core scheme: %v", err)
	}
	err = warmupv1alpha1.AddToScheme(scheme)
	if err != nil {
		t.Fatalf("add warmup scheme: %v", err)
	}

	// Seed order: the job first, then a pointer to each node element.
	seed := make([]runtime.Object, 0, len(nodes)+1)
	seed = append(seed, job)
	for idx := range nodes {
		seed = append(seed, &nodes[idx])
	}

	builder := fake.NewClientBuilder()
	builder = builder.WithScheme(scheme)
	builder = builder.WithRuntimeObjects(seed...)
	builder = builder.WithStatusSubresource(&warmupv1alpha1.ModelWarmupJob{})
	fakeClient := builder.Build()

	reconciler := &Reconciler{
		Client:       fakeClient,
		Scheme:       scheme,
		Resolver:     node.NewResolver(fakeClient),
		RequeueAfter: time.Millisecond,
		Logger:       slog.Default(),
	}
	return reconciler
}
// newResolverReconciler builds a Reconciler that has only its Resolver set.
// The resolver reads from a fake client seeded with the given nodes, and only
// the core API types are registered in the scheme. A scheme registration
// failure aborts the calling test.
func newResolverReconciler(t *testing.T, nodes []corev1.Node) *Reconciler {
	t.Helper()

	scheme := runtime.NewScheme()
	err := corev1.AddToScheme(scheme)
	if err != nil {
		t.Fatalf("add core scheme: %v", err)
	}

	seed := make([]runtime.Object, 0, len(nodes))
	for idx := range nodes {
		seed = append(seed, &nodes[idx])
	}

	builder := fake.NewClientBuilder().WithScheme(scheme)
	fakeClient := builder.WithRuntimeObjects(seed...).Build()

	reconciler := &Reconciler{Resolver: node.NewResolver(fakeClient)}
	return reconciler
}
// nodeFixture returns a Node with the given name and a single InternalIP
// address set to ip.
func nodeFixture(name, ip string) corev1.Node {
	internal := corev1.NodeAddress{
		Type:    corev1.NodeInternalIP,
		Address: ip,
	}

	var fixture corev1.Node
	fixture.ObjectMeta = metav1.ObjectMeta{Name: name}
	fixture.Status = corev1.NodeStatus{Addresses: []corev1.NodeAddress{internal}}
	return fixture
}
// ringIntent returns a warmup intent for one member of a three-node ring.
//
// name is the target node (its target path is "/cache/"+name); prev and next
// are its ring neighbors and rank is its position in the ring. The transfer
// plan pulls the byte range [rank*10, (rank+1)*10) of "weights.bin" from
// "source-a" in partial-pull all-gather mode. WorldSize is fixed at 3.
func ringIntent(name, prev, next string, rank int32) transferplanner.WarmupNodeIntent {
	slice := sharedtypes.ByteRange{
		RelativePath: "weights.bin",
		Start:        int64(rank) * 10,
		End:          int64(rank+1) * 10,
	}
	segment := sharedtypes.SourceSegmentPlan{
		SourceID:   "source-a",
		ByteRanges: []sharedtypes.ByteRange{slice},
	}
	ring := &sharedtypes.RingPeerPlan{
		SelfNode:  name,
		PrevNode:  prev,
		NextNode:  next,
		Rank:      rank,
		WorldSize: 3,
	}

	var intent transferplanner.WarmupNodeIntent
	intent.TargetNode = name
	intent.TargetPath = "/cache/" + name
	intent.TargetPlan = sharedtypes.TargetTransferPlan{
		TransferMode:   sharedtypes.TransferModePartialPullAllGather,
		SourceSegments: []sharedtypes.SourceSegmentPlan{segment},
	}
	intent.Collective = sharedtypes.CollectiveSpec{
		Mode: sharedtypes.CollectiveModeRing,
		Ring: ring,
	}
	return intent
}
// httpStatusNotFoundForTest is the HTTP 404 status code the tests place in an
// AgentHTTPError to simulate an agent that no longer knows a task.
const httpStatusNotFoundForTest = 404
// fakeStateAgent is a configurable test double for the reconciler's agent
// dependency. The result fields are returned verbatim by the corresponding
// methods, and the recorder slices capture the requests received.
type fakeStateAgent struct {
	// submitHandle and submitErr are returned by SubmitWarmup.
	submitHandle sharedtypes.TaskHandle
	submitErr    error

	// manifest is served by BuildManifest; a manifest without files makes
	// BuildManifest return an error instead.
	manifest sharedtypes.LogicalManifest

	// status and statusErr are returned by GetWarmupTaskStatus.
	status    sharedtypes.TaskStatus
	statusErr error

	// completed records every request passed to CompleteCollective.
	completed []sharedtypes.CompleteCollectiveRequest

	// openOK enables OpenCollective, which then returns openResponse.
	// openErr, when set, takes precedence and is returned instead.
	openOK       bool
	openResponse sharedtypes.OpenCollectiveResponse
	openErr      error

	// opened records every request accepted by OpenCollective.
	opened []sharedtypes.OpenCollectiveRequest
}
// SubmitWarmup ignores its arguments and returns the configured submitHandle
// together with submitErr.
func (f *fakeStateAgent) SubmitWarmup(_ context.Context, _ corev1.Node, _ sharedtypes.SubmitWarmupRequest) (sharedtypes.TaskHandle, error) {
	handle, err := f.submitHandle, f.submitErr
	return handle, err
}
// GetWarmupTaskStatus ignores its arguments and returns the configured status
// together with statusErr.
func (f *fakeStateAgent) GetWarmupTaskStatus(_ context.Context, _ corev1.Node, _ sharedtypes.GetWarmupTaskStatusRequest) (sharedtypes.TaskStatus, error) {
	status, err := f.status, f.statusErr
	return status, err
}
// BuildManifest returns the configured manifest wrapped in a response. When
// the configured manifest has no files, it returns an empty response and a
// "not implemented" error instead.
func (f *fakeStateAgent) BuildManifest(_ context.Context, _ corev1.Node, _ sharedtypes.BuildManifestRequest) (sharedtypes.BuildManifestResponse, error) {
	if len(f.manifest.Files) > 0 {
		return sharedtypes.BuildManifestResponse{Manifest: f.manifest}, nil
	}
	return sharedtypes.BuildManifestResponse{}, errors.New("not implemented")
}
// OpenCollective simulates opening a collective session. It returns openErr
// when one is configured; otherwise, if openOK is set, it records req in
// opened and returns openResponse. With neither configured it returns a
// "not implemented" error. Requests are recorded only on the success path.
func (f *fakeStateAgent) OpenCollective(_ context.Context, _ corev1.Node, req sharedtypes.OpenCollectiveRequest) (sharedtypes.OpenCollectiveResponse, error) {
	switch {
	case f.openErr != nil:
		return sharedtypes.OpenCollectiveResponse{}, f.openErr
	case !f.openOK:
		return sharedtypes.OpenCollectiveResponse{}, errors.New("not implemented")
	}
	f.opened = append(f.opened, req)
	return f.openResponse, nil
}
// StepCollective is not supported by the fake: it always returns an empty
// response and a "not implemented" error.
func (*fakeStateAgent) StepCollective(_ context.Context, _ corev1.Node, _ sharedtypes.CollectiveStepRequest) (sharedtypes.CollectiveStepResponse, error) {
	var none sharedtypes.CollectiveStepResponse
	return none, errors.New("not implemented")
}
// CompleteCollective records req in completed, so tests can inspect the
// completion requests in call order, and always returns nil.
func (f *fakeStateAgent) CompleteCollective(_ context.Context, _ corev1.Node, req sharedtypes.CompleteCollectiveRequest) error {
	recorded := append(f.completed, req)
	f.completed = recorded
	return nil
}
// fakeDispatcher is a test double for the reconciler's dispatcher dependency.
// Each method returns its configured result, or its configured error when
// that error is non-nil.
type fakeDispatcher struct {
	// cachePlan and cachePlanErr drive BuildCachePlan.
	cachePlan    transferplanner.CacheBuildPlan
	cachePlanErr error

	// execution and executionErr drive BuildWarmupExecution.
	execution    sharedtypes.WarmupExecutionPlan
	executionErr error
}
// BuildCachePlan ignores its arguments. It returns an empty plan and
// cachePlanErr when that error is configured, and the configured cachePlan
// with a nil error otherwise.
func (f fakeDispatcher) BuildCachePlan(_ context.Context, _ transferplanner.CacheBuildRequest) (transferplanner.CacheBuildPlan, error) {
	if err := f.cachePlanErr; err != nil {
		return transferplanner.CacheBuildPlan{}, err
	}
	return f.cachePlan, nil
}
// BuildWarmupExecution ignores its arguments. It returns an empty execution
// plan and executionErr when that error is configured, and the configured
// execution with a nil error otherwise.
func (f fakeDispatcher) BuildWarmupExecution(_ context.Context, _ transferplanner.BuildWarmupExecutionRequest) (sharedtypes.WarmupExecutionPlan, error) {
	if err := f.executionErr; err != nil {
		return sharedtypes.WarmupExecutionPlan{}, err
	}
	return f.execution, nil
}