import hashlib
import os
import json
import sys
# Legacy Python 2 idiom: re-expose sys.setdefaultencoding via reload() and
# force UTF-8 as the default encoding. Where `reload` is not a builtin
# (NameError) or `sys` has no `setdefaultencoding` (AttributeError) the
# shim is skipped silently.
try:
    reload(sys)
    sys.setdefaultencoding('utf8')
except (NameError, AttributeError):
    pass
# Use json.JSONDecodeError when the json module provides it; otherwise
# fall back to ValueError.
JSONDecodeError = getattr(json, "JSONDecodeError", ValueError)
# Default size limit for ordinary reads: 100 MiB, in bytes.
DEFAULT_MAX_FILE_SIZE = 100 * 1024 ** 2
# Default size limit for hashing operations: 20 GiB, in bytes.
DEFAULT_MAX_FILE_SIZE_20GB = 20 * 1024 ** 3
class SafeFileHandler:
    """
    Safe file-handling helpers.

    Provides uniform file read/write operations with file-size validation,
    symbolic-link resolution and exception wrapping. Failures are reported
    as plain ``Exception`` instances chained to the underlying error.
    """

    # Read size used while hashing. Digests do not depend on the chunk size;
    # a larger buffer just reduces the number of read calls on big files.
    _HASH_CHUNK_SIZE = 1024 * 1024

    @staticmethod
    def resolve_symlink(file_path):
        """
        Resolve a symbolic link to its real path and validate the target.

        Args:
            file_path (str): Path to inspect.

        Returns:
            str: The real path when ``file_path`` is a symbolic link,
                otherwise ``file_path`` unchanged.

        Raises:
            Exception: If the symbolic link points to a non-existent path.
        """
        if not os.path.islink(file_path):
            return file_path
        real_path = os.path.realpath(file_path)
        if not os.path.exists(real_path):
            raise Exception(f"Symbolic link {file_path} points to non-existent real path {real_path}")
        return real_path

    @staticmethod
    def check_file_safety(file_path, max_file_size=DEFAULT_MAX_FILE_SIZE):
        """
        Check that a file exists (after symlink resolution) and is not too large.

        Args:
            file_path (str): Path of the file to check.
            max_file_size (int): Maximum allowed size in bytes (default 100MB).

        Raises:
            Exception: If the file does not exist, its size cannot be
                determined, or it exceeds ``max_file_size``.
        """
        file_path = SafeFileHandler.resolve_symlink(file_path)
        if not os.path.exists(file_path):
            raise Exception(f"File {file_path} does not exist")
        # Keep the try body minimal: only getsize can raise an OS-level error.
        try:
            file_size = os.path.getsize(file_path)
        except (IOError, OSError) as e:
            raise Exception(f"Failed to get size of file {file_path}") from e
        if file_size > max_file_size:
            raise Exception(
                f"File {file_path} is too large ({file_size} bytes), maximum allowed size is {max_file_size} bytes")

    @staticmethod
    def _ensure_parent_dir(file_path):
        """
        Create the parent directory of ``file_path`` (mode 0o750) if missing.

        Raises:
            Exception: If the directory cannot be created.
        """
        dir_path = os.path.dirname(file_path)
        if not dir_path or os.path.exists(dir_path):
            return
        try:
            # exist_ok=True tolerates another process creating the directory
            # between the existence check above and this call.
            os.makedirs(dir_path, mode=0o750, exist_ok=True)
        except (IOError, OSError) as e:
            raise Exception(f"Failed to create directory {dir_path}") from e

    @staticmethod
    def _hash_file(file_path, max_file_size, hash_factory, label):
        """
        Hash a file in chunks after the standard safety checks.

        Args:
            file_path (str): Path of the file to hash; may be None.
            max_file_size (int): Maximum allowed size in bytes.
            hash_factory (callable): Zero-argument hashlib constructor.
            label (str): Algorithm name used in the error message.

        Returns:
            str or None: Hex digest, or None when ``file_path`` is None or
                does not exist.

        Raises:
            Exception: If the safety checks fail or the file cannot be read.
        """
        if file_path is None or not os.path.exists(file_path):
            return None
        file_path = SafeFileHandler.resolve_symlink(file_path)
        SafeFileHandler.check_file_safety(file_path, max_file_size)
        digest = hash_factory()
        try:
            with open(file_path, 'rb') as f:
                for chunk in iter(lambda: f.read(SafeFileHandler._HASH_CHUNK_SIZE), b''):
                    digest.update(chunk)
        except (IOError, OSError) as e:
            raise Exception(f"Failed to calculate {label} for file {file_path}") from e
        return digest.hexdigest()

    @staticmethod
    def safe_read(file_path, mode='r', max_file_size=DEFAULT_MAX_FILE_SIZE, encoding=None):
        """
        Read a whole file after the safety checks.

        Args:
            file_path (str): Path of the file to read.
            mode (str): Mode passed to ``open`` (default 'r').
            max_file_size (int): Maximum allowed size in bytes (default 100MB).
            encoding (str): Text encoding passed to ``open`` (default None).

        Returns:
            str or bytes: File content; bytes when ``mode`` is a binary mode.

        Raises:
            Exception: If the safety checks fail or the file cannot be read.
        """
        file_path = SafeFileHandler.resolve_symlink(file_path)
        SafeFileHandler.check_file_safety(file_path, max_file_size)
        try:
            with open(file_path, mode, encoding=encoding) as f:
                return f.read()
        except (IOError, OSError) as e:
            raise Exception(f"Failed to read file {file_path}") from e
        except Exception as e:
            raise Exception(f"Unexpected error while processing file {file_path}") from e

    @staticmethod
    def safe_write(file_path, content, mode='w', encoding=None):
        """
        Write content to a file, creating the parent directory if needed.

        Args:
            file_path (str): Path of the file to write.
            content (str or dict or list): Content to write; a dict or list
                is serialized as JSON with an indent of 4.
            mode (str): Mode passed to ``open`` (default 'w').
            encoding (str): Text encoding passed to ``open`` (default None).

        Raises:
            Exception: If the directory cannot be created, the path is a
                symbolic link to a non-existent target, or the write fails.
        """
        SafeFileHandler._ensure_parent_dir(file_path)
        file_path = SafeFileHandler.resolve_symlink(file_path)
        try:
            with open(file_path, mode, encoding=encoding) as f:
                if isinstance(content, (dict, list)):
                    json.dump(content, f, indent=4)
                else:
                    f.write(content)
        except (IOError, OSError) as e:
            raise Exception(f"Failed to write file {file_path}") from e
        except Exception as e:
            raise Exception(f"Unexpected error while writing file {file_path}") from e

    @staticmethod
    def safe_read_json(file_path, max_file_size=DEFAULT_MAX_FILE_SIZE, encoding='utf-8'):
        """
        Read and parse a JSON file after the safety checks.

        Args:
            file_path (str): Path of the JSON file.
            max_file_size (int): Maximum allowed size in bytes (default 100MB).
            encoding (str): Text encoding of the file (default 'utf-8').

        Returns:
            The value produced by ``json.load`` (typically a dict or list).

        Raises:
            Exception: If the safety checks fail, the file cannot be read,
                or its content is not valid JSON.
        """
        file_path = SafeFileHandler.resolve_symlink(file_path)
        SafeFileHandler.check_file_safety(file_path, max_file_size)
        try:
            with open(file_path, 'r', encoding=encoding) as f:
                return json.load(f)
        except (IOError, OSError) as e:
            raise Exception(f"Failed to read JSON file {file_path}") from e
        except JSONDecodeError as e:
            raise Exception(f"Failed to parse JSON in file {file_path}") from e
        except Exception as e:
            raise Exception(f"Unexpected error while processing JSON file {file_path}") from e

    @staticmethod
    def safe_write_json(file_path, data, encoding='utf-8', ensure_ascii=True, indent=4):
        """
        Serialize data as JSON into a file, creating the parent directory if needed.

        Args:
            file_path (str): Path of the JSON file to write.
            data (dict or list): Data to serialize.
            encoding (str): Text encoding of the file (default 'utf-8').
            ensure_ascii (bool): Escape non-ASCII characters (default True).
            indent (int): Indentation passed to ``json.dump`` (default 4).

        Raises:
            Exception: If the directory cannot be created, the path is a
                symbolic link to a non-existent target, or the write fails.
        """
        SafeFileHandler._ensure_parent_dir(file_path)
        file_path = SafeFileHandler.resolve_symlink(file_path)
        try:
            with open(file_path, 'w', encoding=encoding) as f:
                # Honor the caller-supplied indent instead of a fixed value.
                json.dump(data, f, indent=indent, ensure_ascii=ensure_ascii)
        except (IOError, OSError) as e:
            raise Exception(f"Failed to write JSON file {file_path}") from e
        except Exception as e:
            raise Exception(f"Unexpected error while writing JSON file {file_path}") from e

    @staticmethod
    def calculate_file_md5(file_path, max_file_size=DEFAULT_MAX_FILE_SIZE_20GB):
        """
        Compute the MD5 hex digest of a file after the safety checks.

        Args:
            file_path (str): Path of the file; may be None.
            max_file_size (int): Maximum allowed size in bytes.

        Returns:
            str or None: MD5 hex digest, or None when ``file_path`` is None
                or does not exist.

        Raises:
            Exception: If the safety checks fail or the file cannot be read.
        """
        return SafeFileHandler._hash_file(file_path, max_file_size, hashlib.md5, "MD5")

    @staticmethod
    def calculate_file_sha256(file_path, max_file_size=DEFAULT_MAX_FILE_SIZE_20GB):
        """
        Compute the SHA256 hex digest of a file after the safety checks.

        Args:
            file_path (str): Path of the file; may be None.
            max_file_size (int): Maximum allowed size in bytes.

        Returns:
            str or None: SHA256 hex digest, or None when ``file_path`` is
                None or does not exist.

        Raises:
            Exception: If the safety checks fail or the file cannot be read.
        """
        return SafeFileHandler._hash_file(file_path, max_file_size, hashlib.sha256, "SHA256")