import ctypes
import logging
import os
import platform
import re
import sys
import time
from logging.handlers import RotatingFileHandler
from common_func.constant import Constant
from common_func.ms_constant.number_constant import NumberConstant
from common_func.path_manager import PathManager
from common_func.regex_manager import RegexManagerConstant
class CommonConstant:
"""
common constant class
"""
CLIENT_NUM = 16
SAMPLE_JSON = "sample.json"
INFO_JSON_PATTERN = re.compile(r"info.json.(\d+)$")
GE_TABLE_NUM = 2
ACL_TABLE_MAP = "AclDataMap"
ACL_HASH_TABLE_MAP = "AclHashMap"
CONN_MASK = 0o640
FILE_AUTHORITY = 0o640
OPEN_AUTHORITY = 0o640
JOB_ID_PATTERN = r"^[A-Za-z0-9\-\_]+$"
BYTE_SIZE = 1024
DISK_SIZE = 100
PERCENT = 100.0
ROUND_SIX = 6
MAX_LOG_BYTES = 5 * 1024 * 1024
MAX_LOG_BACKUPS = 1
FILE_NAME = os.path.basename(__file__)
FILE_NAME_DONE_LEN = 5
LOG_LEVEL = logging.INFO
MINIMUM_DISK_MEMORY = 512
MEMORY_BUFFER_NUM = 3
def get_common_class_name(self: any) -> any:
"""
get common class name
"""
return self.__class__.__name__
def get_common_class_member(self: any) -> any:
"""
get common class member num
"""
return self.__dict__
def error(file_name: str, msg: str) -> None:
"""
print error message
"""
if file_name is None or msg is None:
return
print_msg(time.strftime("%a %d %b %Y %H:%M:%S ", time.localtime())
+ f"[ERROR] [MSVP] [{str(os.getpid())}] {file_name}: {msg}",
flush=True)
def print_info(file_name: str, msg: str) -> None:
"""
print info message
"""
if file_name is None or msg is None:
return
print_msg(time.strftime("%a %d %b %Y %H:%M:%S ", time.localtime())
+ "[INFO] [MSVP] [{0}] {1}: {2}".format(str(os.getpid()), file_name, str(msg)),
flush=True)
def warn(file_name: str, msg: str) -> None:
"""
print error message
"""
if file_name is None or msg is None:
return
print_msg(time.strftime("%a %d %b %Y %H:%M:%S ", time.localtime())
+ "[WARNING] [MSVP] [{0}] {1}: {2}".format(str(os.getpid()), file_name, str(msg)),
flush=True)
def print_msg(*args: any, **kwargs: any) -> None:
"""
execute print
"""
print(*args, **kwargs)
class Log:
"""
Python logger
"""
def __init__(self: any, log_path: str, logger: any = None) -> None:
"""Init a python logger"""
if logger:
self.logger = logging.getLogger(logger)
else:
self.logger = logging.getLogger()
self.logger.setLevel(CommonConstant.LOG_LEVEL)
self.log_path = PathManager.get_log_dir(log_path)
if not os.path.exists(self.log_path):
os.makedirs(self.log_path, mode=NumberConstant.DIR_AUTHORITY)
self.log_name = os.path.join(self.log_path, "collection.log")
for log in self.logger.handlers:
self.logger.removeHandler(log)
file_handler = RotatingFileHandler(self.log_name, 'a', CommonConstant.MAX_LOG_BYTES,
CommonConstant.MAX_LOG_BACKUPS)
os.chmod(self.log_name, CommonConstant.OPEN_AUTHORITY)
file_handler.setLevel(logging.INFO)
formatter = logging.Formatter('[%(asctime)s] [%(levelname)s] [MSVP] '
'[%(process)d] [%(filename)s:%(lineno)d] %(message)s',
'%a, %d %b %Y %H:%M:%S')
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
file_handler.close()
def getlog(self: any) -> any:
"""
:return: logger object
"""
return self.logger
def get_log_path(self: any) -> str:
"""
:return: logger path
"""
return self.log_path
class LogFactory:
"""
Build a logger
"""
loggers = {}
@staticmethod
def get_logger(name: str, collection_path: str) -> any:
"""
return name of the log
"""
if not LogFactory.loggers.get(name):
_logger = Log(collection_path)
LogFactory.loggers[name] = _logger.getlog()
return LogFactory.loggers.get(name)
@staticmethod
def get_unique_logger(name: str) -> any:
"""
set logger object
"""
if not LogFactory.loggers.get(name):
_logger = Log(name, name)
LogFactory.loggers[name] = _logger.getlog()
return LogFactory.loggers.get(name)
def is_linux() -> bool:
"""
check whether our system is linux or not
"""
result = False
plat, _ = get_platform_info()
if plat.lower() == "linux":
result = True
return result
def get_platform_info() -> tuple:
"""
get platform info
"""
info = platform.uname()
return info[0], info[4]
def get_data_dir_sorted_files(data_path: str) -> list:
"""
sorted the files under the data directory
"""
slice_regx = re.compile(RegexManagerConstant.REGEX_SLICE)
num_regx = re.compile(RegexManagerConstant.REGEX_NUM)
file_all = os.listdir(data_path)
def _file_filer(file: any) -> any:
if not file:
return 0
slice_res = slice_regx.search(file)
if not slice_res:
return 0
num_res = num_regx.search(slice_res.group())
if not num_res:
return 0
return ord(file[:1]) * 10000 + int(num_res.group())
file_all.sort(key=_file_filer)
return file_all
def _strip_and_split_str_with_char(item: any, ch: str) -> list:
return str(item).strip().split(ch)[0].split('.')
def check_number_valid(item: any) -> bool:
"""
check validity of integer or float input
"""
if not item and item != 0:
logging.error("item is empty")
return False
danger_list = ["rm", "reboot", "chown", "chmod", "shutdown", "halt", ">", "<"]
sum_split = sum(n.isdigit() for n in _strip_and_split_str_with_char(item, "e")) != 2
sum_danger_list = str(item).strip().split("e")[-1] in danger_list
if not str(item).isdigit() and (sum_split or sum_danger_list):
logging.error("invalid item %s", item)
return False
if float(item) < 0:
logging.error("item %s should be bigger than 0", item)
return False
return True
def byte_per_us2_mb_pers(byte_perus: any) -> any:
"""
transform byte/us to mb/s
"""
return round(byte_perus * Constant.BYTE_US_TO_MB_S, NumberConstant.ROUND_THREE_DECIMAL)
def ns2_us(ns: any) -> any:
"""
transform ns to us
"""
return round(ns / NumberConstant.NS_TO_US, NumberConstant.ROUND_THREE_DECIMAL)
def init_log(output_path: str) -> None:
"""
create data and corresponding directories
"""
log_path = PathManager.get_collection_log_path(output_path)
root_logger = logging.getLogger()
for log in root_logger.handlers:
root_logger.removeHandler(log)
logging.basicConfig(
level=CommonConstant.LOG_LEVEL,
format='%(asctime)s [%(levelname)s] [%(filename)s:%(lineno)d] -'
' %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
filename=log_path,
filemode='a+')
def get_data_file_memory(result_dir: str) -> float:
"""
to get data memory
:param result_dir:
:return:
"""
if not os.path.isdir(result_dir):
return Constant.MAX_READ_FILE_BYTES
size = 0
data_path = PathManager.get_data_dir(result_dir)
for file in os.listdir(data_path):
size += os.path.getsize(os.path.join(data_path, file))
return float(size / CommonConstant.BYTE_SIZE / CommonConstant.BYTE_SIZE)
def get_free_memory(result_dir: str) -> float:
"""
to get free space
:param result_dir:
:return:
"""
if platform.system() == 'Windows':
free_bytes = ctypes.c_ulonglong(0)
ctypes.windll.kernel32.GetDiskFreeSpaceExW(ctypes.c_wchar_p(result_dir), None, None, ctypes.pointer(free_bytes))
return float(free_bytes.value / CommonConstant.BYTE_SIZE / CommonConstant.BYTE_SIZE)
dir_info = os.statvfs(result_dir)
return float(dir_info.f_bavail * dir_info.f_frsize / CommonConstant.BYTE_SIZE / CommonConstant.BYTE_SIZE)
def check_free_memory(result_dir: str) -> None:
"""
check memory
:param result_dir:
:return:
"""
data_file_memory = get_data_file_memory(result_dir)
free_memory = get_free_memory(result_dir)
need_free_memory = data_file_memory * CommonConstant.MEMORY_BUFFER_NUM
if free_memory < CommonConstant.MINIMUM_DISK_MEMORY:
warn(os.path.basename(__file__),
"The disk space is less than 512 MB, please check")
if free_memory < need_free_memory:
warn(os.path.basename(__file__),
"Requires {:.2f}MB of space to store parsed data, actually only {:.2f}MB, "
"parsed data may not be complete.".format(need_free_memory, free_memory))
def call_sys_exit(status: any = None) -> None:
"""
call sys.exit to exit the program.
:param status: the exit status or other object to be printed.
:return:
"""
sys.exit(status)