import os
import sys
import subprocess
import argparse
import traceback
import csv
from argparse import Namespace
from collections import namedtuple
from datetime import datetime, timezone
from functools import partial
from itertools import chain
from typing import Dict, Iterator, List, Set, Tuple, TextIO
from common.py.utils import pkg_utils
from common.py.filelist import (
FileItem, FileList, check_filelist, create_file_item, generate_filelist,
get_transform_nested_path_func,
)
from common.py.packer import (
PackageName, create_makeself_pkg_params_factory, create_run_package_command
)
from common.py.pkg_parser import (
ParseOption, XmlConfig, parse_xml_config, get_cann_version_info
)
from common.py.utils.pkg_utils import (
CONFIG_SCRIPT_PATH, CompressError, ContainAsteriskError, DELIVERY_PATH, FAIL,
FilelistError, GenerateFilelistError, PackageNameEmptyError, SUCCESS, TOP_DIR,
UnknownOperateTypeError, path_join
)
from common.py.utils.funcbase import invoke, pipe
from common.py.utils.comm_log import CommLog
from common.py.version_info import VersionInfo, VersionInfoFile
def get_comments(package_name: PackageName) -> str:
comments = '_'.join(
[package_name.product_name.upper(), package_name.func_name.upper(), 'RUN_PACKAGE']
)
return f'"{comments}"'
def get_compress_cmd(pkg_args: Namespace,
xml_config: XmlConfig) -> str:
suffix = xml_config.package_attr.get('suffix')
if suffix == "run":
package_name = PackageName(xml_config.package_attr, pkg_args, xml_config.version)
factory = create_makeself_pkg_params_factory(
package_name.getvalue(), get_comments(package_name)
)
params = factory(xml_config.package_attr)
pack_cmd, err_msg = create_run_package_command(params)
if err_msg:
CommLog.cilog_error(err_msg)
CommLog.cilog_error("create_run_command failed!")
else:
CommLog.cilog_error("the repack type '%s' is not support!", suffix)
sys.exit(FAIL)
try:
makeself_dir = os.path.join(TOP_DIR, "build/makeself.txt")
with open(makeself_dir, 'w') as f:
f.write(pack_cmd)
except Exception as exception:
CommLog.cilog_error(f"save makeself.txt failed!{str(exception)}")
sys.exit(FAIL)
return package_name.getvalue()
def make_parse_option(args_: argparse.Namespace) -> ParseOption:
return ParseOption(
args_.os_arch, args_.pkg_version,
args_.build_type,
args_.package_check,
args_.ext_name
)
PrivatePackageOption = namedtuple(
'PrivatePackageOption',
[
'os_arch', 'package_suffix', 'not_in_name', 'pkg_version', 'ext_name',
'chip_name', 'func_name', 'version_dir', 'disable_multi_version', 'suffix'
]
)
class PackageOption(PrivatePackageOption):
__slots__ = ()
def __new__(cls, *package_option_args, **kwargs):
return super().__new__(cls, *package_option_args, **kwargs)
def generate_info_content(target_conf, ext_name) -> List[str]:
def toolchain_llvm_config() -> Iterator[Tuple[str, str]]:
if 'llvm' in ext_name:
yield 'toolchain', 'llvm'
content_list = [
f'{key}={value}'
for key, value in chain(
target_conf['content'].items(), toolchain_llvm_config()
)
]
return content_list
def generate_version_header_content(target_conf) -> Iterator[str]:
guard_name = target_conf['value'].replace('.', '_').upper()
yield f'#ifndef {guard_name}'
yield f'#define {guard_name}'
yield ''
for name, value in target_conf['content'].items():
if name.endswith('_VERSION'):
version_infos = get_cann_version_info(name, value)
for version_name, version_value in version_infos:
yield f'#define {version_name} {version_value}'
else:
yield f'#define {name} {value}'
yield ''
yield f'#endif /* {guard_name} */'
yield ''
def generate_customized_file(target_conf, ext_name):
filepath = os.path.join(TOP_DIR, "build", target_conf.get('value'))
generator = target_conf.get('generator', 'info')
if generator == 'version_header':
content_list = generate_version_header_content(target_conf)
else:
content_list = generate_info_content(target_conf, ext_name)
file_content = '\n'.join(content_list)
try:
with open(filepath, 'w') as file:
file.write(file_content)
except Exception as ex:
CommLog.cilog_error(f"generate customized file {filepath} failed: {ex}!")
return FAIL
return SUCCESS
def get_module(target_config) -> str:
module = target_config.get('module', 'NA')
return module if module else 'NA'
def get_operation(operation, target_config) -> str:
if operation in ('copy', 'move') and target_config.get('entity') == 'true':
return 'copy_entity'
return operation
def get_permission(target_config) -> str:
return target_config.get('install_mod', 'NA')
def get_owner_group(target_config) -> str:
return target_config.get('install_own', 'NA').replace('$', '\\\\$')
def get_install_type(target_config) -> str:
return target_config.get('install_type', 'NA')
def get_softlink(target_config) -> List[str]:
softlink_str = target_config.get('install_softlink')
if not softlink_str:
return []
return softlink_str.split(';')
def get_feature(target_config) -> Set[str]:
return target_config['feature']
def get_chip(target_config) -> Set[str]:
return target_config['chip']
def get_configurable(target_config) -> str:
return target_config.get('configurable', 'FALSE')
def get_hash_value(target_config) -> str:
return target_config.get('hash', 'NA')
def get_block(target_config) -> str:
return target_config.get('name', 'NA')
def get_pkg_inner_softlink(target_config) -> List[str]:
softlink_str = target_config.get('pkg_inner_softlink')
if not softlink_str:
return []
return softlink_str.split(';')
def parse_install_info(infos: List,
operate_type,
filter_key) -> Iterator[FileItem]:
for target_config in infos:
target_name = get_target_name(target_config)
if target_config.get("optional") == 'true':
path = os.path.join(TOP_DIR, DELIVERY_PATH, target_config.get('dst_path'))
value = os.path.join(TOP_DIR, DELIVERY_PATH, target_config.get('dst_path'), target_name)
if not os.path.exists(path):
continue
if not os.path.exists(value):
continue
if operate_type in ('copy', 'move'):
relative_path_in_pkg = os.path.join(target_config.get('dst_path'), target_name)
relative_install_path = path_join(target_config.get('install_path'), target_name)
is_dir = target_config.get('is_dir', False)
elif operate_type == 'mkdir':
relative_path_in_pkg = 'NA'
relative_install_path = target_config.get('value')
is_dir = False
elif operate_type == 'del':
relative_path_in_pkg = 'NA'
relative_install_path = path_join(target_config.get('install_path'), target_name)
is_dir = False
else:
raise UnknownOperateTypeError(f"unknown operate type {operate_type}")
if relative_install_path is None:
continue
install_type = get_install_type(target_config)
if any(key in install_type for key in filter_key):
is_in_docker = 'TRUE'
else:
is_in_docker = 'FALSE'
file_item = create_file_item(
get_module(target_config),
get_operation(operate_type, target_config),
relative_path_in_pkg,
relative_install_path,
is_in_docker,
get_permission(target_config),
get_owner_group(target_config),
install_type,
get_softlink(target_config),
get_feature(target_config),
'N',
get_configurable(target_config),
get_hash_value(target_config),
get_block(target_config),
get_pkg_inner_softlink(target_config),
get_chip(target_config),
is_dir,
)
yield file_item
def generate_version_info(version_info: VersionInfo,
package_name: PackageName,
target_path: str):
if version_info.version_xml:
requires = version_info.version_xml.collect_requires(package_name.func_name)
else:
requires = []
itf_version_info = '\n'.join(version_info.itf_versions)
version_info_file = VersionInfoFile(
version_info.version, itf_version_info, requires, None,
timestamp=version_info.timestamp
)
version_info_file.save(
os.path.join(target_path, "version.info")
)
def do_copy(target_conf=None,
delivery_dir='',
release_dir='',
config_relpath=None,
package_name=None):
copy_type = target_conf.get('copy_type')
if copy_type == 'delivery':
src_target = os.path.join(
delivery_dir,
target_conf['src_path'],
target_conf.get('value')
)
elif copy_type == 'source':
src_target = os.path.join(
pkg_utils.TOP_DIR,
target_conf['src_path'],
target_conf.get('value')
)
else:
CommLog.cilog_error(THIS_FILE_NAME, "copy_type %s is not support for src_target %s!", copy_type, src_target)
return FAIL
target_name = get_target_name(target_conf)
dst_path = os.path.join(release_dir, target_conf.get('dst_path', ''))
pkg_mod = target_conf.get('pkg_mod', '')
rename = target_conf.get('rename')
cmd = ''
if not os.path.exists(dst_path):
cmd += ' '.join(['mkdir', '-p', dst_path, '&&'])
dst_fullpath = os.path.join(dst_path, target_name)
if rename:
dst_path = os.path.join(dst_path, rename)
if 'dereference' in target_conf:
dereference_flag = 'L'
else:
dereference_flag = ''
cmd += ' '.join([f'cp -rf{dereference_flag}', src_target, dst_path])
status = subprocess.run(['/bin/bash', '-c', cmd], stdout=subprocess.PIPE,
stderr=subprocess.PIPE, universal_newlines=True, shell=False)
if status.returncode != SUCCESS:
CommLog.cilog_error("do_copy(%s) failed!", cmd)
CommLog.cilog_error("output: %s, error: %s", status.stdout, status.stderr)
CommLog.cilog_error("please check package config \"%s\"!", config_relpath)
return FAIL
if pkg_mod:
cmd = f'chmod -R {pkg_mod} {dst_fullpath}'
status = subprocess.run(['/bin/bash', '-c', cmd], stdout=subprocess.PIPE,
stderr=subprocess.PIPE, universal_newlines=True, shell=False)
if status.returncode != SUCCESS:
CommLog.cilog_error(THIS_FILE_NAME, "chmod(%s) failed! %s", cmd, cmd)
CommLog.cilog_error(THIS_FILE_NAME, "output: %s, error: %s", status.stdout, status.stderr)
return FAIL
return SUCCESS
def execute_repack_process(xml_config: XmlConfig,
delivery_dir: str,
pkg_args: Namespace,
package_name: PackageName = None,
package_option: PackageOption = None):
release_dir = os.path.join(
delivery_dir, xml_config.default_config.get('name', 'default'))
for item in xml_config.generate_infos:
if generate_customized_file(item, package_option.ext_name):
return FAIL
if xml_config.package_attr.get('gen_version_info'):
if pkg_args.pkg_name == 'driver_compat':
generate_version_info(xml_config.version_info, package_name, delivery_dir)
else:
generate_version_info(xml_config.version_info, package_name, release_dir)
for item in chain(xml_config.package_content_list, xml_config.move_content_list):
if do_copy(item,
delivery_dir,
release_dir,
xml_config.xml_relpath,
package_name):
return FAIL
if pkg_args.check_size == "True":
limit_list, tag = processing_csv_file(
release_dir, package_name.func_name, package_name.chip_name, pkg_args.build_type
)
if not tag:
return FAIL
if limit_list:
abspath = os.path.abspath(release_dir)
replace_path = abspath + "/"
result = check_add_dir(replace_path, abspath, limit_list)
if not result:
return FAIL
try:
package_name = get_compress_cmd(pkg_args, xml_config)
except CompressError:
return FAIL
CommLog.cilog_info("package %s generate filelist.csv and makeself cmd successfully!",
package_name)
return SUCCESS
def check_path_is_conflict(xml_config):
install_path_list = set()
pkg_softlink_list = set()
for item in xml_config.package_content_list:
value_list = item.get('value').split('/')
target_name = value_list[-1] if value_list[-1] else value_list[-2]
if item.get('install_path'):
install_path_list.add(
os.path.join(item['install_path'], target_name)
)
if item.get('pkg_inner_softlink'):
pkg_softlink = item.get('pkg_inner_softlink')
pkg_softlink_list.add(pkg_softlink)
if install_path_list & pkg_softlink_list:
CommLog.cilog_info('intersection:{}'.format(install_path_list & pkg_softlink_list))
CommLog.cilog_info('path conflicting: pkg_inner_softlink dir equals install_path!!')
return FAIL
return SUCCESS
def checksum_value(limit_value, release_dir):
path = os.path.join(release_dir, limit_value[1])
if len(limit_value) >= 7:
try:
max_value = int(limit_value[4])
except ValueError:
CommLog.cilog_error("{0} configuration is not standard., Please check limit.csv.".format(path))
return True
else:
CommLog.cilog_error("{0} configuration is less than four, Please check limit.csv.".format(path))
return True
if not os.path.exists(path):
CommLog.cilog_warning("{0} doesn't exist, Please check limit.csv.".format(path))
return True
size = 0
for root, dirs, files in os.walk(path):
size += os.path.getsize(root)
for f in files:
filepath = os.path.join(root, f)
if os.path.islink(filepath):
continue
if not os.path.exists(filepath):
continue
size += os.path.getsize(os.path.join(root, f))
if size == 0:
size = os.path.getsize(path)
if size > max_value * 1024:
CommLog.cilog_error(f"\n{path} size {size} bytes exceeds maximum {max_value * 1024} bytes")
return False
return True
def processing_csv_file(release_dir, package_name, chip_name, build_type):
ret = True
limit_list = []
product = os.path.basename(os.path.dirname(release_dir))
limit_path = os.path.join(pkg_utils.TOP_SOURCE_DIR, CONFIG_SCRIPT_PATH, "common/limit.csv")
if not os.path.exists(limit_path):
CommLog.cilog_warning("{0} doesn't exist.".format(limit_path))
return limit_list, ret
with open(limit_path, "r") as file:
reader = csv.reader(file)
next(reader)
for data in reader:
if not data:
CommLog.cilog_warning("The limit.csv file contains empty lines.")
continue
if is_match_line(package_name, chip_name, product, build_type, data):
if data[1][-1] == "/":
limit_list.append(data[1][:-1])
else:
limit_list.append(data[1])
res = checksum_value(data, release_dir)
if not res:
ret = False
return limit_list, ret
def is_match_line(package_name, chip_name, product, build_type, data):
return package_name == data[0] and chip_name == data[5] and product == data[6] and build_type == data[7].lower()
def check_add_dir(package_path, dirs, limit_list, ret=True):
for limit_path in limit_list:
if dirs == os.path.join(os.path.split(dirs)[0], limit_path):
return ret
for dir_file in os.listdir(dirs):
path = os.path.join(dirs, dir_file)
relative_path = path.replace(package_path, "")
if os.path.isfile(path) and relative_path not in limit_list:
CommLog.cilog_error("{0} is not in limit.csv file and is newly added.".format(path))
ret = False
elif os.path.isdir(path) and relative_path not in limit_list:
ret = check_add_dir(package_path, path, limit_list, ret)
return ret
def get_target_name(target_conf) -> str:
rename = target_conf.get('rename')
if rename:
return rename
value_list = target_conf.get('value').split('/')
target_name = value_list[-1] if value_list[-1] else value_list[-2]
return target_name
def gen_file_install_list(xml_config: XmlConfig,
filter_key) -> Tuple[FileList, FileList]:
file_install_list = []
dir_filelist = parse_install_info(
xml_config.dir_install_list, 'mkdir', filter_key
)
move_filelist = parse_install_info(
xml_config.move_content_list, 'move', filter_key
)
pkg_filelist = parse_install_info(
xml_config.package_content_list, 'copy', filter_key
)
gen_filelist = parse_install_info(
xml_config.generate_infos, 'copy', filter_key
)
del_filelist = parse_install_info(
xml_config.expand_content_list, 'del', filter_key
)
collect_filelist = list(chain(dir_filelist, move_filelist, pkg_filelist, gen_filelist))
collect_filelist = list(xml_config.packer_config.fill_is_common_path(collect_filelist))
all_filelist = list(chain(collect_filelist, del_filelist))
for file_item in all_filelist:
file_install_list.append(file_item)
return file_install_list, []
def generate_filelist_file_by_xml_config(xml_config: XmlConfig,
filter_key: List[str],
package_check: bool):
check_move = xml_config.package_attr.get('use_move', False)
transform_nested_path_func = get_transform_nested_path_func(
xml_config.package_attr.get('parallel') or check_move
)
check_features = xml_config.package_attr.get('check_features', False)
file_install_list, [] = invoke(
pipe(
gen_file_install_list,
partial(map, transform_nested_path_func),
tuple,
),
xml_config, filter_key
)
generate_filelist(file_install_list, 'filelist.csv')
if package_check:
check_filelist(file_install_list, check_features, check_move)
def get_pkg_xml_relative_path(pkg_args: Namespace) -> str:
def parts():
yield CONFIG_SCRIPT_PATH
yield pkg_args.pkg_name
if pkg_args.chip_scenes:
yield pkg_args.chip_scenes
if pkg_args.xml_file:
yield pkg_args.xml_file
else:
yield f'{pkg_args.chip_name}/driver.xml'
return os.path.join(*parts())
def write_config_inc_var(name: str, package_attr: Dict, file: TextIO):
if name in package_attr:
value = str(package_attr[name]).lower()
file.write(f"{name.upper()}={value}\n")
def generate_config_inc(package_attr: Dict):
if 'parallel' not in package_attr and 'parallel_limit' not in package_attr and 'use_move' not in package_attr:
return
year = datetime.now(timezone.utc).year
config_inc = os.path.join(TOP_DIR, "build", 'config.inc')
header = [
'#!/bin/sh\n',
'#----------------------------------------------------------------------------\n',
f'# Copyright Huawei Technologies Co., Ltd. 2023-{year}. All rights reserved.\n',
'#----------------------------------------------------------------------------\n',
'\n',
]
if os.path.isfile(config_inc):
os.chmod(config_inc, 0o700)
with open(config_inc, 'w', encoding='utf-8') as file:
file.writelines(header)
write_config_inc_var('parallel', package_attr, file)
write_config_inc_var('parallel_limit', package_attr, file)
write_config_inc_var('use_move', package_attr, file)
os.chmod(config_inc, 0o500)
def main(pkg_name='', xml_file='', main_args=None):
delivery_dir = os.path.join(TOP_DIR, DELIVERY_PATH)
if not os.path.exists(delivery_dir):
return FAIL
config_relative_path = get_pkg_xml_relative_path(main_args)
pkg_xml_file = os.path.join(pkg_utils.TOP_SOURCE_DIR, config_relative_path)
parse_option = make_parse_option(main_args)
try:
xml_config = parse_xml_config(
pkg_xml_file, config_relative_path, delivery_dir, parse_option, main_args
)
except ContainAsteriskError as ex:
CommLog.cilog_error(f"Value contain '*' in {config_relative_path}. value is '{ex.value}'.")
return FAIL
if pkg_name in ['driver', 'firmware']:
filter_key = ['all', 'docker']
elif pkg_name in ['aicpu_kernels_device', 'aicpu_kernels_host']:
filter_key = []
else:
filter_key = ['all', 'run']
try:
generate_filelist_file_by_xml_config(
xml_config, filter_key,
main_args.package_check or xml_config.package_attr.get('package_check')
)
except PackageNameEmptyError:
CommLog.cilog_error(f'package name is empty in {xml_file}, please check it')
return FAIL
except GenerateFilelistError as ex:
CommLog.cilog_error(f'generate filelist {ex.filename} failed!', )
return FAIL
except FilelistError as ex:
CommLog.cilog_error('check filelist error! %s', str(ex))
return FAIL
generate_config_inc(xml_config.package_attr)
package_option = PackageOption(
main_args.os_arch, main_args.package_suffix, main_args.not_in_name, main_args.pkg_version, main_args.ext_name,
chip_name=main_args.chip_name, func_name=main_args.func_name, version_dir=main_args.version_dir,
disable_multi_version=main_args.disable_multi_version, suffix=main_args.suffix)
package_name = PackageName(xml_config.package_attr, main_args, xml_config.version)
if check_path_is_conflict(xml_config) == FAIL:
return FAIL
return execute_repack_process(xml_config, delivery_dir, main_args,
package_name=package_name, package_option=package_option)
def args_parse():
parser = argparse.ArgumentParser(
description='This script is for package repack processing.')
parser.add_argument('-c', '--chip_scenes', metavar='chip_scenes', required=False, dest='chip_scenes', nargs='?',
const='',
default='', help='This parameter define chip id for package.')
parser.add_argument('-n', '--pkg_name', metavar='pkg_name', required=False,
help='This parameter define pkg_name for config_xml.')
parser.add_argument('-o', '--os_arch', metavar='os_arch', required=False, dest='os_arch', nargs='?', const='',
default=None, help="This parameter define the package's os_arch")
parser.add_argument('-t', '--type', metavar='type', required=False, dest='type', nargs='?', const='',
default='repack', help="This parameter define this script's function")
parser.add_argument('-i', '--not_in_name', metavar='not_in_name', required=False, dest='not_in_name', nargs='?',
const='',
default='', help="This parameter define the package's name not contain the element")
parser.add_argument('-v', '--pkg_version', metavar='pkg_version', required=False, dest='pkg_version', nargs='?',
const='',
default='', help="This parameter define the version for package.")
parser.add_argument('-e', '--ext_name', metavar='ext_name', required=False, dest='ext_name', nargs='?', const='',
default='', help="This parameter define the package's ext_name")
parser.add_argument('--package_suffix', nargs='?', const='none',
default='none', help="This parameter define the package suffix, debug or none")
parser.add_argument('--suffix', metavar='suffix', required=False, dest='suffix', nargs='?', const='',
default=None, help="This parameter define the package suffix, for example such as tar.gz")
parser.add_argument('-b', '--build_type', metavar='build_type', required=False, dest='build_type', nargs='?',
const='',
default='debug', help="This parameter define release type of package")
parser.add_argument('--feature_list', metavar='feature_list', nargs='?', const=None,
default=None, help="Package feature list file.")
parser.add_argument('-x', '--xml', metavar='xml_file', required=False, dest='xml_file', nargs='?', const='',
default='', help="This parameter define xml file")
parser.add_argument('--chip_name', metavar='chip_name', required=False, dest='chip_name', nargs='?', const=None,
default=None,
help="This parameter define package chip name, has higher priority than chip name in xml")
parser.add_argument('--func_name', metavar='func_name', required=False, dest='func_name', nargs='?', const=None,
default=None,
help="This parameter define package func name, has higher priority than func name in xml")
parser.add_argument('--source_root', metavar='source_root', required=False, dest='source_root', nargs='?', const='',
help='source root dir.')
parser.add_argument('--version_dir', nargs='?', const='', default='', help='Set version dir.')
parser.add_argument('--tag', metavar='tag', nargs='?', const='', default='')
parser.add_argument('--disable-multi-version', action='store_true', help='Disable multi version.')
parser.add_argument('--package-check', action='store_true', help='check package config.')
parser.add_argument('--check_size', nargs='?', const='', default='', help="Check the size of a file or directory.")
parser.add_argument('--pkg-name-style', metavar='pkg_name_style', default='common', help='Package name style.')
return parser.parse_args()
if __name__ == "__main__":
CommLog.cilog_info("%s", " ".join(sys.argv))
args = args_parse()
try:
if args.source_root:
pkg_utils.TOP_SOURCE_DIR = args.source_root
if args.build_type == '':
args.build_type = 'debug'
else:
args.build_type = args.build_type.lower()
status = main(args.pkg_name, args.xml_file, main_args=args)
except Exception as e:
CommLog.cilog_error("exception is occurred (%s)!", e)
CommLog.cilog_info("%s", traceback.format_exc())
status = FAIL
sys.exit(status)