"""
-------------------------------------------------------------------------
This file is part of the MindStudio project.
Copyright (c) 2025 Huawei Technologies Co.,Ltd.
MindStudio is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
-------------------------------------------------------------------------
"""
import re
import os
import sys
import shutil
import stat
import json
from precision_tool import logger
from security.type import check_dict_character
PATH_WHITE_LIST_REGEX = re.compile(r"[^_A-Za-z0-9/.-]")
MAX_READ_FILE_SIZE_4G = 4294967296
MAX_READ_FILE_SIZE_32G = 34359738368
MAX_READ_FILE_SIZE_512G = 549755813888
WRITE_FILE_NOT_PERMITTED_STAT = stat.S_IWGRP | stat.S_IWOTH | stat.S_IROTH | stat.S_IXOTH
READ_FILE_NOT_PERMITTED_STAT = stat.S_IWGRP | stat.S_IWOTH
def is_endswith_extensions(path, extensions):
result = False
if isinstance(extensions, (list, tuple)):
for extension in extensions:
if path.endswith(extension):
result = True
break
elif isinstance(extensions, str):
result = path.endswith(extensions)
return result
def get_valid_path(path, extensions=None):
if not path or len(path) == 0:
raise ValueError("The value of the path cannot be empty.")
if PATH_WHITE_LIST_REGEX.search(path):
raise ValueError("Input path contains invalid characters.")
if os.path.islink(os.path.abspath(path)):
raise ValueError("The value of the path cannot be soft link: {}.".format(path))
real_path = os.path.realpath(path)
file_name = os.path.split(real_path)[1]
if len(file_name) > 255:
raise ValueError("The length of filename should be less than 256.")
if len(real_path) > 4096:
raise ValueError("The length of file path should be less than 4096.")
if real_path != path and PATH_WHITE_LIST_REGEX.search(real_path):
raise ValueError("Input path contains invalid characters.")
if extensions and not is_endswith_extensions(path, extensions):
raise ValueError("The filename {} doesn't endswith \"{}\".".format(path, extensions))
return real_path
def is_belong_to_user_or_group(file_stat):
return file_stat.st_uid == os.getuid() or file_stat.st_gid in os.getgroups()
def check_write_directory(dir_name, check_user_stat=True):
real_dir_name = get_valid_path(dir_name)
if not os.path.isdir(real_dir_name):
raise ValueError("The file writen directory {} doesn't exist.".format(dir_name))
file_stat = os.stat(real_dir_name)
if check_user_stat and not sys.platform.startswith("win") and not is_belong_to_user_or_group(file_stat):
if os.geteuid() == 0:
logger.warning("The file writen directory %r doesn't belong to the current user or group."
" current user is root, continue", dir_name)
else:
raise ValueError("The file writen directory {} doesn't belong to the current user or group."
.format(dir_name))
if not os.access(real_dir_name, os.W_OK):
raise ValueError("Current user doesn't have writen permission to file writen directory {}.".format(dir_name))
def get_valid_read_path(path, extensions=None, size_max=MAX_READ_FILE_SIZE_4G, check_user_stat=True, is_dir=False):
real_path = get_valid_path(path, extensions)
if not is_dir and not os.path.isfile(real_path):
raise ValueError("The path {} doesn't exist or not a file.".format(path))
if is_dir and not os.path.isdir(real_path):
raise ValueError("The path {} doesn't exist or not a directory.".format(path))
file_stat = os.stat(real_path)
if check_user_stat and not sys.platform.startswith("win") and not is_belong_to_user_or_group(file_stat):
if os.geteuid() == 0:
logger.warning("The file %r doesn't belong to the current user or group. current user is root, continue",
path)
else:
raise ValueError("The file {} doesn't belong to the current user or group.".format(path))
if not os.access(real_path, os.R_OK) or file_stat.st_mode & stat.S_IRUSR == 0:
raise ValueError("Current user doesn't have read permission to the file {}.".format(path))
if not is_dir and size_max > 0 and file_stat.st_size > size_max:
raise ValueError("The file {} exceeds size limitation of {}.".format(path, size_max))
return real_path
def get_valid_write_path(path, extensions=None, check_user_stat=True, is_dir=False, warn_exists=True):
real_path = get_valid_path(path, extensions)
real_path_dir = real_path if is_dir else os.path.dirname(real_path)
check_write_directory(real_path_dir, check_user_stat=check_user_stat)
if not is_dir and os.path.exists(real_path):
if os.path.isdir(real_path):
raise ValueError("The file {} exist and is a directory.".format(path))
if check_user_stat and os.stat(real_path).st_uid != os.getuid():
raise ValueError("The file {} doesn't belong to the current user.".format(path))
if check_user_stat and os.stat(real_path).st_mode & WRITE_FILE_NOT_PERMITTED_STAT > 0:
raise ValueError("The file {} permission for others is not 0, or is group writable.".format(path))
if not os.access(real_path, os.W_OK):
raise ValueError("The file {} exist and not writable.".format(path))
if warn_exists:
logger.warning("%r already exist. The original file will be overwritten.", path)
return real_path
def json_safe_load(path, extensions="json", size_max=MAX_READ_FILE_SIZE_4G, key_max_len=512, check_user_stat=True):
path = get_valid_read_path(path, extensions, size_max, check_user_stat)
with open(path) as json_file:
raw_dict = json.load(json_file)
check_dict_character(raw_dict, key_max_len)
return raw_dict
def json_safe_dump(obj, path, indent=None, extensions="json", check_user_stat=True):
check_dict_character(obj)
write_path = get_valid_write_path(path, extensions, check_user_stat)
default_mode = stat.S_IWUSR | stat.S_IRUSR
with os.fdopen(os.open(write_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode=default_mode), "w") as json_file:
json.dump(obj, json_file, indent=indent)