import os
import re
import sys
import json
import urllib.request
from resources.global_var import CURRENT_OHOS_ROOT
from resources.global_var import COMPONENTS_PATH_DIR
from exceptions.ohos_exception import OHOSException
from util.io_util import IoUtil
from containers.status import throw_exception
def get_part_name():
part_name_list = []
if len(sys.argv) > 2 and not sys.argv[2].startswith("-"):
for name in sys.argv[2:]:
if not name.startswith('-'):
part_name_list.append(name)
else:
break
return part_name_list
class ComponentUtil():
@staticmethod
def is_in_component_dir(path: str) -> bool:
return _recurrent_search_bundle_file(path)[0]
@staticmethod
def is_component_in_product(component_name: str, product_name: str) -> bool:
build_configs_path = os.path.join(
CURRENT_OHOS_ROOT, 'out', product_name, 'build_configs')
if os.path.exists(build_configs_path):
for root, dirs, files in os.walk(build_configs_path, topdown=True, followlinks=False):
if component_name in dirs:
return True
return False
@staticmethod
def get_component_name(path: str) -> str:
found_bundle_file, bundle_path = _recurrent_search_bundle_file(path)
if found_bundle_file:
data = IoUtil.read_json_file(bundle_path)
return data['component']['name']
return ''
@staticmethod
def get_component(path: str) -> str:
found_bundle_file, bundle_path = _recurrent_search_bundle_file(path)
if found_bundle_file:
data = IoUtil.read_json_file(bundle_path)
return data['component']['name'], os.path.dirname(bundle_path)
return '', ''
@staticmethod
def get_default_deps(variant: str, has_test=False) -> str:
gen_default_deps_json(variant, CURRENT_OHOS_ROOT, has_test)
default_deps_path = os.path.join(
CURRENT_OHOS_ROOT, 'out', 'preloader', 'default_deps.json')
return default_deps_path
@staticmethod
def get_product_component_path(product_name: str):
gen_product_component_list(product_name)
product_components_out_file = os.path.join(CURRENT_OHOS_ROOT, "out", "preloader", "product_components.json")
return product_components_out_file
@staticmethod
@throw_exception
def get_component_module_full_name(out_path: str, component_name: str, module_name: str) -> str:
root_path = os.path.join(out_path, "build_configs")
target_info = ""
module_list = []
for file in os.listdir(root_path):
if len(target_info):
break
file_path = os.path.join(root_path, file)
if not os.path.isdir(file_path):
continue
for component in os.listdir(file_path):
if os.path.isdir(os.path.join(file_path, component)) and component == component_name:
target_info = IoUtil.read_file(
os.path.join(file_path, component, "BUILD.gn"))
break
pattern = re.compile(r'(?<=module_list = )\[([^\[\]]*)\]')
results = pattern.findall(target_info)
for each_tuple in results:
module_list = each_tuple.replace('\n', '').replace(
' ', '').replace('\"', '').split(',')
for target_path in module_list:
if target_path != '':
path, target = target_path.split(":")
if target == module_name:
return target_path
raise OHOSException('You are trying to compile a module {} which do not exists in {} while compiling {}'.format(
module_name, component_name, out_path), "4001")
@staticmethod
def search_bundle_file(component_name: str):
all_bundle_path = get_all_bundle_path(CURRENT_OHOS_ROOT)
return all_bundle_path.get(component_name)
def _recurrent_search_bundle_file(path: str):
cur_dir = path
while cur_dir != CURRENT_OHOS_ROOT:
bundle_json = os.path.join(
cur_dir, 'bundle.json')
if os.path.exists(bundle_json):
return True, bundle_json
cur_dir = os.path.dirname(cur_dir)
return False, ''
def get_all_bundle_path(path):
bundles_path = {}
for root, dirnames, filenames in os.walk(path):
if root == path:
dirnames[:] = [d for d in dirnames if d not in ["out", ".repo", "binarys", "prebuilts", "kernel"]]
for filename in filenames:
if filename == "bundle.json":
bundle_json = os.path.join(root, filename)
data = IoUtil.read_json_file(bundle_json)
bundles_path = process_bundle_path(bundle_json, bundles_path, data)
IoUtil.dump_json_file(COMPONENTS_PATH_DIR, bundles_path)
return bundles_path
def process_bundle_path(bundle_json, bundles_path, data):
if data.get("component") and data.get("component").get("name"):
bundles_path[data["component"]["name"]] = os.path.dirname(bundle_json)
return bundles_path
def gen_default_deps_json(variant, root_path, has_test=False):
part_name_list = get_part_name()
default_deps_out_file = os.path.join(root_path, "out", "preloader", "default_deps.json")
default_deps_file = os.path.join(root_path, "build", "indep_configs", "variants", "common", 'default_deps.json')
default_deps_json = IoUtil.read_json_file(default_deps_file)
default_deps_json.append("variants_" + variant)
download_part_whitelist_path = os.path.join(root_path, 'build', 'indep_configs', 'config',
"download_part_whitelist.json")
download_part_whitelist = IoUtil.read_json_file(download_part_whitelist_path)
for part_name in part_name_list:
if part_name in download_part_whitelist.keys():
for dependencies_part in download_part_whitelist[part_name]:
default_deps_json.append(dependencies_part)
if has_test:
default_deps_json.append('developer_test')
preloader_path = os.path.join(root_path, "out", "preloader")
os.makedirs(preloader_path, exist_ok=True)
IoUtil.dump_json_file(default_deps_out_file, default_deps_json)
def gen_product_component_list(product_name: str):
config_path = os.path.join(CURRENT_OHOS_ROOT, "build", "indep_configs", "config", "product_component_url_config.json")
component_url_list = IoUtil.read_json_file(config_path).get(product_name, [])
component_list = set()
for url in component_url_list:
try:
response = urllib.request.urlopen(url, timeout=10)
data = json.loads(response.read().decode("utf-8"))
read_component(component_list, data)
except Exception as e:
raise OHOSException(f"Failed to download or parse component config from {url}: {e}", "0000")
product_components_out_file = os.path.join(CURRENT_OHOS_ROOT, "out", "preloader", "product_components.json")
os.makedirs(os.path.dirname(product_components_out_file), exist_ok=True)
IoUtil.dump_json_file(product_components_out_file, list(component_list))
def read_component(res_set, data):
if isinstance(data, dict):
for key in data:
if key == "component":
res_set.add(data[key])
else:
read_component(res_set , data[key])
elif isinstance(data, list):
for item in data:
read_component(res_set , item)