"""The parser for step trace data."""
import csv
import json
import os
import stat
import struct
from collections import namedtuple
from decimal import Decimal
from abc import abstractmethod
from mindspore.profiler.common.exceptions.exceptions import ProfilerPathErrorException, \
ProfilerIOException, ProfilerRawFileException
from mindspore import log
from mindspore.profiler.common.util import get_summary_for_step_trace
from mindspore.profiler.common.validator.validate_path import \
validate_and_normalize_path
# Header that precedes every record in a raw step trace file; its three
# fields are filled from a 'BBH' struct unpack by the parser.
ProfilingHeadStruct = namedtuple('ProfilingHeadStruct', 'mode rptType bufSize')

# Payload of a single step trace record; its six fields are filled from a
# 'QQQHHH' struct unpack by the parser.
StepTraceStruct = namedtuple(
    'StepTraceStruct',
    'timeStamp index_id model_id stream_id task_id tag_id',
)
class BaseStepTraceParser:
    """
    The parser for step trace data.

    Args:
        input_dir (str): The directory that contains original step trace data.
        output_file_path (str): The output file path.
        job_id (int): The job id used to define the start of new step. Default: 0.
        skip_first_step (bool): Whether skip the first step or not.
        is_training_mode (bool): Whether in training mode or not.
        is_gpu_kernel_async_launch (bool): Whether is gpu kernel async launch or not.
    """

    def __init__(self, input_dir, output_file_path, job_id=0, skip_first_step=False,
                 is_training_mode=True, is_gpu_kernel_async_launch=False):
        self._input_dir = input_dir
        self._output_path = output_file_path
        self._job_id = job_id
        self._skip_first_step = skip_first_step
        # Parsed rows (one list per step, plus the trailing average row) and
        # the column names they are aligned with.
        self._result = []
        self._header = []
        self._step_num = 0
        # Map from tag id to op type, filled by `update_tag_op_type_map`.
        self._tag_map = {}
        self._is_training_mode = is_training_mode
        self._step_end_tag_id = 4
        self._is_gpu_kernel_async_launch = is_gpu_kernel_async_launch
        self._model_start_tag_id = 0
        self._model_end_tag_id = 1
        self._fp_tag_id = 2
        self._bp_tag_id = 3
        # Tags in [reduce_min, reduce_max) are treated as reduce events.
        self._reduce_min_tag_id = 10000
        self._reduce_max_tag_id = 20000
        # Byte size of the 'BBH' record header and of the gap skipped after it.
        self._profiling_head_len = 4
        self._profiling_head_pad_len = 4
        # Byte size of the 'QQQHHH' step trace payload.
        self._st_data_len = 8 + 8 + 8 + 2 + 2 + 2

    @property
    def output_file(self):
        """
        The file name part of the output path.

        Returns:
            str, the last '/'-separated component of the output path when the path
            contains at least two '/' separators, otherwise an empty string.
        """
        file_name = self._output_path.rsplit('/', 2)
        return file_name[-1] if len(file_name) == 3 else ''

    def show(self):
        """Print the step trace summary and the location of the saved result."""
        summary_info = {}
        if self._result:
            # The last row of the result is the average row.
            summary_info = get_summary_for_step_trace(self._result[-1], self._header, self._is_training_mode)
            summary_info['total_steps'] = len(self._result) - 1
        print('\nStep trace summary info (unit: syscnt):')
        print(summary_info)
        print('\nThe step trace parse result saves under ${summary_dir}/profiler/%s'
              % self.output_file)

    def parse_and_save(self):
        """
        Parse step trace files and save the result.

        Raises:
            ProfilerIOException: If an IOError occurs while parsing or saving.
        """
        try:
            source_files = self._get_step_trace_files()
            if self._is_gpu_kernel_async_launch:
                self._parse_async_launch(source_files)
            else:
                self._parse(source_files)
            self._save()
        except IOError as err:
            log.warning(err)
            raise ProfilerIOException() from err
        else:
            log.info("Finish to save intermediate result for step trace file.")

    def record_point_info(self, point_info, output_path):
        """
        Record point info into json.

        The base implementation does nothing and returns None; the parsers
        derived from this class provide the real behaviour.

        Args:
            point_info (dict): The point info about tag id and relative op name.
            output_path (str): The output path for saving point info.

        Returns:
            dict, parsed point info.
        """

    def update_tag_op_type_map(self, point_info):
        """
        Update the map from tag id to op type.

        Args:
            point_info (dict): The point info about tag id and relative op name.
        """
        # The return value is not needed here; the call is kept for whatever
        # lookup or validation the concrete parser performs.
        self._get_step_trace_files()
        tag_map = {
            tag: self._get_op_type(tag, op_name)
            for tag, op_name in point_info.items()
        }
        log.info("Get tag types for step trace analysis: %s", tag_map)
        self._tag_map = tag_map

    def _get_op_type(self, tag, name):
        """
        Get op type from tag and name.

        Args:
            tag (int): The tag id.
            name (str): The op name.

        Returns:
            str, the op type or communication op name.
        """
        # Subclasses may define `_fp_tag` / `_bp_tag`; this class only defines
        # the `_fp_tag_id` / `_bp_tag_id` instance attributes, so fall back to
        # those instead of raising AttributeError.
        fp_tag = getattr(self, '_fp_tag', self._fp_tag_id)
        bp_tag = getattr(self, '_bp_tag', self._bp_tag_id)
        tag_map = {fp_tag: 'fp', bp_tag: 'bp', self._step_end_tag_id: 'end'}
        op_type = tag_map.get(tag, '')
        if op_type:
            return op_type
        if tag == 0:
            return 'start'
        # Otherwise use the last '/'-separated component of the op name.
        op_name = name.rsplit('/', 1)[-1]
        if not op_name:
            log.warning("Unexpected op name:%s", name)
        return op_name

    def _get_step_trace_files(self):
        """Get step trace files."""
        return self._input_dir

    @staticmethod
    def _search_file(input_dir):
        """
        Search step trace file under specific input directory.

        Args:
            input_dir (str): The directory to search.

        Returns:
            list[str], paths of the files whose name starts with 'ts_track.data'
            and does not end with '.done'. When several files are found they are
            sorted by the integer that follows the last '_' in the file name; the
            list is empty if that suffix is not an integer.

        Raises:
            ProfilerPathErrorException: If `input_dir` is not a directory.
        """
        if not os.path.isdir(input_dir):
            raise ProfilerPathErrorException(
                '{} does not exist or is not a dir'.format(input_dir)
            )
        files = os.listdir(input_dir)
        step_trace_files = [
            file for file in files
            if file.startswith('ts_track.data') and not file.endswith('.done')
        ]
        if len(step_trace_files) > 1:
            try:
                step_trace_files.sort(key=lambda path: int(path.rsplit('_', 1)[-1]))
            except ValueError as err:
                log.warning("Unable to parse file names: %s. %s", step_trace_files, err)
                step_trace_files = []
        else:
            # Files named 'training_trace*' are not parsed; only warn about them.
            training_trace_files = [
                file for file in files
                if file.startswith('training_trace') and not file.endswith('.done')
            ]
            if len(training_trace_files) >= 1:
                log.warning("The training_trace file structure is changed, please upgrade "
                            "mindspore and regenerate profiling data")
        file_paths = [os.path.join(input_dir, file) for file in step_trace_files]
        log.info("Find %d step trace files.", len(file_paths))
        return file_paths

    @abstractmethod
    def _parse(self, source_files):
        """Parse source step trace files."""

    def _get_next_step_trace(self, content, event_info):
        """
        Get next step trace info.

        Args:
            content (bytes): The input step trace info.
            event_info (dict): The event info. It is updated in place and the same
                dict object is yielded for every step, then cleared for the next one.

        Returns:
            Generator, return the step trace one by one.
        """
        start_time = event_info.get('end', '-')
        event_info['start'] = start_time
        if 'reduce' not in event_info:
            event_info['reduce'] = {}
        head_len = self._profiling_head_len
        data_offset = head_len + self._profiling_head_pad_len
        i = 0
        while i < len(content):
            profiling_head_data = content[i:i + head_len]
            parsed_head = struct.unpack('BBH', profiling_head_data)
            profiling_head = ProfilingHeadStruct(*parsed_head)
            if profiling_head.bufSize == 0:
                # A zero-sized record would never advance the cursor and the
                # loop would spin forever on the same bytes.
                log.warning("Invalid step trace record at offset %d: buffer size is 0.", i)
                break
            # Only records with report type 10 are decoded as step trace data.
            if profiling_head.rptType == 10:
                st_data = content[i + data_offset:i + data_offset + self._st_data_len]
                parsed_data = struct.unpack('QQQHHH', st_data)
                next_event = StepTraceStruct(*parsed_data)
                self._construct_event_info(next_event, event_info)
                if event_info.get('end'):
                    yield event_info
                    # The end of this step is the start of the next one.
                    start_time = event_info.get('end', '-')
                    event_info.clear()
                    event_info['start'] = start_time
                    event_info['reduce'] = {}
            i = i + profiling_head.bufSize

    def _construct_event_info(self, next_event, event_info):
        """
        Construct event info according to next_event.

        Args:
            next_event (StepTraceStruct): The event to record; its `tag_id`,
                `timeStamp` and `stream_id` fields are read.
            event_info (dict): The event info of the current step, updated in place.
                It must already contain the 'reduce' dict.
        """
        tag_id = next_event.tag_id
        time_stamp = next_event.timeStamp
        if tag_id == self._step_end_tag_id:
            event_info['end'] = time_stamp
        elif tag_id == self._fp_tag_id:
            event_info['fp'] = time_stamp
        elif tag_id == self._bp_tag_id:
            event_info['bp'] = time_stamp
        elif self._reduce_min_tag_id <= tag_id < self._reduce_max_tag_id:
            # Reduce time points are grouped per stream, in arrival order.
            stream_points = event_info['reduce'].setdefault(next_event.stream_id, [])
            stream_points.append((tag_id, time_stamp))

    def _record_trace_event(self, step_trace):
        """
        Record trace event.

        Args:
            step_trace (dict): The step info with 'start', 'end', 'fp' and 'bp' time
                points and an optional 'reduce' dict. Steps lacking any of the four
                time points are counted but not recorded.
        """
        self._step_num += 1
        start_time = step_trace.get('start')
        end_time = step_trace.get('end')
        fp_time = step_trace.get('fp')
        bp_time = step_trace.get('bp')
        if not (start_time and end_time and fp_time and bp_time):
            log.warning("The step %d lacks basic time.", self._step_num)
            return
        # '-' means that no start point is known; use the fp point instead.
        if start_time == '-':
            start_time = fp_time
        row_data = {
            'step_num': self._step_num,
            'start_point': start_time,
            'end_point': end_time,
            'total': end_time - start_time,
            'fp_point': fp_time,
            'bp_point': bp_time,
            'iteration_interval': fp_time - start_time,
            'fp_and_bp': bp_time - fp_time,
            'tail': end_time - bp_time
        }
        self._update_reduce_info(step_trace, row_data)
        # The columns are fixed by the first recorded step; later rows are
        # aligned with them and missing columns are filled with 0.
        if not self._header:
            self._header = list(row_data.keys())
        row_data_list = [row_data.get(header_name, 0) for header_name in self._header]
        self._result.append(row_data_list)

    def _update_reduce_info(self, step_trace, row_data):
        """
        Extract reduce info.

        Args:
            step_trace (dict): The step info; its 'reduce' entry maps a stream key to
                a list of time points that are consumed as (start, end) pairs.
            row_data (dict): The row of the current step, updated in place.
        """
        reduce_time = step_trace.get('reduce', {})
        for stream_id, time_points in reduce_time.items():
            time_point_num = len(time_points)
            if time_point_num % 2:
                # The stream key is not always an int (the GPU parser uses the
                # string 'ops'), so it must be formatted with %s.
                log.warning("Stream %s has %d reduce time points.", stream_id, time_point_num)
                continue
            for index, point_id in enumerate(range(0, time_point_num, 2)):
                field_name = f'stream_{stream_id}_{index}'
                reduce_info = self._get_single_reduce_event_info(
                    field_name, time_points[point_id], time_points[point_id + 1])
                row_data.update(reduce_info)

    def _get_single_reduce_event_info(self, field_name, start_point, end_point):
        """
        Get single reduce info.

        The base implementation records nothing and returns an empty dict.

        Args:
            field_name (str): The field name.
            start_point (Tuple[int, int]): Start point time info, including (tag_id, sys_count).
            end_point (Tuple[int, int]): End point time info, including (tag_id, sys_count).

        Returns:
            dict, reduce info.
        """
        ret_dict = {}
        return ret_dict

    def _record_average_info(self):
        """Calculate average info and append it as the last row of the result."""
        result_size = len(self._result)
        average_data = [0] * len(self._header)
        if result_size >= 2:
            # The first recorded row is left out of the average.
            for row_info in self._result[1:]:
                average_data = [
                    Decimal(i) + Decimal(j) for i, j in zip(row_info, average_data)
                ]
            average_data = [
                round((item / (result_size - 1))) for item in average_data
            ]
            # A step number is meaningless for the average row.
            step_num_index = self._header.index('step_num')
            average_data[step_num_index] = '-'
        self._result.append(average_data)
        log.info("Finish add average info for step trace.")

    def _save(self):
        """
        Save step trace file.

        Nothing is written when no header has been recorded. When not in training
        mode, the header and the rows held by this parser are modified in place
        before being written.

        Raises:
            ProfilerIOException: If the output file can not be written.
        """
        # Positions used when not in training mode: the 'bp_point' column and
        # the last column are dropped, and the last column is added to the
        # second to last one, which is renamed to 'fp'.
        bp_point, tail, fp_duration = 5, -1, -2
        log.info("Start to save step trace file.")
        if not self._header:
            return
        try:
            # newline='' is what the csv module asks for, so that the writer
            # alone controls the line endings.
            with open(self._output_path, 'w', newline='') as file_handle:
                csv_writer = csv.writer(file_handle)
                if not self._is_training_mode:
                    self._header[fp_duration] = 'fp'
                    self._header = self._header[:bp_point] + self._header[bp_point + 1:tail]
                csv_writer.writerow(self._header)
                for row_data in self._result:
                    if not self._is_training_mode:
                        row_data[fp_duration] += row_data[tail]
                        row_data = row_data[:bp_point] + row_data[bp_point + 1:tail]
                    csv_writer.writerow(row_data)
            os.chmod(self._output_path, stat.S_IREAD | stat.S_IWRITE)
        except (IOError, OSError) as err:
            log.warning('Failed to save step trace raw info. %s', err)
            raise ProfilerIOException from err
class GpuStepTraceParser(BaseStepTraceParser):
    """The parser for gpu step trace data."""

    def get_fp_bp(self, f_obj, all_step_fp, all_step_bp):
        """
        Parse the fp and bp op names from the step trace file.

        Args:
            f_obj (TextIO): The opened step trace file.
            all_step_fp (list): The list the fp op names are appended to.
            all_step_bp (list): The list the bp op names are appended to.
        """
        fp_start, bp_end = 0, 1
        if self._is_gpu_kernel_async_launch:
            # One step per line; the second and third fields hold the fp and
            # bp points, each starting with the op name.
            for line in f_obj:
                line = line.strip().split()
                if not line:
                    # A blank line carries no step.
                    continue
                all_step_fp.append(line[1].split(',')[0])
                all_step_bp.append(line[2].split(',')[0])
        else:
            # One point per line; the first line is the fp point and the
            # second one the bp point, each starting with the op name.
            lines = f_obj.readlines()
            all_step_fp.append(lines[fp_start].split()[0])
            all_step_bp.append(lines[bp_end].split()[0])

    def record_point_info(self, source_file, output_path):
        """
        Record point info into json.

        Args:
            source_file (str): The file path of step trace original data.
            output_path (str): The output path for saving point info.

        Returns:
            dict, parsed point info of the first step.

        Raises:
            ProfilerIOException: If the source file can not be read or the output
                file can not be written.
        """
        all_step_points = []
        all_step_fp = []
        all_step_bp = []
        try:
            with open(source_file, 'r') as f_obj:
                self.get_fp_bp(f_obj, all_step_fp, all_step_bp)
        except (IOError, OSError) as err:
            log.warning('Failed to read %s. %s', source_file, err)
            raise ProfilerIOException from err
        for fp_name, bp_name in zip(all_step_fp, all_step_bp):
            points = {'fp_start': fp_name}
            # The bp point is only recorded in training mode.
            if self._is_training_mode:
                points['bp_end'] = bp_name
            all_step_points.append(points)
        try:
            with open(output_path, 'w') as json_file:
                # Async launch saves the points of every step, otherwise only
                # the points of the first step are saved.
                if self._is_gpu_kernel_async_launch:
                    json.dump(all_step_points, json_file)
                else:
                    json.dump(all_step_points[0], json_file)
            os.chmod(output_path, stat.S_IREAD | stat.S_IWRITE)
        except (IOError, OSError) as err:
            log.warning('Failed to save point info. %s', err)
            raise ProfilerIOException from err
        return all_step_points[0]

    def _get_step_trace_files(self):
        """Get step trace files."""
        return self._input_dir

    def _parse(self, source_file):
        """
        Parse source step trace files.

        Every line of the file holds one kind of point: the first whitespace
        separated field is dropped and each remaining field is the comma separated
        time info of one step.

        Args:
            source_file (str): The file path of step trace original data.

        Raises:
            ProfilerRawFileException: If the file has less than three lines or the
                lines do not hold the same number of steps.
            ProfilerIOException: If the file can not be read.
        """
        log.info("Start to parse step trace file.")
        # Row indexes of the points, once the iteration start row is inserted.
        fp_start, bp_end, iter_end, iter_start = 0, 1, 2, 3
        reduce_start = 4
        # Indexes inside the comma separated time info of a point.
        start_time, end_time = 0, 1
        step_trace_point_count = 3
        source_file = validate_and_normalize_path(source_file)
        try:
            with open(source_file, 'r') as f:
                lines = f.readlines()
                if len(lines) < step_trace_point_count:
                    raise ProfilerRawFileException(
                        f"Failed to parse {source_file} file. The FP_POINT/BP_POINT/ITER_END_POINT "
                        f"do not recognized correctly. Try to set the environment variable'PROFILING_FP_START' "
                        f"and 'PROFILING_BP_END' to solve this problem. For example, "
                        f"'export PROFILING_FP_START=Default/xxx/Conv2d-op1' ")
                step_trace_info_all = [line.strip().split()[1:] for line in lines]
                num_of_step = len(step_trace_info_all[0])
                for step_trace_point in step_trace_info_all:
                    if len(step_trace_point) != num_of_step:
                        raise ProfilerRawFileException(
                            f"Failed to parse {source_file} file. Due to the profiled "
                            f"step_num of FP/BP/ITER_END Point are not equal")
                # Build the iteration start row: the first step starts at its
                # fp point, every other step at the end point of the previous one.
                iter_start_info = [step_trace_info_all[fp_start][0]] + \
                    step_trace_info_all[iter_end][:num_of_step]
                step_trace_info_all.insert(iter_start, iter_start_info)
        except (IOError, OSError) as err:
            log.warning('Failed to read %s. %s', source_file, err)
            raise ProfilerIOException from err
        for step_num in range(num_of_step):
            step_trace = {
                'start': int(step_trace_info_all[iter_start][step_num].split(',')[start_time]),
                'fp': int(step_trace_info_all[fp_start][step_num].split(',')[start_time]),
                'bp': int(step_trace_info_all[bp_end][step_num].split(',')[end_time]),
                'end': int(step_trace_info_all[iter_end][step_num].split(',')[end_time]),
                'reduce': {}
            }
            num_of_step_point = len(step_trace_info_all)
            # Every row after the four basic points holds reduce time info.
            if num_of_step_point > reduce_start:
                reduce_info = {}
                reduce_time_info = []
                for reduce_idx in range(reduce_start, num_of_step_point):
                    cur_reduce_time = step_trace_info_all[reduce_idx][step_num]
                    reduce_time_info += cur_reduce_time.split(',')
                reduce_info['ops'] = reduce_time_info
                step_trace['reduce'] = reduce_info
            self._record_trace_event(step_trace)
        self._record_average_info()
        log.info("Finish to parse step trace file.")

    def _parse_one_step(self, line):
        """
        Parse step text line to dict obj and record it.

        Args:
            line (str): The step trace line text, it contains five parts, each part is separated by a space.
                part 1: start_op_name,start_op_time
                part 2: fp_op_name,fp_time
                part 3: bp_op_name,bp_time
                part 4: end_op_name,end_time
                part 5: reduce info, it contains multiple reduce, each reduce is separated by a space and
                    its second and third comma separated fields are read as time points.
                The last character of every time field is dropped before it is used.
        """
        line = line.strip().split()
        if not line:
            # A blank line carries no step.
            return
        start_time = int(line[0].split(',')[1][:-1])
        fp_time = int(line[1].split(',')[1][:-1])
        bp_time = int(line[2].split(',')[1][:-1])
        end_time = int(line[3].split(',')[1][:-1])
        reduce_info = {}
        reduce_time_info = []
        for reduce_item in line[4:]:
            reduce_fields = reduce_item.split(',')
            reduce_time_info.append(reduce_fields[1][:-1])
            reduce_time_info.append(reduce_fields[2][:-1])
        step_trace = {
            'start': start_time,
            'fp': fp_time,
            'bp': bp_time,
            'end': end_time
        }
        if reduce_time_info:
            reduce_info['ops'] = reduce_time_info
        step_trace['reduce'] = reduce_info
        self._record_trace_event(step_trace)

    def _parse_async_launch(self, source_file):
        """
        Parse source step trace files generated from async launch kernel.

        Args:
            source_file (str): The file path of step trace original data, one step per line.

        Raises:
            ProfilerIOException: If the file can not be read.
        """
        log.info("Start to parse step trace file.")
        source_file = validate_and_normalize_path(source_file)
        try:
            with open(source_file, 'r') as f_obj:
                for line in f_obj:
                    self._parse_one_step(line)
        except (IOError, OSError) as err:
            log.warning('Failed to read %s. %s', source_file, err)
            raise ProfilerIOException from err
        self._record_average_info()
        log.info("Finish to parse step trace file.")

    def _get_single_reduce_event_info(self, field_name, start_point, end_point):
        """
        Get single reduce info.

        Args:
            field_name (str): The field name.
            start_point (str): Start point time.
            end_point (str): End point time.

        Returns:
            dict, reduce info holding the duration and the unchanged start and end points.
        """
        reduce_info = {}
        # Every reduce event is reported as 'AllReduce'.
        op_type = 'AllReduce'
        field_name += '_' + op_type
        reduce_info[field_name] = int(end_point) - int(start_point)
        reduce_info[field_name + '_start_point'] = start_point
        reduce_info[field_name + '_end_point'] = end_point
        return reduce_info
class AscendStepTraceParser(BaseStepTraceParser):
    """The parser for ascend step trace data."""
    _event_size = 20
    # Tag ids of the fp and bp points looked up in the point info.
    _fp_tag = 2
    _bp_tag = 3
    # Class level default, never modified in place: the files found by
    # `_get_step_trace_files` are stored on the instance.
    _step_trace_files = []

    def record_point_info(self, point_info, output_path):
        """
        Record point info into json.

        The file is only written when `output_path` does not exist yet.

        Args:
            point_info (dict): The point info about tag id and relative op name.
            output_path (str): The output path for saving point info.

        Returns:
            dict, parsed point info.

        Raises:
            ProfilerIOException: If the output file can not be written.
        """
        points = {'fp_start': point_info.get(self._fp_tag, '')}
        # The bp point is only recorded in training mode.
        if self._is_training_mode:
            points['bp_end'] = point_info.get(self._bp_tag, '')
        if os.path.exists(output_path):
            return points
        try:
            with open(output_path, 'w') as json_file:
                json.dump(points, json_file)
            os.chmod(output_path, stat.S_IREAD | stat.S_IWRITE)
        except (IOError, OSError) as err:
            log.warning('Failed to save point info. %s', err)
            raise ProfilerIOException from err
        return points

    def _get_step_trace_files(self):
        """
        Get step trace files.

        The input directory is searched first, then its 'data' sub directory.
        The files found are kept on the instance and returned by later calls.

        Returns:
            list[str], the paths of the step trace files.

        Raises:
            ProfilerPathErrorException: If no step trace file is found.
        """
        if self._step_trace_files:
            return self._step_trace_files
        profiler_dir = self._input_dir
        step_trace_files = self._search_file(profiler_dir)
        if not step_trace_files:
            profiler_dir = os.path.join(profiler_dir, 'data')
            step_trace_files = self._search_file(profiler_dir)
        if not step_trace_files:
            raise ProfilerPathErrorException('Training trace file does not exist.')
        self._step_trace_files = step_trace_files
        return step_trace_files

    def _parse(self, source_files):
        """
        Parse source step trace files.

        Args:
            source_files (list[str]): The paths of the step trace files, parsed in order.

        Raises:
            ProfilerIOException: If a file can not be read.
        """
        log.info("Start to parse step trace file.")
        # Shared by all the files, so that a step can go on in the next file.
        event_info = {}
        for source_file in source_files:
            source_file = validate_and_normalize_path(source_file)
            try:
                with open(source_file, 'rb') as handler:
                    content = handler.read()
                    for step_trace in self._get_next_step_trace(content, event_info):
                        # Only the very first step is skipped when requested.
                        if self._skip_first_step:
                            self._skip_first_step = False
                            continue
                        self._record_trace_event(step_trace)
            except (IOError, OSError) as err:
                log.warning('Failed to read %s. %s', source_file, err)
                raise ProfilerIOException from err
        self._record_average_info()
        log.info("Finish to parse step trace file.")

    def _get_single_reduce_event_info(self, field_name, start_point, end_point):
        """
        Get single reduce info.

        Args:
            field_name (str): The field name.
            start_point (Tuple[int, int]): Start point time info, including (tag_id, sys_count).
            end_point (Tuple[int, int]): End point time info, including (tag_id, sys_count).

        Returns:
            dict, reduce info. It is empty when the two points do not match.
        """
        reduce_info = {}
        # A matched pair is an even start tag followed by the next tag id.
        if end_point[0] - start_point[0] != 1 or start_point[0] % 2:
            log.warning("Unmatched reduce event <%s, %s>.", start_point, end_point)
            return reduce_info
        op_type = self._tag_map.get(start_point[0])
        if not op_type:
            log.warning("Can't recognize the inner type for point tag: %d.", start_point[0])
            field_name += '_parallel'
        else:
            field_name += '_' + op_type
        reduce_info[field_name] = end_point[1] - start_point[1]
        reduce_info[field_name + '_start_point'] = start_point[1]
        reduce_info[field_name + '_end_point'] = end_point[1]
        return reduce_info