import re
import os
import sys
import shutil
import stat
import json
from precision_tool import logger
from security.type import check_dict_character
# Matches any single character that is NOT a letter, digit, "_", "/", "." or "-".
# A successful search therefore means the path contains a disallowed character.
PATH_WHITE_LIST_REGEX = re.compile(r"[^_A-Za-z0-9/.-]")

# Size ceilings for readable files, expressed in bytes.
MAX_READ_FILE_SIZE_4G = 1 << 32    # 4 GiB
MAX_READ_FILE_SIZE_32G = 1 << 35   # 32 GiB
MAX_READ_FILE_SIZE_512G = 1 << 39  # 512 GiB

# Mode bits: group-write, other-read, other-write and other-execute.
WRITE_FILE_NOT_PERMITTED_STAT = stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH | stat.S_IWGRP
# Mode bits: group-write and other-write.
READ_FILE_NOT_PERMITTED_STAT = stat.S_IWOTH | stat.S_IWGRP
def is_endswith_extensions(path, extensions):
    """Tell whether ``path`` ends with one of the given suffixes.

    Args:
        path: String whose ending is examined.
        extensions: A single suffix string, or a list/tuple of suffix strings.

    Returns:
        True when ``path`` ends with the suffix (or with any suffix from the
        list/tuple). False otherwise, including when ``extensions`` is neither
        a str nor a list/tuple.
    """
    if isinstance(extensions, str):
        return path.endswith(extensions)
    if isinstance(extensions, (list, tuple)):
        return any(path.endswith(suffix) for suffix in extensions)
    return False
def get_valid_path(path, extensions=None):
    """Validate a path string and return its resolved (real) form.

    Checks are applied in this order: non-empty, only whitelisted characters,
    not a symbolic link, resolved file name at most 255 characters, resolved
    path at most 4096 characters, resolved path only whitelisted characters,
    and (when ``extensions`` is given) the original ``path`` ends with one of
    the extensions.

    Args:
        path: Path to validate.
        extensions: Optional suffix string, or list/tuple of suffix strings,
            that ``path`` must end with.

    Returns:
        ``os.path.realpath(path)``.

    Raises:
        ValueError: If any of the checks above fails.
    """
    if not path or len(path) == 0:
        raise ValueError("The value of the path cannot be empty.")

    if PATH_WHITE_LIST_REGEX.search(path):
        raise ValueError("Input path contains invalid characters.")

    # The link test is made on the absolute form of the path as given.
    absolute_form = os.path.abspath(path)
    if os.path.islink(absolute_form):
        raise ValueError("The value of the path cannot be soft link: {}.".format(path))

    resolved = os.path.realpath(path)
    base_name = os.path.basename(resolved)
    if len(base_name) > 255:
        raise ValueError("The length of filename should be less than 256.")
    if len(resolved) > 4096:
        raise ValueError("The length of file path should be less than 4096.")

    # Resolution may have changed the string, so the character whitelist is
    # applied again to the resolved path.
    if resolved != path and PATH_WHITE_LIST_REGEX.search(resolved):
        raise ValueError("Input path contains invalid characters.")

    # The suffix is checked against the caller-supplied path, not the resolved one.
    if extensions and not is_endswith_extensions(path, extensions):
        raise ValueError("The filename {} doesn't endswith \"{}\".".format(path, extensions))

    return resolved
def is_belong_to_user_or_group(file_stat):
    """Report whether a stat result is owned by the current user or one of its groups.

    Args:
        file_stat: Object exposing ``st_uid`` and ``st_gid`` (e.g. an ``os.stat`` result).

    Returns:
        True if ``st_uid`` equals ``os.getuid()`` or ``st_gid`` is among
        ``os.getgroups()``; False otherwise.
    """
    if file_stat.st_uid == os.getuid():
        return True
    return file_stat.st_gid in os.getgroups()
def check_write_directory(dir_name, check_user_stat=True):
    """Verify that ``dir_name`` is an existing directory the current user may write to.

    Args:
        dir_name: Directory path; it is validated with ``get_valid_path`` first.
        check_user_stat: When true (and not running on Windows), also require the
            directory to belong to the current user or one of its groups. If it
            does not and the effective uid is 0, only a warning is logged.

    Raises:
        ValueError: If the path is invalid, is not a directory, fails the
            ownership check for a non-root user, or is not writable.
    """
    resolved_dir = get_valid_path(dir_name)
    if not os.path.isdir(resolved_dir):
        raise ValueError("The file writen directory {} doesn't exist.".format(dir_name))

    dir_stat = os.stat(resolved_dir)
    owned_by_other = (
        check_user_stat
        and not sys.platform.startswith("win")
        and not is_belong_to_user_or_group(dir_stat)
    )
    if owned_by_other:
        if os.geteuid() != 0:
            raise ValueError("The file writen directory {} doesn't belong to the current user or group."
                             .format(dir_name))
        # Root is allowed to proceed, but the mismatch is still reported.
        logger.warning("The file writen directory %r doesn't belong to the current user or group."
                       " current user is root, continue", dir_name)

    writable = os.access(resolved_dir, os.W_OK)
    if not writable:
        raise ValueError("Current user doesn't have writen permission to file writen directory {}.".format(dir_name))
def get_valid_read_path(path, extensions=None, size_max=MAX_READ_FILE_SIZE_4G, check_user_stat=True, is_dir=False):
    """Validate a path that is about to be read and return its resolved form.

    Args:
        path: File (or directory, when ``is_dir`` is true) to read.
        extensions: Optional suffix or list/tuple of suffixes forwarded to
            ``get_valid_path``.
        size_max: Maximum allowed file size in bytes; the size check is skipped
            for directories and when ``size_max`` is not positive.
        check_user_stat: When true (and not on Windows), require the target to
            belong to the current user or one of its groups. If it does not and
            the effective uid is 0, only a warning is logged.
        is_dir: Expect a directory instead of a regular file.

    Returns:
        The resolved path returned by ``get_valid_path``.

    Raises:
        ValueError: If the path is invalid, has the wrong type, fails the
            ownership check for a non-root user, is not readable, or is too large.
    """
    resolved = get_valid_path(path, extensions)

    if is_dir:
        if not os.path.isdir(resolved):
            raise ValueError("The path {} doesn't exist or not a directory.".format(path))
    elif not os.path.isfile(resolved):
        raise ValueError("The path {} doesn't exist or not a file.".format(path))

    target_stat = os.stat(resolved)
    owned_by_other = (
        check_user_stat
        and not sys.platform.startswith("win")
        and not is_belong_to_user_or_group(target_stat)
    )
    if owned_by_other:
        if os.geteuid() != 0:
            raise ValueError("The file {} doesn't belong to the current user or group.".format(path))
        # Root is allowed to proceed, but the mismatch is still reported.
        logger.warning("The file %r doesn't belong to the current user or group. current user is root, continue",
                       path)

    # Both the effective access check and the owner-read mode bit must pass.
    accessible = os.access(resolved, os.R_OK)
    owner_read_bit = (target_stat.st_mode & stat.S_IRUSR) != 0
    if not (accessible and owner_read_bit):
        raise ValueError("Current user doesn't have read permission to the file {}.".format(path))

    if not is_dir and size_max > 0 and target_stat.st_size > size_max:
        raise ValueError("The file {} exceeds size limitation of {}.".format(path, size_max))

    return resolved
def get_valid_write_path(path, extensions=None, check_user_stat=True, is_dir=False, warn_exists=True):
    """Validate a path that is about to be written and return its resolved form.

    The containing directory (or the path itself when ``is_dir`` is true) is
    checked with ``check_write_directory``. If the target is a file that already
    exists, it must not be a directory, must be writable and, when
    ``check_user_stat`` is true on a non-Windows platform, must be owned by the
    current user and have none of the ``WRITE_FILE_NOT_PERMITTED_STAT`` mode
    bits set.

    Args:
        path: File (or directory, when ``is_dir`` is true) to write.
        extensions: Optional suffix or list/tuple of suffixes forwarded to
            ``get_valid_path``.
        check_user_stat: Enable the ownership and mode-bit checks.
        is_dir: Treat ``path`` itself as the directory to be written into.
        warn_exists: Log a warning when an existing file will be overwritten.

    Returns:
        The resolved path returned by ``get_valid_path``.

    Raises:
        ValueError: If any validation step fails.
    """
    real_path = get_valid_path(path, extensions)
    real_path_dir = real_path if is_dir else os.path.dirname(real_path)
    check_write_directory(real_path_dir, check_user_stat=check_user_stat)

    # Nothing more to verify for directories or for files that do not exist yet.
    if is_dir or not os.path.exists(real_path):
        return real_path

    if os.path.isdir(real_path):
        raise ValueError("The file {} exist and is a directory.".format(path))

    # os.getuid() is only available on Unix, so the ownership/mode checks are
    # skipped on Windows, as the other ownership checks in this module already do.
    if check_user_stat and not sys.platform.startswith("win"):
        file_stat = os.stat(real_path)  # stat once; both checks use the same snapshot
        if file_stat.st_uid != os.getuid():
            raise ValueError("The file {} doesn't belong to the current user.".format(path))
        if (file_stat.st_mode & WRITE_FILE_NOT_PERMITTED_STAT) > 0:
            raise ValueError("The file {} permission for others is not 0, or is group writable.".format(path))

    if not os.access(real_path, os.W_OK):
        raise ValueError("The file {} exist and not writable.".format(path))

    if warn_exists:
        logger.warning("%r already exist. The original file will be overwritten.", path)
    return real_path
def json_safe_load(path, extensions="json", size_max=MAX_READ_FILE_SIZE_4G, key_max_len=512, check_user_stat=True):
    """Load a JSON file after validating its path and its parsed content.

    The path is validated with ``get_valid_read_path``; the parsed object is
    then passed to ``check_dict_character`` together with ``key_max_len``
    (that helper is imported from ``security.type``; its exact rules are not
    visible in this file).

    Args:
        path: JSON file to read.
        extensions: Suffix or list/tuple of suffixes the path must end with.
        size_max: Maximum allowed file size in bytes.
        key_max_len: Limit forwarded to ``check_dict_character``.
        check_user_stat: Forwarded to ``get_valid_read_path``.

    Returns:
        The object produced by ``json.load``.

    Raises:
        ValueError: If path validation fails (``json.JSONDecodeError`` for
            malformed JSON is also a ``ValueError`` subclass).
    """
    read_path = get_valid_read_path(path, extensions, size_max, check_user_stat)
    # Decode explicitly as UTF-8 instead of the locale default, so non-ASCII
    # JSON is read the same way on every platform.
    with open(read_path, encoding="utf-8") as json_file:
        raw_dict = json.load(json_file)
    check_dict_character(raw_dict, key_max_len)
    return raw_dict
def json_safe_dump(obj, path, indent=None, extensions="json", check_user_stat=True):
    """Serialize ``obj`` as JSON into ``path`` after validating both.

    ``obj`` is first passed to ``check_dict_character`` and ``path`` to
    ``get_valid_write_path``. The file is opened write-only, created if
    missing and truncated if present; owner read/write is the mode requested
    at creation.

    Args:
        obj: Object to serialize.
        path: Destination file.
        indent: Forwarded to ``json.dump``.
        extensions: Suffix or list/tuple of suffixes the path must end with.
        check_user_stat: Forwarded to ``get_valid_write_path``.
    """
    check_dict_character(obj)
    target = get_valid_write_path(path, extensions, check_user_stat)

    open_flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    owner_rw = stat.S_IWUSR | stat.S_IRUSR
    descriptor = os.open(target, open_flags, mode=owner_rw)
    with os.fdopen(descriptor, "w") as json_file:
        json.dump(obj, json_file, indent=indent)