"""
Copyright (c) 2023-2024 Huawei Device Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Description: run regress test case
"""
import argparse
import dataclasses
import datetime
import json
import logging
import multiprocessing
import os
import platform
import re
import shutil
import stat
import subprocess
import sys
from abc import ABC
from typing import Optional, List, Type, Dict, Set, Tuple, Callable
from os.path import dirname, join
from pathlib import Path
import xml.etree.cElementTree as XTree
from enum import Enum, auto
from regress_test_config import RegressTestConfig
ENV_PATTERN = re.compile(r"//\s+Environment Variables:(.*)")
def init_log_file(args):
logging.basicConfig(filename=args.out_log, format=RegressTestConfig.DEFAULT_LOG_FORMAT, level=logging.INFO)
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('--test-dir', metavar='DIR',
help='Directory to test ')
parser.add_argument('--test-file', metavar='FILE',
help='File to test')
parser.add_argument('--test-list', metavar='FILE', dest="test_list", default=None,
help='File with list of tests to run')
parser.add_argument('--ignore-list', metavar='FILE', dest="ignore_list", default=None,
help='File with known failed tests list')
parser.add_argument('--timeout', default=RegressTestConfig.DEFAULT_TIMEOUT, type=int,
help='Set a custom test timeout in seconds !!!\n')
parser.add_argument('--processes', default=RegressTestConfig.DEFAULT_PROCESSES, type=int,
help='set number of processes to use. Default value: 1\n')
parser.add_argument('--stub-path',
help="stub file for run in AOT modes")
parser.add_argument('--LD_LIBRARY_PATH', '--libs-dir',
dest='ld_library_path', default=None, help='LD_LIBRARY_PATH')
parser.add_argument('--icu-path',
dest='icu_path', help='icu-data-path')
parser.add_argument('--out-dir',
default=None, help='target out dir')
parser = parse_args_run_mode(parser)
return parser.parse_args()
def parse_args_run_mode(parser):
parser.add_argument('--merge-abc-binary',
help="merge-abc's binary tool")
parser.add_argument('--ark-tool',
help="ark's binary tool")
parser.add_argument('--ark-aot-tool',
help="ark_aot's binary tool")
parser.add_argument('--ark-aot', default=False, action='store_true',
help="runs in ark-aot mode")
parser.add_argument('--run-pgo', default=False, action='store_true',
help="runs in pgo mode")
parser.add_argument('--enable-litecg', default=False, action='store_true',
help="runs in litecg mode")
parser.add_argument('--ark-frontend-binary',
help="ark frontend conversion binary tool")
parser.add_argument('--force-clone', action="store_true",
default=False, help='Force to clone tests folder')
parser.add_argument('--ark-arch',
default=RegressTestConfig.DEFAULT_ARK_ARCH,
required=False,
nargs='?', choices=RegressTestConfig.ARK_ARCH_LIST, type=str)
parser.add_argument('--ark-arch-root',
default=RegressTestConfig.DEFAULT_ARK_ARCH,
required=False,
help="the root path for qemu-aarch64 or qemu-arm")
parser.add_argument('--disable-force-gc', action='store_true',
help="Run regress tests with close force-gc")
parser.add_argument('--multi-context', action='store_true',
help="Run regress tests with multi context")
parser.add_argument('--compiler-opt-track-field', default=False, action='store',
dest="compiler_opt_track_field",
help='enable compiler opt track field. Default value: False\n')
return parser
def check_ark_frontend_binary(args) -> bool:
if args.ark_frontend_binary is None:
output('ark_frontend_binary is required, please add this parameter')
return False
return True
def check_frontend_library(args) -> bool:
current_dir = str(os.getcwd())
current_frontend_binary = os.path.join(current_dir, str(args.ark_frontend_binary))
test_tool_frontend_binary = os.path.join(RegressTestConfig.TEST_TOOL_FILE_DIR, args.ark_frontend_binary)
if not os.path.exists(current_frontend_binary) and not os.path.exists(test_tool_frontend_binary):
output('entered ark_frontend_binary does not exist. please confirm')
return False
args.ark_frontend_binary = current_frontend_binary if os.path.exists(
current_frontend_binary) else test_tool_frontend_binary
args.ark_frontend_binary = os.path.abspath(args.ark_frontend_binary)
return True
def check_ark_tool(args) -> bool:
current_dir = str(os.getcwd())
if args.ark_tool is None:
output('ark_tool is required, please add this parameter')
return False
current_ark_tool = os.path.join(current_dir, str(args.ark_tool))
test_tool_ark_tool = os.path.join(RegressTestConfig.TEST_TOOL_FILE_DIR, args.ark_tool)
if not os.path.exists(current_ark_tool) and not os.path.exists(test_tool_ark_tool):
output('entered ark_tool does not exist. please confirm')
return False
args.ark_tool = current_ark_tool if os.path.exists(current_ark_tool) else test_tool_ark_tool
args.ark_tool = os.path.abspath(args.ark_tool)
return True
def check_ark_aot(args) -> bool:
if args.ark_aot:
current_dir = str(os.getcwd())
current_ark_aot_tool = os.path.join(current_dir, str(args.ark_aot_tool))
test_tool_ark_aot_tool = os.path.join(RegressTestConfig.TEST_TOOL_FILE_DIR, args.ark_aot_tool)
if not os.path.exists(current_ark_aot_tool) and not os.path.exists(test_tool_ark_aot_tool):
output(f'entered ark_aot_tool "{args.ark_aot_tool}" does not exist. Please check')
return False
args.ark_aot_tool = current_ark_aot_tool if os.path.exists(current_ark_aot_tool) else test_tool_ark_aot_tool
args.ark_aot_tool = os.path.abspath(args.ark_aot_tool)
return True
if args.run_pgo and not args.ark_aot:
output('pgo mode cannot be used without aot')
return False
return True
def check_stub_path(args) -> bool:
if args.stub_path:
current_dir = str(os.getcwd())
stub_path = os.path.join(current_dir, str(args.stub_path))
if not os.path.exists(stub_path):
output(f'entered stub-file "{args.stub_path}" does not exist. Please check')
return False
args.stub_path = os.path.abspath(args.stub_path)
return True
def is_ignore_file_present(ignore_path: str) -> bool:
if os.path.exists(ignore_path):
return True
output(f"Cannot find ignore list '{ignore_path}'")
return False
def check_ignore_list(args) -> bool:
if args.ignore_list:
if os.path.isabs(args.ignore_list):
return is_ignore_file_present(args.ignore_list)
args.ignore_list = str(os.path.join(RegressTestConfig.TEST_TOOL_FILE_DIR, args.ignore_list))
return is_ignore_file_present(args.ignore_list)
return True
def check_args(args):
result = check_ark_frontend_binary(args)
result = result and check_frontend_library(args)
result = result and check_ark_tool(args)
result = result and check_ark_aot(args)
result = result and check_stub_path(args)
result = result and check_ignore_list(args)
if not result:
return False
if args.ld_library_path is not None:
libs = args.ld_library_path.split(":")
current_dir = str(os.getcwd())
libs = [os.path.abspath(os.path.join(current_dir, str(lib))) for lib in libs]
args.ld_library_path = ":".join(libs)
else:
args.ld_library_path = RegressTestConfig.DEFAULT_LIBS_DIR
if args.icu_path is None:
args.icu_path = RegressTestConfig.ICU_PATH_DATA
if args.out_dir is None:
args.out_dir = RegressTestConfig.PROJECT_BASE_OUT_DIR
else:
args.out_dir = os.path.abspath(os.path.join(RegressTestConfig.CURRENT_PATH, args.out_dir))
if not args.out_dir.endswith("/"):
args.out_dir = f"{args.out_dir}/"
args.regress_out_dir = os.path.join(args.out_dir, "regresstest")
args.out_result = os.path.join(args.regress_out_dir, 'result.txt')
args.junit_report = os.path.join(args.regress_out_dir, 'report.xml')
args.out_log = os.path.join(args.regress_out_dir, 'test.log')
args.test_case_out_dir = os.path.join(args.regress_out_dir, RegressTestConfig.REGRESS_GIT_REPO)
return True
def remove_dir(path):
if os.path.exists(path):
shutil.rmtree(path)
def output(msg):
print(str(msg))
logging.info(str(msg))
def output_debug(msg):
logging.debug(str(msg))
def get_extra_error_message(ret_code: int) -> str:
error_messages = {
0: '',
-6: 'Aborted (core dumped)',
-4: 'Aborted (core dumped)',
-11: 'Segmentation fault (core dumped)',
255: '(uncaught error)'
}
error_message = error_messages.get(ret_code, f'Unknown Error: {str(ret_code)}')
return error_message
@dataclasses.dataclass
class StepResult:
step_name: str
is_passed: bool = False
command: List[str] = dataclasses.field(default_factory=list)
return_code: int = -1
stdout: Optional[str] = None
stderr: Optional[str] = None
fileinfo: Optional[str] = None
def report(self) -> str:
stdout = self.stdout if self.stdout else ''
stderr = self.stderr if self.stderr else ''
cmd = " ".join([str(cmd) for cmd in self.command])
result: List[str] = [
f"{self.step_name}:",
f"\tCommand: {cmd}",
f"\treturn code={self.return_code}",
f"\toutput='{stdout}'",
f"\terrors='{stderr}'"]
if self.fileinfo:
result.append(f"\tFileInfo:\n{self.fileinfo}")
return "\n".join(result)
@dataclasses.dataclass
class TestReport:
src_path: str
test_id: str = ""
out_path: str = ""
passed: bool = False
is_skipped: bool = False
is_ignored: bool = False
steps: List[StepResult] = dataclasses.field(default_factory=list)
def report(self) -> str:
result: List[str] = [f"{self.test_id}:"]
if self.steps is None:
return ""
for step in self.steps:
result.append(f"\t{step.report()}")
return "\n".join(result)
class RegressTestStep(ABC):
step_obj: Optional['RegressTestStep'] = None
def __init__(self, args, name):
output(f"--- Start step {name} ---")
self.args = args
self.__start: Optional[datetime.datetime] = None
self.__end: Optional[datetime.datetime] = None
self.__duration: Optional[datetime.timedelta] = None
self.name: str = name
@staticmethod
def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
pass
def get_duration(self) -> datetime.timedelta:
if self.__duration is None:
output(f"Step {self.name} not started or not completed")
sys.exit(1)
return self.__duration
def _start(self):
self.__start = datetime.datetime.now()
def _end(self):
self.__end = datetime.datetime.now()
self.__duration = self.__end - self.__start
class RegressTestRepoPrepare(RegressTestStep):
def __init__(self, args):
RegressTestStep.__init__(self, args, "Repo preparation")
self.test_list: List[str] = self.read_test_list(args.test_list)
@staticmethod
def read_test_list(test_list_name: Optional[str]) -> List[str]:
if test_list_name is None:
return []
filename = join(dirname(__file__), test_list_name)
if not Path(filename).exists():
output(f"File {filename} set as --test-list value cannot be found")
exit(1)
with open(filename, 'r') as stream:
return stream.read().split("\n")
@staticmethod
def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
repo = RegressTestRepoPrepare(args)
RegressTestRepoPrepare.step_obj = repo
repo._start()
repo.run_regress_test_prepare()
repo.prepare_clean_data()
repo.get_test_case()
test_list = repo.get_regress_test_files()
skip_list = repo.get_skip_test_cases()
if test_reports is None:
test_reports = []
for test in test_list:
shorten = Utils.get_inside_path(test)
test_id = f"regresstest/ark-regress/{shorten}"
if shorten not in skip_list:
report = TestReport(src_path=test, test_id=test_id)
test_reports.append(report)
repo._end()
return test_reports
@staticmethod
def git_checkout(checkout_options, check_out_dir=os.getcwd()):
cmds = ['git', 'checkout', checkout_options]
result = True
with subprocess.Popen(cmds, cwd=check_out_dir) as proc:
ret = proc.wait()
if ret:
output(f"\n error: git checkout '{checkout_options}' failed.")
result = False
return result
@staticmethod
def git_pull(check_out_dir=os.getcwd()):
cmds = ['git', 'pull', '--rebase']
with subprocess.Popen(cmds, cwd=check_out_dir) as proc:
proc.wait()
@staticmethod
def git_clean(clean_dir=os.getcwd()):
cmds = ['git', 'checkout', '--', '.']
with subprocess.Popen(cmds, cwd=clean_dir) as proc:
proc.wait()
@staticmethod
def git_clone(git_url, code_dir):
cmds = ['git', 'clone', git_url, code_dir]
retries = RegressTestConfig.DEFAULT_RETRIES
while retries > 0:
with subprocess.Popen(cmds) as proc:
ret = proc.wait()
if ret:
output(f"\n Error: Cloning '{git_url}' failed. Retry remaining '{retries}' times")
retries -= 1
else:
return True
sys.exit(1)
@staticmethod
def get_skip_test_cases() -> List[str]:
return Utils.read_skip_list(RegressTestConfig.SKIP_LIST_FILE)
def get_test_case(self):
if not os.path.isdir(os.path.join(RegressTestConfig.REGRESS_TEST_CASE_DIR, '.git')):
self.git_clone(RegressTestConfig.REGRESS_GIT_URL, RegressTestConfig.REGRESS_TEST_CASE_DIR)
return self.git_checkout(RegressTestConfig.REGRESS_GIT_HASH, RegressTestConfig.REGRESS_TEST_CASE_DIR)
return True
def prepare_clean_data(self):
self.git_clean(RegressTestConfig.REGRESS_TEST_CASE_DIR)
self.git_pull(RegressTestConfig.REGRESS_TEST_CASE_DIR)
self.git_checkout(RegressTestConfig.REGRESS_GIT_HASH, RegressTestConfig.REGRESS_TEST_CASE_DIR)
def run_regress_test_prepare(self):
if self.args.force_clone:
remove_dir(self.args.regress_out_dir)
remove_dir(RegressTestConfig.REGRESS_TEST_CASE_DIR)
os.makedirs(self.args.regress_out_dir, exist_ok=True)
os.makedirs(RegressTestConfig.REGRESS_TEST_CASE_DIR, exist_ok=True)
init_log_file(self.args)
def get_regress_test_files(self) -> List[str]:
result: List[str] = []
if self.args.test_file is not None and len(self.args.test_file) > 0:
test_file_list = os.path.join(RegressTestConfig.REGRESS_TEST_CASE_DIR, self.args.test_file)
result.append(str(test_file_list))
return result
elif self.args.test_dir is not None and len(self.args.test_dir) > 0:
test_file_list = os.path.join(RegressTestConfig.REGRESS_TEST_CASE_DIR, self.args.test_dir)
else:
test_file_list = RegressTestConfig.REGRESS_TEST_CASE_DIR
for dir_path, path, filenames in os.walk(test_file_list):
if dir_path.find(".git") != -1:
continue
for filename in filenames:
if filename.endswith(".js") or filename.endswith(".mjs"):
result.append(str(os.path.join(dir_path, filename)))
return result
class RegressTestCompile(RegressTestStep):
def __init__(self, args, test_reports: List[TestReport]):
RegressTestStep.__init__(self, args, "Regress test compilation")
self.out_dir = args.out_dir
self.test_reports = test_reports
for test in self.test_reports:
test.out_path = os.path.dirname(os.path.join(self.out_dir, test.test_id))
@staticmethod
def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
if test_reports is None:
output("No tests loaded")
exit(-1)
test_prepare = RegressTestCompile(args, test_reports)
RegressTestCompile.step_obj = test_prepare
test_prepare._start()
test_reports = test_prepare.gen_abc_files()
test_prepare._end()
return test_reports
@staticmethod
def create_files_info(test_report: TestReport) -> Tuple[str, str]:
src_files_info = [
RegressTestConfig.REGRESS_TEST_TOOL_DIR,
test_report.src_path
]
file_info_content: List[str] = []
file_info_path = str(os.path.join(
test_report.out_path,
f"{Utils.get_file_only_name(test_report.src_path)}-filesInfo.txt"))
os.makedirs(test_report.out_path, exist_ok=True)
with os.fdopen(
os.open(file_info_path, flags=os.O_RDWR | os.O_CREAT, mode=stat.S_IRUSR | stat.S_IWUSR),
mode="w+", encoding="utf-8"
) as fp:
for src_file_info in src_files_info:
line = f"{src_file_info};{Utils.get_file_only_name(src_file_info)};esm;xxx;yyy\n"
file_info_content.append(line)
fp.write(line)
return file_info_path, "\n".join(file_info_content)
def gen_abc_files(self) -> List[TestReport]:
with multiprocessing.Pool(processes=self.args.processes) as pool:
results = pool.imap_unordered(self.gen_abc_file, self.test_reports)
results = list(results)
pool.close()
pool.join()
return results
def gen_abc_file(self, test_report: TestReport) -> Optional[TestReport]:
file_info_path, file_info_content = self.create_files_info(test_report)
out_file = change_extension(test_report.src_path, '.out')
expect_file_exists = os.path.exists(out_file)
output_file = change_extension(
os.path.join(test_report.out_path, Utils.get_file_name(test_report.test_id)),
".abc")
if test_report.src_path == RegressTestConfig.REGRESS_TEST_TOOL_DIR:
command = [
self.args.ark_frontend_binary,
test_report.src_path,
f'--output={output_file}'
]
else:
command = [
self.args.ark_frontend_binary,
f"@{file_info_path}",
"--merge-abc",
"--module",
f'--output={output_file}'
]
step_result = StepResult(self.name, command=command, fileinfo=file_info_content)
Utils.exec_command(command, test_report.test_id, step_result, self.args.timeout,
lambda rt, _, _2: get_extra_error_message(rt))
test_report.steps.append(step_result)
test_report.passed = step_result.is_passed
if expect_file_exists:
out_file_path = os.path.join(test_report.out_path, change_extension(test_report.test_id, '.out'))
shutil.copy(str(out_file), str(out_file_path))
return test_report
class RegressTestPgo(RegressTestStep):
def __init__(self, args):
RegressTestStep.__init__(self, args, "Regress Test PGO ")
@staticmethod
def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
pgo = RegressTestPgo(args)
RegressTestPgo.step_obj = pgo
pgo._start()
test_reports = pgo.generate_aps(test_reports)
pgo._end()
return test_reports
def get_test_ap_cmd(self, test_report: TestReport) -> List[str]:
abc_file = change_extension(
os.path.join(test_report.out_path, Utils.get_file_name(test_report.test_id)),
".abc")
ap_file = change_extension(abc_file, ".ap")
entry_point = Utils.get_file_only_name(RegressTestConfig.TEST_TOOL_FILE_JS_NAME)
os.environ["LD_LIBRARY_PATH"] = self.args.ld_library_path
gen_ap_cmd = []
if self.args.ark_arch == RegressTestConfig.ARK_ARCH_LIST[1]:
qemu_tool = "qemu-aarch64"
gen_ap_cmd = [
qemu_tool,
"-L",
self.args.ark_arch_root
]
gen_ap_cmd.append(self.args.ark_tool)
gen_ap_cmd.append("--log-level=info")
gen_ap_cmd.append(f"--icu-data-path={self.args.icu_path}")
gen_ap_cmd.append("--compiler-target-triple=aarch64-unknown-linux-gn")
gen_ap_cmd.append("--enable-pgo-profiler=true")
gen_ap_cmd.append("--compiler-opt-inlining=true")
gen_ap_cmd.append(f"--compiler-pgo-profiler-path={ap_file}")
gen_ap_cmd.append("--asm-interpreter=true")
gen_ap_cmd.append(f"--entry-point={entry_point}")
gen_ap_cmd.append(f"{abc_file}")
return gen_ap_cmd
def generate_ap(self, test_report: Optional[TestReport]) -> Optional[TestReport]:
if test_report is None or not test_report.passed:
return test_report
command = self.get_test_ap_cmd(test_report)
step = StepResult(self.name, command=command)
Utils.exec_command(command, test_report.test_id, step, self.args.timeout,
lambda rt, _, _2: get_extra_error_message(rt))
test_report.steps.append(step)
test_report.passed = step.is_passed
return test_report
def generate_aps(self, test_reports: List[TestReport]) -> List[TestReport]:
with multiprocessing.Pool(processes=self.args.processes) as pool:
results = pool.imap_unordered(self.generate_ap, test_reports)
results = list(results)
pool.close()
pool.join()
return results
class Utils:
ark_regress = "ark-regress"
@staticmethod
def get_file_only_name(full_file_name: str) -> str:
_, file_name = os.path.split(full_file_name)
only_name, _ = os.path.splitext(file_name)
return only_name
@staticmethod
def get_file_name(full_file_name: str) -> str:
_, file_name = os.path.split(full_file_name)
return file_name
@staticmethod
def mk_dst_dir(file, src_dir, dist_dir):
idx = file.rfind(src_dir)
fpath, _ = os.path.split(file[idx:])
fpath = fpath.replace(src_dir, dist_dir)
os.makedirs(fpath, exist_ok=True)
@staticmethod
def get_inside_path(file_path: str, marker: Optional[str] = None) -> str:
if marker is None:
marker = Utils.ark_regress
index = file_path.find(marker)
if index > -1:
return file_path[index + len(marker) + 1:]
return file_path
@staticmethod
def exec_command(cmd_args, test_id: str, step_result: StepResult, timeout=RegressTestConfig.DEFAULT_TIMEOUT,
get_extra_error_msg: Optional[Callable[[int, str, str], str]] = None) -> None:
code_format = 'utf-8'
if platform.system() == "Windows":
code_format = 'gbk'
cmd_string = "\n\t".join([str(arg).strip() for arg in cmd_args if arg is not None])
msg_cmd = "\n".join([
f"Run command:\n{cmd_string}",
f"Env: {os.environ.get('LD_LIBRARY_PATH')}"
])
msg_result = f"TEST ({step_result.step_name.strip()}): {test_id}"
try:
with subprocess.Popen(cmd_args, stderr=subprocess.PIPE, stdout=subprocess.PIPE, close_fds=True,
start_new_session=True) as process:
output_res, errs = process.communicate(timeout=timeout)
ret_code = process.poll()
step_result.return_code = ret_code
stderr = str(errs.decode(code_format, 'ignore').strip())
stdout = str(output_res.decode(code_format, 'ignore').strip())
extra_message = get_extra_error_msg(ret_code, stdout, stderr) if get_extra_error_msg is not None else ""
step_result.stderr = f"{extra_message + '. ' if extra_message else '' }{stderr if stderr else ''}"
step_result.stdout = stdout
if ret_code == 0:
msg_result = f"{msg_result} PASSED"
step_result.is_passed = True
else:
msg_result = f"{msg_result} FAILED"
except subprocess.TimeoutExpired:
process.kill()
process.terminate()
step_result.return_code = -1
step_result.stderr = f"Timeout: timed out after {timeout} seconds"
msg_result = f"{msg_result} FAILED"
except Exception as exc:
step_result.return_code = -1
step_result.stderr = f"Unknown error: {exc}"
msg_result = f"{msg_result} FAILED"
if step_result.is_passed:
output(msg_result)
output_debug(msg_cmd)
else:
output(f"{msg_result}\n{step_result.stderr}\n{msg_cmd}")
@staticmethod
def read_skip_list(skip_list_path: str) -> List[str]:
skip_tests_list = []
with os.fdopen(os.open(skip_list_path, os.O_RDONLY, stat.S_IRUSR), "r") as file_object:
json_data = json.load(file_object)
for key in json_data:
skip_tests_list.extend(key["files"])
return skip_tests_list
@staticmethod
def read_file_as_str(file_name: str) -> str:
with os.fdopen(os.open(file_name, os.O_RDONLY, stat.S_IRUSR), "r") as file_object:
content = [line.strip() for line in file_object.readlines()]
return "\n".join(content)
class RegressTestAot(RegressTestStep):
def __init__(self, args):
RegressTestStep.__init__(self, args, "Regress Test AOT mode")
@staticmethod
def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
aot = RegressTestAot(args)
RegressTestAot.step_obj = aot
aot._start()
test_reports = aot.compile_aots(test_reports)
aot._end()
return test_reports
def get_test_aot_cmd(self, test_report: TestReport) -> List[str]:
abc_file = change_extension(
os.path.join(test_report.out_path, Utils.get_file_name(test_report.test_id)),
".abc")
ap_file = change_extension(abc_file, ".ap")
aot_file = change_extension(abc_file, "")
compiler_opt_track_field = self.args.compiler_opt_track_field
os.environ["LD_LIBRARY_PATH"] = self.args.ld_library_path
if self.args.ark_arch == RegressTestConfig.ARK_ARCH_LIST[1]:
aot_cmd = [
"qemu-aarch64",
"-L",
self.args.ark_arch_root,
self.args.ark_aot_tool,
"--compiler-target-triple=aarch64-unknown-linux-gnu",
f"--aot-file={aot_file}"
]
else:
aot_cmd = [
self.args.ark_aot_tool,
f"--aot-file={aot_file}",
]
pgo = [
"--compiler-opt-loop-peeling=true",
"--compiler-fast-compile=false",
f"--compiler-opt-track-field={compiler_opt_track_field}",
"--compiler-opt-inlining=true",
"--compiler-max-inline-bytecodes=45",
"--compiler-opt-level=2",
f"--compiler-pgo-profiler-path={ap_file}",
]
litecg = [
"--compiler-enable-litecg=true",
]
aot_cmd_tail = [
f"{abc_file}",
]
if self.args.run_pgo:
aot_cmd.extend(pgo)
if self.args.enable_litecg:
aot_cmd.extend(litecg)
aot_cmd.extend(aot_cmd_tail)
return aot_cmd
def compile_aot(self, test_report: Optional[TestReport]) -> Optional[TestReport]:
if test_report is None or not test_report.passed:
return test_report
command = self.get_test_aot_cmd(test_report)
step = StepResult(self.name, command=command)
Utils.exec_command(command, test_report.test_id, step, self.args.timeout,
lambda rt, _, _2: get_extra_error_message(rt))
test_report.steps.append(step)
test_report.passed = step.is_passed
return test_report
def compile_aots(self, test_reports: List[TestReport]) -> List[TestReport]:
with multiprocessing.Pool(processes=self.args.processes) as pool:
results = pool.imap_unordered(self.compile_aot, test_reports)
results = list(results)
pool.close()
pool.join()
return results
class RegressOption(Enum):
NO_FORCE_GC = auto()
ELEMENTS_KIND = auto()
def get_regress_groups() -> Dict[RegressOption, List[str]]:
groups = {}
with os.fdopen(os.open(RegressTestConfig.REGRESS_TEST_OPTIONS, os.O_RDONLY, stat.S_IRUSR), "r") as file:
for group in json.load(file):
groups[RegressOption[group["name"]]] = group["files"]
return groups
def get_test_options(test: str, test_groups: Dict[RegressOption, List[str]], regress_option: RegressOption) -> List[str]:
opt_values: Dict[RegressOption, str] = {
RegressOption.NO_FORCE_GC: "--enable-force-gc=",
RegressOption.ELEMENTS_KIND: "--enable-elements-kind="
}
def match(opt: RegressOption) -> bool:
return test in test_groups.get(opt, [])
def to_flag(b: bool) -> str:
return str(b).lower()
try:
return [opt_values.get(regress_option) + to_flag(not match(regress_option))]
except KeyError:
return []
class RegressTestRun(RegressTestStep):
def __init__(self, args):
RegressTestStep.__init__(self, args, "Regress Test Run ")
self.test_groups: Dict[RegressOption, List[str]] = get_regress_groups()
@staticmethod
def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
runner = RegressTestRun(args)
RegressTestRun.step_obj = runner
runner._start()
test_reports = runner.run_test_case_dir(test_reports)
runner._end()
return test_reports
@staticmethod
def extra_check_with_expect(ret_code: int, test_report: TestReport, expect_file, stdout: str, stderr: str) -> str:
expect_output_str = read_expect_file(expect_file, test_report.src_path)
test_case_file = Utils.read_file_as_str(test_report.src_path)
if stdout == expect_output_str.strip() or stderr == expect_output_str.strip():
if ret_code == 0 or (ret_code == 255 and "/fail/" in test_case_file):
return ""
else:
return get_extra_error_message(ret_code)
msg = f'expect: [{expect_output_str}]\nbut got: [{stderr}]'
return msg
@staticmethod
def extra_check_with_assert(ret_code: int, stderr: Optional[str]) -> str:
if ret_code == 0 and stderr and "[ecmascript] Stack overflow" not in stderr:
return str(stderr)
return get_extra_error_message(ret_code)
def run_test_case_dir(self, test_reports: List[TestReport]) -> List[TestReport]:
with multiprocessing.Pool(processes=self.args.processes, initializer=init_worker,
initargs=(self.args,)) as pool:
results = pool.imap_unordered(self.run_test_case, test_reports)
results = list(results)
pool.close()
pool.join()
return results
def run_test_case(self, test_report: TestReport) -> Optional[TestReport]:
self.args = worker_wrapper_args
if self.args is None or test_report is None or not test_report.passed:
return test_report
if test_report.src_path.endswith(RegressTestConfig.TEST_TOOL_FILE_JS_NAME):
return None
abc_file = change_extension(
os.path.join(test_report.out_path, Utils.get_file_name(test_report.test_id)),
".abc")
tool_abc_file = os.path.join(self.args.test_case_out_dir, RegressTestConfig.TEST_TOOL_FILE_NAME)
aot_file = change_extension(abc_file, "")
expect_file = change_extension(abc_file, ".out")
entry_point = Utils.get_file_only_name(test_report.src_path)
os.environ["LD_LIBRARY_PATH"] = self.args.ld_library_path
test_name = test_report.test_id.replace('regresstest/ark-regress/', '')
set_test_environ(test_report.src_path)
command = []
if self.args.ark_arch == RegressTestConfig.ARK_ARCH_LIST[1]:
qemu_tool = "qemu-aarch64"
qemu_arg1 = "-L"
qemu_arg2 = self.args.ark_arch_root
command = [qemu_tool, qemu_arg1, qemu_arg2]
command.append(self.args.ark_tool)
command.append(f"--icu-data-path={self.args.icu_path}")
command.append(f"--entry-point={entry_point}")
if self.args.ark_aot:
command.append(f"--stub-file={self.args.stub_path}")
command.append(f"--aot-file={aot_file}")
if self.args.multi_context:
command.append("--multi-context=true")
if self.args.disable_force_gc:
command.append("--enable-force-gc=false")
else:
command.extend(get_test_options(test_name, self.test_groups, RegressOption.NO_FORCE_GC))
command.append("--open-ark-tools=true")
command.extend(get_test_options(test_name, self.test_groups, RegressOption.ELEMENTS_KIND))
command.append(tool_abc_file + ":" + abc_file)
return self.run_test_case_file(command, test_report, expect_file)
def run_test_case_file(self, command, test_report: TestReport, expect_file) -> TestReport:
expect_file_exits = os.path.exists(expect_file)
step = StepResult(self.name, command=command)
if expect_file_exits:
self.run_test_case_with_expect(command, step, test_report, expect_file)
else:
self.run_test_case_with_assert(command, step, test_report)
test_report.steps.append(step)
test_report.passed = step.is_passed
return test_report
def run_test_case_with_expect(self, command, step: StepResult, test_report: TestReport, expect_file) -> None:
Utils.exec_command(command, test_report.test_id, step, self.args.timeout,
lambda rt, out, err: self.extra_check_with_expect(rt, test_report, expect_file, out, err))
def run_test_case_with_assert(self, command, step: StepResult, test_report: TestReport) -> None:
Utils.exec_command(command, test_report.test_id, step, self.args.timeout,
lambda rt, _, err: self.extra_check_with_assert(rt, err))
class Stats:
def __init__(self, args, test_reports: List[TestReport]):
self.args = args
self.pass_count = 0
self.fail_count = 0
self.test_reports = test_reports
self.errors: Dict[str, List[TestReport]] = {}
def read_ignore_list(self) -> Optional[Set[str]]:
if self.args.ignore_list is None:
return None
with os.fdopen(os.open(self.args.ignore_list, os.O_RDWR, stat.S_IRUSR), "r+") as file_object:
lines = file_object.readlines()
lines = [line.strip() for line in lines if not line.strip().startswith('#')]
return set(lines)
def get_new_failures(self) -> Optional[List[TestReport]]:
ignore_list = self.read_ignore_list()
if ignore_list is None:
return None
new_failures: List[TestReport] = []
for test_report in self.test_reports:
if test_report and not test_report.passed and test_report.steps:
if test_report.test_id not in ignore_list:
new_failures.append(test_report)
return new_failures
def statistics(self):
root = XTree.Element("testsuite")
root.set("name", "Regression")
result_file = open_write_file(self.args.out_result, False)
for test_report in self.test_reports:
if test_report is None:
continue
testcase = XTree.SubElement(root, "testcase")
testcase.set("name", f"{test_report.test_id}")
if test_report.passed:
write_result_file(f"PASS: {test_report.test_id}", result_file)
self.pass_count += 1
else:
self.fail_count += 1
write_result_file(f"FAIL: {test_report.test_id}", result_file)
failed_step = test_report.steps[-1]
if failed_step.step_name not in self.errors:
self.errors[failed_step.step_name] = []
self.errors[failed_step.step_name].append(test_report)
XTree.SubElement(testcase, "failure").text = f"<![CDATA[{test_report.report()}]]>"
root.set("tests", f"{self.pass_count + self.fail_count}")
root.set("failures", f"{self.fail_count}")
tree = XTree.ElementTree(root)
tree.write(self.args.junit_report, xml_declaration=True, encoding="UTF-8")
result_file.close()
def print_result(self, args, steps):
result_file = open_write_file(args.out_result, True)
summary_duration = datetime.timedelta()
for step in steps:
output(f"Step {step.step_obj.name} - duration {step.step_obj.get_duration()}")
summary_duration += step.step_obj.get_duration()
msg = f'\npass count: {self.pass_count}'
write_result_file(msg, result_file)
output(msg)
msg = f'fail count: {self.fail_count}'
write_result_file(msg, result_file)
output(msg)
msg = f'total count: {self.fail_count + self.pass_count}'
write_result_file(msg, result_file)
output(msg)
msg = f'total used time is: {str(summary_duration)}'
write_result_file(msg, result_file)
output(msg)
result_file.close()
def print_failed_tests(self):
output("=== Failed tests ===")
for key, values in self.errors.items():
output(f"{key}: {len(values)} tests")
def change_extension(path, new_ext: str):
base_path, ext = os.path.splitext(path)
if ext:
new_path = base_path + new_ext
else:
new_path = path + new_ext
return new_path
def get_files_by_ext(start_dir, suffix):
result = []
for dir_path, dir_names, filenames in os.walk(start_dir):
for filename in filenames:
if filename.endswith(suffix):
result.append(os.path.join(dir_path, filename))
return result
def read_expect_file(expect_file, test_case_file):
with os.fdopen(os.open(expect_file, os.O_RDWR, stat.S_IRUSR), "r+") as file_object:
lines = file_object.readlines()
lines = [line for line in lines if not line.strip().startswith('#')]
expect_output = ''.join(lines)
if test_case_file.startswith("/"):
test_case_file = test_case_file.lstrip("/")
expect_file = test_case_file.replace('regresstest/', '')
test_file_path = os.path.join(RegressTestConfig.REGRESS_BASE_TEST_DIR, expect_file)
expect_output_str = expect_output.replace('*%(basename)s', test_file_path)
return expect_output_str
def open_write_file(file_path, append):
if append:
args = os.O_RDWR | os.O_CREAT | os.O_APPEND
else:
args = os.O_RDWR | os.O_CREAT
file_descriptor = os.open(file_path, args, stat.S_IRUSR | stat.S_IWUSR)
file_object = os.fdopen(file_descriptor, "w+")
return file_object
def open_result_excel(file_path):
file_descriptor = os.open(file_path, os.O_RDWR | os.O_CREAT | os.O_APPEND, stat.S_IRUSR | stat.S_IWUSR)
file_object = os.fdopen(file_descriptor, "w+")
return file_object
def get_file_source(file):
with open(file, encoding='ISO-8859-1') as f:
return f.read()
def set_test_environ(case):
if 'LC_ALL' in os.environ:
del os.environ['LC_ALL']
if 'TZ' in os.environ:
del os.environ['TZ']
if not os.path.exists(case):
return
source = get_file_source(case)
env_match = ENV_PATTERN.search(source)
if env_match:
for env_pair in env_match.group(1).strip().split():
var, value = env_pair.split('=')
if var.find('TZ') >= 0:
os.environ['TZ'] = value
if var.find('LC_ALL') >= 0:
os.environ['LC_ALL'] = value
break
worker_wrapper_args = None
def init_worker(args):
global worker_wrapper_args
worker_wrapper_args = args
def write_result_file(msg: str, result_file):
result_file.write(f'{msg}\n')
def main(args):
if not check_args(args):
return 1
output("\nStart regresstest........")
steps: List[Type[RegressTestStep]] = [
RegressTestRepoPrepare,
RegressTestCompile,
]
if args.ark_aot:
if args.run_pgo:
steps.append(RegressTestPgo)
steps.append(RegressTestAot)
steps.append(RegressTestRun)
test_reports: List[TestReport] = []
for step in steps:
test_reports = step.run(args, test_reports)
stats = Stats(args, test_reports)
stats.statistics()
stats.print_result(args, steps)
stats.print_failed_tests()
new_failures = stats.get_new_failures()
if new_failures is None:
return 0
if len(new_failures) > 0:
msg = [f"Found {len(new_failures)} new failures:"]
for failure in new_failures:
msg.append(f"\t{failure.test_id}")
output("\n".join(msg))
else:
output("No new failures have been found")
return len(new_failures)
if __name__ == "__main__":
sys.exit(main(parse_args()))