import os
import stat
import json
import yaml
import pytest
from ascend_utils.common import security
TEST_DIR = "/tmp/a_test_path_for_testing_automl_common/"
TEST_READ_FILE_NAME = TEST_DIR + "testfile.testfile"
USER_NOT_PERMITTED_READ_FILE = TEST_DIR + "testfile_not_readable.testfile"
OTHERS_READABLE_READ_FILE = TEST_DIR + "testfile_others_readable.testfile"
OTHERS_WRITABLE_READ_FILE = TEST_DIR + "testfile_others_writable.testfile"
USER_NOT_PERMITTED_WRITE_FILE = TEST_DIR + "testfile_not_writable/foo"
JSON_FILE = TEST_DIR + "testfile.json"
YAML_FILE = TEST_DIR + "testfile.yaml"
TEST_FILE = TEST_DIR + "testfile.test"
ORI_DATA = {
"a_long_key_name": 1,
12: "b",
3.14: "",
"c": {"d": 3, "e": 4},
True: "true",
False: "false",
None: "null"
}
OVER_WRITE_DATA = {"hello": "world"}
def setup_module():
os.makedirs(TEST_DIR, mode=int('700', 8), exist_ok=True)
default_mode = stat.S_IWUSR | stat.S_IRUSR
with os.fdopen(os.open(TEST_READ_FILE_NAME, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode=default_mode), "w") \
as temp_file:
temp_file.write("a_test_file_name_for_testing_automl_common")
with os.fdopen(os.open(USER_NOT_PERMITTED_READ_FILE, os.O_CREAT, mode=000), "w"):
pass
with os.fdopen(os.open(OTHERS_READABLE_READ_FILE, os.O_CREAT, mode=default_mode), "w"):
pass
os.chmod(OTHERS_READABLE_READ_FILE, int('755', 8))
with os.fdopen(os.open(OTHERS_WRITABLE_READ_FILE, os.O_CREAT, mode=default_mode), "w"):
pass
os.chmod(OTHERS_WRITABLE_READ_FILE, int('666', 8))
dir_name = os.path.dirname(USER_NOT_PERMITTED_WRITE_FILE)
os.makedirs(dir_name, mode=int('500', 8), exist_ok=True)
with os.fdopen(os.open(JSON_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode=default_mode), "w") as json_file:
json.dump(ORI_DATA, json_file)
with os.fdopen(os.open(YAML_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode=default_mode), "w") as yaml_file:
yaml.dump(ORI_DATA, yaml_file)
def teardown_module():
os.remove(TEST_READ_FILE_NAME)
os.chmod(USER_NOT_PERMITTED_READ_FILE, int('600', 8))
os.remove(USER_NOT_PERMITTED_READ_FILE)
os.remove(OTHERS_READABLE_READ_FILE)
os.remove(OTHERS_WRITABLE_READ_FILE)
dir_name = os.path.dirname(USER_NOT_PERMITTED_WRITE_FILE)
os.chmod(dir_name, int('700', 8))
os.removedirs(dir_name)
os.remove(JSON_FILE)
os.remove(YAML_FILE)
if os.path.exists(TEST_FILE):
os.remove(TEST_FILE)
os.removedirs(TEST_DIR)
def test_get_valid_path_given_valid_when_any_then_pass():
security.get_valid_path('../anypath')
security.get_valid_path('../anypath/a')
def test_get_valid_path_given_invalid_when_any_then_value_error():
with pytest.raises(ValueError):
security.get_valid_path('../anypath*a')
with pytest.raises(ValueError):
security.get_valid_path('../anypath/\\a')
with pytest.raises(ValueError):
security.get_valid_path('../anypath/!a')
def test_get_valid_read_path_given_valid_when_any_then_pass():
security.get_valid_read_path(TEST_READ_FILE_NAME)
security.get_valid_read_path(TEST_READ_FILE_NAME, extensions='.testfile')
security.get_valid_read_path(OTHERS_READABLE_READ_FILE)
security.get_valid_read_path(OTHERS_WRITABLE_READ_FILE, check_user_stat=False)
def test_get_valid_read_path_given_invalid_when_any_then_value_error():
with pytest.raises(ValueError):
security.get_valid_read_path('./not_exist')
with pytest.raises(ValueError):
security.get_valid_read_path(TEST_READ_FILE_NAME, extensions='.json')
with pytest.raises(ValueError):
security.get_valid_read_path(TEST_READ_FILE_NAME, size_max=1)
with pytest.raises(ValueError):
security.get_valid_read_path(USER_NOT_PERMITTED_READ_FILE)
with pytest.raises(ValueError):
security.get_valid_read_path(OTHERS_WRITABLE_READ_FILE)
def test_check_write_directory_given_valid_when_any_then_pass():
security.check_write_directory(TEST_DIR)
def test_check_write_directory_given_invalid_when_any_then_error():
with pytest.raises(ValueError):
security.check_write_directory('not_exists')
def test_get_valid_write_path_given_valid_when_any_then_pass():
security.get_valid_write_path(TEST_READ_FILE_NAME, extensions='.testfile')
def test_get_valid_write_path_when_directory_not_exists():
with pytest.raises(ValueError):
security.get_valid_write_path('not_exists/README.md', extensions='.md')
@pytest.mark.skipif(
os.geteuid() == 0,
reason="root 用户跳过此用例"
)
def test_get_valid_write_path_when_no_write_permission():
with pytest.raises(ValueError):
security.get_valid_write_path(USER_NOT_PERMITTED_WRITE_FILE)
def test_yaml_safe_load_given_valid_when_any_then_pass():
security.yaml_safe_load(YAML_FILE)
def test_yaml_safe_load_given_invalid_when_any_then_value_error():
with pytest.raises(ValueError):
security.yaml_safe_load(TEST_READ_FILE_NAME)
with pytest.raises(ValueError):
security.yaml_safe_load(YAML_FILE, key_max_len=2)
def test_json_safe_load_given_valid_when_any_then_pass():
security.json_safe_load(JSON_FILE)
def test_json_safe_load_given_invalid_when_any_then_value_error():
with pytest.raises(ValueError):
security.json_safe_load(YAML_FILE)
with pytest.raises(ValueError):
security.json_safe_load(JSON_FILE, size_max=1)
def test_file_safe_write_given_valid_when_any_then_pass():
security.file_safe_write("hello world", TEST_FILE, ".test")
def test_file_safe_write_given_invalid_when_any_then_type_error():
with pytest.raises(TypeError):
security.file_safe_write(ORI_DATA, TEST_FILE, ".test")
def test_yaml_safe_dump_given_valid_when_over_write_then_pass():
security.yaml_safe_dump(ORI_DATA, YAML_FILE)
security.yaml_safe_dump(OVER_WRITE_DATA, YAML_FILE)
cur_dict = security.yaml_safe_load(YAML_FILE)
assert cur_dict == OVER_WRITE_DATA
def test_json_safe_dump_given_valid_when_over_write_then_pass():
security.json_safe_dump(ORI_DATA, JSON_FILE, indent=4)
security.json_safe_dump(OVER_WRITE_DATA, JSON_FILE)
cur_dict = security.json_safe_load(JSON_FILE)
assert cur_dict == OVER_WRITE_DATA
def test_safe_copy_file_given_valid_when_over_write_then_pass():
dest_path = TEST_READ_FILE_NAME + "_copy"
security.safe_copy_file(TEST_READ_FILE_NAME, dest_path)
security.safe_delete_path_if_exists(dest_path)
def test_safe_write_umask_given_valid_when_any_then_pass():
security.safe_delete_path_if_exists(TEST_FILE)
default_flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
fake_mode = stat.S_IWUSR | stat.S_IRUSR | stat.S_IWGRP | stat.S_IRGRP | stat.S_IWOTH | stat.S_IROTH
with security.SafeWriteUmask(), os.fdopen(os.open(TEST_FILE, default_flags, mode=fake_mode), "w") as write_file:
write_file.write("")
assert os.stat(TEST_FILE).st_mode & (stat.S_IWGRP | stat.S_IWOTH | stat.S_IROTH | stat.S_IXOTH) == 0