/*
 * Copyright (c) 2026 Huawei Technologies Co., Ltd.
 * openFuyao is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package warmupjob

import (
	"context"
	"errors"
	"log/slog"
	"strings"
	"testing"
	"time"

	warmupv1alpha1 "github.com/openfuyao/weight-dispatcher/api/v1alpha1"
	"github.com/openfuyao/weight-dispatcher/pkg/node"
	"github.com/openfuyao/weight-dispatcher/pkg/planning/transferplanner"
	sharedtypes "github.com/openfuyao/weight-dispatcher/pkg/types"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	ktypes "k8s.io/apimachinery/pkg/types"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func TestValidateJobAcceptsNodeAndHuggingFaceSources(t *testing.T) {
	t.Parallel()

	nodeSpec := validJobSpec()
	if err := validateJob(nodeSpec); err != nil {
		t.Fatalf("expected node source spec to pass: %v", err)
	}

	hfSpec := validJobSpec()
	hfSpec.Sources = []warmupv1alpha1.SourceSpec{{
		SourceType: "external",
		Endpoint:   "https://huggingface.co",
		Path:       "Qwen/Qwen3-8B",
	}}
	if err := validateJob(hfSpec); err != nil {
		t.Fatalf("expected huggingface external source to pass: %v", err)
	}
}

// TestValidateJobRejectsInvalidInputs applies one mutation at a time to a
// known-good spec. For each mutation it asserts that validateJob returns an
// error whose text contains the expected fragment.
func TestValidateJobRejectsInvalidInputs(t *testing.T) {
	t.Parallel()

	type rejectCase struct {
		name    string
		mutate  func(*warmupv1alpha1.ModelWarmupJobSpec)
		wantErr string
	}
	cases := []rejectCase{
		{
			name:    "missing artifact",
			mutate:  func(s *warmupv1alpha1.ModelWarmupJobSpec) { s.Artifact.Key = "" },
			wantErr: "spec.artifact.key",
		},
		{
			name:    "missing target path",
			mutate:  func(s *warmupv1alpha1.ModelWarmupJobSpec) { s.Target.TargetPath = "" },
			wantErr: "spec.target.targetPath",
		},
		{
			name:    "missing target selector",
			mutate:  func(s *warmupv1alpha1.ModelWarmupJobSpec) { s.Target.NodeNames = nil },
			wantErr: "spec.target.nodeNames",
		},
		{
			name:    "node source endpoint explicit",
			mutate:  func(s *warmupv1alpha1.ModelWarmupJobSpec) { s.Sources[0].Endpoint = "10.0.0.1" },
			wantErr: "must not set endpoint",
		},
		{
			name: "duplicate source",
			mutate: func(s *warmupv1alpha1.ModelWarmupJobSpec) {
				s.Sources = append(s.Sources, s.Sources[0])
			},
			wantErr: "duplicated source",
		},
		{
			name: "unsupported external only",
			mutate: func(s *warmupv1alpha1.ModelWarmupJobSpec) {
				s.Sources = []warmupv1alpha1.SourceSpec{{
					SourceType: "external",
					Endpoint:   "https://example.invalid",
					Path:       "/model",
				}}
			},
			wantErr: "phase one requires",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()

			spec := validJobSpec()
			tc.mutate(&spec)
			err := validateJob(spec)
			if err != nil && strings.Contains(err.Error(), tc.wantErr) {
				return
			}
			t.Fatalf("expected error containing %q, got %v", tc.wantErr, err)
		})
	}
}

// TestNormalizeSourcesResolvesNodeEndpointAndRejectsDuplicates covers three
// normalizeSources behaviours. A node source gets its endpoint filled in from
// the node fixture's internal IP. An external source passes through with its
// endpoint unchanged. A source list that contains the same normalized source
// twice is rejected.
func TestNormalizeSourcesResolvesNodeEndpointAndRejectsDuplicates(t *testing.T) {
	t.Parallel()

	reconciler := newResolverReconciler(t, []corev1.Node{nodeFixture("node-a", "10.0.0.1")})
	got, err := reconciler.normalizeSources(context.Background(), []warmupv1alpha1.SourceSpec{
		{SourceType: "node", NodeName: "node-a", Path: "/models/qwen"},
		{SourceType: "external", Endpoint: "https://huggingface.co", Path: "Qwen/Qwen3-8B"},
	})
	if err != nil {
		t.Fatalf("normalizeSources returned error: %v", err)
	}
	// Check the length before indexing. A short result then shows up as a
	// readable test failure rather than an index-out-of-range panic.
	if len(got) != 2 {
		t.Fatalf("expected 2 normalized sources, got %d: %#v", len(got), got)
	}
	if got[0].Endpoint != "10.0.0.1" {
		t.Fatalf("expected resolved endpoint, got %#v", got[0])
	}
	if got[1].Endpoint != "https://huggingface.co" {
		t.Fatalf("expected external source unchanged, got %#v", got[1])
	}

	// Two identical external sources must be rejected after normalization.
	_, err = reconciler.normalizeSources(context.Background(), []warmupv1alpha1.SourceSpec{
		{SourceType: "external", Endpoint: "https://huggingface.co", Path: "Qwen/Qwen3-8B"},
		{SourceType: "external", Endpoint: "https://huggingface.co", Path: "Qwen/Qwen3-8B"},
	})
	if err == nil || !strings.Contains(err.Error(), "duplicated normalized source") {
		t.Fatalf("expected duplicate source error, got %v", err)
	}
}

// TestSelectManifestSourcePrefersNodeThenHuggingFace checks two things about
// selectManifestSource. It picks the node source even when the external source
// is listed first. When only an external source is available, it picks that
// source and returns a node with an empty name.
func TestSelectManifestSourcePrefersNodeThenHuggingFace(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	reconciler := newResolverReconciler(t, []corev1.Node{nodeFixture("node-a", "10.0.0.1")})
	huggingFace := warmupv1alpha1.SourceSpec{SourceType: "external", Endpoint: "https://huggingface.co", Path: "Qwen/Qwen3-8B"}
	localNode := warmupv1alpha1.SourceSpec{SourceType: "node", NodeName: "node-a", Path: "/models/qwen"}

	picked, pickedNode, err := reconciler.selectManifestSource(ctx, []warmupv1alpha1.SourceSpec{huggingFace, localNode})
	if err != nil {
		t.Fatalf("selectManifestSource returned error: %v", err)
	}
	if picked.SourceType != "node" || pickedNode.Name != "node-a" {
		t.Fatalf("expected node source first, got source=%#v node=%s", picked, pickedNode.Name)
	}

	picked, pickedNode, err = reconciler.selectManifestSource(ctx, []warmupv1alpha1.SourceSpec{huggingFace})
	if err != nil {
		t.Fatalf("selectManifestSource returned error for hf source: %v", err)
	}
	if picked.SourceType != "external" || pickedNode.Name != "" {
		t.Fatalf("expected external source and empty node, got source=%#v node=%s", picked, pickedNode.Name)
	}
}

// TestStateHelpersFilterSummarizeAndAggregatePhases exercises the node-state
// helpers: plan-based filtering, phase counting and aggregate job phase.
func TestStateHelpersFilterSummarizeAndAggregatePhases(t *testing.T) {
	t.Parallel()

	states := []warmupv1alpha1.WarmupNodeState{
		{NodeName: "node-a", TaskID: "plan-a-node-a", Phase: warmupv1alpha1.JobPhaseSucceeded},
		{NodeName: "node-b", TaskID: "old-node-b", Phase: warmupv1alpha1.JobPhaseRunning},
		{NodeName: "node-c", Phase: warmupv1alpha1.JobPhasePending},
		{NodeName: "node-d", TaskID: "plan-a-node-d", Phase: string(sharedtypes.WarmupPhaseCanceled)},
	}

	// node-b's task belongs to an older plan and must be dropped; node-a's stays.
	filtered := filterExistingNodeStates(states, "plan-a")
	if _, stale := filtered["node-b"]; stale {
		t.Fatalf("expected stale task to be filtered out: %#v", filtered)
	}
	if _, kept := filtered["node-a"]; !kept {
		t.Fatalf("expected plan-a task to remain: %#v", filtered)
	}

	// The canceled node-d state is expected to be counted as failed.
	summary := summarize(states)
	countsOK := summary.Total == 4 &&
		summary.Succeeded == 1 &&
		summary.Running == 1 &&
		summary.Pending == 1 &&
		summary.Failed == 1
	if !countsOK {
		t.Fatalf("unexpected summary: %#v", summary)
	}

	if got := aggregatePhase(summary); got != warmupv1alpha1.JobPhaseRunning {
		t.Fatalf("expected running aggregate phase, got %s", got)
	}
	if got := aggregatePhase(warmupv1alpha1.WarmupSummary{Total: 1, Failed: 1}); got != warmupv1alpha1.JobPhaseFailed {
		t.Fatalf("expected failed aggregate phase, got %s", got)
	}
	if got := aggregatePhase(warmupv1alpha1.WarmupSummary{Total: 1, Succeeded: 1}); got != warmupv1alpha1.JobPhaseSucceeded {
		t.Fatalf("expected succeeded aggregate phase, got %s", got)
	}
}

// TestApplyTaskStatusCopiesProgressAndTimestamps checks what applyTaskStatus
// copies from an agent TaskStatus into a node state. Task ID, phase and cache
// path must carry over, and the start and finish times must come back in UTC.
func TestApplyTaskStatusCopiesProgressAndTimestamps(t *testing.T) {
	t.Parallel()

	// A non-UTC zone lets the test detect whether timestamps are normalized to UTC.
	plus8 := time.FixedZone("utc+8", 8*60*60)
	startedAt := time.Date(2026, 5, 1, 1, 2, 3, 0, plus8)
	finishedAt := startedAt.Add(time.Minute)

	state := warmupv1alpha1.WarmupNodeState{NodeName: "node-a"}
	applyTaskStatus(&state, sharedtypes.TaskStatus{
		TaskID:         "task-a",
		Phase:          sharedtypes.WarmupPhaseSucceeded,
		Message:        "done",
		ProgressBytes:  1024,
		ThroughputMBps: 512,
		TransportPath:  sharedtypes.TransportPathRDMA,
		CachePath:      "/cache/qwen",
		StartedAt:      &startedAt,
		FinishedAt:     &finishedAt,
	})

	copied := state.TaskID == "task-a" &&
		state.Phase == warmupv1alpha1.JobPhaseSucceeded &&
		state.CachePath == "/cache/qwen"
	if !copied {
		t.Fatalf("task status not copied: %#v", state)
	}
	if ts := state.StartedAt; ts == nil || ts.Time.Location() != time.UTC {
		t.Fatalf("expected UTC started time, got %#v", ts)
	}
	if ts := state.FinishedAt; ts == nil || ts.Time.Location() != time.UTC {
		t.Fatalf("expected UTC finished time, got %#v", ts)
	}
}

// TestPathAndManifestHelpers covers the path utilities (containment, cleaning,
// separator detection) and the manifest layout/chunk-size helpers.
func TestPathAndManifestHelpers(t *testing.T) {
	t.Parallel()

	// Path helpers.
	if nested := isPathWithin("/models/qwen/snapshots/rev", "/models/qwen"); !nested {
		t.Fatalf("expected nested unix path to be within root")
	}
	if sameNested := isPathWithin("/models/qwen", "/models/qwen"); sameNested {
		t.Fatalf("same path must not be treated as nested")
	}
	if cleaned := cleanAgentPath("/models/qwen/../qwen3"); cleaned != "/models/qwen3" {
		t.Fatalf("unexpected cleanAgentPath result")
	}
	if !usesForwardSlashPath("/models/qwen") || usesForwardSlashPath(`C:\models\qwen`) {
		t.Fatalf("unexpected forward slash path detection")
	}

	// Manifest helpers.
	base := sharedtypes.LogicalManifest{Files: []sharedtypes.ArtifactFile{{
		RelativePath: "a.bin",
		SizeBytes:    1,
		Kind:         sharedtypes.ArtifactFileKindAuxiliary,
		Chunkable:    true,
		Required:     true,
	}}}
	if !manifestLayoutsMatch(base, base) {
		t.Fatalf("expected identical manifests to match")
	}

	// Copy the file slice so that resizing does not mutate base.
	resized := base
	resized.Files = append([]sharedtypes.ArtifactFile(nil), base.Files...)
	resized.Files[0].SizeBytes = 2
	if manifestLayoutsMatch(base, resized) {
		t.Fatalf("expected changed manifest to mismatch")
	}

	sig := manifestLayoutSignature(base)
	if !strings.Contains(sig, "a.bin|1|") {
		t.Fatalf("unexpected manifest signature %q", sig)
	}

	// An empty manifest is expected to fall back to the spec policy's ChunkSizeMB (64).
	const wantChunkBytes = 64 * 1024 * 1024
	if got := manifestChunkSizeBytes(sharedtypes.LogicalManifest{}, validJobSpec()); got != wantChunkBytes {
		t.Fatalf("expected policy chunk size bytes, got %d", got)
	}
}

// TestCollectivePeerPlanHelpersPopulateRingEndpoints builds peer plans for a
// three-node ring and applies them to the cache plan. Every intent must then
// list all three peers and have non-empty self/prev/next ring endpoints.
func TestCollectivePeerPlanHelpersPopulateRingEndpoints(t *testing.T) {
	t.Parallel()

	nodeByName := make(map[string]corev1.Node, 3)
	for _, n := range []corev1.Node{
		nodeFixture("node-a", "10.0.0.1"),
		nodeFixture("node-b", "10.0.0.2"),
		nodeFixture("node-c", "10.0.0.3"),
	} {
		nodeByName[n.Name] = n
	}

	// Ring order: a -> b -> c -> a.
	cachePlan := transferplanner.CacheBuildPlan{NodeIntents: []transferplanner.WarmupNodeIntent{
		ringIntent("node-a", "node-c", "node-b", 0),
		ringIntent("node-b", "node-a", "node-c", 1),
		ringIntent("node-c", "node-b", "node-a", 2),
	}}

	peerPlans, err := buildCollectivePeerPlans(cachePlan.NodeIntents, nodeByName)
	if err != nil {
		t.Fatalf("buildCollectivePeerPlans returned error: %v", err)
	}
	if len(peerPlans) != 3 {
		t.Fatalf("expected 3 peer plans, got %#v", peerPlans)
	}
	if err = applyCollectivePeers(&cachePlan, peerPlans); err != nil {
		t.Fatalf("applyCollectivePeers returned error: %v", err)
	}

	for i := range cachePlan.NodeIntents {
		collective := cachePlan.NodeIntents[i].Collective
		if len(collective.Peers) != 3 {
			t.Fatalf("expected every intent to see all peers, got %#v", collective.Peers)
		}
		ring := collective.Ring
		if ring.SelfEndpoint == "" || ring.PrevEndpoint == "" || ring.NextEndpoint == "" {
			t.Fatalf("ring endpoints not populated: %#v", ring)
		}
	}
}

// TestPendingAndExistingStateHandlers drives handlePendingWarmupState and
// reconcileExistingNodeState through their main outcomes using scripted
// agents. The cases are: successful submit, a recorded collective-open error,
// a submit error, a refresh of a running task, and an agent 404 for an
// existing task.
func TestPendingAndExistingStateHandlers(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	logger := slog.Default()
	nodeObj := nodeFixture("node-a", "10.0.0.1")
	intent := ringIntent("node-a", "node-b", "node-b", 0)
	execution := sharedtypes.WarmupExecutionPlan{
		TaskID:         "task-a",
		ArtifactKey:    "artifact-a",
		CollectiveSpec: sharedtypes.CollectiveSpec{Mode: sharedtypes.CollectiveModeRing, SessionID: "session-a"},
	}

	// Successful submit: the node becomes Running with the agent's task ID and a start time.
	reconciler := &Reconciler{Agent: &fakeStateAgent{submitHandle: sharedtypes.TaskHandle{TaskID: "task-a"}}}
	state := reconciler.handlePendingWarmupState(ctx, logger, nodeObj, intent,
		pendingExecution{execution: execution}, warmupv1alpha1.WarmupNodeState{NodeName: "node-a"})
	if state.TaskID != "task-a" || state.Phase != warmupv1alpha1.JobPhaseRunning || state.StartedAt == nil {
		t.Fatalf("pending state not submitted: %#v", state)
	}

	// A recorded open error keeps the node Pending and surfaces the error text.
	openErr := errors.New("open failed")
	state = reconciler.handlePendingWarmupState(ctx, logger, nodeObj, intent,
		pendingExecution{execution: execution, openErr: openErr}, warmupv1alpha1.WarmupNodeState{NodeName: "node-a"})
	if state.Phase != warmupv1alpha1.JobPhasePending || state.Message != openErr.Error() {
		t.Fatalf("open error not reflected: %#v", state)
	}

	// Submit rejected: the node stays Pending and exactly one CompleteCollective call is made.
	rejecting := &fakeStateAgent{submitErr: errors.New("submit failed")}
	reconciler.Agent = rejecting
	state = reconciler.handlePendingWarmupState(ctx, logger, nodeObj, intent,
		pendingExecution{execution: execution}, warmupv1alpha1.WarmupNodeState{NodeName: "node-a"})
	if state.Phase != warmupv1alpha1.JobPhasePending || len(rejecting.completed) != 1 {
		t.Fatalf("submit failure should keep pending and cleanup collective, state=%#v completed=%#v", state, rejecting.completed)
	}

	// Existing running task: its state is refreshed from the agent's reported status.
	reconciler.Agent = &fakeStateAgent{status: sharedtypes.TaskStatus{TaskID: "task-a", Phase: sharedtypes.WarmupPhaseSucceeded, CachePath: "/cache/a"}}
	state = reconciler.reconcileExistingNodeState(ctx, logger, nodeObj, intent,
		warmupv1alpha1.WarmupNodeState{NodeName: "node-a", TaskID: "task-a", Phase: warmupv1alpha1.JobPhaseRunning})
	if state.Phase != warmupv1alpha1.JobPhaseSucceeded || state.CachePath != "/cache/a" {
		t.Fatalf("existing state not refreshed: %#v", state)
	}

	// A 404 from the agent clears the task so the node can be resubmitted.
	reconciler.Agent = &fakeStateAgent{statusErr: &AgentHTTPError{Operation: "get", StatusCode: httpStatusNotFoundForTest}}
	state = reconciler.reconcileExistingNodeState(ctx, logger, nodeObj, intent,
		warmupv1alpha1.WarmupNodeState{NodeName: "node-a", TaskID: "task-a", Phase: warmupv1alpha1.JobPhaseRunning, CachePath: "/cache/a"})
	if state.TaskID != "" || state.Phase != warmupv1alpha1.JobPhasePending || state.CachePath != "" {
		t.Fatalf("not-found state should be cleared for resubmit: %#v", state)
	}
}

// TestCollectiveSessionOpenAndCompleteHelpers checks openPendingCollectives and
// completeCollectiveSessions against a recording fake agent.
//
// openPendingCollectives must issue exactly one "pre-open cleanup" completion
// and one open request carrying node-a's task, the CRC flag and the timeout.
// completeCollectiveSessions must then add a second, successful completion for
// node-a's session that includes a staging path.
func TestCollectiveSessionOpenAndCompleteHelpers(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	logger := slog.Default()
	nodeA := nodeFixture("node-a", "10.0.0.1")
	ring := sharedtypes.CollectiveSpec{
		Mode:      sharedtypes.CollectiveModeRing,
		SessionID: "session-a",
		Ring:      &sharedtypes.RingPeerPlan{Rank: 0, WorldSize: 2},
	}
	agent := &fakeStateAgent{
		openOK:       true,
		openResponse: sharedtypes.OpenCollectiveResponse{TransportPath: sharedtypes.TransportPathTCPFallback},
	}
	reconciler := &Reconciler{Agent: agent}

	// node-a is a pending ring member. node-b is pending but not collective.
	// node-c has a ring intent but no pending execution; presumably it is the
	// source of the pre-open cleanup asserted below (TODO confirm).
	pending := map[string]pendingExecution{
		"node-a": {
			node: nodeA,
			execution: sharedtypes.WarmupExecutionPlan{
				TaskID:            "task-a",
				ArtifactKey:       "artifact-a",
				CollectiveSpec:    ring,
				EnableChunkCRC32C: true,
			},
		},
		"node-b": {
			node: nodeFixture("node-b", "10.0.0.2"),
			execution: sharedtypes.WarmupExecutionPlan{
				TaskID:         "task-b",
				CollectiveSpec: sharedtypes.CollectiveSpec{Mode: sharedtypes.CollectiveModeNone},
			},
		},
	}
	intents := []transferplanner.WarmupNodeIntent{
		{TargetNode: "node-a", Collective: ring},
		{TargetNode: "node-b", Collective: sharedtypes.CollectiveSpec{Mode: sharedtypes.CollectiveModeNone}},
		{TargetNode: "node-c", Collective: ring},
	}

	reconciler.openPendingCollectives(ctx, logger, intents, pending, 7)
	if len(agent.completed) != 1 || !strings.Contains(agent.completed[0].Message, "pre-open cleanup") {
		t.Fatalf("expected one pre-open cleanup, got %#v", agent.completed)
	}
	openedOK := len(agent.opened) == 1 &&
		agent.opened[0].TaskID == "task-a" &&
		agent.opened[0].EnableChunkCRC &&
		agent.opened[0].TimeoutSeconds == 7
	if !openedOK {
		t.Fatalf("expected one collective open request, got %#v", agent.opened)
	}

	cachePlan := transferplanner.CacheBuildPlan{NodeIntents: []transferplanner.WarmupNodeIntent{{
		TargetNode: "node-a",
		TargetPath: "/cache/artifact-a",
		Collective: ring,
	}}}
	states := []warmupv1alpha1.WarmupNodeState{{
		NodeName: "node-a",
		TaskID:   "task-a",
		Phase:    warmupv1alpha1.JobPhaseSucceeded,
		Message:  "ok",
	}}
	reconciler.completeCollectiveSessions(ctx, logger, []corev1.Node{nodeA}, cachePlan, states)
	if len(agent.completed) != 2 {
		t.Fatalf("expected completion cleanup after pre-open cleanup, got %#v", agent.completed)
	}
	last := agent.completed[1]
	if last.TaskID != "task-a" || last.SessionID != "session-a" || !last.Success || last.StagingPath == "" {
		t.Fatalf("unexpected final collective completion request: %#v", last)
	}
}

// TestCollectTargetNamesFindNodeAndResetStalePlanState covers three helpers.
// collectTargetNames must preserve input order. findNodeByName must report both
// hits and misses. resetStalePlanState must clear node states left over from a
// different plan ID.
func TestCollectTargetNamesFindNodeAndResetStalePlanState(t *testing.T) {
	t.Parallel()

	nodes := []corev1.Node{nodeFixture("node-b", "10.0.0.2"), nodeFixture("node-a", "10.0.0.1")}

	names := collectTargetNames(nodes)
	if strings.Join(names, ",") != "node-b,node-a" {
		t.Fatalf("unexpected target names: %#v", names)
	}

	found, ok := findNodeByName(nodes, "node-a")
	if !ok || found.Name != "node-a" {
		t.Fatalf("expected to find node-a")
	}
	if _, ok = findNodeByName(nodes, "missing"); ok {
		t.Fatalf("missing node should not be found")
	}

	job := &warmupv1alpha1.ModelWarmupJob{}
	job.Status.LastPlanID = "old-plan"
	job.Status.NodeStates = []warmupv1alpha1.WarmupNodeState{{
		NodeName: "node-a",
		TaskID:   "old-plan-node-a",
	}}
	resetStalePlanState(slog.Default(), job, "new-plan")
	if len(job.Status.NodeStates) != 0 {
		t.Fatalf("expected stale node states to be cleared: %#v", job.Status.NodeStates)
	}
}

func TestReconcileSubmitsPendingWarmupAndUpdatesStatus(t *testing.T) {
	t.Parallel()

	manifest := logicalManifestFixture()
	cachePlan := transferplanner.CacheBuildPlan{
		PlanID: "plan-a",
		NodeIntents: []transferplanner.WarmupNodeIntent{{
			TargetNode: "node-a",
			TargetPath: "/cache/artifact-a",
			TargetPlan: sharedtypes.TargetTransferPlan{
				TargetNode:   "node-a",
				TransferMode: sharedtypes.TransferModeSingleSourceDirect,
			},
			Collective: sharedtypes.CollectiveSpec{Mode: sharedtypes.CollectiveModeNone},
		}},
	}
	execution := sharedtypes.WarmupExecutionPlan{
		PlanID:          "plan-a",
		TaskID:          "plan-a-node-a",
		ArtifactKey:     "artifact-a",
		TargetNode:      "node-a",
		TargetFinalPath: "/cache/artifact-a",
		LogicalManifest: manifest,
		CollectiveSpec:  sharedtypes.CollectiveSpec{Mode: sharedtypes.CollectiveModeNone},
	}
	job := modelWarmupJobFixture()
	agent := &fakeStateAgent{
		manifest:     manifest,
		submitHandle: sharedtypes.TaskHandle{TaskID: "plan-a-node-a", NodeName: "node-a"},
	}
	reconciler := newObjectReconciler(t, job, []corev1.Node{nodeFixture("node-a", "10.0.0.1")})
	reconciler.Dispatcher = fakeDispatcher{cachePlan: cachePlan, execution: execution}
	reconciler.Agent = agent

	result, err := reconciler.Reconcile(context.Background(), ctrl.Request{NamespacedName: ktypes.NamespacedName{Namespace: "default", Name: "job-a"}})
	if err != nil {
		t.Fatalf("Reconcile returned error: %v", err)
	}
	if result.RequeueAfter == 0 {
		t.Fatalf("expected running job to requeue")
	}
	var updated warmupv1alpha1.ModelWarmupJob
	if err := reconciler.Get(context.Background(), ktypes.NamespacedName{Namespace: "default", Name: "job-a"}, &updated); err != nil {
		t.Fatalf("get updated job: %v", err)
	}
	if updated.Status.Phase != warmupv1alpha1.JobPhaseRunning || len(updated.Status.NodeStates) != 1 {
		t.Fatalf("expected running node state, got %#v", updated.Status)
	}
	if updated.Status.NodeStates[0].TaskID != "plan-a-node-a" {
		t.Fatalf("expected submitted task id, got %#v", updated.Status.NodeStates[0])
	}
}

func TestReconcileMarksMissingNamedTargetFailed(t *testing.T) {
	t.Parallel()

	job := modelWarmupJobFixture()
	job.Spec.Target.NodeNames = []string{"missing-node"}
	reconciler := newObjectReconciler(t, job, []corev1.Node{nodeFixture("node-a", "10.0.0.1")})

	result, err := reconciler.Reconcile(context.Background(), ctrl.Request{NamespacedName: ktypes.NamespacedName{Namespace: "default", Name: "job-a"}})
	if err != nil {
		t.Fatalf("Reconcile returned error: %v", err)
	}
	if result.RequeueAfter != 0 {
		t.Fatalf("expected terminal failure not to requeue, got %s", result.RequeueAfter)
	}

	var updated warmupv1alpha1.ModelWarmupJob
	if err := reconciler.Get(context.Background(), ktypes.NamespacedName{Namespace: "default", Name: "job-a"}, &updated); err != nil {
		t.Fatalf("get updated job: %v", err)
	}
	if updated.Status.Phase != warmupv1alpha1.JobPhaseFailed {
		t.Fatalf("expected missing target to fail job, got %#v", updated.Status)
	}
	if updated.Status.LastErrorCode != "NoTargetNodes" {
		t.Fatalf("expected NoTargetNodes error code, got %q", updated.Status.LastErrorCode)
	}
	if !strings.Contains(updated.Status.LastErrorMessage, "missing-node") {
		t.Fatalf("expected missing node in error message, got %q", updated.Status.LastErrorMessage)
	}
}

func TestReconcileRefreshesExistingTaskToSucceeded(t *testing.T) {
	t.Parallel()

	manifest := logicalManifestFixture()
	cachePlan := transferplanner.CacheBuildPlan{
		PlanID: "plan-a",
		NodeIntents: []transferplanner.WarmupNodeIntent{{
			TargetNode: "node-a",
			TargetPath: "/cache/artifact-a",
			TargetPlan: sharedtypes.TargetTransferPlan{
				TargetNode:   "node-a",
				TransferMode: sharedtypes.TransferModeSingleSourceDirect,
			},
			Collective: sharedtypes.CollectiveSpec{Mode: sharedtypes.CollectiveModeNone},
		}},
	}
	job := modelWarmupJobFixture()
	job.Status.LastPlanID = "plan-a"
	job.Status.NodeStates = []warmupv1alpha1.WarmupNodeState{{
		NodeName: "node-a",
		TaskID:   "plan-a-node-a",
		Phase:    warmupv1alpha1.JobPhaseRunning,
	}}
	reconciler := newObjectReconciler(t, job, []corev1.Node{nodeFixture("node-a", "10.0.0.1")})
	reconciler.Dispatcher = fakeDispatcher{cachePlan: cachePlan}
	reconciler.Agent = &fakeStateAgent{
		manifest: manifest,
		status: sharedtypes.TaskStatus{
			TaskID:        "plan-a-node-a",
			Phase:         sharedtypes.WarmupPhaseSucceeded,
			ProgressBytes: 2048,
			CachePath:     "/cache/artifact-a",
		},
	}

	result, err := reconciler.Reconcile(context.Background(), ctrl.Request{NamespacedName: ktypes.NamespacedName{Namespace: "default", Name: "job-a"}})
	if err != nil {
		t.Fatalf("Reconcile returned error: %v", err)
	}
	if result.RequeueAfter != 0 {
		t.Fatalf("expected terminal job not to requeue, got %s", result.RequeueAfter)
	}
	var updated warmupv1alpha1.ModelWarmupJob
	if err := reconciler.Get(context.Background(), ktypes.NamespacedName{Namespace: "default", Name: "job-a"}, &updated); err != nil {
		t.Fatalf("get updated job: %v", err)
	}
	if updated.Status.Phase != warmupv1alpha1.JobPhaseSucceeded || updated.Status.Summary.Succeeded != 1 {
		t.Fatalf("expected succeeded job status, got %#v", updated.Status)
	}
}

func TestReconcileValidationAndPlanFailurePaths(t *testing.T) {
	t.Parallel()

	invalidJob := modelWarmupJobFixture()
	invalidJob.Spec.Artifact.Key = ""
	reconciler := newObjectReconciler(t, invalidJob, []corev1.Node{nodeFixture("node-a", "10.0.0.1")})
	if _, err := reconciler.Reconcile(context.Background(), ctrl.Request{NamespacedName: ktypes.NamespacedName{Namespace: "default", Name: "job-a"}}); err == nil {
		t.Fatalf("expected validation failure to be returned")
	}
	var updated warmupv1alpha1.ModelWarmupJob
	if err := reconciler.Get(context.Background(), ktypes.NamespacedName{Namespace: "default", Name: "job-a"}, &updated); err != nil {
		t.Fatalf("get validation failure job: %v", err)
	}
	if updated.Status.Phase != warmupv1alpha1.JobPhaseFailed || updated.Status.LastErrorCode != "ValidationFailed" {
		t.Fatalf("unexpected validation status: %#v", updated.Status)
	}

	planJob := modelWarmupJobFixture()
	planJob.Name = "job-plan"
	manifest := logicalManifestFixture()
	reconciler = newObjectReconciler(t, planJob, []corev1.Node{nodeFixture("node-a", "10.0.0.1")})
	reconciler.Agent = &fakeStateAgent{manifest: manifest}
	reconciler.Dispatcher = fakeDispatcher{cachePlanErr: errors.New("planner unavailable")}
	if _, err := reconciler.Reconcile(context.Background(), ctrl.Request{NamespacedName: ktypes.NamespacedName{Namespace: "default", Name: "job-plan"}}); err == nil {
		t.Fatalf("expected plan build failure to be returned")
	}
	if err := reconciler.Get(context.Background(), ktypes.NamespacedName{Namespace: "default", Name: "job-plan"}, &updated); err != nil {
		t.Fatalf("get plan failure job: %v", err)
	}
	if updated.Status.Phase != warmupv1alpha1.JobPhaseFailed || updated.Status.LastErrorCode != "PlanBuildFailed" {
		t.Fatalf("unexpected plan failure status: %#v", updated.Status)
	}
}

// TestReconcileSkipsMissingAndObservedTerminalJobs checks that Reconcile
// returns no error and no requeue in two cases: a request for a job that does
// not exist, and a job already Succeeded at its current generation.
func TestReconcileSkipsMissingAndObservedTerminalJobs(t *testing.T) {
	t.Parallel()

	ctx := context.Background()

	reconciler := newObjectReconciler(t, modelWarmupJobFixture(), []corev1.Node{nodeFixture("node-a", "10.0.0.1")})
	missingReq := ctrl.Request{NamespacedName: ktypes.NamespacedName{Namespace: "default", Name: "missing"}}
	result, err := reconciler.Reconcile(ctx, missingReq)
	if err != nil || result.RequeueAfter != 0 {
		t.Fatalf("expected missing job to be ignored, result=%#v err=%v", result, err)
	}

	doneJob := modelWarmupJobFixture()
	doneJob.Name = "job-done"
	doneJob.Status.Phase = warmupv1alpha1.JobPhaseSucceeded
	doneJob.Status.ObservedGeneration = doneJob.Generation
	reconciler = newObjectReconciler(t, doneJob, []corev1.Node{nodeFixture("node-a", "10.0.0.1")})
	doneReq := ctrl.Request{NamespacedName: ktypes.NamespacedName{Namespace: "default", Name: "job-done"}}
	result, err = reconciler.Reconcile(ctx, doneReq)
	if err != nil || result.RequeueAfter != 0 {
		t.Fatalf("expected observed terminal job to be ignored, result=%#v err=%v", result, err)
	}
}

func TestReconcileMarksEmptySelectorResultFailed(t *testing.T) {
	t.Parallel()

	job := modelWarmupJobFixture()
	job.Spec.Target.NodeNames = nil
	job.Spec.Target.NodeSelector = map[string]string{"role": "missing"}
	reconciler := newObjectReconciler(t, job, []corev1.Node{nodeFixture("node-a", "10.0.0.1")})
	result, err := reconciler.Reconcile(context.Background(), ctrl.Request{NamespacedName: ktypes.NamespacedName{Namespace: "default", Name: "job-a"}})
	if err != nil || result.RequeueAfter != 0 {
		t.Fatalf("expected empty selector result to be terminal, result=%#v err=%v", result, err)
	}
	var updated warmupv1alpha1.ModelWarmupJob
	if err := reconciler.Get(context.Background(), ktypes.NamespacedName{Namespace: "default", Name: "job-a"}, &updated); err != nil {
		t.Fatalf("get updated job: %v", err)
	}
	if updated.Status.Phase != warmupv1alpha1.JobPhaseFailed || updated.Status.LastErrorCode != "NoTargetNodes" {
		t.Fatalf("unexpected empty selector status: %#v", updated.Status)
	}
}

// validJobSpec returns a minimal spec that passes validation. It names artifact
// "artifact-a", has one node source (node-a at /models/qwen), targets node-a
// at /cache/artifact-a, and sets a 64 MB chunk policy.
func validJobSpec() warmupv1alpha1.ModelWarmupJobSpec {
	var spec warmupv1alpha1.ModelWarmupJobSpec
	spec.Artifact.Key = "artifact-a"
	spec.Sources = []warmupv1alpha1.SourceSpec{{
		SourceType: "node",
		NodeName:   "node-a",
		Path:       "/models/qwen",
	}}
	spec.Target.NodeNames = []string{"node-a"}
	spec.Target.TargetPath = "/cache/artifact-a"
	spec.Policy.ChunkSizeMB = 64
	return spec
}

// modelWarmupJobFixture returns a new ModelWarmupJob default/job-a at
// generation 1, built on validJobSpec and with an empty status.
func modelWarmupJobFixture() *warmupv1alpha1.ModelWarmupJob {
	job := &warmupv1alpha1.ModelWarmupJob{Spec: validJobSpec()}
	job.Namespace = "default"
	job.Name = "job-a"
	job.Generation = 1
	return job
}

// logicalManifestFixture returns a one-file manifest for artifact-a, rooted at
// /models/qwen, with a 64 MiB chunk size. Its only file is a required 128-byte
// config.json.
func logicalManifestFixture() sharedtypes.LogicalManifest {
	configFile := sharedtypes.ArtifactFile{
		RelativePath: "config.json",
		SizeBytes:    128,
		Kind:         sharedtypes.ArtifactFileKindJSON,
		Required:     true,
	}
	return sharedtypes.LogicalManifest{
		ArtifactKey:    "artifact-a",
		RootPath:       "/models/qwen",
		ChunkSizeBytes: 64 << 20,
		Files:          []sharedtypes.ArtifactFile{configFile},
	}
}

// newObjectReconciler builds a Reconciler on a fake client. The client is
// seeded with job and with pointers to the elements of nodes, and the
// ModelWarmupJob status subresource is enabled. The reconciler uses a node
// resolver over the same client, a 1ms requeue interval and the default slog
// logger. Agent and Dispatcher are left for the caller to set.
func newObjectReconciler(t *testing.T, job *warmupv1alpha1.ModelWarmupJob, nodes []corev1.Node) *Reconciler {
	t.Helper()

	scheme := runtime.NewScheme()
	if err := corev1.AddToScheme(scheme); err != nil {
		t.Fatalf("add core scheme: %v", err)
	}
	if err := warmupv1alpha1.AddToScheme(scheme); err != nil {
		t.Fatalf("add warmup scheme: %v", err)
	}

	seed := make([]runtime.Object, 0, len(nodes)+1)
	seed = append(seed, job)
	for i := range nodes {
		seed = append(seed, &nodes[i])
	}

	builder := fake.NewClientBuilder().WithScheme(scheme).WithRuntimeObjects(seed...)
	builder = builder.WithStatusSubresource(&warmupv1alpha1.ModelWarmupJob{})
	kubeClient := builder.Build()

	return &Reconciler{
		Client:       kubeClient,
		Scheme:       scheme,
		Resolver:     node.NewResolver(kubeClient),
		RequeueAfter: time.Millisecond,
		Logger:       slog.Default(),
	}
}

// newResolverReconciler builds a Reconciler whose only populated field is a
// node resolver. The resolver runs over a fake client seeded with pointers to
// the elements of nodes, using a scheme that registers only core/v1 types.
func newResolverReconciler(t *testing.T, nodes []corev1.Node) *Reconciler {
	t.Helper()

	scheme := runtime.NewScheme()
	if err := corev1.AddToScheme(scheme); err != nil {
		t.Fatalf("add core scheme: %v", err)
	}

	seed := make([]runtime.Object, len(nodes))
	for i := range nodes {
		seed[i] = &nodes[i]
	}
	kubeClient := fake.NewClientBuilder().
		WithScheme(scheme).
		WithRuntimeObjects(seed...).
		Build()
	return &Reconciler{Resolver: node.NewResolver(kubeClient)}
}

// nodeFixture returns a Node with the given name and a single InternalIP
// address set to ip.
func nodeFixture(name, ip string) corev1.Node {
	var n corev1.Node
	n.Name = name
	n.Status.Addresses = []corev1.NodeAddress{{
		Type:    corev1.NodeInternalIP,
		Address: ip,
	}}
	return n
}

// ringIntent returns a partial-pull/all-gather intent for node name, cached
// under /cache/<name>. The node is a ring member with neighbours prev and next
// at the given rank. The world size is always 3. The node pulls a single
// weights.bin byte range from "source-a", with Start = rank*10 and
// End = (rank+1)*10.
func ringIntent(name, prev, next string, rank int32) transferplanner.WarmupNodeIntent {
	ownedRange := sharedtypes.ByteRange{
		RelativePath: "weights.bin",
		Start:        int64(rank) * 10,
		End:          int64(rank+1) * 10,
	}
	plan := sharedtypes.TargetTransferPlan{
		TransferMode: sharedtypes.TransferModePartialPullAllGather,
		SourceSegments: []sharedtypes.SourceSegmentPlan{{
			SourceID:   "source-a",
			ByteRanges: []sharedtypes.ByteRange{ownedRange},
		}},
	}
	peers := &sharedtypes.RingPeerPlan{
		SelfNode:  name,
		PrevNode:  prev,
		NextNode:  next,
		Rank:      rank,
		WorldSize: 3,
	}
	return transferplanner.WarmupNodeIntent{
		TargetNode: name,
		TargetPath: "/cache/" + name,
		TargetPlan: plan,
		Collective: sharedtypes.CollectiveSpec{
			Mode: sharedtypes.CollectiveModeRing,
			Ring: peers,
		},
	}
}

// httpStatusNotFoundForTest is the HTTP 404 (Not Found) status code. Tests put
// it in an AgentHTTPError to simulate the agent reporting an unknown task.
const httpStatusNotFoundForTest = 404

// fakeStateAgent is a scripted stand-in for the node agent client. Each method
// returns the canned values configured on the struct. OpenCollective (when
// openOK is set and openErr is nil) and CompleteCollective record the requests
// they receive so tests can assert on them. The recording slices are not
// mutex-guarded.
type fakeStateAgent struct {
	// Returned by SubmitWarmup.
	submitHandle sharedtypes.TaskHandle
	submitErr    error

	// Returned by BuildManifest; an empty file list makes BuildManifest fail.
	manifest sharedtypes.LogicalManifest

	// Returned by GetWarmupTaskStatus.
	status    sharedtypes.TaskStatus
	statusErr error

	// Every request passed to CompleteCollective, in call order.
	completed []sharedtypes.CompleteCollectiveRequest

	// OpenCollective behaviour, plus the requests it accepted.
	openOK       bool
	openResponse sharedtypes.OpenCollectiveResponse
	openErr      error
	opened       []sharedtypes.OpenCollectiveRequest
}

// SubmitWarmup returns the configured submit handle and error.
func (a *fakeStateAgent) SubmitWarmup(_ context.Context, _ corev1.Node, _ sharedtypes.SubmitWarmupRequest) (sharedtypes.TaskHandle, error) {
	return a.submitHandle, a.submitErr
}

// GetWarmupTaskStatus returns the configured task status and error.
func (a *fakeStateAgent) GetWarmupTaskStatus(_ context.Context, _ corev1.Node, _ sharedtypes.GetWarmupTaskStatusRequest) (sharedtypes.TaskStatus, error) {
	return a.status, a.statusErr
}

// BuildManifest returns the configured manifest. It fails with
// "not implemented" when that manifest has no files.
func (a *fakeStateAgent) BuildManifest(_ context.Context, _ corev1.Node, _ sharedtypes.BuildManifestRequest) (sharedtypes.BuildManifestResponse, error) {
	if len(a.manifest.Files) > 0 {
		return sharedtypes.BuildManifestResponse{Manifest: a.manifest}, nil
	}
	return sharedtypes.BuildManifestResponse{}, errors.New("not implemented")
}

// OpenCollective returns openErr when it is set. Otherwise, if openOK is set,
// it records req and returns openResponse. In all other cases it fails with
// "not implemented".
func (a *fakeStateAgent) OpenCollective(_ context.Context, _ corev1.Node, req sharedtypes.OpenCollectiveRequest) (sharedtypes.OpenCollectiveResponse, error) {
	switch {
	case a.openErr != nil:
		return sharedtypes.OpenCollectiveResponse{}, a.openErr
	case !a.openOK:
		return sharedtypes.OpenCollectiveResponse{}, errors.New("not implemented")
	}
	a.opened = append(a.opened, req)
	return a.openResponse, nil
}

// StepCollective is not supported by this fake and always fails.
func (*fakeStateAgent) StepCollective(_ context.Context, _ corev1.Node, _ sharedtypes.CollectiveStepRequest) (sharedtypes.CollectiveStepResponse, error) {
	return sharedtypes.CollectiveStepResponse{}, errors.New("not implemented")
}

// CompleteCollective records req and always succeeds.
func (a *fakeStateAgent) CompleteCollective(_ context.Context, _ corev1.Node, req sharedtypes.CompleteCollectiveRequest) error {
	a.completed = append(a.completed, req)
	return nil
}

// fakeDispatcher is a scripted planner. Each method returns its configured
// error when one is set, and its configured plan otherwise.
type fakeDispatcher struct {
	// Returned by BuildCachePlan.
	cachePlan    transferplanner.CacheBuildPlan
	cachePlanErr error

	// Returned by BuildWarmupExecution.
	execution    sharedtypes.WarmupExecutionPlan
	executionErr error
}

// BuildCachePlan returns cachePlanErr when set; otherwise the canned cache plan.
func (d fakeDispatcher) BuildCachePlan(_ context.Context, _ transferplanner.CacheBuildRequest) (transferplanner.CacheBuildPlan, error) {
	if err := d.cachePlanErr; err != nil {
		return transferplanner.CacheBuildPlan{}, err
	}
	return d.cachePlan, nil
}

// BuildWarmupExecution returns executionErr when set; otherwise the canned execution plan.
func (d fakeDispatcher) BuildWarmupExecution(_ context.Context, _ transferplanner.BuildWarmupExecutionRequest) (sharedtypes.WarmupExecutionPlan, error) {
	if err := d.executionErr; err != nil {
		return sharedtypes.WarmupExecutionPlan{}, err
	}
	return d.execution, nil
}