# Copyright (c) 2026 Huawei Technologies Co., Ltd
# All rights reserved.
#
# Licensed under the BSD 3-Clause License  (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://opensource.org/licenses/BSD-3-Clause
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Owner(s): ["oncall: profiler"]
import copy
import json
import os
import stat
import subprocess
import time
import unittest.mock as mock
from unittest.mock import MagicMock, patch

import torch_npu.profiler.dynamic_profile as dp
from torch_npu.profiler._dynamic_profiler._dynamic_profiler_config_context import (
    ConfigContext,
)
from torch_npu.profiler._dynamic_profiler._dynamic_profiler_monitor_shm import (
    DynamicProfilerShareMemory,
)
from torch_npu.profiler._dynamic_profiler._dynamic_profiler_utils import (
    DynamicProfilerUtils,
)
from torch_npu.profiler.dynamic_profile import _DynamicProfile
from torch_npu.profiler.profiler import profile, tensorboard_trace_handler
from torch_npu.profiler.scheduler import Schedule as schedule
from torch_npu.testing.testcase import run_tests, TestCase
from torch_npu.utils._path_manager import PathManager

import torch


class SmallModel(torch.nn.Module):
    """Two 3x3 convolutions with a ReLU in between, flattened per sample.

    Args:
        in_channel: Channel count of the input; the first convolution keeps
            this channel count.
        out_channel: Channel count produced by the second convolution.
    """

    def __init__(self, in_channel=3, out_channel=12):
        super().__init__()
        # Both convolutions share the same kernel size and padding.
        conv_options = {"kernel_size": 3, "padding": 1}
        self.conv1 = torch.nn.Conv2d(in_channel, in_channel, **conv_options)
        self.relu1 = torch.nn.ReLU()
        self.conv2 = torch.nn.Conv2d(in_channel, out_channel, **conv_options)

    def forward(self, input_1):
        """Apply conv1 -> ReLU -> conv2 and flatten everything but dim 0."""
        features = self.conv2(self.relu1(self.conv1(input_1)))
        leading_dim = features.shape[0]
        return features.reshape(leading_dim, -1)


class TrainModel:
    """Minimal SGD training harness around ``SmallModel`` on device ``npu:0``."""

    def __init__(self):
        self.input_shape = (4, 3, 24, 24)
        self.out_shape = (4, 12, 24, 24)
        self.device = torch.device("npu:0")
        # Index 1 of each shape tuple is passed as the model's channel count.
        in_channels, out_channels = self.input_shape[1], self.out_shape[1]
        network = SmallModel(in_channels, out_channels)
        self.model = network.to(self.device)
        self.criterion = torch.nn.MSELoss()
        self.optimizer = torch.optim.SGD(self.model.parameters(), lr=0.0001)

    def train_one_step(self):
        """Run one forward/backward/optimizer step on freshly sampled random data."""
        batch = torch.rand(self.input_shape).to(self.device)
        # The target is flattened to (out_shape[0], -1) like the model output.
        flat_target = torch.rand(self.out_shape).reshape(self.out_shape[0], -1)
        flat_target = flat_target.to(self.device)
        prediction = self.model(batch)
        loss = self.criterion(prediction, flat_target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()


class TestDynamicProfiler(TestCase):
    """Tests for the torch_npu dynamic profiler and its helper classes."""

    # File names of profiler outputs; not referenced by the tests visible here.
    TRACE_FILE_NAME = "trace_view.json"
    KERNEL_FILE_NAME = "kernel_details.csv"
    # Built once, when the class body is executed, and shared by every test.
    model_train = TrainModel()
    # Step counts; not referenced by the tests visible here.
    small_steps = 1
    large_steps = 5
    # Flags and permissions used when tests rewrite the JSON config file:
    # write-only, create if missing, truncate; owner read/write.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    mode = stat.S_IRUSR | stat.S_IWUSR
    # Incremented by the step-driven tests after each dp.step() call and used
    # to compute the "start_step" value they write into the config.
    start_step = 0
    @classmethod
    def setUpClass(cls):
        """Prepare shared paths, set ``RANK`` to "0" and initialise the profiler.

        All output directories live under a per-process results directory
        whose name contains the current PID.
        """
        super().setUpClass()
        cls.json_sample = DynamicProfilerShareMemory.JSON_DATA
        cls.results_path = f"./dynamic_profiler_results_{str(os.getpid())}"
        # Every output-directory attribute is named after the directory it
        # points to, so they can be created in one pass.
        for dir_name in (
            "default_prof_dir",
            "rank_prof_dir",
            "invalid_rank_prof_dir",
            "active_rank_prof_dir",
            "cfg_prof_dir",
        ):
            setattr(cls, dir_name, os.path.join(cls.results_path, dir_name))
        cls.cfg_path = os.path.join(cls.results_path, "profiler_config.json")
        os.environ["RANK"] = "0"
        dp.init(cls.results_path)

    @classmethod
    def tearDownClass(cls):
        """Remove the per-process results directory and run parent tear-down.

        The parent hook runs even if removing the directory fails, so that
        class-level cleanup of the base ``TestCase`` is never skipped.
        """
        try:
            if os.path.exists(cls.results_path):
                PathManager.remove_path_safety(cls.results_path)
        finally:
            # setUpClass chains to the parent class, so tear-down must too.
            super().tearDownClass()

    def test_modify_cfg_prof_dir_invalid(self):
        """Profile once with ``prof_dir`` set to the integer 1 in the config.

        A ``ConfigContext`` built from the modified sample config drives a
        single start/stop cycle writing to ``cfg_prof_dir``; the test passes
        when a ``PROF*`` directory was produced there.
        """
        bad_config = copy.deepcopy(self.json_sample)
        bad_config["prof_dir"] = 1
        context = ConfigContext(bad_config)

        activities = context.activities()
        step_schedule = schedule(
            wait=0, warmup=0, active=context.active(), repeat=1, skip_first=0
        )
        trace_handler = tensorboard_trace_handler(
            self.cfg_prof_dir, analyse_flag=context.analyse()
        )
        profiler = profile(
            activities=activities,
            schedule=step_schedule,
            on_trace_ready=trace_handler,
            record_shapes=context.record_shapes,
            profile_memory=context.profile_memory,
            with_stack=context.with_stack,
            with_flops=context.with_flops,
            with_modules=context.with_modules,
            experimental_config=context.experimental_config,
        )
        profiler.start()
        profiler.stop()

        # Record the outcome first so the output directory is removed whether
        # or not the assertion below passes.
        produced = self.has_prof_dir(self.cfg_prof_dir)
        if os.path.exists(self.cfg_prof_dir):
            PathManager.remove_path_safety(self.cfg_prof_dir)
        self.assertTrue(produced)

    def test_modify_cfg_analyse_invalid(self):
        """Profile once with ``analyse`` set to the string "1" in the config.

        A ``ConfigContext`` built from the modified sample config drives a
        single start/stop cycle writing to ``cfg_prof_dir``; the test passes
        when a ``PROF*`` directory was produced there.
        """
        bad_config = copy.deepcopy(self.json_sample)
        bad_config["analyse"] = "1"
        context = ConfigContext(bad_config)

        activities = context.activities()
        step_schedule = schedule(
            wait=0, warmup=0, active=context.active(), repeat=1, skip_first=0
        )
        trace_handler = tensorboard_trace_handler(
            self.cfg_prof_dir, analyse_flag=context.analyse()
        )
        profiler = profile(
            activities=activities,
            schedule=step_schedule,
            on_trace_ready=trace_handler,
            record_shapes=context.record_shapes,
            profile_memory=context.profile_memory,
            with_stack=context.with_stack,
            with_flops=context.with_flops,
            with_modules=context.with_modules,
            experimental_config=context.experimental_config,
        )
        profiler.start()
        profiler.stop()

        # Record the outcome first so the output directory is removed whether
        # or not the assertion below passes.
        produced = self.has_prof_dir(self.cfg_prof_dir)
        if os.path.exists(self.cfg_prof_dir):
            PathManager.remove_path_safety(self.cfg_prof_dir)
        self.assertTrue(produced)

    def test_modify_cfg_record_shapes_invalid(self):
        """Profile once with ``record_shapes`` set to the string "1".

        A ``ConfigContext`` built from the modified sample config drives a
        single start/stop cycle writing to ``cfg_prof_dir``; the test passes
        when a ``PROF*`` directory was produced there.
        """
        bad_config = copy.deepcopy(self.json_sample)
        bad_config["record_shapes"] = "1"
        context = ConfigContext(bad_config)

        activities = context.activities()
        step_schedule = schedule(
            wait=0, warmup=0, active=context.active(), repeat=1, skip_first=0
        )
        trace_handler = tensorboard_trace_handler(
            self.cfg_prof_dir, analyse_flag=context.analyse()
        )
        profiler = profile(
            activities=activities,
            schedule=step_schedule,
            on_trace_ready=trace_handler,
            record_shapes=context.record_shapes,
            profile_memory=context.profile_memory,
            with_stack=context.with_stack,
            with_flops=context.with_flops,
            with_modules=context.with_modules,
            experimental_config=context.experimental_config,
        )
        profiler.start()
        profiler.stop()

        # Record the outcome first so the output directory is removed whether
        # or not the assertion below passes.
        produced = self.has_prof_dir(self.cfg_prof_dir)
        if os.path.exists(self.cfg_prof_dir):
            PathManager.remove_path_safety(self.cfg_prof_dir)
        self.assertTrue(produced)

    def test_modify_cfg_profile_memory_invalid(self):
        """Profile once with ``profile_memory`` set to the string "1".

        A ``ConfigContext`` built from the modified sample config drives a
        single start/stop cycle writing to ``cfg_prof_dir``; the test passes
        when a ``PROF*`` directory was produced there.
        """
        bad_config = copy.deepcopy(self.json_sample)
        bad_config["profile_memory"] = "1"
        context = ConfigContext(bad_config)

        activities = context.activities()
        step_schedule = schedule(
            wait=0, warmup=0, active=context.active(), repeat=1, skip_first=0
        )
        trace_handler = tensorboard_trace_handler(
            self.cfg_prof_dir, analyse_flag=context.analyse()
        )
        profiler = profile(
            activities=activities,
            schedule=step_schedule,
            on_trace_ready=trace_handler,
            record_shapes=context.record_shapes,
            profile_memory=context.profile_memory,
            with_stack=context.with_stack,
            with_flops=context.with_flops,
            with_modules=context.with_modules,
            experimental_config=context.experimental_config,
        )
        profiler.start()
        profiler.stop()

        # Record the outcome first so the output directory is removed whether
        # or not the assertion below passes.
        produced = self.has_prof_dir(self.cfg_prof_dir)
        if os.path.exists(self.cfg_prof_dir):
            PathManager.remove_path_safety(self.cfg_prof_dir)
        self.assertTrue(produced)

    def test_modify_cfg_with_stack_invalid(self):
        """Profile once with ``with_stack`` set to the string "1".

        A ``ConfigContext`` built from the modified sample config drives a
        single start/stop cycle writing to ``cfg_prof_dir``; the test passes
        when a ``PROF*`` directory was produced there.
        """
        bad_config = copy.deepcopy(self.json_sample)
        bad_config["with_stack"] = "1"
        context = ConfigContext(bad_config)

        activities = context.activities()
        step_schedule = schedule(
            wait=0, warmup=0, active=context.active(), repeat=1, skip_first=0
        )
        trace_handler = tensorboard_trace_handler(
            self.cfg_prof_dir, analyse_flag=context.analyse()
        )
        profiler = profile(
            activities=activities,
            schedule=step_schedule,
            on_trace_ready=trace_handler,
            record_shapes=context.record_shapes,
            profile_memory=context.profile_memory,
            with_stack=context.with_stack,
            with_flops=context.with_flops,
            with_modules=context.with_modules,
            experimental_config=context.experimental_config,
        )
        profiler.start()
        profiler.stop()

        # Record the outcome first so the output directory is removed whether
        # or not the assertion below passes.
        produced = self.has_prof_dir(self.cfg_prof_dir)
        if os.path.exists(self.cfg_prof_dir):
            PathManager.remove_path_safety(self.cfg_prof_dir)
        self.assertTrue(produced)

    def test_modify_cfg_with_flops_invalid(self):
        """Profile once with ``with_flops`` set to the string "1".

        A ``ConfigContext`` built from the modified sample config drives a
        single start/stop cycle writing to ``cfg_prof_dir``; the test passes
        when a ``PROF*`` directory was produced there.
        """
        bad_config = copy.deepcopy(self.json_sample)
        bad_config["with_flops"] = "1"
        context = ConfigContext(bad_config)

        activities = context.activities()
        step_schedule = schedule(
            wait=0, warmup=0, active=context.active(), repeat=1, skip_first=0
        )
        trace_handler = tensorboard_trace_handler(
            self.cfg_prof_dir, analyse_flag=context.analyse()
        )
        profiler = profile(
            activities=activities,
            schedule=step_schedule,
            on_trace_ready=trace_handler,
            record_shapes=context.record_shapes,
            profile_memory=context.profile_memory,
            with_stack=context.with_stack,
            with_flops=context.with_flops,
            with_modules=context.with_modules,
            experimental_config=context.experimental_config,
        )
        profiler.start()
        profiler.stop()

        # Record the outcome first so the output directory is removed whether
        # or not the assertion below passes.
        produced = self.has_prof_dir(self.cfg_prof_dir)
        if os.path.exists(self.cfg_prof_dir):
            PathManager.remove_path_safety(self.cfg_prof_dir)
        self.assertTrue(produced)

    def test_modify_cfg_with_modules_invalid(self):
        """Profile once with ``with_modules`` set to the string "1".

        A ``ConfigContext`` built from the modified sample config drives a
        single start/stop cycle writing to ``cfg_prof_dir``; the test passes
        when a ``PROF*`` directory was produced there.
        """
        bad_config = copy.deepcopy(self.json_sample)
        bad_config["with_modules"] = "1"
        context = ConfigContext(bad_config)

        activities = context.activities()
        step_schedule = schedule(
            wait=0, warmup=0, active=context.active(), repeat=1, skip_first=0
        )
        trace_handler = tensorboard_trace_handler(
            self.cfg_prof_dir, analyse_flag=context.analyse()
        )
        profiler = profile(
            activities=activities,
            schedule=step_schedule,
            on_trace_ready=trace_handler,
            record_shapes=context.record_shapes,
            profile_memory=context.profile_memory,
            with_stack=context.with_stack,
            with_flops=context.with_flops,
            with_modules=context.with_modules,
            experimental_config=context.experimental_config,
        )
        profiler.start()
        profiler.stop()

        # Record the outcome first so the output directory is removed whether
        # or not the assertion below passes.
        produced = self.has_prof_dir(self.cfg_prof_dir)
        if os.path.exists(self.cfg_prof_dir):
            PathManager.remove_path_safety(self.cfg_prof_dir)
        self.assertTrue(produced)

    def test_modify_cfg_rank_invalid(self):
        """Profile once with ``is_rank`` set to the string "1".

        A ``ConfigContext`` built from the modified sample config drives a
        single start/stop cycle writing to ``cfg_prof_dir``; the test passes
        when a ``PROF*`` directory was produced there.
        """
        bad_config = copy.deepcopy(self.json_sample)
        bad_config["is_rank"] = "1"
        context = ConfigContext(bad_config)

        activities = context.activities()
        step_schedule = schedule(
            wait=0, warmup=0, active=context.active(), repeat=1, skip_first=0
        )
        trace_handler = tensorboard_trace_handler(
            self.cfg_prof_dir, analyse_flag=context.analyse()
        )
        profiler = profile(
            activities=activities,
            schedule=step_schedule,
            on_trace_ready=trace_handler,
            record_shapes=context.record_shapes,
            profile_memory=context.profile_memory,
            with_stack=context.with_stack,
            with_flops=context.with_flops,
            with_modules=context.with_modules,
            experimental_config=context.experimental_config,
        )
        profiler.start()
        profiler.stop()

        # Record the outcome first so the output directory is removed whether
        # or not the assertion below passes.
        produced = self.has_prof_dir(self.cfg_prof_dir)
        if os.path.exists(self.cfg_prof_dir):
            PathManager.remove_path_safety(self.cfg_prof_dir)
        self.assertTrue(produced)

    def test_modify_cfg_rank_list_invalid(self):
        """Profile once with ``is_rank`` True and ``rank_list`` set to "1".

        A ``ConfigContext`` built from the modified sample config drives a
        single start/stop cycle writing to ``cfg_prof_dir``; the test passes
        when a ``PROF*`` directory was produced there.
        """
        bad_config = copy.deepcopy(self.json_sample)
        bad_config["is_rank"] = True
        bad_config["rank_list"] = "1"
        context = ConfigContext(bad_config)

        activities = context.activities()
        step_schedule = schedule(
            wait=0, warmup=0, active=context.active(), repeat=1, skip_first=0
        )
        trace_handler = tensorboard_trace_handler(
            self.cfg_prof_dir, analyse_flag=context.analyse()
        )
        profiler = profile(
            activities=activities,
            schedule=step_schedule,
            on_trace_ready=trace_handler,
            record_shapes=context.record_shapes,
            profile_memory=context.profile_memory,
            with_stack=context.with_stack,
            with_flops=context.with_flops,
            with_modules=context.with_modules,
            experimental_config=context.experimental_config,
        )
        profiler.start()
        profiler.stop()

        # Record the outcome first so the output directory is removed whether
        # or not the assertion below passes.
        produced = self.has_prof_dir(self.cfg_prof_dir)
        if os.path.exists(self.cfg_prof_dir):
            PathManager.remove_path_safety(self.cfg_prof_dir)
        self.assertTrue(produced)

    def test_modify_cfg_profiler_level_invalid(self):
        """Profile once with ``experimental_config.profiler_level`` set to "1".

        A ``ConfigContext`` built from the modified sample config drives a
        single start/stop cycle writing to ``cfg_prof_dir``; the test passes
        when a ``PROF*`` directory was produced there.
        """
        cfg_json = copy.deepcopy(self.json_sample)
        # Fail with a descriptive message if the sample config has no
        # experimental_config section to modify.
        self.assertIn("experimental_config", cfg_json)
        cfg_json["experimental_config"]["profiler_level"] = "1"
        cfg_ctx = ConfigContext(cfg_json)
        prof = profile(
            activities=cfg_ctx.activities(),
            schedule=schedule(
                wait=0, warmup=0, active=cfg_ctx.active(), repeat=1, skip_first=0
            ),
            on_trace_ready=tensorboard_trace_handler(
                self.cfg_prof_dir, analyse_flag=cfg_ctx.analyse()
            ),
            record_shapes=cfg_ctx.record_shapes,
            profile_memory=cfg_ctx.profile_memory,
            with_stack=cfg_ctx.with_stack,
            with_flops=cfg_ctx.with_flops,
            with_modules=cfg_ctx.with_modules,
            experimental_config=cfg_ctx.experimental_config,
        )
        prof.start()
        prof.stop()
        # Record the outcome first so the output directory is removed whether
        # or not the assertion below passes.
        has_prof = self.has_prof_dir(self.cfg_prof_dir)
        if os.path.exists(self.cfg_prof_dir):
            PathManager.remove_path_safety(self.cfg_prof_dir)
        self.assertTrue(has_prof)

    def test_modify_cfg_aic_metrics_invalid(self):
        """Profile once with ``experimental_config.aic_metrics`` set to "1".

        A ``ConfigContext`` built from the modified sample config drives a
        single start/stop cycle writing to ``cfg_prof_dir``; the test passes
        when a ``PROF*`` directory was produced there.
        """
        cfg_json = copy.deepcopy(self.json_sample)
        # Fail with a descriptive message if the sample config has no
        # experimental_config section to modify.
        self.assertIn("experimental_config", cfg_json)
        cfg_json["experimental_config"]["aic_metrics"] = "1"
        cfg_ctx = ConfigContext(cfg_json)
        prof = profile(
            activities=cfg_ctx.activities(),
            schedule=schedule(
                wait=0, warmup=0, active=cfg_ctx.active(), repeat=1, skip_first=0
            ),
            on_trace_ready=tensorboard_trace_handler(
                self.cfg_prof_dir, analyse_flag=cfg_ctx.analyse()
            ),
            record_shapes=cfg_ctx.record_shapes,
            profile_memory=cfg_ctx.profile_memory,
            with_stack=cfg_ctx.with_stack,
            with_flops=cfg_ctx.with_flops,
            with_modules=cfg_ctx.with_modules,
            experimental_config=cfg_ctx.experimental_config,
        )
        prof.start()
        prof.stop()
        # Record the outcome first so the output directory is removed whether
        # or not the assertion below passes.
        has_prof = self.has_prof_dir(self.cfg_prof_dir)
        if os.path.exists(self.cfg_prof_dir):
            PathManager.remove_path_safety(self.cfg_prof_dir)
        self.assertTrue(has_prof)

    def test_modify_cfg_l2_cache_invalid(self):
        """Profile once with ``experimental_config.l2_cache`` set to "1".

        A ``ConfigContext`` built from the modified sample config drives a
        single start/stop cycle writing to ``cfg_prof_dir``; the test passes
        when a ``PROF*`` directory was produced there.
        """
        cfg_json = copy.deepcopy(self.json_sample)
        # Fail with a descriptive message if the sample config has no
        # experimental_config section to modify.
        self.assertIn("experimental_config", cfg_json)
        cfg_json["experimental_config"]["l2_cache"] = "1"
        cfg_ctx = ConfigContext(cfg_json)
        prof = profile(
            activities=cfg_ctx.activities(),
            schedule=schedule(
                wait=0, warmup=0, active=cfg_ctx.active(), repeat=1, skip_first=0
            ),
            on_trace_ready=tensorboard_trace_handler(
                self.cfg_prof_dir, analyse_flag=cfg_ctx.analyse()
            ),
            record_shapes=cfg_ctx.record_shapes,
            profile_memory=cfg_ctx.profile_memory,
            with_stack=cfg_ctx.with_stack,
            with_flops=cfg_ctx.with_flops,
            with_modules=cfg_ctx.with_modules,
            experimental_config=cfg_ctx.experimental_config,
        )
        prof.start()
        prof.stop()
        # Record the outcome first so the output directory is removed whether
        # or not the assertion below passes.
        has_prof = self.has_prof_dir(self.cfg_prof_dir)
        if os.path.exists(self.cfg_prof_dir):
            PathManager.remove_path_safety(self.cfg_prof_dir)
        self.assertTrue(has_prof)

    def test_modify_cfg_data_simplification_invalid(self):
        """Profile once with ``experimental_config.data_simplification`` set to "1".

        A ``ConfigContext`` built from the modified sample config drives a
        single start/stop cycle writing to ``cfg_prof_dir``; the test passes
        when a ``PROF*`` directory was produced there.
        """
        cfg_json = copy.deepcopy(self.json_sample)
        # Fail with a descriptive message if the sample config has no
        # experimental_config section to modify.
        self.assertIn("experimental_config", cfg_json)
        cfg_json["experimental_config"]["data_simplification"] = "1"
        cfg_ctx = ConfigContext(cfg_json)
        prof = profile(
            activities=cfg_ctx.activities(),
            schedule=schedule(
                wait=0, warmup=0, active=cfg_ctx.active(), repeat=1, skip_first=0
            ),
            on_trace_ready=tensorboard_trace_handler(
                self.cfg_prof_dir, analyse_flag=cfg_ctx.analyse()
            ),
            record_shapes=cfg_ctx.record_shapes,
            profile_memory=cfg_ctx.profile_memory,
            with_stack=cfg_ctx.with_stack,
            with_flops=cfg_ctx.with_flops,
            with_modules=cfg_ctx.with_modules,
            experimental_config=cfg_ctx.experimental_config,
        )
        prof.start()
        prof.stop()
        # Record the outcome first so the output directory is removed whether
        # or not the assertion below passes.
        has_prof = self.has_prof_dir(self.cfg_prof_dir)
        if os.path.exists(self.cfg_prof_dir):
            PathManager.remove_path_safety(self.cfg_prof_dir)
        self.assertTrue(has_prof)

    def test_modify_cfg_record_op_args_invalid(self):
        """Profile once with ``experimental_config.record_op_args`` set to "1".

        A ``ConfigContext`` built from the modified sample config drives a
        single start/stop cycle writing to ``cfg_prof_dir``; the test passes
        when a ``PROF*`` directory was produced there.
        """
        cfg_json = copy.deepcopy(self.json_sample)
        # Fail with a descriptive message if the sample config has no
        # experimental_config section to modify.
        self.assertIn("experimental_config", cfg_json)
        cfg_json["experimental_config"]["record_op_args"] = "1"
        cfg_ctx = ConfigContext(cfg_json)
        prof = profile(
            activities=cfg_ctx.activities(),
            schedule=schedule(
                wait=0, warmup=0, active=cfg_ctx.active(), repeat=1, skip_first=0
            ),
            on_trace_ready=tensorboard_trace_handler(
                self.cfg_prof_dir, analyse_flag=cfg_ctx.analyse()
            ),
            record_shapes=cfg_ctx.record_shapes,
            profile_memory=cfg_ctx.profile_memory,
            with_stack=cfg_ctx.with_stack,
            with_flops=cfg_ctx.with_flops,
            with_modules=cfg_ctx.with_modules,
            experimental_config=cfg_ctx.experimental_config,
        )
        prof.start()
        prof.stop()
        # Record the outcome first so the output directory is removed whether
        # or not the assertion below passes.
        has_prof = self.has_prof_dir(self.cfg_prof_dir)
        if os.path.exists(self.cfg_prof_dir):
            PathManager.remove_path_safety(self.cfg_prof_dir)
        self.assertTrue(has_prof)

    def test_modify_cfg_export_type_invalid(self) -> None:
        """Profile once with ``experimental_config.export_type`` set to 1.

        ``is_rank`` is set to False as well. A ``ConfigContext`` built from
        the modified sample config drives a single start/stop cycle writing
        to ``cfg_prof_dir``; the test passes when a ``PROF*`` directory was
        produced there.
        """
        cfg_json = copy.deepcopy(self.json_sample)
        cfg_json["is_rank"] = False
        # Fail with a descriptive message if the sample config has no
        # experimental_config section to modify.
        self.assertIn("experimental_config", cfg_json)
        cfg_json["experimental_config"]["export_type"] = 1
        cfg_ctx = ConfigContext(cfg_json)
        prof = profile(
            activities=cfg_ctx.activities(),
            schedule=schedule(
                wait=0, warmup=0, active=cfg_ctx.active(), repeat=1, skip_first=0
            ),
            on_trace_ready=tensorboard_trace_handler(
                self.cfg_prof_dir, analyse_flag=cfg_ctx.analyse()
            ),
            record_shapes=cfg_ctx.record_shapes,
            profile_memory=cfg_ctx.profile_memory,
            with_stack=cfg_ctx.with_stack,
            with_flops=cfg_ctx.with_flops,
            with_modules=cfg_ctx.with_modules,
            experimental_config=cfg_ctx.experimental_config,
        )
        prof.start()
        prof.stop()
        # Record the outcome first so the output directory is removed whether
        # or not the assertion below passes.
        has_prof = self.has_prof_dir(self.cfg_prof_dir)
        if os.path.exists(self.cfg_prof_dir):
            PathManager.remove_path_safety(self.cfg_prof_dir)
        self.assertTrue(has_prof)

    def ps_eT_grep(self, thread_name):
        """Return True if ``ps -eT | grep thread_name`` prints at least one line.

        Args:
            thread_name: Pattern handed to ``grep`` as its only argument.

        Returns:
            bool: True when grep's standard output is non-empty after
            stripping whitespace, False otherwise.
        """
        # Using both Popen objects as context managers guarantees their pipes
        # are closed and both children are waited for, so no descriptor or
        # zombie process is left behind.
        with subprocess.Popen(
            ["ps", "-eT"], stdout=subprocess.PIPE, text=True
        ) as ps_proc:
            with subprocess.Popen(
                ["grep", thread_name],
                stdin=ps_proc.stdout,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            ) as grep_proc:
                # Drop the parent's copy of the read end so that ps receives
                # SIGPIPE if grep exits before consuming all of its output.
                ps_proc.stdout.close()
                output, _ = grep_proc.communicate()
        return output.strip() != ""

    def test_dynamic_profiler_default_start(self):
        """Profile from the next step after the config file is deleted and rewritten.

        The rewritten config points ``prof_dir`` at ``default_prof_dir`` and
        sets ``start_step`` to one past the class-level step counter. One
        training step runs between two ``dp.step()`` calls, and the test
        passes when a ``PROF*`` directory appears under ``default_prof_dir``.
        It also checks that ``ps -eT`` lists ``DynMonitorProc``.
        """
        config = copy.deepcopy(self.json_sample)
        config["prof_dir"] = self.default_prof_dir
        config["start_step"] = TestDynamicProfiler.start_step + 1
        # Verify that deleting the config file does not break the profiler.
        os.remove(self.cfg_path)
        cfg_fd = os.open(self.cfg_path, self.flags, self.mode)
        with os.fdopen(cfg_fd, "w") as cfg_file:
            time.sleep(1)
            json.dump(config, cfg_file, indent=4)
        # Presumably gives the monitor time to notice the rewritten file.
        time.sleep(3)
        dp.step()
        # Check that the monitor process name DynMonitorProc is correct.
        self.assertTrue(self.ps_eT_grep("DynMonitorProc"))
        TestDynamicProfiler.start_step += 1
        self.model_train.train_one_step()
        dp.step()
        TestDynamicProfiler.start_step += 1
        # Record the outcome before removing the directory it is based on.
        produced = self.has_prof_dir(self.default_prof_dir)
        if os.path.exists(self.default_prof_dir):
            PathManager.remove_path_safety(self.default_prof_dir)
        self.assertTrue(produced)

    def test_dynamic_profiler_default_start_next_step(self):
        """Profile with ``start_step`` set to -1 in the rewritten config.

        One training step runs between two ``dp.step()`` calls, and the test
        passes when a ``PROF*`` directory appears under ``default_prof_dir``.
        """
        config = copy.deepcopy(self.json_sample)
        config["prof_dir"] = self.default_prof_dir
        config["start_step"] = -1
        cfg_fd = os.open(self.cfg_path, self.flags, self.mode)
        with os.fdopen(cfg_fd, "w") as cfg_file:
            time.sleep(1)
            json.dump(config, cfg_file, indent=4)
        # Presumably gives the monitor time to notice the rewritten file.
        time.sleep(3)
        dp.step()
        TestDynamicProfiler.start_step += 1
        self.model_train.train_one_step()
        dp.step()
        TestDynamicProfiler.start_step += 1
        # Record the outcome before removing the directory it is based on.
        produced = self.has_prof_dir(self.default_prof_dir)
        if os.path.exists(self.default_prof_dir):
            PathManager.remove_path_safety(self.default_prof_dir)
        self.assertTrue(produced)

    def test_dynamic_profiler_rank(self):
        """Profile with rank filtering enabled and ``rank_list`` set to [0].

        ``start_step`` is one past the class-level step counter. One training
        step runs between two ``dp.step()`` calls, and the test passes when a
        ``PROF*`` directory appears under ``rank_prof_dir``.
        """
        config = copy.deepcopy(self.json_sample)
        config["prof_dir"] = self.rank_prof_dir
        config["is_rank"] = True
        config["rank_list"] = [0]
        config["start_step"] = TestDynamicProfiler.start_step + 1

        cfg_fd = os.open(self.cfg_path, self.flags, self.mode)
        with os.fdopen(cfg_fd, "w") as cfg_file:
            time.sleep(1)
            json.dump(config, cfg_file, indent=4)
        # Presumably gives the monitor time to notice the rewritten file.
        time.sleep(3)
        dp.step()
        TestDynamicProfiler.start_step += 1
        self.model_train.train_one_step()
        dp.step()
        TestDynamicProfiler.start_step += 1
        # Record the outcome before removing the directory it is based on.
        produced = self.has_prof_dir(self.rank_prof_dir)
        if os.path.exists(self.rank_prof_dir):
            PathManager.remove_path_safety(self.rank_prof_dir)
        self.assertTrue(produced)

    def test_dynamic_profiler_rank_invalid(self):
        """Expect no profiling output when ``rank_list`` is [1].

        Rank filtering is enabled and ``start_step`` is one past the
        class-level step counter. One training step runs between two
        ``dp.step()`` calls, and the test passes when no ``PROF*`` directory
        appears under ``invalid_rank_prof_dir``.
        """
        config = copy.deepcopy(self.json_sample)
        config["prof_dir"] = self.invalid_rank_prof_dir
        config["is_rank"] = True
        config["rank_list"] = [1]
        config["start_step"] = TestDynamicProfiler.start_step + 1

        cfg_fd = os.open(self.cfg_path, self.flags, self.mode)
        with os.fdopen(cfg_fd, "w") as cfg_file:
            time.sleep(1)
            json.dump(config, cfg_file, indent=4)
        # Presumably gives the monitor time to notice the rewritten file.
        time.sleep(3)
        dp.step()
        TestDynamicProfiler.start_step += 1
        self.model_train.train_one_step()
        dp.step()
        TestDynamicProfiler.start_step += 1
        # Record the outcome before removing the directory it is based on.
        produced = self.has_prof_dir(self.invalid_rank_prof_dir)
        if os.path.exists(self.invalid_rank_prof_dir):
            PathManager.remove_path_safety(self.invalid_rank_prof_dir)
        self.assertFalse(produced)

    @staticmethod
    def has_prof_dir(path: str) -> bool:
        path = os.path.realpath(path)
        if not os.path.exists(path):
            return False
        for sub_dir in os.listdir(path):
            if sub_dir.endswith("_pt"):
                sub_dir = os.path.join(path, sub_dir)
                for p in os.listdir(sub_dir):
                    if p.startswith("PROF"):
                        return True
        return False

    @staticmethod
    def has_analyse_dir(path: str) -> bool:
        path = os.path.realpath(path)
        if not os.path.exists(path):
            return False
        for sub_dir in os.listdir(path):
            if sub_dir.endswith("_pt"):
                sub_dir = os.path.join(path, sub_dir)
                for p in os.listdir(sub_dir):
                    if p.startswith("ASCEND"):
                        return True
        return False

    def test_out_log_dyno_model(self):
        """``out_log`` forwards to ``stdout_log`` when the model is DYNO_CONFIG.

        ``DYNAMIC_PROFILER_MODEL`` and ``stdout_log`` are replaced on
        ``DynamicProfilerUtils`` for the duration of the test. Three messages
        at INFO, WARNING and ERROR level are logged, and the recording
        replacement must have received exactly those (message, level) pairs
        in order.
        """
        log_calls = []

        def log_function(infos, level):
            log_calls.append((infos, level))

        levels = DynamicProfilerUtils.LoggerLevelEnum
        expected_calls = [
            ("test information", levels.INFO),
            ("test warning", levels.WARNING),
            ("test error", levels.ERROR),
        ]

        # patch.object puts back the original class-dict entries on exit,
        # even on failure. A manual save/restore through attribute access
        # would write back an unwrapped function if stdout_log were declared
        # as a staticmethod or classmethod.
        with mock.patch.object(
            DynamicProfilerUtils,
            "DYNAMIC_PROFILER_MODEL",
            DynamicProfilerUtils.DynamicProfilerConfigModel.DYNO_CONFIG,
        ), mock.patch.object(DynamicProfilerUtils, "stdout_log", log_function):
            for message, level in expected_calls:
                DynamicProfilerUtils.out_log(message, level)

        self.assertEqual(log_calls, expected_calls)

    def test_clean_shm_for_killed_pid_time_none(self):
        """``_clean_shm_for_killed`` must not raise when the pid ctime is None.

        ``_get_pid_st_ctime`` is patched to return None and any exception
        raised by ``_clean_shm_for_killed`` fails the test.
        """
        shm = DynamicProfilerShareMemory(self.results_path, self.cfg_path, 0)
        ctime_target = (
            "torch_npu.profiler._dynamic_profiler._dynamic_profiler_monitor_shm."
            "DynamicProfilerShareMemory._get_pid_st_ctime"
        )
        with mock.patch(ctime_target, return_value=None):
            try:
                shm._clean_shm_for_killed()
            except Exception:
                self.fail(
                    "_clean_shm_for_killed should not raise exception when pid_time is None"
                )

    def test_create_shm_over_py38_retry_failure(self):
        """Shared-memory creation raises RuntimeError when ``SharedMemory`` fails.

        ``SharedMemory.__init__`` is patched to raise FileNotFoundError and
        the resource tracker's ``register`` is replaced by a no-op. Building
        the share-memory object and calling ``_create_shm_over_py38`` must
        then raise RuntimeError.
        """
        from multiprocessing import shared_memory

        tracker_patch = mock.patch(
            "multiprocessing.resource_tracker.register", lambda *args, **kwargs: None
        )
        init_patch = mock.patch.object(
            shared_memory.SharedMemory,
            "__init__",
            side_effect=FileNotFoundError("Test error"),
        )
        with tracker_patch, init_patch, self.assertRaises(RuntimeError):
            shm = DynamicProfilerShareMemory(self.results_path, self.cfg_path, 0)
            shm._create_shm_over_py38()

    def test_get_pid_st_ctime_exception(self):
        """``_get_pid_st_ctime`` returns None when ``os.open`` raises."""
        shm = DynamicProfilerShareMemory(self.results_path, self.cfg_path, 0)
        failing_open = mock.patch("os.open", side_effect=Exception("Test exception"))
        with failing_open:
            ctime = shm._get_pid_st_ctime(12345)
        self.assertIsNone(ctime)

    def test_call_dyno_monitor_with_proxy(self):
        """``_call_dyno_monitor`` calls ``enable_dyno_npu_monitor`` on the proxy once.

        ``PyDynamicMonitorProxySingleton`` is patched so that its instance's
        ``get_proxy`` hands back a mock proxy.
        """
        from torch_npu.profiler._dynamic_profiler._dynamic_profiler_monitor import (
            DynamicProfilerMonitor,
        )

        monitor = DynamicProfilerMonitor()
        proxy = MagicMock()
        singleton_target = (
            "torch_npu.profiler._dynamic_profiler._dynamic_profiler_monitor.PyDynamic"
            "MonitorProxySingleton"
        )
        with patch(singleton_target) as singleton_cls:
            singleton_obj = MagicMock()
            singleton_obj.get_proxy.return_value = proxy
            singleton_cls.return_value = singleton_obj
            payload = {"key": "value", "number": 123}
            monitor._call_dyno_monitor(payload)
            proxy.enable_dyno_npu_monitor.assert_called_once()

    def test_clean_resource_with_process(self):
        """``clean_resource`` joins the monitor's process exactly once.

        The monitor's ``_process`` is replaced by a mock and
        ``_shared_loop_flag`` is patched while ``clean_resource`` runs.
        """
        from torch_npu.profiler._dynamic_profiler._dynamic_profiler_monitor import (
            DynamicProfilerMonitor,
        )

        monitor = DynamicProfilerMonitor()
        fake_process = MagicMock()
        monitor._process = fake_process
        with patch.object(monitor, "_shared_loop_flag") as flag_mock:
            monitor.clean_resource()
            # NOTE(review): this assigns to the mock and does not assert on
            # it; an assertion on the flag's value may have been intended.
            # Confirm against clean_resource before changing it.
            flag_mock.value = False
            monitor._process.join.assert_called_once()

    def test_shm_to_prof_conf_context_read_time_exception(self):
        """``shm_to_prof_conf_context`` returns None when ``read_bytes`` raises."""
        from torch_npu.profiler._dynamic_profiler._dynamic_profiler_monitor import (
            DynamicProfilerMonitor,
        )

        monitor = DynamicProfilerMonitor()
        failing_read = patch.object(
            monitor._shm_obj, "read_bytes", side_effect=Exception("Read error")
        )
        with failing_read:
            context = monitor.shm_to_prof_conf_context()
        self.assertIsNone(context)

    def test_shm_to_prof_conf_context_none_shm(self):
        """``shm_to_prof_conf_context`` returns None when ``_shm_obj`` is None."""
        from torch_npu.profiler._dynamic_profiler._dynamic_profiler_monitor import (
            DynamicProfilerMonitor,
        )

        detached_monitor = DynamicProfilerMonitor()
        detached_monitor._shm_obj = None
        self.assertIsNone(detached_monitor.shm_to_prof_conf_context())

    def test_start_while_profiler_active(self):
        """Calling ``dp.start`` twice in a row leaves a profiler object in place.

        The config file is rewritten with ``prof_dir`` pointing at
        ``active_rank_prof_dir``. Afterwards the profiler is stopped, cleared
        from ``_DynamicProfile`` and the output directory is removed.
        """
        config = copy.deepcopy(self.json_sample)
        config["prof_dir"] = self.active_rank_prof_dir
        cfg_fd = os.open(self.cfg_path, self.flags, self.mode)
        with os.fdopen(cfg_fd, "w") as cfg_file:
            json.dump(config, cfg_file, indent=4)

        # The second start is issued while the first profiler is still set.
        for _ in range(2):
            dp.start(self.cfg_path)
        self.assertIsNotNone(_DynamicProfile().prof)
        _DynamicProfile().prof.stop()
        _DynamicProfile().prof = None
        if os.path.exists(self.active_rank_prof_dir):
            PathManager.remove_path_safety(self.active_rank_prof_dir)

    def test_init_repeated_warning(self):
        """``repeat_init`` is truthy after ``dp.init`` is called again."""
        for _ in range(2):
            dp.init(self.results_path)
        repeat_flag = _DynamicProfile().repeat_init
        self.assertTrue(repeat_flag)

    def test_step_time_calculation(self):
        """``step()`` leaves ``_step_time`` set once the preconditions are primed.

        ``RECORD_TIME_STEP`` is set to 2, ``cur_step`` to 1 and
        ``_step_record_time`` to the current time before ``step()`` is called.
        """
        profiler_state = _DynamicProfile()
        profiler_state.RECORD_TIME_STEP = 2
        profiler_state.cur_step = 1
        profiler_state._step_record_time = time.time()
        profiler_state.step()
        self.assertIsNotNone(profiler_state._step_time)

    def test_set_state(self):
        """``set_state`` applies ``cur_step`` 7 and leaves it at 7 when given -1."""
        profiler_state = _DynamicProfile()
        # This first value is overwritten by the next assignment.
        profiler_state.cur_step = 7

        profiler_state.cur_step = 0
        profiler_state.set_state({"cur_step": 7})
        self.assertEqual(profiler_state.cur_step, 7)

        # A cur_step of -1 must leave the previously set value untouched.
        profiler_state.set_state({"cur_step": -1})
        self.assertEqual(profiler_state.cur_step, 7)

    def test_set_state_continue_step(self):
        """After ``set_state`` with ``cur_step`` 10, one ``step()`` gives 11.

        The monitor is replaced by a mock whose ``shm_to_prof_conf_context``
        returns None, and ``mstx.range_start`` is patched to return 1 while
        ``step()`` runs.
        """
        profiler_state = _DynamicProfile()
        profiler_state.cur_step = 0
        fake_monitor = MagicMock()
        profiler_state._dynamic_monitor = fake_monitor
        profiler_state._dynamic_monitor.shm_to_prof_conf_context.return_value = None
        profiler_state._step_mstx_range_id = 0
        profiler_state.set_state({"cur_step": 10})
        range_start_patch = patch(
            "torch_npu.profiler.dynamic_profile.mstx.range_start", return_value=1
        )
        with range_start_patch:
            profiler_state.step()
            self.assertEqual(profiler_state.cur_step, 11)


# Run the tests through run_tests() when this file is executed as a script.
if __name__ == "__main__":
    run_tests()