"""在验证集上评估“模型预测”与“物理先验基线”效果。"""
from __future__ import annotations
import argparse
import json
import os
import warnings
from typing import Dict, List
os.environ.setdefault("CUDA_VISIBLE_DEVICES", "")
import numpy as np
import torch
from torch.utils.data import DataLoader
from models.physics_residual_gru import PhysicsResidualGRU
from utils.dataset_builder import TrajectoryResidualDataset, build_samples
# Absolute directory that contains this script; other paths in this file
# (default checkpoint, dataset candidates, metrics output) are built from it.
PROJECT_DIR = os.path.dirname(os.path.abspath(__file__))
# Suppress UserWarnings whose message matches the CUDA-capability / "Found GPU0"
# patterns. NOTE(review): presumably these come from torch when it sees an
# unsupported GPU; evaluation runs on CPU anyway — confirm the warning source.
warnings.filterwarnings("ignore", message=".*CUDA capability.*", category=UserWarning)
warnings.filterwarnings("ignore", message=".*Found GPU0.*", category=UserWarning)
def find_dataset_dir() -> str:
    """Locate the BPT-V dataset directory relative to this script.

    Two candidate locations are probed in order, both anchored at
    ``PROJECT_DIR``: ``../BPT-V/dataset`` and ``../../BPT-V/dataset``.
    A candidate is accepted when it is an existing directory that contains
    ``video_001/data.json``.

    Returns:
        The absolute path of the first accepted candidate.

    Raises:
        FileNotFoundError: If neither candidate qualifies; the message lists
            the paths that were tried.
    """
    candidates = [
        os.path.join(PROJECT_DIR, *ups, "BPT-V", "dataset")
        for ups in (("..",), ("..", ".."))
    ]
    for root in candidates:
        if not os.path.isdir(root):
            continue
        # The first video's metadata file serves as the "is this a dataset" marker.
        marker = os.path.join(root, "video_001", "data.json")
        if os.path.exists(marker):
            return os.path.abspath(root)
    raise FileNotFoundError(f"未找到数据集目录,已尝试路径:{candidates}")
def collect_video_ids(dataset_dir: str) -> List[int]:
    """Return the sorted numeric ids of all usable ``video_<digits>`` entries.

    An entry of ``dataset_dir`` is counted when its name is exactly the
    prefix ``video_`` followed by one or more ASCII digits and it contains a
    ``data.json`` file.

    Args:
        dataset_dir: Directory whose immediate entries are scanned.

    Returns:
        The integer ids (e.g. ``video_007`` -> ``7``) in ascending order.

    Raises:
        OSError: Propagated from ``os.listdir`` when ``dataset_dir`` cannot
            be listed.
    """
    prefix = "video_"
    ids = []
    for name in os.listdir(dataset_dir):
        if not name.startswith(prefix):
            continue
        # Strip only the leading prefix. ``str.replace`` would remove every
        # occurrence, so "video_video_002" used to be accepted as id 2.
        tail = name[len(prefix):]
        # ``isdigit`` alone accepts characters such as "²" that ``int`` rejects;
        # requiring ASCII keeps the ``int(tail)`` call below from raising.
        if not (tail.isascii() and tail.isdigit()):
            continue
        if os.path.exists(os.path.join(dataset_dir, name, "data.json")):
            ids.append(int(tail))
    return sorted(ids)
def resolve_ckpt_path(path_str: str) -> str:
    """Turn a checkpoint path into the path that will actually be opened.

    Args:
        path_str: Checkpoint path, absolute or relative.

    Returns:
        ``path_str`` unchanged when it is absolute; otherwise the normalized
        join of ``PROJECT_DIR`` and ``path_str``.
    """
    if not os.path.isabs(path_str):
        # Relative paths are anchored at the script directory, not the CWD.
        return os.path.normpath(os.path.join(PROJECT_DIR, path_str))
    return path_str
def split_ids(all_ids: List[int], train_ratio: float = 0.8):
    """Split ids into a training part and a validation part.

    The ids are sorted first; the leading ``int(len * train_ratio)`` ids
    (at least one) form the training part and the remainder the validation
    part. When that remainder is empty, the last five training ids are
    reused as the validation part.

    Args:
        all_ids: Ids to split; the input list is not modified.
        train_ratio: Fraction of ids assigned to the training part.

    Returns:
        A ``(train_ids, val_ids)`` tuple of lists.
    """
    ordered = sorted(all_ids)
    cut = int(len(ordered) * train_ratio)
    if cut < 1:
        cut = 1
    head, tail = ordered[:cut], ordered[cut:]
    if tail:
        return head, tail
    # Nothing left for validation: fall back to the tail of the training ids.
    return head, head[-5:]
def choose_device() -> torch.device:
    """Return the device used for evaluation, which is always the CPU."""
    cpu = torch.device("cpu")
    return cpu
def mse_np(a: np.ndarray, b: np.ndarray) -> float:
    """Return the mean of the squared element-wise difference ``a - b``.

    Args:
        a: First array.
        b: Second array; combined with ``a`` by NumPy subtraction.

    Returns:
        The mean squared difference over all elements, as a Python float.
    """
    residual = a - b
    return float(np.mean(np.square(residual)))
def event_f1_np(logit: np.ndarray, label: np.ndarray) -> float:
    """Compute the F1 score of event predictions at a fixed 0.5 threshold.

    Logits are passed through a sigmoid; an element counts as a predicted
    event when its probability is >= 0.5 and as a true event when its label
    is >= 0.5.

    Args:
        logit: Raw event logits.
        label: Event labels, combined element-wise with ``logit``.

    Returns:
        The F1 score, or ``0.0`` when there is no true positive.
    """
    prob = 1.0 / (1.0 + np.exp(-logit))
    fired = prob >= 0.5
    truth = label >= 0.5

    tp = float(np.count_nonzero(np.logical_and(fired, truth)))
    if tp <= 1e-8:
        # Without any true positive both precision and recall are zero.
        return 0.0
    fp = float(np.count_nonzero(np.logical_and(fired, np.logical_not(truth))))
    fn = float(np.count_nonzero(np.logical_and(np.logical_not(fired), truth)))

    precision = tp / max(tp + fp, 1e-8)
    recall = tp / max(tp + fn, 1e-8)
    return float(2.0 * precision * recall / max(precision + recall, 1e-8))
def best_event_f1_np(logits: np.ndarray, labels: np.ndarray):
    """Search a fixed threshold grid for the best event F1 score.

    Both inputs are flattened. Logits go through a sigmoid and are
    thresholded at each of 19 evenly spaced values from 0.05 to 0.95;
    labels >= 0.5 count as true events.

    Args:
        logits: Raw event logits of any shape.
        labels: Event labels with the same number of elements.

    Returns:
        A ``(best_f1, best_threshold)`` tuple. The earliest threshold wins
        ties, and ``(0.0, 0.5)`` is returned when no threshold yields a
        positive F1.
    """
    probs = 1.0 / (1.0 + np.exp(-logits.reshape(-1)))
    truth = labels.reshape(-1) >= 0.5
    not_truth = np.logical_not(truth)

    best_f1, best_thr = 0.0, 0.5
    for thr in np.linspace(0.05, 0.95, 19):
        fired = probs >= thr
        tp = float(np.count_nonzero(np.logical_and(fired, truth)))
        if tp <= 1e-8:
            # F1 is 0.0 here, which can never beat the current best.
            continue
        fp = float(np.count_nonzero(np.logical_and(fired, not_truth)))
        fn = float(np.count_nonzero(np.logical_and(np.logical_not(fired), truth)))
        precision = tp / max(tp + fp, 1e-8)
        recall = tp / max(tp + fn, 1e-8)
        score = float(2.0 * precision * recall / max(precision + recall, 1e-8))
        if score > best_f1:
            best_f1, best_thr = score, float(thr)
    return best_f1, best_thr
def evaluate_model(model: PhysicsResidualGRU, loader: DataLoader, device: torch.device) -> Dict[str, float]:
    """Evaluate the model against the physics-prior baseline on one loader.

    The model is switched to eval mode and run under ``torch.no_grad``.
    Each batch from ``loader`` is unpacked as a 9-tuple, of which the
    features, the single prior, the target, the stacked priors, the prior
    confidence and the event labels are used. The model is called as
    ``model(feats, priors, prior_conf)`` and must return a mapping with the
    keys ``"pred_xy"``, ``"logvar"`` and ``"event_logit"``.
    NOTE(review): tensor shapes are not visible here; the code only assumes
    the first axis of every used tensor indexes the samples of the batch.

    Args:
        model: Network to evaluate.
        loader: Yields the validation batches.
        device: Device the input tensors are moved to before the forward pass.

    Returns:
        A dict with the mean model MSE, the mean physics-prior MSE, the
        relative improvement in percent, the mean per-sample event F1 at
        threshold 0.5, the best event F1 over the threshold grid together
        with that threshold, the mean Gaussian NLL, and the sample count.
        Means default to ``0.0`` when the loader yields nothing.
    """
    model.eval()

    model_mse = []
    physics_mse = []
    event_f1 = []
    unc_nll = []
    all_event_logits = []
    all_event_labels = []

    with torch.no_grad():
        for batch in loader:
            feats, prior, _, target, _, priors, prior_conf, event_labels, _ = batch
            feats = feats.to(device)
            prior = prior.to(device)
            priors = priors.to(device)
            prior_conf = prior_conf.to(device)
            target = target.to(device)

            outputs = model(feats, priors, prior_conf)
            pred_np = outputs["pred_xy"].cpu().numpy()
            logvar_np = outputs["logvar"].cpu().numpy()
            prior_np = prior.cpu().numpy()
            target_np = target.cpu().numpy()
            event_logit_np = outputs["event_logit"].cpu().numpy()
            # The labels were never moved to ``device``, so no ``.cpu()`` here.
            event_label_np = event_labels.numpy()

            all_event_logits.append(event_logit_np)
            all_event_labels.append(event_label_np)

            # Gaussian negative log-likelihood averaged over the whole batch.
            squared_err = (pred_np - target_np) ** 2
            nll_np = (0.5 * (np.exp(-logvar_np) * squared_err + logvar_np)).mean()

            batch_n = pred_np.shape[0]
            model_mse.extend(mse_np(pred_np[i], target_np[i]) for i in range(batch_n))
            physics_mse.extend(mse_np(prior_np[i], target_np[i]) for i in range(batch_n))
            event_f1.extend(
                event_f1_np(event_logit_np[i], event_label_np[i]) for i in range(batch_n)
            )
            # The batch-level NLL is recorded once per sample so the final
            # mean is weighted by batch size.
            unc_nll.extend([float(nll_np)] * batch_n)

    def _mean_or_zero(values):
        # Average of ``values``, or 0.0 for an empty list.
        return float(np.mean(values)) if values else 0.0

    model_mse_mean = _mean_or_zero(model_mse)
    physics_mse_mean = _mean_or_zero(physics_mse)
    # The denominator is clamped so a zero baseline cannot divide by zero.
    improve = (physics_mse_mean - model_mse_mean) / max(physics_mse_mean, 1e-12) * 100.0

    event_f1_calib, event_best_thr = 0.0, 0.5
    if all_event_logits:
        event_f1_calib, event_best_thr = best_event_f1_np(
            np.concatenate(all_event_logits),
            np.concatenate(all_event_labels),
        )

    return {
        "model_mse_mean": model_mse_mean,
        "physics_mse_mean": physics_mse_mean,
        "improvement_pct_vs_physics": float(improve),
        "event_f1_mean": _mean_or_zero(event_f1),
        "event_f1_calibrated": float(event_f1_calib),
        "event_best_threshold": float(event_best_thr),
        "uncertainty_nll_mean": _mean_or_zero(unc_nll),
        "samples": len(model_mse),
    }
def main():
    """Command-line entry point: evaluate a checkpoint on the validation split.

    Steps, in order: parse the CLI options, resolve and check the checkpoint
    path, resolve the dataset directory (explicit ``--dataset-dir`` or
    auto-discovery), rebuild the 80/20 id split and keep its validation part,
    build the validation loader, restore the model from the checkpoint,
    evaluate it on CPU, write the metrics to
    ``outputs/metrics/model_eval_<tag>.json`` under ``PROJECT_DIR`` and print
    a summary.

    Raises:
        FileNotFoundError: If the checkpoint does not exist, or the given
            dataset directory lacks ``video_001/data.json``.
    """
    parser = argparse.ArgumentParser(description="评估 物理先验 + 残差 模型。")
    default_ckpt = os.path.join(PROJECT_DIR, "checkpoints", "default", "best_model.pt")
    parser.add_argument("--ckpt", type=str, default=default_ckpt)
    parser.add_argument("--dataset-dir", type=str, default="")
    for flag, default in (("--batch-size", 16), ("--missing-start", 30), ("--missing-end", 50)):
        parser.add_argument(flag, type=int, default=default)
    parser.add_argument("--tag", type=str, default="default")
    args = parser.parse_args()

    ckpt_path = resolve_ckpt_path(args.ckpt)
    if not os.path.exists(ckpt_path):
        raise FileNotFoundError(f"未找到模型权重:{ckpt_path}")

    if not args.dataset_dir:
        dataset_dir = find_dataset_dir()
    else:
        dataset_dir = os.path.abspath(args.dataset_dir)
        marker = os.path.join(dataset_dir, "video_001", "data.json")
        if not os.path.exists(marker):
            raise FileNotFoundError(f"数据集目录无效:{dataset_dir}")

    # Only the validation part of the split is needed for evaluation.
    _, val_ids = split_ids(collect_video_ids(dataset_dir), 0.8)
    val_samples = build_samples(dataset_dir, val_ids, args.missing_start, args.missing_end)
    val_loader = DataLoader(
        TrajectoryResidualDataset(val_samples),
        batch_size=args.batch_size,
        shuffle=False,
    )

    # NOTE(review): depending on the torch version's ``weights_only`` default,
    # torch.load may unpickle arbitrary objects — only load trusted checkpoints.
    payload = torch.load(ckpt_path, map_location="cpu")
    cfg = payload.get("config", {})
    # Hyper-parameters come from the checkpoint, with fallbacks when absent.
    input_dim = int(payload.get("input_dim", cfg.get("input_dim", 17)))
    num_priors = int(payload.get("num_priors", 3))
    model = PhysicsResidualGRU(
        input_dim=input_dim,
        hidden_dim=int(cfg.get("hidden_dim", 64)),
        num_layers=int(cfg.get("num_layers", 2)),
        dropout=float(cfg.get("dropout", 0.1)),
        num_priors=num_priors,
    )
    model.load_state_dict(payload["model_state_dict"])

    device = choose_device()
    model = model.to(device)
    metrics = evaluate_model(model, val_loader, device)

    out_dir = os.path.join(PROJECT_DIR, "outputs", "metrics")
    os.makedirs(out_dir, exist_ok=True)
    out_path = os.path.join(out_dir, f"model_eval_{args.tag}.json")
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(metrics, f, ensure_ascii=False, indent=2)

    banner = "=" * 60
    summary = [
        ("样本数:", metrics["samples"]),
        ("物理先验 MSE:", f"{metrics['physics_mse_mean']:.6f}"),
        ("模型 MSE:", f"{metrics['model_mse_mean']:.6f}"),
        ("相对物理先验提升:", f"{metrics['improvement_pct_vs_physics']:.2f}%"),
        ("事件 F1(固定阈值0.5):", f"{metrics['event_f1_mean']:.4f}"),
        ("事件 F1(校准阈值):", f"{metrics['event_f1_calibrated']:.4f}"),
        ("事件最优阈值:", f"{metrics['event_best_threshold']:.2f}"),
        ("不确定性 NLL:", f"{metrics['uncertainty_nll_mean']:.6f}"),
        ("结果已保存:", out_path),
    ]
    print(banner)
    print("评估完成")
    for label, value in summary:
        print(label, value)
    print(banner)


# Run the evaluation only when executed as a script, not on import.
if __name__ == "__main__":
    main()