from unittest.mock import patch, MagicMock, mock_open
import json
import os
import re
import shutil
import sqlite3
import logging
from pathlib import Path
import pandas as pd
import pytest
import sys
import stat
from ms_service_profiler.parse import Task
from ms_service_profiler.parse import (
main
)
from ms_service_profiler.task.task_register import get_dag, TaskDag, regist_map, TaskRegisterInfo
from ms_service_profiler.data_source.mspti_data_source import (
MsptiDataSource
)
# Configure the root logger at import time with level INFO
# (basicConfig does nothing if the root logger already has handlers).
logging.basicConfig(level=logging.INFO)
class LoadDataError(Exception):
    """Plain ``Exception`` subclass with no added behaviour.

    NOTE(review): judging by the name it is meant for data-loading failures;
    nothing in this part of the file raises or catches it.
    """
def load_prof(filepaths):
    """Return ``filepaths`` unchanged (pass-through stub).

    Args:
        filepaths: Any object; it is neither inspected nor copied.

    Returns:
        The very same object that was passed in.
    """
    passthrough = filepaths
    return passthrough
def build_msproftx_db(db_path):
    """Create a fresh SQLite file holding a two-row ``MsprofTxEx`` table.

    Any existing file at ``db_path`` is deleted first, so repeated calls always
    produce the same content instead of failing on ``CREATE TABLE``.

    Args:
        db_path: Location of the database file (``str`` or path-like).

    Raises:
        sqlite3.Error: If the table cannot be created or populated.
        OSError: If an existing file at ``db_path`` cannot be removed.
    """
    # Start from a clean file so the CREATE TABLE below cannot collide with old data.
    if os.path.exists(db_path):
        os.remove(db_path)

    conn = sqlite3.connect(str(db_path))
    # try/finally guarantees the connection is released even when a statement fails.
    try:
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE MsprofTxEx (
                pid INTEGER,
                tid INTEGER,
                event_type TEXT,
                start_time INTEGER,
                end_time INTEGER,
                mark_id INTEGER,
                message TEXT
            );
        """)
        conn.commit()
        # One 'marker' row and one 'start/end' row; the quoted numbers are stored
        # as integers because the target columns have INTEGER affinity.
        cursor.execute("""
            INSERT INTO MsprofTxEx (pid, tid, event_type, start_time, end_time, mark_id, message)
            VALUES (38282, 38282, 'marker', 79857442762972, '79857442762972', '464', 'span=462*{^name^: ^BatchSchedule^}'),
            (38282, 38282, 'start/end', 79857442726897, '79857442763640', '462', 'BatchSchedule');
        """)
        conn.commit()
    finally:
        conn.close()
@pytest.fixture
def setup_test_directory(tmp_path):
    """Populate ``tmp_path`` with a ``PROF_test`` directory of sample profiling files.

    The directory receives several small text files plus an ``msproftx.db``
    SQLite database created by ``build_msproftx_db``.

    Args:
        tmp_path: pytest's per-test temporary directory.

    Yields:
        ``tmp_path`` itself (the parent of ``PROF_test``).
    """
    prof_dir = tmp_path / "PROF_test"
    prof_dir.mkdir()

    # File name -> text content written into PROF_test.
    text_files = {
        "host_cpu_usage.db": "cpu data",
        "host_mem_usage.db": "memory data",
        "host_start.log": "\ncntvct: 123\nclock_monotonic_raw: 456\n",
        "info.json": '{"key": "value"}',
        "start_info": '{"collectionTimeBegin": "123456.789", "clockMonotonicRaw": "0"}',
        "msprof_20250211122756.json": '{"data": "example data"}',
    }
    for file_name, content in text_files.items():
        (prof_dir / file_name).write_text(content)

    build_msproftx_db(prof_dir / "msproftx.db")

    yield tmp_path

    # Remove the whole tree in one call instead of deleting entries while a lazy
    # rglob() generator is still walking them. Best effort only: pytest also
    # manages the lifetime of tmp_path.
    shutil.rmtree(tmp_path, ignore_errors=True)
class MockTask(Task):
    """``Task`` subclass named ``mock_task`` that declares two dependencies."""

    name = "mock_task"

    @classmethod
    def depends(cls):
        """Return the dependency names declared by this task."""
        dependency_names = ["depend_task1", "depend_task2"]
        return dependency_names
class MockTask2(Task):
    """``Task`` subclass named ``mock_task2`` that declares no dependencies."""

    name = "mock_task2"

    @classmethod
    def depends(cls):
        """Return an empty list: this task depends on nothing."""
        return list()
def test_build_task_dag():
    """Check that ``get_dag`` links four registered mock tasks as expected.

    The mock entries are added to the global ``regist_map`` through
    ``patch.dict``. The registry is therefore restored to its previous content
    when the test ends, even if an assertion fails, rather than being cleared.
    """
    mock_entries = {
        "mock_task": TaskRegisterInfo(task_cls="mock_task", name="mock_task",
                                      data_depends=["depend_task1", "depend_task2"],
                                      data_outputs=["mock_task"]),
        "mock_task2": TaskRegisterInfo(task_cls="mock_task2", name="mock_task2",
                                       data_depends=["prof_path"], data_outputs=["mock_task2"]),
        "depend_task1": TaskRegisterInfo(task_cls="depend_task1", name='depend_task1',
                                         data_depends=["prof_path"], data_outputs=['depend_task1']),
        "depend_task2": TaskRegisterInfo(task_cls="depend_task2", name='depend_task2',
                                         data_depends=["prof_path"], data_outputs=['depend_task2']),
    }

    with patch.dict(regist_map, mock_entries):
        result = get_dag(["mock_task", "mock_task2"])

        assert isinstance(result, tuple)
        assert len(result) == 2
        dag, head_tasks = result
        assert isinstance(dag, TaskDag)

        # Debug output; pytest shows captured stdout only when the test fails.
        print("##################", dag)
        print("##################", dag.dag_data_flow)
        print("##################", dag.dag_task_flow)
        print("##################", dag.head_tasks_name)
        print("##################", dag.ordered_tasks_name)

        assert dag.get_next_task_names("depend_task1") == ["mock_task"]
        assert dag.get_next_task_names("depend_task2") == ["mock_task"]
        assert dag.get_prev_task_names("mock_task") == ["depend_task1", "depend_task2"]
        assert dag.get_prev_task_names("mock_task2") == []

        # Exactly three head tasks, checked by name.
        head_names = [task.name for task in head_tasks]
        assert len(head_tasks) == 3
        assert "mock_task2" in head_names
        assert "depend_task1" in head_names
        assert "depend_task2" in head_names
def test_timestamp_conversion_and_duration_calculation(setup_test_directory):
    """
    Check that timestamp conversion and the duration calculation are correct.

    The timestamp columns are renamed to ``start_time``/``end_time``, divided by
    1000, and subtracted to give ``during_time``. The fixture argument is not
    used by the body.
    """
    raw_frame = pd.DataFrame({
        "timestamp": [1622547600000, 1622547602000],
        "endTimestamp": [1622547601000, 1622547603000],
        "message": ["{\"key\":\"value\"}", "{\"key\":\"value2\"}"],
        "hostname": ["host1", "host2"]
    })

    column_renames = {'timestamp': 'start_time', 'endTimestamp': 'end_time'}
    frame = raw_frame.reset_index(drop=True).rename(columns=column_renames)

    time_columns = ['start_time', 'end_time']
    frame[time_columns] = frame[time_columns] / 1000
    frame['during_time'] = frame['end_time'].sub(frame['start_time'])

    assert frame['start_time'].tolist() == [1622547600, 1622547602], "开始时间戳转换应正确"
    assert frame['end_time'].tolist() == [1622547601, 1622547603], "结束时间戳转换应正确"
    assert frame['during_time'].tolist() == [1, 1], "持续时间计算应正确"
def test_timestamp_to_local_time(setup_test_directory):
    """
    Check that second timestamps are rendered as Asia/Shanghai local time.

    Only the first row is compared against the expected strings. The fixture
    argument is not used by the body.
    """
    def to_shanghai_text(seconds):
        # Read the values as UTC seconds, shift to Shanghai time, then format.
        as_utc = pd.to_datetime(seconds, unit='s', utc=True)
        as_local = as_utc.dt.tz_convert('Asia/Shanghai')
        return as_local.dt.strftime("%Y-%m-%d %H:%M:%S:%f")

    frame = pd.DataFrame({
        "start_time": [1622547600, 1622547602],
        "end_time": [1622547601, 1622547603]
    })
    frame['start_datetime'] = to_shanghai_text(frame['start_time'])
    frame['end_datetime'] = to_shanghai_text(frame['end_time'])

    first_row = frame.iloc[0]
    expected_start_datetime = "2021-06-01 19:40:00:000000"
    expected_end_datetime = "2021-06-01 19:40:01:000000"
    assert first_row['start_datetime'] == expected_start_datetime, "开始时间戳转换应正确"
    assert first_row['end_datetime'] == expected_end_datetime, "结束时间戳转换应正确"
def test_message_field_processing(setup_test_directory):
    """
    Check that the message field is parsed and expanded into columns correctly.

    Each message has ``^`` replaced by ``"``, is wrapped in braces when it is
    not already a ``{...}`` string, and is parsed as JSON. The parsed objects
    are then flattened and joined onto the frame. The fixture argument is not
    used by the body.
    """
    frame = pd.DataFrame({
        "message": ["{\"key\":\"value\"}", "{\"key\":\"value2\"}"],
        "hostname": ["host1", "host2"]
    })

    unescaped = frame['message'].str.replace(r'\^', '"', regex=True)
    already_object = unescaped.str.match(r'^{.*}$')
    # Replacement for rows that are not a braced object: drop a trailing comma and add braces.
    wrapped = "{" + unescaped.str.replace(r",$", "", regex=True) + "}"
    frame['message'] = unescaped.where(already_object, other=wrapped).apply(json.loads)

    expanded = pd.json_normalize(frame['message'])
    all_data_df = frame.join(expanded)

    assert all_data_df.shape == (2, 3), "展开后的 DataFrame 的列数应正确"
    assert all_data_df.columns.tolist() == ["message", "hostname", "key"], "展开后的列名应正确"
    for row_index, expected_value in enumerate(("value", "value2")):
        assert all_data_df.iloc[row_index]['key'] == expected_value, "展开后的 key 值应正确"
def test_add_and_rename_hostname(setup_test_directory):
    """
    Check that a ``hostuid`` column copied from ``hostname`` is inserted first.

    The original ``hostname`` column stays in place. The fixture argument is
    not used by the body.
    """
    host_names = ["host1", "host2"]
    frame = pd.DataFrame({"hostname": host_names})

    hostuid_values = frame['hostname']
    frame.insert(0, 'hostuid', hostuid_values)

    assert frame.columns.tolist() == ["hostuid", "hostname"], "列名应正确"
    for row_index, expected_uid in enumerate(("host1", "host2")):
        assert frame.iloc[row_index]['hostuid'] == expected_uid, "hostuid 列的值应正确"
def test_main():
    """Run ``main()`` on an empty input directory with db/csv/json output formats.

    ``sys.argv`` is replaced only for the duration of the call and restored
    afterwards, so the fake command line cannot leak into later tests. Both
    working directories are removed at the end, including when setup fails.
    """
    input_path = "test_input"
    output_path = "test_output"
    cli_args = [
        "test_script.py",
        "--input-path", input_path,
        "--output-path", output_path,
        "--log-level", "info",
        "--format", "db", "csv", "json"
    ]
    # rwxr-xr-x
    input_mode = (stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
                  | stat.S_IRGRP | stat.S_IXGRP
                  | stat.S_IROTH | stat.S_IXOTH)

    try:
        os.makedirs(input_path, exist_ok=True)
        # Drop any output left behind by an earlier run.
        if os.path.exists(output_path):
            shutil.rmtree(output_path)
        os.chmod(input_path, input_mode)

        with patch.object(sys, "argv", cli_args):
            try:
                main()
                print("main() 函数运行成功,没有报错。")
            except Exception as e:
                # NOTE(review): the failure is only printed, so an ordinary exception
                # from main() can never fail this test. Kept as-is to preserve the
                # current pass/fail behaviour; consider re-raising.
                print(f"main() 函数运行失败,报错信息:{e}")
    finally:
        if os.path.exists(input_path):
            shutil.rmtree(input_path)
        if os.path.exists(output_path):
            shutil.rmtree(output_path)
def test_load_ops_db_with_valid_db_path(setup_test_directory):
    """Check ``MsptiDataSource.load_ops_db`` against a database with one ``Api`` row.

    The ``Api``, ``Kernel`` and ``Communication`` tables are added to the
    fixture's ``msproftx.db``; only ``Api`` receives a row. The test expects the
    returned API frame to have one row whose ``db_id`` equals the id passed in.
    """
    db_path = setup_test_directory / "PROF_test" / "msproftx.db"
    assert os.path.exists(db_path)

    create_statements = (
        """
        CREATE TABLE IF NOT EXISTS Api (
            name TEXT,
            start INTEGER,
            end INTEGER,
            processId INTEGER,
            threadId INTEGER,
            correlationId INTEGER
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS Kernel (
            name TEXT,
            type TEXT,
            start INTEGER,
            end INTEGER,
            deviceId INTEGER,
            streamId INTEGER,
            correlationId INTEGER
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS Communication (
            name TEXT,
            start INTEGER,
            end INTEGER,
            deviceId INTEGER,
            streamId INTEGER,
            dataCount INTEGER,
            dataType TEXT,
            commGroupName TEXT,
            correlationId INTEGER
        );
        """,
    )

    conn = sqlite3.connect(str(db_path))
    # try/finally releases the connection even if one of the statements fails.
    try:
        cursor = conn.cursor()
        for statement in create_statements:
            cursor.execute(statement)
        cursor.execute("""
            INSERT INTO Api (name, start, end, processId, threadId, correlationId)
            VALUES ('api1', 1, 2, 1, 1, 1)
        """)
        conn.commit()
    finally:
        conn.close()

    # Only the API frame is checked; the other two results are intentionally unused.
    api_df, _kernel_df, _communication_df = MsptiDataSource.load_ops_db(str(db_path), 1)
    assert api_df.shape[0] == 1
    assert api_df["db_id"].iloc[0] == 1