import copy
import glob
import hashlib
import itertools
import argparse
import re
import xml.etree.ElementTree as ET
import os
import sys
from argparse import Namespace
from functools import partial
from io import StringIO
from itertools import chain, filterfalse
from operator import attrgetter, itemgetter, methodcaller
from typing import (
Any, Callable, Dict, Iterable, Iterator, List, NamedTuple, Optional, Set, Tuple, Union
)
from .utils import pkg_utils
from .pkg_feature import (
PkgFeature, combine_feature_and_feature_list, load_feature_list, config_feature_to_set,
make_pkg_feature
)
from .filelist import FileItem, FileList, fill_is_common_path
from .utils.pkg_utils import (
ContainAsteriskError, FAIL, BLOCK_CONFIG_PATH,
BlockConfigError, EnvNotSupported, IllegalVersionDir, PackageError,
ParseOsArchError,
flatten, star_pipe, merge_dict, yield_if, ErrMsgs, each_file_line, strip_lines
)
from .utils.funcbase import constant, dispatch, invoke, pipe, star_apply
from .utils.comm_log import CommLog
from .version_info import (
VersionInfo, VersionXml, VersionFormatNotMatch, is_multi_version, VersionInfoFile
)
EnvDict = Dict[str, str]
FileInfo = Dict[str, str]
PackageAttr = Dict[str, Union[str, bool]]
GenerateInfo = Dict[str, str]
class ParseOption(NamedTuple):
os_arch: Optional[str]
pkg_version: Optional[str]
build_type: Optional[str]
package_check: bool
ext_name: str = ''
def parse_os_arch(os_arch: str) -> Tuple[str, str, str]:
match = re.match("^([a-z]+)(\\d+(\\.\\d+)*)?[.-]?(\\S*)", os_arch)
if match:
os_name = match.group(1)
os_ver = match.group(2)
if match.group(4):
arch = match.group(4)
else:
arch = 'aarch64'
return os_name, os_ver, arch
raise ParseOsArchError()
def replace_env(env_dict: EnvDict, in_str: str):
env_list = re.findall(".*?\\$\\((.*?)\\).*?", in_str)
for env in env_list:
if env == 'FILE':
continue
if env in env_dict:
if env_dict[env] is not None:
in_str = in_str.replace(f"$({env})", env_dict[env])
else:
in_str = in_str.replace(f"$({env})", '')
else:
raise EnvNotSupported(f"Error: {env} not supported.")
return in_str
class ParseEnv(NamedTuple):
need_package_func: Callable[[Dict], bool]
env_dict: EnvDict
parse_option: ParseOption
delivery_dir: str
top_dir: str
class BlockElement(NamedTuple):
name: str
block_conf_path: str
dst_path: str
chips: Set[str]
features: Set[str]
attrs: Dict[str, str]
BLOCK_ELEMENT_PASS_THROUGH_ARGS = ['dst_path', 'chips', 'features', 'attrs']
class LoadedBlockElement(NamedTuple):
root_ele: ET.Element
use_move: bool
dst_path: str
chips: Set[str]
features: Set[str]
pkg_features: Set[str]
attrs: Dict[str, str]
class FileInfoParsedResult(NamedTuple):
file_info: FileInfo
move_infos: List[FileInfo]
dir_infos: List[Dict[str, str]]
expand_infos: List[Dict[str, str]]
class BlockConfig(NamedTuple):
dir_install_list: List[Dict]
move_files: List[FileInfo]
expand_content_list: List[Dict]
package_content_list: List[Dict]
generate_infos: List[GenerateInfo]
class PackerConfig(NamedTuple):
fill_is_common_path: Callable[[FileList], Iterator[FileItem]]
class XmlConfig(NamedTuple):
xml_relpath: str
default_config: Dict[str, str]
package_attr: PackageAttr
version_info: VersionInfo
blocks: List[BlockConfig]
version: str
version_xml: Optional[VersionXml]
packer_config: PackerConfig
def _collect_list(self, list_name):
result = []
for block in self.blocks:
result.extend(getattr(block, list_name))
return result
@property
def dir_install_list(self):
return self._collect_list('dir_install_list')
@property
def move_content_list(self):
return self._collect_list('move_files')
@property
def expand_content_list(self):
return self._collect_list('expand_content_list')
@property
def package_content_list(self):
return self._collect_list('package_content_list')
@property
def generate_infos(self) -> List[GenerateInfo]:
return self._collect_list('generate_infos')
DEFAULT_PACKAGE_ATTR = {
'gen_version_info': True,
}
def parse_package_info(package_info_ele: Optional[ET.Element]) -> Dict:
def get_package_info_attrs(ele: ET.Element) -> Iterator[Tuple[str, Union[str, bool]]]:
bool_attrs = (
'expand_asterisk', 'parallel', 'parallel_limit', 'package_check', 'check_features',
'use_move', 'gen_version_info'
)
bool_values = ('t', 'true', 'y', 'yes')
if ele.tag in bool_attrs:
if ele.text.lower() in bool_values:
yield ele.tag, True
else:
yield ele.tag, False
else:
yield ele.tag, ele.text
if not package_info_ele:
return {}
attr = dict(
chain.from_iterable(map(get_package_info_attrs, list(package_info_ele)))
)
return attr
skip_empty_lines = partial(filter, bool)
skip_comment_lines = partial(filterfalse, methodcaller('startswith', '#'))
skip_empty_and_comment_lines = pipe(
skip_empty_lines,
skip_comment_lines,
)
def load_itf_version_conf(itf_conf: str,
replace_env_func: Callable[[str], str]) -> List[str]:
load_func = pipe(
each_file_line,
strip_lines,
skip_empty_and_comment_lines,
partial(map, replace_env_func),
list,
)
return load_func(itf_conf)
def make_loaded_block_element(root_ele: ET.Element,
dst_path: str = None) -> LoadedBlockElement:
if not dst_path:
new_dst_path = ''
else:
new_dst_path = dst_path
return LoadedBlockElement(root_ele, False, new_dst_path, set(), set(), set(), {})
def parse_install_version_info(attr_info: Dict[str, str]) -> Tuple[bool, Dict]:
if attr_info.get('install') == 'true':
return True, attr_info
return False, None
def parse_itf_version_paths(version_info_ele: Optional[ET.Element]
) -> Tuple[ErrMsgs, List[str]]:
def has_itf_version(ele: ET.Element) -> bool:
return bool(ele.attrib.get('itf_version'))
if version_info_ele is None:
return [], []
interface_eles = version_info_ele.findall('./interface')
err_msgs = [
'interface element doesn\'t have itf_version attribute!'
for ele in interface_eles if not has_itf_version(ele)
]
itf_version_paths = [
ele.attrib['itf_version']
for ele in interface_eles if has_itf_version(ele)
]
return err_msgs, itf_version_paths
def parse_itf_versions(version_info_ele: Optional[ET.Element],
top_dir: str,
replace_env_func: Callable[[str], str]) -> Tuple[ErrMsgs, List[str]]:
def itf_conf_exists(path: str) -> bool:
return os.path.isfile(os.path.join(top_dir, path))
err_msgs, itf_version_paths = parse_itf_version_paths(version_info_ele)
if err_msgs:
return err_msgs, []
err_msgs = [
f'{path} doesn\'t exist!'
for path in itf_version_paths if not itf_conf_exists(path)
]
if err_msgs:
return err_msgs, []
itf_versions = flatten([
load_itf_version_conf(os.path.join(top_dir, path), replace_env_func)
for path in itf_version_paths
])
return err_msgs, itf_versions
def parse_version_info_by_root(root_ele: ET.Element,
env_dict: EnvDict,
version: str,
version_dir: Optional[str],
version_xml: Optional[VersionXml],
timestamp: str,
) -> Tuple[ErrMsgs, VersionInfo]:
version_info_ele = root_ele.find('version_info')
evaluate_info_func = partial(
evaluate_info, loaded_block=make_loaded_block_element(root_ele), env_dict=env_dict
)
if version_info_ele is None:
err_msgs, itf_versions = [], []
attr_info = {}
dst_path = ''
else:
err_msgs, itf_versions = parse_itf_versions(
version_info_ele, pkg_utils.TOP_DIR, partial(replace_env, env_dict)
)
attr_info = evaluate_info_func(version_info_ele.attrib)
dst_path = attr_info.get('dst_path', '')
version_info = VersionInfo(
*parse_install_version_info(attr_info),
itf_versions,
version,
version_xml,
timestamp
)
return err_msgs, version_info
def parse_package_attr_by_args(args: Namespace) -> Dict:
def pairs():
if hasattr(args, 'chip_name') and args.chip_name:
yield 'chip_name', args.chip_name
if hasattr(args, 'suffix') and args.suffix:
yield 'suffix', args.suffix
if hasattr(args, 'func_name') and args.func_name:
yield 'func_name', args.func_name
return dict(pairs())
def parse_package_attr(root_ele: ET.Element, args: Namespace) -> Dict:
package_info_ele = root_ele.find("package_info")
return merge_dict(
DEFAULT_PACKAGE_ATTR,
parse_package_info(package_info_ele),
parse_package_attr_by_args(args),
)
def parse_feature_config_by_root(root_ele: ET.Element) -> Dict:
package_info_ele = root_ele.find("feature_config")
return parse_package_info(package_info_ele)
def get_feature_list(args: argparse.Namespace, feature_attr: Dict) -> str:
return args.feature_list or feature_attr.get('feature_list', '')
def render_cann_version(a_ver: int,
b_ver: int,
c_ver: Optional[int],
d_ver: Optional[int],
e_ver: Optional[int],
f_ver: Optional[int]) -> str:
buffer = StringIO()
buffer.write('(')
buffer.write(f'({a_ver + 1} * 100000000) + ({b_ver + 1} * 1000000)')
if c_ver is not None:
buffer.write(f' + ({c_ver + 1} * 10000)')
if d_ver is not None:
buffer.write(f' + (({d_ver + 1} * 100) + 5000)')
if e_ver is not None:
buffer.write(f' + ({e_ver + 1} * 100)')
if f_ver is not None:
buffer.write(f' + {f_ver}')
buffer.write(')')
return buffer.getvalue()
def get_cann_version(version_dir: str) -> str:
if not version_dir:
return ""
release_pattern = re.compile(r'(\d+)\.(\d+)\.(\d+)', re.IGNORECASE)
matched = release_pattern.fullmatch(version_dir)
if matched:
return render_cann_version(
int(matched.group(1)), int(matched.group(2)), int(matched.group(3)), None, None, None
)
release_alpha_pattern = re.compile(r'(\d+)\.(\d+)\.(\d+)\.[a-z]*(\d+)', re.IGNORECASE)
matched = release_alpha_pattern.fullmatch(version_dir)
if matched:
return render_cann_version(
int(matched.group(1)), int(matched.group(2)), int(matched.group(3)), None, None,
int(matched.group(4))
)
rc_pattern = re.compile(r'(\d+)\.(\d+)\.RC(\d+)', re.IGNORECASE)
matched = rc_pattern.fullmatch(version_dir)
if matched:
return render_cann_version(
int(matched.group(1)), int(matched.group(2)), None, int(matched.group(3)), None, None
)
test_pattern = re.compile(r'(\d+)\.(\d+)\.T(\d+)', re.IGNORECASE)
matched = test_pattern.fullmatch(version_dir)
if matched:
return render_cann_version(
int(matched.group(1)), int(matched.group(2)), None, None, int(matched.group(3)), None
)
alpha_pattern = re.compile(r'(\d+)\.(\d+)\.RC(\d+)\.[a-z]*(\d+)', re.IGNORECASE)
matched = alpha_pattern.fullmatch(version_dir)
if matched:
return render_cann_version(
int(matched.group(1)), int(matched.group(2)), None, int(matched.group(3)), None,
int(matched.group(4))
)
cann_pattern = re.compile(r'CANN-(\d+)\.(\d+)', re.IGNORECASE)
matched = cann_pattern.fullmatch(version_dir)
if matched:
return render_cann_version(
int(matched.group(1)), int(matched.group(2)), None, None, None, None
)
raise IllegalVersionDir(version_dir)
def get_cann_version_info(name: str, version_dir: str) -> List[Tuple[str, str]]:
version_info = []
package_name = name[:-8]
if not version_dir:
version_str = '0'
elif version_dir.startswith('CANN-'):
version_str = version_dir[5:]
else:
version_str = version_dir
version_info.append((f'{package_name}_VERSION_STR', f'"{version_str}"'))
version = get_cann_version(version_dir)
version_info.append((f'{package_name}_VERSION', version))
return version_info
def get_default_env_items() -> Iterator[Tuple[str, str]]:
yield 'VERSION_DIR', ''
yield 'HOME', os.environ.get('HOME')
def get_env_items_by_version(version: Optional[str]) -> Iterator[Tuple[str, str]]:
if version:
yield 'ASCEND_VER', version
version_parts = version.split('.')
for idx in range(1, len(version_parts) + 1):
yield f'CUR_VER[{idx}]', '.'.join(version_parts[:idx])
yield 'CUR_VER', version
yield 'LOWER_CUR_VER', version.lower()
def get_env_items_by_version_dir(version_dir: Optional[str]) -> Iterator[Tuple[str, str]]:
if version_dir:
yield 'VERSION_DIR', version_dir
def get_os_arch_default_env_items() -> Iterator[Tuple[str, str]]:
yield 'OS_NAME', 'linux'
yield 'OS_VER', ''
yield 'ARM', 'aarch64'
yield 'TARGET_ENV', '$(TARGET_ENV)'
def get_env_items_by_os_arch(os_arch: str) -> Iterator[Tuple[str, str]]:
if os_arch:
os_name, os_ver, arch = parse_os_arch(os_arch)
yield 'OS_NAME', os_name
yield 'OS_VER', os_ver
yield 'ARCH', arch
yield 'OS_ARCH', os_arch
if arch in ('arm', 'sw_64'):
yield 'ARM', arch
else:
yield 'ARM', 'aarch64'
yield 'TARGET_ENV', f"{arch}-linux"
else:
yield from get_os_arch_default_env_items()
def get_env_items_by_timestamp(timestamp: Optional[str]) -> Iterator[Tuple[str, str]]:
if timestamp:
yield 'TIMESTAMP', timestamp
yield 'TIMESTAMP_NO', timestamp.replace('_', '')
else:
yield 'TIMESTAMP', '0'
yield 'TIMESTAMP_NO', '0'
def parse_env_dict(os_arch: str,
package_attr: PackageAttr,
version: Optional[str],
version_dir: Optional[str],
timestamp: Optional[str]) -> EnvDict:
env_dict = dict(
chain(
get_default_env_items(),
yield_if(('ARCH', package_attr.get('default_arch')), itemgetter(1)),
get_env_items_by_os_arch(os_arch),
get_env_items_by_version(version),
get_env_items_by_version_dir(version_dir),
yield_if(('VERSION_DIR', version_dir), constant(version_dir)),
get_env_items_by_timestamp(timestamp),
)
)
return env_dict
def get_timestamp(args: Namespace) -> Optional[str]:
if 'tag' not in args:
return None
tag = args.tag
if tag:
timestamp_re = r"\d{8}_\d{9}"
timestamp_list = re.findall(timestamp_re, tag)
if not timestamp_list:
raise PackageError("The {} format is incorrect.".format(tag))
timestamp = timestamp_list[-1]
else:
timestamp = None
return timestamp
def extract_element_attrib(ele: ET.Element) -> Dict:
return ele.attrib.copy()
def extract_generate_info_content(generate_info_ele: ET.Element, env_dict: EnvDict) -> Dict:
file_content = {
sub_item.tag: replace_env(env_dict, sub_item.text)
for sub_item in list(generate_info_ele)
}
return {
'content': file_content
}
def parse_generate_infos_by_loaded_block(loaded_block: LoadedBlockElement,
default_config: Dict[str, str],
env_dict: EnvDict,
need_package_func: Callable[[Dict], bool]) -> List[Dict]:
return invoke(
pipe(
partial(
map, pipe(
dispatch(
pipe(
extract_element_attrib,
partial(merge_dict, default_config),
partial(evaluate_info, loaded_block=loaded_block, env_dict=env_dict),
),
partial(extract_generate_info_content, env_dict=env_dict),
),
star_apply(merge_dict),
)
),
partial(filter, need_package_func),
list,
),
loaded_block.root_ele.findall('generate_info')
)
def join_pkg_inner_softlink(link_str_list: List[str]) -> str:
path = "/".join(link_str_list)
return os.path.normpath(path)
def check_contain_asterisk(value: str) -> bool:
if '*' in value:
return True
return False
def check_value(value: str,
package_check: bool,
package_attr: PackageAttr):
if package_check and package_attr.get('suffix') == 'run':
if check_contain_asterisk(value):
raise ContainAsteriskError(value)
def need_package(info: Dict[str, str],
input_feature: PkgFeature,
parse_option: ParseOption,
env_dict: EnvDict) -> bool:
pkg_features = info['pkg_feature']
if not input_feature.matched(pkg_features):
return False
return True
def get_dst_prefix(file_info: FileInfo, env: ParseEnv) -> str:
return os.path.join(env.delivery_dir, file_info['dst_path'])
def get_dst_target(file_info: FileInfo, env: ParseEnv) -> str:
dst_prefix = get_dst_prefix(file_info, env)
return os.path.join(dst_prefix, os.path.basename(file_info.get('value')))
def make_hash(filepath: str) -> str:
sha256_hash = hashlib.sha256()
with open(filepath, "rb") as file:
sha256_hash.update(file.read())
return sha256_hash.hexdigest()
def config_hash(parsed_result: FileInfoParsedResult, env: ParseEnv):
file_info = parsed_result.file_info
if file_info and file_info['configurable'] == 'TRUE':
src_target = get_dst_target(file_info, env)
hash_value = make_hash(src_target)
file_info['hash'] = hash_value
return parsed_result
def apply_func(func: Callable[[str], str],
value: Union[List[str], Set[str], str]
) -> Union[List[str], Set[str], str]:
if isinstance(value, list):
return list(map(func, value))
if isinstance(value, set):
return set(map(func, value))
return func(value)
REAL_PREFIX = 'real:'
def join_dst_path(base: str, other: str) -> str:
if other.startswith('real:'):
other = other[len(REAL_PREFIX):]
return other
return os.path.join(base, other)
def evaluate_info(info: Dict[str, str],
loaded_block: LoadedBlockElement,
env_dict: EnvDict,
pkg: bool = False) -> Dict[str, str]:
if pkg:
dst_keys = ('src_path', 'value')
else:
dst_keys = ('dst_path', 'pkg_softlink')
replace_env_func = partial(replace_env, env_dict)
add_dst_path_func = partial(join_dst_path, loaded_block.dst_path)
def split_pkg_softlink(key: str, value: str) -> Tuple[str, str]:
if key == 'pkg_softlink':
return key, value.split(';')
return key, value
def upper_value(key: str, value: str) -> Tuple[str, str]:
if key == 'configurable':
return key, value.upper()
return key, value
def add_dst_path(key: str, value: str) -> Tuple[str, str]:
if key in dst_keys:
return key, apply_func(add_dst_path_func, value)
return key, value
def replace_pkg_inner_softlink(key: str, value: str) -> Tuple[str, str]:
if key == 'pkg_inner_softlink':
inner_softlink_new = [
join_pkg_inner_softlink(link_str.split(':'))
for link_str in value.split(';')
if ':' not in link_str or link_str.split(':')[0] == loaded_block.dst_path
]
if inner_softlink_new:
return key, ';'.join(inner_softlink_new)
return key, 'NA'
return key, value
def merge_feature(key: str, value: str) -> Tuple[str, Set[str]]:
if key in ('chip', 'feature', 'pkg_feature'):
config_features = config_feature_to_set(value, key)
return key, config_features | getattr(loaded_block, f'{key}s')
return key, value
def eval_value(key: str, value: str) -> str:
if value is None:
return key, None
return key, apply_func(replace_env_func, value)
def bool_value(key: str, value: str) -> Tuple[str, bool]:
if key == 'preprocess':
return key, value.lower() in BOOL_VALUES
return key, value
eval_value_func = star_pipe(
upper_value, split_pkg_softlink, add_dst_path, replace_pkg_inner_softlink,
merge_feature, eval_value, bool_value, lambda x, y: y,
)
return {
key: eval_value_func(key, value)
for key, value in
itertools.chain(
[
('dst_path', ''), ('configurable', 'FALSE'),
('chip', None), ('feature', None), ('pkg_feature', None),
],
info.items()
)
}
def parse_dir_info_elements(loaded_block: LoadedBlockElement,
default_config: Dict[str, str],
package_attr: PackageAttr,
env: ParseEnv) -> List[Dict[str, str]]:
dir_info_elements: List[ET.Element] = loaded_block.root_ele.findall('dir_info')
dir_infos = []
for item in dir_info_elements:
dir_config = default_config.copy()
dir_config.update(item.attrib)
dir_config['module'] = dir_config.get('value')
for sub_item in list(item):
dir_info = dir_config.copy()
dir_info.update(sub_item.attrib)
dir_info = evaluate_info(dir_info, loaded_block, env.env_dict)
check_value(
dir_info['value'], env.parse_option.package_check, package_attr
)
if not env.need_package_func(dir_info):
continue
dir_infos.append(dir_info)
return dir_infos
def expand_dir(file_info: FileInfo, get_dst_target_func: Callable[[FileInfo], str]):
file_info_list = []
dir_info_list = []
dst_target = get_dst_target_func(file_info)
value_list = file_info.get('value').split('/')
target_name = value_list[-1] if value_list[-1] else value_list[-2]
dir_info_copy = file_info.copy()
dir_info_copy['module'] = file_info.get('value')
dir_info_copy['value'] = os.path.join(
file_info.get('install_path', ''), target_name
)
subdir_mod = file_info.get("subdir_mod", None)
if subdir_mod is not None:
dir_info_copy['install_mod'] = subdir_mod
dir_info_copy['install_softlink'] = 'NA'
dir_info_copy['pkg_inner_softlink'] = 'NA'
dir_info_list.append(dir_info_copy)
for root, dirs, files in os.walk(dst_target, followlinks=True):
dirs.sort()
files.sort()
dirs_to_remove = []
for name in dirs:
dirname = os.path.join(root, name)
if os.path.islink(dirname) and not need_dereference(file_info):
copy_file_info = create_file_info(dirname, dst_target, file_info, name, target_name)
copy_file_info['install_softlink'] = 'NA'
copy_file_info['pkg_inner_softlink'] = 'NA'
file_info_list.append(copy_file_info)
dirs_to_remove.append(name)
continue
relative_dirname = os.path.relpath(dirname, dst_target)
dir_info_copy = file_info.copy()
dir_info_copy['module'] = file_info.get('value')
dir_info_copy['value'] = os.path.join(
file_info.get('install_path', ''), target_name, relative_dirname
)
dir_info_copy['install_softlink'] = 'NA'
dir_info_copy['pkg_inner_softlink'] = 'NA'
subdir_mod = file_info.get("subdir_mod", None)
if subdir_mod is not None:
dir_info_copy['install_mod'] = subdir_mod
dir_info_list.append(dir_info_copy)
for name in files:
filename = os.path.join(root, name)
copy_file_info = create_file_info(filename, dst_target, file_info, name, target_name)
file_info_list.append(copy_file_info)
for name in dirs_to_remove:
dirs.remove(name)
return file_info_list, dir_info_list
def create_file_info(dirname, dst_target, file_info, name, target_name):
relative_filename = os.path.relpath(dirname, dst_target)
relative_dir_name = os.path.split(relative_filename)[0]
copy_file_info = file_info.copy()
copy_file_info['value'] = name
copy_file_info['src_path'] = os.path.join(
file_info['src_path'], file_info['value'], relative_dir_name
)
copy_file_info['dst_path'] = os.path.join(
file_info['dst_path'], target_name, relative_dir_name
)
copy_file_info['install_path'] = os.path.join(
file_info.get('install_path', ''), target_name, relative_dir_name
)
return copy_file_info
def expand_file_info_asterisk(parsed_result: FileInfoParsedResult,
env: ParseEnv) -> Iterator[FileInfoParsedResult]:
file_info = parsed_result.file_info
if check_contain_asterisk(file_info.get('value', '')):
dst_prefix = get_dst_prefix(file_info, env)
dst_targets = sorted(glob.glob(get_dst_target(file_info, env)))
if 'exclude' in file_info:
exclude = list(map(methodcaller('strip'), file_info['exclude'].split(';')))
else:
exclude = []
for dst_target in dst_targets:
value = os.path.relpath(dst_target, dst_prefix)
if value in exclude:
continue
new_file_info = file_info.copy()
new_file_info['value'] = value
if 'pkg_inner_softlink' in new_file_info:
pkg_inner_softlink = new_file_info['pkg_inner_softlink']
new_file_info['pkg_inner_softlink'] = pkg_inner_softlink.replace(
'$(FILE)', os.path.basename(dst_target)
)
yield parsed_result._replace(file_info=new_file_info)
else:
yield parsed_result
def trans_to_stream(item: Any) -> Iterator[Any]:
yield item
def need_dereference(file_info: FileInfo) -> bool:
if 'dereference' in file_info:
return True
return False
def need_expand(file_info: FileInfo, get_dst_target_func: Callable[[FileInfo], str]) -> bool:
if file_info.get('entity') == 'true':
return False
dst_target = get_dst_target_func(file_info)
if os.path.isdir(dst_target):
if need_dereference(file_info):
return True
if os.path.islink(dst_target):
return False
return True
return False
def expand_file_info(parsed_result: FileInfoParsedResult,
use_move: bool,
get_dst_target_func: Callable[[FileInfo], str]) -> FileInfoParsedResult:
file_info = parsed_result.file_info
if need_expand(file_info, get_dst_target_func):
expand_infos, dir_infos = expand_dir(file_info, get_dst_target_func)
return FileInfoParsedResult(
merge_dict(file_info, {'is_dir': True}), [], dir_infos, expand_infos
)
if use_move:
return FileInfoParsedResult(
{}, [file_info], parsed_result.dir_infos, parsed_result.expand_infos
)
return parsed_result
def trans_file_info_to_result(file_info: FileInfo) -> FileInfoParsedResult:
return FileInfoParsedResult(file_info, [], [], [])
def parse_file_element(file_ele: ET.Element,
file_config: Dict[str, str],
loaded_block: LoadedBlockElement,
package_attr: PackageAttr,
env: ParseEnv) -> Iterator[FileInfoParsedResult]:
file_info = merge_dict(file_config, file_ele.attrib)
file_info = evaluate_info(file_info, loaded_block, env.env_dict)
if not env.need_package_func(file_info):
return
if package_attr.get('expand_asterisk', False):
expand_asterisk_func = partial(expand_file_info_asterisk, env=env)
else:
expand_asterisk_func = trans_to_stream
if 'install_path' not in file_info:
file_info['install_path'] = ''
trans_file_info_func = pipe(
trans_file_info_to_result,
expand_asterisk_func,
partial(map, partial(config_hash, env=env)),
partial(
map,
partial(
expand_file_info,
use_move=loaded_block.use_move,
get_dst_target_func=partial(get_dst_target, env=env)
)
),
)
yield from trans_file_info_func(file_info)
def parse_file_info_elements(loaded_block: LoadedBlockElement,
default_config: Dict[str, str],
package_attr: PackageAttr,
env: ParseEnv) -> Iterator[FileInfoParsedResult]:
file_info_elements: List[ET.Element] = loaded_block.root_ele.findall('file_info')
for file_info_ele in file_info_elements:
file_config = merge_dict(
default_config,
file_info_ele.attrib,
{'module': file_info_ele.attrib.get('value')}
)
for sub_item in list(file_info_ele):
yield from parse_file_element(
sub_item, file_config, loaded_block, package_attr, env
)
def unique_infos(infos: Iterable) -> List[Dict[str, str]]:
cache: Set[str] = set()
new_infos = []
for info in infos:
if info['value'] in cache:
continue
cache.add(info['value'])
new_infos.append(info)
return new_infos
def parse_block_config(loaded_block: LoadedBlockElement,
package_attr: PackageAttr,
parse_env: ParseEnv):
default_config = copy.copy(loaded_block.attrs)
default_config.update(loaded_block.root_ele.attrib)
dir_infos = parse_dir_info_elements(
loaded_block,
default_config,
package_attr,
parse_env,
)
file_info_results = list(
chain(
parse_file_info_elements(
loaded_block,
default_config,
package_attr,
parse_env,
)
)
)
generate_infos = parse_generate_infos_by_loaded_block(
loaded_block, default_config, parse_env.env_dict, parse_env.need_package_func
)
return BlockConfig(
unique_infos(
itertools.chain(dir_infos,
flatten(result.dir_infos for result in file_info_results)
)
),
list(flatten(map(attrgetter('move_infos'), file_info_results))),
list(flatten(map(attrgetter('expand_infos'), file_info_results))),
[result.file_info for result in file_info_results if result.file_info],
generate_infos,
)
def make_loaded_block_element(root_ele: ET.Element,
dst_path: str = '') -> LoadedBlockElement:
return LoadedBlockElement(root_ele, False, dst_path, set(), set(), set(), {})
def parse_block_element(block_ele: ET.Element,
block_info_attr: Dict[str, str]) -> BlockElement:
def filter_attrs(attrs: Dict[str, str]) -> Dict[str, str]:
return {
key: value
for key, value in attrs.items()
if key not in ('dst_path', 'block_conf_path')
}
def with_merged_attrs(attrs: Dict[str, str]) -> BlockElement:
name = attrs.get('name')
block_conf_path = attrs.get('block_conf_path')
if not name:
raise BlockConfigError("block's name is not set!")
if not block_conf_path:
raise BlockConfigError("block's conf_path is not set!")
return BlockElement(
name=name,
block_conf_path=block_conf_path,
dst_path=attrs.get('dst_path', ''),
chips=config_feature_to_set(attrs.get('chip'), 'chip'),
features=config_feature_to_set(attrs.get('feature'), 'feature'),
attrs=filter_attrs(attrs)
)
return with_merged_attrs(merge_dict(block_info_attr, block_ele.attrib))
def parse_block_info(block_info: ET.Element) -> List[BlockElement]:
def parse_block_elements(block_elements: List[ET.Element]) -> List[BlockElement]:
return [
parse_block_element(block_ele, block_info.attrib) for block_ele in block_elements
]
return parse_block_elements(list(block_info))
def get_block_filepath(block_element: BlockElement) -> str:
return os.path.join(
pkg_utils.TOP_SOURCE_DIR, BLOCK_CONFIG_PATH, block_element.block_conf_path,
f'{block_element.name}.xml'
)
def load_block_element(package_attr: PackageAttr,
block_element: BlockElement) -> LoadedBlockElement:
def with_filepath(block_xml: str):
if not os.path.exists(block_xml):
raise BlockConfigError(f"block's config xml {block_xml} does not exist!")
try:
return LoadedBlockElement(
root_ele=ET.parse(block_xml).getroot(),
use_move=package_attr.get('use_move', False),
**{
name: getattr(block_element, name)
for name in BLOCK_ELEMENT_PASS_THROUGH_ARGS
}
)
except Exception:
raise BlockConfigError(f"dependent block configuration {block_xml} parse failed!")
return with_filepath(get_block_filepath(block_element))
def parse_blocks(root_ele: ET.Element,
package_attr: PackageAttr,
parse_env: ParseEnv) -> List[BlockConfig]:
return [
parse_block_config(
loaded_block, package_attr, parse_env
)
for loaded_block in itertools.chain(
[make_loaded_block_element(root_ele)],
map(
partial(load_block_element, package_attr),
chain.from_iterable(
map(parse_block_info, root_ele.findall("block_info"))
)
)
)
]
def read_version_info(package_attr: PackageAttr) -> Tuple[str, str]:
version_path = os.path.join(pkg_utils.TOP_DIR, package_attr.get('version'))
with open(version_path, 'r') as file:
version = file.readline().strip()
version_dir=""
m = re.match(r'[.a-zA-Z0-9]+$', version)
if not m:
raise VersionFormatNotMatch()
return version, version_dir
def get_feature_list_filepath(top_dir: str,
pkg_name: str,
chip_scenes: str,
feature_list: str) -> Optional[str]:
if not feature_list:
return None
return os.path.join(
top_dir, pkg_utils.CONFIG_SCRIPT_PATH, pkg_name, chip_scenes, feature_list
)
def parse_feature(xml_root: ET.Element, args: Namespace, top_dir: str) -> PkgFeature:
feature_attr = parse_feature_config_by_root(xml_root)
feature_list = get_feature_list(args, feature_attr)
feature_list_path = get_feature_list_filepath(
top_dir, args.pkg_name, args.chip_name, feature_list
)
feature = make_pkg_feature(
combine_feature_and_feature_list(
"",
feature_list_path
),
False
)
return feature
def parse_xml_config(filepath: str,
xml_relpath: str,
delivery_dir: str,
parse_option: ParseOption,
args: Namespace) -> XmlConfig:
try:
tree = ET.parse(filepath)
xml_root = tree.getroot()
except ET.ParseError as ex:
CommLog.cilog_error("xml parse %s failed: %s!", filepath, ex)
sys.exit(FAIL)
default_config = xml_root.attrib.copy()
package_attr = parse_package_attr(xml_root, args)
feature = parse_feature(xml_root, args, pkg_utils.TOP_SOURCE_DIR)
version, version_dir = read_version_info(package_attr)
if args.disable_multi_version:
version_dir = None
timestamp = get_timestamp(args)
try:
env_dict = parse_env_dict(
parse_option.os_arch, package_attr, version, version_dir, timestamp
)
except ParseOsArchError:
CommLog.cilog_error(
"os_arch %s is not correctly configured: %s!",
parse_option.os_arch, filepath
)
sys.exit(FAIL)
err_msgs, version_info = parse_version_info_by_root(
xml_root, env_dict, version, version_dir, None,
timestamp
)
need_package_func = partial(
need_package, input_feature=feature, parse_option=parse_option, env_dict=env_dict
)
parse_env = ParseEnv(
need_package_func, env_dict, parse_option, delivery_dir, pkg_utils.TOP_SOURCE_DIR
)
blocks = parse_blocks(
xml_root, package_attr, parse_env
)
if is_multi_version(version_dir):
fill_is_common_path_func = partial(
fill_is_common_path, target_env=env_dict.get('TARGET_ENV')
)
else:
fill_is_common_path_func = iter
return XmlConfig(
xml_relpath, default_config, package_attr, version_info, blocks, version, None,
PackerConfig(fill_is_common_path_func)
)