import os
import shutil
import subprocess
import shlex
from argparse import Namespace
from itertools import chain
from pathlib import Path
from subprocess import PIPE, STDOUT
from typing import Callable, Dict, List, NamedTuple, Optional, Tuple, Union
from utils.comm_log import CommLog
from utils.pkg_utils import CompressError
from pkg_parser import PkgSoftlink
class PackageName:
    """Package file name assembled from package attributes and CLI arguments."""

    def __init__(self,
                 package_attr,
                 args: Namespace,
                 version: str):
        """Collect the components that may appear in the package name.

        Args:
            package_attr: Mapping-like object queried with ``get`` for the
                package description values.
            args: Parsed command-line arguments; ``chip_name``, ``suffix`` and
                ``func_name`` given there take precedence over ``package_attr``.
            version: Package version; stored lower-cased.
        """
        # Values that come only from the package description.
        self.product_name = package_attr.get('product_name')
        self.chip_plat = package_attr.get('chip_plat')
        self.deploy_type = package_attr.get('deploy_type')

        # Values where a truthy command-line argument wins.
        self.chip_name = args.chip_name or package_attr.get('chip_name')
        self.suffix = args.suffix or package_attr.get('suffix')
        self.func_name = get_func_name(args.func_name, package_attr)

        # Values that come only from the command line.
        self.os_arch = args.os_arch
        self.package_suffix = args.package_suffix
        self.ext_name = args.ext_name
        # Comma-separated attribute names that must be left out of the name.
        self.not_in_name_list = args.not_in_name.split(",")

        self.version = version.lower()
        # Stored for users of this object; getvalue() below does not read it.
        self.name_sep = '_' if args.pkg_name_style == 'underline' else '-'

    def get_attribute(self, name: str) -> Optional[str]:
        """Return attribute ``name``, or None when it is excluded from the name."""
        excluded = name in self.not_in_name_list
        return None if excluded else getattr(self, name)

    def getvalue(self) -> str:
        """Return the full package file name, including the suffix.

        The name has three regions joined by ``_``; empty components and empty
        regions are skipped:

        1. product name, chip name (passed through ``remove_ascend``) and
           function name, joined by ``-``;
        2. the version;
        3. OS architecture, chip platform, deploy type, the literal ``debug``
           (only when ``package_suffix`` equals ``"debug"``) and the extra
           name, joined by ``-``.
        """
        def join_present(separator, components):
            # Drop falsy components (None, "") before joining.
            return separator.join(item for item in components if item)

        lookup = self.get_attribute
        debug_tag = "debug" if self.package_suffix == "debug" else None

        head = join_present("-", [
            lookup('product_name'),
            remove_ascend(lookup('chip_name')),
            lookup('func_name'),
        ])
        middle = join_present(".", [lookup('version')])
        tail = join_present("-", [
            lookup('os_arch'),
            lookup('chip_plat'),
            lookup('deploy_type'),
            debug_tag,
            lookup('ext_name'),
        ])

        base_name = join_present("_", [head, middle, tail])
        return f"{base_name}.{self.suffix}"
class MakeselfPkgParams(NamedTuple):
    """Parameters used to assemble a makeself run-package command.

    Only ``package_name`` and ``comments`` are required; every other field
    defaults to None (``independent_pkg`` defaults to False).
    """
    # Package name; emitted as a positional argument after the options.
    package_name: str
    # Comment text; emitted as a positional argument after package_name.
    comments: str
    # Path of the makeself script that is invoked (independent packages only).
    makeself_tool: Optional[str] = None
    # Value passed to the "--header" option (independent packages only).
    makeself_header: Optional[str] = None
    # Value passed to the "--help-header" option (independent packages only).
    help_info: Optional[str] = None
    # Positional argument emitted before package_name (independent packages only).
    source_target: Optional[str] = None
    # Last positional argument of the command (independent packages only).
    install_script: Optional[str] = None
    # When truthy, the full command including the makeself tool is generated.
    independent_pkg: Optional[bool] = False
    # Value passed to the "--cleanup" option; the option is omitted when falsy.
    cleanup: Optional[str] = None
def remove_ascend(text):
    """Normalise a chip name for use in a package name.

    Args:
        text: Chip name, or None.

    Returns:
        None when ``text`` is None; ``"A3"`` when the name equals
        ``ascend910_93`` (case-insensitive); otherwise the lower-cased name
        with every ``ascend`` occurrence removed.
    """
    if text is None:
        return None
    lowered = text.lower()
    # This chip has a dedicated short name instead of the stripped form.
    if lowered == "ascend910_93":
        return "A3"
    # str.replace leaves the string untouched when "ascend" is absent.
    return lowered.replace("ascend", "")
def get_func_name(func_name: str, package_attr) -> str:
    """Return the package function name.

    A truthy ``func_name`` wins; otherwise the ``'func_name'`` entry of
    ``package_attr`` is returned (None when the entry is absent).
    """
    if func_name:
        return func_name
    return package_attr.get('func_name')
def softlink_before_package(pkg_soft_links: List[PkgSoftlink], release_dir: Union[str, Path]):
    """Create the in-package symbolic links before packaging.

    For every entry, a symlink is created at ``release_dir/dst_path`` that
    points to ``release_dir/src_path`` through a path relative to the link's
    own directory.

    Args:
        pkg_soft_links: Link descriptions providing ``src_path`` and ``dst_path``.
        release_dir: Directory that both paths are resolved against.
    """
    for link in pkg_soft_links:
        target = os.path.join(release_dir, link.src_path)
        link_path = os.path.join(release_dir, link.dst_path)
        # Express the target relative to the directory that holds the link.
        link_parent = os.path.dirname(link_path)
        relative_target = os.path.relpath(target, link_parent)
        os.symlink(relative_target, link_path)
def get_compress_tool() -> str:
    """Return the compression option for the first available tool.

    ``pigz``, ``gzip``, ``bzip2`` and ``xz`` are probed in that order with
    ``shutil.which``; the first one found is returned prefixed with ``--``
    (for example ``"--gzip"``).

    Returns:
        The option string, or ``""`` after logging an error when none of the
        tools can be found.
    """
    tools = ["pigz", "gzip", "bzip2", "xz"]
    for tool in tools:
        if shutil.which(tool):
            return "--" + tool
    # The two literals are concatenated, so the first one must end with a
    # space to keep the sentences apart in the logged message.
    CommLog.cilog_error(
        "The system does not come with a compression tool pre-installed. "
        "Please ensure at least one of the following compression tools is available: %s",
        tools,
    )
    return ""
def get_compress_format() -> str:
    """Return the tar format name used for packaging.

    Returns:
        ``"ustar"`` when a ``bsdtar`` executable is found by ``shutil.which``,
        otherwise ``"gnu"``.
    """
    has_bsdtar = bool(shutil.which("bsdtar"))
    return "ustar" if has_bsdtar else "gnu"
def compose_makeself_command(params: MakeselfPkgParams) -> str:
    """Assemble the makeself packaging command line.

    For an independent package the result is the complete command: a
    ``TMPDIR=$pwd`` assignment, the makeself tool with its header and help
    header, the shared options, the optional ``--cleanup`` option and the four
    positional arguments. Otherwise only the shared options, the optional
    ``--cleanup`` option, the package name and the comments are emitted.

    Args:
        params: Packaging parameters.

    Returns:
        All parts joined by single spaces. Parts are not quoted here.
    """
    compress_tool = get_compress_tool()
    tar_format = get_compress_format()

    # Options that are identical for both kinds of package.
    shared_options = [
        compress_tool, '--complevel', '4',
        '--nomd5', '--sha256', '--nooverwrite', '--chown', '--tar-format', tar_format,
        '--tar-extra', '--numeric-owner', '--tar-quietly',
    ]
    cleanup_options = ['--cleanup', params.cleanup] if params.cleanup else []

    if params.independent_pkg:
        leading = [
            'TMPDIR=$pwd', params.makeself_tool, "--header", params.makeself_header,
            "--help-header", params.help_info,
        ]
        trailing = [
            params.source_target, params.package_name, params.comments, params.install_script,
        ]
    else:
        leading = []
        trailing = [params.package_name, params.comments]

    return ' '.join(leading + shared_options + cleanup_options + trailing)
def create_makeself_pkg_params_factory(source_target: str,
                                       package_name: str,
                                       comments: str
                                       ) -> Callable[[str, dict, bool], MakeselfPkgParams]:
    """Create a factory that builds makeself packaging parameters.

    Args:
        source_target: Stored as ``source_target`` for independent packages.
        package_name: Package name stored in every result.
        comments: Comment text stored in every result.

    Returns:
        A function ``(makeself_dir, package_attr, independent_pkg=False)``
        that returns a ``MakeselfPkgParams``.
    """

    def create_makeself_pkg_params(makeself_dir: str,
                                   package_attr: Dict,
                                   independent_pkg=False) -> MakeselfPkgParams:
        """Build the packaging parameters for one package.

        ``install_script`` and ``help`` are read from ``package_attr`` and
        converted with ``str()``, so a missing entry becomes the string
        ``"None"``.
        """
        fields = {
            'package_name': package_name,
            'comments': comments,
            'cleanup': package_attr.get('cleanup'),
        }
        if not independent_pkg:
            # Only the shared fields are set; the rest keep their defaults.
            return MakeselfPkgParams(**fields)

        fields.update(
            makeself_tool=os.path.join(makeself_dir, 'makeself.sh'),
            makeself_header=os.path.join(makeself_dir, 'makeself-header.sh'),
            help_info=str(package_attr.get('help')),
            source_target=source_target,
            install_script=str(package_attr.get('install_script')),
            independent_pkg=independent_pkg,
        )
        return MakeselfPkgParams(**fields)

    return create_makeself_pkg_params
def create_run_package_command(params: MakeselfPkgParams
                               ) -> Tuple[Optional[str], Optional[str]]:
    """Assemble the command that builds a run package.

    Args:
        params: Packaging parameters.

    Returns:
        A pair whose first item is the command string and whose second item
        is always None.
    """
    command = compose_makeself_command(params)
    return command, None
def _parse_env_and_cmd(tokens, env):
"""
从一组 token 中提取环境变量并返回命令列表
"""
cmd_list = []
for token in tokens:
if '=' in token and not cmd_list:
key, val = token.split('=', 1)
env[key] = os.getcwd() if val.upper() in ['$PWD', '$PWD/'] else val
continue
cmd_list.append(token)
return cmd_list
def run_complex_cmd(cmd_str):
    """Run a shell-like command string without invoking a shell.

    The string is split on ``&&`` and each segment is tokenised with
    ``shlex.split``. A ``cd <dir>`` segment sets the working directory for
    the command. In every other segment, leading ``KEY=VALUE`` tokens are
    added to a copy of the current environment and the remaining tokens
    become the command. Only the command of the last such segment is
    executed.

    Args:
        cmd_str: Command string.

    Returns:
        The ``subprocess.CompletedProcess`` of the executed command, with
        stderr merged into the text ``stdout``; None when there is no command
        to execute.
    """
    workdir = None
    environ = os.environ.copy()
    argv = None
    for segment in cmd_str.split('&&'):
        words = shlex.split(segment.strip())
        if not words:
            continue
        if words[0] != 'cd':
            argv = _parse_env_and_cmd(words, environ)
        elif len(words) > 1:
            # A bare "cd" without a directory keeps the previous setting.
            workdir = words[1]
    if not argv:
        return None
    return subprocess.run(
        argv,
        cwd=workdir,
        env=environ,
        shell=False,
        check=False,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    )
def exec_pack_cmd(delivery_dir: str,
                  pack_cmd: str,
                  package_name: str) -> str:
    """Execute the packaging command.

    Args:
        delivery_dir: Directory to run the command in; when empty the command
            runs in the current working directory.
        pack_cmd: Packaging command string.
        package_name: Name of the package being built; used for logging and
            returned on success.

    Returns:
        ``package_name``.

    Raises:
        CompressError: When the command exits with a non-zero status, or when
            the command string contains nothing to execute.
    """
    if delivery_dir:
        # The string is tokenised with shlex.split later on, so the directory
        # is quoted to keep a path containing whitespace in one piece.
        cmd = f'cd {shlex.quote(str(delivery_dir))} && {pack_cmd}'
    else:
        cmd = pack_cmd
    CommLog.cilog_info("package cmd:%s", cmd)
    result = run_complex_cmd(cmd)
    if result is None:
        # run_complex_cmd returns None when no command is left to execute
        # (for example an empty pack_cmd); report it as a packaging failure
        # instead of failing on the attribute access below.
        failed = True
        output = "no executable command found in package cmd"
    else:
        failed = result.returncode != 0
        output = result.stdout
    if failed:
        # NOTE(review): this call passes __file__ as the first argument while
        # the other CommLog calls in this file pass the format string first;
        # confirm against the CommLog.cilog_error signature.
        CommLog.cilog_error(__file__, "compress package(%s) failed! %s.", package_name, output)
        raise CompressError(package_name)
    return package_name