import os
import time
import queue
import threading
import numpy as np
import cv2
import openvino as ov
from openvino.preprocess import PrePostProcessor, ColorFormat
from ultralytics import YOLO
# Human-readable label for every class id; the list position is the class id
# used to index the detector's per-class scores. Rows are grouped loosely by
# theme purely for readability -- the order itself must not change.
CLASS_NAMES = [
    # people
    "person",
    # vehicles
    "bicycle", "car", "motorcycle", "airplane", "bus", "train", "truck", "boat",
    # street objects
    "traffic light", "fire hydrant", "stop sign", "parking meter", "bench",
    # animals
    "bird", "cat", "dog", "horse", "sheep", "cow", "elephant", "bear", "zebra", "giraffe",
    # accessories
    "backpack", "umbrella", "handbag", "tie", "suitcase",
    # sports
    "frisbee", "skis", "snowboard", "sports ball", "kite",
    "baseball bat", "baseball glove", "skateboard", "surfboard", "tennis racket",
    # tableware
    "bottle", "wine glass", "cup", "fork", "knife", "spoon", "bowl",
    # food
    "banana", "apple", "sandwich", "orange", "broccoli",
    "carrot", "hot dog", "pizza", "donut", "cake",
    # furniture
    "chair", "couch", "potted plant", "bed", "dining table", "toilet",
    # electronics
    "tv", "laptop", "mouse", "remote", "keyboard", "cell phone",
    # appliances
    "microwave", "oven", "toaster", "sink", "refrigerator",
    # misc. indoor items
    "book", "clock", "vase", "scissors", "teddy bear", "hair drier", "toothbrush",
]

# Seed NumPy's global RNG so the per-class palette is identical on every run.
np.random.seed(42)

# One 3-channel uint8 colour per class (row index == class id); each channel
# is drawn from [0, 255), i.e. 255 itself is never produced.
COLORS = np.random.randint(
    low=0,
    high=255,
    size=(len(CLASS_NAMES), 3),
    dtype=np.uint8,
)
class VideoDetectionApp:
    """Three-stage video object-detection pipeline (read -> infer -> render).

    A reader thread decodes and resizes frames, the calling thread runs
    OpenVINO inference plus NMS and draws the detections, and a rendering
    thread displays the annotated frames. The stages are connected by two
    bounded queues.

    Shutdown protocol:
      * ``abort_flag`` is a shared stop request; every loop polls it.
      * When the video ends, the reader pushes ``_END_OF_STREAM`` instead of
        raising ``abort_flag``; the sentinel is forwarded through both
        queues so every frame already buffered is still inferred and shown.
    """

    # Marker placed on a queue after the last frame. Real queue items are
    # tuples / ndarrays, so ``None`` cannot collide with them.
    _END_OF_STREAM = None

    def __init__(self, video_path, device='CPU', conf_thresh=0.2, iou_thresh=0.7):
        """Store the configuration, create the queues and compile the model.

        Args:
            video_path: Path handed to ``cv2.VideoCapture``.
            device: OpenVINO device name passed to ``compile_model``.
            conf_thresh: Minimum class score for a candidate to be kept.
            iou_thresh: IoU threshold handed to the NMS step.
        """
        self.video_path = video_path
        self.device = device
        self.conf_thresh = conf_thresh
        self.iou_thresh = iou_thresh
        # Set to True by any stage to ask all stages to stop.
        self.abort_flag = False
        # Bounded so a slow consumer throttles its producer.
        self.input_queue = queue.Queue(maxsize=30)
        self.output_queue = queue.Queue(maxsize=30)
        self.compiled_model = self.prepare_openvino_model()
        self.infer_request = self.compiled_model.create_infer_request()

    def prepare_openvino_model(self):
        """Export (if needed) and load an OpenVINO model with built-in preprocessing.

        Returns:
            The model compiled for ``self.device``.
        """
        model_path_xml = 'yolo11n_int8_openvino_model/yolo11n.xml'
        if not os.path.exists(model_path_xml):
            # The export is expected to create the XML checked above; the
            # path is hard-coded here rather than taken from the exporter.
            print("Exporting YOLO model to OpenVINO INT8...")
            model = YOLO('yolo11n.pt', task='detect')
            model.export(format='openvino', int8=True, imgsz=640)

        core = ov.Core()
        print(f"Loading model to {self.device}...")
        model = core.read_model(model_path_xml)

        # Declare what the caller will feed: uint8, NHWC, BGR frames ...
        ppp = PrePostProcessor(model)
        ppp.input().tensor() \
            .set_element_type(ov.Type.u8) \
            .set_layout(ov.Layout('NHWC')) \
            .set_color_format(ColorFormat.BGR)
        # ... and let the compiled graph convert them to float32, RGB,
        # divided by 255, in NCHW order.
        ppp.input().preprocess() \
            .convert_element_type(ov.Type.f32) \
            .convert_color(ColorFormat.RGB) \
            .scale([255.0, 255.0, 255.0]) \
            .convert_layout(ov.Layout('NCHW'))
        model = ppp.build()
        return core.compile_model(model, self.device)

    def nms_postprocess(self, output):
        """Decode the raw detector output and apply non-maximum suppression.

        Args:
            output: Batched array; ``output[0]`` is indexed as
                ``(4 + num_classes, num_candidates)`` where the first four
                rows are ``cx, cy, w, h`` and the remaining rows are
                per-class scores.

        Returns:
            ``(boxes_xyxy, confidences, class_ids)`` for the kept candidates,
            or three empty lists when nothing survives.
        """
        output = output[0]
        boxes_raw = output[:4, :]
        scores_raw = output[4:, :]

        # Best class and its score for every candidate column.
        class_ids = np.argmax(scores_raw, axis=0)
        confidences = np.max(scores_raw, axis=0)

        mask = confidences > self.conf_thresh
        if not mask.any():
            return [], [], []
        boxes_raw = boxes_raw[:, mask]
        confidences = confidences[mask]
        class_ids = class_ids[mask]

        cx, cy, w, h = boxes_raw[0], boxes_raw[1], boxes_raw[2], boxes_raw[3]
        x1 = cx - w / 2
        y1 = cy - h / 2
        x2 = cx + w / 2
        y2 = cy + h / 2
        boxes_xyxy = np.stack((x1, y1, x2, y2), axis=1)
        # cv2.dnn.NMSBoxes interprets each box as [x, y, width, height];
        # feeding it corner coordinates would make it compute IoU on the
        # wrong rectangles, so a separate xywh array is built for it.
        boxes_xywh = np.stack((x1, y1, w, h), axis=1)

        indices = cv2.dnn.NMSBoxes(
            boxes_xywh.tolist(),
            confidences.tolist(),
            self.conf_thresh,
            self.iou_thresh,
        )
        # Normalise the result (flat array, Nx1 array or empty tuple) to a
        # flat integer index array.
        indices = np.asarray(indices, dtype=np.int64).reshape(-1)
        if indices.size == 0:
            return [], [], []
        return boxes_xyxy[indices], confidences[indices], class_ids[indices]

    def _put_until_abort(self, target_queue, item, poll_interval=0.1):
        """Put ``item`` on ``target_queue``, retrying while the queue is full.

        Returns:
            True once the item was enqueued, False if ``abort_flag`` was
            raised before that happened (the item is then discarded).
        """
        while not self.abort_flag:
            try:
                target_queue.put(item, timeout=poll_interval)
            except queue.Full:
                continue
            return True
        return False

    def input_thread_func(self):
        """Read frames from the video, resize them and feed the input queue."""
        cap = cv2.VideoCapture(self.video_path)
        print("Input thread started.")
        try:
            if not cap.isOpened():
                print(f"Error: could not open video '{self.video_path}'.")
                return
            while not self.abort_flag:
                ret, frame = cap.read()
                if not ret:
                    print("Video finished.")
                    break
                resized_frame = cv2.resize(frame, (640, 640))
                # Add the batch axis; this is a view sharing the frame's memory.
                input_tensor = np.expand_dims(resized_frame, axis=0)
                if not self._put_until_abort(self.input_queue, (input_tensor, resized_frame)):
                    break
        finally:
            cap.release()
            # Always announce the end of the stream (no-op when aborting) so
            # the inference loop can drain the queue and then stop.
            self._put_until_abort(self.input_queue, self._END_OF_STREAM)

    def rendering_thread_func(self):
        """Take annotated frames from the output queue and display them."""
        # NOTE(review): OpenCV HighGUI calls made off the main thread may not
        # work on every platform -- confirm on the target OS.
        print("Rendering thread started.")
        try:
            while not self.abort_flag:
                try:
                    img = self.output_queue.get(timeout=1)
                except queue.Empty:
                    continue
                if img is self._END_OF_STREAM:
                    break
                display_img = cv2.resize(img, (640, 480))
                cv2.imshow('YOLO11n + OpenVINO (No Tracking)', display_img)
                key = cv2.waitKey(1)
                if key == 27:  # ESC
                    break
        finally:
            # Whatever ended the display loop (ESC, end of stream, an error),
            # nobody consumes frames any more: tell the other stages to stop.
            self.abort_flag = True
            cv2.destroyAllWindows()

    def run(self):
        """Start the worker threads and run the inference loop until done.

        Returns when the whole video has been processed and displayed, or
        when any stage raises ``abort_flag`` (e.g. ESC in the window).
        """
        input_th = threading.Thread(target=self.input_thread_func, daemon=True)
        render_th = threading.Thread(target=self.rendering_thread_func, daemon=True)
        input_th.start()
        render_th.start()

        fps_count = 0
        fps = 0
        start_time = time.perf_counter()
        print("Main inference loop started.")
        try:
            while not self.abort_flag:
                try:
                    item = self.input_queue.get(timeout=0.1)
                except queue.Empty:
                    continue
                if item is self._END_OF_STREAM:
                    # Pass the marker on so the renderer finishes the frames
                    # still waiting in the output queue before it exits.
                    self._put_until_abort(self.output_queue, self._END_OF_STREAM)
                    break
                input_tensor, img = item

                self.infer_request.infer(input_tensor)
                output = self.infer_request.get_output_tensor(0).data
                boxes, confidences, class_ids = self.nms_postprocess(output)

                for box, conf, cls_id in zip(boxes, confidences, class_ids):
                    # Plain Python ints for the OpenCV drawing calls.
                    x1, y1, x2, y2 = box.astype(int).tolist()
                    color = [int(c) for c in COLORS[cls_id]]
                    label = f"{CLASS_NAMES[cls_id]} {conf:.2f}"
                    cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)
                    cv2.putText(img, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

                # Refresh the FPS figure once every 10 processed frames.
                fps_count += 1
                if fps_count >= 10:
                    end_time = time.perf_counter()
                    fps = fps_count / (end_time - start_time)
                    start_time = end_time
                    fps_count = 0
                    print(f"FPS: {fps:.2f}")
                cv2.putText(img, f"FPS: {fps:.2f} ", (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

                if not self._put_until_abort(self.output_queue, img):
                    break
        except BaseException:
            # Make sure the worker threads stop if inference fails.
            self.abort_flag = True
            raise
        finally:
            input_th.join()
            render_th.join()
# Script entry point: run the detection demo on a fixed local video file.
if __name__ == "__main__":
    video_file = '166959951-1-208.mp4'
    if not os.path.exists(video_file):
        # Nothing to process; report the missing file and finish.
        print(f"Error: Video file '{video_file}' not found.")
    else:
        app = VideoDetectionApp(video_file, device='CPU')
        app.run()