#!/usr/bin/env python3
#coding: utf-8

"""
Copyright (c) 2021-2026 Huawei Device Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Description: run script
    expect_output will get run result,
    expect_sub_output will catch pivotal sub output,
    expect_file will get print string
"""

import argparse
import os
import platform
import shlex
import shutil
import subprocess
import sys
import time
import zipfile
from io import StringIO


IS_MACOS = platform.system().lower() == "darwin"
# On macOS, DYLD_LIBRARY_PATH is stripped by SIP when the process is launched
# via a protected binary like /bin/sh (used by shell=True). Use shell=False
# with a proper argument list to avoid this issue.
LIB_PATH_ENV_VAR = "DYLD_LIBRARY_PATH" if IS_MACOS else "LD_LIBRARY_PATH"


def get_env_path_from_rsp(script_file: str) -> list:
    """get env path from response file recursively."""
    rsp_file = "{0}{1}".format(script_file, ".rsp")
    if not os.path.exists(rsp_file):
        print(
            "File \"{}\" does not exist!\n" \
            "This indicates that its related shared_library is not compiled by this project, but there is an " \
            "executable or shared_library depend on its related shared_library!".format(rsp_file))
        sys.exit(1)

    rsp_info_list = []
    with open(rsp_file, "r") as fi:
        rsp_info_str = fi.read()
        rsp_info_list = rsp_info_str.split(" ")

    env_path_list = []
    for element in rsp_info_list:
        if element.endswith(".so") or element.endswith(".dll"):
            env_path_list.extend(get_env_path_from_rsp(element))
            env_path_list.append(os.path.dirname(element))
    return env_path_list


def get_command_and_env_path(args: object) -> [str, str]:
    """get command and environment path from args for running excutable."""
    if args.arkjsvmpath is None:
        env_path_list = list(set(get_env_path_from_rsp(args.script_file)))
        env_path_list.append(args.clang_lib_path)
        env_path = ":".join(env_path_list)
    else:
        env_path_list = list()
        env_path = ""
    if args.qemu_binary_path:
        if not os.path.exists(args.qemu_binary_path):
            print("Have you set up environment for running executables with qemu?\n" \
                "If not, get set-up steps from https://gitee.com/ark_standalone_build/docs ," \
                " append your build command of ark.py with option \"--clean-continue\"," \
                " and execute the appended command after setting up the environment.\n" \
                "If yes, the environment settings for qemu on your host machine may be different from what the link" \
                " above shows, it is suggested to match your local environment settings with what the link shows.")
            sys.exit(1)
        cmd = \
            "{}".format(args.qemu_binary_path) + \
            " -L {}".format(args.qemu_ld_prefix) + \
            " -E LD_LIBRARY_PATH={}".format(env_path) + \
            " {}".format(args.script_file)
    else:
        cmd = "{}".format(args.script_file)
    cmd += " {}".format(args.script_options) if args.script_options else ""
    cmd += " {}".format(args.script_args) if args.script_args else ""
    return [cmd, env_path]


def parse_args() -> object:
    """parse arguments."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--script-file', help='execute script file')
    parser.add_argument('--script-options', help='execute script options')
    parser.add_argument('--script-args', help='args of script')
    parser.add_argument('--expect-output', help='expect output')
    parser.add_argument('--expect-sub-output', help='expect sub output')
    parser.add_argument('--expect-file', help='expect file')
    parser.add_argument('--env-path', help='LD_LIBRARY_PATH env')
    parser.add_argument('--timeout-limit', help='timeout limit')
    parser.add_argument('--clang-lib-path', help='part for LD_LIBRARY_PATH, it is not in .rsp file')
    parser.add_argument('--qemu-binary-path', help='path to qemu binary, run executable with qemu if assigned')
    parser.add_argument('--qemu-ld-prefix', help='elf interpreter prefix')
    parser.add_argument('--expect-sub-error', help='use error output to compare result')
    parser.add_argument('--L', help='qemu lib path')
    parser.add_argument('--arkjsvmpath', help='ark_js_vm_path')
    parser.add_argument('--test-abc-path', help='abc path')
    parser.add_argument('--skip', type=bool, default=False, help='skip execute')
    parser.add_argument('--clean-path', action='append', default=[],
                        help='remove file/dir before execute, can be specified multiple times')
    args = parser.parse_args()
    return args


def clean_paths(paths: list):
    """Remove stale files/dirs before command execution."""
    for path in paths:
        if not path:
            continue
        if os.path.islink(path) or os.path.isfile(path):
            os.remove(path)
        elif os.path.isdir(path):
            shutil.rmtree(path)


def process_open(args: object) -> [str, object]:
    """get command and open subprocess."""
    if args.env_path:
        # use the given env-path
        cmd_list = [args.script_file]
        if args.script_options:
            cmd_list.extend(shlex.split(args.script_options))
        if args.script_args:
            cmd_list.extend(shlex.split(args.script_args))
        # Build the display command string for logging
        cmd = " ".join(cmd_list)
        # On macOS, avoid shell=True to prevent SIP from stripping
        # DYLD_LIBRARY_PATH from the environment.
        env = os.environ.copy()
        env[LIB_PATH_ENV_VAR] = str(args.env_path)
        subp = subprocess.Popen(cmd_list, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE, env=env)
    else:
        # get env-path from response file recursively
        [cmd, env_path] = get_command_and_env_path(args)
        args.env_path = env_path
        if args.qemu_binary_path:
            # process for running executable with qemu
            subp = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        else:
            # process for running executable directly
            cmd_list = shlex.split(cmd)
            env = os.environ.copy()
            env[LIB_PATH_ENV_VAR] = str(env_path)
            subp = subprocess.Popen(cmd_list, stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE, env=env)
    return [cmd, subp]


def generate_stub_code_comment(out_str:str):
    dir_path = './gen/arkcompiler/ets_runtime/'
    if not os.path.exists(dir_path):
        return
    zip_path = dir_path + 'stub_code_comment.zip'
    if out_str != '':
        memory_file = StringIO()
        memory_file.write(out_str)
        memory_file.seek(0)
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            zip_file.writestr('stub_code_comment.txt', memory_file.getvalue())


def check_expect_output(cmd: str, returncode: str, out_str: str, err_str: str, expect_output: str, start_time: float, env_path: str):
    """Check if return code matches expected output."""
    if returncode != expect_output:
        print(">>>>> ret <<<<<")
        print(returncode)
        print(">>>>> out <<<<<")
        print(out_str)
        print(">>>>> err <<<<<")
        print(err_str)
        print(">>>>> Expect return: [" + expect_output \
            + "]\n>>>>> But got: [" + returncode + "]")
        raise RuntimeError("Run [LD_LIBRARY_PATH=" + env_path + " " + cmd + "] failed! used: %.5f seconds" % (time.time() - start_time))


def check_expect_sub_output(cmd: str, returncode: str, out_str: str, err_str: str, expect_sub_output: str, start_time: float, env_path: str):
    """Check if output contains expected substring."""
    if out_str.find(expect_sub_output) == -1 or returncode != "0":
        print(">>>>> ret <<<<<")
        print(returncode)
        print(">>>>> err <<<<<")
        print(err_str)
        print(">>>>> Expect contain: [" + expect_sub_output \
            + "]\n>>>>> But got: [" + out_str + "]")
        raise RuntimeError("Run [LD_LIBRARY_PATH=" + env_path + " " + cmd + "] failed! used: %.5f seconds" % (time.time() - start_time))


def check_expect_sub_error(cmd: str, returncode: str, out_str: str, err_str: str, expect_sub_error: str, start_time: float, env_path: str):
    """Check if error output contains expected substring."""
    if err_str.find(expect_sub_error) == -1 or returncode == '0':
        print(">>>>> returnCode <<<<<")
        print(returncode)
        print(">>>>> expect err <<<<<")
        print(err_str)
        if returncode == '0':
            print(">>>>> Expect err : [" + expect_sub_error \
                + "]\n>>>>> But got corret: [" + out_str + "]")
        else:
            print(">>>>> Expect err : [" + expect_sub_error \
                + "]\n>>>>> But got wrong err: [" + err_str + "]")
        raise RuntimeError("Run [LD_LIBRARY_PATH=" + env_path + " " + cmd + "] failed! used: %.5f seconds" % (time.time() - start_time))


def check_expect_file(cmd: str, returncode: str, out_str: str, err_str: str, expect_file: str, start_time: float, env_path: str):
    """Check if output matches expected file content."""
    with open(expect_file, mode='r') as file:
        # skip license header
        expect_output = ''.join(file.readlines()[13:])
        file.close()
        result_cmp = compare_line_by_line(expect_output, out_str)
        if result_cmp or returncode != "0":
            print(">>>>> ret <<<<<")
            print(returncode)
            print(">>>>> err <<<<<")
            print(err_str)
            print(">>>>> Expect {} lines: [{}]\n>>>>> But got {} lines: [{}]".format(
                expect_output.count('\n'), expect_output, out_str.count('\n'), out_str
            ))
            raise RuntimeError("Run [LD_LIBRARY_PATH=" + env_path + " " + cmd + "] failed! used: %.5f seconds" % (time.time() - start_time))


def judge_output(args: object):
    """Run executable and judge if success or not."""

    if args.skip:
        return

    clean_paths(args.clean_path)

    start_time = time.time()
    [cmd, subp] = process_open(args)
    timeout_limit = int(args.timeout_limit) if args.timeout_limit else 2400  # units: s

    try:
        out, err = subp.communicate(timeout=timeout_limit)
    except subprocess.TimeoutExpired:
        raise RuntimeError('Run [LD_LIBRARY_PATH=' + args.env_path + " " + cmd + '] timeout, timeout_limit = ', timeout_limit, 's')

    out_str = out.decode('UTF-8', errors="ignore")
    err_str = err.decode('UTF-8', errors="ignore")
    generate_stub_code_comment(out_str)
    returncode = str(subp.returncode)

    if args.expect_output:
        check_expect_output(cmd, returncode, out_str, err_str, args.expect_output, start_time, args.env_path)
    elif args.expect_sub_output:
        check_expect_sub_output(cmd, returncode, out_str, err_str, args.expect_sub_output, start_time, args.env_path)
    elif args.expect_sub_error:
        check_expect_sub_error(cmd, returncode, out_str, err_str, args.expect_sub_error, start_time, args.env_path)
    elif args.expect_file:
        check_expect_file(cmd, returncode, out_str, err_str, args.expect_file, start_time, args.env_path)
    else:
        raise RuntimeError("Run [LD_LIBRARY_PATH=" + args.env_path + " " + cmd + "] with no expect! used: %.5f seconds" % (time.time() - start_time))

    print("Run [LD_LIBRARY_PATH=" + args.env_path + " " + cmd + "] success!")
    print("used: %.5f seconds" % (time.time() - start_time))

def compare_line_by_line(expect_output:str, got_output:str):
    expect_output_list = expect_output.split("\n")
    got_output_list = got_output.split("\n")
    for index, (expect_line, got_line) in enumerate(zip(expect_output_list, got_output_list)):
        if expect_line == got_line:
            continue
        error_msg = ""

        if "__INT_MORE_PREV__" in expect_line:
            prev_got_value = reverse_find_first_not_trace_line(got_output_list, index-1)
            if got_line.isdigit() and prev_got_value.isdigit() and int(prev_got_value) < int(got_line):
                continue
            error_msg = "Got integer result is not more than previous integer result"

        if "__INT__" in expect_line:
            if got_line.isdigit():
                continue
            error_msg = "Got not integer"

        print(">>>>> diff <<<<<")
        if error_msg:
            print(error_msg)
        print("Difference in line {}:\nExcepted: [{}]\nBut got:  [{}]".format(index+1, expect_line, got_line))
        return True
    return False

def reverse_find_first_not_trace_line(output_list: list, init_index: int) -> str:
    for i in range(init_index, -1, -1):
        if "[trace]" not in output_list[i]:
            return output_list[i]
    return ""

if __name__ == '__main__':
    input_args = parse_args()
    judge_output(input_args)