import threading
import signal
import time
import av
import logging
from mindx.sdk import base
from mindx.sdk.base import (
VideoDecoder,
VideoDecodeConfig,
VdecCallBacker,
VideoEncoder,
VideoEncodeConfig,
VencCallBacker,
)
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
DEFAULT_DEVICE_ID = 0
DEFAULT_CHANNEL_ID = 0
DEFAULT_SAVED_FILE_PATH = "./output"
PULLER_TO_VDEC_QUEUE = []
VDEC_TO_VENC_QUEUE = []
VENC_TO_FILE_SAVE_QUEUE = []
SEND_SIGNAL = False
READ_VIDEO_ENDED = False
VDEC_ENDED = False
VENC_ENDED = False
video_path = "path"
video_width = 1920
video_height = 1080
video_fps = 25
class DecodedFrame:
def __init__(self, image, frame_id, channel_id):
self.image = image
self.frame_id = frame_id
self.channel_id = channel_id
class EncodedFrame:
def __init__(self, data, frame_id, channel_id):
self.data = data
self.frame_id = frame_id
self.channel_id = channel_id
def stop_handler(signum, frame):
global SEND_SIGNAL
SEND_SIGNAL = True
def stream_puller_thread(file_path, width, height):
global SEND_SIGNAL
global READ_VIDEO_ENDED
with av.open(file_path) as container:
frame_id = 0
video_stream = next(s for s in container.streams if s.type == 'video')
if video_stream.height != height:
logging.error("Video height is not equal to configured height.")
SEND_SIGNAL = True
return
if video_stream.width != width:
logging.error("Video width is not equal to configured width.")
SEND_SIGNAL = True
return
for packet in container.demux():
if SEND_SIGNAL:
break
if packet.size == 0:
logging.info("Finish to pull rtsp stream.")
READ_VIDEO_ENDED = True
break
PULLER_TO_VDEC_QUEUE.append(EncodedFrame(packet, frame_id, DEFAULT_CHANNEL_ID))
frame_id += 1
logging.info("*********************StreamPullThread end*********************")
def vdec_thread(video_decoder):
global VDEC_ENDED
while True:
if SEND_SIGNAL or (READ_VIDEO_ENDED and len(PULLER_TO_VDEC_QUEUE) == 0):
break
if not PULLER_TO_VDEC_QUEUE:
continue
encoded_frame = PULLER_TO_VDEC_QUEUE.pop(0)
logging.debug("send packet: %d", encoded_frame.frame_id)
video_decoder.decode(encoded_frame.data, encoded_frame.frame_id)
VDEC_ENDED = True
logging.info("*********************VdecThread end*********************")
def vdec_callback_func(decoded_image, channel_id, frame_id):
VDEC_TO_VENC_QUEUE.append(DecodedFrame(decoded_image, frame_id, channel_id))
def venc_thread(video_encoder):
global VENC_ENDED
while True:
if SEND_SIGNAL or (VDEC_ENDED and len(VDEC_TO_VENC_QUEUE) == 0):
break
if not VDEC_TO_VENC_QUEUE:
continue
decoded_frame = VDEC_TO_VENC_QUEUE.pop(0)
video_encoder.encode(decoded_frame.image, decoded_frame.frame_id)
VENC_ENDED = True
logging.info("*********************VencThread end*********************")
def venc_callback_func(output, output_datasize, channel_id, frame_id):
VENC_TO_FILE_SAVE_QUEUE.append(EncodedFrame(output, frame_id, channel_id))
def save_frame_thread(stream_format):
save_path = DEFAULT_SAVED_FILE_PATH
if stream_format == base.h265_main_level:
save_path = save_path + ".265"
else:
save_path = save_path + ".264"
while True:
if SEND_SIGNAL or (VENC_ENDED and len(VENC_TO_FILE_SAVE_QUEUE) == 0):
break
if not VENC_TO_FILE_SAVE_QUEUE:
continue
encoded_frame = VENC_TO_FILE_SAVE_QUEUE.pop(0)
with open(save_path, 'ab') as file:
file.write(encoded_frame.data)
logging.info("*********************Save frame thread end*********************")
def start_service():
vdec_conf = VideoDecodeConfig()
vdec_conf.width = video_width
vdec_conf.height = video_height
vdec_conf.inputVideoFormat = base.h264_main_level
vdec_conf.outputImageFormat = base.nv12
vdec_callbacker = VdecCallBacker()
vdec_callbacker.registerVdecCallBack(vdec_callback_func)
video_decoder = VideoDecoder(vdec_conf, vdec_callbacker, DEFAULT_DEVICE_ID, DEFAULT_CHANNEL_ID)
venc_conf = VideoEncodeConfig()
venc_conf.width = video_width
venc_conf.height = video_height
venc_conf.inputImageFormat = base.nv12
venc_conf.srcRate = video_fps
venc_conf.outputVideoFormat = base.h264_main_level
venc_conf.displayRate = video_fps
venc_callbacker = VencCallBacker()
venc_callbacker.registerVencCallBack(venc_callback_func)
video_encoder = VideoEncoder(venc_conf, venc_callbacker, DEFAULT_DEVICE_ID)
stream_puller = threading.Thread(
target=stream_puller_thread, kwargs={"file_path": video_path, "width": video_width, "height": video_height}
)
stream_puller.start()
logging.info("*********************stream_puller_thread start*********************")
vdec = threading.Thread(target=vdec_thread, kwargs={"video_decoder": video_decoder})
vdec.start()
logging.info("*********************vdec_thread start*********************")
venc = threading.Thread(target=venc_thread, kwargs={"video_encoder": video_encoder})
venc.start()
logging.info("*********************venc_thread start*********************")
save_frame = threading.Thread(target=save_frame_thread, kwargs={"stream_format": venc_conf.outputVideoFormat})
save_frame.start()
logging.info("*********************save_frame_thread start*********************")
stream_puller.join()
vdec.join()
venc.join()
save_frame.join()
PULLER_TO_VDEC_QUEUE.clear()
VDEC_TO_VENC_QUEUE.clear()
VENC_TO_FILE_SAVE_QUEUE.clear()
time.sleep(1)
if __name__ == '__main__':
base.mx_init()
signal.signal(signal.SIGINT, stop_handler)
start_service()
base.mx_deinit()