import argparse
import configparser
import locale
import os
import sys
import threading
import timeit
import re
import platform
import shlex
from functools import wraps
from pathlib import Path
from typing import Set
RUN_FLAG = "RUN"
EXEC_FLAG = "EXEC"
PIPE_KEY = 'PIPE'
SEPARATE_FILE_DIRECTIVE = 'SEPARATE-FILE'
EXIT_CODE = 'exit_code'
ERRCHECK_FLAG = "ERRCHECK"
ASSERT_FLAG = "ASSERT"
EXPECT_FLAG = "EXPECT"
DEPENDENCE_FLAG = "DEPENDENCE"
TIMEOUT_FLAG = "TIMEOUT"
LEVEL_FLAG = "LEVEL"
FLAGS = [RUN_FLAG, EXEC_FLAG, SEPARATE_FILE_DIRECTIVE, ERRCHECK_FLAG, ASSERT_FLAG,
DEPENDENCE_FLAG, TIMEOUT_FLAG, EXPECT_FLAG, LEVEL_FLAG]
PASS = "PASS"
FAIL = "FAIL"
XFAIL = "XFAIL"
XPASS = "XPASS"
NOT_RUN = "NOT_RUN"
UNRESOLVED = "UNRESOLVED"
UNSUPPORTED = "UNSUPPORTED"
LOGIC_KEY_WORD = ["and", "or", "not", "True", "False"]
UNSUCCESSFUL = [FAIL, UNSUPPORTED, UNRESOLVED, NOT_RUN]
ALL = [PASS, FAIL, XFAIL, XPASS, UNSUPPORTED, UNRESOLVED]
DEFAULT_PRINT = [PASS, FAIL, XFAIL, XPASS, UNSUPPORTED]
RESULT = {
PASS: 0,
FAIL: 0,
NOT_RUN: 0,
UNRESOLVED: 0,
XFAIL: 0,
XPASS: 0,
}
BASE_DIR = Path(__file__).parent.absolute()
ENCODING = "utf-8"
OS_SEP = os.path.sep
OS = platform.system()
if OS == 'Windows' and ' ' in sys.executable:
EXECUTABLE = '"' + sys.executable + '"'
else:
EXECUTABLE = sys.executable
COMPARE = BASE_DIR / "compare.py"
PRINT_LOCK = threading.Lock()
def read_file(file_path):
"""Read files based on encoding and return all file lines
:param file_path: Path
:param encoding: str
:return:
"""
lines = []
with file_path.open(encoding="utf-8") as file:
all_lines = file.readlines()
for line in all_lines:
if line.strip():
lines.append(line.strip())
return lines
def read_config(file_path):
"""Read config file based on encoding and return test config"""
if not file_path.exists() or not file_path.is_file():
return None
config = configparser.ConfigParser()
config.optionxform = str
config.read(str(file_path), encoding="utf-8")
return config
def get_config_value(config, section, option):
"""Read config value from test config"""
try:
return config[section][option]
except KeyError:
return None
def config_section_to_dict(config, section):
try:
return {k: v for k, v in config.items(section)}
except configparser.NoSectionError:
return {}
def config_section_to_set(config, section):
try:
res = set()
for k, v in config.items(section):
temp = str(v).strip().split()
for x in temp:
if x in LOGIC_KEY_WORD:
raise Exception('[ERROR]Condition Key Word can not be "and","not","or"! Please modify!')
res.add(x)
return res
except configparser.NoSectionError:
return {}
def ls_all(path, suffix=None):
"""Output all files in a directory"""
all_files = []
_path = complete_path(path)
if _path.is_file() and is_case(_path, suffix):
return [_path]
for name, _, files in os.walk(str(_path)):
for file in files:
if is_case(Path(name) / file, suffix):
all_files.append(Path(name) / file)
return all_files
def is_case(path, suffix):
"""Determine if it is a test case based on the suffix
:param suffix: tuple
:param path: Path
:return:
"""
if suffix is None:
return True
elif isinstance(suffix, str):
return path.suffix[1:] == suffix
return path.suffix[1:] in suffix
def split_and_complete_path(paths):
"""Split the paths and returns the canonical path of each path"""
canonicalPaths = []
for path in paths.split(","):
canonicalPaths.append(complete_path(path))
return canonicalPaths
def complete_path(path):
"""Returns the canonical path of a path"""
path = Path(path)
if not path.exists():
return Path(os.path.realpath(str(path)))
return path.expanduser().resolve()
def filter_line(line, flag=None, condition=None):
"""Returns the line starting with the flag"""
if flag is None:
return line
if condition == None:
line_flag = line.strip().split(":")[0].strip()
if line_flag == flag:
new_line = line.strip()[len(flag) + 1:].strip().lstrip(":").strip()
return new_line
return None
else:
line_flag = line.strip().split(":")[0].strip()
cur_condition = re.match(r'\([0-9a-zA-Z_\|\&\!\(\) ]+\)', line_flag)
if cur_condition == None:
if line_flag == flag:
new_line = line.strip()[len(flag) + 1:].strip().lstrip(":").strip()
return new_line
return None
else:
cur_con = cur_condition.group()
if process_condition(cur_con, condition):
new_line_flag = line_flag.replace(cur_con, "").strip()
if new_line_flag == flag:
new_line = line.strip()[len(line_flag) + 1:].strip().lstrip(":").strip()
return new_line
else:
return None
def filter_command_number(command: str):
cmd = {}
cmd.setdefault(EXIT_CODE, 0)
cmd.setdefault(PIPE_KEY, False)
if not re.match(r'^EXEC(-PIPE)?(-(-)?\d+)?$', command):
sys.exit("""[ERROR] Only accept command like EXEC(-PIPE)(-number)!
Please check your command : {} """.format(command))
number = 0
if '-' in command:
temp = command.split('-')
if command.count('-') == 3:
number = - int(temp[-1])
cmd[PIPE_KEY] = True
elif command.count('-') == 2:
if len(temp[1]) == 0:
number = - int(temp[-1])
else:
number = int(temp[-1])
cmd[PIPE_KEY] = True
elif command.count('-') == 1:
if temp[-1] == 'PIPE':
number = 0
cmd[PIPE_KEY] = True
else:
number = int(temp[-1])
cmd[EXIT_CODE] = number
return cmd
def process_exit_code(number):
if number < 0:
if sys.platform == 'linux':
return 0x80 - number
else:
return 0x80000000 - number
return number
def parse_condition_from_str_to_set(condition: str):
res = set()
if not condition:
return res
temp = condition.split(',')
for x in temp:
x = x.strip().strip("'")
res.add(x)
return res
def parse_level_from_str_to_set(level: str):
res = set()
if not level:
return level
temp = level.split(',')
for x in temp:
x = x.strip()
if not x.isdigit() or not (0 <= int(x) <= 4):
raise argparse.ArgumentTypeError("{} is not a level, the level should be between 0-4".format(x))
res.add(x)
return res
def process_condition(cur_con: str, conditions: Set[str]) -> bool:
"""
Parse conditional expressions
Args:
cur_con: conditional expressions
conditions
Returns:
bool
Raises:
ValueError
"""
original_expr = cur_con
cur_con = (
cur_con.replace("&", " and ")
.replace("|", " or ")
.replace("!", " not ")
)
words = re.findall(r'[0-9a-zA-Z_]+', cur_con)
logic_ops = {'and', 'or', 'not'}
for word in words:
if word not in logic_ops:
replacement = 'True' if word in conditions else 'False'
cur_con = re.sub(rf'\b{word}\b', replacement, cur_con)
remaining_words = re.findall(r'[a-zA-Z_]+', cur_con)
allowed_words = logic_ops | {'True', 'False'}
for word in remaining_words:
if word not in allowed_words:
raise ValueError(f"Illegal word '{word}' in expression: '{original_expr}'")
try:
return eval(cur_con)
except SyntaxError as e:
raise ValueError(f"Invalid syntax in '{original_expr}': {e}")
def precheck_flag(line_flag):
res = all(x not in line_flag for x in FLAGS)
return res
def filter_level(line: str, level: set) -> bool:
return line in level
def filter_command_line(line: str, conditions: set, run_script: str, compatible: bool):
"""Returns and updates the command line starting with the flag"""
line_flag = line.strip().split(":")[0].strip()
if precheck_flag(line_flag):
return None
cur_con = re.match(r'\([0-9a-zA-Z_\|\&\!\(\) ]+\)', line_flag)
if cur_con != None:
target = cur_con.group()
if not process_condition(target, conditions):
return None
else:
line = line.replace(target, " " * len(target))
line_flag = line_flag.replace(target, "").strip()
new_line = line.strip()[len(line_flag) + 1:].strip().lstrip(":").strip()
def escape_cmd(cmd: str):
espace_words = '\\"$`'
for i in espace_words:
cmd = cmd.replace(i, '\\{}'.format(i))
return cmd
if (line_flag.startswith(EXEC_FLAG) or line_flag.startswith(ERRCHECK_FLAG)) and compatible:
return None
if line_flag.startswith(RUN_FLAG):
line_flag = line_flag[len(RUN_FLAG) + 1:]
if run_script is not None:
redirect = False
if '|' in run_script and 'ignore_cmd_redirect' in run_script:
run_script = run_script[:run_script.rfind('|')]
redirect = True
if '|' in new_line and 'compare' in new_line:
compare_part = new_line[new_line.rfind('|'):]
actual_run_cmd = new_line[:new_line.rfind('|')]
new_line = '{} "{}" {}'.format(run_script, escape_cmd(actual_run_cmd), compare_part)
if redirect:
if '>' in actual_run_cmd:
cmd_list = actual_run_cmd.split()
index = 0
for i, cmd in enumerate(cmd_list):
if '%run' in cmd:
index = i
break
for i, cmd in enumerate(cmd_list):
if '>' in cmd and i > index:
index = i
break
actual_cmd = ' '.join(cmd_list[:index])
redirect_cmd = ' '.join(cmd_list[index:])
else:
actual_cmd = actual_run_cmd
redirect_cmd = ''
if platform.system() == "Windows":
new_line = '{} "{}" {} {}'.format(run_script, escape_cmd(actual_cmd), redirect_cmd, compare_part)
else:
new_line = '{} "{}" {} {}'.format(run_script, escape_cmd(actual_cmd), redirect_cmd, compare_part)
else:
new_line = '{} "{}"'.format(run_script, escape_cmd(new_line))
if line_flag.startswith(EXEC_FLAG) and new_line:
cmd = filter_command_number(line_flag)
cmd.setdefault('cmd', new_line)
if cmd[PIPE_KEY]:
if new_line.count('|') != 1:
cmd[EXIT_CODE] = 0
cmd[PIPE_KEY] = False
else:
new_lines = new_line.split('|')
cmd['cmd'] = new_lines
return cmd
elif line_flag == ERRCHECK_FLAG:
cmd = {}
cmd.setdefault('exit_code', 1)
cmd.setdefault('PIPE', True)
if platform.system() == "Windows":
cmd['cmd'] = (new_line + " 2>&1 1>nul | compare %f").split('|')
else:
cmd['cmd'] = (new_line + " 2>&1 1>/dev/null | compare %f").split('|')
return cmd
return None
def split_comment(comment, lines):
"""Split text based on comments"""
comment_lines = []
uncomment_lines = []
comment_len = len(comment)
for line in lines:
if line.strip()[:comment_len] == comment:
if not precheck_flag(line.strip()[comment_len:]):
comment_lines.append(line.strip()[comment_len:])
else:
uncomment_lines.append(line)
return uncomment_lines, comment_lines
def add_run_path(new_path):
"""Add path to PATH"""
run_env = os.environ.copy()
old_path = run_env.get("PATH")
if old_path:
if sys.platform == 'linux' or sys.platform == 'darwin':
run_env["PATH"] = old_path + ":" + new_path
else:
run_env["PATH"] = old_path + ";" + new_path
else:
run_env["PATH"] = new_path
return run_env
def timer(func):
"""Decorator that reports the execution time."""
@wraps(func)
def wrapper(*args, **kwargs):
start = timeit.default_timer()
result = func(*args, **kwargs)
end = timeit.default_timer()
safe_print(
"Function: {}, args: {}, kwargs: {}, Time Consuming: {}s\n".format(
func.__name__, str(args), str(kwargs), end - start
)
)
return result
return wrapper
def is_relative(path1, path2):
"""Is path1 relative to path2"""
_p1 = complete_path(path1)
_p2 = complete_path(path2)
try:
_p1.relative_to(_p2)
except ValueError:
return 0
return 1
def merge_result(multi_results):
for result in multi_results:
if result in UNSUCCESSFUL:
return result
return PASS
def escape(special_chars, original_string):
special_re = re.compile(
"(" + "|".join(re.escape(char) for char in list(special_chars)) + ")"
)
special_map = {char: "\\%s" % char for char in special_chars}
def escape_special_char(m):
char = m.group(1)
return special_map[char]
return special_re.sub(escape_special_char, original_string)
def quote(original_string):
if platform.system() == "Windows":
return '"' + escape('\\"', original_string) + '"'
else:
return shlex.quote(original_string)
def safe_print(*args):
with PRINT_LOCK:
print(*args)