import json
import logging
import os
import re
import stat
from common_func.common import CommonConstant, print_info
from common_func.common import error
from common_func.common import is_linux
from common_func.constant import Constant
from common_func.empty_class import EmptyClass
from common_func.file_name_manager import FileNameManagerConstant
from common_func.ms_constant.number_constant import NumberConstant
from common_func.msprof_exception import ProfException
from common_func.path_manager import PathManager
from common_func.return_code_checker import ReturnCodeCheck
class FileManager:
"""
class to manage files
"""
FILE_AUTHORITY = 0o640
@staticmethod
def remove_file(file_path: str) -> bool:
"""
remove file
"""
try:
if os.path.isfile(file_path):
os.remove(file_path)
return True
return False
except OSError as err:
error(os.path.basename(__file__), str(err))
return False
@staticmethod
def match_process(pattern: str, file_name: str) -> any:
"""
match files
:param pattern: check pattern
:param file_name: check files
:return:
"""
matched_name = re.match(pattern, file_name)
if matched_name:
return matched_name
return EmptyClass("empty match name")
@classmethod
def is_info_json_file(cls: any, file_name: str) -> any:
"""
check whether file match info.json pattern
:param file_name: check files
:return: matched groups
"""
return cls.match_process(CommonConstant.INFO_JSON_PATTERN, file_name)
@classmethod
def is_valid_jobid(cls: any, check_string: str) -> any:
"""
check whether job id matches JOB_ID_PATTERN
:param check_string: check strings
:return: matched groups
"""
return cls.match_process(CommonConstant.JOB_ID_PATTERN, check_string)
@classmethod
def is_analyzed_data(cls: any, project_path: str) -> bool:
"""
return if data dir contains all parse complete file
"""
data_dir = PathManager.get_data_dir(project_path)
sqlite_dir = PathManager.get_sql_dir(project_path)
is_analyse = False
sqlite_list = os.listdir(sqlite_dir) if os.path.exists(sqlite_dir) else []
if os.path.exists(data_dir) and sqlite_list:
all_file_name = os.path.join(data_dir, FileNameManagerConstant.ALL_FILE_TAG)
if os.path.exists(all_file_name):
is_analyse = True
logging.warning("Data has been analyzed, path is %s, "
"there are %s in sqlite.", project_path, sqlite_list)
return is_analyse
@classmethod
def add_complete_file(cls: any, project_path: str, file_name: str) -> None:
"""
add complete file
"""
file_dir = PathManager.get_data_dir(project_path)
try:
if os.path.exists(os.path.join(file_dir, file_name)) and \
not file_name.endswith(Constant.COMPLETE_TAG):
file_path = os.path.join(file_dir, file_name + Constant.COMPLETE_TAG)
with FdOpen(file_path):
os.chmod(file_path, cls.FILE_AUTHORITY)
except (OSError, SystemError, ValueError, TypeError, RuntimeError) as err:
error(os.path.basename(__file__), err)
@classmethod
def storage_query_result_json_file(cls: any, collection_path: str, result_data: dict, file_name: str) -> None:
if not result_data:
return
query_path = os.path.join(collection_path, PathManager.QUERY_CLUSTER)
if not os.path.exists(query_path):
try:
os.makedirs(query_path, mode=NumberConstant.DIR_AUTHORITY)
except OSError:
error(os.path.basename(__file__),
"Storing data failed, you may not have the permission to write files in the current path.")
return
output_file_path = PathManager.get_query_result_path(collection_path, file_name)
check_path_valid(output_file_path, True)
try:
with FdOpen(output_file_path) as file:
json.dump(result_data, file)
except (OSError, SystemError, ValueError, TypeError, RuntimeError) as err:
error(os.path.basename(__file__), err)
else:
print_info(os.path.basename(__file__),
"The data has stored successfully, file path: {}".format(output_file_path))
class FileOpen:
"""
open and read file
"""
def __init__(self: any, file_path: str, mode: str = "r", max_size: int = Constant.MAX_READ_FILE_BYTES) -> None:
self.file_path = file_path
self.file_reader = None
self.mode = mode
self.max_size = max_size
def __enter__(self: any) -> any:
check_path_valid(self.file_path, True, max_size=self.max_size)
self.file_reader = open(self.file_path, self.mode)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.file_reader:
self.file_reader.close()
class FdOpen:
"""
creat and write file
"""
def __init__(self: any, file_path: str, flags: int = Constant.WRITE_FLAGS, mode: int = Constant.WRITE_MODES,
operate: str = "w", newline: str = None) -> None:
self.file_path = file_path
self.flags = flags
self.newline = newline
self.mode = mode
self.operate = operate
self.fd = None
self.file_open = None
def __enter__(self: any) -> any:
file_dir = os.path.dirname(self.file_path)
check_dir_writable(file_dir)
self.fd = os.open(self.file_path, self.flags, self.mode)
if self.newline is None:
self.file_open = os.fdopen(self.fd, self.operate)
else:
self.file_open = os.fdopen(self.fd, self.operate, newline=self.newline)
return self.file_open
def __exit__(self, exc_type, exc_val, exc_tb):
if self.file_open:
self.file_open.close()
elif self.fd:
os.close(self.fd)
def is_link(path: str) -> bool:
"""
软链接校验漏洞,在最后加一个/或者./可以获取到绝对路径进行校验
"""
if not path:
return False
path = os.path.abspath(path)
return os.path.islink(path)
def check_path_valid(path: str, is_file: bool, max_size: int = Constant.MAX_READ_FILE_BYTES) -> None:
"""
check the path is valid or not
:param path: file path
:param is_file: file or not
:param max_size: file's max size
:return: None
"""
if path == "":
raise ProfException(ProfException.PROF_INVALID_PARAM_ERROR, "The path is empty. Please enter a valid path.")
if not os.path.exists(path):
raise ProfException(ProfException.PROF_INVALID_PATH_ERROR,
f"The path \"{path}\" does not exist. Please check that the path exists.")
if is_link(path):
raise ProfException(ProfException.PROF_INVALID_PATH_ERROR,
f"The path \"{path}\" is link. Please check the path.")
if len(path) > NumberConstant.PROF_PATH_MAX_LEN:
raise ProfException(ProfException.PROF_INVALID_PATH_ERROR,
f"Please ensure the length of input path \"{path}\" less than "
f"{NumberConstant.PROF_PATH_MAX_LEN}")
if is_file:
if not os.path.isfile(path):
raise ProfException(ProfException.PROF_INVALID_PATH_ERROR,
f"The path \"{path}\" is not a file. Please check the path.")
if os.path.getsize(path) > max_size:
raise ProfException(ProfException.PROF_INVALID_PATH_ERROR,
f"The path \"{path}\" is too large to read. Please check the path.")
else:
if not os.path.isdir(path):
raise ProfException(ProfException.PROF_INVALID_PATH_ERROR,
f"The path \"{path}\" is not a directory. Please check the path.")
if is_root_user():
return
if not os.access(path, os.R_OK):
raise ProfException(ProfException.PROF_INVALID_PATH_ERROR,
f"The path \"{path}\" does not have permission to read. "
f"Please check that the path is readable.")
if is_other_writable(path):
raise ProfException(ProfException.PROF_INVALID_PATH_ERROR,
f"The path \"{path}\" is writable by any other users.")
if not check_path_owner(path):
raise ProfException(ProfException.PROF_INVALID_PATH_ERROR,
f"The path \"{path}\" owner is not root or current user.")
def check_db_path_valid(path: str, is_create: bool = False, max_size: int = Constant.MAX_READ_DB_FILE_BYTES) -> None:
if is_link(path):
ReturnCodeCheck.print_and_return_status(json.dumps(
{'status': NumberConstant.ERROR,
'info': "The db file '%s' is link. Please check the "
'path.' % path}))
if not is_create and os.path.exists(path) and os.path.getsize(path) > max_size:
ReturnCodeCheck.print_and_return_status(json.dumps(
{'status': NumberConstant.ERROR,
'info': "The db file '%s' is too large to read. Please check the "
'path.' % path}))
def check_file_readable(path: str) -> None:
"""
check path is file and readable
"""
check_path_valid(path, True)
def check_file_writable(path: str) -> None:
"""
check path is file and writable
"""
if path and os.path.exists(path):
check_path_valid(path, True)
if not os.access(path, os.W_OK):
raise ProfException(ProfException.PROF_INVALID_PATH_ERROR,
f"The path \"{path}\" does not have permission to write. "
f"Please check that the path is writeable.")
def check_dir_readable(path: str) -> None:
"""
check path is dir and readable
"""
check_path_valid(path, False)
def check_dir_writable(path: str) -> None:
"""
check path is dir and writable
"""
check_path_valid(path, False)
if is_root_user():
return
if not os.access(path, os.W_OK):
raise ProfException(ProfException.PROF_INVALID_PATH_ERROR,
f"The path \"{path}\" does not have permission to write. "
f"Please check that the path is writeable.")
def is_other_writable(path: str) -> bool:
"""
check whether others can write
0o022 means 755
"""
stat_info = os.stat(path)
return bool(stat_info.st_mode & 0o022)
def check_path_owner(path: str) -> bool:
"""
Check the file owner is the root user or the current user.
"""
if not is_linux():
return True
stat_info = os.stat(path)
if stat_info.st_uid == 0:
return True
else:
current_uid = os.geteuid()
return current_uid == stat_info.st_uid
def check_parent_dir_invalid(paths: list) -> None:
for path in paths:
check_path_valid(path, False)
def check_so_valid(path: str) -> bool:
"""
check so file is file and executable
"""
if not os.path.isfile(path):
logging.warning("The path %s is not a file. Please check the path.", path)
return False
elif is_link(path):
logging.warning("The path %s is link. Please check the path.", path)
return False
elif not os.access(path, os.R_OK):
logging.warning("The path %s does not have permission to open. "
"Please check that the path is readable.", path)
return False
elif is_other_writable(path):
logging.warning("The path %s is writable by others", path)
return False
elif not check_path_owner(path):
logging.warning("The path %s owner is not the current user.", path)
return False
else:
return True
def is_root_user() -> bool:
return not is_linux() or os.getuid() == 0