"""Simple script to run the CGF1 evaluator given a prediction file and GT file(s)."""
import argparse
import contextlib
import copy
import json
import os
import time
from collections import defaultdict
from dataclasses import dataclass
from typing import List, Union
import numpy as np
import pycocotools.mask as maskUtils
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
from scipy.optimize import linear_sum_assignment
from tqdm import tqdm
@dataclass
class Metric:
    """Describe one row of the cgF1 summary report."""

    # Key under which the value is stored in the evaluator's results dict.
    name: str
    # True for scalar image-level metrics (the IL_* family); False for
    # metrics that hold one value per IoU threshold.
    image_level: bool
    # IoU threshold at which to report the metric; None means "average over
    # all thresholds". Always None for image-level metrics.
    iou_threshold: Union[float, None]
# Metrics that hold one value per IoU threshold, in reporting order.
_INSTANCE_LEVEL_METRIC_NAMES = (
    "cgF1",
    "precision",
    "recall",
    "F1",
    "positive_macro_F1",
    "positive_micro_F1",
    "positive_micro_precision",
)

# Scalar metrics computed from image-level (presence / absence) decisions.
_IMAGE_LEVEL_METRIC_NAMES = (
    "IL_precision",
    "IL_recall",
    "IL_F1",
    "IL_FPR",
    "IL_MCC",
)

# Ordered list of everything reported by the summary. The order matters: the
# i-th entry names the i-th value of the evaluator's ``stats`` array.
# Layout: per-threshold metrics averaged over all IoU thresholds, then the
# image-level metrics, then the per-threshold metrics at IoU 0.5 and 0.75.
CGF1_METRICS = (
    [
        Metric(name=metric_name, image_level=False, iou_threshold=None)
        for metric_name in _INSTANCE_LEVEL_METRIC_NAMES
    ]
    + [
        Metric(name=metric_name, image_level=True, iou_threshold=None)
        for metric_name in _IMAGE_LEVEL_METRIC_NAMES
    ]
    + [
        Metric(name=metric_name, image_level=False, iou_threshold=iou_threshold)
        for iou_threshold in (0.5, 0.75)
        for metric_name in _INSTANCE_LEVEL_METRIC_NAMES
    ]
)
class COCOCustom(COCO):
    """COCO class from pycocotools with tiny modifications for speed.

    ``createIndex`` reuses an already populated ``self.imgs`` instead of
    rebuilding it, and ``loadRes`` caches the set of known image ids on the
    instance and hands ``self.imgs`` to the result object so that its image
    index does not have to be rebuilt either.
    """

    def createIndex(self):
        """Build the lookup tables (anns, imgs, cats, imgToAnns, catToImgs).

        The tables are derived from ``self.dataset`` and only assigned to the
        instance once all of them have been built.
        """
        print("creating index...")
        dataset = self.dataset
        ann_by_id = {}
        anns_by_image = defaultdict(list)
        images_by_category = defaultdict(list)
        image_by_id = {}
        category_by_id = {}

        if "annotations" in dataset:
            for record in dataset["annotations"]:
                anns_by_image[record["image_id"]].append(record)
                ann_by_id[record["id"]] = record

        if "images" in dataset:
            if self.imgs:
                # Speed-up: keep the existing image index when there is one.
                image_by_id = self.imgs
            else:
                image_by_id = {image["id"]: image for image in dataset["images"]}

        if "categories" in dataset:
            category_by_id = {
                category["id"]: category for category in dataset["categories"]
            }

        if "annotations" in dataset and "categories" in dataset:
            for record in dataset["annotations"]:
                images_by_category[record["category_id"]].append(record["image_id"])

        print("index created!")

        self.anns = ann_by_id
        self.imgToAnns = anns_by_image
        self.catToImgs = images_by_category
        self.imgs = image_by_id
        self.cats = category_by_id

    def loadRes(self, resFile):
        """
        Load result file and return a result api object.

        The result records are completed in place (``id``, ``area``, ``bbox``,
        ``segmentation``, ``iscrowd`` depending on their kind); the kind is
        decided by looking at the first record only.

        :param resFile: file name of a json result file (str), a numpy array
            of results, or an already loaded list of result dicts
        :return: res (obj) : result api object
        :raises AssertionError: if the results are not a list, or reference
            image ids that are unknown to this object
        """
        res = COCOCustom()
        res.dataset["info"] = copy.deepcopy(self.dataset.get("info", {}))
        res.dataset["images"] = self.dataset["images"]

        print("Loading and preparing results...")
        started_at = time.time()
        if isinstance(resFile, str):
            with open(resFile) as handle:
                anns = json.load(handle)
        elif isinstance(resFile, np.ndarray):
            anns = self.loadNumpyAnnotations(resFile)
        else:
            anns = resFile
        if not isinstance(anns, list):
            raise AssertionError("results is not an array of objects")

        result_img_ids = {ann["image_id"] for ann in anns}
        if not hasattr(self, "img_id_set"):
            # Cached on the instance so repeated calls do not rebuild the set.
            self.img_id_set = set(self.getImgIds())
        if not result_img_ids.issubset(self.img_id_set):
            raise AssertionError("Results do not correspond to current coco set")

        first = anns[0]
        if "caption" in first:
            # Captions: keep only the images that actually received a result.
            captioned_ids = {img["id"] for img in res.dataset["images"]} & result_img_ids
            res.dataset["images"] = [
                img for img in res.dataset["images"] if img["id"] in captioned_ids
            ]
            for new_id, ann in enumerate(anns, start=1):
                ann["id"] = new_id
        elif "bbox" in first and not first["bbox"] == []:
            res.dataset["categories"] = copy.deepcopy(self.dataset["categories"])
            for new_id, ann in enumerate(anns, start=1):
                box = ann["bbox"]  # [x, y, width, height]
                left, top = box[0], box[1]
                right, bottom = box[0] + box[2], box[1] + box[3]
                if "segmentation" not in ann:
                    # Use the box outline as a polygon when no mask is given.
                    ann["segmentation"] = [
                        [left, top, left, bottom, right, bottom, right, top]
                    ]
                ann["area"] = box[2] * box[3]
                ann["id"] = new_id
                ann["iscrowd"] = 0
        elif "segmentation" in first:
            res.dataset["categories"] = copy.deepcopy(self.dataset["categories"])
            for new_id, ann in enumerate(anns, start=1):
                ann["area"] = maskUtils.area(ann["segmentation"])
                if "bbox" not in ann:
                    ann["bbox"] = maskUtils.toBbox(ann["segmentation"])
                ann["id"] = new_id
                ann["iscrowd"] = 0
        elif "keypoints" in first:
            res.dataset["categories"] = copy.deepcopy(self.dataset["categories"])
            for new_id, ann in enumerate(anns, start=1):
                keypoints = ann["keypoints"]
                # Keypoints are stored as flat triplets; every third value
                # starting at index 0 / 1 is an x / y coordinate.
                xs = keypoints[0::3]
                ys = keypoints[1::3]
                x_min, x_max = np.min(xs), np.max(xs)
                y_min, y_max = np.min(ys), np.max(ys)
                ann["area"] = (x_max - x_min) * (y_max - y_min)
                ann["id"] = new_id
                ann["bbox"] = [x_min, y_min, x_max - x_min, y_max - y_min]
        print("DONE (t={:0.2f}s)".format(time.time() - started_at))

        res.dataset["annotations"] = anns
        # Share the image index so that createIndex() does not rebuild it.
        res.imgs = self.imgs
        res.createIndex()
        return res
class CGF1Eval(COCOeval):
    """
    This evaluator is based upon COCO evaluation, but evaluates the model in a more realistic
    setting for downstream applications.

    Predictions whose score reaches ``threshold`` are matched one-to-one with the non-ignored
    ground truths (assignment maximizing the total IoU); TP / FP / FN are then counted at every
    IoU threshold, together with image-level (presence / absence) decisions.
    See SAM3 paper for the details on the CGF1 metric.

    Do not use this evaluator directly. Prefer the CGF1Evaluator wrapper.

    Notes:
        - This evaluator does not support per-category evaluation (in the way defined by
          pyCocotools)
        - In open vocabulary settings, we have different noun-phrases for each image. What we
          call an "image_id" here is actually an (image, noun-phrase) pair. So in every
          "image_id" there is only one category, implied by the noun-phrase. Thus we can ignore
          the usual coco "category" field of the predictions
    """

    def __init__(
        self,
        coco_gt=None,
        coco_dt=None,
        iouType="segm",
        threshold=0.5,
    ):
        """
        Args:
            coco_gt (COCO): ground truth COCO API
            coco_dt (COCO): detections COCO API
            iouType (str): type of IoU to evaluate
            threshold (float): predictions with a score below this value are discarded
        """
        super().__init__(coco_gt, coco_dt, iouType)
        self.threshold = threshold
        # Single category, single area range, and effectively no cap on detections.
        self.params.useCats = False
        self.params.areaRng = [[0**2, 1e5**2]]
        self.params.areaRngLbl = ["all"]
        self.params.maxDets = [1000000]

    def computeIoU(self, imgId, catId):
        """
        Compute the IoU matrix between all detections and all ground truths of an image.

        :param imgId: id of the image to process
        :param catId: category id; only used when ``params.useCats`` is set
        :return: ``[]`` when the image has neither ground truth nor detection, otherwise
            the result of ``maskUtils.iou(detections, ground_truths, iscrowd)``
        """
        params = self.params
        if params.useCats:
            gts = self._gts[imgId, catId]
            dts = self._dts[imgId, catId]
        else:
            # Categories are ignored: pool the annotations of every category.
            gts = [ann for cat in params.catIds for ann in self._gts[imgId, cat]]
            dts = [ann for cat in params.catIds for ann in self._dts[imgId, cat]]
        if not gts and not dts:
            return []

        if params.iouType == "segm":
            geometry_key = "segmentation"
        elif params.iouType == "bbox":
            geometry_key = "bbox"
        else:
            raise Exception("unknown iouType for iou computation")
        gt_geometries = [ann[geometry_key] for ann in gts]
        dt_geometries = [ann[geometry_key] for ann in dts]

        crowd_flags = [int(ann["iscrowd"]) for ann in gts]
        return maskUtils.iou(dt_geometries, gt_geometries, crowd_flags)

    def evaluateImg(self, imgId, catId, aRng, maxDet):
        """
        perform evaluation for single category and image

        ``aRng`` and ``maxDet`` are accepted for signature compatibility and are not used.

        :return: dict (single image results). Always holds ``image_id``, ``num_dt`` and the
            image-level flags ``IL_TP`` / ``IL_TN`` / ``IL_FP`` / ``IL_FN``. Unless the image
            has neither ground truth nor kept detection, it also holds per-IoU-threshold
            ``TPs``, ``FPs``, ``FNs`` and ``local_F1s``, plus ``local_positive_F1s`` when the
            image has at least one ground truth.
        """
        params = self.params
        if params.useCats:
            raise AssertionError("This evaluator does not support per-category evaluation.")
        if catId != -1:
            raise AssertionError(f"Expected catId == -1, got {catId}")

        # Drop ignored ground truths; the mask is reused to slice the IoU matrix.
        all_gts = [ann for cat in params.catIds for ann in self._gts[imgId, cat]]
        gt_mask = np.array([not ann["ignore"] for ann in all_gts], dtype=bool)
        gts = [ann for ann, keep in zip(all_gts, gt_mask) if keep]

        # Drop detections scored below the confidence threshold.
        all_dts = [ann for cat in params.catIds for ann in self._dts[imgId, cat]]
        dt_mask = np.array(
            [ann["score"] >= self.threshold for ann in all_dts], dtype=bool
        )
        dts = [ann for ann, keep in zip(all_dts, dt_mask) if keep]

        n_gt, n_dt = len(gts), len(dts)

        if n_gt == 0 and n_dt == 0:
            # Nothing to find and nothing predicted: image-level true negative.
            return {
                "image_id": imgId,
                "IL_TP": 0,
                "IL_TN": 1,
                "IL_FP": 0,
                "IL_FN": 0,
                "num_dt": n_dt,
            }

        n_thr = len(params.iouThrs)
        if n_dt == 0:
            # Objects present but nothing predicted: every ground truth is missed.
            return {
                "image_id": imgId,
                "IL_TP": 0,
                "IL_TN": 0,
                "IL_FP": 0,
                "IL_FN": 1,
                "TPs": np.zeros((n_thr,), dtype=np.int64),
                "FPs": np.zeros((n_thr,), dtype=np.int64),
                "FNs": np.full((n_thr,), n_gt, dtype=np.int64),
                "local_F1s": np.zeros((n_thr,), dtype=np.int64),
                "local_positive_F1s": np.zeros((n_thr,), dtype=np.int64),
                "num_dt": n_dt,
            }

        ious = self.ious[(imgId, catId)]
        if len(ious) == 0:
            ious = np.zeros((n_dt, n_gt))
        else:
            ious = ious[dt_mask, :][:, gt_mask]
        if ious.shape != (n_dt, n_gt):
            raise RuntimeError(
                f"IoU shape mismatch: expected ({n_dt}, {n_gt}), got {ious.shape}"
            )

        # One-to-one assignment maximizing the total IoU (the solver minimizes cost).
        dt_idx, gt_idx = linear_sum_assignment(-ious)
        match_scores = ious[dt_idx, gt_idx]

        # Count, for every IoU threshold at once, the matched pairs reaching it.
        iou_thrs = np.asarray(params.iouThrs)
        TPs = (
            (match_scores[np.newaxis, :] >= iou_thrs[:, np.newaxis])
            .sum(axis=1)
            .astype(np.int64)
        )
        FPs = (n_dt - TPs).astype(np.int64)
        FNs = (n_gt - TPs).astype(np.int64)

        # Sanity check: report the first threshold with an impossible count.
        broken = np.flatnonzero((FPs < 0) | (FNs < 0))
        if broken.size > 0:
            FP, FN, TP = FPs[broken[0]], FNs[broken[0]], TPs[broken[0]]
            raise RuntimeError(
                f"Negative FP/FN: FP={FP}, FN={FN}, TP={TP}, match_scores={match_scores}, len(dt)={len(dts)}, len(gt)={len(gts)}, ious={ious}"
            )

        # The small epsilons keep the ratios defined when a denominator is zero.
        precision = TPs / (TPs + FPs + 1e-4)
        if not np.all(precision <= 1):
            raise RuntimeError(f"Precision > 1 detected: {precision}")
        recall = TPs / (TPs + FNs + 1e-4)
        if not np.all(recall <= 1):
            raise RuntimeError(f"Recall > 1 detected: {recall}")
        F1 = 2 * precision * recall / (precision + recall + 1e-4)

        has_gt = n_gt > 0
        has_dt = n_dt > 0
        result = {
            "image_id": imgId,
            "TPs": TPs,
            "FPs": FPs,
            "FNs": FNs,
            "local_F1s": F1,
            "IL_TP": has_gt and has_dt,
            "IL_FP": (not has_gt) and has_dt,
            "IL_TN": (not has_gt) and (not has_dt),
            "IL_FN": has_gt and (not has_dt),
            "num_dt": n_dt,
        }
        if has_gt and has_dt:
            result["local_positive_F1s"] = F1
        return result

    def accumulate(self, p=None):
        """
        Accumulate per image evaluation results and store the result in self.eval

        :param p: input params for evaluation (defaults to ``self.params``)
        :return: None
        :raises RuntimeError: if some image of ``p.imgIds`` has no entry in ``self.evalImgs``,
            or if an aggregated precision / recall exceeds 1
        """
        if self.evalImgs is None or len(self.evalImgs) == 0:
            print("Please run evaluate() first")
        if p is None:
            p = self.params

        wanted_ids = set(p.imgIds)
        n_thr = len(p.iouThrs)
        TPs = np.zeros((n_thr,), dtype=np.int64)
        FPs = np.zeros((n_thr,), dtype=np.int64)
        # False positives restricted to images that contain ground truth.
        pmFPs = np.zeros((n_thr,), dtype=np.int64)
        FNs = np.zeros((n_thr,), dtype=np.int64)
        local_F1s = np.zeros((n_thr,), dtype=np.float64)
        il_totals = {"IL_TP": 0, "IL_FP": 0, "IL_TN": 0, "IL_FN": 0}
        valid_F1_count = 0
        seen_ids = set()

        for res in self.evalImgs:
            img_id = res["image_id"]
            if img_id not in wanted_ids:
                continue
            seen_ids.add(img_id)
            for key in il_totals:
                il_totals[key] += res[key]

            if "TPs" not in res:
                # Image-level true negatives carry no instance-level counts.
                continue

            TPs += res["TPs"]
            FPs += res["FPs"]
            FNs += res["FNs"]

            if "local_positive_F1s" in res:
                local_F1s += res["local_positive_F1s"]
                pmFPs += res["FPs"]
                if res["num_dt"] > 0:
                    valid_F1_count += 1

        missing = wanted_ids - seen_ids
        if missing:
            raise RuntimeError(
                f"{len(missing)} images not evaluated. "
                f"Here are the IDs of the first 3: {list(missing)[:3]}"
            )

        # Instance-level (micro) metrics, one value per IoU threshold.
        precision = TPs / (TPs + FPs + 1e-4)
        positive_micro_precision = TPs / (TPs + pmFPs + 1e-4)
        if not np.all(precision <= 1):
            raise RuntimeError(f"Precision > 1 detected: {precision}")
        recall = TPs / (TPs + FNs + 1e-4)
        if not np.all(recall <= 1):
            raise RuntimeError(f"Recall > 1 detected: {recall}")
        F1 = 2 * precision * recall / (precision + recall + 1e-4)
        positive_micro_F1 = (
            2
            * positive_micro_precision
            * recall
            / (positive_micro_precision + recall + 1e-4)
        )

        # Image-level metrics (scalars).
        il_tp = il_totals["IL_TP"]
        il_fp = il_totals["IL_FP"]
        il_tn = il_totals["IL_TN"]
        il_fn = il_totals["IL_FN"]
        IL_rec = il_tp / (il_tp + il_fn + 1e-6)
        IL_prec = il_tp / (il_tp + il_fp + 1e-6)
        IL_F1 = 2 * IL_prec * IL_rec / (IL_prec + IL_rec + 1e-6)
        IL_FPR = il_fp / (il_fp + il_tn + 1e-6)
        mcc_denominator = (
            float(il_tp + il_fp)
            * float(il_tp + il_fn)
            * float(il_tn + il_fp)
            * float(il_tn + il_fn)
        ) ** 0.5 + 1e-6
        IL_MCC = float(il_tp * il_tn - il_fp * il_fn) / mcc_denominator

        self.eval = {
            "params": p,
            "TPs": TPs,
            "FPs": FPs,
            "positive_micro_FPs": pmFPs,
            "FNs": FNs,
            "precision": precision,
            "positive_micro_precision": positive_micro_precision,
            "recall": recall,
            "F1": F1,
            "positive_micro_F1": positive_micro_F1,
            "positive_macro_F1": local_F1s / valid_F1_count,
            "IL_recall": IL_rec,
            "IL_precision": IL_prec,
            "IL_F1": IL_F1,
            "IL_FPR": IL_FPR,
            "IL_MCC": IL_MCC,
        }
        # cgF1 combines localization quality on positive images with the
        # image-level classification quality.
        self.eval["cgF1"] = self.eval["positive_micro_F1"] * self.eval["IL_MCC"]

    def summarize(self):
        """
        Compute and display summary metrics for evaluation results.

        Prints one line per entry of CGF1_METRICS and stores the values, in the same order,
        in ``self.stats``.

        :raises Exception: if accumulate() has not been run yet
        """
        if not self.eval:
            raise Exception("Please run accumulate() first")

        def _report_per_iou(iouThr=None, metric=""):
            # Value at one IoU threshold, or the mean over all of them when iouThr is None.
            params = self.params
            line_format = " {:<18} @[ IoU={:<9}] = {:0.3f}"
            title = "Average " + metric
            if iouThr is None:
                iou_label = "{:0.2f}:{:0.2f}".format(
                    params.iouThrs[0], params.iouThrs[-1]
                )
            else:
                iou_label = "{:0.2f}".format(iouThr)

            values = self.eval[metric]
            if iouThr is not None:
                values = values[np.where(iouThr == params.iouThrs)[0]]
            # Values that are not greater than -1 are treated as undefined.
            defined = values[values > -1]
            mean_value = np.mean(defined) if len(defined) != 0 else -1
            print(line_format.format(title, iou_label, mean_value))
            return mean_value

        def _report_scalar(metric=""):
            # Image-level metrics are plain scalars: print and return as is.
            title = "Average " + metric
            line_format = " {:<35} = {:0.3f}"
            value = self.eval[metric]
            print(line_format.format(title, value))
            return value

        collected = [
            _report_scalar(metric=entry.name)
            if entry.image_level
            else _report_per_iou(iouThr=entry.iou_threshold, metric=entry.name)
            for entry in CGF1_METRICS
        ]
        self.stats = np.asarray(collected)
def _evaluate(self):
    """
    Run per image evaluation on the images listed in ``self.params.imgIds``.

    ``self`` is an evaluator object (this is a module-level function, not a method). Its
    params are normalized in place, ``self._prepare()`` is called and ``self.ious`` is
    filled; the per-image results are returned rather than stored on the evaluator.

    :return: tuple ``(imgIds, evalImgs)`` where ``evalImgs`` is an object array of result
        dicts with shape ``[1, len(areaRng), len(imgIds)]``
    :raises RuntimeError: if ``params.iouType`` is neither "segm" nor "bbox"
    """
    params = self.params
    params.imgIds = list(np.unique(params.imgIds))
    params.useCats = False
    params.maxDets = sorted(params.maxDets)
    self.params = params

    self._prepare()

    # Categories are not used: a single pseudo category id stands for all of them.
    catIds = [-1]

    if params.iouType not in ("segm", "bbox"):
        raise RuntimeError(f"Unsupported iou {params.iouType}")
    compute_iou = self.computeIoU

    iou_table = {}
    for img_id in params.imgIds:
        for cat_id in catIds:
            iou_table[(img_id, cat_id)] = compute_iou(img_id, cat_id)
    self.ious = iou_table

    largest_max_det = params.maxDets[-1]
    per_image = [
        self.evaluateImg(img_id, cat_id, area_range, largest_max_det)
        for cat_id in catIds
        for area_range in params.areaRng
        for img_id in params.imgIds
    ]
    # Same layout as the original COCO API: [numCats, numAreas, numImgs].
    evalImgs = np.asarray(per_image).reshape(
        len(catIds), len(params.areaRng), len(params.imgIds)
    )
    return params.imgIds, evalImgs
class CGF1Evaluator:
    """
    Wrapper class for cgF1 evaluation.
    This supports the oracle setting (when several ground-truths are available per image)
    """

    def __init__(
        self,
        gt_path: Union[str, List[str]],
        iou_type="segm",
        verbose=False,
    ):
        """
        Args:
            gt_path (str or list of str): path(s) to ground truth COCO json file(s)
            iou_type (str): type of IoU to evaluate
            verbose (bool): print progress information while evaluating
        """
        self.gt_paths = gt_path if isinstance(gt_path, list) else [gt_path]
        self.iou_type = iou_type
        self.coco_gts = [COCOCustom(gt) for gt in self.gt_paths]
        self.verbose = verbose

        # One evaluator per ground truth file.
        self.coco_evals = []
        for coco_gt in self.coco_gts:
            coco_eval = CGF1Eval(coco_gt=coco_gt, iouType=iou_type)
            # The flag lives on ``params``; setting it on the evaluator itself
            # (as was done before) had no effect.
            coco_eval.params.useCats = False
            self.coco_evals.append(coco_eval)

        # An image is evaluated only if it is flagged as exhaustively
        # annotated in every ground truth file.
        exclude_img_ids = {
            img["id"]
            for coco_gt in self.coco_gts[1:]
            for img in coco_gt.dataset["images"]
            if not img["is_instance_exhaustive"]
        }
        self.eval_img_ids = [
            img["id"]
            for img in self.coco_gts[0].dataset["images"]
            if (img["is_instance_exhaustive"] and img["id"] not in exclude_img_ids)
        ]

    def evaluate(self, pred_file: str):
        """
        Evaluate the detections using cgF1 metric.

        Every image is scored against each ground truth separately and the
        most favourable scoring is kept (see ``_select_best_scoring``); the
        kept results are then accumulated and summarized by the first
        evaluator.

        Args:
            pred_file: path to the predictions COCO json file

        Returns:
            dict mapping ``cgF1_eval_{iou_type}_{metric}`` (suffixed with
            ``@{iou_threshold}`` for metrics reported at a fixed IoU) to the
            metric value as a float.

        Raises:
            ValueError: if no ground truth was provided, or if no image is
                eligible for evaluation.
            RuntimeError: if the numbers of ground truths and evaluators differ.
        """
        if len(self.coco_gts) == 0:
            raise ValueError("No ground truth provided for evaluation.")
        if len(self.coco_gts) != len(self.coco_evals):
            raise RuntimeError("Mismatch in number of ground truths and evaluators.")

        if self.verbose:
            print(f"Loading predictions from {pred_file}")
        with open(pred_file, "r") as f:
            preds = json.load(f)
        if self.verbose:
            print(f"Loaded {len(preds)} predictions")

        img2preds = defaultdict(list)
        for pred in preds:
            img2preds[pred["image_id"]].append(pred)

        all_eval_imgs = []
        # The sink used to silence the COCO helpers is opened once for the
        # whole run instead of once per (image, ground truth) pair.
        with open(os.devnull, "w") as devnull:
            for img_id in tqdm(self.eval_img_ids, disable=not self.verbose):
                results = img2preds[img_id]
                all_scorings = []
                for cur_coco_gt, coco_eval in zip(self.coco_gts, self.coco_evals):
                    with contextlib.redirect_stdout(devnull):
                        coco_dt = (
                            cur_coco_gt.loadRes(results) if results else COCOCustom()
                        )
                        coco_eval.cocoDt = coco_dt
                        coco_eval.params.imgIds = [img_id]
                        coco_eval.params.useCats = False
                        _, eval_imgs = _evaluate(coco_eval)
                    all_scorings.append(eval_imgs)
                all_eval_imgs.append(self._select_best_scoring(all_scorings))

        if not all_eval_imgs:
            # np.concatenate would fail with a much less helpful message.
            raise ValueError(
                "No image to evaluate: no image is flagged as exhaustively "
                "annotated in all the ground truth files."
            )

        # The first evaluator aggregates the selected per-image results.
        main_eval = self.coco_evals[0]
        main_eval.evalImgs = list(np.concatenate(all_eval_imgs, axis=2).flatten())
        main_eval.params.imgIds = self.eval_img_ids
        main_eval._paramsEval = copy.deepcopy(main_eval.params)

        if self.verbose:
            print("Accumulating results")
        main_eval.accumulate()
        print("cgF1 metric, IoU type={}".format(self.iou_type))
        main_eval.summarize()
        print()

        out = {}
        for metric, value in zip(CGF1_METRICS, main_eval.stats):
            name = metric.name
            if metric.iou_threshold is not None:
                name = f"{name}@{metric.iou_threshold}"
            out[f"cgF1_eval_{self.iou_type}_{name}"] = float(value)
        return out

    @staticmethod
    def _select_best_scoring(scorings):
        """
        Pick, image by image, the most favourable scoring among several ground truths.

        Args:
            scorings: list of object arrays of per-image result dicts, one per
                ground truth, each in [numCats, numAreas, numImgs] format.

        Returns:
            An array with the same shape as the inputs. When both candidates
            carry ``local_F1s`` the one with the higher mean wins (ties keep
            the earlier one); otherwise a candidate without ``local_F1s``
            replaces the current best.

        Raises:
            RuntimeError: if the inputs are not 3-dimensional, hold more than
                one category, or have mismatching shapes.
        """
        if len(scorings) == 1:
            return scorings[0]

        reference = scorings[0]
        if reference.ndim != 3:
            raise RuntimeError(
                f"Expecting results in [numCats, numAreas, numImgs] format, got {reference.shape}"
            )
        if reference.shape[0] != 1:
            raise RuntimeError(
                f"Expecting a single category, got {reference.shape[0]}"
            )
        for scoring in scorings:
            if scoring.shape != reference.shape:
                raise RuntimeError(
                    f"Shape mismatch: {scoring.shape}, {reference.shape}"
                )

        selected_imgs = []
        for img_idx in range(reference.shape[-1]):
            best = reference[:, :, img_idx]
            for scoring in scorings[1:]:
                current = scoring[:, :, img_idx]
                if "local_F1s" in best[0, 0] and "local_F1s" in current[0, 0]:
                    best_score = best[0, 0]["local_F1s"].mean()
                    current_score = current[0, 0]["local_F1s"].mean()
                    if current_score > best_score:
                        best = current
                elif "local_F1s" not in current[0, 0]:
                    # Results without "local_F1s" come from an image with no
                    # ground truth and no kept prediction under that annotation.
                    best = current
            selected_imgs.append(best)

        result = np.stack(selected_imgs, axis=-1)
        if result.shape != reference.shape:
            raise RuntimeError(
                f"Result shape {result.shape} does not match expected {reference.shape}"
            )
        return result
def main():
    """Command-line entry point: evaluate a prediction file against GT file(s).

    Parses ``--pred_file`` and ``--gt_files``, runs the cgF1 evaluation with
    the "segm" IoU type and prints the resulting metrics dict.

    Raises:
        ValueError: if no ground truth file is given.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--pred_file",
        type=str,
        required=True,
        help="Path to the prediction file in COCO format.",
    )
    parser.add_argument(
        "--gt_files",
        type=str,
        nargs="+",
        required=True,
        help="Paths to the ground truth files in COCO format.",
    )
    args = parser.parse_args()
    if len(args.gt_files) == 0:
        raise ValueError("At least one GT file must be provided.")

    # The previous check split the path on "_" and tested whether the last
    # piece started with "gold_", which can never be true, so the warning
    # below was unreachable. Test the file's base name instead.
    # NOTE(review): assumes gold GT files are named "gold_*" - confirm.
    is_gold = os.path.basename(args.gt_files[0]).startswith("gold_")
    if is_gold and len(args.gt_files) < 3:
        print(
            "WARNING: based on the name, it seems you are using gold GT files. Typically, there should be 3 GT files for gold subsets (a, b, c)."
        )

    evaluator = CGF1Evaluator(
        gt_path=args.gt_files, verbose=True, iou_type="segm"
    )
    results = evaluator.evaluate(args.pred_file)
    print(results)


if __name__ == "__main__":
    main()