import os
import stat
import threading
import time
import logging
import multiprocessing
import pytest
from utils import GP, run_command_with_timeout, get_shell_result, \
check_shell, check_version, get_local_path, rmdir, load_gp, get_hdcd_pss, get_hdcd_fd_count
# Path separator used when building device-side paths.
SEP = "/"
# Name of the device directory (directly under the root) used as transfer target.
MOUNT_POINT = "storage"
# Default size, in KB, of each generated test file.
TEST_FILE_SIZE = 20
# Upper bound of the range used when spawning concurrent "file send" processes.
SEND_FILE_PROCESS_COUNT = 25
# Allowed growth of the hdcd fd count before an fd-count test fails.
FD_THRESHOLD = 2
# Every (is_compress, is_directory, is_zero) combination. is_compress varies
# slowest and is_zero fastest, with is_zero listed True before False.
TEST_FILE_CASE_TABLE = [
    (compress, directory, zero)
    for compress in (False, True)
    for directory in (False, True)
    for zero in (True, False)
]
logger = logging.getLogger(__name__)
def create_test_file(file_path, size, random=False):
    """Create a new file of ``size`` KB at ``file_path``.

    Args:
        file_path: Path of the file to create. It must not exist yet, because
            the file is opened with ``O_EXCL``.
        size: File size in KB (multiplied by 1024 to get the byte count).
        random: When true, fill the file with ``os.urandom`` data. Otherwise
            only the final byte (``0xff``) is written after seeking to the end,
            leaving the rest of the file unwritten.

    Raises:
        ValueError: If ``size`` is negative. Raised before anything is created.
        FileExistsError: If ``file_path`` already exists.
    """
    # Validate first so a bad size does not leave a stray empty file behind.
    if size < 0:
        raise ValueError(f"size must be non-negative, got {size}")
    byte_count = size * 1024
    flags = os.O_CREAT | os.O_WRONLY | os.O_EXCL
    # Owner read/write only.
    modes = stat.S_IWUSR | stat.S_IRUSR
    with os.fdopen(os.open(file_path, flags, modes), 'wb') as f:
        if byte_count == 0:
            # Nothing to write; seeking to byte_count - 1 would be a negative
            # offset and fail.
            return
        if random:
            f.write(os.urandom(byte_count))
        else:
            f.seek(byte_count - 1)
            f.write(b'\xff')
def create_binary_tree(depth, path='.', size=TEST_FILE_SIZE, random=False):
    """Build a binary directory tree with two test files in every leaf.

    Each non-leaf level creates the sub-directories ``1`` and ``2``. Each leaf
    directory receives ``{size}KB.1.bin`` and ``{size}KB.2.bin``.

    Args:
        depth: Number of directory levels below ``path``. A value of 0 or less
            writes the two files directly into ``path``.
        path: Root directory of the tree; created if missing.
        size: Size of each leaf file in KB.
        random: Forwarded to ``create_test_file`` for every leaf file.
    """
    if depth <= 0:
        # A direct leaf-level call may target a directory that does not exist.
        os.makedirs(path, exist_ok=True)
        for index in (1, 2):
            create_test_file(os.path.join(path, f'{size}KB.{index}.bin'), size, random=random)
        return
    for branch in ('1', '2'):
        child_path = os.path.join(path, branch)
        os.makedirs(child_path, exist_ok=True)
        # Forward ``random`` explicitly; without it every nested level falls
        # back to the default and the leaves are never random.
        create_binary_tree(depth - 1, child_path, size, random=random)
class TestFileNoSpaceFdLeak:
    """
    Fill the device storage completely, attempt file transfers into it, and
    afterwards check hdcd for file-descriptor leaks.
    """
    # Baselines sampled in setup_class and stored on the class so that every
    # test method can read them.
    fd_count = 0
    pss = 0
    # Allowed growth of the hdcd PSS reading before a leak warning is logged.
    PSS_LEAK_MARGIN = 50

    @classmethod
    def _report_pss(cls):
        """Log an error if PSS is unavailable, or a warning if it grew too much.

        This only logs; it never fails the test.
        """
        pss_now = get_hdcd_pss()
        if cls.pss == 0 or pss_now == 0:
            logger.error("get hdcd mem pss failed")
        elif pss_now > (cls.pss + cls.PSS_LEAK_MARGIN):
            # Only compare when both samples are valid; a zero baseline would
            # otherwise be reported as a leak.
            logger.warning("hdcd mem pss leak, original value %d, now value %d", cls.pss, pss_now)

    @staticmethod
    def send_file_to_storage(is_compress=False, is_directory=False, is_zero=False, is_mix=False):
        """Send a local file or directory to the full storage 20 times.

        Args:
            is_compress: Add ``-z`` to the ``file send`` command.
            is_directory: Send a directory tree instead of a single file.
            is_zero: Pick the "zero" source (``word_100M.txt`` / ``tree_zero``)
                instead of ``medium`` / ``tree_rand``.
            is_mix: Send the whole local resource directory; overrides the
                source and target chosen by the other flags.

        Returns:
            True if every attempt finished without a timeout and its output
            contained "space left on device"; False at the first attempt that
            did not.
        """
        compress_command = "-z" if is_compress else ""
        single_file_name = "medium" if not is_zero else "word_100M.txt"
        single_dir_name = "tree_rand" if not is_zero else "tree_zero"
        local_path = get_local_path(single_dir_name) if is_directory else get_local_path(single_file_name)
        target_path = "it_nospace" if is_directory else "it_nospace.bin"
        if is_mix:
            local_path = get_local_path(".")
            target_path = "it_nospace_mix"

        re_send_time = 20
        for i in range(1, re_send_time + 1):
            logger.info("send %d times", i)
            output_str, error_str = run_command_with_timeout(f"{GP.hdc_head} "
                f"file send {compress_command} {local_path} {SEP}{MOUNT_POINT}/{target_path}_{i}", 25)
            if "Command timed out" in error_str:
                logger.warning("error_str: %s", error_str)
                return False
            if "space left on device" not in output_str:
                logger.warning("output_str: %s", output_str)
                return False
        return True

    @classmethod
    def teardown_class(cls):
        """Remove transferred files and local trees, then report PSS growth."""
        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/it_*")
        rmdir(get_local_path("tree_rand"))
        rmdir(get_local_path("tree_zero"))
        cls._report_pss()

    @classmethod
    def setup_class(cls):
        """Create the local trees, fill the storage, and sample the baselines."""
        depth = 10
        if not os.path.exists(get_local_path("tree_rand")):
            create_binary_tree(depth, get_local_path("tree_rand"), random=True)
        if not os.path.exists(get_local_path("tree_zero")):
            create_binary_tree(depth, get_local_path("tree_zero"), random=False)
        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/it_*")
        # The dd is expected to end with "space left on device", i.e. the
        # storage is full once setup completes.
        assert check_shell(f"shell dd if={SEP}dev/zero bs=500M count=24 of={SEP}{MOUNT_POINT}/it_full.img",
                           "space left on device")
        time.sleep(1)
        cls.fd_count = get_hdcd_fd_count()
        cls.pss = get_hdcd_pss()
        if cls.pss == 0:
            logger.error("get hdcd mem pss failed")

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    @pytest.mark.parametrize("is_compress, is_directory, is_zero", TEST_FILE_CASE_TABLE,
                             ids=[f"is_compress:{is_compress}, is_directory:{is_directory}, is_zero:{is_zero}"
                                  for is_compress, is_directory, is_zero in TEST_FILE_CASE_TABLE])
    def test_file_normal(self, is_compress, is_directory, is_zero):
        """Every send of a single file or tree must report no space left."""
        assert self.send_file_to_storage(is_compress=is_compress, is_directory=is_directory, is_zero=is_zero)
        self._report_pss()

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    @pytest.mark.parametrize("is_compress", [True, False], ids=["is_compress:True", "is_compress:False"])
    def test_file_mix(self, is_compress):
        """Every send of the whole local directory must report no space left."""
        assert self.send_file_to_storage(is_compress=is_compress, is_mix=True)
        self._report_pss()

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    def test_file_fd_leak_proc(self):
        """The hdcd fd listing must not mention any "it_nospace" target."""
        assert not check_shell(f"shell ls -al {SEP}proc/`pidof hdcd`/fd", "it_nospace")

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    def test_file_fd_count(self):
        """The hdcd fd count must stay within FD_THRESHOLD of the baseline."""
        time.sleep(1)
        assert get_hdcd_fd_count() <= (self.fd_count + FD_THRESHOLD)
class TestFileReFullSpaceFdLeak:
    """
    With the storage almost full, transfer files while a background thread
    keeps deleting and refilling the storage, then check hdcd for
    file-descriptor leaks.
    """
    # Signals the background refill thread to stop; shared through the class.
    stop_flag = threading.Event()
    # Baselines sampled in setup_class and stored on the class.
    fd_count = 0
    pss = 0
    # Allowed growth of the hdcd PSS reading before a leak warning is logged.
    PSS_LEAK_MARGIN = 50

    @classmethod
    def _report_pss(cls):
        """Log an error if PSS is unavailable, or a warning if it grew too much.

        This only logs; it never fails the test.
        """
        pss_now = get_hdcd_pss()
        if cls.pss == 0 or pss_now == 0:
            logger.error("get hdcd mem pss failed")
        elif pss_now > (cls.pss + cls.PSS_LEAK_MARGIN):
            # Only compare when both samples are valid; a zero baseline would
            # otherwise be reported as a leak.
            logger.warning("hdcd mem pss leak, original value %d, now value %d", cls.pss, pss_now)

    def re_create_file(self, num=600):
        """Repeatedly delete the ``it_*`` files and refill the storage.

        Runs at most ``num - 1`` rounds and stops after the current round once
        ``stop_flag`` is set.
        """
        for i in range(1, num):
            check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/it_*;"
                        f" dd if={SEP}dev/zero bs=1M count=10240 of="
                        f"{SEP}{MOUNT_POINT}/it_full.img")
            logger.info("re create file count:%d", i)
            if self.stop_flag.is_set():
                break

    def re_send_file(self, is_compress=False, is_directory=False, is_zero=False, is_mix=False):
        """Send a file or directory 10 times while the storage is being refilled.

        Args:
            is_compress: Add ``-z`` to the ``file send`` command.
            is_directory: Send a directory tree instead of a single file.
            is_zero: Pick the "zero" source (``word_100M.txt`` / ``tree_zero``)
                instead of ``medium`` / ``tree_rand``.
            is_mix: Send the whole local resource directory; overrides the
                source and target chosen by the other flags.

        Returns:
            True if no attempt timed out and every output contained
            "Transfer Stop"; False otherwise.
        """
        compress_command = "-z" if is_compress else ""
        single_file_name = "medium" if not is_zero else "word_100M.txt"
        single_dir_name = "tree_rand" if not is_zero else "tree_zero"
        local_path = get_local_path(single_dir_name) if is_directory else get_local_path(single_file_name)
        target_path = "it_nospace" if is_directory else "it_nospace.bin"
        if is_mix:
            local_path = get_local_path('.')
            target_path = "it_nospace_mix"

        # The event is shared by all test cases and is left set by the previous
        # call. Without clearing it, the refill thread would quit after a
        # single round.
        self.stop_flag.clear()
        re_create_file_thread = threading.Thread(target=self.re_create_file)
        re_create_file_thread.start()
        result = True
        re_send_time = 10
        try:
            for i in range(1, re_send_time + 1):
                output_str, error_str = run_command_with_timeout(f"{GP.hdc_head} "
                    f"file send {compress_command} {local_path} {SEP}{MOUNT_POINT}/{target_path}_{i}", 25)
                logger.info("output:%s,error:%s", output_str, error_str)
                if "Command timed out" in error_str:
                    logger.warning("Command timed out")
                    result = False
                    break
                if "Transfer Stop" not in output_str:
                    logger.warning("Transfer Stop NOT FOUND")
                    result = False
                    break
        finally:
            # Always stop and reap the refill thread, even if a send raised.
            self.stop_flag.set()
            re_create_file_thread.join()
        return result

    @classmethod
    def setup_class(cls):
        """Create the local trees, nearly fill the storage, sample baselines."""
        depth = 10
        if not os.path.exists(get_local_path("tree_rand")):
            create_binary_tree(depth, get_local_path("tree_rand"), random=True)
        if not os.path.exists(get_local_path("tree_zero")):
            create_binary_tree(depth, get_local_path("tree_zero"), random=False)
        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/it_*")
        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/gen_*")
        # Fourth whitespace-separated field of the matching df line;
        # presumably the available space in 1K blocks -- TODO confirm against
        # the device's df output format.
        storage_size = get_shell_result(
            f"shell \"df {SEP}{MOUNT_POINT} | grep {MOUNT_POINT}\"").split()[3]
        logger.info("storage size =%s", storage_size)
        assert int(storage_size) >= 10
        # Leave 10 units of the reported size unused.
        gen_size = int(storage_size) - 10
        logger.info("gen size = %d", gen_size)
        check_shell(f"shell dd if={SEP}dev/zero bs=1K count={gen_size} of="
                    f"{SEP}{MOUNT_POINT}/gen_{gen_size}.img")
        time.sleep(1)
        cls.fd_count = get_hdcd_fd_count()
        cls.pss = get_hdcd_pss()
        if cls.pss == 0:
            logger.error("get hdcd mem pss failed")

    @classmethod
    def teardown_class(cls):
        """Remove generated device files and local trees, then report PSS."""
        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/it_*")
        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/gen_*")
        rmdir(get_local_path("tree_rand"))
        rmdir(get_local_path("tree_zero"))
        cls._report_pss()

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    @pytest.mark.parametrize("is_compress, is_directory, is_zero", TEST_FILE_CASE_TABLE,
                             ids=[f"is_compress:{is_compress}, is_directory:{is_directory}, is_zero:{is_zero}"
                                  for is_compress, is_directory, is_zero in TEST_FILE_CASE_TABLE])
    def test_file_normal(self, is_compress, is_directory, is_zero):
        """Every send of a single file or tree must end with "Transfer Stop"."""
        assert self.re_send_file(is_compress=is_compress, is_directory=is_directory, is_zero=is_zero)

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    @pytest.mark.parametrize("is_compress", [True, False], ids=["is_compress:True", "is_compress:False"])
    def test_file_mix(self, is_compress):
        """Every send of the whole local directory must end with "Transfer Stop"."""
        assert self.re_send_file(is_compress=is_compress, is_mix=True)

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    def test_file_compress_z_fd_proc(self):
        """The hdcd fd listing must not mention any "it_nospace" target."""
        assert not check_shell(f"shell ls -al {SEP}proc/`pidof hdcd`/fd", "it_nospace")

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    def test_file_compress_z_fd_count(self):
        """The hdcd fd count must stay within FD_THRESHOLD of the baseline."""
        time.sleep(1)
        assert get_hdcd_fd_count() <= (self.fd_count + FD_THRESHOLD)
class TestFileNoSpaceFdFullCrash:
    """
    Start many concurrent ``file send`` processes against a full storage and
    verify that no "-hdcd-" entry shows up in the faultlogger directory.
    """
    # Raw shell output (strings) for the hdcd fd count and pid.
    fd_count = "0"
    pid = "0"
    # Sender processes started by the test; joined in teardown_class.
    plist = list()

    @classmethod
    def teardown_class(cls):
        """Wait for every sender process started by the test to exit."""
        for p in cls.plist:
            p.join()

    @staticmethod
    def new_process_run(cmd):
        """Run ``cmd`` through check_shell with stdout and stderr silenced.

        Intended as the target of a child process. Descriptors 1 and 2 are
        pointed at the null device for the duration of the command and then
        restored.
        """
        # os.dup2 does not return the previous descriptor, so the originals
        # have to be duplicated explicitly before they are overwritten.
        saved_stdout = os.dup(1)
        saved_stderr = os.dup(2)
        try:
            with open(os.devnull, 'w') as devnull:
                os.dup2(devnull.fileno(), 1)
                os.dup2(devnull.fileno(), 2)
                check_shell(f"{cmd}")
        finally:
            os.dup2(saved_stdout, 1)
            os.dup2(saved_stderr, 2)
            os.close(saved_stdout)
            os.close(saved_stderr)

    @classmethod
    def setup_class(cls):
        """Create the local tree, fill the storage, and record the hdcd pid."""
        depth = 10
        if not os.path.exists(get_local_path("tree")):
            create_binary_tree(depth, get_local_path("tree"), random=True)
        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/it_*")
        # Write a 10K placeholder, then run dd with no count limit, and finally
        # delete the placeholder -- presumably so that only about 10K stays
        # free on the storage.
        check_shell(f"shell dd if={SEP}dev/zero bs=1K count=10 of={SEP}{MOUNT_POINT}/smallfile")
        check_shell(f"shell dd if={SEP}dev/zero bs=1M of={SEP}{MOUNT_POINT}/largefile")
        check_shell(f"shell df {SEP}{MOUNT_POINT}")
        check_shell(f"shell rm -rf {SEP}{MOUNT_POINT}/smallfile")
        time.sleep(1)
        check_shell("shell ls")
        cls.pid = get_shell_result("shell pidof hdcd").split("\r")[0]
        cls.fd_count = get_shell_result(f"shell ls {SEP}proc/{cls.pid}/fd | wc -l")

    @pytest.mark.L2
    @check_version("Ver: 3.1.0e")
    def test_file_fd_full_no_crash(self):
        """Flood hdcd with concurrent sends and assert no hdcd fault log appears.

        The hdcd fd count is polled until one of these happens:
        - it reaches 32768 (presumably the fd limit -- TODO confirm),
        - it drops below the previous sample,
        - it has matched the previous sample more than 20 times in total,
        - the count cannot be parsed.
        """
        cmd = f"file send {get_local_path('tree')} {SEP}{MOUNT_POINT}/it_tree"
        for _ in range(1, SEND_FILE_PROCESS_COUNT):
            p = multiprocessing.Process(target=self.new_process_run, args=(cmd,))
            p.start()
            self.plist.append(p)

        last_fd_count = 0
        loop_count = 0
        equal_count = 0
        while True:
            self.fd_count = get_shell_result(f"shell ls {SEP}proc/{self.pid}/fd | wc -l")
            try:
                # Kept inside the try so that a non-string result reaches the
                # TypeError handler below instead of aborting the test.
                logger.warning("fd_count is %s", ("ff" + self.fd_count))
                current_fd_count = int(self.fd_count)
                if current_fd_count >= 32768:
                    break
                if current_fd_count < last_fd_count:
                    logger.warning("fd count decrease.")
                    assert not check_shell("shell ls data/log/faultlog/faultlogger/", "-hdcd-")
                    run_command_with_timeout(f"{GP.hdc_head} kill", 3)
                    return
                if current_fd_count == last_fd_count:
                    equal_count += 1
                    if equal_count > 20:
                        logger.warning("equal 20 times.")
                        assert not check_shell("shell ls data/log/faultlog/faultlogger/", "-hdcd-")
                        run_command_with_timeout(f"{GP.hdc_head} kill", 3)
                        return
                last_fd_count = current_fd_count
                loop_count += 1
                if loop_count % 5 == 0:
                    logger.warning("fd count is:%s", self.fd_count)
            except ValueError:
                logger.warning("ValueError")
                break
            except TypeError:
                logger.warning("TypeError output:%s", self.fd_count)
                break

        run_command_with_timeout(f"{GP.hdc_head} kill", 3)
        run_command_with_timeout(f"{GP.hdc_head} kill", 3)
        run_command_with_timeout(f"{GP.hdc_head} wait", 3)
        time.sleep(3)
        assert not check_shell("shell ls data/log/faultlog/faultlogger/", "-hdcd-")