import argparse
import copy
import hashlib
import inspect
import os
import re
import shutil
import subprocess
import sys
import time
import traceback
import xml.etree.ElementTree as ET
THIS_FILE_NAME = __file__
CURRENT_DIR = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
TOP_DIR = os.path.join(CURRENT_DIR, '../../')
TARGET_ENV = '${TARGET_ENV}'
OUTPUT = '${OUTPUT}'
SUCC = 0
FAIL = -1
LOG_E = "ERROR"
LOG_W = "WARNING"
LOG_I = "INFO"
def log_print_element(log_element):
print("[" + log_element + "]", end=' ')
return
def log_msg(log_level, log_msg, *log_paras):
log_timestamp = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
line_no = inspect.currentframe().f_back.f_lineno
log_print_element(log_timestamp)
log_print_element(log_level)
log_print_element(THIS_FILE_NAME)
log_print_element(str(line_no))
print(log_msg % log_paras)
return
class Xmlparser(object):
"""
功能描述: 解析xml配置文件。
"""
def __init__(self, xml_file, delivery_dir, os_arch):
self.xml_file = xml_file
self.delivery_dir = delivery_dir
self.default_config = {}
self.target_env="{0}-{1}".format(os_arch, "linux")
self._cache_dir_info = {}
self._dir_install_list = []
self._expand_content_list = []
self._package_content_list = []
@property
def dir_install_list(self):
return self._dir_install_list
@property
def expand_content_list(self):
return self._expand_content_list
@property
def package_content_list(self):
return self._package_content_list
def parse(self):
try:
tree = ET.parse(self.xml_file)
root = tree.getroot()
self.default_config = list(root.iter('config'))[0].attrib
self._parse_filelist_info(root)
except Exception as e:
log_msg(LOG_E, "xmlparse %s failed: %s!", self.xml_file, e)
log_msg(LOG_I, "%s", traceback.format_exc())
return FAIL
return SUCC
def _parse_filelist_info(self, xmlobj):
self._parse_dir_infos(xmlobj.findall('dir_info'))
self._parse_file_infos(xmlobj.findall('file_info'))
def _parse_dir_infos(self, dir_infos):
for item in dir_infos:
dir_config = self.default_config.copy()
dir_config.update(item.attrib)
dir_config['module'] = dir_config['value']
for sub_item in list(item.iter())[1:]:
dir_info = dir_config.copy()
dir_info.update(sub_item.attrib)
dir_info = self.pre_parse(dir_info)
self.add_dir_info(dir_info)
def _parse_file_infos(self, file_infos):
for item in file_infos:
file_config = self.default_config.copy()
file_config.update(item.attrib)
file_config['module'] = file_config['value']
for sub_item in list(item.iter())[1:]:
file_info = file_config.copy()
file_info.update(sub_item.attrib)
file_info = self.pre_parse(file_info)
configurable = file_info.get('configurable', 'FALSE').upper()
file_info['configurable'] = configurable
src_target = self.get_src_target(file_info=file_info)
if os.path.isdir(src_target):
expand_file_info_list, expand_dir_info_list = self.expand_dir(file_info)
self._expand_content_list.extend(expand_file_info_list)
self.add_dir_info_list(expand_dir_info_list)
else:
if configurable == 'TRUE':
hash_value = self.make_hash(file_info)
file_info['hash'] = hash_value
self._package_content_list.append(file_info)
def get_src_target(self, file_info):
"""
获取文件的实际路径
:param file_info:
:return:src_target
"""
copy_type = file_info.get('copy_type', None)
if copy_type == 'delivery':
src_target = os.path.join(
self.delivery_dir, file_info.get(
'src_path'), file_info.get('value')
)
elif copy_type == 'source':
src_target = os.path.join(
TOP_DIR, file_info.get('src_path'), file_info.get('value')
)
else:
raise Exception("error copy_type=%s" % (copy_type))
return src_target
def expand_dir(self, file_info):
"""
如果file_info中配置的路径是文件夹,需要展开到文件
:param file_info:
:return:
"""
file_info_list = []
dir_info_list = []
src_target = self.get_src_target(file_info=file_info)
if not os.path.isdir(src_target):
file_info_list.append(file_info)
return file_info_list, dir_info_list
value_list = file_info.get('value').split('/')
target_name = value_list[-1] if value_list[-1] else value_list[-2]
dir_info_copy = file_info.copy()
dir_info_copy['module'] = file_info['value']
dir_info_copy['value'] = os.path.join(
file_info.get('install_path', ''), target_name
)
dir_info_copy['install_softlink'] = 'NA'
dir_info_list.append(dir_info_copy)
for root, dirs, files in os.walk(src_target):
if ".git" in dirs:
dirs.remove(".git")
dirs.sort()
files.sort()
for name in dirs:
dirname = os.path.join(root, name)
if os.path.islink(dirname):
relative_filename = os.path.relpath(dirname, src_target)
relative_dir_name = os.path.split(relative_filename)[0]
copy_file_info = file_info.copy()
copy_file_info['value'] = name
copy_file_info['src_path'] = os.path.join(
file_info['src_path'], file_info['value'], relative_dir_name
)
copy_file_info['dst_path'] = os.path.join(
file_info['dst_path'], target_name, relative_dir_name
)
copy_file_info['install_path'] = os.path.join(
file_info.get('install_path', ''), target_name, relative_dir_name
)
copy_file_info['install_softlink'] = 'NA'
file_info_list.append(copy_file_info)
continue
relative_dirname = os.path.relpath(dirname, src_target)
dir_info_copy = file_info.copy()
dir_info_copy['module'] = file_info['value']
dir_info_copy['value'] = os.path.join(
file_info.get('install_path', ''), target_name, relative_dirname
)
dir_info_copy['install_softlink'] = 'NA'
dir_info_list.append(dir_info_copy)
for name in files:
filename = os.path.join(root, name)
relative_filename = os.path.relpath(filename, src_target)
relative_dir_name = os.path.split(relative_filename)[0]
copy_file_info = file_info.copy()
copy_file_info['value'] = name
copy_file_info['src_path'] = os.path.join(
file_info['src_path'], file_info['value'], relative_dir_name
)
copy_file_info['dst_path'] = os.path.join(
file_info['dst_path'], target_name, relative_dir_name
)
copy_file_info['install_path'] = os.path.join(
file_info.get('install_path', ''), target_name, relative_dir_name
)
file_info_list.append(copy_file_info)
return file_info_list, dir_info_list
def make_hash(self, file_info):
"""
计算文件的hash(sha256)值
:param file_info:
:return: hash_value
"""
src_target = self.get_src_target(file_info=file_info)
sha256_hash = hashlib.sha256()
with open(src_target, "rb") as f:
sha256_hash.update(f.read())
value = sha256_hash.hexdigest()
return value
def add_dir_info(self, dir_info):
"""
往dir_install_list变量添加dir_info, 这里封装主要是为了避免重复添加
:param dir_info:
:return:
"""
if dir_info['value'] in self._cache_dir_info:
return False
else:
self._cache_dir_info[dir_info['value']] = dir_info
self._dir_install_list.append(dir_info)
return True
def add_dir_info_list(self, dir_info_list):
for dir_info in dir_info_list:
self.add_dir_info(dir_info)
def pre_parse(self, item_attr):
"""
xml配置属性解析预处理
转换配置属性中的$(TARGET_ENV)变量
"""
if item_attr.get("value", None):
item_attr["value"] = item_attr["value"].replace(
TARGET_ENV, self.target_env)
if item_attr.get("install_path", None):
item_attr["install_path"] = item_attr["install_path"].replace(
TARGET_ENV, self.target_env)
if item_attr.get("install_softlink", None):
item_attr["install_softlink"] = item_attr["install_softlink"].replace(
TARGET_ENV, self.target_env)
if item_attr.get("src_path", None):
item_attr["src_path"] = item_attr["src_path"].replace(
OUTPUT, self.delivery_dir)
return item_attr
def chmod_recursive(path, mode):
'''
功能描述: 递归设置文件和目录权限(替代 chmod -R)
参数: path - 目标路径, mode - 权限值(八进制整数)
'''
if os.path.isfile(path) or os.path.islink(path):
os.chmod(path, mode)
return
os.chmod(path, mode)
for root, dirs, files in os.walk(path):
for d in dirs:
os.chmod(os.path.join(root, d), mode)
for f in files:
os.chmod(os.path.join(root, f), mode)
def do_copy(target_conf={}, delivery_dir='', release_dir=''):
'''
功能描述: 根据拷贝类型来执行文件或目录拷贝
参数: target_conf, delivery_dir, release_dir
target_conf:{'value': ,'copy_type': ,'src_path': ,'dst_path': }
返回值: SUCC/FAIL
'''
copy_type = target_conf.get('copy_type')
if copy_type == 'delivery':
src_target = os.path.join(delivery_dir, target_conf.get(
'src_path'), target_conf.get('value'))
elif copy_type == 'source':
src_target = os.path.join(TOP_DIR, target_conf.get(
'src_path'), target_conf.get('value'))
else:
log_msg(LOG_E, "copy_type %s is not support for src_target %s!", copy_type, src_target)
return FAIL
value_list = target_conf.get('value').split('/')
target_name = value_list[-1] if value_list[-1] else value_list[-2]
dst_path = os.path.join(release_dir, target_conf.get('dst_path', ''))
pkg_mod = target_conf.get('pkg_mod', '')
rename = target_conf.get('rename')
try:
os.makedirs(dst_path, exist_ok=True)
if rename:
dst_path = os.path.join(dst_path, rename)
if os.path.isdir(src_target):
final_dst = os.path.join(dst_path, target_name) if not rename else dst_path
if os.path.exists(final_dst):
shutil.rmtree(final_dst)
shutil.copytree(src_target, final_dst, symlinks=True)
else:
shutil.copy2(src_target, dst_path)
except Exception as e:
log_msg(LOG_E, "do_copy failed! src=%s dst=%s error=%s", src_target, dst_path, str(e))
return FAIL
if pkg_mod:
try:
mode = int(pkg_mod, 8)
chmod_recursive(os.path.join(dst_path, target_name), mode)
except Exception as e:
log_msg(LOG_E, "chmod %s failed: %s", pkg_mod, str(e))
return FAIL
pkg_softlink = target_conf.get('pkg_softlink')
if pkg_softlink:
source = os.path.join(dst_path, target_conf.get('value'))
link_target = os.path.join(release_dir, pkg_softlink)
return creat_softlink(source, link_target)
return SUCC
def creat_softlink(source, target):
'''
功能描述: 创建软连接
参数: source, target
返回值: SUCC/FAIL
'''
source = os.path.abspath(source.strip())
target = os.path.abspath(target.strip())
link_target_path = os.path.dirname(target)
link_target_name = os.path.basename(target)
relative_path = os.path.relpath(source, link_target_path)
if os.path.isfile(target):
cmd = 'rm -f {}'.format(target)
status, output = subprocess.getstatusoutput(cmd)
if status != SUCC:
log_msg(LOG_E, "rm -f %s failed, %s" , target, output)
return FAIL
if os.path.isdir(target):
log_msg(LOG_E, "%s is directory, can't add softlink", target)
return FAIL
if not os.path.exists(link_target_path):
os.mkdir(link_target_path)
tmp_dir = os.getcwd()
os.chdir(link_target_path)
os.symlink(relative_path, link_target_name)
os.chdir(tmp_dir)
return SUCC
def parse_install_info(target_config, operate_type):
"""
功能描述: 根据配置解析生成安装信息
参数: target_config, operate_type
返回值: install_info
"""
install_info_list = []
install_info_list.append(target_config.get('module', 'NA'))
install_info_list.append(operate_type)
value_list = target_config.get('value').split('/')
target_rename = target_config.get('rename')
if target_rename:
target_name = target_rename
else:
target_name = value_list[-1] if value_list[-1] else value_list[-2]
if operate_type == 'copy':
install_info_list.append(
os.path.join(target_config.get('dst_path'), target_name)
)
install_info_list.append(
os.path.join(target_config.get('install_path', ''), target_name)
)
elif operate_type == 'mkdir':
install_info_list.append('NA')
install_info_list.append(target_config.get('value'))
elif operate_type == 'del':
install_info_list.append('NA')
install_info_list.append(
os.path.join(target_config.get('install_path', ''), target_name)
)
else:
raise Exception("error operate_type=%s" % operate_type)
install_info_list.append(target_config.get('install_mod', 'NA'))
install_info_list.append(target_config.get('install_own', 'NA').replace('$', '\\\\$'))
install_info_list.append(target_config.get('install_softlink', 'NA'))
install_info_list.append(target_config.get('configurable', 'FALSE'))
install_info_list.append(target_config.get('hash', 'NA'))
install_info = ','.join(install_info_list)
return install_info
def generate_filelist(file_content_list, delivery_path):
"""
功能描述: 组装生成安装文件filelist.csv
参数: file_content_list, delivery_path
返回值: SUCC/FAIL
"""
content_list = [
"module,operation,relative_path_in_pkg,relative_install_path,\
permission,owner:group,softlink,configurable,hash"
]
content_list.extend(file_content_list)
file_str = '\n'.join(content_list)
filelist_path = os.path.join(delivery_path, 'filelist.csv')
try:
with open(filelist_path, 'w') as file:
file.write(file_str)
except Exception as e:
log_msg(LOG_E, "generate filelist.csv failed: %s!", e)
return FAIL
parser_script = os.path.join(
TOP_DIR, 'package/script/parser_install.sh')
shutil.copy(parser_script, delivery_path)
return SUCC
def main(args=None):
"""
功能描述: 执行打包流程(解析配置--->生成文件列表--->执行拷贝/打包动作)
参数: args
返回值: SUCC/FAIL
"""
delivery_dir = args.delivery_path
if not os.path.exists(delivery_dir):
os.makedirs(delivery_dir)
if args.xml_file:
pkg_xml_file = args.xml_file
else:
log_msg(LOG_E, "Input xml_file is None, please check paramters!")
return FAIL
xmlparser = Xmlparser(pkg_xml_file, delivery_dir, args.os_arch)
if xmlparser.parse():
sys.exit(FAIL)
file_install_list = []
for item in xmlparser.dir_install_list:
file_install_list.append(parse_install_info(item, 'mkdir'))
for item in xmlparser.package_content_list:
file_install_list.append(parse_install_info(item, 'copy'))
for item in xmlparser.expand_content_list:
file_install_list.append(parse_install_info(item, 'del'))
if generate_filelist(file_install_list, delivery_dir):
return FAIL
release_dir = os.path.join(
delivery_dir, xmlparser.default_config.get('name', ''))
status = SUCC
for item in xmlparser.package_content_list:
if do_copy(item, delivery_dir, release_dir):
status = FAIL
continue
return status
def args_prase():
"""
功能描述 : 脚本入参解析
参数 : 调用脚本的传参
返回值 : 解析后的参数值
"""
parser = argparse.ArgumentParser(description='This script is for spiltpackage repack processing.')
parser.add_argument('-x', '--xml', metavar='xml_file', required=True, dest='xml_file', nargs='?', const='',
default='', help="This parameter define xml file")
parser.add_argument('--delivery_path', metavar='delivery_path', required=True, dest='delivery_path', nargs='?', const='',
help="This parameter define the delivery path for all module." )
parser.add_argument('-o', '--os_arch', metavar='os_arch', required=False, dest='os_arch', nargs='?', const='',
default=None, help="This parameter define the package's os_arch")
return parser.parse_args()
if __name__ == "__main__":
os.chdir(CURRENT_DIR)
log_msg(LOG_I, "%s", " ".join(sys.argv))
args = args_prase()
try:
status = main(args)
except Exception as e:
log_msg(LOG_E, "exception is occurred (%s)!", e)
log_msg(LOG_I, "%s", traceback.format_exc())
status = FAIL
sys.exit(status)