import os
import stat
import unittest
from unittest.mock import patch
from mindie_llm.runtime.utils.helpers.safety.file import (
safe_open,
check_user_write_permission,
check_uid_permission,
check_world_write_permission,
check_file_permission,
check_file_size_lt,
check_file_safety,
safe_readlines,
safe_listdir,
safe_chmod,
)
class TestFile(unittest.TestCase):
    """Tests for the helpers imported from ``mindie_llm.runtime.utils.helpers.safety.file``.

    A scratch file ``test.txt`` and a symbolic link pointing at it are created
    next to this test module for the lifetime of the class. The file content is
    rewritten before every test so each test starts from a single-line file.
    """

    # Permission bits the scratch file is created with and restored to.
    _DEFAULT_MODE = 0o640
    # Owner uid / gid reported by the faked ``os.stat`` in the uid-permission tests.
    _OWNER_UID = 1001
    _OWNER_GID = 1001

    @classmethod
    def _write_test_file(cls):
        """(Re)create the scratch file with its single line of content."""
        with safe_open(cls.file_path, 'w', encoding='utf-8', permission_mode=cls._DEFAULT_MODE) as fw:
            fw.write("This is a test file." + "\n")

    @classmethod
    def setUpClass(cls):
        cls.test_dir = os.path.dirname(os.path.abspath(__file__))
        cls.file_path = os.path.join(cls.test_dir, "test.txt")
        cls._write_test_file()
        cls.symbolic_link = os.path.join(cls.test_dir, "link_to_test.txt")
        # A link left over from an earlier, interrupted run would make os.symlink fail.
        if os.path.lexists(cls.symbolic_link):
            os.unlink(cls.symbolic_link)
        os.symlink(cls.file_path, cls.symbolic_link)

    @classmethod
    def tearDownClass(cls):
        # Remove each artifact independently so that one missing path does not
        # leave the other one behind in the test directory.
        for path in (cls.file_path, cls.symbolic_link):
            if os.path.lexists(path):
                os.unlink(path)

    def setUp(self):
        self._write_test_file()

    def _restore_mode_after_test(self):
        """Register a cleanup that puts the scratch file back to the default mode.

        Cleanups run even when the test body raises, so a failing assertion
        cannot leave the file with altered permissions for later tests.
        """
        self.addCleanup(os.chmod, self.file_path, self._DEFAULT_MODE)

    def _check_uid_permission_as(self, euid, gid):
        """Call ``check_uid_permission`` with a faked current user and file owner.

        ``os.geteuid`` and ``os.getgid`` are patched to return ``euid`` and
        ``gid``; ``os.stat`` is patched to report a file owned by
        ``_OWNER_UID`` / ``_OWNER_GID``.
        """
        fake_stat = os.stat_result((0, 0, 0, 0, self._OWNER_UID, self._OWNER_GID, 0, 0, 0, 0))
        with patch("os.geteuid", return_value=euid), \
                patch("os.getgid", return_value=gid), \
                patch("os.stat", return_value=fake_stat):
            check_uid_permission(self.file_path)

    def test_safe_open_case_read_file_result_success(self):
        """Reading the scratch file through ``safe_open`` yields its single line."""
        with safe_open(self.file_path, 'r', 'utf-8', 0o640, True) as fr:
            file_context = fr.readlines()
        self.assertEqual(len(file_context), 1)

    def test_safe_open_case_append_file_result_success(self):
        """Appending one line through ``safe_open`` leaves two lines in the file."""
        encoding = 'utf-8'
        permission_mode = 0o640
        is_exist_ok = True
        limits = {
            "max_path_length": 1024,
            "max_file_size": 1024 * 1024,
            "check_link": True,
        }
        with safe_open(self.file_path, 'a', encoding, permission_mode, is_exist_ok, **limits) as fa:
            fa.write("Test safe_open success." + "\n")
        with safe_open(self.file_path, 'r', encoding, permission_mode, is_exist_ok, **limits) as fr:
            file_context = fr.readlines()
        self.assertEqual(len(file_context), 2)

    def test_check_file_size_lt_case_less_than_max_file_size_result_success(self):
        """A generous size limit must not raise."""
        check_file_size_lt(self.file_path, 1024 * 1024)

    def test_check_file_size_lt_case_greater_than_max_file_size_result_raise_value_error(self):
        """A one-byte limit is exceeded by the scratch file and must raise ValueError."""
        with self.assertRaises(ValueError) as context:
            check_file_size_lt(self.file_path, 1)
        self.assertIn("The size of file should not be greater than", str(context.exception))

    def test_check_uid_permission_case_the_cur_user_is_root_result_success(self):
        """Effective uid 0 must pass although the file belongs to another user."""
        self._check_uid_permission_as(euid=0, gid=0)

    def test_check_uid_permission_case_the_cur_user_is_path_owner_result_success(self):
        """The owning uid must pass even with a different gid."""
        self._check_uid_permission_as(euid=1001, gid=1002)

    def test_check_uid_permission_case_the_cur_user_and_path_owner_in_same_user_group_result_success(self):
        """A different uid sharing the file's gid must pass."""
        self._check_uid_permission_as(euid=1002, gid=1001)

    def test_check_uid_permission_case_the_other_user_result_raise_permission_error(self):
        """A user matching neither the uid nor the gid must be rejected."""
        with self.assertRaises(PermissionError) as context:
            self._check_uid_permission_as(euid=1002, gid=1002)
        self.assertIn("The current user does not have permission to access the input path", str(context.exception))

    def test_check_world_write_permission_case_not_have_other_write_permission_result_success(self):
        """Mode 0o640 has no write bit for others and must not raise."""
        check_world_write_permission(self.file_path)

    def test_check_world_write_permission_case_have_other_write_permission_result_raise_permission_error(self):
        """Mode 0o642 grants write to others and must raise PermissionError."""
        self._restore_mode_after_test()
        os.chmod(self.file_path, 0o642)
        with self.assertRaises(PermissionError) as context:
            check_world_write_permission(self.file_path)
        self.assertIn(
            "The file should not be writable by others who are neither the owner nor in the group.",
            str(context.exception),
        )

    def test_check_file_permission_case_do_check_result_success(self):
        """The freshly created scratch file must pass the permission check."""
        check_file_permission(self.file_path)

    def test_check_file_safety_case_file_exists_result_success(self):
        """An existing file opened for reading with is_exist_ok=True must pass."""
        mode = 'r'
        is_exist_ok = True
        max_file_size = 1024
        is_check_file_size = True
        check_file_safety(self.file_path, mode, is_exist_ok, max_file_size, is_check_file_size)

    def test_check_file_safety_case_file_exists_but_not_is_exists_ok_result_raise_file_exists_error(self):
        """An existing file with is_exist_ok=False must raise FileExistsError."""
        mode = 'r'
        is_exist_ok = False
        max_file_size = 1024
        is_check_file_size = True
        with self.assertRaises(FileExistsError) as context:
            check_file_safety(self.file_path, mode, is_exist_ok, max_file_size, is_check_file_size)
        self.assertIn("The file is expected not to exist, but it already does.", str(context.exception))

    def test_check_file_safety_case_file_not_exists_result_success(self):
        """A missing file checked in write mode must pass."""
        file_path = os.path.join(self.test_dir, "a_not_exists_file.txt")
        mode = 'w'
        is_exist_ok = True
        max_file_size = 1024
        is_check_file_size = True
        check_file_safety(file_path, mode, is_exist_ok, max_file_size, is_check_file_size)

    def test_check_file_safety_case_file_not_exists_but_mode_is_read_result_raise_file_not_found_error(self):
        """A missing file checked in read mode must raise FileNotFoundError."""
        file_path = os.path.join(self.test_dir, "a_not_exists_file.txt")
        mode = 'r'
        is_exist_ok = True
        max_file_size = 1024
        is_check_file_size = True
        with self.assertRaises(FileNotFoundError) as context:
            check_file_safety(file_path, mode, is_exist_ok, max_file_size, is_check_file_size)
        self.assertIn("The file is expected to exist, but it does not.", str(context.exception))

    def test_safe_listdir_case_file_num_not_exceeds_result_success(self):
        """With a generous limit the result must equal ``os.listdir``."""
        file_list = safe_listdir(self.test_dir, 1024)
        self.assertListEqual(file_list, os.listdir(self.test_dir))

    def test_safe_listdir_case_file_num_exceeds_result_raise_value_error(self):
        """The test directory holds more than one entry, so a limit of 1 must raise."""
        with self.assertRaises(ValueError) as context:
            _ = safe_listdir(self.test_dir, 1)
        message = str(context.exception)
        self.assertIn("The file num in dir is", message)
        self.assertIn("which exceeds the limit", message)

    def test_safe_chmod_case_chmod_ok_result_success(self):
        """``safe_chmod`` must leave the file with exactly the requested mode bits."""
        permission_mode = 0o600
        self._restore_mode_after_test()
        safe_chmod(self.file_path, permission_mode)
        mode = stat.S_IMODE(os.stat(self.file_path).st_mode)
        self.assertEqual(mode, permission_mode)

    def test_check_user_write_permission_case_has_result_true(self):
        """Mode 0o640 gives the owner write access, so the check must return a truthy value."""
        self.assertTrue(check_user_write_permission(self.file_path))

    def test_check_user_write_permission_case_not_has_result_false(self):
        """Mode 0o440 removes the owner's write bit, so the check must return a falsy value."""
        self._restore_mode_after_test()
        os.chmod(self.file_path, 0o440)
        self.assertFalse(check_user_write_permission(self.file_path))

    def test_safe_readlines_case_line_num_not_exceeds_result_success(self):
        """With a generous limit ``safe_readlines`` must match plain ``readlines``."""
        max_line_num = 1024 * 1024
        with safe_open(self.file_path, 'r', encoding='utf-8', permission_mode=0o640) as fr:
            lines = safe_readlines(fr, max_line_num)
        with safe_open(self.file_path, 'r', encoding='utf-8', permission_mode=0o640) as fr:
            lines_golden = fr.readlines()
        self.assertListEqual(lines, lines_golden)

    def test_safe_readlines_case_line_num_exceeds_result_raise_value_error(self):
        """A limit of zero lines is exceeded by the one-line file and must raise ValueError."""
        max_line_num = 0
        with safe_open(self.file_path, 'r', encoding='utf-8', permission_mode=0o640) as fr:
            with self.assertRaises(ValueError) as context:
                _ = safe_readlines(fr, max_line_num)
        message = str(context.exception)
        self.assertIn("The file line num is", message)
        self.assertIn("which exceeds the limit", message)
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()