#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Copyright (c) 2023-2024 Huawei Device Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Description: run regress test case
"""
import argparse
import dataclasses
import datetime
import json
import logging
import multiprocessing
import os
import platform
import re
import shutil
import stat
import subprocess
import sys
from abc import ABC
from typing import Optional, List, Type, Dict, Set, Tuple, Callable
from os.path import dirname, join
from pathlib import Path
import xml.etree.cElementTree as XTree
from enum import Enum, auto

from regress_test_config import RegressTestConfig

ENV_PATTERN = re.compile(r"//\s+Environment Variables:(.*)")


def init_log_file(args):
    logging.basicConfig(filename=args.out_log, format=RegressTestConfig.DEFAULT_LOG_FORMAT, level=logging.INFO)


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--test-dir', metavar='DIR',
                        help='Directory to test ')
    parser.add_argument('--test-file', metavar='FILE',
                        help='File to test')
    parser.add_argument('--test-list', metavar='FILE', dest="test_list", default=None,
                        help='File with list of tests to run')
    parser.add_argument('--ignore-list', metavar='FILE', dest="ignore_list", default=None,
                        help='File with known failed tests list')
    parser.add_argument('--timeout', default=RegressTestConfig.DEFAULT_TIMEOUT, type=int,
                        help='Set a custom test timeout in seconds !!!\n')
    parser.add_argument('--processes', default=RegressTestConfig.DEFAULT_PROCESSES, type=int,
                        help='set number of processes to use. Default value: 1\n')
    parser.add_argument('--stub-path',
                        help="stub file for run in AOT modes")
    parser.add_argument('--LD_LIBRARY_PATH', '--libs-dir',
                        dest='ld_library_path', default=None, help='LD_LIBRARY_PATH')
    parser.add_argument('--icu-path',
                        dest='icu_path', help='icu-data-path')
    parser.add_argument('--out-dir',
                        default=None, help='target out dir')
    parser = parse_args_run_mode(parser)
    return parser.parse_args()


def parse_args_run_mode(parser):
    parser.add_argument('--merge-abc-binary',
                        help="merge-abc's binary tool")
    parser.add_argument('--ark-tool',
                        help="ark's binary tool")
    parser.add_argument('--ark-aot-tool',
                        help="ark_aot's binary tool")
    parser.add_argument('--ark-aot', default=False, action='store_true',
                        help="runs in ark-aot mode")
    parser.add_argument('--run-pgo', default=False, action='store_true',
                        help="runs in pgo mode")
    parser.add_argument('--enable-litecg', default=False, action='store_true',
                        help="runs in litecg mode")
    parser.add_argument('--ark-frontend-binary',
                        help="ark frontend conversion binary tool")
    parser.add_argument('--force-clone', action="store_true",
                        default=False, help='Force to clone tests folder')
    parser.add_argument('--ark-arch',
                        default=RegressTestConfig.DEFAULT_ARK_ARCH,
                        required=False,
                        nargs='?', choices=RegressTestConfig.ARK_ARCH_LIST, type=str)
    parser.add_argument('--ark-arch-root',
                        default=RegressTestConfig.DEFAULT_ARK_ARCH,
                        required=False,
                        help="the root path for qemu-aarch64 or qemu-arm")
    parser.add_argument('--disable-force-gc', action='store_true',
                        help="Run regress tests with close force-gc")
    parser.add_argument('--multi-context', action='store_true',
                        help="Run regress tests with multi context")
    parser.add_argument('--compiler-opt-track-field', default=False, action='store',
                        dest="compiler_opt_track_field",
                        help='enable compiler opt track field. Default value: False\n')
    return parser


def check_ark_frontend_binary(args) -> bool:
    if args.ark_frontend_binary is None:
        output('ark_frontend_binary is required, please add this parameter')
        return False
    return True


def check_frontend_library(args) -> bool:
    current_dir = str(os.getcwd())
    current_frontend_binary = os.path.join(current_dir, str(args.ark_frontend_binary))
    test_tool_frontend_binary = os.path.join(RegressTestConfig.TEST_TOOL_FILE_DIR, args.ark_frontend_binary)
    if not os.path.exists(current_frontend_binary) and not os.path.exists(test_tool_frontend_binary):
        output('entered ark_frontend_binary does not exist. please confirm')
        return False
    args.ark_frontend_binary = current_frontend_binary if os.path.exists(
        current_frontend_binary) else test_tool_frontend_binary
    args.ark_frontend_binary = os.path.abspath(args.ark_frontend_binary)
    return True


def check_ark_tool(args) -> bool:
    current_dir = str(os.getcwd())
    if args.ark_tool is None:
        output('ark_tool is required, please add this parameter')
        return False

    current_ark_tool = os.path.join(current_dir, str(args.ark_tool))
    test_tool_ark_tool = os.path.join(RegressTestConfig.TEST_TOOL_FILE_DIR, args.ark_tool)
    if not os.path.exists(current_ark_tool) and not os.path.exists(test_tool_ark_tool):
        output('entered ark_tool does not exist. please confirm')
        return False

    args.ark_tool = current_ark_tool if os.path.exists(current_ark_tool) else test_tool_ark_tool
    args.ark_tool = os.path.abspath(args.ark_tool)
    return True


def check_ark_aot(args) -> bool:
    if args.ark_aot:
        current_dir = str(os.getcwd())
        current_ark_aot_tool = os.path.join(current_dir, str(args.ark_aot_tool))
        test_tool_ark_aot_tool = os.path.join(RegressTestConfig.TEST_TOOL_FILE_DIR, args.ark_aot_tool)
        if not os.path.exists(current_ark_aot_tool) and not os.path.exists(test_tool_ark_aot_tool):
            output(f'entered ark_aot_tool "{args.ark_aot_tool}" does not exist. Please check')
            return False
        args.ark_aot_tool = current_ark_aot_tool if os.path.exists(current_ark_aot_tool) else test_tool_ark_aot_tool
        args.ark_aot_tool = os.path.abspath(args.ark_aot_tool)
        return True
    if args.run_pgo and not args.ark_aot:
        output('pgo mode cannot be used without aot')
        return False
    return True


def check_stub_path(args) -> bool:
    if args.stub_path:
        current_dir = str(os.getcwd())
        stub_path = os.path.join(current_dir, str(args.stub_path))
        if not os.path.exists(stub_path):
            output(f'entered stub-file "{args.stub_path}" does not exist. Please check')
            return False
        args.stub_path = os.path.abspath(args.stub_path)
    return True


def is_ignore_file_present(ignore_path: str) -> bool:
    if os.path.exists(ignore_path):
        return True
    output(f"Cannot find ignore list '{ignore_path}'")
    return False


def check_ignore_list(args) -> bool:
    if args.ignore_list:
        if os.path.isabs(args.ignore_list):
            return is_ignore_file_present(args.ignore_list)
        args.ignore_list = str(os.path.join(RegressTestConfig.TEST_TOOL_FILE_DIR, args.ignore_list))
        return is_ignore_file_present(args.ignore_list)
    return True


def check_args(args):
    result = check_ark_frontend_binary(args)
    result = result and check_frontend_library(args)
    result = result and check_ark_tool(args)
    result = result and check_ark_aot(args)
    result = result and check_stub_path(args)
    result = result and check_ignore_list(args)

    if not result:
        return False

    if args.ld_library_path is not None:
        libs = args.ld_library_path.split(":")
        current_dir = str(os.getcwd())
        libs = [os.path.abspath(os.path.join(current_dir, str(lib))) for lib in libs]
        args.ld_library_path = ":".join(libs)
    else:
        args.ld_library_path = RegressTestConfig.DEFAULT_LIBS_DIR
    if args.icu_path is None:
        args.icu_path = RegressTestConfig.ICU_PATH_DATA
    if args.out_dir is None:
        args.out_dir = RegressTestConfig.PROJECT_BASE_OUT_DIR
    else:
        args.out_dir = os.path.abspath(os.path.join(RegressTestConfig.CURRENT_PATH, args.out_dir))
    if not args.out_dir.endswith("/"):
        args.out_dir = f"{args.out_dir}/"
    args.regress_out_dir = os.path.join(args.out_dir, "regresstest")
    args.out_result = os.path.join(args.regress_out_dir, 'result.txt')
    args.junit_report = os.path.join(args.regress_out_dir, 'report.xml')
    args.out_log = os.path.join(args.regress_out_dir, 'test.log')
    args.test_case_out_dir = os.path.join(args.regress_out_dir, RegressTestConfig.REGRESS_GIT_REPO)
    return True


def remove_dir(path):
    if os.path.exists(path):
        shutil.rmtree(path)


def output(msg):
    print(str(msg))
    logging.info(str(msg))


def output_debug(msg):
    logging.debug(str(msg))


def get_extra_error_message(ret_code: int) -> str:
    error_messages = {
        0: '',
        -6: 'Aborted (core dumped)',
        -4: 'Aborted (core dumped)',
        -11: 'Segmentation fault (core dumped)',
        255: '(uncaught error)'
    }
    error_message = error_messages.get(ret_code, f'Unknown Error: {str(ret_code)}')
    return error_message


@dataclasses.dataclass
class StepResult:
    step_name: str  # a copy of the step name
    is_passed: bool = False  # True if passed, any other state is False
    command: List[str] = dataclasses.field(default_factory=list)  # command to run
    return_code: int = -1
    stdout: Optional[str] = None  # present only if there is some output
    stderr: Optional[str] = None  # can be present only if is_passed == False
    fileinfo: Optional[str] = None  # content of fileinfo file if present

    def report(self) -> str:
        stdout = self.stdout if self.stdout else ''
        stderr = self.stderr if self.stderr else ''
        cmd = " ".join([str(cmd) for cmd in self.command])
        result: List[str] = [
            f"{self.step_name}:",
            f"\tCommand: {cmd}",
            f"\treturn code={self.return_code}",
            f"\toutput='{stdout}'",
            f"\terrors='{stderr}'"]
        if self.fileinfo:
            result.append(f"\tFileInfo:\n{self.fileinfo}")
        return "\n".join(result)


@dataclasses.dataclass
class TestReport:
    src_path: str  # full path to the source test
    test_id: str = ""  # path starting from regresstest
    out_path: str = ""  # full path to intermediate files up to folder
    passed: bool = False  # False if the test has not started or failed
    is_skipped: bool = False  # True if the test has found in the skipped (excluded) list
    is_ignored: bool = False  # True if the test has found in the ignored list
    steps: List[StepResult] = dataclasses.field(default_factory=list)  # list of results

    def report(self) -> str:
        result: List[str] = [f"{self.test_id}:"]
        if self.steps is None:
            return ""
        for step in self.steps:
            result.append(f"\t{step.report()}")
        return "\n".join(result)


class RegressTestStep(ABC):
    step_obj: Optional['RegressTestStep'] = None

    def __init__(self, args, name):
        output(f"--- Start step {name} ---")
        self.args = args
        self.__start: Optional[datetime.datetime] = None
        self.__end: Optional[datetime.datetime] = None
        self.__duration: Optional[datetime.timedelta] = None
        self.name: str = name

    @staticmethod
    def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
        pass

    def get_duration(self) -> datetime.timedelta:
        if self.__duration is None:
            output(f"Step {self.name} not started or not completed")
            sys.exit(1)
        return self.__duration

    def _start(self):
        self.__start = datetime.datetime.now()

    def _end(self):
        self.__end = datetime.datetime.now()
        self.__duration = self.__end - self.__start


class RegressTestRepoPrepare(RegressTestStep):
    def __init__(self, args):
        RegressTestStep.__init__(self, args, "Repo preparation")
        self.test_list: List[str] = self.read_test_list(args.test_list)

    @staticmethod
    def read_test_list(test_list_name: Optional[str]) -> List[str]:
        if test_list_name is None:
            return []
        filename = join(dirname(__file__), test_list_name)
        if not Path(filename).exists():
            output(f"File {filename} set as --test-list value cannot be found")
            exit(1)
        with open(filename, 'r') as stream:
            return stream.read().split("\n")

    @staticmethod
    def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
        repo = RegressTestRepoPrepare(args)
        RegressTestRepoPrepare.step_obj = repo
        repo._start()

        repo.run_regress_test_prepare()
        repo.prepare_clean_data()
        repo.get_test_case()
        test_list = repo.get_regress_test_files()
        skip_list = repo.get_skip_test_cases()
        if test_reports is None:
            test_reports = []
        for test in test_list:
            shorten = Utils.get_inside_path(test)
            test_id = f"regresstest/ark-regress/{shorten}"
            if shorten not in skip_list:
                report = TestReport(src_path=test, test_id=test_id)
                test_reports.append(report)

        repo._end()
        return test_reports

    @staticmethod
    def git_checkout(checkout_options, check_out_dir=os.getcwd()):
        cmds = ['git', 'checkout', checkout_options]
        result = True
        with subprocess.Popen(cmds, cwd=check_out_dir) as proc:
            ret = proc.wait()
            if ret:
                output(f"\n error: git checkout '{checkout_options}' failed.")
                result = False
        return result

    @staticmethod
    def git_pull(check_out_dir=os.getcwd()):
        cmds = ['git', 'pull', '--rebase']
        with subprocess.Popen(cmds, cwd=check_out_dir) as proc:
            proc.wait()

    @staticmethod
    def git_clean(clean_dir=os.getcwd()):
        cmds = ['git', 'checkout', '--', '.']
        with subprocess.Popen(cmds, cwd=clean_dir) as proc:
            proc.wait()

    @staticmethod
    def git_clone(git_url, code_dir):
        cmds = ['git', 'clone', git_url, code_dir]
        retries = RegressTestConfig.DEFAULT_RETRIES
        while retries > 0:
            with subprocess.Popen(cmds) as proc:
                ret = proc.wait()
                if ret:
                    output(f"\n Error: Cloning '{git_url}' failed. Retry remaining '{retries}' times")
                    retries -= 1
                else:
                    return True
        sys.exit(1)

    @staticmethod
    def get_skip_test_cases() -> List[str]:
        return Utils.read_skip_list(RegressTestConfig.SKIP_LIST_FILE)

    def get_test_case(self):
        if not os.path.isdir(os.path.join(RegressTestConfig.REGRESS_TEST_CASE_DIR, '.git')):
            self.git_clone(RegressTestConfig.REGRESS_GIT_URL, RegressTestConfig.REGRESS_TEST_CASE_DIR)
            return self.git_checkout(RegressTestConfig.REGRESS_GIT_HASH, RegressTestConfig.REGRESS_TEST_CASE_DIR)
        return True

    def prepare_clean_data(self):
        self.git_clean(RegressTestConfig.REGRESS_TEST_CASE_DIR)
        self.git_pull(RegressTestConfig.REGRESS_TEST_CASE_DIR)
        self.git_checkout(RegressTestConfig.REGRESS_GIT_HASH, RegressTestConfig.REGRESS_TEST_CASE_DIR)

    def run_regress_test_prepare(self):
        if self.args.force_clone:
            remove_dir(self.args.regress_out_dir)
            remove_dir(RegressTestConfig.REGRESS_TEST_CASE_DIR)
        os.makedirs(self.args.regress_out_dir, exist_ok=True)
        os.makedirs(RegressTestConfig.REGRESS_TEST_CASE_DIR, exist_ok=True)
        init_log_file(self.args)

    def get_regress_test_files(self) -> List[str]:
        result: List[str] = []
        if self.args.test_file is not None and len(self.args.test_file) > 0:
            test_file_list = os.path.join(RegressTestConfig.REGRESS_TEST_CASE_DIR, self.args.test_file)
            result.append(str(test_file_list))
            return result
        elif self.args.test_dir is not None and len(self.args.test_dir) > 0:
            test_file_list = os.path.join(RegressTestConfig.REGRESS_TEST_CASE_DIR, self.args.test_dir)
        else:
            test_file_list = RegressTestConfig.REGRESS_TEST_CASE_DIR
        for dir_path, path, filenames in os.walk(test_file_list):
            if dir_path.find(".git") != -1:
                continue
            for filename in filenames:
                if filename.endswith(".js") or filename.endswith(".mjs"):
                    result.append(str(os.path.join(dir_path, filename)))
        return result


class RegressTestCompile(RegressTestStep):
    def __init__(self, args, test_reports: List[TestReport]):
        RegressTestStep.__init__(self, args, "Regress test compilation")
        self.out_dir = args.out_dir
        self.test_reports = test_reports
        for test in self.test_reports:
            test.out_path = os.path.dirname(os.path.join(self.out_dir, test.test_id))

    @staticmethod
    def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
        if test_reports is None:
            output("No tests loaded")
            exit(-1)
        test_prepare = RegressTestCompile(args, test_reports)
        RegressTestCompile.step_obj = test_prepare
        test_prepare._start()
        test_reports = test_prepare.gen_abc_files()
        test_prepare._end()
        return test_reports

    @staticmethod
    def create_files_info(test_report: TestReport) -> Tuple[str, str]:
        src_files_info = [
            RegressTestConfig.REGRESS_TEST_TOOL_DIR,
            test_report.src_path
        ]
        file_info_content: List[str] = []
        file_info_path = str(os.path.join(
            test_report.out_path,
            f"{Utils.get_file_only_name(test_report.src_path)}-filesInfo.txt"))
        os.makedirs(test_report.out_path, exist_ok=True)
        with os.fdopen(
                os.open(file_info_path, flags=os.O_RDWR | os.O_CREAT, mode=stat.S_IRUSR | stat.S_IWUSR),
                mode="w+", encoding="utf-8"
        ) as fp:
            for src_file_info in src_files_info:
                line = f"{src_file_info};{Utils.get_file_only_name(src_file_info)};esm;xxx;yyy\n"
                file_info_content.append(line)
                fp.write(line)
        return file_info_path, "\n".join(file_info_content)

    def gen_abc_files(self) -> List[TestReport]:
        with multiprocessing.Pool(processes=self.args.processes) as pool:
            results = pool.imap_unordered(self.gen_abc_file, self.test_reports)
            results = list(results)
            pool.close()
            pool.join()

        return results

    def gen_abc_file(self, test_report: TestReport) -> Optional[TestReport]:
        file_info_path, file_info_content = self.create_files_info(test_report)
        out_file = change_extension(test_report.src_path, '.out')
        expect_file_exists = os.path.exists(out_file)
        output_file = change_extension(
            os.path.join(test_report.out_path, Utils.get_file_name(test_report.test_id)),
            ".abc")
        if test_report.src_path == RegressTestConfig.REGRESS_TEST_TOOL_DIR:
            # generate abc file for mjsunit.js
            command = [
                self.args.ark_frontend_binary,
                test_report.src_path,
                f'--output={output_file}'
            ]
        else:
            command = [
                self.args.ark_frontend_binary,
                f"@{file_info_path}",
                "--merge-abc",
                "--module",
                f'--output={output_file}'
            ]
        step_result = StepResult(self.name, command=command, fileinfo=file_info_content)
        Utils.exec_command(command, test_report.test_id, step_result, self.args.timeout,
                           lambda rt, _, _2: get_extra_error_message(rt))
        test_report.steps.append(step_result)
        test_report.passed = step_result.is_passed
        if expect_file_exists:
            out_file_path = os.path.join(test_report.out_path, change_extension(test_report.test_id, '.out'))
            shutil.copy(str(out_file), str(out_file_path))
        return test_report


class RegressTestPgo(RegressTestStep):
    def __init__(self, args):
        RegressTestStep.__init__(self, args, "Regress Test PGO ")

    @staticmethod
    def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
        pgo = RegressTestPgo(args)
        RegressTestPgo.step_obj = pgo
        pgo._start()
        test_reports = pgo.generate_aps(test_reports)
        pgo._end()
        return test_reports

    def get_test_ap_cmd(self, test_report: TestReport) -> List[str]:
        abc_file = change_extension(
            os.path.join(test_report.out_path, Utils.get_file_name(test_report.test_id)),
            ".abc")
        ap_file = change_extension(abc_file, ".ap")
        entry_point = Utils.get_file_only_name(RegressTestConfig.TEST_TOOL_FILE_JS_NAME)
        os.environ["LD_LIBRARY_PATH"] = self.args.ld_library_path
        gen_ap_cmd = []
        if self.args.ark_arch == RegressTestConfig.ARK_ARCH_LIST[1]:
            qemu_tool = "qemu-aarch64"
            gen_ap_cmd = [
                qemu_tool,
                "-L",
                self.args.ark_arch_root
            ]
        gen_ap_cmd.append(self.args.ark_tool)
        gen_ap_cmd.append("--log-level=info")
        gen_ap_cmd.append(f"--icu-data-path={self.args.icu_path}")
        gen_ap_cmd.append("--compiler-target-triple=aarch64-unknown-linux-gn")
        gen_ap_cmd.append("--enable-pgo-profiler=true")
        gen_ap_cmd.append("--compiler-opt-inlining=true")
        gen_ap_cmd.append(f"--compiler-pgo-profiler-path={ap_file}")
        gen_ap_cmd.append("--asm-interpreter=true")
        gen_ap_cmd.append(f"--entry-point={entry_point}")
        gen_ap_cmd.append(f"{abc_file}")
        return gen_ap_cmd

    def generate_ap(self, test_report: Optional[TestReport]) -> Optional[TestReport]:
        if test_report is None or not test_report.passed:
            return test_report
        command = self.get_test_ap_cmd(test_report)
        step = StepResult(self.name, command=command)
        Utils.exec_command(command, test_report.test_id, step, self.args.timeout,
                           lambda rt, _, _2: get_extra_error_message(rt))
        test_report.steps.append(step)
        test_report.passed = step.is_passed
        return test_report

    def generate_aps(self, test_reports: List[TestReport]) -> List[TestReport]:
        with multiprocessing.Pool(processes=self.args.processes) as pool:
            results = pool.imap_unordered(self.generate_ap, test_reports)
            results = list(results)
            pool.close()
            pool.join()

        return results


class Utils:
    ark_regress = "ark-regress"

    @staticmethod
    def get_file_only_name(full_file_name: str) -> str:
        _, file_name = os.path.split(full_file_name)
        only_name, _ = os.path.splitext(file_name)
        return only_name

    @staticmethod
    def get_file_name(full_file_name: str) -> str:
        _, file_name = os.path.split(full_file_name)
        return file_name

    @staticmethod
    def mk_dst_dir(file, src_dir, dist_dir):
        idx = file.rfind(src_dir)
        fpath, _ = os.path.split(file[idx:])
        fpath = fpath.replace(src_dir, dist_dir)
        os.makedirs(fpath, exist_ok=True)

    @staticmethod
    def get_inside_path(file_path: str, marker: Optional[str] = None) -> str:
        if marker is None:
            marker = Utils.ark_regress
        index = file_path.find(marker)
        if index > -1:
            return file_path[index + len(marker) + 1:]
        return file_path

    @staticmethod
    def exec_command(cmd_args, test_id: str, step_result: StepResult, timeout=RegressTestConfig.DEFAULT_TIMEOUT,
                     get_extra_error_msg: Optional[Callable[[int, str, str], str]] = None) -> None:
        code_format = 'utf-8'
        if platform.system() == "Windows":
            code_format = 'gbk'
        cmd_string = "\n\t".join([str(arg).strip() for arg in cmd_args if arg is not None])
        msg_cmd = "\n".join([
            f"Run command:\n{cmd_string}",
            f"Env: {os.environ.get('LD_LIBRARY_PATH')}"
        ])
        msg_result = f"TEST ({step_result.step_name.strip()}): {test_id}"
        try:
            with subprocess.Popen(cmd_args, stderr=subprocess.PIPE, stdout=subprocess.PIPE, close_fds=True,
                                  start_new_session=True) as process:
                output_res, errs = process.communicate(timeout=timeout)
                ret_code = process.poll()
                step_result.return_code = ret_code
                stderr = str(errs.decode(code_format, 'ignore').strip())
                stdout = str(output_res.decode(code_format, 'ignore').strip())
                extra_message = get_extra_error_msg(ret_code, stdout, stderr) if get_extra_error_msg is not None else ""
                step_result.stderr = f"{extra_message + '. ' if extra_message else '' }{stderr if stderr else ''}"
                step_result.stdout = stdout
                if ret_code == 0:
                    msg_result = f"{msg_result} PASSED"
                    step_result.is_passed = True
                else:
                    msg_result = f"{msg_result} FAILED"
        except subprocess.TimeoutExpired:
            process.kill()
            process.terminate()
            step_result.return_code = -1
            step_result.stderr = f"Timeout: timed out after {timeout} seconds"
            msg_result = f"{msg_result} FAILED"
        except Exception as exc:
            step_result.return_code = -1
            step_result.stderr = f"Unknown error: {exc}"
            msg_result = f"{msg_result} FAILED"
        if step_result.is_passed:
            output(msg_result)
            output_debug(msg_cmd)
        else:
            output(f"{msg_result}\n{step_result.stderr}\n{msg_cmd}")

    @staticmethod
    def read_skip_list(skip_list_path: str) -> List[str]:
        skip_tests_list = []
        with os.fdopen(os.open(skip_list_path, os.O_RDONLY, stat.S_IRUSR), "r") as file_object:
            json_data = json.load(file_object)
            for key in json_data:
                skip_tests_list.extend(key["files"])
        return skip_tests_list

    @staticmethod
    def read_file_as_str(file_name: str) -> str:
        with os.fdopen(os.open(file_name, os.O_RDONLY, stat.S_IRUSR), "r") as file_object:
            content = [line.strip() for line in file_object.readlines()]
        return "\n".join(content)


class RegressTestAot(RegressTestStep):
    def __init__(self, args):
        RegressTestStep.__init__(self, args, "Regress Test AOT mode")

    @staticmethod
    def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
        aot = RegressTestAot(args)
        RegressTestAot.step_obj = aot
        aot._start()
        test_reports = aot.compile_aots(test_reports)
        aot._end()
        return test_reports

    def get_test_aot_cmd(self, test_report: TestReport) -> List[str]:
        abc_file = change_extension(
            os.path.join(test_report.out_path, Utils.get_file_name(test_report.test_id)),
            ".abc")
        ap_file = change_extension(abc_file, ".ap")
        aot_file = change_extension(abc_file, "")
        compiler_opt_track_field = self.args.compiler_opt_track_field
        os.environ["LD_LIBRARY_PATH"] = self.args.ld_library_path
        if self.args.ark_arch == RegressTestConfig.ARK_ARCH_LIST[1]:
            aot_cmd = [
                "qemu-aarch64",
                "-L",
                self.args.ark_arch_root,
                self.args.ark_aot_tool,
                "--compiler-target-triple=aarch64-unknown-linux-gnu",
                f"--aot-file={aot_file}"
            ]
        else:
            aot_cmd = [
            self.args.ark_aot_tool,
            f"--aot-file={aot_file}",
        ]
        pgo = [
            "--compiler-opt-loop-peeling=true",
            "--compiler-fast-compile=false",
            f"--compiler-opt-track-field={compiler_opt_track_field}",
            "--compiler-opt-inlining=true",
            "--compiler-max-inline-bytecodes=45",
            "--compiler-opt-level=2",
            f"--compiler-pgo-profiler-path={ap_file}",
        ]
        litecg = [
            "--compiler-enable-litecg=true",
        ]
        aot_cmd_tail = [
            f"{abc_file}",
        ]

        if self.args.run_pgo:
            aot_cmd.extend(pgo)
        if self.args.enable_litecg:
            aot_cmd.extend(litecg)
        aot_cmd.extend(aot_cmd_tail)
        return aot_cmd

    def compile_aot(self, test_report: Optional[TestReport]) -> Optional[TestReport]:
        if test_report is None or not test_report.passed:
            return test_report
        command = self.get_test_aot_cmd(test_report)
        step = StepResult(self.name, command=command)
        Utils.exec_command(command, test_report.test_id, step, self.args.timeout,
                           lambda rt, _, _2: get_extra_error_message(rt))
        test_report.steps.append(step)
        test_report.passed = step.is_passed
        return test_report

    def compile_aots(self, test_reports: List[TestReport]) -> List[TestReport]:
        with multiprocessing.Pool(processes=self.args.processes) as pool:
            results = pool.imap_unordered(self.compile_aot, test_reports)
            results = list(results)
            pool.close()
            pool.join()

        return results


class RegressOption(Enum):
    NO_FORCE_GC = auto()
    ELEMENTS_KIND = auto()


def get_regress_groups() -> Dict[RegressOption, List[str]]:
    groups = {}
    with os.fdopen(os.open(RegressTestConfig.REGRESS_TEST_OPTIONS, os.O_RDONLY, stat.S_IRUSR), "r") as file:
        for group in json.load(file):
            groups[RegressOption[group["name"]]] = group["files"]
    return groups


def get_test_options(test: str, test_groups: Dict[RegressOption, List[str]], regress_option: RegressOption) -> List[str]:
    opt_values: Dict[RegressOption, str] = {
        RegressOption.NO_FORCE_GC: "--enable-force-gc=",
        RegressOption.ELEMENTS_KIND: "--enable-elements-kind="
    }

    def match(opt: RegressOption) -> bool:
        return test in test_groups.get(opt, [])

    def to_flag(b: bool) -> str:
        return str(b).lower()

    try:
        return [opt_values.get(regress_option) + to_flag(not match(regress_option))]
    except KeyError:
        return []


class RegressTestRun(RegressTestStep):
    def __init__(self, args):
        RegressTestStep.__init__(self, args, "Regress Test Run ")
        self.test_groups: Dict[RegressOption, List[str]] = get_regress_groups()

    @staticmethod
    def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
        runner = RegressTestRun(args)
        RegressTestRun.step_obj = runner
        runner._start()
        test_reports = runner.run_test_case_dir(test_reports)
        runner._end()
        return test_reports

    @staticmethod
    def extra_check_with_expect(ret_code: int, test_report: TestReport, expect_file, stdout: str, stderr: str) -> str:
        expect_output_str = read_expect_file(expect_file, test_report.src_path)
        test_case_file = Utils.read_file_as_str(test_report.src_path)
        if stdout == expect_output_str.strip() or stderr == expect_output_str.strip():
            if ret_code == 0 or (ret_code == 255 and "/fail/" in test_case_file):
                return ""
            else:
                return get_extra_error_message(ret_code)
        msg = f'expect: [{expect_output_str}]\nbut got: [{stderr}]'
        return msg

    @staticmethod
    def extra_check_with_assert(ret_code: int, stderr: Optional[str]) -> str:
        if ret_code == 0 and stderr and "[ecmascript] Stack overflow" not in stderr:
            return str(stderr)
        return get_extra_error_message(ret_code)

    def run_test_case_dir(self, test_reports: List[TestReport]) -> List[TestReport]:
        with multiprocessing.Pool(processes=self.args.processes, initializer=init_worker,
                                  initargs=(self.args,)) as pool:
            results = pool.imap_unordered(self.run_test_case, test_reports)
            results = list(results)
            pool.close()
            pool.join()

        return results

    def run_test_case(self, test_report: TestReport) -> Optional[TestReport]:
        self.args = worker_wrapper_args
        if self.args is None or test_report is None or not test_report.passed:
            return test_report
        if test_report.src_path.endswith(RegressTestConfig.TEST_TOOL_FILE_JS_NAME):
            return None

        abc_file = change_extension(
            os.path.join(test_report.out_path, Utils.get_file_name(test_report.test_id)),
            ".abc")
        tool_abc_file = os.path.join(self.args.test_case_out_dir, RegressTestConfig.TEST_TOOL_FILE_NAME)
        aot_file = change_extension(abc_file, "")
        expect_file = change_extension(abc_file, ".out")
        entry_point = Utils.get_file_only_name(test_report.src_path)

        os.environ["LD_LIBRARY_PATH"] = self.args.ld_library_path

        # test environ LC_ALL/TZ
        test_name = test_report.test_id.replace('regresstest/ark-regress/', '')
        set_test_environ(test_report.src_path)
        command = []
        if self.args.ark_arch == RegressTestConfig.ARK_ARCH_LIST[1]:
            qemu_tool = "qemu-aarch64"
            qemu_arg1 = "-L"
            qemu_arg2 = self.args.ark_arch_root
            command = [qemu_tool, qemu_arg1, qemu_arg2]
        command.append(self.args.ark_tool)
        command.append(f"--icu-data-path={self.args.icu_path}")
        command.append(f"--entry-point={entry_point}")
        if self.args.ark_aot:
            command.append(f"--stub-file={self.args.stub_path}")
            command.append(f"--aot-file={aot_file}")
        if self.args.multi_context:
            command.append("--multi-context=true")
        if self.args.disable_force_gc:
            command.append("--enable-force-gc=false")
        else:
            command.extend(get_test_options(test_name, self.test_groups, RegressOption.NO_FORCE_GC))
        command.append("--open-ark-tools=true")
        command.extend(get_test_options(test_name, self.test_groups, RegressOption.ELEMENTS_KIND))
        command.append(tool_abc_file + ":" + abc_file)

        return self.run_test_case_file(command, test_report, expect_file)

    def run_test_case_file(self, command, test_report: TestReport, expect_file) -> TestReport:
        expect_file_exits = os.path.exists(expect_file)
        step = StepResult(self.name, command=command)
        if expect_file_exits:
            self.run_test_case_with_expect(command, step, test_report, expect_file)
        else:
            self.run_test_case_with_assert(command, step, test_report)
        test_report.steps.append(step)
        test_report.passed = step.is_passed
        return test_report

    def run_test_case_with_expect(self, command, step: StepResult, test_report: TestReport, expect_file) -> None:
        Utils.exec_command(command, test_report.test_id, step, self.args.timeout,
                           lambda rt, out, err: self.extra_check_with_expect(rt, test_report, expect_file, out, err))

    def run_test_case_with_assert(self, command, step: StepResult, test_report: TestReport) -> None:
        Utils.exec_command(command, test_report.test_id, step, self.args.timeout,
                           lambda rt, _, err: self.extra_check_with_assert(rt, err))


class Stats:
    def __init__(self, args, test_reports: List[TestReport]):
        self.args = args
        self.pass_count = 0
        self.fail_count = 0
        self.test_reports = test_reports
        self.errors: Dict[str, List[TestReport]] = {}

    def read_ignore_list(self) -> Optional[Set[str]]:
        if self.args.ignore_list is None:
            return None
        with os.fdopen(os.open(self.args.ignore_list, os.O_RDWR, stat.S_IRUSR), "r+") as file_object:
            lines = file_object.readlines()
            lines = [line.strip() for line in lines if not line.strip().startswith('#')]
        return set(lines)

    def get_new_failures(self) -> Optional[List[TestReport]]:
        ignore_list = self.read_ignore_list()
        if ignore_list is None:
            return None
        new_failures: List[TestReport] = []
        for test_report in self.test_reports:
            if test_report and not test_report.passed and test_report.steps:
                if test_report.test_id not in ignore_list:
                    new_failures.append(test_report)
        return new_failures

    def statistics(self):
        root = XTree.Element("testsuite")
        root.set("name", "Regression")

        result_file = open_write_file(self.args.out_result, False)
        for test_report in self.test_reports:
            if test_report is None:
                continue
            testcase = XTree.SubElement(root, "testcase")
            testcase.set("name", f"{test_report.test_id}")
            if test_report.passed:
                write_result_file(f"PASS: {test_report.test_id}", result_file)
                self.pass_count += 1
            else:
                self.fail_count += 1
                write_result_file(f"FAIL: {test_report.test_id}", result_file)
                failed_step = test_report.steps[-1]
                if failed_step.step_name not in self.errors:
                    self.errors[failed_step.step_name] = []
                self.errors[failed_step.step_name].append(test_report)
                XTree.SubElement(testcase, "failure").text = f"<![CDATA[{test_report.report()}]]>"

        root.set("tests", f"{self.pass_count + self.fail_count}")
        root.set("failures", f"{self.fail_count}")

        tree = XTree.ElementTree(root)
        tree.write(self.args.junit_report, xml_declaration=True, encoding="UTF-8")
        result_file.close()

    def print_result(self, args, steps):
        result_file = open_write_file(args.out_result, True)
        summary_duration = datetime.timedelta()
        for step in steps:
            output(f"Step {step.step_obj.name} - duration {step.step_obj.get_duration()}")
            summary_duration += step.step_obj.get_duration()
        msg = f'\npass count: {self.pass_count}'
        write_result_file(msg, result_file)
        output(msg)
        msg = f'fail count: {self.fail_count}'
        write_result_file(msg, result_file)
        output(msg)
        msg = f'total count: {self.fail_count + self.pass_count}'
        write_result_file(msg, result_file)
        output(msg)
        msg = f'total used time is: {str(summary_duration)}'
        write_result_file(msg, result_file)
        output(msg)
        result_file.close()

    def print_failed_tests(self):
        output("=== Failed tests ===")
        for key, values in self.errors.items():
            output(f"{key}: {len(values)} tests")


def change_extension(path, new_ext: str):
    base_path, ext = os.path.splitext(path)
    if ext:
        new_path = base_path + new_ext
    else:
        new_path = path + new_ext
    return new_path


def get_files_by_ext(start_dir, suffix):
    result = []
    for dir_path, dir_names, filenames in os.walk(start_dir):
        for filename in filenames:
            if filename.endswith(suffix):
                result.append(os.path.join(dir_path, filename))
    return result


def read_expect_file(expect_file, test_case_file):
    with os.fdopen(os.open(expect_file, os.O_RDWR, stat.S_IRUSR), "r+") as file_object:
        lines = file_object.readlines()
        lines = [line for line in lines if not line.strip().startswith('#')]
        expect_output = ''.join(lines)
        if test_case_file.startswith("/"):
            test_case_file = test_case_file.lstrip("/")
        expect_file = test_case_file.replace('regresstest/', '')
        test_file_path = os.path.join(RegressTestConfig.REGRESS_BASE_TEST_DIR, expect_file)
        expect_output_str = expect_output.replace('*%(basename)s', test_file_path)
    return expect_output_str


def open_write_file(file_path, append):
    if append:
        args = os.O_RDWR | os.O_CREAT | os.O_APPEND
    else:
        args = os.O_RDWR | os.O_CREAT
    file_descriptor = os.open(file_path, args, stat.S_IRUSR | stat.S_IWUSR)
    file_object = os.fdopen(file_descriptor, "w+")
    return file_object


def open_result_excel(file_path):
    file_descriptor = os.open(file_path, os.O_RDWR | os.O_CREAT | os.O_APPEND, stat.S_IRUSR | stat.S_IWUSR)
    file_object = os.fdopen(file_descriptor, "w+")
    return file_object


def get_file_source(file):
    with open(file, encoding='ISO-8859-1') as f:
        return f.read()


def set_test_environ(case):
    # intl environ LC_ALL
    if 'LC_ALL' in os.environ:
        del os.environ['LC_ALL']
    if 'TZ' in os.environ:
        del os.environ['TZ']
    if not os.path.exists(case):
        return
    source = get_file_source(case)
    env_match = ENV_PATTERN.search(source)
    if env_match:
        for env_pair in env_match.group(1).strip().split():
            var, value = env_pair.split('=')
            if var.find('TZ') >= 0:
                os.environ['TZ'] = value
            if var.find('LC_ALL') >= 0:
                os.environ['LC_ALL'] = value
            break


# pylint: disable=invalid-name,global-statement
worker_wrapper_args = None


def init_worker(args):
    global worker_wrapper_args
    worker_wrapper_args = args


def write_result_file(msg: str, result_file):
    result_file.write(f'{msg}\n')


def main(args):
    if not check_args(args):
        return 1
    output("\nStart regresstest........")
    steps: List[Type[RegressTestStep]] = [
        RegressTestRepoPrepare,
        RegressTestCompile,
    ]
    if args.ark_aot:
        if args.run_pgo:
            steps.append(RegressTestPgo)
        steps.append(RegressTestAot)
    steps.append(RegressTestRun)

    test_reports: List[TestReport] = []
    for step in steps:
        test_reports = step.run(args, test_reports)

    stats = Stats(args, test_reports)
    stats.statistics()
    stats.print_result(args, steps)
    stats.print_failed_tests()
    new_failures = stats.get_new_failures()
    if new_failures is None:
        return 0
    if len(new_failures) > 0:
        msg = [f"Found {len(new_failures)} new failures:"]
        for failure in new_failures:
            msg.append(f"\t{failure.test_id}")
        output("\n".join(msg))
    else:
        output("No new failures have been found")
    return len(new_failures)


if __name__ == "__main__":
    sys.exit(main(parse_args()))