import os
import random
import subprocess
import socket
import time
import threading
TCP_CFG = {
'ip': '',
'port': "8710",
}
NORMAL_HEAD = "hdc "
REMOTE_PATH = "/data/local/tmp/"
IP = ""
TEST_FILE_PATH = "{}{}".format(os.path.abspath("D:/Download/sdk/"), '\\')
HAP_ONE = {
'HAP_NAME': "entry-default-signed-debug.hap",
'PACKAGE_NAME': "diagnosis",
}
TCP_CONN = {
'bright_screen': "shell \"power-shell setmode 602\"",
'tmode': "{}{}".format("tmode port ", TCP_CFG['port']),
'tconn': "{}{}{}{}".format("tconn ", TCP_CFG['ip'], ":", TCP_CFG['port']),
}
PATH = {
'file_send': {
'local': "{}{}".format(TEST_FILE_PATH, "hdc.log"),
'remote': "{}{}".format(REMOTE_PATH, "hdc.log")
},
'dir_send': {
'local': "{}{}".format(TEST_FILE_PATH, "log"),
'remote': "{}{}".format(REMOTE_PATH, "log")
},
'file_recv': {
'remote': "{}{}".format(REMOTE_PATH, "hdc.log"),
'local': "{}{}".format(TEST_FILE_PATH, "dev_data")
},
'dir_recv': {
'remote': "{}{}".format(REMOTE_PATH, "log"),
'local': "{}{}".format(TEST_FILE_PATH, "hdc\\log")
},
'file_empty': {
'local': "{}{}".format(TEST_FILE_PATH, "empty.txt"),
'remote': "{}{}".format(REMOTE_PATH, "empty.txt")
}
}
EXTRA_COMMANDS = {
'global': ["kill -r", "kill", "-l5 start", "start -r", "-v", "version", "checkserver"],
'smode': ["smode -r", "smode"],
'boot': ["target boot"],
'choose': "-t "
}
BASIC_COMMANDS = {
'shell': [
"shell \"ls\""
],
'component': [
"list targets", "list targets -v", "target mount"
],
'file_task': [
"{}{}{}{}".format("file send ", PATH['file_send']['local'], " ", PATH['file_send']['remote']),
"{}{}{}{}".format("file send ", PATH['file_empty']['local'], " ", PATH['file_empty']['remote']),
"{}{}{}{}".format("file send ", PATH['dir_send']['local'], " ", PATH['dir_send']['remote']),
"{}{}{}{}".format("file recv ", PATH['file_recv']['remote'], " ", PATH['file_recv']['local']),
"{}{}{}{}".format("file recv ", PATH['file_empty']['remote'], " ", PATH['file_empty']['local']),
"{}{}{}{}".format("file recv ", PATH['dir_recv']['remote'], " ", PATH['dir_recv']['local'])
],
'fport_task': [
"fport tcp:1234 tcp:1080",
"rport tcp:13608 localabstract:8888BananaBanana",
"fport ls",
"fport rm tcp:1234 tcp:1080"
],
'install_task': [
"{}{}{}".format("install ", TEST_FILE_PATH, HAP_ONE['HAP_NAME']),
"{}{}".format("uninstall ", HAP_ONE['PACKAGE_NAME'])
]
}
TEST_FILES = {
'one': {
'send_file': "{}{}".format(TEST_FILE_PATH, "100M.txt"),
'send_file_one': "{}{}".format(REMOTE_PATH, "a100M.txt"),
'send_file_two': "{}{}".format(REMOTE_PATH, "c100M.txt"),
'recv_file': "{}{}".format(REMOTE_PATH, "recv100M.txt"),
'recv_file_one': "{}{}".format(TEST_FILE_PATH, "recv100M.txt"),
'recv_file_two': "{}{}".format(TEST_FILE_PATH, "recv200M.txt"),
},
'two': {
'send_file': "{}{}".format(TEST_FILE_PATH, "hdc_file.log"),
'send_file_one': "{}{}".format(REMOTE_PATH, "send_one.log"),
'send_file_two': "{}{}".format(REMOTE_PATH, "send_two.log"),
'recv_file': "{}{}".format(REMOTE_PATH, "hdcd.log"),
'recv_file_one': "{}{}".format(TEST_FILE_PATH, "recv_one.log"),
'recv_file_two': "{}{}".format(TEST_FILE_PATH, "recv_two.log"),
}
}
def command_judge(cmd):
ret = False
cmd_parts = cmd.split()
if 'file send' in cmd and cmd[:9] == 'file send' and len(cmd_parts) == 4:
ret = True
if 'file recv' in cmd and cmd[:9] == 'file recv' and len(cmd_parts) == 4:
ret = True
if 'install' in cmd and cmd[:7] == 'install' and len(cmd_parts) == 2:
ret = True
return ret
def command_callback(cmd, head, need_del, res=""):
cmd_parts = cmd.split()
if 'file send' in cmd and cmd[:9] == 'file send' and len(cmd_parts) == 4:
if need_del:
assert "FileTransfer finish" in res
check_file_send(cmd_parts[2], cmd_parts[3], head, need_del)
if 'file recv' in cmd and cmd[:9] == 'file recv' and len(cmd_parts) == 4:
if need_del:
assert "FileTransfer finish" in res
check_file_recv(cmd_parts[2], cmd_parts[3], head, need_del)
if 'install' in cmd and cmd[:7] == 'install' and len(cmd_parts) == 2:
check_install(head, res)
if cmd == 'smode':
time.sleep(4)
check_root(head)
if cmd == 'smode -r':
time.sleep(4)
check_user(head)
if cmd == 'target boot':
time.sleep(35)
def check_file_send(local_file, remote_file, head, need_del):
ret = get_win_file_type(local_file)
file_type = ""
if 'file' in ret:
file_type = '-f'
if "dir" in ret:
file_type = '-d'
res = run_command_stdout("{}{}{}{}{}".format("shell \"[ ", file_type, " ", remote_file,
" ] && echo yes || echo no\""), head)
assert 'yes' in res
if file_type == '-d' or 'empty.txt' in local_file:
rm_send_file(remote_file, head, need_del)
return
local_md5 = get_win_md5(local_file)
remote_md5 = get_md5(remote_file, head)
rm_send_file(remote_file, head, need_del)
assert local_md5 == remote_md5
print("check_file_send success ", res)
def rm_send_file(file_path, head, need_del):
if need_del:
run_command("{}{}{}{}".format("shell \"rm -rf ", file_path, "\"", head))
def check_file_recv(remote_file, local_file, head, need_del):
if 'dir' in get_win_file_type(local_file) or 'empty.txt' in local_file:
rm_recv_file(local_file, need_del)
return
res = run_command_stdout("{}{}{}".format("attrib ", local_file, ""))
local_md5 = get_win_md5(local_file)
remote_md5 = get_md5(remote_file, head)
assert '-' not in res
if local_md5 != remote_md5:
print("check_file_recv fail ", remote_file, local_file)
assert local_md5 == remote_md5
rm_recv_file(local_file, need_del)
print("check_file_recv success ", res)
def get_win_file_type(file_path):
ret = run_command_stdout("{}{}{}".format("if exist ", file_path, " echo yes"), '')
assert "yes" in ret
res = run_command_stdout("{}{}{}".format("dir/ad ", file_path, " >nul 2>nul && echo dir || echo file"), '')
return res
def rm_recv_file(file_path, need_del):
if need_del:
res = get_win_file_type(file_path)
if "dir" in res:
run_command("{}{}".format("rmdir /s/q ", file_path), "")
if "file" in res:
run_command("{}{}".format("del ", file_path), "")
def check_install(head, res):
print("check_install")
print(res)
if "msg:install bundle successfully." not in res:
print("install msg error")
assert False
res = run_command_stdout("shell \"bm dump -a\"", head)
if HAP_ONE['PACKAGE_NAME'] in res:
print("check_install success ", HAP_ONE['PACKAGE_NAME'])
assert True
else:
assert False
def check_root(head):
res = run_command_stdout("shell \"whoami\"", head)
print("check_root res: ", res)
assert 'root' in res
def check_user(head):
res = run_command_stdout("shell \"whoami\"", head)
print("check_user res: ", res)
assert 'shell' in res
return 'shell' in res
def get_devs(head=NORMAL_HEAD):
res = run_command_stdout(BASIC_COMMANDS['component'][0], head)
devs = res.split()
print(devs)
return devs
def tmode_to_tcp():
run_command(TCP_CONN['tmode'])
res = run_command_stdout(TCP_CONN['tconn'])
print(res)
if "Connect OK" in res:
return True
return False
def remote_server_start(server_head):
global IP
cmd = "{}{}".format(server_head, "-m")
print(cmd)
os.popen(cmd)
def run_command(cmd, head=NORMAL_HEAD, need_del=True, need_callback=True):
command = "{}{}".format(head, cmd)
if head != '':
print(command)
subprocess.Popen(command,
shell=True).communicate()
if need_callback:
command_callback(cmd, head, need_del)
def run_command_stdout(cmd, head=NORMAL_HEAD, need_del=True, need_callback=True):
command = "{}{}".format(head, cmd)
if head != '' and 'echo' not in cmd:
print(command)
dec = "UTF-8"
if head == '':
dec = 'gbk'
res = subprocess.Popen(command,
shell=True,
stdout=subprocess.PIPE).communicate()
res = res[0].decode(dec)
if need_callback:
command_callback(cmd, head, need_del, res)
return res
def run_commands(cmds, head=NORMAL_HEAD, need_del=True):
for command in cmds:
run_command(command, head, need_del)
def get_basic_commands():
commands = []
for tasks in BASIC_COMMANDS.values():
commands += tasks
return commands
def run_global_cmd():
global_commands = EXTRA_COMMANDS['global']
run_commands(global_commands)
def run_split_commands(commands, head=NORMAL_HEAD):
for command in commands:
if command_judge(command):
run_command_stdout(command, head, False)
else:
run_command(command, head, False)
def run_device_cmd(head=NORMAL_HEAD):
run_command_stdout("{}{}{}{}".format("file send ", PATH['dir_send']['local'], " ", PATH['dir_recv']['remote']),
head, False)
for smd in EXTRA_COMMANDS['smode']:
run_command(smd)
commands = get_basic_commands() + EXTRA_COMMANDS['boot']
if smd == "smode -r":
for item in commands:
if "{}{}".format(REMOTE_PATH, "log") in item:
print(item)
commands.remove(item)
run_split_commands(commands, head)
def extract_ip():
global IP
st = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
st.connect(('10.255.255.255', 1))
IP = st.getsockname()[0]
except Exception:
IP = ""
finally:
st.close()
return IP
def mix_path(path, i):
ret = path.find('.')
if ret > 0:
return "{}{}{}".format(path[:ret], i, path[ret:])
return path
def file_send(send_path, recv_path, i, wait_time=0):
time.sleep(wait_time)
res = run_command_stdout("{}{}{}{}".format("file send ", send_path, " ", mix_path(recv_path, str(i))))
print(res)
def file_recv(remote_path, recv_path, i, wait_time=0):
time.sleep(wait_time)
res = run_command_stdout("{}{}{}{}".format("file recv ", remote_path, " ", mix_path(recv_path, str(i))))
print(res)
def get_win_md5(file_path):
real_md5 = "win_md5"
send_md5 = run_command_stdout("{}{}{}".format("certutil -hashfile ", os.path.abspath(file_path), " MD5"),
"").split()
for x in send_md5:
if len(x) == 32:
real_md5 = x
return real_md5
def get_md5(file_path, head=NORMAL_HEAD):
md5 = run_command_stdout("{}{}{}".format("shell \"md5sum ", file_path, "\""), head)
return md5.split()[0]
class TestCommands:
def test_file_cmd(self):
print("HDC TEST: start test_file_cmd\n")
for item in TEST_FILES:
send_file = TEST_FILES[item]['send_file']
send_file_one = TEST_FILES[item]['send_file_one']
send_file_two = TEST_FILES[item]['send_file_two']
recv_file = TEST_FILES[item]['recv_file']
recv_file_one = TEST_FILES[item]['recv_file_one']
recv_file_two = TEST_FILES[item]['recv_file_two']
run_command_stdout("{}{}{}{}".format("file send ", os.path.abspath(PATH['file_empty']['local']), ' ',
PATH['file_empty']['remote']), NORMAL_HEAD, False, False)
run_command_stdout("{}{}{}{}".format("file send ", os.path.abspath(send_file), ' ', recv_file),
NORMAL_HEAD, False)
for i in range(10):
wait_time = random.uniform(0, 1)
if i == 0:
wait_time = 0
print("{}{}".format("HDC TEST: start test_file_cmd \n", str(i)))
send_one = threading.Thread(target=file_send, args=(os.path.abspath(send_file), send_file_one, i))
send_two = threading.Thread(target=file_send,
args=(os.path.abspath(send_file), send_file_two, i, wait_time))
recv_one = threading.Thread(target=file_recv, args=(recv_file, os.path.abspath(recv_file_one), i))
recv_two = threading.Thread(target=file_recv,
args=(recv_file, os.path.abspath(recv_file_two), i, wait_time))
send_one.start()
send_two.start()
recv_one.start()
recv_two.start()
send_one.join()
send_two.join()
recv_one.join()
recv_two.join()
def test_global_server(self):
print("HDC TEST: start test_global_server_cmd\n")
run_global_cmd()
def test_device_cmd(self):
print("HDC TEST: start test_device_cmd\n")
devs = get_devs()
if len(devs) == 1:
run_device_cmd()
if len(devs) > 1:
for dev in devs:
run_device_cmd("{}{}{}{}".format(NORMAL_HEAD, EXTRA_COMMANDS['choose'], dev, " "))
def test_tcp_mode(self):
print("HDC TEST: start test_tcp_mode\n")
extract_ip()
global IP
if len(IP) == 0 or TCP_CFG['ip'] == '':
print("请连接热点 配置TCP_CFG")
return
if tmode_to_tcp():
commands = get_basic_commands()
run_commands(commands, NORMAL_HEAD, False)
def test_remote_server(self):
time.sleep(10)
print("HDC TEST: start test_remote_server\n")
extract_ip()
ret = run_command_stdout("kill")
if 'finish' not in ret:
print('test_remote_server kill server failed')
return
global IP
server_head = "{}{}{}{}{}".format(NORMAL_HEAD, "-s ", IP, ":", "8710 ")
thread_start = threading.Thread(target=remote_server_start(server_head))
thread_start.start()
thread_start.join()
devs = get_devs(server_head)
for dev in devs:
head = "{}{}{}{}".format(server_head, EXTRA_COMMANDS['choose'], dev, " ")
run_command_stdout("{}{}{}{}".format("file send ", PATH['dir_send']['local'], " ",
PATH['dir_recv']['remote']), head, False)
threading.Thread(target=run_split_commands(get_basic_commands(), head)).start()
run_command("kill", server_head)