import os
import subprocess
import sys
from threading import Thread, Lock
from common import FileOperate as f
from common import log_error, log_warning
from common.cmd_run import check_command, run_linux_cmd
from common.task_common import out_progress_bar, str_to_hex, is_hexadecimal
from common.const import ADDR_LEN_HEX
class ParseData:
def __init__(self):
self.maps = {}
self.sig = 0
self.pid = -1
self.tgid = -1
self.comm = ""
class ParseCoreTrace:
missing_binary = set()
lock = Lock()
def __init__(self, symbol, file=None):
self.file = file
self.symbol_path = symbol
self.__addr2line = "addr2line"
self.warned = False
def check_tool_exists(self):
if not check_command(self.__addr2line):
log_error("The addr2line tool does not exist, install it before using it.")
return False
return True
def warn_missing(self, binary_path):
with self.lock:
if binary_path not in self.missing_binary:
log_warning(f"{os.path.realpath(binary_path)} is not exists.")
self.missing_binary.add(binary_path)
def get_binary_path(self, bin_name_path):
bin_name_str = bin_name_path.split("/")[-1]
if self.symbol_path:
for path in self.symbol_path:
binary_path = os.path.join(path, bin_name_str)
if os.path.exists(binary_path):
return binary_path
self.warn_missing(binary_path)
elif not os.path.exists(bin_name_path):
self.warn_missing(bin_name_path)
return bin_name_path
def run_addr2line(self, cmd):
ret = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8')
return ret.stdout.readlines()
def parse_addr_src_line(self, fp, data, shift):
parsed_line = ''
for bin_name, addrs in data.maps.items():
if bin_name == "":
continue
bin_path = self.get_binary_path(bin_name)
start_addr, end_addr = addrs
if start_addr < fp < end_addr:
delta = hex(fp - start_addr - shift)
cmd = [self.__addr2line, "-Cifps", "-e", bin_path, "-a", delta]
try:
out = self.run_addr2line(cmd)
except (OSError, ValueError) as e:
if not self.warned:
log_warning(f"Run \"{' '.join(cmd)}\" failed, error detail: {e}")
self.warned = True
out = []
if len(out) == 0:
parsed_line += "0x%x %s %s" % (fp, delta, bin_name) + '\n'
else:
for func in out:
parsed_line += "0x%x %s %s" % (fp, func.strip(), bin_name.strip()) + '\n'
return parsed_line
return parsed_line
def parse_line(self, line, parse_data):
line_parts = line.split()
this_line = line.strip() + '\n'
try:
if line_parts[0] == "Signal":
parse_data.sig = int(line_parts[1])
return this_line
elif line_parts[0] == "PID":
parse_data.pid = int(line_parts[1])
parse_data.tgid = int(line_parts[3])
parse_data.comm = line_parts[5]
return '\n' + this_line
elif line_parts[0].startswith("#"):
shift = 0 if line_parts[0] == "#0" else 4
fp = int(line_parts[1].strip('\x00'), base=16)
return self.parse_addr_src_line(fp, parse_data, shift)
elif line_parts[0] == "[<0>]" or "(deleted)" in line:
return this_line
elif "uburma" in line or "davinci_manager" in line:
return ''
else:
start_addr, end_addr = map(lambda x: int(x, base=16), line_parts[0].split("-"))
bin_name = line_parts[1].strip('\x00')
if bin_name in parse_data.maps:
start_addr = min(parse_data.maps[bin_name][0], start_addr)
end_addr = max(parse_data.maps[bin_name][1], end_addr)
parse_data.maps[bin_name] = [start_addr, end_addr]
return ''
except (IndexError, ValueError):
return this_line
def parse_file(self, file_lines, count):
if self.file:
count = len(file_lines)
parse_data = ParseData()
parsed_lines = ''
for index, line in enumerate(file_lines):
if self.file:
out_progress_bar(count, index)
parsed_lines += self.parse_line(line, parse_data)
return parsed_lines
def start_parse_file(self, coretrace_file, count=0):
"""Parsing a single file"""
coretrace_file_name = coretrace_file.split(os.sep)[-1]
if not coretrace_file_name.startswith("coretrace"):
log_error(f"The {coretrace_file} file is not in coretrace format.")
return False
if not self.check_tool_exists():
return False
with open(coretrace_file, "r") as fp:
file_lines = fp.readlines()
if not file_lines:
log_error(f"The {coretrace_file_name} file is empty.")
return False
try:
parsed_lines = self.parse_file(file_lines, count)
except Exception as e:
log_error(f"Parse {coretrace_file} failed, error detail: {e}")
return False
with open(coretrace_file, 'w') as fw:
fw.writelines(parsed_lines)
return True
def save_file_result(self, coretrace_file, count, num, results):
ret = self.start_parse_file(coretrace_file, count)
out_progress_bar(count, num)
if not ret:
log_error(f'Failed to analyze the "{coretrace_file}" file.')
results.append(ret)
def run(self, coretrace_path, count=0):
coretrace_dirs = f.walk_dir(coretrace_path)
if not coretrace_dirs or not self.check_tool_exists:
return False
num = 0
threads = []
results = []
for dirs, _, files in coretrace_dirs:
for file in files:
coretrace_file = os.path.join(dirs, file)
num += 1
t = Thread(target=self.save_file_result, args=(coretrace_file, count, num, results), daemon=True)
t.start()
threads.append(t)
for t in threads:
t.join()
out_progress_bar(count, count)
return any(results)