import argparse
import copy
import hashlib
import inspect
import os
import re
import shutil
import subprocess
import sys
import time
import traceback
import xml.etree.ElementTree as ET
from pathlib import Path
import fnmatch
# Path of this script as Python loaded it; log_msg() prints it in every log line.
THIS_FILE_NAME = __file__
# Directory that contains this script, with symlinks resolved.
CURRENT_DIR = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
# Two levels above CURRENT_DIR; base directory for 'source' copies and for
# package/script/parser_install.sh.
TOP_DIR = os.path.join(CURRENT_DIR, '../../')
# Placeholder that Xmlparser.pre_parse() replaces with "<os_arch>-linux".
TARGET_ENV = '${TARGET_ENV}'
# Placeholder that Xmlparser.pre_parse() replaces with the delivery directory.
OUTPUT = '${OUTPUT}'
# Status codes returned by the functions in this file.
SUCC = 0
FAIL = -1
# Level labels passed as the first argument of log_msg().
LOG_E = "ERROR"
LOG_W = "WARNING"
LOG_I = "INFO"
def log_print_element(log_element):
    """Print one log field wrapped in square brackets, followed by a space.

    No newline is written, so consecutive fields end up on the same line.

    :param log_element: text of the field (must be a string).
    """
    bracketed = "[" + log_element + "]"
    print(bracketed, end=' ')
def log_msg(log_level, log_msg, *log_paras):
    """Print one log line: timestamp, level, script path, caller line, message.

    :param log_level: level label shown in the second field (e.g. LOG_E).
    :param log_msg: printf-style format string of the message.
    :param log_paras: values substituted into log_msg with the % operator.
    """
    now_text = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    # Read the line number from the caller's frame so the log points at the
    # place where log_msg() was invoked, not at this function.
    caller_line = inspect.currentframe().f_back.f_lineno
    for field in (now_text, log_level, THIS_FILE_NAME, str(caller_line)):
        log_print_element(field)
    # Inside this function the name log_msg is the format-string parameter.
    print(log_msg % log_paras)
class Xmlparser(object):
    """Parse the packaging xml file into directory, copy and expansion lists.

    Call parse() once; afterwards the three read-only properties expose the
    collected entries as lists of attribute dictionaries.
    """

    # Number of bytes hashed per read, so that large files are never held in
    # memory as a whole.
    _HASH_CHUNK_SIZE = 1024 * 1024

    def __init__(self, xml_file, delivery_dir, os_arch):
        """
        :param xml_file: path of the xml configuration to parse.
        :param delivery_dir: directory that 'delivery' sources are resolved
            against; it also replaces ${OUTPUT} inside src_path attributes.
        :param os_arch: architecture name; target_env is "<os_arch>-linux".
        """
        self.xml_file = xml_file
        self.delivery_dir = delivery_dir
        self.default_config = {}
        self.target_env = "{0}-{1}".format(os_arch, "linux")
        # Directory entries already registered, keyed by their 'value'.
        self._cache_dir_info = {}
        self._dir_install_list = []
        self._expand_content_list = []
        self._package_content_list = []

    @property
    def dir_install_list(self):
        """Directory entries, without duplicates, in registration order."""
        return self._dir_install_list

    @property
    def expand_content_list(self):
        """Per-file entries produced by expanding configured directories."""
        return self._expand_content_list

    @property
    def package_content_list(self):
        """Entries exactly as configured under the file_info elements."""
        return self._package_content_list

    def parse(self):
        """Read the xml file and fill the three result lists.

        Any exception is logged together with its traceback.

        :return: SUCC on success, FAIL when reading or parsing failed.
        """
        try:
            tree = ET.parse(self.xml_file)
            root = tree.getroot()
            # The attributes of the first <config> element are the defaults
            # inherited by every entry.
            self.default_config = list(root.iter('config'))[0].attrib
            self._parse_filelist_info(root)
        except Exception as e:
            log_msg(LOG_E, "xmlparse %s failed: %s!", self.xml_file, e)
            log_msg(LOG_I, "%s", traceback.format_exc())
            return FAIL
        return SUCC

    def _parse_filelist_info(self, xmlobj):
        """Process the direct dir_info and file_info children of xmlobj."""
        self._parse_dir_infos(xmlobj.findall('dir_info'))
        self._parse_file_infos(xmlobj.findall('file_info'))

    def _parse_dir_infos(self, dir_infos):
        """Register every element nested below each dir_info element."""
        for item in dir_infos:
            dir_config = self.default_config.copy()
            dir_config.update(item.attrib)
            dir_config['module'] = dir_config['value']
            # iter() yields the element itself first; skip it and visit only
            # the nested elements.
            for sub_item in list(item.iter())[1:]:
                dir_info = dir_config.copy()
                dir_info.update(sub_item.attrib)
                dir_info = self.pre_parse(dir_info)
                self.add_dir_info(dir_info)

    def _parse_file_infos(self, file_infos):
        """Collect every element nested below each file_info element.

        Attribute precedence, lowest to highest: default config, the
        file_info element, the nested element.
        """
        for item in file_infos:
            file_config = self.default_config.copy()
            file_config.update(item.attrib)
            file_config['module'] = file_config['value']
            for sub_item in list(item.iter())[1:]:
                file_info = file_config.copy()
                file_info.update(sub_item.attrib)
                file_info = self.pre_parse(file_info)
                configurable = file_info.get('configurable', 'FALSE').upper()
                file_info['configurable'] = configurable
                src_target = self.get_src_target(file_info=file_info)
                if os.path.isdir(src_target):
                    expand_file_info_list, expand_dir_info_list = self.expand_dir(file_info)
                    self._expand_content_list.extend(expand_file_info_list)
                    self.add_dir_info_list(expand_dir_info_list)
                elif configurable == 'TRUE':
                    # Only configurable plain files carry a content hash.
                    file_info['hash'] = self.make_hash(file_info)
                # The configured entry itself is always packaged; for a
                # directory the expanded entries are recorded in addition.
                self._package_content_list.append(file_info)

    def get_src_target(self, file_info):
        """Return the path on disk of the entry described by file_info.

        :param file_info: attribute dictionary with copy_type, src_path, value.
        :return: delivery_dir/src_path/value for copy_type 'delivery',
            TOP_DIR/src_path/value for copy_type 'source'.
        :raises Exception: for any other copy_type.
        """
        copy_type = file_info.get('copy_type', None)
        if copy_type == 'delivery':
            src_target = os.path.join(
                self.delivery_dir, file_info.get('src_path'), file_info.get('value')
            )
        elif copy_type == 'source':
            src_target = os.path.join(
                TOP_DIR, file_info.get('src_path'), file_info.get('value')
            )
        else:
            raise Exception("error copy_type=%s" % (copy_type))
        return src_target

    @staticmethod
    def _target_name(value):
        """Return the last path component of value, ignoring one trailing '/'."""
        parts = value.split('/')
        return parts[-1] if parts[-1] else parts[-2]

    @staticmethod
    def _expanded_file_info(file_info, target_name, name, relative_dir):
        """Build the entry for one item found below an expanded directory."""
        entry = file_info.copy()
        entry['value'] = name
        entry['src_path'] = os.path.join(
            file_info['src_path'], file_info['value'], relative_dir
        )
        entry['dst_path'] = os.path.join(
            file_info['dst_path'], target_name, relative_dir
        )
        entry['install_path'] = os.path.join(
            file_info.get('install_path', ''), target_name, relative_dir
        )
        return entry

    @staticmethod
    def _expanded_dir_info(file_info, install_dir):
        """Build the directory entry whose install location is install_dir."""
        entry = file_info.copy()
        entry['module'] = file_info['value']
        entry['value'] = install_dir
        entry['install_softlink'] = 'NA'
        return entry

    def expand_dir(self, file_info):
        """Expand a configured directory into per-file and per-directory entries.

        When file_info does not point at a directory it is returned unchanged
        as the only file entry. Otherwise the tree is walked in sorted order,
        '.git' directories are skipped, and symlinked directories are
        reported as file entries instead of being descended into.

        :param file_info: attribute dictionary of the configured entry.
        :return: tuple (file_info_list, dir_info_list).
        """
        file_info_list = []
        dir_info_list = []
        src_target = self.get_src_target(file_info=file_info)
        if not os.path.isdir(src_target):
            file_info_list.append(file_info)
            return file_info_list, dir_info_list

        target_name = self._target_name(file_info.get('value'))
        install_path = file_info.get('install_path', '')
        dir_info_list.append(self._expanded_dir_info(
            file_info, os.path.join(install_path, target_name)
        ))

        for root, dirs, files in os.walk(src_target):
            # Editing dirs in place controls which directories os.walk visits
            # next and in which order.
            if ".git" in dirs:
                dirs.remove(".git")
            dirs.sort()
            files.sort()
            for name in dirs:
                dirname = os.path.join(root, name)
                relative_dirname = os.path.relpath(dirname, src_target)
                if os.path.islink(dirname):
                    link_info = self._expanded_file_info(
                        file_info, target_name, name,
                        os.path.split(relative_dirname)[0]
                    )
                    link_info['install_softlink'] = 'NA'
                    file_info_list.append(link_info)
                    continue
                dir_info_list.append(self._expanded_dir_info(
                    file_info,
                    os.path.join(install_path, target_name, relative_dirname)
                ))
            for name in files:
                relative_filename = os.path.relpath(
                    os.path.join(root, name), src_target
                )
                file_info_list.append(self._expanded_file_info(
                    file_info, target_name, name,
                    os.path.split(relative_filename)[0]
                ))
        return file_info_list, dir_info_list

    def make_hash(self, file_info):
        """Return the SHA-256 hex digest of the file described by file_info.

        The file is read in blocks of _HASH_CHUNK_SIZE bytes.

        :param file_info: attribute dictionary of the entry.
        :return: hexadecimal digest string.
        """
        src_target = self.get_src_target(file_info=file_info)
        sha256_hash = hashlib.sha256()
        with open(src_target, "rb") as stream:
            for chunk in iter(lambda: stream.read(self._HASH_CHUNK_SIZE), b""):
                sha256_hash.update(chunk)
        return sha256_hash.hexdigest()

    def add_dir_info(self, dir_info):
        """Append dir_info to dir_install_list unless its value is already there.

        :param dir_info: attribute dictionary of a directory entry.
        :return: True when the entry was added, False for a duplicate.
        """
        if dir_info['value'] in self._cache_dir_info:
            return False
        self._cache_dir_info[dir_info['value']] = dir_info
        self._dir_install_list.append(dir_info)
        return True

    def add_dir_info_list(self, dir_info_list):
        """Register every entry of dir_info_list through add_dir_info()."""
        for dir_info in dir_info_list:
            self.add_dir_info(dir_info)

    def pre_parse(self, item_attr):
        """Substitute the placeholders of an attribute dictionary in place.

        ${TARGET_ENV} is replaced with target_env in value, install_path and
        install_softlink; ${OUTPUT} is replaced with delivery_dir in src_path.
        Missing or empty attributes are left untouched.

        :param item_attr: attribute dictionary; it is modified and returned.
        :return: the same dictionary object.
        """
        for key in ("value", "install_path", "install_softlink"):
            if item_attr.get(key, None):
                item_attr[key] = item_attr[key].replace(
                    TARGET_ENV, self.target_env)
        if item_attr.get("src_path", None):
            item_attr["src_path"] = item_attr["src_path"].replace(
                OUTPUT, self.delivery_dir)
        return item_attr
def copy_file_by_pattern(src_path: str, dst_path: str, mode: int) -> bool:
    """Copy every file matching a wildcard path into a directory.

    Only the last component of src_path is treated as an fnmatch pattern;
    it is matched against the entries of the parent directory. Each match is
    copied with shutil.copy and then given the requested permission bits.
    shutil.copy handles regular files only, so a matching directory makes
    the call fail.

    :param src_path: full path whose last component may contain wildcards,
        e.g. /a/b/c/*.whl
    :param dst_path: directory that receives the copies.
    :param mode: permission bits applied to each copied file.
    :return: True when every matching file was copied (also when nothing
        matched), False as soon as one copy or chmod fails.
    """
    current_path = Path(src_path)
    pattern = current_path.name
    parent_path = current_path.parent
    matched_names = fnmatch.filter(os.listdir(parent_path), pattern)
    if not matched_names:
        # Nothing to copy is still reported as success, but leave a trace so
        # that a missing source file does not go unnoticed.
        log_msg(LOG_W, "No file matches %s, nothing is copied", src_path)
    for filename in matched_names:
        src_file_path = os.path.join(parent_path, filename)
        dst_file_path = os.path.join(dst_path, filename)
        try:
            shutil.copy(src_file_path, dst_file_path)
            os.chmod(dst_file_path, mode)
        except Exception as e:
            log_msg(LOG_E, "Faile to copy file src %s to dst %s failed!. error is %s", src_file_path, dst_file_path, str(e))
            return False
    return True
def do_copy(target_conf=None, delivery_dir='', release_dir=''):
    '''
    Copy one configured entry into the release directory.

    Parameters:
        target_conf: attribute dictionary of the entry; the keys read here are
            value, copy_type, src_path, dst_path, pkg_mod, rename and
            pkg_softlink.
        delivery_dir: base directory for copy_type 'delivery'.
        release_dir: directory below which dst_path is created.
    Returns: SUCC/FAIL
    '''
    if target_conf is None:
        target_conf = {}
    copy_type = target_conf.get('copy_type')
    if copy_type == 'delivery':
        src_root = delivery_dir
    elif copy_type == 'source':
        src_root = TOP_DIR
    else:
        # No source path exists on this branch, so report the configured
        # value of the entry instead.
        log_msg(LOG_E, "copy_type %s is not support for src_target %s!",
                copy_type, target_conf.get('value'))
        return FAIL
    src_target = os.path.join(
        src_root, target_conf.get('src_path'), target_conf.get('value'))

    dst_path = os.path.join(release_dir, target_conf.get('dst_path', ''))
    if not os.path.exists(dst_path):
        os.makedirs(dst_path, mode=0o750)
    rename = target_conf.get('rename')
    if rename:
        dst_path = os.path.join(dst_path, rename)

    # pkg_mod is an octal string with an optional 0o/0O prefix.
    pkg_mod = target_conf.get('pkg_mod', '0o750')
    if pkg_mod.startswith(('0o', '0O')):
        pkg_mod = pkg_mod[2:]
    mode = int(pkg_mod, 8)

    if rename and not os.path.isdir(dst_path):
        # rename is the file name inside the package (parse_install_info
        # reports dst_path/rename), so copy straight to that name. Going
        # through copy_file_by_pattern would target <rename>/<file> inside a
        # directory that was never created.
        try:
            shutil.copy(src_target, dst_path)
            os.chmod(dst_path, mode)
        except Exception as e:
            log_msg(LOG_E, "Failed to copy file src %s to dst %s. error is %s",
                    src_target, dst_path, str(e))
            return FAIL
    elif not copy_file_by_pattern(src_target, dst_path, mode):
        return FAIL

    pkg_softlink = target_conf.get('pkg_softlink')
    if pkg_softlink:
        # NOTE(review): when rename is set dst_path already ends with the new
        # name, so this source is <dst>/<rename>/<value> - confirm that
        # rename and pkg_softlink are never configured together.
        source = os.path.join(dst_path, target_conf.get('value'))
        link_target = os.path.join(release_dir, pkg_softlink)
        return creat_softlink(source, link_target)
    return SUCC
def creat_softlink(source, target):
    '''
    Create a relative symbolic link at target that points to source.

    An existing file at target (or a symlink whose destination no longer
    exists) is removed first; an existing directory is never replaced.
    Missing parent directories of target are created.

    Parameters:
        source: path the link points to; leading/trailing blanks are ignored.
        target: path of the link itself; leading/trailing blanks are ignored.
    Returns: SUCC/FAIL
    '''
    source = os.path.abspath(source.strip())
    target = os.path.abspath(target.strip())
    link_target_path = os.path.dirname(target)
    # Store the link relative to its own directory so the package stays
    # valid when it is moved as a whole.
    relative_path = os.path.relpath(source, link_target_path)

    # isfile() follows links, so a link whose destination is gone needs its
    # own check; otherwise os.symlink below would fail on it.
    dangling_link = os.path.islink(target) and not os.path.exists(target)
    if os.path.isfile(target) or dangling_link:
        try:
            os.remove(target)
        except Exception as e:
            log_msg(LOG_E, "Faile to delete file %s!. error is %s", target, str(e))
        if os.path.lexists(target):
            # Second attempt with rm; an argument list keeps blanks and shell
            # metacharacters in the path harmless.
            try:
                completed = subprocess.run(
                    ['rm', '-f', target], stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT, universal_newlines=True)
                status, output = completed.returncode, completed.stdout
            except OSError as e:
                status, output = FAIL, str(e)
            if status != SUCC:
                log_msg(LOG_E, "rm -f %s failed, %s", target, output)
                return FAIL
    if os.path.isdir(target):
        log_msg(LOG_E, "%s is directory, can't add softlink", target)
        return FAIL
    os.makedirs(link_target_path, exist_ok=True)
    # Creating the link by absolute path needs no change of the working
    # directory, so nothing has to be restored when this raises.
    os.symlink(relative_path, target)
    return SUCC
def parse_install_info(target_config, operate_type):
    """
    Build one comma-separated filelist line for an entry.

    Column order: module, operation, path inside the package, install path,
    permission, owner, softlink, configurable flag, hash. Missing attributes
    appear as 'NA' ('FALSE' for the configurable flag).

    Parameters:
        target_config: attribute dictionary of the entry.
        operate_type: 'copy', 'mkdir' or 'del'.
    Returns: the assembled line (str).
    Raises: Exception for any other operate_type.
    """
    module = target_config.get('module', 'NA')
    path_parts = target_config.get('value').split('/')
    renamed = target_config.get('rename')
    if renamed:
        target_name = renamed
    else:
        # A trailing '/' leaves an empty last part; use the one before it.
        target_name = path_parts[-1] or path_parts[-2]

    if operate_type == 'copy':
        pkg_path = os.path.join(target_config.get('dst_path'), target_name)
        install_path = os.path.join(
            target_config.get('install_path', ''), target_name)
    elif operate_type == 'mkdir':
        pkg_path = 'NA'
        install_path = target_config.get('value')
    elif operate_type == 'del':
        pkg_path = 'NA'
        install_path = os.path.join(
            target_config.get('install_path', ''), target_name)
    else:
        raise Exception("error operate_type=%s" % operate_type)

    columns = [
        module,
        operate_type,
        pkg_path,
        install_path,
        target_config.get('install_mod', 'NA'),
        # Every '$' in the owner field is prefixed with two backslashes.
        target_config.get('install_own', 'NA').replace('$', '\\\\$'),
        target_config.get('install_softlink', 'NA'),
        target_config.get('configurable', 'FALSE'),
        target_config.get('hash', 'NA'),
    ]
    return ','.join(columns)
def generate_filelist(file_content_list, delivery_path):
    """
    Write filelist.csv into delivery_path and copy parser_install.sh next to it.

    Parameters:
        file_content_list: the already formatted csv lines (without header).
        delivery_path: directory that receives both files.
    Returns: SUCC, or FAIL when filelist.csv could not be written.
    """
    header = (
        "module,operation,relative_path_in_pkg,relative_install_path,"
        "permission,owner:group,softlink,configurable,hash"
    )
    csv_text = '\n'.join([header] + list(file_content_list))
    filelist_path = os.path.join(delivery_path, 'filelist.csv')
    try:
        with open(filelist_path, 'w') as csv_file:
            csv_file.write(csv_text)
    except Exception as e:
        log_msg(LOG_E, "generate filelist.csv failed: %s!", e)
        return FAIL
    # The install-time parser script is shipped alongside the list.
    shutil.copy(
        os.path.join(TOP_DIR, 'package/script/parser_install.sh'),
        delivery_path)
    return SUCC
def main(args=None):
    """
    Run the packaging flow: parse the xml configuration, generate
    filelist.csv, then copy every configured entry into the release directory.

    Parameters:
        args: parsed command line (attributes xml_file, delivery_path,
            os_arch); when None the command line is parsed with args_prase().
    Returns: SUCC, or FAIL when any step failed.
    """
    if args is None:
        args = args_prase()
    # Validate the input before anything is created on disk.
    pkg_xml_file = args.xml_file
    if not pkg_xml_file:
        log_msg(LOG_E, "Input xml_file is None, please check paramters!")
        return FAIL
    delivery_dir = args.delivery_path
    if not os.path.exists(delivery_dir):
        os.makedirs(delivery_dir)

    xmlparser = Xmlparser(pkg_xml_file, delivery_dir, args.os_arch)
    if xmlparser.parse() != SUCC:
        return FAIL

    # Install list order: directories to create, entries to copy, then the
    # expanded directory contents as 'del' records.
    file_install_list = []
    for item in xmlparser.dir_install_list:
        file_install_list.append(parse_install_info(item, 'mkdir'))
    for item in xmlparser.package_content_list:
        file_install_list.append(parse_install_info(item, 'copy'))
    for item in xmlparser.expand_content_list:
        file_install_list.append(parse_install_info(item, 'del'))
    if generate_filelist(file_install_list, delivery_dir) != SUCC:
        return FAIL

    release_dir = os.path.join(
        delivery_dir, xmlparser.default_config.get('name', ''))
    status = SUCC
    for item in xmlparser.package_content_list:
        # Keep copying after a failure so every broken entry gets logged.
        if do_copy(item, delivery_dir, release_dir) != SUCC:
            status = FAIL
    return status
def args_prase():
    """
    Parse the command line of this script.

    Options: -x/--xml (required), --delivery_path (required) and
    -o/--os_arch (optional). Each option may be given without a value, in
    which case it is stored as an empty string.

    Returns: the argparse namespace with xml_file, delivery_path and os_arch.
    """
    parser = argparse.ArgumentParser(description='This script is for spiltpackage repack processing.')
    option_table = (
        (('-x', '--xml'), {
            'metavar': 'xml_file', 'required': True, 'dest': 'xml_file',
            'nargs': '?', 'const': '', 'default': '',
            'help': "This parameter define xml file",
        }),
        (('--delivery_path',), {
            'metavar': 'delivery_path', 'required': True, 'dest': 'delivery_path',
            'nargs': '?', 'const': '',
            'help': "This parameter define the delivery path for all module.",
        }),
        (('-o', '--os_arch'), {
            'metavar': 'os_arch', 'required': False, 'dest': 'os_arch',
            'nargs': '?', 'const': '', 'default': None,
            'help': "This parameter define the package's os_arch",
        }),
    )
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)
    return parser.parse_args()
if __name__ == "__main__":
    # Run from the script's own directory, then record the command line.
    os.chdir(CURRENT_DIR)
    command_line = " ".join(sys.argv)
    log_msg(LOG_I, "%s", command_line)
    args = args_prase()
    # Any exception escaping main() is logged and leaves the FAIL status.
    status = FAIL
    try:
        status = main(args)
    except Exception as e:
        log_msg(LOG_E, "exception is occurred (%s)!", e)
        log_msg(LOG_I, "%s", traceback.format_exc())
    sys.exit(status)