"""
"""
import json
import re
import argparse
import os
import shutil
import unicodedata
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
def parse_log_file(log_file_path):
all_blocks_data = []
current_block_aicpu = []
current_block_aicore = []
in_perf_trace_block = False
block_start_line = 0
with open(log_file_path, 'r', encoding='utf-8') as file:
for line_num, line in enumerate(file, 1):
line = line.strip()
if "Begin dump machine perf trace:" in line:
if in_perf_trace_block:
print(
f"Error: Nested perf trace block at line {line_num}. "
f"Block started at line {block_start_line} is not properly closed."
)
return {"error": f"Nested perf trace block at line {line_num}"}
in_perf_trace_block = True
block_start_line = line_num
current_block_aicpu = []
current_block_aicore = []
print(f"Found perf trace block start at line {line_num}")
continue
if "Finish dump machine perf trace." in line:
if not in_perf_trace_block:
print(f"Error: Finish without matching begin at line {line_num}")
return {"error": f"Finish without matching begin at line {line_num}"}
in_perf_trace_block = False
if current_block_aicpu and current_block_aicore:
block_json_str = ''.join(current_block_aicpu) + ',' + ''.join(current_block_aicore)
block_json_str = block_json_str.replace(",]", "]")
all_blocks_data.append(block_json_str)
print(f"Successfully parsed performance trace block from line {block_start_line} to {line_num}")
else:
print(f"Warning: Empty performance trace block from line {block_start_line} to {line_num}")
current_block_aicpu = []
current_block_aicore = []
continue
if in_perf_trace_block:
if "tile_fwk aicpu prof:" in line:
match = re.search(r'tile_fwk aicpu prof:(.*)', line)
if match:
content = match.group(1).strip()
if content.endswith('"'):
content = content[:-1]
current_block_aicpu.append(content)
elif "tile_fwk aicore prof:" in line:
match = re.search(r'tile_fwk aicore prof:(.*)', line)
if match:
content = match.group(1).strip()
if content.endswith('"'):
content = content[:-1]
current_block_aicore.append(content)
else:
if "tile_fwk aicpu prof:" in line or "tile_fwk aicore prof:" in line:
print(f"Warning: Ignoring prof data outside of perf trace block at line {line_num}")
if in_perf_trace_block:
print(f"Error: Unclosed perf trace block started at line {block_start_line}. Discarding incomplete block data.")
return {"error": f"Unclosed perf trace block started at line {block_start_line}"}
if all_blocks_data:
full_json_str = '[' + ','.join(all_blocks_data) + ']'
try:
parsed_data = json.loads(full_json_str)
print(f"Successfully parsed {len(all_blocks_data)} performance trace blocks")
return parsed_data
except json.JSONDecodeError as e:
print(f"JSON parsing error: {e}")
print(f"Raw JSON string: {full_json_str[:500]}...")
return {"error": str(e), "raw_blocks": all_blocks_data}
else:
print("No valid performance trace blocks found in log file")
return []
def save_json(data, output_file_path):
with open(output_file_path, 'w', encoding='utf-8') as file:
if isinstance(data, dict) and "raw_aicpu" in data:
file.write('[' + data["raw_aicpu"] + ',' + data["raw_aicore"] + ']')
else:
json.dump(data, file, indent=2, ensure_ascii=False)
def convert_to_perfetto_format(input_json: List[Dict]) -> List[Dict]:
trace_events = []
thread_id = 0
trace_events_head = {
"args": {
"name": "AICPU View",
"type": "aicpu"
},
"cat": "__metadata",
"name": "process_name",
"ph": "M",
"pid": 0
}
trace_events.append(trace_events_head)
for block in input_json:
block_idx = block.get("blockIdx", 0)
core_type = block.get("coreType", "UNKNOWN")
freq = block.get("freq", 0)
thread_name = f"{core_type}-{block_idx}"
trace_events.append({
"name": "thread_name",
"ph": "M",
"pid": 0,
"tid": thread_id,
"args": {
"name": thread_name
}
})
tasks = block.get("tasks", [])
sorted_tasks = sorted(tasks, key=lambda x: x.get("end", 0))
adjusted_tasks = []
prev_end = None
for task in sorted_tasks:
task_copy = task.copy()
current_end = task_copy.get("end", 0)
if prev_end is not None and current_end == prev_end:
task_copy["end"] = prev_end + 1
current_end = prev_end + 1
adjusted_tasks.append(task_copy)
prev_end = current_end
prev_end = None
for task in adjusted_tasks:
task_name = task.get("name", "UNKNOWN")
end_time = task.get("end", 0)
if task_name.startswith("BEGIN"):
start_time = end_time - 1
else:
start_time = prev_end
ts = start_time / freq
dur = (end_time - start_time) / freq
if dur <= 0:
dur = 1 / freq
perfetto_event = {
"name": f"{task_name}",
"cat": core_type,
"ph": "X",
"ts": ts,
"dur": dur,
"pid": 0,
"tid": thread_id,
"freq": freq
}
trace_events.append(perfetto_event)
prev_end = end_time
thread_id += 1
trace_events_pypto = {"traceEvents": trace_events}
return trace_events_pypto
def parse_log_command(input_file, output_file):
parsed_data = parse_log_file(input_file)
save_json(parsed_data, output_file)
print(f"Parsing completed, result saved to: {output_file}")
def merge_aicpu_aicore_swim_lane(aicpu_perfetto_data, input_kernel_file):
if input_kernel_file is not None and os.path.exists(input_kernel_file):
with open(input_kernel_file, 'r', encoding='utf-8') as f:
aicore_perfetto_data = json.load(f)
merged_trace_events = aicpu_perfetto_data["traceEvents"] + aicore_perfetto_data["traceEvents"]
merged_data = {
'traceEvents': merged_trace_events
}
with open(input_kernel_file, 'w', encoding='utf-8') as fw:
json.dump(merged_data, fw, indent=2)
def gen_perfetto_command(input_file, output_file, input_kernel_file=None):
try:
with open(input_file, 'r', encoding='utf-8') as f:
input_data = json.load(f)
perfetto_data = convert_to_perfetto_format(input_data)
merge_aicpu_aicore_swim_lane(perfetto_data, input_kernel_file)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(perfetto_data, f, ensure_ascii=False, indent=2)
print(f"Success to generate perfetto file: {output_file}")
complete_events = [e for e in perfetto_data["traceEvents"] if e.get('ph') == 'X']
metadata_events = [e for e in perfetto_data["traceEvents"] if e.get('ph') == 'M']
print(f"Process {len(complete_events)} task events, {len(metadata_events)} meta data events")
print("\ninfo: upload this json file to https://ui.perfetto.dev/")
except FileNotFoundError:
print(f"error: cannot find input file {input_file}")
except json.JSONDecodeError:
print(f"error: input file {input_file} is not valid json format")
except Exception as e:
print(f"process exception info: {str(e)}")
def gen_perfetto_example():
sample_data = [
{"blockIdx": 0, "coreType": "AICPU-SCHED", "freq": 50, "tasks": [
{"name": "BEGIN", "end": 5236903326282},
{"name": "ALLOC_THREAD_ID", "end": 5236903326381},
{"name": "INIT", "end": 5236903329385},
{"name": "HAND_SHAKE", "end": 5236903330212},
{"name": "WAIT_ALL_TASK_FIN", "end": 5236903331821},
{"name": "SEND_STOP", "end": 5236903332087},
{"name": "EXIT", "end": 5236903332854}
]},
{"blockIdx": 1, "coreType": "AICPU-SCHED", "freq": 50, "tasks": [
{"name": "BEGIN", "end": 5236903326282},
{"name": "ALLOC_THREAD_ID", "end": 5236903326383},
{"name": "INIT", "end": 5236903329389},
{"name": "HAND_SHAKE", "end": 5236903330219},
{"name": "WAIT_SEND_FIRST_TASK", "end": 5236903331446},
{"name": "WAIT_ALL_TASK_FIN", "end": 5236903331829},
{"name": "SEND_STOP", "end": 5236903332102},
{"name": "EXIT", "end": 5236903333765}
]}
]
result = convert_to_perfetto_format(sample_data)
with open('perfetto_output.json', 'w', encoding='utf-8') as f:
json.dump(result, f, ensure_ascii=False, indent=2)
print("example have saved to perfetto_output.json")
print("You can check it by upload this file to https://ui.perfetto.dev/")
def load_json(file_path: Path) -> Any:
with open(file_path, "r", encoding="utf-8") as f:
return json.load(f)
def safe_div(a: float, b: float) -> float:
return a / b if b else 0.0
def to_us(cycles: float, freq: float) -> float:
return safe_div(cycles, freq)
def display_width(text: str) -> int:
width = 0
for ch in text:
if unicodedata.combining(ch):
continue
width += 2 if unicodedata.east_asian_width(ch) in ("F", "W") else 1
return width
def pad_cell(text: str, width: int) -> str:
pad = max(width - display_width(text), 0)
return text + (" " * pad)
def render_table_lines(headers: List[str], rows: List[List[str]]) -> List[str]:
line_rows: List[List[str]] = [[str(x) for x in headers]]
line_rows.extend([[str(x) for x in row] for row in rows])
widths = [display_width(h) for h in headers]
for row in line_rows:
for i, cell in enumerate(row):
widths[i] = max(widths[i], display_width(cell))
def fmt_row(row: List[str]) -> str:
return "| " + " | ".join(pad_cell(cell, widths[i]) for i, cell in enumerate(row)) + " |"
def fmt_border() -> str:
return "| " + " | ".join("-" * widths[i] for i in range(len(widths))) + " |"
lines = [fmt_border(), fmt_row(line_rows[0]), fmt_border()]
for row in line_rows[1:]:
lines.append(fmt_row(row))
lines.append(fmt_border())
return lines
def print_table(headers: List[str], rows: List[List[str]]) -> None:
for line in render_table_lines(headers, rows):
print(line)
def print_section(title: str, total_width: Optional[int] = None) -> None:
if total_width is None:
term_cols = shutil.get_terminal_size(fallback=(100, 20)).columns
total_width = max(60, term_cols - 2)
else:
total_width = max(total_width, 20)
text = f" {title} "
text_width = display_width(text)
if text_width >= total_width:
print(f"\n{text}")
return
left = (total_width - text_width) // 2
right = total_width - text_width - left
print("\n" + ("=" * left) + text + ("=" * right))
def parse_task_name(name: str) -> Tuple[str, Optional[int], Optional[int]]:
m = re.match(r"^([A-Z0-9_]+?)(?:_(\d+))?(?:\((\d+)\))?$", str(name))
if not m:
return str(name), None, None
base = m.group(1)
round_id = int(m.group(2)) if m.group(2) is not None else None
idx = int(m.group(3)) if m.group(3) is not None else None
return base, round_id, idx
def collect_round_ids(aicpu_dev_pref: List[Dict[str, Any]]) -> List[Optional[int]]:
round_ids = set()
for core in aicpu_dev_pref:
for task in core.get("tasks", []):
_, round_id, _ = parse_task_name(task.get("name", ""))
if round_id is not None:
round_ids.add(round_id)
if not round_ids:
return [None]
return sorted(round_ids)
def get_task_cycle(
tasks: List[Dict[str, Any]],
task_name: str,
idx: Optional[int] = None,
round_id: Optional[int] = None,
sorted_tasks: Optional[List[Dict[str, Any]]] = None,
) -> Optional[float]:
tasks_by_end = sorted_tasks if sorted_tasks is not None else sort_tasks_by_end(tasks)
for task in tasks_by_end:
base, task_round, num = parse_task_name(task.get("name", ""))
if base != task_name:
continue
if round_id is not None and task_round != round_id:
continue
if idx is not None and num != idx:
continue
return float(task.get("end", 0))
return None
def get_task_cycle_map(
tasks: List[Dict[str, Any]],
task_name: str,
round_id: Optional[int] = None,
sorted_tasks: Optional[List[Dict[str, Any]]] = None,
) -> Dict[int, float]:
cycle_map: Dict[int, float] = {}
tasks_by_end = sorted_tasks if sorted_tasks is not None else sort_tasks_by_end(tasks)
for task in tasks_by_end:
base, task_round, num = parse_task_name(task.get("name", ""))
if base != task_name:
continue
if round_id is not None and task_round != round_id:
continue
if num is None:
continue
cycle_map[num] = float(task.get("end", 0))
return cycle_map
def calc_duration_from_ends(start_end: Optional[float], end_end: Optional[float]) -> Optional[float]:
if start_end is None or end_end is None:
return None
return end_end - start_end
def sort_tasks_by_end(tasks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
return sorted(tasks, key=lambda x: float(x.get("end", 0)))
def normalize_cores_tasks_by_end(aicpu_dev_pref: List[Dict[str, Any]]) -> None:
for core in aicpu_dev_pref:
tasks = core.get("tasks", [])
if isinstance(tasks, list):
core["tasks"] = sort_tasks_by_end(tasks)
def get_event_duration(
tasks: List[Dict[str, Any]],
event_name: str,
round_id: Optional[int] = None,
idx: Optional[int] = None,
) -> Optional[float]:
prev_end: Optional[float] = None
for task in sort_tasks_by_end(tasks):
base, task_round, task_idx = parse_task_name(task.get("name", ""))
if round_id is not None and task_round != round_id:
continue
task_end = float(task.get("end", 0))
event_dur = None if prev_end is None else (task_end - prev_end)
if base == event_name and (idx is None or task_idx == idx):
return event_dur
prev_end = task_end
return None
def calc_event_duration_sum(
tasks: List[Dict[str, Any]],
event_name: str,
round_id: Optional[int] = None,
) -> Optional[float]:
total = 0.0
found = False
prev_end: Optional[float] = None
for task in sort_tasks_by_end(tasks):
base, task_round, _ = parse_task_name(task.get("name", ""))
if round_id is not None and task_round != round_id:
continue
task_end = float(task.get("end", 0))
event_dur = None if prev_end is None else (task_end - prev_end)
if base == event_name and event_dur is not None:
found = True
total += max(event_dur, 0.0)
prev_end = task_end
return total if found else None
def calc_sched_post_process_sum(
tasks: List[Dict[str, Any]],
round_id: Optional[int],
) -> Optional[float]:
post_events = [
("DEV_TASK_SYNC_CORE_STOP", 0),
("DEV_TASK_RSP", 0),
("WAIT_ALL_DEV_TASK_FINISH", None),
("WAIT_CORE_EXIT", None),
("EXIT", None),
]
total = 0.0
found = False
for event_name, idx in post_events:
dur = get_event_duration(tasks, event_name, round_id, idx)
if dur is None:
continue
found = True
total += max(dur, 0.0)
return total if found else None
def calc_total_runtime_sum(
tasks: List[Dict[str, Any]],
round_id: Optional[int] = None,
) -> Optional[float]:
prev_end: Optional[float] = None
total = 0.0
found = False
for task in sort_tasks_by_end(tasks):
_, task_round, _ = parse_task_name(task.get("name", ""))
if round_id is not None and task_round != round_id:
continue
task_end = float(task.get("end", 0))
if prev_end is not None:
total += max(task_end - prev_end, 0.0)
prev_end = task_end
found = True
return total if found else None
def calc_runtime_in_window(
tasks: List[Dict[str, Any]],
round_id: Optional[int],
start_event: str,
end_events: List[str],
) -> Optional[float]:
sorted_tasks = sort_tasks_by_end(tasks)
start_cycle = get_task_cycle(tasks, start_event, None, round_id, sorted_tasks)
if start_cycle is None:
return None
end_candidates: List[float] = []
for end_event in end_events:
for task in sorted_tasks:
base, task_round, _ = parse_task_name(task.get("name", ""))
if base != end_event:
continue
if round_id is not None and task_round != round_id:
continue
task_end = float(task.get("end", 0))
if task_end >= float(start_cycle):
end_candidates.append(task_end)
break
if not end_candidates:
return None
return max(max(end_candidates) - float(start_cycle), 0.0)
def get_end_bounds(
tasks: List[Dict[str, Any]],
round_id: Optional[int] = None,
sorted_tasks: Optional[List[Dict[str, Any]]] = None,
) -> Tuple[Optional[float], Optional[float]]:
tasks_by_end = sorted_tasks if sorted_tasks is not None else sort_tasks_by_end(tasks)
min_end: Optional[float] = None
max_end: Optional[float] = None
for task in tasks_by_end:
_, task_round, _ = parse_task_name(task.get("name", ""))
if round_id is not None and task_round != round_id:
continue
task_end = float(task.get("end", 0))
if min_end is None:
min_end = task_end
max_end = task_end
return min_end, max_end
def format_us(v: Optional[float], freq: float) -> str:
if v is None:
return "-"
return f"{to_us(v, freq):.2f}"
def calc_avg_aicore_exit_wait_us(aicpu_dev_pref: List[Dict[str, Any]], round_id: Optional[int]) -> Optional[float]:
aicore_exec_rows = collect_aicore_exec_rows(aicpu_dev_pref, round_id)
wait_us_values: List[float] = []
for row in aicore_exec_rows:
exit_wait = row.get("exit_wait")
freq = float(row.get("freq", 0)) or 1.0
if exit_wait is not None and exit_wait > 0:
wait_us_values.append(to_us(float(exit_wait), freq))
if not wait_us_values:
return None
return sum(wait_us_values) / len(wait_us_values)
def calc_aicore_init_preprocess_us(aicore_exec_rows: List[Dict[str, Any]]) -> Optional[float]:
min_end_values = [float(row["min_end"]) for row in aicore_exec_rows if row.get("min_end") is not None]
wait_first_values = [
float(row["wait_first_cycle"]) for row in aicore_exec_rows if row.get("wait_first_cycle") is not None
]
if not min_end_values or not wait_first_values:
return None
freq = float(aicore_exec_rows[0].get("freq", 1.0)) or 1.0
return to_us(max(min(wait_first_values) - min(min_end_values), 0.0), freq)
def calc_aicore_last_core_exit_wait_us(
aicore_exec_rows: List[Dict[str, Any]],
aicore_core_bounds: List[Dict[str, Any]],
) -> Optional[float]:
all_exec_values = [
float(row["all_exec_cycle"]) for row in aicore_exec_rows if row.get("all_exec_cycle") is not None
]
max_end_values = [float(row["max_end"]) for row in aicore_core_bounds if row.get("max_end") is not None]
if not all_exec_values or not max_end_values:
return None
freq = float(aicore_core_bounds[0].get("freq", 1.0)) or 1.0
return to_us(max(max(max_end_values) - max(all_exec_values), 0.0), freq)
def format_sched_post_process(post_dur_cycles: Optional[float], sched_freq: float) -> str:
if post_dur_cycles is None:
return "-"
return f"{to_us(post_dur_cycles, sched_freq):.2f}"
def is_aicore_sched(core_type: str) -> bool:
return core_type.startswith("SCHED") and ("-AIC" in core_type or "-AIV" in core_type)
def collect_aicore_core_bounds(aicpu_dev_pref: List[Dict[str, Any]], round_id: Optional[int]) -> List[Dict[str, Any]]:
rows: List[Dict[str, Any]] = []
for core in aicpu_dev_pref:
core_type = str(core.get("coreType", ""))
if not is_aicore_sched(core_type):
continue
sorted_tasks = sort_tasks_by_end(core.get("tasks", []))
min_end, max_end = get_end_bounds(sorted_tasks, round_id, sorted_tasks)
if min_end is None or max_end is None:
continue
rows.append(
{
"core_type": core_type,
"block_idx": int(core.get("blockIdx", -1)),
"freq": float(core.get("freq", 0)) or 1.0,
"min_end": min_end,
"max_end": max_end,
}
)
rows.sort(key=lambda x: x["block_idx"])
return rows
def collect_aicore_exec_rows(aicpu_dev_pref: List[Dict[str, Any]], round_id: Optional[int]) -> List[Dict[str, Any]]:
rows: List[Dict[str, Any]] = []
for core in aicpu_dev_pref:
core_type = str(core.get("coreType", ""))
if not is_aicore_sched(core_type):
continue
tasks = core.get("tasks", [])
sorted_tasks = sort_tasks_by_end(tasks)
wait_first_map = get_task_cycle_map(tasks, "DEV_TASK_WAIT_RCV_FIRST_LEAF_TASK", round_id, sorted_tasks)
all_exec_map = get_task_cycle_map(tasks, "DEV_TASK_ALL_LEAF_TASK_EXEC", round_id, sorted_tasks)
wait_exit_notify = get_task_cycle(tasks, "WAIT_EXIT_NOTIFY", None, round_id, sorted_tasks)
if not all_exec_map:
continue
wait_first_cycle = min(wait_first_map.values()) if wait_first_map else None
all_exec_cycle = max(all_exec_map.values()) if all_exec_map else None
init_dur = get_event_duration(tasks, "INIT", round_id)
rcv_model_dur = get_event_duration(tasks, "DEV_TASK_RCV_MODEL", round_id)
first_wait_first_idx = min(wait_first_map, key=wait_first_map.get) if wait_first_map else None
wait_first_dur = get_event_duration(
tasks, "DEV_TASK_WAIT_RCV_FIRST_LEAF_TASK", round_id, first_wait_first_idx
)
exit_wait = calc_duration_from_ends(all_exec_cycle, wait_exit_notify)
min_end, max_end = get_end_bounds(tasks, round_id, sorted_tasks)
rows.append(
{
"core_type": core_type,
"block_idx": int(core.get("blockIdx", -1)),
"freq": float(core.get("freq", 0)) or 1.0,
"wait_first_map": wait_first_map,
"all_exec_map": all_exec_map,
"exit_wait": exit_wait,
"wait_first_cycle": wait_first_cycle,
"all_exec_cycle": all_exec_cycle,
"init_dur": init_dur,
"rcv_model_dur": rcv_model_dur,
"wait_first_dur": wait_first_dur,
"min_end": min_end,
"max_end": max_end,
}
)
rows.sort(key=lambda x: x["block_idx"])
return rows
def _min_optional(current: Optional[float], value: float) -> float:
if current is None:
return value
return min(current, value)
def _max_optional(current: Optional[float], value: float) -> float:
if current is None:
return value
return max(current, value)
def _accumulate_aicore_exec_timing(
aicore_exec_rows: List[Dict[str, Any]],
) -> Tuple[float, Optional[float], Optional[float], Optional[float], Optional[float]]:
freq = 1.0
min_end: Optional[float] = None
max_end: Optional[float] = None
first_wait_cycle: Optional[float] = None
last_all_exec_cycle: Optional[float] = None
for row in aicore_exec_rows:
row_min_end = row.get("min_end")
row_max_end = row.get("max_end")
if row_min_end is None or row_max_end is None:
continue
freq = float(row.get("freq", 1.0)) or 1.0
min_end = _min_optional(min_end, float(row_min_end))
max_end = _max_optional(max_end, float(row_max_end))
wait_first_cycle = row.get("wait_first_cycle")
if wait_first_cycle is not None:
first_wait_cycle = _min_optional(first_wait_cycle, float(wait_first_cycle))
all_exec_cycle = row.get("all_exec_cycle")
if all_exec_cycle is not None:
last_all_exec_cycle = _max_optional(last_all_exec_cycle, float(all_exec_cycle))
return freq, min_end, max_end, first_wait_cycle, last_all_exec_cycle
def _extend_aicore_bounds_max_end(
aicore_core_bounds: List[Dict[str, Any]],
freq: float,
max_end: Optional[float],
) -> Tuple[float, Optional[float]]:
for row in aicore_core_bounds:
row_max_end = row.get("max_end")
if row_max_end is None:
continue
freq = float(row.get("freq", 1.0)) or 1.0
max_end = _max_optional(max_end, float(row_max_end))
return freq, max_end
def _format_aicore_timing_e2e(
freq: float,
first_wait_cycle: Optional[float],
last_all_exec_cycle: Optional[float],
min_end: Optional[float],
max_end: Optional[float],
) -> Tuple[str, str]:
e2e_time = "-"
total_runtime_e2e = "-"
if first_wait_cycle is not None and last_all_exec_cycle is not None:
e2e_cycles = max(last_all_exec_cycle - first_wait_cycle, 0.0)
e2e_time = f"{to_us(e2e_cycles, freq):.2f}"
if min_end is not None and max_end is not None:
total_runtime_e2e_cycles = max(max_end - min_end, 0.0)
total_runtime_e2e = f"{to_us(total_runtime_e2e_cycles, freq):.2f}"
return e2e_time, total_runtime_e2e
def calc_aicore_timing_summary(
aicore_exec_rows: List[Dict[str, Any]],
aicore_core_bounds: List[Dict[str, Any]],
) -> Tuple[str, str]:
if not aicore_exec_rows:
return "-", "-"
freq, min_end, max_end, first_wait_cycle, last_all_exec_cycle = _accumulate_aicore_exec_timing(
aicore_exec_rows
)
freq, max_end = _extend_aicore_bounds_max_end(aicore_core_bounds, freq, max_end)
return _format_aicore_timing_e2e(
freq, first_wait_cycle, last_all_exec_cycle, min_end, max_end
)
def build_ctrl_row(aicpu_dev_pref: List[Dict[str, Any]], round_id: Optional[int]) -> Optional[List[str]]:
ctrl = next((x for x in aicpu_dev_pref if str(x.get("coreType")) == "AICPU-CTRL"), None)
if ctrl is None:
return None
tasks = ctrl.get("tasks", [])
freq = float(ctrl.get("freq", 0)) or 1.0
block_idx = int(ctrl.get("blockIdx", 0))
build_dur = calc_event_duration_sum(tasks, "DEV_TASK_BUILD", round_id)
ctrl_post_dur = get_event_duration(tasks, "EXIT", round_id)
ctrl_total_dur = calc_total_runtime_sum(tasks, round_id)
return [
f"AICPU-CTRL-{block_idx}",
format_us(build_dur, freq),
"-",
"-",
"-",
"-",
format_us(ctrl_post_dur, freq),
"-",
format_us(ctrl_total_dur, freq),
]
def build_sched_rows(aicpu_dev_pref: List[Dict[str, Any]], round_id: Optional[int]) -> List[List[str]]:
rows: List[List[str]] = []
scheds = [x for x in aicpu_dev_pref if str(x.get("coreType")) == "AICPU-SCHED"]
for s in sorted(scheds, key=lambda x: int(x.get("blockIdx", 0))):
block_idx = int(s.get("blockIdx", -1))
tasks = s.get("tasks", [])
freq = float(s.get("freq", 0)) or 1.0
alloc_dur = get_event_duration(tasks, "ALLOC_THREAD_ID", round_id)
init_dur = get_event_duration(tasks, "INIT", round_id)
handshake_dur = get_event_duration(tasks, "CORE_HAND_SHAKE", round_id)
dev_task_rcv = get_event_duration(tasks, "DEV_TASK_RCV", round_id, 0)
post_dur = calc_sched_post_process_sum(tasks, round_id)
sched_total_dur = calc_runtime_in_window(
tasks, round_id, "BEGIN", ["WAIT_CORE_EXIT", "EXIT"]
)
if sched_total_dur is None:
sched_total_dur = calc_total_runtime_sum(tasks, round_id)
rows.append(
[
f"AICPU-SCHED-{block_idx}",
"-",
format_us(alloc_dur, freq),
format_us(init_dur, freq),
format_us(handshake_dur, freq),
format_us(dev_task_rcv, freq),
format_sched_post_process(post_dur, freq),
"-",
format_us(sched_total_dur, freq),
]
)
return rows
def build_aicore_row(aicpu_dev_pref: List[Dict[str, Any]], round_id: Optional[int]) -> List[str]:
aicore_exec_rows = collect_aicore_exec_rows(aicpu_dev_pref, round_id)
aicore_core_bounds = collect_aicore_core_bounds(aicpu_dev_pref, round_id)
aicore_post_process_us = calc_aicore_last_core_exit_wait_us(aicore_exec_rows, aicore_core_bounds)
aicore_init_us = calc_aicore_init_preprocess_us(aicore_exec_rows)
if not aicore_exec_rows:
return ["AICore", "-", "-", "-", "-", "-", "-", "-", "-"]
e2e_time, total_runtime_e2e = calc_aicore_timing_summary(aicore_exec_rows, aicore_core_bounds)
return [
"AICore",
"-",
"-",
"-" if aicore_init_us is None else f"{aicore_init_us:.2f}",
"-",
"-",
"-" if aicore_post_process_us is None else f"{aicore_post_process_us:.2f}",
e2e_time,
total_runtime_e2e,
]
def build_round_combined_rows(aicpu_dev_pref: List[Dict[str, Any]], round_id: Optional[int]) -> List[List[str]]:
rows: List[List[str]] = []
ctrl_row = build_ctrl_row(aicpu_dev_pref, round_id)
if ctrl_row is not None:
rows.append(ctrl_row)
rows.extend(build_sched_rows(aicpu_dev_pref, round_id))
rows.append(build_aicore_row(aicpu_dev_pref, round_id))
return rows
def analyze_output_command(output_dir_arg: Optional[str]) -> None:
if output_dir_arg:
input_path = Path(output_dir_arg)
else:
print("Error: analyze requires an input json path")
return
if not input_path.exists():
print(f"Error: path does not exist: {input_path}")
return
aicpu_pref_file = input_path
analyze_target = str(input_path)
if not aicpu_pref_file.exists():
print(f"Error: {aicpu_pref_file} does not exist")
return
print(f"Analyzing input: {analyze_target}")
aicpu_dev_pref = load_json(aicpu_pref_file)
if not isinstance(aicpu_dev_pref, list):
print("Error: invalid aicpu_dev_pref.json format, expected list")
return
normalize_cores_tasks_by_end(aicpu_dev_pref)
rounds = collect_round_ids(aicpu_dev_pref)
for round_id in rounds:
display_round = 1 if round_id is None else (round_id + 1)
round_name = f"round{display_round}"
headers = [
"Compute Units",
"DEV_TASK_BUILD(us)",
"ALLOC_THREAD_ID(us)",
"INIT(us)",
"CORE_HAND_SHAKE(us)",
"DEV_TASK_RCV(us)",
"Post-process(us)",
"End-to-End time(us)",
"Total run time(us)",
]
rows = build_round_combined_rows(aicpu_dev_pref, round_id)
table_lines = render_table_lines(headers, rows)
table_width = max(display_width(line) for line in table_lines) if table_lines else None
print_section(round_name, table_width)
print_table(headers, rows)
print()
def main():
parser = argparse.ArgumentParser(description='Performance data processing tool')
subparsers = parser.add_subparsers(dest='command', help='Available commands', required=True)
parse_parser = subparsers.add_parser('parse_log', help='Parse device log and generate performance JSON')
parse_parser.add_argument('input_file', help='Path to input log file')
parse_parser.add_argument('output_file', help='Path to output JSON file')
perfetto_parser = subparsers.add_parser('gen_perfetto', help='Convert performance JSON to Perfetto format')
perfetto_parser.add_argument('input_file', help='Input JSON file path')
perfetto_parser.add_argument('output_file', help='Output Perfetto JSON file path')
perfetto_parser.add_argument('kernel_file', help='aicore kernel Perfetto JSON file path', default="", nargs='?')
example_parser = subparsers.add_parser('gen_perfetto_example', help='Generate example Perfetto data')
analyze_parser = subparsers.add_parser('analyze', help='Analyze perf json by output dir or json file path')
analyze_parser.add_argument(
'output_dir',
nargs='?',
help='Output directory or perf json file path; latest output_* if omitted',
)
args = parser.parse_args()
if args.command == 'parse_log':
parse_log_command(args.input_file, args.output_file)
elif args.command == 'gen_perfetto':
gen_perfetto_command(args.input_file, args.output_file, args.kernel_file)
elif args.command == 'gen_perfetto_example':
gen_perfetto_example()
elif args.command == 'analyze':
analyze_output_command(args.output_dir)
else:
parser.print_help()
if __name__ == "__main__":
main()