import os
import time
import cv2
import numpy as np
import openvino as ov
from openvino.preprocess import PrePostProcessor, ColorFormat
from ultralytics import YOLO
class YOLOPose:
    """YOLO pose estimator executed through an OpenVINO INT8 model.

    On construction the OpenVINO IR is located on disk (exporting it from the
    Ultralytics ``.pt`` weights when the ``.xml`` file is missing), wrapped
    with a pre-processing pipeline that accepts raw ``uint8`` BGR frames, and
    compiled for the requested device. Calling the instance on an image
    returns the detections that survive confidence filtering and NMS.
    """

    def __init__(self, model_name='yolo11n-pose', device='CPU', conf_thresh=0.2, iou_thresh=0.7):
        """Load (or export) the model and create a reusable infer request.

        Args:
            model_name: Base name of the weights / exported model directory.
            device: OpenVINO device string passed to ``compile_model``.
            conf_thresh: Minimum detection score kept before and during NMS.
            iou_thresh: IoU threshold handed to ``cv2.dnn.NMSBoxes``.
        """
        self.device = device
        self.conf_thresh = conf_thresh
        self.iou_thresh = iou_thresh
        self.model_name = model_name
        # Size passed to cv2.resize and (first element) to the exporter's imgsz.
        self.input_size = (640, 640)
        self.compiled_model = self._prepare_model()
        self.infer_request = self.compiled_model.create_infer_request()

    def _prepare_model(self):
        """Return the compiled model with u8/NHWC/BGR pre-processing attached.

        If the expected ``.xml`` file does not exist, the ``.pt`` weights are
        exported to OpenVINO INT8 first.
        """
        model_path_xml = f'{self.model_name}_int8_openvino_model/{self.model_name}.xml'
        if not os.path.exists(model_path_xml):
            print(f"Exporting {self.model_name} to OpenVINO INT8...")
            exporter = YOLO(f'{self.model_name}.pt', task='pose')
            exporter.export(format='openvino', int8=True, imgsz=self.input_size[0])

        core = ov.Core()
        model = core.read_model(model_path_xml)

        # Let callers feed raw uint8 BGR frames in NHWC layout; the conversion
        # to float RGB in [0, 1] with NCHW layout is done inside the graph.
        ppp = PrePostProcessor(model)
        (
            ppp.input().tensor()
            .set_element_type(ov.Type.u8)
            .set_layout(ov.Layout('NHWC'))
            .set_color_format(ColorFormat.BGR)
        )
        (
            ppp.input().preprocess()
            .convert_element_type(ov.Type.f32)
            .convert_color(ColorFormat.RGB)
            .scale([255.0, 255.0, 255.0])
            .convert_layout(ov.Layout('NCHW'))
        )
        return core.compile_model(ppp.build(), self.device)

    @staticmethod
    def _decode_boxes(boxes_raw):
        """Convert centre-format boxes into corner and top-left formats.

        Args:
            boxes_raw: Array of shape ``(4, N)`` whose rows are
                ``cx, cy, w, h``.

        Returns:
            ``(boxes_xyxy, boxes_xywh)``, both of shape ``(N, 4)``:
            ``[x1, y1, x2, y2]`` corners and ``[x1, y1, w, h]`` rectangles.
        """
        cx, cy, w, h = boxes_raw[0], boxes_raw[1], boxes_raw[2], boxes_raw[3]
        x1 = cx - w / 2
        y1 = cy - h / 2
        boxes_xyxy = np.stack((x1, y1, cx + w / 2, cy + h / 2), axis=1)
        boxes_xywh = np.stack((x1, y1, w, h), axis=1)
        return boxes_xyxy, boxes_xywh

    def __call__(self, img):
        """Run pose estimation on one image.

        Args:
            img: Image array accepted by ``cv2.resize``; the pre-processing
                pipeline treats it as ``uint8`` BGR.

        Returns:
            ``(boxes, scores, keypoints)``. With detections these are arrays
            of shape ``(N, 4)`` (``x1, y1, x2, y2``), ``(N,)`` and
            ``(N, K, 3)`` (``x, y, confidence`` per keypoint). Coordinates are
            in the resized ``input_size`` frame, not the original image.
            Three empty lists are returned when nothing is detected.
        """
        resized_img = cv2.resize(img, self.input_size)
        input_tensor = np.expand_dims(resized_img, axis=0)
        self.infer_request.infer(input_tensor)

        # First batch element; indexed below as (channels, candidates):
        # rows 0-3 box (cx, cy, w, h), row 4 score, remaining rows keypoints.
        output = self.infer_request.get_output_tensor(0).data[0]
        boxes_raw = output[:4, :]
        scores_raw = output[4, :]
        kpts_raw = output[5:, :]

        mask = scores_raw > self.conf_thresh
        if not np.any(mask):
            return [], [], []

        # Boolean indexing copies, so results do not alias the output tensor.
        scores = scores_raw[mask]
        kpts_raw = kpts_raw[:, mask]
        boxes_xyxy, boxes_xywh = self._decode_boxes(boxes_raw[:, mask])

        # cv2.dnn.NMSBoxes expects [x, y, width, height] rectangles; feeding
        # it corner coordinates would make it compute wrong overlaps.
        indices = cv2.dnn.NMSBoxes(
            boxes_xywh.tolist(), scores.tolist(), self.conf_thresh, self.iou_thresh
        )
        if len(indices) == 0:
            return [], [], []

        # The return shape differs between OpenCV versions (flat vs. Nx1).
        indices = np.asarray(indices).reshape(-1)
        final_boxes = boxes_xyxy[indices]
        final_scores = scores[indices]
        # Each detection's keypoint rows are (x, y, conf) triplets; derive the
        # keypoint count from the data instead of hard-coding it.
        final_kpts = kpts_raw[:, indices].T.reshape(len(indices), -1, 3)
        return final_boxes, final_scores, final_kpts
# Pairs of keypoint indices joined by a limb line in draw_pose. The group
# labels below presume the usual COCO 17-keypoint ordering -- TODO confirm
# against the model's keypoint definition.
SKELETON = [
    link
    for group in (
        ((0, 1), (0, 2), (1, 3), (2, 4)),   # head
        ((5, 6), (5, 7), (7, 9)),           # shoulder line and first arm
        ((6, 8), (8, 10)),                  # second arm
        ((11, 12), (5, 11), (6, 12)),       # hip line and torso sides
        ((11, 13), (13, 15)),               # first leg
        ((12, 14), (14, 16)),               # second leg
    )
    for link in group
]

# Drawing colours passed straight to OpenCV; with BGR images these render as
# green limbs and red keypoints.
LIMB_COLOR = (0, 255, 0)
KPT_COLOR = (0, 0, 255)
def draw_pose(img, boxes, scores, keypoints, conf_thresh=0.5):
    """Draw detections onto ``img`` in place.

    For every box this draws the rectangle, a ``"Person <score>"`` label just
    above it, the limbs listed in ``SKELETON`` and finally the keypoints.

    Args:
        img: Image passed to the OpenCV drawing calls; modified in place.
        boxes: Sequence of arrays holding ``x1, y1, x2, y2``.
        scores: Per-box scores, indexed in step with ``boxes``.
        keypoints: Per-box keypoints, each row being ``(x, y, confidence)``.
        conf_thresh: A keypoint is drawn only if its confidence exceeds this;
            a limb needs both of its endpoints to exceed it.
    """
    box_color = (255, 0, 0)
    font = cv2.FONT_HERSHEY_SIMPLEX

    def is_visible(confidence):
        return confidence > conf_thresh

    for det_idx, box in enumerate(boxes):
        left, top, right, bottom = box.astype(int)
        cv2.rectangle(img, (left, top), (right, bottom), box_color, 2)
        label = f"Person {scores[det_idx]:.2f}"
        cv2.putText(img, label, (left, top - 10), font, 0.5, box_color, 2)

        person_kpts = keypoints[det_idx]

        # Limbs first so the keypoint dots end up on top of the lines.
        for start_idx, end_idx in SKELETON:
            start_x, start_y, start_conf = person_kpts[start_idx]
            end_x, end_y, end_conf = person_kpts[end_idx]
            if not (is_visible(start_conf) and is_visible(end_conf)):
                continue
            start_pt = (int(start_x), int(start_y))
            end_pt = (int(end_x), int(end_y))
            cv2.line(img, start_pt, end_pt, LIMB_COLOR, 2)

        for kpt_x, kpt_y, kpt_conf in person_kpts:
            if is_visible(kpt_conf):
                cv2.circle(img, (int(kpt_x), int(kpt_y)), 4, KPT_COLOR, -1)
def process_video(video_path):
    """Run pose estimation on a video and show the annotated frames.

    Each frame is passed to the pose model, resized to the model's input size
    (the coordinate frame of the returned detections), annotated with the
    detections and an FPS counter, resized to 640x480 and displayed. The loop
    ends at the end of the stream or when ESC is pressed.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.

    Returns:
        None. Prints an error and returns early if the video cannot be opened.
    """
    # Open the video first so an unreadable file does not trigger the
    # (potentially slow) model export and compilation.
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Cannot open {video_path}")
        return

    try:
        pose_model = YOLOPose(model_name='yolo11n-pose', device='CPU')
        print("Processing Pose... Press ESC to exit.")

        # perf_counter is monotonic and high-resolution, unlike time.time().
        prev_time = time.perf_counter()
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            boxes, scores, kpts = pose_model(frame)

            # Detections are expressed in the model's input frame, so draw on
            # a copy of the frame resized to exactly that size.
            display_img = cv2.resize(frame, pose_model.input_size)
            if len(boxes) > 0:
                draw_pose(display_img, boxes, scores, kpts)

            curr_time = time.perf_counter()
            elapsed = curr_time - prev_time
            prev_time = curr_time
            # Guard against a zero interval on coarse clocks.
            fps = 1.0 / elapsed if elapsed > 0 else 0.0

            cv2.putText(display_img, f"FPS: {fps:.1f}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            display_img = cv2.resize(display_img, (640, 480))
            cv2.imshow("YOLO11n-Pose OpenVINO", display_img)

            # Mask to the low byte: some platforms set extra high bits.
            if cv2.waitKey(1) & 0xFF == 27:
                break
    finally:
        # Always free the capture and windows, even if inference raises.
        cap.release()
        cv2.destroyAllWindows()
if __name__ == "__main__":
    # Script entry point: process the hard-coded sample clip when it exists
    # in the current working directory, otherwise tell the user.
    video_file = '166959951-1-208.mp4'
    if not os.path.exists(video_file):
        print("Please provide a valid video path.")
    else:
        process_video(video_file)