/*
 * Copyright (c) 2024 Huawei Technologies Co., Ltd.
 * openFuyao is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package lifecycle

import (
	"context"
	"encoding/json"
	"os"
	"path/filepath"
	"strings"
	"testing"
	"time"

	metav1types "k8s.io/apimachinery/pkg/types"
	fwkdl "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/datalayer"
	fwkplugin "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
	requestcontrol "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/requestcontrol"
	fwkscheduling "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/scheduling"
	latencyattr "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/plugins/datalayer/attribute/latency"
	prefixattr "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/plugins/datalayer/attribute/prefix"

	npuattr "hermes-router/pkg/epp/framework/plugins/datalayer/attribute/npu"
	predictioninputattr "hermes-router/pkg/epp/framework/plugins/datalayer/attribute/predictioninput"
	inflightproducer "hermes-router/pkg/epp/framework/plugins/requestcontrol/dataproducer/inflight"
	internalinflight "hermes-router/pkg/epp/internal/inflight"
	"hermes-router/pkg/epp/internal/pdgroup"
	"hermes-router/pkg/epp/internal/predictiondata"
)

// testPrimaryProfileName is the profile name newSchedulingResult uses both as
// the result's PrimaryProfileName and as the only key in its ProfileResults.
const testPrimaryProfileName = "default"

// TestPreRequestTracksSelectedPrimaryProfileEndpoint checks that PreRequest
// charges inflight load only to the first target of the primary profile and
// copies that endpoint's prefix-cache match and latency prediction onto the
// active request metadata.
func TestPreRequestTracksSelectedPrimaryProfileEndpoint(t *testing.T) {
	st := internalinflight.StoreForName("lifecycle-pre-request")
	t.Cleanup(func() { _ = st.Close() })
	lt := NewRequestLifecycleTracker("tracker", st)

	chosen := newTestEndpoint("decode-a", "10.0.20.3")
	spare := newTestEndpoint("decode-b", "10.0.20.4")
	chosen.Put(prefixattr.PrefixCacheMatchInfoKey, prefixattr.NewPrefixCacheMatchInfo(2, 4, 16))
	chosen.Put(latencyattr.LatencyPredictionInfoKey, latencyattr.NewLatencyPredictionInfo(true, true, 10, 5, 20, 2))
	req := newTestRequest("req-1", []uint32{1, 2, 3, 4})
	lt.now = func() time.Time { return time.Unix(100, 0) }

	lt.PreRequest(context.Background(), req, newSchedulingResult(chosen, spare))

	// Only the first target is charged; the second one stays untouched.
	assertLoad(t, st.Snapshot(chosen.GetMetadata()), 4, 1)
	assertLoad(t, st.Snapshot(spare.GetMetadata()), 0, 0)

	got := st.ActiveRequest(req.RequestId)
	if got == nil {
		t.Fatal("expected active request metadata")
	}
	if ep := got.MetaData.SelectedEndpoint; ep != "pod://default/decode-a" {
		t.Fatalf("expected selected endpoint metadata, got %q", ep)
	}
	if score := got.MetaData.PrefixCacheScore; score != 0.5 {
		t.Fatalf("expected prefix cache score 0.5, got %v", score)
	}
	pred := got.MetaData.Prediction
	if pred.TTFTMillis != 20 || pred.TPOTMillis != 2 {
		t.Fatalf("unexpected latency prediction metadata: %+v", pred)
	}
}

// TestPreRequestTracksSelectedPrefillAndDecodePods checks the PD case. When
// the leader carries a PD route, PreRequest charges load to the prefill and
// decode pods rather than to the leader. The leader is still recorded as the
// selected endpoint.
func TestPreRequestTracksSelectedPrefillAndDecodePods(t *testing.T) {
	st := internalinflight.StoreForName("lifecycle-pd-pre-request")
	t.Cleanup(func() { _ = st.Close() })
	lt := NewRequestLifecycleTracker("tracker", st)
	lt.now = func() time.Time { return time.Unix(120, 0) }

	var (
		leader  = newTestEndpoint("leader-pd", "10.0.22.1")
		prefill = newTestEndpoint("prefill-pd", "10.0.22.2")
		decode  = newTestEndpoint("decode-pd", "10.0.22.3")
	)
	setSelectedPDRoute(leader, prefill, decode)
	req := newTestRequest("req-pd-pre", []uint32{1, 2, 3, 4})

	lt.PreRequest(context.Background(), req, newSchedulingResult(leader))

	assertLoad(t, st.Snapshot(leader.GetMetadata()), 0, 0)
	assertLoad(t, st.Snapshot(prefill.GetMetadata()), 4, 1)
	assertLoad(t, st.Snapshot(decode.GetMetadata()), 4, 1)

	got := st.ActiveRequest(req.RequestId)
	switch {
	case got == nil:
		t.Fatal("expected active request metadata")
	case got.MetaData.SelectedEndpoint != "pod://default/leader-pd":
		t.Fatalf("expected primary selected endpoint metadata to remain leader, got %q", got.MetaData.SelectedEndpoint)
	}
}

// TestPreRequestCapturesAggregateSelectedRouteSnapshot checks the non-PD
// (aggregate) case. The stored selected route must hold only a leader
// snapshot. That snapshot carries the leader's metrics, its NPU state, and the
// inflight load it had before this request was dispatched.
func TestPreRequestCapturesAggregateSelectedRouteSnapshot(t *testing.T) {
	st := internalinflight.StoreForName("lifecycle-aggregate-selected-route")
	t.Cleanup(func() { _ = st.Close() })
	lt := NewRequestLifecycleTracker("tracker", st)
	lt.now = func() time.Time { return time.Unix(100, 0) }

	ep := newTestEndpoint("aggregate-route", "10.0.28.1")
	seedEndpointMetrics(ep, 5, 4, 67.5, time.Unix(95, 0))
	seedEndpointNPUState(ep, time.Unix(96, 0), "node-a", "0")
	// One request (6 tokens) is already in flight on the endpoint. The new
	// request's snapshot is expected to show exactly this pre-dispatch load.
	st.StartRequest(internalinflight.RequestStart{
		RequestID:        "req-existing-aggregate",
		SelectedEndpoint: ep.GetMetadata(),
		InputTokenLength: 6,
		StartedAt:        time.Unix(90, 0),
	})
	req := newTestRequest("req-aggregate-selected-route", []uint32{1, 2, 3, 4})

	lt.PreRequest(context.Background(), req, newSchedulingResult(ep))

	got := st.ActiveRequest(req.RequestId)
	if got == nil {
		t.Fatal("expected active request metadata")
	}
	route := got.MetaData.SelectedRoute
	if route == nil {
		t.Fatal("expected selected route to be stored")
	}
	lead := route.Leader
	if lead == nil {
		t.Fatal("expected leader snapshot to be stored")
	}
	if lead.ID != "pod://default/aggregate-route" {
		t.Fatalf("expected aggregate leader id, got %q", lead.ID)
	}
	if route.Prefill != nil || route.Decode != nil {
		t.Fatalf("expected aggregate route to omit prefill/decode snapshots, got %+v", route)
	}

	metrics := lead.Metrics
	if metrics == nil {
		t.Fatal("expected aggregate leader metrics snapshot")
	}
	if metrics.WaitingQueueSize != 5 {
		t.Fatalf("expected waiting queue size 5, got %d", metrics.WaitingQueueSize)
	}
	if metrics.KVCacheUsagePercent != 67.5 {
		t.Fatalf("expected kv cache usage 67.5, got %v", metrics.KVCacheUsagePercent)
	}

	load := lead.Inflight
	if load == nil {
		t.Fatal("expected aggregate leader inflight snapshot")
	}
	if load.Tokens != 6 || load.Requests != 1 {
		t.Fatalf("expected pre-dispatch inflight snapshot, got %+v", load)
	}

	npu := lead.NPU
	if npu == nil {
		t.Fatal("expected aggregate leader npu snapshot")
	}
	if npu.DeviceCount != 1 {
		t.Fatalf("expected one npu device, got %+v", npu)
	}
	if len(npu.Devices) != 1 || npu.Devices[0].DeviceID != "0" {
		t.Fatalf("expected npu device 0 in snapshot, got %+v", npu.Devices)
	}
}

// TestPreRequestCapturesPDRoleSnapshotsAndStoresLeaderAsIdentityOnly checks
// the PD case for route snapshots:
//   - the leader entry keeps only its ID;
//   - the prefill and decode entries each carry their own metrics, NPU state,
//     and pre-dispatch inflight load.
func TestPreRequestCapturesPDRoleSnapshotsAndStoresLeaderAsIdentityOnly(t *testing.T) {
	st := internalinflight.StoreForName("lifecycle-pd-selected-route")
	t.Cleanup(func() { _ = st.Close() })
	lt := NewRequestLifecycleTracker("tracker", st)
	lt.now = func() time.Time { return time.Unix(110, 0) }

	leader := newTestEndpoint("leader-selected-route", "10.0.29.1")
	prefill := newTestEndpoint("prefill-selected-route", "10.0.29.2")
	decode := newTestEndpoint("decode-selected-route", "10.0.29.3")
	// The leader is seeded as well, so the test can show its state is NOT copied.
	seedEndpointMetrics(leader, 9, 8, 77.7, time.Unix(95, 0))
	seedEndpointNPUState(leader, time.Unix(96, 0), "node-leader", "9")
	seedEndpointMetrics(prefill, 1, 2, 21.5, time.Unix(97, 0))
	seedEndpointNPUState(prefill, time.Unix(98, 0), "node-prefill", "1")
	seedEndpointMetrics(decode, 3, 4, 42.5, time.Unix(99, 0))
	seedEndpointNPUState(decode, time.Unix(100, 0), "node-decode", "2")
	setSelectedPDRoute(leader, prefill, decode)
	st.StartRequest(internalinflight.RequestStart{
		RequestID:        "req-existing-pd",
		SelectedEndpoint: leader.GetMetadata(),
		TrackedEndpoints: []*fwkdl.EndpointMetadata{prefill.GetMetadata(), decode.GetMetadata()},
		InputTokenLength: 5,
		StartedAt:        time.Unix(90, 0),
	})
	req := newTestRequest("req-pd-selected-route", []uint32{1, 2, 3, 4})

	lt.PreRequest(context.Background(), req, newSchedulingResult(leader))

	got := st.ActiveRequest(req.RequestId)
	if got == nil {
		t.Fatal("expected active request metadata")
	}
	route := got.MetaData.SelectedRoute
	if route == nil {
		t.Fatal("expected selected route to be stored")
	}
	lead := route.Leader
	if lead == nil {
		t.Fatal("expected leader identity to be stored")
	}
	if lead.ID != "pod://default/leader-selected-route" {
		t.Fatalf("expected leader id, got %q", lead.ID)
	}
	if lead.Metrics != nil || lead.Inflight != nil || lead.NPU != nil {
		t.Fatalf("expected pd leader snapshot to be identity only, got %+v", lead)
	}

	pre, dec := route.Prefill, route.Decode
	if pre == nil || dec == nil {
		t.Fatalf("expected pd route snapshots for prefill/decode, got %+v", route)
	}
	if pre.ID != "pod://default/prefill-selected-route" {
		t.Fatalf("expected prefill id, got %q", pre.ID)
	}
	if dec.ID != "pod://default/decode-selected-route" {
		t.Fatalf("expected decode id, got %q", dec.ID)
	}
	if pre.Metrics == nil || pre.Metrics.WaitingQueueSize != 1 {
		t.Fatalf("expected prefill metrics snapshot, got %+v", pre.Metrics)
	}
	if dec.Metrics == nil || dec.Metrics.RunningRequestsSize != 4 {
		t.Fatalf("expected decode metrics snapshot, got %+v", dec.Metrics)
	}
	if pre.Inflight == nil || pre.Inflight.Tokens != 5 || pre.Inflight.Requests != 1 {
		t.Fatalf("expected prefill pre-dispatch inflight snapshot, got %+v", pre.Inflight)
	}
	if dec.Inflight == nil || dec.Inflight.Tokens != 5 || dec.Inflight.Requests != 1 {
		t.Fatalf("expected decode pre-dispatch inflight snapshot, got %+v", dec.Inflight)
	}
	if pre.NPU == nil || pre.NPU.DeviceCount != 1 {
		t.Fatalf("expected prefill npu snapshot, got %+v", pre.NPU)
	}
	if dec.NPU == nil || dec.NPU.DeviceCount != 1 {
		t.Fatalf("expected decode npu snapshot, got %+v", dec.NPU)
	}
}

// TestPreRequestPersistsAggregatePredictionInputAtDispatchTime checks that the
// completed record keeps the prediction input captured during PreRequest. It
// must not pick up a value written to the endpoint after dispatch.
func TestPreRequestPersistsAggregatePredictionInputAtDispatchTime(t *testing.T) {
	st := internalinflight.StoreForName("lifecycle-aggregate-prediction-input")
	t.Cleanup(func() { _ = st.Close() })
	lt := NewRequestLifecycleTracker("tracker", st)
	clock := time.Unix(100, 0)
	lt.now = func() time.Time { return clock }

	ep := newTestEndpoint("aggregate-prediction-input", "10.0.30.1")
	want := predictiondata.Input{InputID: 0, PodType: predictiondata.PodTypeAggregate, InputTokenLength: 4, ServerIP: "10.0.30.1"}
	ep.Put(predictioninputattr.PredictionInputKey, &predictioninputattr.InputSnapshot{Input: want})
	req := newTestRequest("req-aggregate-prediction-input", []uint32{1, 2, 3, 4})
	ctx := context.Background()

	lt.PreRequest(ctx, req, newSchedulingResult(ep))

	// Overwrite the attribute after dispatch. The completed record must ignore this.
	drifted := predictiondata.Input{InputID: 99, PodType: predictiondata.PodTypeAggregate, InputTokenLength: 99, ServerIP: "drifted"}
	ep.Put(predictioninputattr.PredictionInputKey, &predictioninputattr.InputSnapshot{Input: drifted})
	clock = time.Unix(101, 0)
	lt.ResponseBody(ctx, req, &requestcontrol.Response{EndOfStream: true, Usage: requestcontrol.Usage{CompletionTokens: 2}}, ep.GetMetadata())

	records := st.DrainCompleted(10)
	if n := len(records); n != 1 {
		t.Fatalf("expected one completed record, got %d", n)
	}
	if inputs := records[0].PredictionInputs; inputs[predictiondata.RoleLeader] != want {
		t.Fatalf("expected dispatch-time prediction input, got %+v", inputs)
	}
}

// TestPreRequestPersistsPDPredictionInputs checks that, for a PD route, the
// prediction inputs attached to the prefill and decode pods are stored on the
// active request under their respective roles.
func TestPreRequestPersistsPDPredictionInputs(t *testing.T) {
	st := internalinflight.StoreForName("lifecycle-pd-prediction-input")
	t.Cleanup(func() { _ = st.Close() })
	lt := NewRequestLifecycleTracker("tracker", st)
	lt.now = func() time.Time { return time.Unix(100, 0) }

	leader := newTestEndpoint("leader-prediction-input", "10.0.31.1")
	prefill := newTestEndpoint("prefill-prediction-input", "10.0.31.2")
	decode := newTestEndpoint("decode-prediction-input", "10.0.31.3")
	wantPrefill := predictiondata.Input{InputID: 1, PodType: predictiondata.PodTypePrefill, InputTokenLength: 4, ServerIP: "10.0.31.2"}
	wantDecode := predictiondata.Input{InputID: 2, PodType: predictiondata.PodTypeDecode, InputTokenLength: 4, ServerIP: "10.0.31.3"}
	prefill.Put(predictioninputattr.PredictionInputKey, &predictioninputattr.InputSnapshot{Input: wantPrefill})
	decode.Put(predictioninputattr.PredictionInputKey, &predictioninputattr.InputSnapshot{Input: wantDecode})
	setSelectedPDRoute(leader, prefill, decode)
	req := newTestRequest("req-pd-prediction-input", []uint32{1, 2, 3, 4})

	lt.PreRequest(context.Background(), req, newSchedulingResult(leader))

	got := st.ActiveRequest(req.RequestId)
	if got == nil {
		t.Fatal("expected active request metadata")
	}
	inputs := got.MetaData.PredictionInputs
	if inputs[predictiondata.RolePrefill] != wantPrefill {
		t.Fatalf("expected prefill prediction input, got %+v", inputs)
	}
	if inputs[predictiondata.RoleDecode] != wantDecode {
		t.Fatalf("expected decode prediction input, got %+v", inputs)
	}
}

// TestPreRequestPersistsSelectedMissingFeatureReasons checks that a
// MissingFeatureReason attached to the selected endpoint is stored on the
// active request under the leader role.
func TestPreRequestPersistsSelectedMissingFeatureReasons(t *testing.T) {
	st := internalinflight.StoreForName("lifecycle-prediction-missing-reasons")
	t.Cleanup(func() { _ = st.Close() })
	lt := NewRequestLifecycleTracker("tracker", st)

	ep := newTestEndpoint("leader-missing-reason", "10.0.32.1")
	ep.Put(predictioninputattr.MissingFeatureReasonKey, &predictioninputattr.MissingFeatureReason{Reason: "missing npu state"})

	req := newTestRequest("req-missing-reason", []uint32{1, 2})
	lt.PreRequest(context.Background(), req, newSchedulingResult(ep))

	got := st.ActiveRequest(req.RequestId)
	if got == nil {
		t.Fatal("expected active request metadata")
	}
	const want = "missing npu state"
	if reasons := got.MetaData.MissingFeatureReasons; reasons[predictiondata.RoleLeader] != want {
		t.Fatalf("expected leader missing feature reason, got %+v", reasons)
	}
}

// TestPreRequestRecordsPDAwareScoresAndPredictions checks which pod each value
// comes from under a PD route. The prefix-cache score and TTFT must come from
// the prefill pod, and TPOT must come from the decode pod.
func TestPreRequestRecordsPDAwareScoresAndPredictions(t *testing.T) {
	st := internalinflight.StoreForName("lifecycle-pd-scores")
	t.Cleanup(func() { _ = st.Close() })
	lt := NewRequestLifecycleTracker("tracker", st)

	leader := newTestEndpoint("leader-score", "10.0.27.1")
	prefill := newTestEndpoint("prefill-score", "10.0.27.2")
	decode := newTestEndpoint("decode-score", "10.0.27.3")
	prefill.Put(prefixattr.PrefixCacheMatchInfoKey, prefixattr.NewPrefixCacheMatchInfo(3, 4, 16))
	prefill.Put(latencyattr.LatencyPredictionInfoKey, latencyattr.NewLatencyPredictionInfo(true, true, 0, 0, 30, 0))
	decode.Put(latencyattr.LatencyPredictionInfoKey, latencyattr.NewLatencyPredictionInfo(true, true, 0, 0, 0, 5))
	setSelectedPDRoute(leader, prefill, decode)

	req := newTestRequest("req-pd-score", []uint32{1, 2, 3, 4})
	lt.PreRequest(context.Background(), req, newSchedulingResult(leader))

	got := st.ActiveRequest(req.RequestId)
	if got == nil {
		t.Fatal("expected active request metadata")
	}
	if score := got.MetaData.PrefixCacheScore; score != 0.75 {
		t.Fatalf("expected prefix cache score 0.75 from prefill, got %v", score)
	}
	pred := got.MetaData.Prediction
	if pred.TTFTMillis != 30 {
		t.Fatalf("expected TTFT 30ms from prefill, got %v", pred.TTFTMillis)
	}
	if pred.TPOTMillis != 5 {
		t.Fatalf("expected TPOT 5ms from decode, got %v", pred.TPOTMillis)
	}
}

// TestPreRequestFailOpenWhenPDRouteLeaderIsNil checks the nil-target case.
// When the scheduling result's only target is nil, PreRequest must neither
// start nor complete a request.
func TestPreRequestFailOpenWhenPDRouteLeaderIsNil(t *testing.T) {
	st := internalinflight.StoreForName("lifecycle-pd-nil-leader")
	t.Cleanup(func() { _ = st.Close() })
	lt := NewRequestLifecycleTracker("tracker", st)
	req := newTestRequest("req-pd-nil-leader", []uint32{1, 2, 3, 4})

	lt.PreRequest(context.Background(), req, newSchedulingResult(nil))

	if got := st.ActiveRequest(req.RequestId); got != nil {
		t.Fatalf("expected nil active request for nil-leader, got %+v", got)
	}
	if n := len(st.DrainCompleted(10)); n != 0 {
		t.Fatalf("expected no completed records for nil-leader, got %d", n)
	}
}

// TestPreRequestFailOpenWhenPDRouteMissingPrefill checks a PD group that has
// no prefill pods. The request must still be tracked against the leader, and
// the listed decode pod must stay uncharged.
func TestPreRequestFailOpenWhenPDRouteMissingPrefill(t *testing.T) {
	st := internalinflight.StoreForName("lifecycle-pd-missing-prefill")
	t.Cleanup(func() { _ = st.Close() })
	lt := NewRequestLifecycleTracker("tracker", st)

	leader := newTestEndpoint("leader-pd", "10.0.25.1")
	decode := newTestEndpoint("decode-pd", "10.0.25.3")
	info := &pdgroup.Info{GroupID: "group-a"}
	info.DecodePods = []fwkscheduling.ScoredEndpoint{{Endpoint: decode, Score: 0}}
	pdgroup.Set(leader, info)

	req := newTestRequest("req-pd-missing-prefill", []uint32{1, 2, 3, 4})
	lt.PreRequest(context.Background(), req, newSchedulingResult(leader))

	if st.ActiveRequest(req.RequestId) == nil {
		t.Fatal("expected active request with incomplete Info to still be tracked against leader")
	}
	assertLoad(t, st.Snapshot(leader.GetMetadata()), 4, 1)
	assertLoad(t, st.Snapshot(decode.GetMetadata()), 0, 0)
}

// TestPreRequestFailOpenWhenPDRouteMissingDecode checks a PD group that has
// no decode pods. The request must still be tracked against the leader, and
// the listed prefill pod must stay uncharged.
func TestPreRequestFailOpenWhenPDRouteMissingDecode(t *testing.T) {
	st := internalinflight.StoreForName("lifecycle-pd-missing-decode")
	t.Cleanup(func() { _ = st.Close() })
	lt := NewRequestLifecycleTracker("tracker", st)

	leader := newTestEndpoint("leader-pd", "10.0.26.1")
	prefill := newTestEndpoint("prefill-pd", "10.0.26.2")
	info := &pdgroup.Info{GroupID: "group-a"}
	info.PrefillPods = []fwkscheduling.ScoredEndpoint{{Endpoint: prefill, Score: 0}}
	pdgroup.Set(leader, info)

	req := newTestRequest("req-pd-missing-decode", []uint32{1, 2, 3, 4})
	lt.PreRequest(context.Background(), req, newSchedulingResult(leader))

	if st.ActiveRequest(req.RequestId) == nil {
		t.Fatal("expected active request with incomplete Info to still be tracked against leader")
	}
	assertLoad(t, st.Snapshot(leader.GetMetadata()), 4, 1)
	assertLoad(t, st.Snapshot(prefill.GetMetadata()), 0, 0)
}

// TestResponseBodyMarksFirstChunkAndReleasesTrackedStateOnEndOfStream drives
// one request through PreRequest, a non-final chunk, and a final chunk.
// It checks that:
//   - load is held until end-of-stream;
//   - the completed record carries TTFT, total latency, the selected
//     endpoint, and the output length reported in usage.
func TestResponseBodyMarksFirstChunkAndReleasesTrackedStateOnEndOfStream(t *testing.T) {
	st := internalinflight.StoreForName("lifecycle-release")
	t.Cleanup(func() { _ = st.Close() })
	lt := NewRequestLifecycleTracker("tracker", st)
	ep := newTestEndpoint("decode-c", "10.0.21.3")
	req := newTestRequest("req-2", []uint32{7, 8, 9})
	clock := time.Unix(200, 0)
	lt.now = func() time.Time { return clock }
	ctx := context.Background()

	lt.PreRequest(ctx, req, newSchedulingResult(ep))

	// First, non-final chunk one second after dispatch: load is still held.
	clock = time.Unix(201, 0)
	lt.ResponseBody(ctx, req, &requestcontrol.Response{EndOfStream: false}, ep.GetMetadata())
	assertLoad(t, st.Snapshot(ep.GetMetadata()), 3, 1)

	// Final chunk six seconds after dispatch releases the load.
	clock = time.Unix(206, 0)
	last := &requestcontrol.Response{
		EndOfStream: true,
		Usage:       requestcontrol.Usage{CompletionTokens: 12},
	}
	lt.ResponseBody(ctx, req, last, ep.GetMetadata())
	assertLoad(t, st.Snapshot(ep.GetMetadata()), 0, 0)

	done := st.DrainCompleted(10)
	if len(done) != 1 {
		t.Fatalf("expected one completed record, got %d", len(done))
	}
	rec := done[0]
	if rec.TTFTMillis != 1000 {
		t.Fatalf("expected TTFTMillis=1000, got %v", rec.TTFTMillis)
	}
	if rec.TotalLatencyMillis != 6000 {
		t.Fatalf("expected TotalLatencyMillis=6000, got %v", rec.TotalLatencyMillis)
	}
	if rec.SelectedEndpoint != "pod://default/decode-c" {
		t.Fatalf("expected completed selected endpoint to match response target, got %q", rec.SelectedEndpoint)
	}
	if rec.OutputLength != 12 {
		t.Fatalf("expected OutputLength=12 from response usage, got %d", rec.OutputLength)
	}
}

// TestResponseBodyReleasesSelectedPrefillAndDecodePodsOnEndOfStream checks
// the release path for a PD route. It verifies that:
//   - the prefill and decode pods stay charged across a non-final chunk;
//   - all three endpoints are released at end-of-stream;
//   - the completed record keeps the leader as the selected endpoint.
func TestResponseBodyReleasesSelectedPrefillAndDecodePodsOnEndOfStream(t *testing.T) {
	st := internalinflight.StoreForName("lifecycle-pd-release")
	t.Cleanup(func() { _ = st.Close() })
	lt := NewRequestLifecycleTracker("tracker", st)
	clock := time.Unix(220, 0)
	lt.now = func() time.Time { return clock }

	leader := newTestEndpoint("leader-release", "10.0.23.1")
	prefill := newTestEndpoint("prefill-release", "10.0.23.2")
	decode := newTestEndpoint("decode-release", "10.0.23.3")
	setSelectedPDRoute(leader, prefill, decode)
	req := newTestRequest("req-pd-release", []uint32{7, 8, 9})
	ctx := context.Background()

	lt.PreRequest(ctx, req, newSchedulingResult(leader))

	clock = time.Unix(221, 0)
	lt.ResponseBody(ctx, req, &requestcontrol.Response{EndOfStream: false}, decode.GetMetadata())
	assertLoad(t, st.Snapshot(prefill.GetMetadata()), 3, 1)
	assertLoad(t, st.Snapshot(decode.GetMetadata()), 3, 1)

	clock = time.Unix(226, 0)
	last := &requestcontrol.Response{
		EndOfStream: true,
		Usage:       requestcontrol.Usage{CompletionTokens: 12},
	}
	lt.ResponseBody(ctx, req, last, decode.GetMetadata())

	for _, ep := range []fwkscheduling.Endpoint{leader, prefill, decode} {
		assertLoad(t, st.Snapshot(ep.GetMetadata()), 0, 0)
	}

	done := st.DrainCompleted(10)
	if len(done) != 1 {
		t.Fatalf("expected one completed record, got %d", len(done))
	}
	rec := done[0]
	if rec.SelectedEndpoint != "pod://default/leader-release" {
		t.Fatalf("expected completed metadata to preserve primary selected endpoint, got %q", rec.SelectedEndpoint)
	}
	if rec.TTFTMillis != 1000 {
		t.Fatalf("expected TTFTMillis=1000, got %v", rec.TTFTMillis)
	}
	if rec.TotalLatencyMillis != 6000 {
		t.Fatalf("expected TotalLatencyMillis=6000, got %v", rec.TotalLatencyMillis)
	}
	if rec.OutputLength != 12 {
		t.Fatalf("expected OutputLength=12 from response usage, got %d", rec.OutputLength)
	}
}

// TestFactoryFlushesCompletedRecordsOnHandleContextDone builds the tracker via
// Factory with persistence enabled, completes one request, and then cancels
// the handle's context. After cancellation it expects:
//   - the completed record to appear as a single line in outputPath;
//   - the shared store to be removed from the registry, so a later
//     StoreForName call returns a different store.
func TestFactoryFlushesCompletedRecordsOnHandleContextDone(t *testing.T) {
	outputDir := t.TempDir()
	outputPath := filepath.Join(outputDir, "shutdown.jsonl")
	ctx, cancel := context.WithCancel(context.Background())
	// Cancel the handle context on every exit path. Without this, a t.Fatalf
	// before the explicit cancel() below would leave the context (and any
	// shutdown work the plugin ties to it) alive for the rest of the test
	// binary. Deferred calls run before t.Cleanup callbacks, and calling
	// cancel a second time is a no-op.
	defer cancel()
	handle := fwkplugin.NewEppHandle(ctx, func() []metav1types.NamespacedName { return nil })
	params, err := json.Marshal(map[string]any{
		"storeName": "lifecycle-shutdown",
		"persistence": map[string]any{
			"enabled":        true,
			"flushThreshold": 10,
			"outputPath":     outputPath,
		},
	})
	if err != nil {
		t.Fatalf("expected test parameters to marshal: %v", err)
	}

	plugin, err := Factory("tracker", json.RawMessage(params), handle)
	if err != nil {
		t.Fatalf("expected factory to succeed, got %v", err)
	}
	tracker, ok := plugin.(*RequestLifecycleTracker)
	if !ok {
		t.Fatalf("expected lifecycle tracker plugin, got %T", plugin)
	}
	store := internalinflight.StoreForName("lifecycle-shutdown")
	t.Cleanup(func() { _ = store.Close() })
	decode := newTestEndpoint("decode-shutdown", "10.0.21.9")
	request := newTestRequest("req-shutdown", []uint32{1, 2, 3})
	now := time.Unix(300, 0)
	tracker.now = func() time.Time { return now }

	tracker.PreRequest(context.Background(), request, newSchedulingResult(decode))
	now = time.Unix(301, 0)
	tracker.ResponseBody(context.Background(), request, &requestcontrol.Response{EndOfStream: true}, decode.GetMetadata())

	// Trigger the shutdown path under test.
	cancel()
	waitForCondition(t, func() bool {
		content, readErr := os.ReadFile(outputPath)
		return readErr == nil && len(splitNonEmptyLines(string(content))) == 1
	}, "expected shutdown to flush completed records to disk")
	var reloaded *internalinflight.Store
	waitForCondition(t, func() bool {
		reloaded = internalinflight.StoreForName("lifecycle-shutdown")
		return reloaded != store
	}, "expected shutdown close to remove the shared store from registry")
	t.Cleanup(func() { _ = reloaded.Close() })
}

// TestParseParametersAcceptsFlushIntervalDuration checks that a duration
// string such as "1m" in persistence.flushInterval is parsed into the
// matching time.Duration.
func TestParseParametersAcceptsFlushIntervalDuration(t *testing.T) {
	persistence := map[string]any{
		"enabled":        true,
		"flushThreshold": 10,
		"flushInterval":  "1m",
		"outputPath":     "/tmp/lifecycle-flush-interval.jsonl",
	}
	raw, err := json.Marshal(map[string]any{
		"storeName":   "lifecycle-flush-interval",
		"persistence": persistence,
	})
	if err != nil {
		t.Fatalf("expected test parameters to marshal: %v", err)
	}

	params, err := parseParameters(json.RawMessage(raw))
	if err != nil {
		t.Fatalf("expected parameters to parse, got %v", err)
	}
	if got := params.Persistence.FlushInterval; got != time.Minute {
		t.Fatalf("expected flush interval 1m, got %v", got)
	}
}

// TestParseParametersRejectsInvalidFlushInterval checks that an unparsable
// persistence.flushInterval is rejected with an error that names that field.
func TestParseParametersRejectsInvalidFlushInterval(t *testing.T) {
	persistence := map[string]any{
		"enabled":       true,
		"flushInterval": "later",
		"outputPath":    "/tmp/lifecycle-flush-interval.jsonl",
	}
	raw, err := json.Marshal(map[string]any{"persistence": persistence})
	if err != nil {
		t.Fatalf("expected test parameters to marshal: %v", err)
	}

	_, parseErr := parseParameters(json.RawMessage(raw))
	switch {
	case parseErr == nil:
		t.Fatal("expected invalid flush interval to be rejected")
	case !strings.Contains(parseErr.Error(), "persistence.flushInterval"):
		t.Fatalf("expected flush interval validation error, got %v", parseErr)
	}
}

// TestPreRequestFailOpenOnNilOrIncompleteInputs calls PreRequest with a series
// of nil or incomplete request/result combinations. It checks that none of
// them starts a request, completes a request, or charges load.
func TestPreRequestFailOpenOnNilOrIncompleteInputs(t *testing.T) {
	st := internalinflight.StoreForName("lifecycle-prerequest-fail-open")
	t.Cleanup(func() { _ = st.Close() })
	lt := NewRequestLifecycleTracker("tracker", st)
	ctx := context.Background()

	lt.PreRequest(ctx, nil, nil)
	lt.PreRequest(ctx, &fwkscheduling.LLMRequest{}, nil)
	lt.PreRequest(ctx, newTestRequest("req-missing-result", []uint32{1}), nil)
	lt.PreRequest(ctx, newTestRequest("req-no-target", []uint32{1}), newSchedulingResult())
	lt.PreRequest(ctx, newTestRequest("req-nil-target", []uint32{1}), newSchedulingResult(nil))

	if n := len(st.DrainCompleted(10)); n != 0 {
		t.Fatalf("expected no completed records for fail-open prerequest paths, got %d", n)
	}
	for _, c := range []struct{ id, path string }{
		{"req-missing-result", "fail-open path"},
		{"req-no-target", "missing target path"},
		{"req-nil-target", "nil target path"},
	} {
		if a := st.ActiveRequest(c.id); a != nil {
			t.Fatalf("expected nil active request for %s, got %+v", c.path, a)
		}
	}
	probe := newTestEndpoint("unused", "10.0.99.1")
	if snap := st.Snapshot(probe.GetMetadata()); snap.Tokens != 0 || snap.Requests != 0 {
		t.Fatalf("expected fail-open prerequest paths to leave store empty, got %+v", snap)
	}
}

// TestResponseBodyFailOpenOnNilOrIncompleteInputs checks ResponseBody with
// three bad inputs: a nil request, a request without an ID, and a nil
// response. None of them may change endpoint load or emit a completion record.
func TestResponseBodyFailOpenOnNilOrIncompleteInputs(t *testing.T) {
	st := internalinflight.StoreForName("lifecycle-response-fail-open")
	t.Cleanup(func() { _ = st.Close() })
	lt := NewRequestLifecycleTracker("tracker", st)
	ep := newTestEndpoint("decode-fail-open", "10.0.21.10")
	ctx := context.Background()

	lt.ResponseBody(ctx, nil, &requestcontrol.Response{EndOfStream: true}, ep.GetMetadata())
	lt.ResponseBody(ctx, &fwkscheduling.LLMRequest{}, &requestcontrol.Response{EndOfStream: true}, ep.GetMetadata())
	lt.ResponseBody(ctx, newTestRequest("req-nil-response", []uint32{1}), nil, ep.GetMetadata())

	snap := st.Snapshot(ep.GetMetadata())
	if snap.Tokens != 0 || snap.Requests != 0 {
		t.Fatalf("expected fail-open response paths to leave endpoint load unchanged, got %+v", snap)
	}
	if done := st.DrainCompleted(10); len(done) != 0 {
		t.Fatalf("expected fail-open response paths to avoid completion records, got %d", len(done))
	}
}

// TestInflightProducerAndLifecycleShareLoadLifecycle checks that the load
// producer and the lifecycle tracker share one store. Load seen through the
// producer's endpoint attribute should:
//   - be empty before dispatch;
//   - appear after PreRequest;
//   - disappear after end-of-stream.
func TestInflightProducerAndLifecycleShareLoadLifecycle(t *testing.T) {
	st := internalinflight.StoreForName("lifecycle-integration")
	t.Cleanup(func() { _ = st.Close() })
	lt := NewRequestLifecycleTracker("tracker", st)
	producer := inflightproducer.NewLoadProducer("producer", st, internalinflight.DefaultAttributeKey)
	ep := newTestEndpoint("decode-integration", "10.0.21.11")
	req := newTestRequest("req-integration", []uint32{1, 2, 3, 4})
	clock := time.Unix(500, 0)
	lt.now = func() time.Time { return clock }
	ctx := context.Background()

	// refresh runs the producer over the endpoint so the endpoint attribute
	// can be checked next. stage labels the failure message.
	refresh := func(stage string) {
		if err := producer.PrepareRequestData(ctx, req, []fwkscheduling.Endpoint{ep}); err != nil {
			t.Fatalf("expected %s to succeed, got %v", stage, err)
		}
	}

	refresh("initial prepare")
	assertEndpointAttributeLoad(t, ep, 0, 0)

	lt.PreRequest(ctx, req, newSchedulingResult(ep))
	refresh("prepare after prerequest")
	assertEndpointAttributeLoad(t, ep, 4, 1)

	clock = time.Unix(501, 0)
	lt.ResponseBody(ctx, req, &requestcontrol.Response{EndOfStream: true}, ep.GetMetadata())
	refresh("prepare after completion")
	assertEndpointAttributeLoad(t, ep, 0, 0)
}

// TestPreRequestRecordsActiveModeWhenOnlyActiveKeyPresent checks the case
// where only the latency-prediction attribute is present. The active request
// must then record PredictionModeActive together with that prediction.
func TestPreRequestRecordsActiveModeWhenOnlyActiveKeyPresent(t *testing.T) {
	st := internalinflight.StoreForName("lifecycle-mode-active-only")
	t.Cleanup(func() { _ = st.Close() })
	lt := NewRequestLifecycleTracker("tracker", st)

	ep := newTestEndpoint("only-active", "10.0.40.1")
	ep.Put(latencyattr.LatencyPredictionInfoKey, latencyattr.NewLatencyPredictionInfo(true, true, 0, 0, 25, 3))
	req := newTestRequest("req-active-only", []uint32{1, 2, 3, 4})

	lt.PreRequest(context.Background(), req, newSchedulingResult(ep))

	got := st.ActiveRequest(req.RequestId)
	if got == nil {
		t.Fatal("expected active request metadata")
	}
	if mode := got.MetaData.PredictionMode; mode != internalinflight.PredictionModeActive {
		t.Fatalf("expected mode %q, got %q", internalinflight.PredictionModeActive, mode)
	}
	if pred := got.MetaData.Prediction; pred.TTFTMillis != 25 || pred.TPOTMillis != 3 {
		t.Fatalf("unexpected active prediction: %+v", pred)
	}
}

// TestPreRequestRecordsEmptyModeWhenNeitherKeyPresent verifies that when the
// selected endpoint carries no latency-prediction attributes, PreRequest still
// records the request but leaves the prediction mode empty and the stored
// prediction at its zero value.
func TestPreRequestRecordsEmptyModeWhenNeitherKeyPresent(t *testing.T) {
	ctx := context.Background()
	store := internalinflight.StoreForName("lifecycle-mode-off")
	t.Cleanup(func() { _ = store.Close() })
	tracker := NewRequestLifecycleTracker("tracker", store)

	// No prediction attributes are attached to this endpoint.
	bare := newTestEndpoint("no-prediction", "10.0.40.4")
	request := newTestRequest("req-off", []uint32{1, 2, 3, 4})
	tracker.PreRequest(ctx, request, newSchedulingResult(bare))

	active := store.ActiveRequest(request.RequestId)
	if active == nil {
		t.Fatal("expected active request metadata")
	}
	if mode := active.MetaData.PredictionMode; mode != "" {
		t.Fatalf("expected empty mode for data-collection-only path, got %q", mode)
	}
	var zeroPrediction internalinflight.LatencyPrediction
	if active.MetaData.Prediction != zeroPrediction {
		t.Fatalf("expected zero active prediction, got %+v", active.MetaData.Prediction)
	}
}

// newTestRequest builds an LLMRequest identified by requestID. It has a
// completions body whose raw prompt is "hello world" and a tokenized prompt
// holding tokenIDs. The tokenIDs slice is stored as-is, not copied.
func newTestRequest(requestID string, tokenIDs []uint32) *fwkscheduling.LLMRequest {
	body := &fwkscheduling.LLMRequestBody{
		Completions: &fwkscheduling.CompletionsRequest{
			Prompt: fwkscheduling.Prompt{Raw: "hello world"},
		},
	}
	req := &fwkscheduling.LLMRequest{RequestId: requestID, Body: body}
	req.TokenizedPrompt = &fwkscheduling.TokenizedPrompt{TokenIDs: tokenIDs}
	return req
}

// newSchedulingResult returns a SchedulingResult with a single profile, named
// testPrimaryProfileName, that is also marked as the primary profile. That
// profile's target endpoints are exactly the given targets (nil when none are
// passed).
func newSchedulingResult(targets ...fwkscheduling.Endpoint) *fwkscheduling.SchedulingResult {
	profiles := make(map[string]*fwkscheduling.ProfileRunResult, 1)
	profiles[testPrimaryProfileName] = &fwkscheduling.ProfileRunResult{TargetEndpoints: targets}
	return &fwkscheduling.SchedulingResult{
		PrimaryProfileName: testPrimaryProfileName,
		ProfileResults:     profiles,
	}
}

// newTestEndpoint creates an endpoint in the "default" namespace whose
// namespaced name and pod name are both name. It listens on address:8000 and
// starts with fresh, empty metrics and attributes.
func newTestEndpoint(name, address string) fwkscheduling.Endpoint {
	meta := &fwkdl.EndpointMetadata{
		NamespacedName: metav1types.NamespacedName{Namespace: "default", Name: name},
		PodName:        name,
		Address:        address,
		Port:           "8000",
	}
	return fwkscheduling.NewEndpoint(meta, fwkdl.NewMetrics(), fwkdl.NewAttributes())
}

// setSelectedPDRoute registers, under leader, a pdgroup.Info for group
// "test-group". The group has exactly one prefill endpoint and one decode
// endpoint, both zero-scored and both marked as the selected pods.
//
// The Selected*Pod fields point at the local ScoredEndpoint values. The
// PrefillPods/DecodePods slices hold separate copies, not the same storage.
func setSelectedPDRoute(leader, prefill, decode fwkscheduling.Endpoint) {
	chosenPrefill := fwkscheduling.ScoredEndpoint{Endpoint: prefill, Score: 0}
	chosenDecode := fwkscheduling.ScoredEndpoint{Endpoint: decode, Score: 0}

	info := &pdgroup.Info{GroupID: "test-group"}
	info.PrefillPods = []fwkscheduling.ScoredEndpoint{chosenPrefill}
	info.DecodePods = []fwkscheduling.ScoredEndpoint{chosenDecode}
	info.SelectedPrefillPod = &chosenPrefill
	info.SelectedDecodePod = &chosenDecode

	pdgroup.Set(leader, info)
}

// seedEndpointMetrics writes a fixed single-model snapshot into the metrics
// returned by endpoint.GetMetrics():
//   - one active "Qwen/Qwen3-8B" entry and waiting entries for that model;
//   - the given waiting/running queue sizes and KV-cache usage percentage;
//   - constant cache capacity and block values;
//   - updateTime as the update timestamp.
//
// Fields not listed here are left untouched.
func seedEndpointMetrics(endpoint fwkscheduling.Endpoint, waiting, running int, kvUsagePercent float64, updateTime time.Time) {
	const model = "Qwen/Qwen3-8B"
	m := endpoint.GetMetrics()

	// Per-model counters.
	m.ActiveModels = map[string]int{model: 1}
	m.WaitingModels = map[string]int{model: waiting}
	m.MaxActiveModels = 2

	// Queue depths and KV-cache usage supplied by the caller.
	m.WaitingQueueSize = waiting
	m.RunningRequestsSize = running
	m.KVCacheUsagePercent = kvUsagePercent

	// Constant cache sizing values.
	m.KvCacheMaxTokenCapacity = 131072
	m.CacheBlockSize = 16
	m.CacheNumBlocks = 8192

	m.UpdateTime = updateTime
}

// seedEndpointNPUState stores a present (npuattr.DataPresent) NPU state
// snapshot on endpoint under npuattr.DefaultStateAttributeKey. The snapshot is
// observed at observedAt and contains exactly one device, identified by
// target/deviceID, plus fixed average/maximum utilization values.
func seedEndpointNPUState(endpoint fwkscheduling.Endpoint, observedAt time.Time, target, deviceID string) {
	device := npuattr.DeviceState{
		Target:                  target,
		DeviceID:                deviceID,
		AICoreUtilization:       60,
		HBMUtilization:          70,
		HBMBandwidthUtilization: 45,
	}
	state := &npuattr.State{
		ObservationTime: observedAt,
		DataStatus:      npuattr.DataPresent,
		DeviceCount:     1,
		Devices:         []npuattr.DeviceState{device},

		AICoreUtilizationAvg:       40,
		AICoreUtilizationMax:       60,
		HBMUtilizationAvg:          50,
		HBMUtilizationMax:          70,
		HBMBandwidthUtilizationAvg: 30,
		HBMBandwidthUtilizationMax: 45,
	}
	endpoint.Put(npuattr.DefaultStateAttributeKey, state)
}

// assertLoad fails the test immediately unless load has exactly wantTokens
// tokens and wantRequests requests.
func assertLoad(t *testing.T, load internalinflight.Load, wantTokens, wantRequests int) {
	t.Helper()
	tokensMatch := load.Tokens == wantTokens
	requestsMatch := load.Requests == wantRequests
	if tokensMatch && requestsMatch {
		return
	}
	t.Fatalf("unexpected inflight load: got %+v, want tokens=%d requests=%d", load, wantTokens, wantRequests)
}

// assertEndpointAttributeLoad reads the in-flight load attribute
// (internalinflight.DefaultAttributeKey) from endpoint and checks it with
// assertLoad. The test fails immediately in three cases:
//   - the attribute is missing;
//   - it is not a *internalinflight.Load;
//   - it is a nil pointer.
func assertEndpointAttributeLoad(t *testing.T, endpoint fwkscheduling.Endpoint, wantTokens, wantRequests int) {
	t.Helper()
	key := internalinflight.DefaultAttributeKey
	raw, found := endpoint.Get(key)
	if !found {
		t.Fatalf("expected attribute %q to be present", key)
	}
	got, isLoad := raw.(*internalinflight.Load)
	if !isLoad || got == nil {
		t.Fatalf("expected *inflight.Load attribute, got %T", raw)
	}
	assertLoad(t, *got, wantTokens, wantRequests)
}

// waitForCondition polls condition every 10ms for up to two seconds. If
// condition never returns true, it fails the test with failureMessage.
//
// The condition is checked before every deadline test, including once more
// after the final sleep. Without that re-check, a condition that became true
// during the last poll interval would cause a spurious failure.
func waitForCondition(t *testing.T, condition func() bool, failureMessage string) {
	t.Helper()
	const (
		timeout      = 2 * time.Second
		pollInterval = 10 * time.Millisecond
	)
	deadline := time.Now().Add(timeout)
	for {
		if condition() {
			return
		}
		if !time.Now().Before(deadline) {
			break
		}
		time.Sleep(pollInterval)
	}
	t.Fatal(failureMessage)
}

// splitNonEmptyLines splits content on '\n' and returns the non-empty segments
// in their original order. Carriage returns are not stripped. The result is
// always non-nil: it is an empty slice when content has no non-empty lines.
func splitNonEmptyLines(content string) []string {
	result := make([]string, 0)
	for _, segment := range strings.Split(content, "\n") {
		if segment == "" {
			continue
		}
		result = append(result, segment)
	}
	return result
}