import json
import os
import sys
from abc import ABC, abstractmethod
from Utils import ChangeFileEntity, XTSTargetUtils, PathUtils, MatchConfig, HOME
class Ci_Manager(ABC):
@abstractmethod
def get_targets_from_change(self, change_list: list):
pass
def write_result(self, target_path_set: set, target_set: set):
print(f"{self.__class__.__name__} append build_targets : {self._build_targets}")
print(f"{self.__class__.__name__} append build_paths : {self._build_paths}")
xts_root_target = PathUtils.get_root_target(self._xts_root_dir)
if xts_root_target in target_set:
print("compile full testsuites")
return
if xts_root_target in self._build_targets:
target_set.add(xts_root_target)
target_path_set.clear()
print("compile full testsuites")
return
target_set.update(set(self._build_targets))
target_path_set.update(set(self._build_paths))
class ComponentManager(Ci_Manager):
def __init__(self, xts_root_dir, code_root_dir):
self._xts_root_dir = xts_root_dir
self._code_root_dir = code_root_dir
self._build_paths = []
self._build_targets = []
self._ieyes_xts_cfg_file = os.path.join(self._code_root_dir, 'ieyes_xts.json')
self._ieyes_xts_map = {}
def get_targets_from_change(self, change_list):
if not os.path.exists(self._ieyes_xts_cfg_file):
return 0
with open(self._ieyes_xts_cfg_file, 'r') as f:
self._ieyes_xts_map = json.load(f)
for changeFileEntity in change_list:
if changeFileEntity.path not in MatchConfig.get_xts_path_list():
ret = self.getTargetsPaths(changeFileEntity)
if ret == 1:
continue
return 0
def getTargetsPaths(self, change_file_entity: ChangeFileEntity):
try:
bundle_name = self.getBundleName(change_file_entity.path)
except Exception as e:
print("warning: Get bundle_name failed from cmoponent repository {}".format(change_file_entity.name))
return 1
print(f"{self.__class__.__name__} append bundle_name : {bundle_name}")
paths = XTSTargetUtils.getPathsByBundle([bundle_name], self._xts_root_dir, self._ieyes_xts_map)
if paths:
change_file_entity.set_already_match_utils(True)
self._build_paths += paths
return 0
def getBundleName(self, path) -> str:
with open(os.path.join(HOME, path, "bundle.json"), 'r') as file:
data = json.load(file)
bundle_name = data['component']['name']
return bundle_name
class XTSManager(Ci_Manager):
def __init__(self, xts_root_dir, code_root_dir):
self._xts_root_dir = xts_root_dir
self._code_root_dir = code_root_dir
self._build_paths = []
self._build_targets = []
self._need_all = False
def get_targets_from_change(self, change_list):
for changeFileEntity in change_list:
if changeFileEntity.path in self._xts_root_dir:
ret = self.getTargetsPaths(changeFileEntity)
if ret == 1:
print("warning: failed to parse modification of repository {}".format(self.__class__.__name__))
return 1
if self._need_all:
self._build_targets += PathUtils.get_all_build_target(self._xts_root_dir)
return 0
def getTargetsPaths(self, changeFileEntity: ChangeFileEntity):
for file in changeFileEntity.add + changeFileEntity.modified:
file = os.path.join(self._code_root_dir, file)
if PathUtils.isMatchRules(file, MatchConfig.get_exception_path()):
continue
if PathUtils.isTargetContains(self._build_paths, file):
continue
build_File = XTSTargetUtils.get_current_Build(self._xts_root_dir, file)
if (os.path.dirname(build_File) == self._xts_root_dir or
PathUtils.isMatchRules(file, MatchConfig.get_all_com_path())):
self._need_all = True
else:
self._build_paths.append(os.path.dirname(build_File))
for file in changeFileEntity.delete:
file = os.path.join(self._code_root_dir, file)
if PathUtils.isMatchRules(file, MatchConfig.get_exception_path()):
continue
if PathUtils.isTargetContains(self._build_paths, file):
continue
exist_path = PathUtils.get_current_exist(self._xts_root_dir, os.path.dirname(file))
build_File = XTSTargetUtils.get_current_Build(self._xts_root_dir, exist_path)
if (os.path.dirname(build_File) == self._xts_root_dir or
PathUtils.isMatchRules(file, MatchConfig.get_all_com_path())):
self._need_all = True
else:
self._build_paths.append(os.path.dirname(build_File))
return 0
class WhitelistManager(Ci_Manager):
def __init__(self, xts_root_dir, code_root_dir, suite_type):
self._xts_root_dir = xts_root_dir
self._code_root_dir = code_root_dir
self._build_paths = []
self._build_targets = []
self.full_impact_flag = "FULL_IMPACT"
self.full_impact_flag_totally = "FULL_IMPACT_TOTALLY"
self._suite_type = suite_type
def get_targets_from_change(self, change_list):
for changeFileEntity in change_list:
if changeFileEntity.path not in MatchConfig.get_xts_path_list():
ret = self.getTargetsandPaths(changeFileEntity)
if ret == 1:
return 1
return 0
def getTargetsandPaths(self, change_file_entity):
white_list = MatchConfig.get_white_list_repo()
if change_file_entity.path not in white_list:
return 0
change_file_entity.set_already_match_utils(True)
whitelist_item = white_list.get(change_file_entity.path)
if whitelist_item.get("subdirectory"):
self.getTargetsandPathsOfSubdirectory(change_file_entity, whitelist_item.get("subdirectory"))
else:
self.getTargetsAndPathsByCfg(
whitelist_item.get("suite_type"), whitelist_item.get("add_bundle"), whitelist_item.get("add_target"))
return 0
def getTargetsandPathsOfSubdirectory(self, change_file_entity, whitelist_subdir):
change_file_list = change_file_entity.add + change_file_entity.modified + change_file_entity.delete
for path in whitelist_subdir:
isMatch = False
for file in change_file_list:
if file.startswith(path):
isMatch = True
break
if not isMatch:
continue
config = whitelist_subdir.get(path)
self.getTargetsAndPathsByCfg(config.get("suite_type"), config.get("add_bundle"), config.get("add_target"))
def getTargetsAndPathsByCfg(self, suite_type, add_bundle, add_target):
if suite_type and not list(set(suite_type) & set(self._suite_type)):
return
targets = []
if add_target:
if add_target[0] == self.full_impact_flag:
targets = PathUtils.get_all_build_target(self._xts_root_dir, 0)
if add_target[0] == self.full_impact_flag_totally:
targets = PathUtils.get_all_build_target(self._xts_root_dir, 1)
else:
xts_parts = "/".join([p for p in os.path.normpath(self._xts_root_dir).split(os.sep) if p][-3:])
for target in add_target:
if xts_parts in target:
targets.append(target)
if add_bundle:
paths = XTSTargetUtils.getPathsByBundle(add_bundle, self._xts_root_dir)
if paths:
self._build_paths += paths
if targets:
self._build_targets += targets
class GetInterfaceData(Ci_Manager):
def __init__(self, xts_root_dir, code_root_dir, suite_type):
self._xts_root_dir = xts_root_dir
self._code_root_dir = code_root_dir
self._build_paths = []
self._build_targets = []
self.sum_change_list_path = []
self.bundle_name_list = []
self.match_path_list = []
self.no_match_path_list = []
self._suite_type = suite_type
def get_first_levels_path(self, path, num):
normalized_path = os.path.normpath(path)
parts = normalized_path.split(os.sep)
return os.sep.join(parts[:num])
def path_error(self, path_list):
if len(path_list) > 0:
raise Exception('error: Failed to obtain interface file ownership, Please config in test/xts/tools/config/ci_api_part_name.json')
def get_targets_from_change(self, change_list):
for store in change_list:
if store.path in MatchConfig.get_interface_path_list() and "hap_static" in self._suite_type:
self._build_targets = "xts_{}".format(os.path.basename(self._xts_root_dir))
return
if "acts" in self._xts_root_dir:
self.get_c_bundle_name(change_list, MatchConfig.get_interface_json_c_data())
self.get_driver_interface_bundle_name(change_list, MatchConfig.get_interface_json_driver_interface_data())
self.get_js_bundle_name(change_list, MatchConfig.get_interface_json_js_data())
for path in self.sum_change_list_path:
if path not in self.match_path_list:
self.no_match_path_list.append(path)
self.bundle_name_list = list(set(self.bundle_name_list))
self._build_targets = list(set(self._build_targets))
print('INTERFACE_BUNDLE_NAME = ', self.bundle_name_list)
try:
self.path_error(self.no_match_path_list)
except Exception as e:
print(e)
for path in self.no_match_path_list:
print("error: Cannot match path {}".format(path))
sys.exit(1)
self._build_paths = XTSTargetUtils.getPathsByBundle(self.bundle_name_list, self._xts_root_dir)
else:
for change in change_list:
if change.path in MatchConfig.get_interface_path_list():
self._build_targets += PathUtils.get_all_build_target(self._xts_root_dir)
return
def get_js_bundle_name(self, change_list, js_json_data):
for store in change_list:
if store.path != MatchConfig.get_interface_path_list()[0]:
continue
paths = store.add + store.modified + store.delete
self.sum_change_list_path += paths
targete_data = [data for data in js_json_data if data['path'] in paths]
for _data in targete_data:
store.set_already_match_utils(True)
self.match_path_list.append(_data.get('path'))
self.bundle_name_list += _data.get('bundle_name')
self._build_targets += _data.get('build_targets')
for path in paths:
for source_path in js_json_data:
if PathUtils.is_parent_path(source_path.get('path'), path):
self.match_path_list.append(path)
store.set_already_match_utils(True)
self.bundle_name_list += source_path.get('bundle_name')
def get_driver_interface_bundle_name(self, change_list, driver_interface_json_data):
add_bundle_json_path = []
for store in change_list:
if store.path != MatchConfig.get_interface_path_list()[2]:
continue
for path in store.add + store.modified + store.delete:
self.sum_change_list_path.append(path)
for source_path in driver_interface_json_data:
if PathUtils.is_parent_path(source_path.get('path'), path):
store.set_already_match_utils(True)
self.bundle_name_list += source_path.get('bundle_name')
self.match_path_list.append(path)
for path in store.add:
if os.path.basename(path) == 'bundle.json':
try:
with open(os.path.abspath(__file__).split('/test/')[0] + '/' + path, 'r') as d:
for k, v in json.load(d).items():
if k == 'component':
add_bundle_json_path.append([path, v['name']])
except FileNotFoundError:
print("error: The repository drivers/interface has a new directory and no bundle.json is found under the directory, Please add bundle.json")
sys.exit(1)
for path in store.add:
for path_name in add_bundle_json_path:
if self.get_first_levels_path(path, 3) == self.get_first_levels_path(path_name[0], 3):
self.match_path_list.append(path)
store.set_already_match_utils(True)
self.bundle_name_list.append(path_name[1])
def get_c_bundle_name(self, change_list, c_json_data):
for store in change_list:
if store.path != MatchConfig.get_interface_path_list()[1]:
continue
for path in store.add + store.modified + store.delete:
self.sum_change_list_path.append(path)
for source_path in c_json_data:
if PathUtils.is_parent_path(source_path.get('path'), path):
self.match_path_list.append(path)
store.set_already_match_utils(True)
self.bundle_name_list += source_path.get('bundle_name')
elif path == source_path.get('path'):
self.match_path_list.append(path)
store.set_already_match_utils(True)
self.bundle_name_list += source_path.get('bundle_name')