#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 1.运行环境
#   pycharm
#   Python 3.10
#   测试框架 pytest (pycharm setting -> Tools -> Testing -> pytest)
# 2.配置测试脚本
#   需配置hdc环境变量-NORMAL_HEAD为可执行程序hdc
#   配置TEST_FILE_PATH为测试用文件所在路径,取resource测试文件放入该路径下
#   测试tcp模式连接设备,需设备,PC连同一网络(手机热点):配置TCP_CFG['ip']为设备的ip
import os
import random
import subprocess
import socket
import time
import threading

TCP_CFG = {
    'ip': '',
    'port': "8710",
}

NORMAL_HEAD = "hdc "
REMOTE_PATH = "/data/local/tmp/"
IP = ""
TEST_FILE_PATH = "{}{}".format(os.path.abspath("D:/Download/sdk/"), '\\')

HAP_ONE = {
    'HAP_NAME': "entry-default-signed-debug.hap",
    'PACKAGE_NAME': "diagnosis",
}

TCP_CONN = {
    'bright_screen': "shell \"power-shell setmode 602\"",
    'tmode': "{}{}".format("tmode port ", TCP_CFG['port']),
    'tconn': "{}{}{}{}".format("tconn ", TCP_CFG['ip'], ":", TCP_CFG['port']),
}

PATH = {
    'file_send': {
        'local': "{}{}".format(TEST_FILE_PATH, "hdc.log"),
        'remote': "{}{}".format(REMOTE_PATH, "hdc.log")
    },
    'dir_send': {
        'local': "{}{}".format(TEST_FILE_PATH, "log"),
        'remote': "{}{}".format(REMOTE_PATH, "log")
    },
    'file_recv': {
        'remote': "{}{}".format(REMOTE_PATH, "hdc.log"),
        'local': "{}{}".format(TEST_FILE_PATH, "dev_data")
    },
    'dir_recv': {
        'remote': "{}{}".format(REMOTE_PATH, "log"),
        'local': "{}{}".format(TEST_FILE_PATH, "hdc\\log")
    },
    'file_empty': {
        'local': "{}{}".format(TEST_FILE_PATH, "empty.txt"),
        'remote': "{}{}".format(REMOTE_PATH, "empty.txt")
    }
}

EXTRA_COMMANDS = {
    'global': ["kill -r", "kill", "-l5 start", "start -r", "-v", "version", "checkserver"],
    'smode': ["smode -r", "smode"],
    'boot': ["target boot"],
    'choose': "-t "
}

BASIC_COMMANDS = {
    'shell': [
        "shell \"ls\""
    ],
    'component': [
        "list targets", "list targets -v", "target mount"
    ],
    'file_task': [
        "{}{}{}{}".format("file send ", PATH['file_send']['local'], " ", PATH['file_send']['remote']),
        "{}{}{}{}".format("file send ", PATH['file_empty']['local'], " ", PATH['file_empty']['remote']),
        "{}{}{}{}".format("file send ", PATH['dir_send']['local'], " ", PATH['dir_send']['remote']),
        "{}{}{}{}".format("file recv ", PATH['file_recv']['remote'], " ", PATH['file_recv']['local']),
        "{}{}{}{}".format("file recv ", PATH['file_empty']['remote'], " ", PATH['file_empty']['local']),
        "{}{}{}{}".format("file recv ", PATH['dir_recv']['remote'], " ", PATH['dir_recv']['local'])
    ],
    'fport_task': [
        "fport tcp:1234 tcp:1080",
        "rport tcp:13608 localabstract:8888BananaBanana",
        "fport ls",
        "fport rm tcp:1234 tcp:1080"
    ],
    'install_task': [
        "{}{}{}".format("install ", TEST_FILE_PATH, HAP_ONE['HAP_NAME']),
        "{}{}".format("uninstall ", HAP_ONE['PACKAGE_NAME'])
    ]
}

TEST_FILES = {
    'one': {
        'send_file': "{}{}".format(TEST_FILE_PATH, "100M.txt"),
        'send_file_one': "{}{}".format(REMOTE_PATH, "a100M.txt"),
        'send_file_two': "{}{}".format(REMOTE_PATH, "c100M.txt"),
        'recv_file': "{}{}".format(REMOTE_PATH, "recv100M.txt"),
        'recv_file_one': "{}{}".format(TEST_FILE_PATH, "recv100M.txt"),
        'recv_file_two': "{}{}".format(TEST_FILE_PATH, "recv200M.txt"),
    },
    'two': {
        'send_file': "{}{}".format(TEST_FILE_PATH, "hdc_file.log"),
        'send_file_one': "{}{}".format(REMOTE_PATH, "send_one.log"),
        'send_file_two': "{}{}".format(REMOTE_PATH, "send_two.log"),
        'recv_file': "{}{}".format(REMOTE_PATH, "hdcd.log"),
        'recv_file_one': "{}{}".format(TEST_FILE_PATH, "recv_one.log"),
        'recv_file_two': "{}{}".format(TEST_FILE_PATH, "recv_two.log"),
    }
}


def command_judge(cmd):
    ret = False
    cmd_parts = cmd.split()
    if 'file send' in cmd and cmd[:9] == 'file send' and len(cmd_parts) == 4:
        ret = True
    if 'file recv' in cmd and cmd[:9] == 'file recv' and len(cmd_parts) == 4:
        ret = True
    if 'install' in cmd and cmd[:7] == 'install' and len(cmd_parts) == 2:
        ret = True
    return ret


def command_callback(cmd, head, need_del, res=""):
    cmd_parts = cmd.split()
    if 'file send' in cmd and cmd[:9] == 'file send' and len(cmd_parts) == 4:
        if need_del:
            assert "FileTransfer finish" in res
        check_file_send(cmd_parts[2], cmd_parts[3], head, need_del)
    if 'file recv' in cmd and cmd[:9] == 'file recv' and len(cmd_parts) == 4:
        if need_del:
            assert "FileTransfer finish" in res
        check_file_recv(cmd_parts[2], cmd_parts[3], head, need_del)
    if 'install' in cmd and cmd[:7] == 'install' and len(cmd_parts) == 2:
        check_install(head, res)
    if cmd == 'smode':
        time.sleep(4)
        check_root(head)
    if cmd == 'smode -r':
        time.sleep(4)
        check_user(head)
    if cmd == 'target boot':
        time.sleep(35)


def check_file_send(local_file, remote_file, head, need_del):
    ret = get_win_file_type(local_file)
    file_type = ""
    if 'file' in ret:
        file_type = '-f'
    if "dir" in ret:
        file_type = '-d'
    res = run_command_stdout("{}{}{}{}{}".format("shell \"[ ", file_type, " ", remote_file,
                             " ] && echo yes || echo no\""), head)
    assert 'yes' in res
    if file_type == '-d' or 'empty.txt' in local_file:
        rm_send_file(remote_file, head, need_del)
        return
    local_md5 = get_win_md5(local_file)
    remote_md5 = get_md5(remote_file, head)
    rm_send_file(remote_file, head, need_del)
    assert local_md5 == remote_md5
    print("check_file_send success ", res)


def rm_send_file(file_path, head, need_del):
    if need_del:
        run_command("{}{}{}{}".format("shell \"rm -rf ", file_path, "\"", head))


def check_file_recv(remote_file, local_file, head, need_del):
    if 'dir' in get_win_file_type(local_file) or 'empty.txt' in local_file:
        rm_recv_file(local_file, need_del)
        return
    res = run_command_stdout("{}{}{}".format("attrib ", local_file, ""))
    local_md5 = get_win_md5(local_file)
    remote_md5 = get_md5(remote_file, head)
    assert '-' not in res
    if local_md5 != remote_md5:
        print("check_file_recv fail ", remote_file, local_file)
    assert local_md5 == remote_md5
    rm_recv_file(local_file, need_del)
    print("check_file_recv success ", res)


def get_win_file_type(file_path):
    ret = run_command_stdout("{}{}{}".format("if exist ", file_path, " echo yes"), '')
    assert "yes" in ret
    res = run_command_stdout("{}{}{}".format("dir/ad ", file_path, " >nul 2>nul && echo dir || echo file"), '')
    return res


def rm_recv_file(file_path, need_del):
    if need_del:
        res = get_win_file_type(file_path)
        if "dir" in res:
            run_command("{}{}".format("rmdir /s/q ", file_path), "")
        if "file" in res:
            run_command("{}{}".format("del ", file_path), "")


def check_install(head, res):
    print("check_install")
    print(res)
    if "msg:install bundle successfully." not in res:
        print("install msg error")
        assert False
    res = run_command_stdout("shell \"bm dump -a\"", head)
    if HAP_ONE['PACKAGE_NAME'] in res:
        print("check_install success ", HAP_ONE['PACKAGE_NAME'])
        assert True
    else:
        assert False


def check_root(head):
    res = run_command_stdout("shell \"whoami\"", head)
    print("check_root res: ", res)
    assert 'root' in res


def check_user(head):
    res = run_command_stdout("shell \"whoami\"", head)
    print("check_user res: ", res)
    assert 'shell' in res
    return 'shell' in res


def get_devs(head=NORMAL_HEAD):
    res = run_command_stdout(BASIC_COMMANDS['component'][0], head)
    devs = res.split()
    print(devs)
    return devs


def tmode_to_tcp():
    run_command(TCP_CONN['tmode'])
    res = run_command_stdout(TCP_CONN['tconn'])
    print(res)
    if "Connect OK" in res:
        return True
    return False


def remote_server_start(server_head):
    global IP
    cmd = "{}{}".format(server_head, "-m")
    print(cmd)
    os.popen(cmd)


def run_command(cmd, head=NORMAL_HEAD, need_del=True, need_callback=True):
    command = "{}{}".format(head, cmd)
    if head != '':
        print(command)

    subprocess.Popen(command,
                     shell=True).communicate()
    if need_callback:
        command_callback(cmd, head, need_del)


def run_command_stdout(cmd, head=NORMAL_HEAD, need_del=True, need_callback=True):
    command = "{}{}".format(head, cmd)
    if head != '' and 'echo' not in cmd:
        print(command)
    dec = "UTF-8"
    if head == '':
        dec = 'gbk'
    res = subprocess.Popen(command,
                           shell=True,
                           stdout=subprocess.PIPE).communicate()
    res = res[0].decode(dec)
    if need_callback:
        command_callback(cmd, head, need_del, res)
    return res


def run_commands(cmds, head=NORMAL_HEAD, need_del=True):
    for command in cmds:
        run_command(command, head, need_del)


def get_basic_commands():
    commands = []
    for tasks in BASIC_COMMANDS.values():
        commands += tasks
    return commands


def run_global_cmd():
    global_commands = EXTRA_COMMANDS['global']
    run_commands(global_commands)


def run_split_commands(commands, head=NORMAL_HEAD):
    for command in commands:
        if command_judge(command):
            run_command_stdout(command, head, False)
        else:
            run_command(command, head, False)


def run_device_cmd(head=NORMAL_HEAD):
    run_command_stdout("{}{}{}{}".format("file send ", PATH['dir_send']['local'], " ", PATH['dir_recv']['remote']),
                       head, False)
    for smd in EXTRA_COMMANDS['smode']:
        run_command(smd)
        commands = get_basic_commands() + EXTRA_COMMANDS['boot']
        if smd == "smode -r":
            for item in commands:
                if "{}{}".format(REMOTE_PATH, "log") in item:
                    print(item)
                    commands.remove(item)
        run_split_commands(commands, head)


def extract_ip():
    global IP
    st = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        st.connect(('10.255.255.255', 1))
        IP = st.getsockname()[0]
    except Exception:
        IP = ""
    finally:
        st.close()
    return IP


def mix_path(path, i):
    ret = path.find('.')
    if ret > 0:
        return "{}{}{}".format(path[:ret], i, path[ret:])
    return path


def file_send(send_path, recv_path, i, wait_time=0):
    time.sleep(wait_time)
    res = run_command_stdout("{}{}{}{}".format("file send ", send_path, " ", mix_path(recv_path, str(i))))
    print(res)


def file_recv(remote_path, recv_path, i, wait_time=0):
    time.sleep(wait_time)
    res = run_command_stdout("{}{}{}{}".format("file recv ", remote_path, " ", mix_path(recv_path, str(i))))
    print(res)


def get_win_md5(file_path):
    real_md5 = "win_md5"
    send_md5 = run_command_stdout("{}{}{}".format("certutil -hashfile ", os.path.abspath(file_path), " MD5"),
                                  "").split()
    for x in send_md5:
        if len(x) == 32:
            real_md5 = x
    return real_md5


def get_md5(file_path, head=NORMAL_HEAD):
    md5 = run_command_stdout("{}{}{}".format("shell \"md5sum ", file_path, "\""), head)
    return md5.split()[0]


class TestCommands:
    def test_file_cmd(self):
        print("HDC TEST: start test_file_cmd\n")
        for item in TEST_FILES:
            send_file = TEST_FILES[item]['send_file']
            send_file_one = TEST_FILES[item]['send_file_one']
            send_file_two = TEST_FILES[item]['send_file_two']

            recv_file = TEST_FILES[item]['recv_file']
            recv_file_one = TEST_FILES[item]['recv_file_one']
            recv_file_two = TEST_FILES[item]['recv_file_two']

            run_command_stdout("{}{}{}{}".format("file send ", os.path.abspath(PATH['file_empty']['local']), ' ',
                               PATH['file_empty']['remote']), NORMAL_HEAD, False, False)
            run_command_stdout("{}{}{}{}".format("file send ", os.path.abspath(send_file), ' ', recv_file),
                               NORMAL_HEAD, False)
            for i in range(10):
                wait_time = random.uniform(0, 1)
                if i == 0:
                    wait_time = 0
                print("{}{}".format("HDC TEST: start test_file_cmd \n", str(i)))
                send_one = threading.Thread(target=file_send, args=(os.path.abspath(send_file), send_file_one, i))
                send_two = threading.Thread(target=file_send,
                                            args=(os.path.abspath(send_file), send_file_two, i, wait_time))
                recv_one = threading.Thread(target=file_recv, args=(recv_file, os.path.abspath(recv_file_one), i))
                recv_two = threading.Thread(target=file_recv,
                                            args=(recv_file, os.path.abspath(recv_file_two), i, wait_time))

                send_one.start()
                send_two.start()
                recv_one.start()
                recv_two.start()
                send_one.join()
                send_two.join()
                recv_one.join()
                recv_two.join()

    def test_global_server(self):
        print("HDC TEST: start test_global_server_cmd\n")
        run_global_cmd()

    def test_device_cmd(self):
        print("HDC TEST: start test_device_cmd\n")
        devs = get_devs()
        if len(devs) == 1:
            run_device_cmd()
        if len(devs) > 1:
            for dev in devs:
                run_device_cmd("{}{}{}{}".format(NORMAL_HEAD, EXTRA_COMMANDS['choose'], dev, " "))

    def test_tcp_mode(self):
        print("HDC TEST: start test_tcp_mode\n")
        extract_ip()
        global IP
        if len(IP) == 0 or TCP_CFG['ip'] == '':
            print("请连接热点 配置TCP_CFG")
            return
        if tmode_to_tcp():
            commands = get_basic_commands()
            run_commands(commands, NORMAL_HEAD, False)

    def test_remote_server(self):
        time.sleep(10)
        print("HDC TEST: start test_remote_server\n")
        extract_ip()
        ret = run_command_stdout("kill")
        if 'finish' not in ret:
            print('test_remote_server kill server failed')
            return
        global IP
        server_head = "{}{}{}{}{}".format(NORMAL_HEAD, "-s ", IP, ":", "8710 ")
        thread_start = threading.Thread(target=remote_server_start(server_head))
        thread_start.start()
        thread_start.join()
        devs = get_devs(server_head)
        for dev in devs:
            head = "{}{}{}{}".format(server_head, EXTRA_COMMANDS['choose'], dev, " ")
            run_command_stdout("{}{}{}{}".format("file send ", PATH['dir_send']['local'], " ",
                                                 PATH['dir_recv']['remote']), head, False)
            threading.Thread(target=run_split_commands(get_basic_commands(), head)).start()
        run_command("kill", server_head)