import os
import re
import subprocess
import copy
from datetime import datetime
from resources.global_var import CURRENT_BUILD_DIR, CURRENT_OHOS_ROOT
from util.io_util import IoUtil
from util.log_util import LogUtil
from hb.helper.no_instance import NoInstance
from containers.status import throw_exception
class HandleKwargs(metaclass=NoInstance):
    """Interpret the hb-specific keyword options accepted by a command run.

    The options listed in ``custom_kwargs`` are consumed by these helpers and
    are stripped from the keyword dict before it is handed on elsewhere.
    """

    # Matches "[<digits>/<digits>]" followed by at least one more character.
    compile_item_pattern = re.compile(r'\[\d+/\d+\].+')
    # Option names handled here rather than passed through.
    custom_kwargs = ["max_try", "pre_msg", "log_stage", "after_msg", "log_filter", "custom_line_handle"]
    # Class-wide switch read by print_line(); set by update_filter_status().
    enable_message_filter = False

    @staticmethod
    def get_env_kwargs(kw_dict):
        """Drop every custom option from *kw_dict* in place and return it."""
        for option in HandleKwargs.custom_kwargs:
            kw_dict.pop(option, None)
        return kw_dict

    @staticmethod
    def print_pre_msg(kw_dict):
        """Log the ``pre_msg`` option through LogUtil.hb_info when it is set."""
        message = kw_dict.get('pre_msg', '')
        if not message:
            return
        LogUtil.hb_info(message)

    @staticmethod
    def set_log_stage(kw_dict):
        """Pass the ``log_stage`` option to LogUtil.set_stage when it is set."""
        stage = kw_dict.get('log_stage', '')
        if not stage:
            return
        LogUtil.set_stage(stage)

    @staticmethod
    def remove_useless_space(cmd):
        """Remove every empty-string element from *cmd* in place and return it."""
        # One remove() per empty entry; the caller's list object is mutated.
        for _ in range(cmd.count("")):
            cmd.remove("")
        return cmd

    @staticmethod
    def print_post_msg(kw_dict):
        """Log the ``after_msg`` option through LogUtil.hb_info when it is set."""
        message = kw_dict.get('after_msg', '')
        if not message:
            return
        LogUtil.hb_info(message)

    @staticmethod
    def clear_log_stage(kw_dict):
        """Call LogUtil.clear_stage when a ``log_stage`` option was supplied."""
        stage = kw_dict.get('log_stage', '')
        if not stage:
            return
        LogUtil.clear_stage()

    @staticmethod
    def handle_line(line, kw_dict):
        """Run *line* through the ``custom_line_handle`` callable, if any.

        Returns whatever the callable returns; the caller unpacks it as a
        ``(keep, line)`` pair. Without a callable the result is
        ``(True, line)``.
        """
        handler = kw_dict.get('custom_line_handle', False)
        return handler(line) if handler else (True, line)

    @staticmethod
    def update_filter_status(log_mode, kw_dict):
        """Turn the message filter on for ``log_filter`` or silent log mode.

        The flag is only ever switched on here; it is never switched off.
        """
        wants_filter = kw_dict.get('log_filter', False) or log_mode == 'silent'
        if wants_filter:
            HandleKwargs.enable_message_filter = True

    @staticmethod
    def print_line(line, log_mode):
        """Log *line*, or only its first pattern match when filtering is on.

        With the filter enabled, a line that does not match
        ``compile_item_pattern`` is not logged at all.
        """
        if not HandleKwargs.enable_message_filter:
            LogUtil.hb_info(line)
            return
        matched = HandleKwargs.compile_item_pattern.search(line)
        if matched is not None:
            LogUtil.hb_info(matched.group(0), mode=log_mode)
class SystemUtil(metaclass=NoInstance):
    """Command-execution and current-time helpers."""

    @staticmethod
    def exec_command(cmd: list, log_path='out/build.log', exec_env=None, log_mode='normal',
                     **kwargs):
        """Run *cmd* as a subprocess, mirroring its output to a log file and LogUtil.

        Args:
            cmd: Argument list for the process; empty-string elements are
                removed in place before execution.
            log_path: File that receives the (masked) output, opened in
                append mode. Its parent directory is created when missing.
            exec_env: Environment mapping passed to ``subprocess.Popen``.
            log_mode: Passed to ``HandleKwargs`` to decide how lines are shown.
            **kwargs: The options named in ``HandleKwargs.custom_kwargs``
                (``max_try``, ``pre_msg``, ``log_stage``, ``after_msg``,
                ``log_filter``, ``custom_line_handle``) are consumed here;
                everything else is forwarded to ``subprocess.Popen``.

        The command is attempted up to ``max_try`` times (at least once) and
        stops at the first zero exit code. On a non-zero final exit code the
        failure is reported through ``LogUtil.hb_error`` and
        ``LogUtil.get_failed_log``.
        """
        # Snapshot first: get_env_kwargs() strips the custom options from kwargs.
        custom_kwargs = copy.deepcopy(kwargs)
        env_kwargs = HandleKwargs.get_env_kwargs(kwargs)

        HandleKwargs.enable_message_filter = False
        HandleKwargs.set_log_stage(custom_kwargs)
        HandleKwargs.update_filter_status(log_mode, custom_kwargs)
        cmd = HandleKwargs.remove_useless_space(cmd)

        # A bare file name has an empty dirname; os.makedirs('') would raise.
        log_dir = os.path.dirname(log_path)
        if log_dir and not os.path.exists(log_dir):
            os.makedirs(log_dir, exist_ok=True)

        HandleKwargs.print_pre_msg(custom_kwargs)
        hidden_pattern = SensitiveHidden.load_sensitive_conf()

        # Always attempt the command once so ret_code is defined even when a
        # caller passes max_try <= 0.
        max_try = max(custom_kwargs.get("max_try", 1), 1)
        ret_code = None
        while max_try > 0:
            with open(log_path, 'at', encoding='utf-8') as log_file:
                # The context manager closes the stdout pipe and reaps the
                # child even if line handling raises.
                with subprocess.Popen(cmd,
                                      stdout=subprocess.PIPE,
                                      stderr=subprocess.STDOUT,
                                      encoding='utf-8',
                                      env=exec_env,
                                      errors="ignore",
                                      **env_kwargs) as process:
                    for _line in iter(process.stdout.readline, ''):
                        line = SensitiveHidden.hidden_sensitive_info(hidden_pattern, _line)
                        keep_deal, new_line = HandleKwargs.handle_line(line, custom_kwargs)
                        if keep_deal:
                            log_file.write(new_line)
                            HandleKwargs.print_line(new_line, log_mode)
                    ret_code = process.wait()
            if ret_code == 0:
                break
            max_try -= 1

        HandleKwargs.print_post_msg(custom_kwargs)
        HandleKwargs.clear_log_stage(custom_kwargs)

        if ret_code != 0:
            cmd_str = " ".join(cmd)
            LogUtil.hb_error(f"command failed: \"{cmd_str}\" , ret_code: {ret_code}")
            LogUtil.get_failed_log(log_path, cmd)

    @staticmethod
    def get_current_time(time_type: str = 'default'):
        """Return the current time in the representation selected by *time_type*.

        ``'timestamp'`` gives milliseconds since the Unix epoch as an int,
        ``'datetime'`` gives a local-time ``'%Y-%m-%d %H:%M:%S'`` string, and
        any other value gives a local ``datetime`` truncated to whole seconds.
        """
        if time_type == 'timestamp':
            # datetime.utcnow() is naive, so .timestamp() would reinterpret it
            # as local time and skew the epoch value by the UTC offset.
            return int(datetime.now().timestamp() * 1000)
        if time_type == 'datetime':
            return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        return datetime.now().replace(microsecond=0)
class ExecEnviron:
    """Hold an optional copy of the process environment that can be narrowed."""

    def __init__(self):
        # No environment copy exists until initenv() is called.
        self._env = None

    @property
    def allenv(self):
        """Return the stored environment mapping, or None before initenv()."""
        return self._env

    @property
    def allkeys(self):
        """Return the stored variable names as a list; empty before initenv()."""
        return [] if self._env is None else list(self._env.keys())

    def initenv(self):
        """Replace the stored mapping with a copy of ``os.environ``."""
        self._env = os.environ.copy()

    def allow(self, allowed_vars: list):
        """Keep only the variables named in *allowed_vars*.

        Does nothing when no environment has been stored yet.
        """
        if self._env is None:
            return
        kept = {}
        for name, value in self._env.items():
            if name in allowed_vars:
                kept[name] = value
        self._env = kept
class SensitiveHidden:
    """Load the sensitive-information configuration and mask matches in text."""

    @staticmethod
    def determine_conf_file():
        """Return the path of the sensitive-info config file to use.

        The copy under ``out/products_ext`` in CURRENT_OHOS_ROOT wins when it
        exists; otherwise the path inside CURRENT_BUILD_DIR is returned
        without checking that it exists.
        """
        sensitive_config_ext = os.path.join(CURRENT_OHOS_ROOT, "out/products_ext/sensitive_info_config.json")
        if os.path.exists(sensitive_config_ext):
            return sensitive_config_ext
        return os.path.join(CURRENT_BUILD_DIR, "sensitive_info_config.json")

    @staticmethod
    def parse_sensitive_conf(sensitive_config_file: str) -> set:
        """Build the set of things to mask from a JSON config file.

        The set holds literal strings (from ``keywords`` and from the values
        of the environment variables named in ``env_vars``) and compiled,
        case-insensitive regexes (from ``regex_patterns``).

        Returns an empty set when *sensitive_config_file* is not a file.

        Raises:
            ValueError: If an entry of ``regex_patterns`` does not compile.
        """
        hidden_pattern = set()
        if not os.path.isfile(sensitive_config_file):
            return hidden_pattern

        config = IoUtil.read_json_file(sensitive_config_file)

        # An empty keyword would make str.replace() insert the mask between
        # every character, so it is dropped here.
        hidden_pattern.update(
            keyword for keyword in config.get("keywords", []) if keyword != ""
        )

        for var_name in config.get("env_vars", []):
            env_value = os.getenv(var_name)
            if env_value:
                hidden_pattern.add(env_value)

        for pattern_str in config.get("regex_patterns", []):
            try:
                hidden_pattern.add(re.compile(pattern_str, re.IGNORECASE))
            except re.error as e:
                raise ValueError(f"invalid regex pattern '{pattern_str}': {e}") from e
        return hidden_pattern

    @staticmethod
    def load_sensitive_conf() -> set:
        """Locate and parse the config file, returning the set of patterns.

        Raises:
            ValueError: If parsing fails for any reason; the original
                exception is chained as the cause.
        """
        config_file = SensitiveHidden.determine_conf_file()
        try:
            hidden_pattern = SensitiveHidden.parse_sensitive_conf(config_file)
        except Exception as e:
            raise ValueError(f"{config_file} is invalid file, please check: {e}") from e
        return hidden_pattern

    @staticmethod
    def hidden_sensitive_info(hidden_pattern: set, text: str, replace_text: str = "******") -> str:
        """Return *text* with every sensitive match replaced by *replace_text*.

        Args:
            hidden_pattern: Literal strings and compiled regexes to mask;
                entries of any other type, and empty strings, are ignored.
            text: The text to sanitise.
            replace_text: The mask written in place of each match.
        """
        if not hidden_pattern:
            return text

        # Longest literal first: with keywords such as "pass" and "password"
        # the shorter one must not fire first and leave "word" visible. The
        # secondary sort key keeps the result independent of set order.
        literals = sorted(
            (item for item in hidden_pattern if isinstance(item, str) and item),
            key=lambda item: (-len(item), item),
        )
        regexes = sorted(
            (item for item in hidden_pattern if isinstance(item, re.Pattern)),
            key=lambda item: str(item.pattern),
        )

        hidden_text = text
        for literal in literals:
            hidden_text = hidden_text.replace(literal, replace_text)
        for regex in regexes:
            hidden_text = regex.sub(replace_text, hidden_text)
        return hidden_text