# Commit b970215e, created 11 hours ago (commit history).
#!/usr/bin/env python3
"""
Precision verification: CPU vs NPU comparison for PatchCore.

Computes:
  - rel_diff = |score_cpu - score_npu| / (|score_cpu| + 1e-8)
  - PASS if mean rel_diff < 1%
  - Reports for FP32 and FP16 modes
"""

import argparse
import json
import logging
import os
import sys
import time

import numpy as np
from sklearn.metrics import roc_auc_score

# Put this script's own directory at the front of the import path.
# NOTE(review): presumably so the local ``inference`` module imported by
# run_inference() resolves regardless of the working directory — confirm.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, _SCRIPT_DIR)

# INFO-level logging with timestamped, level-tagged lines.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
LOGGER = logging.getLogger(__name__)


def _synchronize(device_obj):
    """Wait for pending work on ``cuda``/``npu`` devices; no-op otherwise.

    Called right before and right after the timed section so the measured
    wall-clock time is delimited by explicit device synchronization.
    """
    import torch

    if device_obj.type == "cuda":
        torch.cuda.synchronize()
    elif device_obj.type == "npu":
        # NOTE(review): ``torch.npu`` presumably only exists once ``torch_npu``
        # has been imported by the caller (main() does so) — confirm.
        torch.npu.synchronize()


def run_inference(device: str, data_path: str, classname: str, fp16: bool = False):
    """Run PatchCore inference on specified device, return scores & timing.

    A PatchCore instance is built with a fixed configuration, fitted on the
    ``train`` split and evaluated on the ``test`` split. Only the call to
    ``evaluate`` is timed.

    Args:
        device: Device string handed to ``torch.device`` (e.g. ``"cpu"``,
            ``"npu:0"``).
        data_path: Dataset root forwarded to ``build_dataloader``.
        classname: Dataset class forwarded to ``build_dataloader``.
        fp16: When True and the device is not a CPU, the PatchCore instance
            and its backbone are converted with ``.half()``. On CPU the flag
            is ignored.

    Returns:
        dict with keys ``scores``, ``segmentations`` and ``labels_gt`` (NumPy
        arrays), ``auroc``, ``n_bank`` (number of detection features after
        fitting), ``infer_time_s``, ``device`` and ``fp16``. The last two echo
        the arguments as passed in; ``fp16`` does not say whether half
        precision was actually applied.
    """
    # The file never imports torch at module level; without this import the
    # first ``torch.device`` call below raises NameError.
    import torch

    from inference import build_patchcore, build_dataloader, evaluate

    device_obj = torch.device(device)
    LOGGER.info(f"Running on {device_obj} {'(FP16)' if fp16 else '(FP32)'}")

    patchcore_instance = build_patchcore(
        backbone_name="wideresnet50",
        device=device_obj,
        layers=["layer2", "layer3"],
        pretrain_embed_dim=1024,
        target_embed_dim=384,
        patchsize=3,
        patchstride=1,
        anomaly_scorer_num_nn=1,
        sampling_percentage=0.007,
    )

    if fp16 and device_obj.type != "cpu":
        patchcore_instance = patchcore_instance.half()
        aggregator = patchcore_instance.forward_modules["feature_aggregator"]
        aggregator.backbone = aggregator.backbone.half()

    train_loader = build_dataloader(data_path, classname, "train",
                                    batch_size=1, num_workers=0)
    patchcore_instance.fit(train_loader)
    n_bank = len(patchcore_instance.anomaly_scorer.detection_features)

    test_loader = build_dataloader(data_path, classname, "test",
                                   batch_size=1, num_workers=0)

    # Synchronize on both sides of the timed region.
    _synchronize(device_obj)
    t0 = time.perf_counter()
    scores, segmentations, labels_gt, masks_gt = evaluate(patchcore_instance, test_loader, device_obj)
    _synchronize(device_obj)
    t_infer = time.perf_counter() - t0

    scores = np.array(scores)
    segmentations = np.array(segmentations) if len(segmentations) > 0 else np.array([])
    labels_gt = np.array(labels_gt)

    # roc_auc_score needs both classes present; fall back to 0.5 when the
    # ground truth holds a single label value.
    auroc = roc_auc_score(labels_gt, scores) if len(np.unique(labels_gt)) > 1 else 0.5

    return {
        "scores": scores,
        "segmentations": segmentations,
        "labels_gt": labels_gt,
        "auroc": auroc,
        "n_bank": n_bank,
        "infer_time_s": t_infer,
        "device": device,
        "fp16": fp16,
    }


def compute_metrics(cpu_data, npu_data):
    """Compute relative difference and other metrics between two runs.

    The CPU run is the reference: ``rel_diff = |cpu - npu| / (|cpu| + 1e-8)``.

    Args:
        cpu_data: Result dict of the reference run. Reads ``scores``,
            ``auroc``, ``infer_time_s`` and ``n_bank``.
        npu_data: Result dict of the run under test. Reads ``scores``,
            ``auroc`` and ``infer_time_s``.

    Returns:
        dict of plain Python values: absolute/relative difference statistics,
        ``passed_rel_diff_1pct`` (True when the mean relative difference is
        below 1%), both AUROCs, both inference times, ``speedup`` (CPU time
        divided by NPU time), ``memory_bank`` and ``num_images``.

    Raises:
        ValueError: If the two score arrays differ in shape or are empty.
    """
    # Coerce to float64 arrays so list inputs work and the differences are
    # not computed in a reduced-precision dtype.
    scores_cpu = np.asarray(cpu_data["scores"], dtype=np.float64)
    scores_npu = np.asarray(npu_data["scores"], dtype=np.float64)

    # An explicit check (not ``assert``) survives ``python -O``; comparing
    # full shapes also rejects pairs such as (N,) vs (N, 1) that would
    # otherwise broadcast into an N x N difference matrix.
    if scores_cpu.shape != scores_npu.shape:
        raise ValueError(
            "Score count mismatch! "
            f"CPU scores have shape {scores_cpu.shape}, "
            f"NPU scores have shape {scores_npu.shape}"
        )
    if scores_cpu.size == 0:
        raise ValueError("Cannot compute precision metrics: no scores were provided")

    abs_diff = np.abs(scores_cpu - scores_npu)
    # The small epsilon keeps the division defined when a CPU score is zero.
    rel_diff = abs_diff / (np.abs(scores_cpu) + 1e-8)
    mean_rel_diff = float(np.mean(rel_diff))

    metrics = {
        "mean_abs_diff": float(np.mean(abs_diff)),
        "max_abs_diff": float(np.max(abs_diff)),
        "std_abs_diff": float(np.std(abs_diff)),
        "mean_rel_diff": mean_rel_diff,
        "max_rel_diff": float(np.max(rel_diff)),
        "passed_rel_diff_1pct": mean_rel_diff < 0.01,
        "cpu_auroc": float(cpu_data["auroc"]),
        "npu_auroc": float(npu_data["auroc"]),
        "cpu_infer_time_s": cpu_data["infer_time_s"],
        "npu_infer_time_s": npu_data["infer_time_s"],
        # Clamp the denominator so a zero NPU time cannot divide by zero.
        "speedup": cpu_data["infer_time_s"] / max(npu_data["infer_time_s"], 1e-8),
        "memory_bank": cpu_data["n_bank"],
        "num_images": len(scores_cpu),
    }

    return metrics


class _NpEncoder(json.JSONEncoder):
    """JSON encoder that turns NumPy scalar values into native Python ones."""

    def default(self, obj):
        if isinstance(obj, (np.integer, np.floating, np.bool_)):
            return obj.item()
        return super().default(obj)


def _npu_is_available():
    """Return True when ``torch_npu`` imports and reports an available NPU."""
    try:
        # torch must be imported here: the file has no module-level import of
        # it, so ``torch.npu`` would otherwise raise an uncaught NameError.
        import torch
        import torch_npu  # noqa: F401
        return bool(torch.npu.is_available())
    except (ImportError, RuntimeError) as exc:
        # Still best-effort, but record why the NPU path is being skipped.
        LOGGER.info("NPU probe failed: %s", exc)
        return False


def _summarize_run(run):
    """Select the fields of a run_inference() result stored in the report."""
    return {
        "auroc": run["auroc"],
        "infer_time_s": run["infer_time_s"],
        "n_bank": run["n_bank"],
    }


def _log_precision_report(fp16, metrics):
    """Log the CPU-vs-NPU comparison table for one precision mode."""
    LOGGER.info("=" * 60)
    LOGGER.info(f"CPU vs NPU{' FP16' if fp16 else ' FP32'} Precision Report")
    LOGGER.info("=" * 60)
    LOGGER.info(f"  Mean abs diff:     {metrics['mean_abs_diff']:.6e}")
    LOGGER.info(f"  Max abs diff:      {metrics['max_abs_diff']:.6e}")
    LOGGER.info(f"  Mean rel diff:     {metrics['mean_rel_diff']:.6%}")
    LOGGER.info(f"  Max rel diff:      {metrics['max_rel_diff']:.6%}")
    LOGGER.info(f"  Rel diff < 1%:     {'PASS' if metrics['passed_rel_diff_1pct'] else 'FAIL'}")
    LOGGER.info(f"  CPU AUROC:         {metrics['cpu_auroc']:.4f}")
    LOGGER.info(f"  NPU AUROC:         {metrics['npu_auroc']:.4f}")
    LOGGER.info(f"  CPU time:          {metrics['cpu_infer_time_s']:.3f}s")
    LOGGER.info(f"  NPU time:          {metrics['npu_infer_time_s']:.3f}s")
    LOGGER.info(f"  Speedup:           {metrics['speedup']:.2f}x")
    LOGGER.info(f"  Memory bank:       {metrics['memory_bank']}")
    LOGGER.info(f"  Test images:       {metrics['num_images']}")
    LOGGER.info("=" * 60)


def main():
    """Run the CPU baseline, compare against the NPU if present, save a report.

    Parses ``--data_path`` (required) and ``--classname`` (default
    ``bottle``). The CPU FP32 run is always executed. When an NPU is
    available, FP32 and FP16 runs on ``npu:0`` are each compared against the
    CPU baseline and logged. The collected results are written as JSON to
    ``precision_report.json`` in the parent of this script's directory.
    """
    parser = argparse.ArgumentParser(description="PatchCore Precision Verification")
    parser.add_argument("--data_path", type=str, required=True, help="MVTec AD dataset path")
    parser.add_argument("--classname", type=str, default="bottle")
    args = parser.parse_args()

    # CPU FP32 baseline: the reference every NPU run is compared against.
    cpu_fp32 = run_inference("cpu", args.data_path, args.classname, fp16=False)
    results = {"cpu_fp32": _summarize_run(cpu_fp32)}

    if _npu_is_available():
        for fp16 in (False, True):
            tag = "npu_fp16" if fp16 else "npu_fp32"
            npu_data = run_inference("npu:0", args.data_path, args.classname, fp16=fp16)
            results[tag] = _summarize_run(npu_data)

            metrics = compute_metrics(cpu_fp32, npu_data)
            _log_precision_report(fp16, metrics)
            results[tag + "_metrics"] = metrics
    else:
        LOGGER.warning("NPU not available — running CPU-only baseline.")
        LOGGER.info(f"CPU FP32 AUROC: {cpu_fp32['auroc']:.4f}")

    # Save results; NumPy scalars are converted by _NpEncoder.
    report_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "precision_report.json")
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, cls=_NpEncoder)
    LOGGER.info(f"Precision report saved to {report_path}")


# Run the verification only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()