import json
import logging
from collections import OrderedDict
from collections import defaultdict
from common_func.db_name_constant import DBNameConstant
from common_func.info_conf_reader import InfoConfReader
from common_func.ms_constant.number_constant import NumberConstant
from common_func.ms_constant.stars_constant import StarsConstant
from common_func.ms_constant.str_constant import StrConstant
from common_func.trace_view_header_constant import TraceViewHeaderConstant
from common_func.trace_view_manager import TraceViewManager
from msmodel.interface.view_model import ViewModel
from common_func.platform.chip_manager import ChipManager
class StarsChipTransView:
"""
Model of viewer for chip trans
"""
PA_LINK_RX = "PA Link Rx"
PA_LINK_TX = "PA Link Tx"
PCIE_WRITE = "PCIE Write Bandwidth"
PCIE_READ = "PCIE Read Bandwidth"
PA_ID = "PA Link ID"
PCIE_ID = "PCIE ID"
TIMELINE_MAP = {
StarsConstant.TYPE_STARS_PA: [PA_LINK_RX, PA_LINK_TX, PA_ID],
StarsConstant.TYPE_STARS_PCIE: [PCIE_WRITE, PCIE_READ, PCIE_ID]
}
def __init__(self: any, configs: dict, params: dict) -> None:
self._configs = configs
self._params = params
self._project_path = params.get(StrConstant.PARAM_RESULT_DIR)
self._model = None
self._timeline_data = []
self._pid = InfoConfReader().get_json_pid_data()
self._tid = InfoConfReader().get_json_tid_data()
def get_timeline_data(self: any) -> list:
"""
get timeline data of stars chip trans
:return:
"""
if ChipManager().is_chip_v6():
self.get_pcie_data_v6()
else:
self.get_pa_data()
self.get_pcie_data()
if not self._timeline_data:
logging.error("Failed to get stars chip trans data, please check the data parsing log.")
return []
self._timeline_data.extend(TraceViewManager.metadata_event(
[["process_name", self._pid, self._tid, "Stars Chip Trans"]]))
return self._timeline_data
def get_pa_data(self: any) -> None:
"""
get stars pa link data
:return:
"""
pa_model = ViewModel(self._project_path,
DBNameConstant.DB_STARS_CHIP_TRANS, [DBNameConstant.TABLE_STARS_PA_LINK])
if not pa_model.init():
logging.error("Can not get stars chip trans data, please check the data parsing log.")
return
if not pa_model.check_table():
return
sql = "select pa_link_id,pa_link_traffic_monit_rx,pa_link_traffic_monit_tx,sys_time/{NS_TO_US} from {0}" \
.format(DBNameConstant.TABLE_STARS_PA_LINK, NS_TO_US=NumberConstant.NS_TO_US)
pa_data = pa_model.get_sql_data(sql)
if not pa_data:
return
self._timeline_data.extend(TraceViewManager.metadata_event(
[["thread_name", self._pid, StarsConstant.TYPE_STARS_PA, "PA Link"]]))
self._format_data(pa_data, StarsConstant.TYPE_STARS_PA)
def get_pcie_data(self: any) -> None:
"""
get stars pcie data
:return:
"""
pcie_model = ViewModel(self._project_path,
DBNameConstant.DB_STARS_CHIP_TRANS, [DBNameConstant.TABLE_STARS_PCIE])
pcie_data = []
with pcie_model:
sql = "select pcie_id,pcie_write_bandwidth,pcie_read_bandwidth,sys_time/{NS_TO_US} from {0}" \
.format(DBNameConstant.TABLE_STARS_PCIE, NS_TO_US=NumberConstant.NS_TO_US)
pcie_data = pcie_model.get_sql_data(sql)
if not pcie_data:
return
self._timeline_data.extend(TraceViewManager.metadata_event(
[["thread_name", self._pid, StarsConstant.TYPE_STARS_PCIE, "PCIE"]]))
self._format_data(pcie_data, StarsConstant.TYPE_STARS_PCIE)
def get_pcie_data_v6(self: any) -> None:
"""
get stars pcie data v6
:return:
"""
pcie_model = ViewModel(self._project_path,
DBNameConstant.DB_STARS_CHIP_TRANS, [DBNameConstant.TABLE_STARS_PCIE_V6])
pcie_data = []
with pcie_model:
sql = "select die_id,pcie_write_bandwidth,pcie_read_bandwidth,sys_time/{NS_TO_US} from {0}" \
.format(DBNameConstant.TABLE_STARS_PCIE_V6, NS_TO_US=NumberConstant.NS_TO_US)
pcie_data = pcie_model.get_sql_data(sql)
if not pcie_data:
return
pcie_data_dict = defaultdict(dict)
for data in pcie_data:
pcie_data_dict.setdefault(data[3], {}).update({data[0]: data[1:]})
self._timeline_data.extend(TraceViewManager.metadata_event(
[["thread_name", self._pid, StarsConstant.TYPE_STARS_PCIE, "PCIE"]]))
stars_cons = self.TIMELINE_MAP.get(StarsConstant.TYPE_STARS_PCIE)
_res = []
for key, data in pcie_data_dict.items():
_res.extend(
[[stars_cons[0], InfoConfReader().trans_into_local_time(key, use_us=True), self._pid,
StarsConstant.TYPE_STARS_PCIE,
OrderedDict([("U-Die" + str(_key), _val[0]) for _key, _val in data.items()])],
[stars_cons[1], InfoConfReader().trans_into_local_time(key, use_us=True), self._pid,
StarsConstant.TYPE_STARS_PCIE,
OrderedDict([("U-Die" + str(_key), _val[1]) for _key, _val in data.items()])]
])
self._timeline_data.extend(TraceViewManager.column_graph_trace(TraceViewHeaderConstant.COLUMN_GRAPH_HEAD_LEAST,
_res))
def _format_data(self: any, stars_data: list, stars_type: int) -> None:
_res = []
stars_cons = self.TIMELINE_MAP.get(stars_type)
for _data in stars_data:
_res.extend(
[[stars_cons[0], InfoConfReader().trans_into_local_time(_data[3], use_us=True), self._pid,
stars_type, OrderedDict([(stars_cons[0], _data[1]), (stars_cons[2], _data[0])])],
[stars_cons[1], InfoConfReader().trans_into_local_time(_data[3], use_us=True), self._pid,
stars_type, OrderedDict([(stars_cons[1], _data[2]), (stars_cons[2], _data[0])])]
])
self._timeline_data.extend(TraceViewManager.column_graph_trace(TraceViewHeaderConstant.COLUMN_GRAPH_HEAD_LEAST,
_res))