import os
import time
import cv2
import numpy as np
import openvino as ov
from openvino.preprocess import PrePostProcessor, ColorFormat
from ultralytics import YOLO
class YOLOSeg:
    """Instance-segmentation wrapper around a YOLO model compiled with OpenVINO.

    On construction the INT8 OpenVINO IR for ``model_name`` is exported with
    Ultralytics if it is not already on disk, wrapped in a pre-processing
    graph (uint8 BGR NHWC input -> float32 RGB NCHW divided by 255) and
    compiled for ``device``. Calling the instance on a BGR image runs
    inference, confidence filtering, NMS and mask decoding.

    Args:
        model_name: Stem of the ``.pt`` weights / exported model directory.
        device: OpenVINO device string passed to ``compile_model``.
        conf_thresh: Minimum best-class score for a candidate to be kept.
        iou_thresh: IoU threshold handed to ``cv2.dnn.NMSBoxes``.
    """

    def __init__(self, model_name='yolo11n-seg', device='CPU', conf_thresh=0.2, iou_thresh=0.7):
        self.device = device
        self.conf_thresh = conf_thresh
        self.iou_thresh = iou_thresh
        self.model_name = model_name
        # (width, height) as consumed by cv2.resize; the export below only
        # uses the first entry, so the network input is square.
        self.input_size = (640, 640)
        self.compiled_model = self._prepare_model()
        self.infer_request = self.compiled_model.create_infer_request()

    def _prepare_model(self):
        """Export the model if needed, attach pre-processing, and compile it.

        Returns:
            The compiled OpenVINO model for ``self.device``.
        """
        model_path_xml = f'{self.model_name}_int8_openvino_model/{self.model_name}.xml'
        if not os.path.exists(model_path_xml):
            print(f"Exporting {self.model_name}...")
            model = YOLO(f'{self.model_name}.pt', task='segment')
            model.export(format='openvino', int8=True, imgsz=self.input_size[0])

        core = ov.Core()
        model = core.read_model(model_path_xml)

        # Move colour conversion, scaling and layout change into the compiled
        # graph so callers can feed raw uint8 BGR frames straight from OpenCV.
        ppp = PrePostProcessor(model)
        ppp.input().tensor().set_element_type(ov.Type.u8) \
            .set_layout(ov.Layout('NHWC')) \
            .set_color_format(ColorFormat.BGR)
        ppp.input().preprocess().convert_element_type(ov.Type.f32) \
            .convert_color(ColorFormat.RGB) \
            .scale([255.0, 255.0, 255.0]) \
            .convert_layout(ov.Layout('NCHW'))
        return core.compile_model(ppp.build(), self.device)

    def _process_mask(self, protos, masks_in, bboxes, shape):
        """Turn mask coefficients into full-resolution boolean masks.

        Args:
            protos: Prototype masks, shape ``(c, mh, mw)``.
            masks_in: Per-detection coefficients, shape ``(n, c)``.
            bboxes: ``(n, 4)`` boxes as ``x1, y1, x2, y2`` in ``shape`` pixels.
            shape: Target mask size as ``(height, width)``.

        Returns:
            Boolean array of shape ``(n, height, width)``; each mask is
            zeroed outside its own bounding box.
        """
        c, mh, mw = protos.shape
        height, width = shape
        masks = masks_in @ protos.reshape(c, -1)
        masks = masks.reshape(-1, mh, mw)
        masks = 1 / (1 + np.exp(-masks))  # sigmoid -> per-pixel probability

        upsampled_masks = []
        for mask, bbox in zip(masks, bboxes):
            # cv2.resize takes dsize as (width, height), the reverse of the
            # NumPy (rows, cols) order used everywhere else in this method.
            m = cv2.resize(mask, (width, height), interpolation=cv2.INTER_LINEAR)
            x1, y1, x2, y2 = bbox.astype(int)
            x1, x2 = np.clip([x1, x2], 0, width)
            y1, y2 = np.clip([y1, y2], 0, height)
            crop_mask = np.zeros((height, width), dtype=bool)
            crop_mask[y1:y2, x1:x2] = m[y1:y2, x1:x2] > 0.5
            upsampled_masks.append(crop_mask)
        return np.array(upsampled_masks)

    def _decode(self, preds, num_coefs):
        """Split a raw prediction matrix and drop low-confidence candidates.

        The class count is derived from the tensor instead of being fixed at
        80, so models trained on other label sets decode correctly.

        Args:
            preds: ``(4 + num_classes + num_coefs, n)`` array whose rows are
                ``cx, cy, w, h``, the class scores, then mask coefficients.
            num_coefs: Number of trailing mask-coefficient rows.

        Returns:
            ``(boxes_xyxy, confidences, class_ids, masks_coef)`` restricted to
            candidates whose best class score exceeds ``self.conf_thresh``;
            ``boxes_xyxy`` is ``(k, 4)`` and ``masks_coef`` is ``(num_coefs, k)``.

        Raises:
            ValueError: If ``preds`` has no rows left for class scores.
        """
        num_classes = preds.shape[0] - 4 - num_coefs
        if num_classes < 1:
            raise ValueError(
                f"prediction tensor with {preds.shape[0]} rows leaves no class "
                f"scores after 4 box rows and {num_coefs} mask coefficients"
            )
        scores = preds[4:4 + num_classes]
        class_ids = np.argmax(scores, axis=0)
        confidences = np.max(scores, axis=0)
        keep = confidences > self.conf_thresh

        cx, cy, w, h = preds[:4, keep]
        boxes_xyxy = np.stack((cx - w / 2, cy - h / 2, cx + w / 2, cy + h / 2), axis=1)
        return boxes_xyxy, confidences[keep], class_ids[keep], preds[4 + num_classes:, keep]

    @staticmethod
    def _xyxy_to_xywh(boxes_xyxy):
        """Convert ``(n, 4)`` corner boxes to ``x, y, width, height`` rows."""
        rects = np.array(boxes_xyxy, dtype=np.float64)
        rects[:, 2:] -= rects[:, :2]
        return rects

    def __call__(self, img):
        """Run segmentation on one BGR image.

        The image is resized to ``self.input_size`` before inference, and the
        returned boxes and masks are expressed in that resized pixel space
        (they are not mapped back to the original image size).

        Args:
            img: BGR image array as read by OpenCV.

        Returns:
            ``(boxes_xyxy, confidences, class_ids, masks)``. When nothing is
            detected all four are empty lists.
        """
        resized_img = cv2.resize(img, self.input_size)
        input_tensor = np.expand_dims(resized_img, axis=0)
        self.infer_request.infer(input_tensor)
        res0 = self.infer_request.get_output_tensor(0).data
        res1 = self.infer_request.get_output_tensor(1).data
        # The prototype tensor is the 4-D one (batch, coefs, mh, mw); make
        # sure res0 is the 3-D detection tensor regardless of output order.
        if res0.ndim == 4:
            res0, res1 = res1, res0
        preds, protos = res0[0], res1[0]

        boxes_xyxy, confidences, class_ids, masks_coef = self._decode(preds, protos.shape[0])
        if len(confidences) == 0:
            return [], [], [], []

        # cv2.dnn.NMSBoxes interprets each box as (x, y, width, height), so
        # corner boxes must be converted before suppression.
        indices = cv2.dnn.NMSBoxes(
            self._xyxy_to_xywh(boxes_xyxy).tolist(),
            confidences.tolist(),
            self.conf_thresh,
            self.iou_thresh,
        )
        if len(indices) == 0:
            return [], [], [], []

        # Different OpenCV versions return a tuple, an (n, 1) or an (n,) array.
        indices = np.asarray(indices).reshape(-1)
        final_masks = self._process_mask(
            protos,
            masks_coef[:, indices].T,
            boxes_xyxy[indices],
            self.input_size[::-1],  # (width, height) -> (height, width)
        )
        return boxes_xyxy[indices], confidences[indices], class_ids[indices], final_masks
# The 80 COCO class labels; the list index is the class id used for lookup.
CLASS_NAMES = [
    # people and vehicles
    'person', 'bicycle', 'car', 'motorcycle', 'airplane',
    'bus', 'train', 'truck', 'boat',
    # street furniture
    'traffic light', 'fire hydrant', 'stop sign', 'parking meter', 'bench',
    # animals
    'bird', 'cat', 'dog', 'horse', 'sheep',
    'cow', 'elephant', 'bear', 'zebra', 'giraffe',
    # accessories
    'backpack', 'umbrella', 'handbag', 'tie', 'suitcase',
    # sports
    'frisbee', 'skis', 'snowboard', 'sports ball', 'kite',
    'baseball bat', 'baseball glove', 'skateboard', 'surfboard', 'tennis racket',
    # kitchenware
    'bottle', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl',
    # food
    'banana', 'apple', 'sandwich', 'orange', 'broccoli',
    'carrot', 'hot dog', 'pizza', 'donut', 'cake',
    # furniture
    'chair', 'couch', 'potted plant', 'bed', 'dining table', 'toilet',
    # electronics
    'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone',
    # appliances
    'microwave', 'oven', 'toaster', 'sink', 'refrigerator',
    # indoor objects
    'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier', 'toothbrush',
]

# Seed the global NumPy RNG so every run draws the same per-class palette.
np.random.seed(0)
# One uint8 colour triple per class, each channel drawn from [0, 255).
COLORS = np.random.randint(
    low=0,
    high=255,
    size=(len(CLASS_NAMES), 3),
    dtype=np.uint8,
)
def process_video(video_path, output_path=None):
    """Run segmentation on every frame of a video and display the result.

    Each frame is resized to 640x640, annotated with masks, boxes, labels and
    a processing-FPS counter, resized to 640x480 and shown in a window until
    the video ends or ESC is pressed.

    Args:
        video_path: Path of the video to read.
        output_path: Optional path of a video file to write the annotated
            640x480 frames to (encoded with the ``mp4v`` FourCC). When
            ``None`` nothing is written.
    """
    # Open the video first so a bad path fails before the model is loaded.
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Cannot open video {video_path}")
        return

    writer = None
    try:
        detector = YOLOSeg(model_name='yolo11n-seg', device='CPU')

        if output_path is not None:
            fps = cap.get(cv2.CAP_PROP_FPS)
            writer = cv2.VideoWriter(
                output_path,
                cv2.VideoWriter_fourcc(*'mp4v'),
                fps if fps > 0 else 30.0,  # some sources report 0 FPS
                (640, 480),
            )
            if not writer.isOpened():
                print(f"Error: Cannot open output video {output_path}")
                writer.release()
                writer = None

        print("Start processing video... Press ESC to exit.")
        prev_time = time.perf_counter()
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            boxes, confs, clss, masks = detector(frame)
            # Detections are in the detector's 640x640 input space, so draw
            # on a frame resized to the same size.
            display_img = cv2.resize(frame, (640, 640))
            if len(boxes) > 0:
                mask_overlay = display_img.copy()
                for i, (box, conf, cls_id) in enumerate(zip(boxes, confs, clss)):
                    cls_id = int(cls_id)
                    # Wrap/guard lookups so a model with more classes than the
                    # name table still renders instead of raising IndexError.
                    color = [int(c) for c in COLORS[cls_id % len(COLORS)]]
                    name = CLASS_NAMES[cls_id] if cls_id < len(CLASS_NAMES) else str(cls_id)
                    if len(masks) > i:
                        mask_overlay[masks[i]] = color
                    x1, y1, x2, y2 = (int(v) for v in box)
                    label = f"{name} {conf:.2f}"
                    cv2.rectangle(display_img, (x1, y1), (x2, y2), color, 2)
                    # Keep the label inside the frame for boxes near the top edge.
                    cv2.putText(display_img, label, (x1, max(y1 - 10, 15)),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
                cv2.addWeighted(mask_overlay, 0.4, display_img, 0.6, 0, display_img)

            curr_time = time.perf_counter()
            elapsed = curr_time - prev_time
            prev_time = curr_time
            # Guard against a zero interval on coarse clocks.
            process_fps = 1 / elapsed if elapsed > 0 else 0.0
            cv2.putText(display_img, f"FPS: {process_fps:.1f}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

            display_img = cv2.resize(display_img, (640, 480))
            if writer is not None:
                writer.write(display_img)
            cv2.imshow("YOLO11n-Seg OpenVINO", display_img)
            if cv2.waitKey(1) == 27:  # ESC
                break
    finally:
        # Release the capture, writer and windows even if processing raises.
        cap.release()
        if writer is not None:
            writer.release()
        cv2.destroyAllWindows()
if __name__ == "__main__":
    import sys

    # Optional command line: <script> [video_path] [output_path].
    # Without arguments the original default clip name is used, and the
    # output path stays None.
    video_file = sys.argv[1] if len(sys.argv) > 1 else '166959951-1-208.mp4'
    output_file = sys.argv[2] if len(sys.argv) > 2 else None

    if os.path.exists(video_file):
        process_video(video_file, output_file)
    else:
        print("Please provide a valid video path.")