YYour Nameadd openvino
1def4e0e创建于 2025年12月15日历史提交
import os
import time
import queue
import threading
import numpy as np
import cv2
import openvino as ov  # Layout 就在这里面,例如 ov.Layout
from openvino.preprocess import PrePostProcessor, ColorFormat
from ultralytics import YOLO

# 定义类别
CLASS_NAMES = [
    'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck', 'boat', 'traffic light',
    'fire hydrant', 'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow',
    'elephant', 'bear', 'zebra', 'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee',
    'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard',
    'tennis racket', 'bottle', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple',
    'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch',
    'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop', 'mouse', 'remote', 'keyboard',
    'cell phone', 'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase',
    'scissors', 'teddy bear', 'hair drier', 'toothbrush'
]

# 为每个类别生成固定的随机颜色
np.random.seed(42)
COLORS = np.random.randint(0, 255, size=(len(CLASS_NAMES), 3), dtype="uint8")


class VideoDetectionApp:
    def __init__(self, video_path, device='CPU', conf_thresh=0.2, iou_thresh=0.7):
        self.video_path = video_path
        self.device = device
        self.conf_thresh = conf_thresh
        self.iou_thresh = iou_thresh

        self.abort_flag = False
        # 使用 maxsize 防止队列无限增长导致内存溢出
        self.input_queue = queue.Queue(maxsize=30)
        self.output_queue = queue.Queue(maxsize=30)

        # 编译模型
        self.compiled_model = self.prepare_openvino_model()
        self.infer_request = self.compiled_model.create_infer_request()

    def prepare_openvino_model(self):
        """导出并加载集成了预处理的 OpenVINO 模型"""
        model_path_xml = 'yolo11n_int8_openvino_model/yolo11n.xml'
        if not os.path.exists(model_path_xml):
            print("Exporting YOLO model to OpenVINO INT8...")
            model = YOLO('yolo11n.pt', task='detect')
            model.export(format='openvino', int8=True, imgsz=640)

        core = ov.Core()
        print(f"Loading model to {self.device}...")
        model = core.read_model(model_path_xml)

        # === 核心优化:将预处理步骤(归一化、通道转换)集成到模型中 ===
        ppp = PrePostProcessor(model)

        # 1. 设置输入张量信息:uint8, NHWC, BGR
        # 【修复点】这里直接使用 ov.Layout
        ppp.input().tensor() \
            .set_element_type(ov.Type.u8) \
            .set_layout(ov.Layout('NHWC')) \
            .set_color_format(ColorFormat.BGR)

        # 2. 设置预处理步骤:转float -> 转RGB -> 归一化 -> 转NCHW
        # 【修复点】这里直接使用 ov.Layout
        ppp.input().preprocess() \
            .convert_element_type(ov.Type.f32) \
            .convert_color(ColorFormat.RGB) \
            .scale([255.0, 255.0, 255.0]) \
            .convert_layout(ov.Layout('NCHW'))

        model = ppp.build()
        return core.compile_model(model, self.device)

    def nms_postprocess(self, output):
        """处理 YOLO 输出并进行 NMS"""
        # Output shape: (1, 84, 8400)
        output = output[0]  # remove batch dim -> (84, 8400)

        # 分离 bbox 和 scores
        boxes_raw = output[:4, :]
        scores_raw = output[4:, :]

        # 找到每个 anchor 的最大 score 和对应 class index
        class_ids = np.argmax(scores_raw, axis=0)
        confidences = np.max(scores_raw, axis=0)

        # 阈值过滤
        mask = confidences > self.conf_thresh
        boxes_raw = boxes_raw[:, mask]
        confidences = confidences[mask]
        class_ids = class_ids[mask]

        if len(confidences) == 0:
            return [], [], []

        # xywh 转 xyxy
        cx, cy, w, h = boxes_raw[0], boxes_raw[1], boxes_raw[2], boxes_raw[3]
        x1 = cx - w / 2
        y1 = cy - h / 2
        x2 = cx + w / 2
        y2 = cy + h / 2

        boxes_xyxy = np.stack((x1, y1, x2, y2), axis=1)

        # OpenCV NMS
        indices = cv2.dnn.NMSBoxes(boxes_xyxy.tolist(), confidences.tolist(), self.conf_thresh, self.iou_thresh)

        if len(indices) > 0:
            indices = indices.flatten()
            return boxes_xyxy[indices], confidences[indices], class_ids[indices]
        return [], [], []

    def input_thread_func(self):
        """读取视频并预处理"""
        cap = cv2.VideoCapture(self.video_path)
        print("Input thread started.")
        while not self.abort_flag:
            ret, frame = cap.read()
            if not ret:
                print("Video finished.")
                self.abort_flag = True
                break

            # Resize 图像以符合模型输入 (640x640)
            resized_frame = cv2.resize(frame, (640, 640))

            # 扩展维度 [H,W,C] -> [1,H,W,C]
            input_tensor = np.expand_dims(resized_frame, axis=0)

            try:
                # 放入队列,超时1秒以便能响应 abort_flag
                self.input_queue.put((input_tensor, resized_frame), timeout=1)
            except queue.Full:
                pass
        cap.release()

    def rendering_thread_func(self):
        """从输出队列获取结果并显示"""
        print("Rendering thread started.")
        while not self.abort_flag:
            try:
                img = self.output_queue.get(timeout=1)
            except queue.Empty:
                continue

            # 显示大图
            display_img = cv2.resize(img, (640, 480))
            cv2.imshow('YOLO11n + OpenVINO (No Tracking)', display_img)

            key = cv2.waitKey(1)
            if key == 27:  # ESC
                self.abort_flag = True
        cv2.destroyAllWindows()

    def run(self):
        input_th = threading.Thread(target=self.input_thread_func, daemon=True)
        render_th = threading.Thread(target=self.rendering_thread_func, daemon=True)
        input_th.start()
        render_th.start()

        fps_count = 0
        fps = 0
        start_time = time.perf_counter()

        print("Main inference loop started.")
        while not self.abort_flag:
            try:
                # 获取输入
                input_tensor, img = self.input_queue.get(timeout=0.1)
            except queue.Empty:
                continue

            # 1. 推理
            self.infer_request.infer(input_tensor)
            output = self.infer_request.get_output_tensor(0).data

            # 2. 后处理 (NMS)
            boxes, confidences, class_ids = self.nms_postprocess(output)

            # 3. 画框
            if len(boxes) > 0:
                for box, conf, cls_id in zip(boxes, confidences, class_ids):
                    x1, y1, x2, y2 = box.astype(int)
                    color = [int(c) for c in COLORS[cls_id]]
                    label = f"{CLASS_NAMES[cls_id]} {conf:.2f}"

                    cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)
                    cv2.putText(img, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

            # 4. 计算 FPS
            fps_count += 1
            if fps_count >= 10:
                end_time = time.perf_counter()
                fps = fps_count / (end_time - start_time)
                start_time = end_time
                fps_count = 0
                print(f"FPS: {fps:.2f}")

            # 绘制 FPS
            cv2.putText(img, f"FPS: {fps:.2f} ", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

            # 5. 发送到显示队列
            try:
                self.output_queue.put(img, timeout=1)
            except queue.Full:
                pass

        input_th.join()
        render_th.join()


if __name__ == "__main__":
    video_file = '166959951-1-208.mp4'
    if os.path.exists(video_file):
        app = VideoDetectionApp(video_file, device='CPU')
        app.run()
    else:
        print(f"Error: Video file '{video_file}' not found.")