import datetime
import subprocess
import os
import stat
import logging
from multiprocessing import Pool
import tqdm
cwd = os.getcwd()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def get_files_list(args=""):
"""
This function returns a list of files for the input args.
"""
if args == "":
cmd = f"hdc shell find"
else:
cmd = f"hdc shell find {args}"
output = subprocess.check_output(cmd.split()).decode()
return output.split('\r\n')[:-1]
def ls_z_check(path):
"""
List all files in directory with selinux context.
"""
cmd = f"hdc shell \"ls -alZ {path}\""
output = subprocess.check_output(cmd.split()).decode()
return output.replace('\r\n', ' ').replace(',', ' ').replace('\r', ' ')
def file_copy_check(local, remote, file_id):
"""
Check if the file has been successfully copied and could be transferred by hdc.
"""
if file_id >= 0:
pass
else:
return f"file_id error"
check_ret = False
recv_ret = False
copy_output = ""
check_output = ""
recv_output = ""
remove_output = ""
list_z_check = ls_z_check(remote)
if "ls:" not in list_z_check:
check_ret = True
local_real = os.path.join(local, str(file_id))
if local != "":
if os.path.exists(local):
pass
else:
os.mkdir(local)
device_tmp_dir = "data/local/tmp"
copy_cmd = f"hdc shell mkdir -p {device_tmp_dir}/{file_id} && cp -rf {remote} {device_tmp_dir}/{file_id}/"
shell_cmd = f"hdc shell ls -alZ {device_tmp_dir}/{file_id}/"
recv_cmd = f"hdc file recv {device_tmp_dir}/{file_id} {local}"
remove_cmd = f"hdc shell rm -rf {device_tmp_dir}/{file_id}"
copy_output = subprocess.check_output(copy_cmd.split()).decode().replace(',', ' ')
check_output = subprocess.check_output(shell_cmd.split()).decode().replace(',', ' ')
recv_output = subprocess.check_output(recv_cmd.split()).decode().replace(',', ' ')
remove_output = subprocess.check_output(remove_cmd.split()).decode().replace(',', ' ')
if os.path.exists(local_real):
recv_ret = True
out_string = f"{check_ret},{recv_ret},{remote},{local}\
{list_z_check},{copy_output},{check_output},{recv_output},{remove_output}"
return out_string
def file_recv_check(local, remote, file_id):
"""
Check if the file could be transferred by hdc directly.
"""
if file_id >= 0:
pass
else:
return f"file_id error"
check_ret = False
recv_ret = False
recv_output = ""
list_z_check = ls_z_check(remote)
local_real = os.path.join(local, str(file_id))
if "ls:" not in list_z_check:
check_ret = True
if local != "":
if os.path.exists(local):
pass
else:
os.mkdir(local)
recv_cmd = f"hdc file recv {remote} {local_real}"
recv_output = run_command_with_timeout(recv_cmd, timeout=60)[0].replace(',', ' ')
if "FileTransfer finish" in recv_output:
recv_ret = True
elif os.path.exists(local_real):
recv_ret = True
out_string = f"{check_ret},{recv_ret},{remote},{local},{list_z_check},{recv_output}"
return out_string
def run_command_with_timeout(command, timeout):
"""
Run a command with timeout.
"""
try:
result = subprocess.run(command, check=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, timeout=timeout)
return result.stdout.decode(), result.stderr.decode()
except subprocess.TimeoutExpired:
return "", "Command timed out"
except subprocess.CalledProcessError as e:
return "", e.stderr.decode()
def write_list_csv(device_path="", host_tmp_dir=""):
"""
Write the output of the file check result to a csv file.
"""
list_file = get_files_list(device_path)
save_file = os.path.join(host_tmp_dir, "files_list.csv")
flags = os.O_WRONLY | os.O_CREAT
modes = stat.S_IRUSR | stat.S_IWUSR
with os.fdopen(os.open(save_file, flags, modes), 'w') as fd:
for file_name in list_file:
if file_name != "":
fd.write(f"{file_name}\n")
logging.info("List of files written to %s", save_file)
def process_files(tmp_dir, files, process_id, chunk_size, action):
"""
Process files in chunks.
"""
save_dir = os.path.join(tmp_dir, f"recv_{process_id}_dir")
save_csv = os.path.join(tmp_dir, f"recv_files_list_{process_id}.csv")
if not os.path.exists(save_dir):
os.mkdir(save_dir)
cnt = 0
flags = os.O_WRONLY | os.O_CREAT
modes = stat.S_IRUSR | stat.S_IWUSR
with os.fdopen(os.open(save_csv, flags, modes), 'w') as fd:
for file in tqdm.tqdm(files):
file_name = file.strip()
cnt += 1
file_id = process_id * chunk_size + cnt
if action == 'recv':
out_string = file_recv_check(save_dir, file_name, file_id)
elif action == 'copy':
out_string = file_copy_check(save_dir, file_name, file_id)
else:
out_string = ""
data = f"{str(file_id)},{out_string}".replace('\r\n', ' ').replace('\r', ' ')
fd.write(f"{data}\n")
def split_files(file_name, split_num):
"""
Multiprocess split file input list into chunks.
"""
if not os.path.exists(file_name):
logging.error("File %s not found", file_name)
return []
if split_num <= 0:
logging.error("Split number must be greater than 0")
return []
all_list = []
with open(file_name, 'r') as fd:
lines = fd.readlines()
size = len(lines)
chunk_size = round(size / split_num)
for i in range(split_num):
start = i * chunk_size
end = start + chunk_size
if i == split_num - 1:
end = size
table_list = lines[start:end]
all_list.append(table_list)
return all_list
def merge_file(tmp_dir, file_num, input_name, action='recv'):
"""
Merge ouput csv into single file.
"""
files = []
header = ""
for i in range(file_num):
file = os.path.join(tmp_dir, f"{input_name}_{i}.csv")
files.append(file)
new_file = os.path.join(tmp_dir, f"merge_dir.csv")
if action == 'recv':
header = "file_id,ls result,recv result,full path,save dir,ls output,recv output\n"
elif action == 'copy':
header = "file_id,ls result,recv result,full path,dave dir,ls output,copy output,copied ls output,\
recv output,remove output\n"
flags = os.O_WRONLY | os.O_CREAT
modes = stat.S_IRUSR | stat.S_IWUSR
with os.fdopen(os.open(new_file, flags, modes), 'w') as fd:
fd.write(header)
for file in files:
readline_merge(file, fd)
def readline_merge(input_file, output_fd):
"""
Read file line by line and write to output file.
"""
with open(input_file, 'r') as f:
lines = f.readlines()
for line in lines:
output_fd.write(line)
def run_file_check_all(tmp_dir, subprocess_num, action):
"""
Run file check, for all the files.
"""
file_name = os.path.join(tmp_dir, "files_list.csv")
all_list = split_files(file_name, subprocess_num)
results = []
input_name = "recv_files_list"
with Pool(processes=subprocess_num) as pool:
for i in range(subprocess_num):
result = pool.apply_async(process_files, args=(tmp_dir, all_list[i], i, len(all_list[0]), action))
results.append(result)
pool.close()
pool.join()
merge_file(tmp_dir, subprocess_num, input_name, action)
def main():
"""
Main function.
"""
time_stamp = datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d_%H")
tmp_dir = os.path.join(cwd, time_stamp)
save_file = os.path.join(tmp_dir, "files_list.csv")
if not os.path.exists(save_file):
input("第一次执行请使用root版本,\n请按下[ENTER]确认开始收集")
if not os.path.exists(tmp_dir):
os.mkdir(tmp_dir)
args = "/ -type d 2>/dev/null"
write_list_csv(args, tmp_dir)
else:
subprocess_num = 14
num = input("第二次执行请使用user版本,\n请确保第一次执行和第二次执行处在同一小时\n"\
"请输入数字选择测试内容:\n1. [recv]user版本文件直接收取\n2. [copy]user版本文件在cp后间接收取\n")
action = ""
if num == "1":
action = 'recv'
elif num == "2":
action = 'copy'
else:
logging.error("输入错误,测试程序已退出,请重新执行")
return
run_file_check_all(tmp_dir, subprocess_num, action)
if __name__ == "__main__":
main()