"""Tests 执行加速
本模块提供测试用例并行执行加速功能, 支持多进程并发运行测试用例, 主要功能包括:
- 多容器/进程并行执行测试用例, 提高测试效率
- 智能用例排序, 基于历史耗时预估进行负载均衡
- 用例耗时缓存机制, 优化重复执行场景
- 实时执行状态监控与异常处理
- 详细的执行报告与统计信息(包括容器执行摘要, 用例耗时统计, 异常信息等)
- CPU亲和性配置, 优化多CPU环境下的性能
主要类:
- TestsAccelerate: 测试加速的主类, 提供完整的并行执行框架
- CaseDesc: 测试用例描述, 包含名称和预估耗时
- ExecParam: 执行参数配置
- ExecResult: 执行结果统计与报告生成
- CntrContext: 容器/进程执行上下文
- CaseContext: 用例执行上下文
使用示例:
1. 继承 TestsAccelerate 类并实现 _prepare_get_params 方法
2. 调用 prepare() 进行准备工作
3. 调用 process() 执行测试
4. 调用 post() 获取执行结果
"""
import argparse
import dataclasses
import logging
import multiprocessing
import os
import re
import queue
import json
import signal
import subprocess
import sys
import time
from abc import ABC
from datetime import datetime, timezone, timedelta
from multiprocessing import JoinableQueue, Event, Process, Value, cpu_count
from typing import List, Any, Optional, Tuple, Dict, Callable
from pathlib import Path
from utils.args_action import ArgsEnvDictAction
from utils.executable import Exec
from utils.table import Table
class ArgsCaseListAction(argparse.Action):
    """Argparse action that collects case names for the ``cases`` option.

    Each command-line token may hold several case names joined by ``:``.
    Tokens are split on ``:``, surrounding whitespace is stripped, empty
    fragments are dropped and the flattened list is stored on the namespace.
    """

    def __call__(self, parser: argparse.ArgumentParser, namespace: argparse.Namespace, values: List[str],
                 option_string: Optional[str] = None) -> None:
        collected = [
            piece.strip()
            for token in values
            for piece in token.split(':')
            if piece.strip()
        ]
        setattr(namespace, self.dest, collected)
class TestsAccelerate(ABC):
"""Tests 加速
"""
@dataclasses.dataclass
class ExecParam:
    """Execution parameters of one container.

    The explicit ``__init__`` below is kept by ``dataclass`` (it does not
    overwrite an ``__init__`` defined in the class body), so ``cntr_id`` is a
    required argument.
    """
    cntr_id: Optional[int] = None
    envs_func: Optional[Callable] = None
    custom: Optional[Any] = None

    def __init__(self, cntr_id: int, envs_func: Optional[Callable] = None, custom: Optional[Any] = None):
        self.cntr_id = cntr_id
        self.envs_func = envs_func
        self.custom = custom

    def get_envs(self) -> Optional[Dict[str, str]]:
        """Return the extra environment variables for this container.

        :return: result of ``envs_func(self)`` when a callback is set, else None
        """
        return self.envs_func(self) if self.envs_func else None
@dataclasses.dataclass
class ExecResult:
    """Execution result store and report builder.

    Raw records are read from the four ``*_details`` queues; the ``get_*``
    methods drain them and render text reports.
    """
    cntr_name: str = "Cntr"
    # Actual duration of the run; assigned by the owner before reports are built.
    act_duration: Optional[timedelta] = None
    # Sum of all container durations, computed by get_cntr_exec_info().
    ori_duration: Optional[timedelta] = None
    cntr_max_duration: Optional[timedelta] = None
    cntr_min_duration: Optional[timedelta] = None
    # default_factory gives every instance its own queues; a plain
    # ``JoinableQueue()`` default would be created once at class-definition
    # time and shared by all instances.
    cntr_execution_details: JoinableQueue = dataclasses.field(default_factory=JoinableQueue)
    cntr_duration_dict: Dict[int, timedelta] = dataclasses.field(default_factory=dict)
    case_execution_details: JoinableQueue = dataclasses.field(default_factory=JoinableQueue)
    case_exception_details: JoinableQueue = dataclasses.field(default_factory=JoinableQueue)
    case_terminate_details: JoinableQueue = dataclasses.field(default_factory=JoinableQueue)

    @staticmethod
    def _percent(part: timedelta, whole: Optional[timedelta]) -> float:
        """Return ``part / whole`` as a percentage, 0.0 when ``whole`` is None or zero."""
        if not whole:
            return 0.0
        return float(part / whole) * 100

    @property
    def revenue_desc(self) -> str:
        """Describe the gain of the actual duration over the summed container durations."""
        diff = self.ori_duration - self.act_duration
        rate = self._percent(diff, self.act_duration)
        return (f"Revenue(Act/Ori, {self.act_duration.total_seconds():.2f}/"
                f"{self.ori_duration.total_seconds():.2f}) {rate:.2f}%")

    @property
    def cntr_latency_desc(self) -> str:
        """Describe the spread between the slowest and the fastest container."""
        if self.cntr_max_duration is None or self.cntr_min_duration is None:
            # No container record has been collected, so there is no spread to report.
            return "Latency(Max/Min/Diff, N/A)"
        diff = self.cntr_max_duration - self.cntr_min_duration
        rate = self._percent(diff, self.cntr_min_duration)
        return (f"Latency(Max/Min/Diff, {self.cntr_max_duration.total_seconds():.2f}/"
                f"{self.cntr_min_duration.total_seconds():.2f}/{diff.total_seconds():.2f}) {rate:.2f}%")

    @staticmethod
    def save_case_duration_to_json(sorted_datas: List[List[Any]],
                                   dump_item_num: int = 100, dump_min_duration: float = 5,
                                   path: Optional[Path] = None):
        """Dump ``{case_name: duration}`` for the leading rows of ``sorted_datas``.

        :param sorted_datas: rows sorted by duration (descending); column 1 is
            the case name, column 2 the duration (anything ``float()`` accepts)
        :param dump_item_num: maximum number of rows to dump
        :param dump_min_duration: rows whose duration is at or below this value
            (and every row after them) are not dumped
        :param path: target json file; nothing is written when None
        """
        if path is None:
            return
        path.parent.mkdir(parents=True, exist_ok=True)
        case_name_idx = 1
        duration_idx = 2
        duration_dict: Dict[str, float] = {}
        for item_num, item in enumerate(sorted_datas):
            # Both limits are checked BEFORE the row is stored, so a row that
            # violates a limit never ends up in the dump.
            if item_num >= dump_item_num:
                break
            duration = float(item[duration_idx])
            if duration <= dump_min_duration:
                break
            duration_dict[item[case_name_idx]] = duration
        with path.open("w", encoding="utf-8") as f:
            json.dump(duration_dict, f, indent=4)

    def get_cntr_exec_info(self) -> Tuple[str, str]:
        """Drain the container queue and build the container summary.

        Also fills ``ori_duration``, ``cntr_max_duration``, ``cntr_min_duration``
        and ``cntr_duration_dict``.

        :returns:
            Tuple[str, str]:
                - container execution table
                - description of the parallel execution gain
        """
        heads = [self.cntr_name, "Total", "Success", "Failed", "Duration"]
        datas = []
        self.ori_duration = timedelta()
        while not self.cntr_execution_details.empty():
            _brief = self.cntr_execution_details.get()
            devs_id = int(_brief[0])
            case_total = int(_brief[1])
            case_pass = int(_brief[2])
            case_fail = int(_brief[3])
            devs_duration = _brief[-1]
            if self.cntr_max_duration is None:
                self.cntr_max_duration = devs_duration
            self.cntr_max_duration = max(self.cntr_max_duration, devs_duration)
            if self.cntr_min_duration is None:
                self.cntr_min_duration = devs_duration
            self.cntr_min_duration = min(self.cntr_min_duration, devs_duration)
            self.cntr_duration_dict[devs_id] = devs_duration
            self.ori_duration += devs_duration
            datas.append([devs_id, case_total, case_pass, case_fail, f"{devs_duration.total_seconds():.2f}"])
            self.cntr_execution_details.task_done()
        brief = "\nNone"
        if datas:
            brief = Table.table(datas=datas, headers=heads)
        desc = f"Duration {self.act_duration.total_seconds():.2f} secs, {self.revenue_desc}"
        return f"\n\n{self.cntr_name} Execution Brief:{brief}", desc

    def get_case_exec_terminate_info(self) -> Tuple[str, int]:
        """Drain the terminate queue and build the terminated-case summary.

        :returns:
            Tuple[str, int]:
                - terminated-case table
                - number of terminated cases
        """
        heads = ["Idx", self.cntr_name, "CaseName", "Duration"]
        datas = []
        while not self.case_terminate_details.empty():
            _brief = self.case_terminate_details.get()
            cntr_id = int(_brief[0])
            case_name = str(_brief[1])
            case_duration = _brief[2]
            datas.append([cntr_id, case_name, f"{case_duration.total_seconds():.2f}"])
            self.case_terminate_details.task_done()
        brief = "\nNone"
        if datas:
            datas = [[f"{idx}/{len(datas)}"] + ele for idx, ele in enumerate(datas, start=1)]
            brief = Table.table(datas=datas, headers=heads)
        return f"\n\nCase Terminate Brief({len(datas)}):{brief}", len(datas)

    def get_case_exec_exception_info(self) -> Tuple[str, int]:
        """Drain the exception queue and build the failed-case summary.

        The queue carries string chunks; an empty string closes one message.

        :returns:
            Tuple[str, int]:
                - failed-case description
                - number of failed cases
        """
        datas = []
        pending: List[str] = []
        while not self.case_exception_details.empty():
            chunk = self.case_exception_details.get()
            if len(chunk) != 0:
                pending.append(chunk)
            else:
                # Empty chunk: the message collected so far is complete.
                datas.append("".join(pending))
                pending = []
            self.case_exception_details.task_done()
        brief = "\nNone" if not datas else ""
        for idx, data in enumerate(datas, start=1):
            brief += f"\nIdx:{idx}/{len(datas)}\n{data}"
        return f"\n\nCase Exception Brief({len(datas)}):{brief}", len(datas)

    def get_case_exec_duration_info(self, case_dict: Dict[str, "Exec.CaseDesc"],
                                    min_print_cnt: Optional[int] = None,
                                    dump_json_path: Optional[Path] = None,
                                    dump_item_num: int = 100,
                                    dump_min_duration: float = 5) -> str:
        """Drain the execution queue and build the per-case duration report.

        Rows are sorted by duration (descending) and optionally dumped to
        ``dump_json_path``. When ``min_print_cnt`` is truthy only the first
        ``min_print_cnt + 50`` rows are printed.

        :param case_dict: case name -> case description (``duration`` is read as the estimate)
        :param min_print_cnt: base number of rows to print; falsy prints all rows
        :param dump_json_path: json cache file, None disables the dump
        :param dump_item_num: maximum number of rows to dump
        :param dump_min_duration: duration threshold for the dump
        :return: per-case duration report
        """
        heads = [self.cntr_name, "CaseName", "Duration", "Estimate", f"Ratio({self.cntr_name})", "Ratio(Total)"]
        datas = []
        while not self.case_execution_details.empty():
            _brief = self.case_execution_details.get()
            cntr_id = _brief[0]
            case_name = str(_brief[1])
            case_duration = _brief[2]
            case_secs = case_duration.total_seconds()
            case_desc = case_dict.get(case_name, None)
            case_estimate = ""
            if case_desc and case_desc.duration:
                case_estimate = timedelta(seconds=case_desc.duration).total_seconds()
            # A container may never have delivered its summary; report the
            # case anyway instead of raising KeyError and losing the report.
            cntr_duration = self.cntr_duration_dict.get(cntr_id)
            if cntr_duration is None:
                ratio_cntr_desc = f"{case_secs:.2f}/N/A"
            else:
                ratio_cntr = self._percent(case_duration, cntr_duration)
                ratio_cntr_desc = f"{case_secs:.2f}/{cntr_duration.total_seconds():.2f} {ratio_cntr:.2f}%"
            ratio_process = self._percent(case_duration, self.act_duration)
            datas.append(
                [cntr_id, case_name, case_secs, case_estimate, ratio_cntr_desc,
                 f"{case_secs:.2f}/{self.act_duration.total_seconds():.2f} "
                 f"{ratio_process:.2f}%"])
            self.case_execution_details.task_done()
        brief = "\nNone"
        add_desc = ""
        if datas:
            duration_idx = 2
            datas = sorted(datas, key=lambda x: x[duration_idx], reverse=True)
            for item in datas:
                item[duration_idx] = f"{item[duration_idx]:.2f}"
            self.save_case_duration_to_json(sorted_datas=datas, path=dump_json_path,
                                            dump_item_num=dump_item_num, dump_min_duration=dump_min_duration)
            if min_print_cnt:
                print_cnt = min_print_cnt + 50
                ori_len = len(datas)
                datas = datas[:print_cnt]
                cur_len = len(datas)
                if ori_len > cur_len:
                    hidden_cnt = ori_len - cur_len
                    # Rows are sorted descending, so every hidden row is at
                    # most as long as the last printed one.
                    last_shown = datas[-1]
                    add_desc = f"\n({hidden_cnt} durations <= {last_shown[2]}s hidden.)"
            brief = Table.table(datas=datas, headers=heads, auto_sort=False)
        return f"\n\nCase Duration Brief:{brief}" + add_desc
@dataclasses.dataclass
class CntrContext:
    """Bookkeeping for one container process.

    The start timestamp is taken (in UTC) when the context is created; the
    counters start from their class-level defaults.
    """
    cntr_id: int = 0
    exec_param: Optional[Any] = None
    success: int = 0
    failed: int = 0
    ts: Optional[datetime] = None
    exit_code: int = 0

    def __init__(self, cntr_id: int, exec_param):
        self.cntr_id = cntr_id
        self.exec_param = exec_param
        self.ts = datetime.now(tz=timezone.utc)

    @property
    def total(self) -> int:
        """Number of cases handled so far (success + failed)."""
        return self.success + self.failed

    @property
    def brief(self) -> List[Any]:
        """Summary row: id, total, success, failed, elapsed time since creation."""
        elapsed = datetime.now(tz=timezone.utc) - self.ts
        return [self.cntr_id, self.total, self.success, self.failed, elapsed]
@dataclasses.dataclass
class CaseContext:
    """Bookkeeping for one case execution.

    The start timestamp is taken (in UTC) when the context is created.
    """
    cntr_id: int = 0
    exec_param: Optional[Any] = None
    ts: Optional[datetime] = None
    case_name: str = ""

    def __init__(self, cntr_id: int, exec_param, case_name):
        self.cntr_id = cntr_id
        self.exec_param = exec_param
        self.case_name = case_name
        self.ts = datetime.now(tz=timezone.utc)

    @property
    def brief(self) -> List[Any]:
        """Summary row: container id, case name, elapsed time since creation."""
        elapsed = datetime.now(tz=timezone.utc) - self.ts
        return [self.cntr_id, self.case_name, elapsed]
@dataclasses.dataclass
class MoveContext:
    """State of a mover that forwards elements from one queue to another."""
    ele_count: int
    src_queue: JoinableQueue
    dst_queue: JoinableQueue

    def __init__(self, src: JoinableQueue, dst: JoinableQueue):
        self.ele_count = 0
        self.src_queue = src
        self.dst_queue = dst

    def move(self, timeout: int = 1) -> bool:
        """Forward at most one element from the source to the destination queue.

        :param timeout: seconds to wait for an element on the source queue
        :return: False once the ``None`` stop marker is received, True otherwise
            (including when the wait timed out or was interrupted)
        """
        try:
            item = self.src_queue.get(timeout=timeout)
            self.src_queue.task_done()
            if item is None:
                return False
            # Strings are message chunks: only the empty string (the message
            # terminator) counts as an element. Any other object counts once.
            if not isinstance(item, str) or len(item) == 0:
                self.ele_count += 1
            self.dst_queue.put(item)
        except (queue.Empty, KeyboardInterrupt):
            pass
        return True
def __init__(self, args, scene_mark: str, cntr_name: str):
    """Build the accelerator from parsed command-line arguments.

    :param args: parsed command-line arguments
    :param scene_mark: scene mark, stored as ``self.mark``
    :param cntr_name: container name, used in the echoed output
    """
    # --- executable and run-level settings ---
    self.mark: str = scene_mark
    self.exe: Exec = Exec(file=args.target[0], envs=args.envs, timeout=args.timeout_case)
    self.exe_params: List[TestsAccelerate.ExecParam] = []
    self.exe_result: TestsAccelerate.ExecResult = TestsAccelerate.ExecResult(cntr_name=cntr_name)
    self.exe_timeout: Optional[int] = args.timeout
    self.exe_halt_on_error: bool = args.halt_on_error

    # --- case duration cache settings (must be known before cases are listed) ---
    self.case_duration_json: Path = self._init_get_case_duration_json(args=args)
    self.case_duration_max_num: int = self._init_get_case_duration_max_num(args=args)
    self.case_duration_min_sec: float = self._init_get_case_duration_min_sec(args=args)

    # --- case list ---
    ordered_cnt, case_list, case_dict = self.exe.get_case_name_info(
        case_name_list=args.cases, duration_json=self.case_duration_json
    )
    self.case_ordered_cnt: int = ordered_cnt
    self.case_list: List[Exec.CaseDesc] = case_list
    self.case_dict: Dict[str, Exec.CaseDesc] = case_dict

    # --- case-level queues and counters ---
    self.case_queue: JoinableQueue = JoinableQueue()
    self.case_execution_queue: JoinableQueue = JoinableQueue()
    self.case_exception_queue: JoinableQueue = JoinableQueue()
    self.case_terminate_queue: JoinableQueue = JoinableQueue()
    self.case_exec_count = Value('i', 0)

    # --- container-level queues, events and counters ---
    self.cntr_name: str = cntr_name
    self.cntr_execution_queue: JoinableQueue = JoinableQueue()
    self.cntr_terminate_event = Event()
    self.cntr_exit_count = Value('i', 0)

    # --- CPU affinity; the policy itself is decided later ---
    self.cpu_rank_size: Optional[int] = self._init_get_cpu_rank_size(args=args)
    self.cpu_affinity_policy: Optional[int] = None
@property
def brief(self) -> List[Any]:
    """Key/value rows describing the current configuration."""
    py = sys.version_info
    rows: List[Any] = [
        ["Python3", f"{sys.executable} ({py.major}.{py.minor}.{py.micro})"],
        ["Timeout", self.exe_timeout],
        ["HaltOnError", self.exe_halt_on_error],
        [f"{self.cntr_name}Num", self.cntr_num],
        [f"{self.cntr_name}List", [p.cntr_id for p in self.exe_params]],
        ["CaseNum", self.case_num],
        ["CaseTimeout", self.exe.timeout],
        ["CaseDurationFile", self.case_duration_json],
        ["CaseDurationMaxNum", self.case_duration_max_num],
        ["CaseDurationMinSecs", self.case_duration_min_sec],
        ["Executable", self.exe.file],
    ]
    # CPU affinity rows are only shown when a rank size is configured.
    if self.cpu_rank_size:
        rows += [
            ["CpuRankSize", self.cpu_rank_size],
            ["CpuAffinityPolicy", f"{self.cpu_affinity_policy_str}({self.cpu_affinity_policy})"],
        ]
    return rows
@property
def cntr_num(self) -> int:
    """Number of containers, i.e. the number of execution parameters."""
    params = self.exe_params
    return len(params)
@property
def case_num(self) -> int:
    """Number of cases in the case list."""
    cases = self.case_list
    return len(cases)
@property
def cpu_affinity_policy_str(self) -> str:
    """Human-readable name of the current CPU affinity policy."""
    policy = self.cpu_affinity_policy
    if not policy:
        return "Disable"
    names = {
        1: "Even Allocation",
        2: "Cyclic Reuse Allocation",
    }
    return names.get(policy, "Unknown")
@staticmethod
def reg_args(parser: argparse.ArgumentParser):
    """Register the command-line arguments used by the accelerator.

    Notes:
        1. The original note says this function is meant to be used together
           with ``get_container_manager``, which is not defined in this file
           -- TODO confirm where it lives.
        2. The ``cases`` field is registered here, but per the same note it is
           expected to be interpreted by the user of that function.

    :param parser: ArgumentParser created by the caller
    """
    parser.add_argument("-t", "--target", nargs=1, type=str, required=True,
                        help="Specific target executable file path.")
    parser.add_argument("-e", "--env",
                        nargs="+", action=ArgsEnvDictAction, default={}, dest="envs",
                        help="Specify additional environment variables to set when executing the target.")
    parser.add_argument("--timeout", nargs="?", type=int, default=None,
                        help="Timeout for executing all cases.")
    parser.add_argument("--timeout_case", nargs="?", type=int, default=None,
                        help="Timeout for executing single case.")
    parser.add_argument("--halt_on_error", action="store_true", default=False,
                        help="If any case failed, subsequent cases are not executed.")
    parser.add_argument("-c", "--cases",
                        nargs='*', action=ArgsCaseListAction, default=[], required=False,
                        help="Cases, multiple cases are separated by ':'")
    parser.add_argument("--cpu_rank_size", nargs="?", type=int, default=None,
                        help="Specify the rank size for CPU affinity grouping.")
    parser.add_argument("--dump_case_duration_json", nargs="?", type=Path, default=None,
                        help="Specify the path to the case duration json cache file.")
    parser.add_argument("--dump_case_duration_max_num", nargs="?", type=int, default=None,
                        help="Maximum number of cases to dump to duration json cache.")
    # The historical spelling ("secends") is kept as the primary option and as
    # the namespace attribute so existing command lines and readers keep
    # working; the correctly spelled option is an alias for the same dest.
    # The value is parsed as float because it is consumed as a float.
    parser.add_argument("--dump_case_duration_min_secends", "--dump_case_duration_min_seconds",
                        nargs="?", type=float, default=None, dest="dump_case_duration_min_secends",
                        help="Minimum duration (in seconds) for cases to dump to duration json cache.")
@staticmethod
def _init_get_cpu_rank_size(args) -> Optional[int]:
    """Resolve the CPU rank size.

    The command-line value wins; otherwise the environment variable
    ``PYPTO_TESTS_CASE_EXECUTE_CPU_RANK_SIZE`` is parsed as an int.

    :return: a positive rank size, or None when unset or not positive
    """
    rank_size = args.cpu_rank_size
    if not rank_size:
        env_value = os.environ.get("PYPTO_TESTS_CASE_EXECUTE_CPU_RANK_SIZE", None)
        rank_size = int(env_value) if env_value else None
    return rank_size if rank_size and rank_size > 0 else None
@staticmethod
def _init_get_case_duration_json(args) -> Path:
    """Resolve the path of the case duration json cache.

    Priority: command-line argument, then the environment variable
    ``PYPTO_TESTS_DUMP_CASE_DURATION_JSON``, then ``<target stem>_duration.json``
    next to the target executable.
    """
    cli_path = args.dump_case_duration_json
    if cli_path:
        return cli_path.resolve()
    env_path = os.environ.get("PYPTO_TESTS_DUMP_CASE_DURATION_JSON", None)
    if env_path:
        return Path(env_path).resolve()
    target = Path(args.target[0])
    default_name = f"{target.stem}_duration.json"
    return target.parent / default_name
@staticmethod
def _init_get_case_duration_max_num(args) -> int:
    """Resolve the maximum number of cases to dump.

    Priority: command-line argument, then the environment variable
    ``PYPTO_TESTS_DUMP_CASE_DURATION_MAX_NUM``, then the default 500.
    """
    cli_value = args.dump_case_duration_max_num
    if cli_value is not None:
        return cli_value
    env_value = os.environ.get("PYPTO_TESTS_DUMP_CASE_DURATION_MAX_NUM", None)
    return int(env_value) if env_value else 500
@staticmethod
def _init_get_case_duration_min_sec(args) -> float:
    """Resolve the minimum case duration (seconds) to dump.

    Priority: command-line argument, then the environment variable
    ``PYPTO_TESTS_DUMP_CASE_DURATION_MIN_SECONDS``, then the default 5.0.
    """
    cli_value = args.dump_case_duration_min_secends
    if cli_value is not None:
        return float(cli_value)
    env_value = os.environ.get("PYPTO_TESTS_DUMP_CASE_DURATION_MIN_SECONDS", None)
    return float(env_value) if env_value else 5.0
@staticmethod
def _move(src: JoinableQueue, dst: JoinableQueue):
    """Entry point of a mover: forward elements from ``src`` to ``dst``.

    Runs until ``MoveContext.move`` returns False, then logs how many
    elements were counted.
    """
    TestsAccelerate._set_process_desc()
    mover = TestsAccelerate.MoveContext(src=src, dst=dst)
    while mover.move():
        pass
    logging.info("%s Exist, Move %s elements.", TestsAccelerate._get_process_desc(), mover.ele_count)
@staticmethod
def _get_process_desc() -> str:
    """Return the name of the current process."""
    return f"{multiprocessing.current_process().name}"
@staticmethod
def _set_process_desc():
    """Set the process title to the process name, if ``setproctitle`` is installed.

    The dependency is optional: when the module is missing nothing happens.
    """
    try:
        import setproctitle
        title = TestsAccelerate._get_process_desc()
        setproctitle.setproctitle(title)
    except ModuleNotFoundError:
        pass
def prepare(self):
    """Prepare the run: collect execution parameters and decide CPU affinity.

    When there are more containers than cases, only the first ``case_num``
    parameters are kept.

    :raises ValueError: when no execution parameter is available
    """
    self.exe_params = self._prepare_get_params()
    cntr_total, case_total = self.cntr_num, self.case_num
    if cntr_total == 0:
        raise ValueError("ExecParams is empty, won't run any task.")
    if cntr_total > case_total:
        logging.info("CaseNum(%s) less than len(ExecParams)=%s, will only start the first %s %s.",
                     case_total, cntr_total, case_total, self.cntr_name)
        self.exe_params = self.exe_params[:case_total]
    self._prepare_determine_cpu_affinity_policy()
def process(self):
    """Run all cases and record the actual duration on the result object."""
    logging.info("\n\n%s Accelerate Args:%s", self.mark, Table.table(datas=self.brief))
    started_at = datetime.now(tz=timezone.utc)
    self._main()
    finished_at = datetime.now(tz=timezone.utc)
    self.exe_result.act_duration = finished_at - started_at
def post(self) -> bool:
    """Post-process: build and log the execution summary.

    :return: True when every case succeeded, False otherwise
    """
    cntr_exec_brief, cntr_revenue_desc = self.exe_result.get_cntr_exec_info()
    case_exec_brief, case_exec_result = self._post_case_exec_info()
    report = "".join([
        f"{self.mark}, HaltOnError({self.exe_halt_on_error}), {cntr_revenue_desc}",
        cntr_exec_brief,
        case_exec_brief,
    ])
    if not case_exec_result:
        logging.error(report)
        return case_exec_result
    logging.info(report)
    logging.info("Use %s %s | Exec %s case | %s | %s",
                 self.cntr_num, self.cntr_name, self.case_num,
                 self.exe_result.revenue_desc, self.exe_result.cntr_latency_desc)
    return case_exec_result
def _prepare_determine_cpu_affinity_policy(self):
    """Decide the CPU affinity policy.

    The decision depends on the container count, which is not known while the
    object is constructed, so it is made in the prepare stage. Policy 1 is
    chosen when every container can get its own ``cpu_rank_size`` cores,
    policy 2 otherwise; without a positive rank size the policy is None.
    """
    rank_size = self.cpu_rank_size
    if rank_size and rank_size > 0:
        fits = self.cntr_num * rank_size <= cpu_count()
        self.cpu_affinity_policy = 1 if fits else 2
    else:
        self.cpu_affinity_policy = None
    logging.info("Determine CpuAffinity, Policy=%s(%s), CntrNum=%s, CpuNum=%s, CpuRankSize=%s",
                 self.cpu_affinity_policy_str, self.cpu_affinity_policy,
                 self.cntr_num, cpu_count(), self.cpu_rank_size)
def _prepare_get_params(self) -> List[ExecParam]:
    """Return the execution parameters, one per container.

    This base implementation returns an empty list; per the module docstring,
    subclasses are expected to provide their own implementation.
    """
    return []
def _post_case_exec_info(self) -> Tuple[str, bool]:
    """Build the case-level report.

    :returns:
        Tuple[str, bool]:
            - case execution report
            - True when no case was terminated, failed or left unexecuted
    """
    result = self.exe_result
    terminate_brief, terminate_count = result.get_case_exec_terminate_info()
    exception_brief, exception_count = result.get_case_exec_exception_info()
    duration_brief = result.get_case_exec_duration_info(
        case_dict=self.case_dict, min_print_cnt=self.case_ordered_cnt,
        dump_json_path=self.case_duration_json, dump_item_num=self.case_duration_max_num,
        dump_min_duration=self.case_duration_min_sec)

    # Whatever is still in the case queue was never picked up; the None
    # entries are stop markers, not cases.
    remaining_count = 0
    while not self.case_queue.empty():
        leftover = self.case_queue.get()
        remaining_count += 0 if leftover is None else 1
        self.case_queue.task_done()

    success_count = self.case_num - remaining_count - terminate_count - exception_count
    table = Table.table(
        datas=[[self.case_num, success_count, exception_count, terminate_count, remaining_count]],
        headers=["Total", "Success", "Failed", "Terminate", "Remaining"])
    execution_brief = f"\n\nCase Execution Brief:{table}"
    all_passed = (terminate_count + exception_count + remaining_count) == 0
    return execution_brief + duration_brief + terminate_brief + exception_brief, all_passed
def _main(self):
    """Drive the whole run from the main process.

    Queues all cases, starts the mover and container processes and waits for
    the containers. The container group and the movers are always stopped in
    the end, also after a KeyboardInterrupt.
    """
    poll_step = 1
    started_cntrs = []
    try:
        self._push_all_case_sync()
        self._start_move_process_grp()
        started_cntrs = self._start_cntr_process_grp()
        self._join_cntr_process_grp(cntr_process_grp=started_cntrs, step=poll_step)
    except KeyboardInterrupt:
        logging.info("MainProcess Recv download terminate event.")
    finally:
        self._stop_cntr_process_grp(cntr_process_grp=started_cntrs, timeout=poll_step)
        self._stop_move_process_grp()
def _push_all_case_sync(self):
    """Queue every case name, followed by one ``None`` stop marker per container."""
    pending = [cs.name for cs in self.case_list]
    pending.extend([None] * self.cntr_num)
    for entry in pending:
        self.case_queue.put(entry)
def _start_move_process_grp(self) -> List[Process]:
    """Start one mover process per (source, destination) queue pair.

    :return: the started mover processes
    """
    movers: List[Process] = []
    for label, src_queue, dst_queue in self._get_move_process_grp_desc_list():
        mover = Process(name=f"MoveProcess({label})", target=self._move, args=(src_queue, dst_queue,))
        mover.start()
        movers.append(mover)
    return movers
def _stop_move_process_grp(self):
    """Stop the movers: send the ``None`` stop marker on each source queue and
    wait until that queue has been fully consumed.
    """
    for entry in self._get_move_process_grp_desc_list():
        src_queue = entry[1]
        src_queue.put(None)
        src_queue.join()
def _get_move_process_grp_desc_list(self) -> List[Tuple[str, JoinableQueue, JoinableQueue]]:
    """List the mover descriptions as (name, source queue, destination queue)."""
    result = self.exe_result
    return [
        ("CaseExecution", self.case_execution_queue, result.case_execution_details),
        ("CaseException", self.case_exception_queue, result.case_exception_details),
        ("CaseTerminate", self.case_terminate_queue, result.case_terminate_details),
        (f"{self.cntr_name}Execution", self.cntr_execution_queue, result.cntr_execution_details),
    ]
def _start_cntr_process_grp(self, delay: int = 2) -> List[Process]:
    """Start one container process per execution parameter.

    :param delay: seconds each container sleeps before taking its first case,
        so that all consumers are up before work is handed out
    :return: the container processes
    """
    started: List[Process] = []
    for param in self.exe_params:
        proc_name = f"{self.cntr_name}Process({self.cntr_name}[{param.cntr_id}])"
        proc = Process(name=proc_name, target=self._cntr, args=(param.cntr_id, param, delay,))
        started.append(proc)
        proc.start()
    return started
def _join_cntr_process_grp(self, cntr_process_grp: List[Process], step: int = 1):
    """Block until the container process group no longer needs to be watched.

    :param cntr_process_grp: container process group
    :param step: polling step in seconds
    """
    started_at = time.time()
    while self._wait_cntr_one_step(cntr_process_grp=cntr_process_grp, s_time=started_at, step=step):
        pass
def _wait_cntr_one_step(self, cntr_process_grp: List[Process], s_time, step: int = 1) -> bool:
    """Sleep one polling step, then inspect the container process group.

    :param cntr_process_grp: container process group
    :param s_time: start time of the group, a ``time.time()`` value
    :param step: polling step in seconds
    :return: True when the caller should poll again, False when waiting is over
    """
    time.sleep(step)
    need_next_step = True
    # The overall timeout is only enforced when exe_timeout is set (truthy).
    timeout = int(time.time() - s_time) > self.exe_timeout if self.exe_timeout else False
    if timeout:
        # Raise the terminate event first and wait one more step before
        # signalling the processes that are still alive.
        self.cntr_terminate_event.set()
        need_next_step = False
        time.sleep(step)
    alive_process_count = 0
    for process in cntr_process_grp:
        if process.is_alive():
            if timeout:
                logging.info("%s timeout, terminate it.", process.name)
                os.kill(process.pid, signal.SIGINT)
            alive_process_count += 1
            continue
        # The process has exited: a non-zero exit code ends the wait when
        # halt-on-error is enabled.
        if process.exitcode != 0 and self.exe_halt_on_error:
            need_next_step = False
            logging.info("MainProcess Recv %s upload terminate event", process.name)
            break
    # No process counted as alive means there is nothing left to wait for.
    need_next_step = False if alive_process_count == 0 else need_next_step
    if not need_next_step:
        self._stop_cntr_process_grp(cntr_process_grp=cntr_process_grp, timeout=step)
    return need_next_step
def _stop_cntr_process_grp(self, cntr_process_grp: List[Process], timeout: int = 1):
    """Stop the container process group.

    Sets the terminate event, gives every live process ``timeout`` seconds to
    leave on its own and sends SIGINT to those that are still alive.

    :param cntr_process_grp: container process group
    :param timeout: seconds to wait for each join
    """
    self.cntr_terminate_event.set()
    for proc in cntr_process_grp:
        if not proc.is_alive():
            continue
        proc.join(timeout=timeout)
        if not proc.is_alive():
            continue
        os.kill(proc.pid, signal.SIGINT)
        logging.info("MainProcess Send download terminate event to %s.", proc.name)
        proc.join(timeout=timeout)
def _cntr(self, cntr_id: int, exec_param, delay: int):
    """Container process body.

    Notes:
        1. Case failures are reported through the exception queue rather than
           raised here.
        2. The loop ends when a ``None`` stop marker is taken from the case
           queue, when the terminate event is set, or when handling a case
           asks to stop.

    The process always ends through ``sys.exit`` with the context's exit code.

    :param cntr_id: container id
    :param exec_param: container execution parameter
    :param delay: seconds to sleep before taking the first case
    """
    self._set_process_desc()
    self._cntr_set_cpu_affinity(cntr_id=cntr_id)
    ctx = TestsAccelerate.CntrContext(cntr_id=cntr_id, exec_param=exec_param)
    try:
        time.sleep(delay)
        while not self.cntr_terminate_event.is_set():
            case_name = self._cntr_get_case()
            if case_name is None:
                break
            if not self._cntr_deal_case(case_name=case_name, ctx=ctx):
                break
    except KeyboardInterrupt:
        pass
    self._put_cntr_execution_info(info=ctx.brief)
    # A non-zero exit code is what the main process treats as the uploaded
    # terminate event, so the "send" side is only logged in that case.
    if ctx.exit_code:
        logging.info("%s Send terminate event upload.", self._get_process_desc())
    logging.info("%s Exist[%s] %s %s",
                 self._get_process_desc(), ctx.exit_code,
                 self._cntr_progress(update=True), self._case_progress(update=False))
    # sys.exit instead of the builtin exit(): the latter is injected by the
    # site module for interactive use and is not guaranteed to exist.
    sys.exit(ctx.exit_code)
def _cntr_get_case(self) -> Optional[str]:
    """Take the next entry from the case queue.

    :return: the case name, or None for a stop marker or when the wait was
        interrupted
    """
    try:
        next_case = self.case_queue.get()
        self.case_queue.task_done()
        return next_case
    except (queue.Empty, KeyboardInterrupt):
        return None
def _cntr_deal_case(self, case_name: str, ctx: CntrContext) -> Optional[bool]:
    """Run one case in its own child process and record the outcome.

    :param case_name: case name
    :param ctx: container context
    :return: whether the container should go on with the next case
    """
    process = None
    try:
        process = Process(name=f"CaseProcess({self.cntr_name}[{ctx.cntr_id}] Case[{case_name}])",
                          target=self._case, args=(ctx.cntr_id, ctx.exec_param, case_name,))
        process.start()
        process.join()
    except KeyboardInterrupt:
        # Interrupted while waiting: forward SIGINT to the case process if it
        # is still running and wait for it to finish.
        if process and process.is_alive():
            logging.info("%s Recv terminate event download, stop running Case[%s]",
                         self._get_process_desc(), case_name)
            os.kill(process.pid, signal.SIGINT)
            process.join()
    finally:
        # The result is evaluated on both the normal and the interrupted path.
        need_next = self._cntr_deal_case_finally(process=process, case_name=case_name, ctx=ctx)
    return need_next
def _cntr_deal_case_finally(self, process: Process, case_name: str, ctx: CntrContext) -> bool:
    """Update the container context after a case process has ended.

    :param process: case process, None when it could not be created
    :param case_name: case name
    :param ctx: container context
    :return: whether the container should go on with the next case
    """
    if process is None:
        return False
    if process.exitcode == 0:
        ctx.success += 1
        return True
    ctx.failed += 1
    if self.exe_halt_on_error:
        # Halt-on-error: tell everybody to stop and carry the case's exit
        # code out as the container's exit code.
        self.cntr_terminate_event.set()
        ctx.exit_code = process.exitcode
        logging.info("%s Recv Case[%s] upload terminate event.", self._get_process_desc(), case_name)
        return False
    return True
def _execute_case(self, ctx: CaseContext, param: ExecParam,
                  case_name: str) -> Tuple[subprocess.CompletedProcess, str, timedelta]:
    """Single entry point for running a case; subclasses may override it.

    This implementation runs the executable with a ``--gtest_filter`` for the
    case and the extra environment variables from ``param``.
    """
    gtest_args = [f"--gtest_filter={case_name}"]
    return self.exe.run(params=gtest_args, envs=param.get_envs())
def _cntr_set_cpu_affinity(self, cntr_id: int):
    """Pin the container process to a group of CPU cores.

    The affinity is set on the container process so that the case processes
    it starts inherit it. Under policy 1 the container id selects the core
    group directly; otherwise the groups are reused cyclically.

    :param cntr_id: container id
    """
    if not self.cpu_affinity_policy:
        return
    if not hasattr(os, "sched_setaffinity"):
        # The scheduler affinity API only exists on some platforms; skip
        # pinning instead of crashing the container process.
        logging.warning("%s[%s] CPU affinity is not supported on this platform, skip.",
                        self.cntr_name, cntr_id)
        return
    total_cores = cpu_count()
    if self.cpu_affinity_policy == 1:
        group_idx = cntr_id
    else:
        # At least one group, so the modulo stays defined when the rank size
        # is larger than the number of cores.
        cpu_rank_num = max(1, total_cores // self.cpu_rank_size)
        group_idx = cntr_id % cpu_rank_num
    start_core = group_idx * self.cpu_rank_size
    end_core = min(start_core + self.cpu_rank_size, total_cores)
    cpu_core_list = list(range(start_core, end_core))
    try:
        os.sched_setaffinity(0, cpu_core_list)
    except (OSError, ValueError) as e:
        logging.error("%s[%s] Failed to set CPU affinity: %s", self.cntr_name, cntr_id, e)
    current_affinity = os.sched_getaffinity(0)
    logging.debug("%s[%s] cpu affinity cores: %s", self.cntr_name, cntr_id, current_affinity)
def _case(self, cntr_id: int, param: ExecParam, case_name: str):
    """Case process body.

    Each case runs in its own child process so that cases do not share an
    execution context.

    :param cntr_id: container id
    :param param: container execution parameter
    :param case_name: case name
    """
    self._set_process_desc()
    ctx = TestsAccelerate.CaseContext(cntr_id=cntr_id, exec_param=param, case_name=case_name)
    run_desc = f"Run {self.mark}{self.exe.brief} Case({case_name})"
    try:
        logging.info("%s[%s] [BGN] %s", self.cntr_name, cntr_id, run_desc)
        ret, cmd, _ = self._execute_case(ctx, param, case_name)
        if ret.returncode:
            # Reports the failure and leaves the process with the return code.
            self._case_exception_exit(cntr_id=cntr_id, cmd=cmd,
                                      ret_code=ret.returncode, out=ret.stdout, err=ret.stderr)
        else:
            msg = f"{ret.stdout}\n{ret.stderr}"
            logging.info("%s[%s] [END] %s %s Output Below:\n%s",
                         self.cntr_name, cntr_id, run_desc, self._case_progress(update=True), msg)
        self._put_case_execution_info(info=ctx.brief)
    except subprocess.TimeoutExpired as e:
        # NOTE(review): a timed-out case is put on both the terminate queue and
        # (through _case_exception_exit) the exception queue -- confirm the
        # summary is meant to count it in both columns.
        self._put_case_terminate_info(info=ctx.brief)
        self._case_exception_exit(cntr_id=cntr_id, cmd=str(e),
                                  ret_code=1, out=None, err=str(e.output))
    except KeyboardInterrupt:
        self._put_case_terminate_info(info=ctx.brief)
        logging.info("%s Recv terminate event download, stop running.", self._get_process_desc())
def _case_exception_exit(self, cntr_id: int, cmd: str, ret_code: int,
                         out: Optional[str] = None, err: Optional[str] = None):
    """Report a failed case and terminate the case process.

    The failure description is put on the exception queue; with halt-on-error
    enabled the terminate event is set as well. The process then ends through
    ``sys.exit(ret_code)``, so this method does not return.

    :param cntr_id: container id
    :param cmd: command line that failed
    :param ret_code: exit code for the process
    :param out: captured standard output
    :param err: captured standard error
    """
    msg = (f"{self.cntr_name} : {cntr_id}\n"
           f"Cmd : {cmd}\n"
           f"RetCode : {ret_code}\n"
           f"stdout :\n{out}\n"
           f"stderr :\n{err}")
    self._put_case_exception_info(info=msg)
    if self.exe_halt_on_error:
        self.cntr_terminate_event.set()
        logging.info("%s Send terminate event upload.", self._get_process_desc())
    # sys.exit instead of the builtin exit(): the latter is injected by the
    # site module for interactive use and is not guaranteed to exist.
    sys.exit(ret_code)
def _cntr_progress(self, update=True) -> str:
    """Describe how many containers have exited.

    :param update: when True the shared exit counter is incremented first
        (under the counter's own lock)
    :return: progress description
    """
    counter = self.cntr_exit_count
    if update:
        with counter.get_lock():
            counter.value += 1
    done = int(counter.value)
    percent = done / self.cntr_num * 100
    return f"{self.cntr_name}Progress[{done}/{self.cntr_num} {percent:.2f}%]"
def _case_progress(self, update=True) -> str:
    """Describe how many cases have been executed.

    :param update: when True the shared execution counter is incremented first
        (under the counter's own lock)
    :return: progress description
    """
    counter = self.case_exec_count
    if update:
        with counter.get_lock():
            counter.value += 1
    done = int(counter.value)
    percent = done / self.case_num * 100
    return f"CaseProgress[{done}/{self.case_num} {percent:.2f}%]"
def _put_case_execution_info(self, info: List[Any]):
    """Put one case execution record on the case execution queue."""
    target_queue = self.case_execution_queue
    target_queue.put(info)
def _put_case_exception_info(self, info: str, chunk_size: int = 4096):
    """Put a failure description on the exception queue in chunks.

    The text is sent in pieces of at most ``chunk_size`` characters, followed
    by an empty string that marks the end of the message.
    """
    target_queue = self.case_exception_queue
    for start in range(0, len(info), chunk_size):
        stop = start + chunk_size
        target_queue.put(info[start:stop])
    target_queue.put("")
def _put_case_terminate_info(self, info: List[Any]):
    """Put one terminated-case record on the case terminate queue."""
    target_queue = self.case_terminate_queue
    target_queue.put(info)
def _put_cntr_execution_info(self, info: List[Any]):
    """Put one container summary record on the container execution queue."""
    target_queue = self.cntr_execution_queue
    target_queue.put(info)