#!/usr/bin/python3
# -*- coding: utf-8 -*-
# -------------------------------------------------------------------------
# Copyright (c) 2026 Huawei Technologies Co., Ltd.
# This file is part of the MindStudio project.
#
# MindStudio is licensed under Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
#
#    http://license.coscl.org.cn/MulanPSL2
#
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# -------------------------------------------------------------------------

from pathlib import Path
from types import SimpleNamespace
from typing import Any

import pytest
from prompt_toolkit.layout.containers import Window

from msagent.cli.core.tool_output import ToolOutputEntry
from msagent.cli.dispatchers.commands import CommandDispatcher
from msagent.cli.handlers.tool_outputs import ToolOutputHandler
from msagent.configs import ApprovalMode


def _build_context() -> SimpleNamespace:
    return SimpleNamespace(
        agent="msagent",
        model="default",
        model_display=None,
        thread_id="thread-1",
        working_dir=Path.cwd(),
        approval_mode=ApprovalMode.SEMI_ACTIVE,
        recursion_limit=80,
        bash_mode=False,
        current_input_tokens=None,
        current_output_tokens=None,
        context_window=128000,
    )


@pytest.mark.asyncio
async def test_tool_output_handler_warns_when_no_expandable_output(monkeypatch) -> None:
    printed = []
    monkeypatch.setattr("msagent.cli.handlers.tool_outputs.console.print_warning", printed.append)
    monkeypatch.setattr("msagent.cli.handlers.tool_outputs.console.print", lambda *args, **kwargs: None)

    session = SimpleNamespace(context=_build_context(), latest_tool_output=None)
    handler = ToolOutputHandler(session)

    await handler.handle()

    assert printed == ["No expandable tool output available yet"]


@pytest.mark.asyncio
async def test_tool_output_handler_uses_expandable_entries_and_opens_viewer(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    captured_kwargs: dict[str, Any] = {}
    run_calls: list[str] = []

    class FakeApp:
        async def run_async(self) -> None:
            run_calls.append("run")

    def fake_create_selector_application(**kwargs: Any) -> FakeApp:
        captured_kwargs.update(kwargs)
        return FakeApp()

    monkeypatch.setattr(
        "msagent.cli.handlers.tool_outputs.create_selector_application",
        fake_create_selector_application,
    )

    session = SimpleNamespace(
        context=_build_context(),
        tool_outputs=[
            ToolOutputEntry(
                tool_call_id="call-1",
                tool_name="read_file",
                preview_content="short",
                full_content="short",
            ),
            ToolOutputEntry(
                tool_call_id="call-2",
                tool_name="run_command",
                preview_content="preview-2",
                full_content="full-2\nline-2",
            ),
        ],
        latest_tool_output=None,
    )
    handler = ToolOutputHandler(session)

    await handler.handle()

    assert run_calls == ["run"]
    assert captured_kwargs["context"] is session.context
    assert captured_kwargs["full_screen"] is True
    assert captured_kwargs["mouse_support"] is True
    assert isinstance(captured_kwargs["content_window"], Window)
    assert captured_kwargs["header_windows"]


@pytest.mark.asyncio
async def test_tool_output_handler_falls_back_to_latest_expandable_output(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    captured_kwargs: dict[str, Any] = {}
    run_calls: list[str] = []

    class FakeApp:
        async def run_async(self) -> None:
            run_calls.append("run")

    def fake_create_selector_application(**kwargs: Any) -> FakeApp:
        captured_kwargs.update(kwargs)
        return FakeApp()

    monkeypatch.setattr(
        "msagent.cli.handlers.tool_outputs.create_selector_application",
        fake_create_selector_application,
    )

    latest = ToolOutputEntry(
        tool_call_id="call-latest",
        tool_name="run_command",
        preview_content="preview-latest",
        full_content="full-latest",
    )
    session = SimpleNamespace(
        context=_build_context(),
        tool_outputs=[],
        latest_tool_output=latest,
    )
    handler = ToolOutputHandler(session)

    await handler.handle()

    assert run_calls == ["run"]
    assert captured_kwargs["context"] is session.context


@pytest.mark.asyncio
async def test_tool_output_handler_ignores_non_expandable_latest_output(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    printed = []
    monkeypatch.setattr("msagent.cli.handlers.tool_outputs.console.print_warning", printed.append)
    monkeypatch.setattr("msagent.cli.handlers.tool_outputs.console.print", lambda *args, **kwargs: None)

    latest = ToolOutputEntry(
        tool_call_id="call-1",
        tool_name="read_file",
        preview_content="same",
        full_content="same",
    )
    session = SimpleNamespace(
        context=_build_context(),
        tool_outputs=[],
        latest_tool_output=latest,
    )
    handler = ToolOutputHandler(session)

    await handler.handle()

    assert printed == ["No expandable tool output available yet"]


def test_command_dispatcher_registers_tool_output_command() -> None:
    session = SimpleNamespace(
        context=_build_context(),
        renderer=SimpleNamespace(render_help=lambda *_args, **_kwargs: None),
        prompt=SimpleNamespace(hotkeys={}),
        running=True,
        update_context=lambda **_kwargs: None,
        clear_tool_output=lambda: None,
    )
    dispatcher = CommandDispatcher(session)

    assert "/tool-output" in dispatcher.commands


def test_session_like_tool_output_list_keeps_multiple_entries() -> None:
    tool_outputs: list[ToolOutputEntry] = []
    latest_tool_output = None

    def remember(entry: ToolOutputEntry) -> None:
        nonlocal latest_tool_output
        for index, existing in enumerate(tool_outputs):
            if existing.tool_call_id and existing.tool_call_id == entry.tool_call_id:
                entry.sequence = existing.sequence
                tool_outputs[index] = entry
                latest_tool_output = entry
                return
        entry.sequence = len(tool_outputs) + 1
        tool_outputs.append(entry)
        latest_tool_output = entry

    remember(
        ToolOutputEntry(
            tool_call_id="call-1",
            tool_name="run_command",
            preview_content="preview-1",
            full_content="full-1",
        )
    )
    remember(
        ToolOutputEntry(
            tool_call_id="call-2",
            tool_name="read_file",
            preview_content="preview-2",
            full_content="full-2",
        )
    )

    assert [entry.tool_call_id for entry in tool_outputs] == ["call-1", "call-2"]
    assert latest_tool_output is tool_outputs[-1]
    assert [entry.sequence for entry in tool_outputs] == [1, 2]


def test_tool_output_handler_body_lines_include_tool_name_and_args() -> None:
    entry = ToolOutputEntry(
        tool_call_id="call-1",
        tool_name="execute",
        preview_content="preview output",
        full_content="full output",
        tool_args={
            "cwd": "/tmp/project",
            "command": "bash scripts/run.sh --flag value",
        },
    )

    lines = ToolOutputHandler._build_body_lines(entry, width=120)

    assert lines[:6] == [
        "Tool: execute",
        "Args:",
        "  command: bash scripts/run.sh --flag value",
        "  cwd: /tmp/project",
        "",
        "Output:",
    ]
    assert lines[6] == "preview output"


def test_stringify_tool_arg_converts_dict_to_json() -> None:
    result = ToolOutputHandler._stringify_tool_arg({"key": "value", "num": 42})
    assert '"key"' in result
    assert '"value"' in result
    assert "42" in result


def test_stringify_tool_arg_converts_list_to_json() -> None:
    result = ToolOutputHandler._stringify_tool_arg([1, 2, 3])
    assert "1" in result
    assert "2" in result
    assert "3" in result


def test_stringify_tool_arg_normalizes_crlf_in_strings() -> None:
    result = ToolOutputHandler._stringify_tool_arg("line1\r\nline2")
    assert result == "line1\nline2"


def test_stringify_tool_arg_converts_non_string_types() -> None:
    assert ToolOutputHandler._stringify_tool_arg(42) == "42"
    assert ToolOutputHandler._stringify_tool_arg(True) == "True"


def test_wrap_block_preserves_line_breaks() -> None:
    text = "first line\nsecond line\nthird line"
    result = ToolOutputHandler._wrap_block(text, width=80)
    assert "first line" in result
    assert "second line" in result
    assert "third line" in result


def test_wrap_block_returns_empty_for_empty_input() -> None:
    result = ToolOutputHandler._wrap_block("", width=80)
    assert result == [""]


def test_wrap_block_applies_initial_and_subsequent_indent() -> None:
    text = "a long line that needs wrapping because it exceeds the specified width"
    result = ToolOutputHandler._wrap_block(text, width=20, initial_indent="  key: ", subsequent_indent="       ")
    assert result[0].startswith("  key: ")
    for line in result[1:]:
        assert line.startswith("       ")


def test_build_body_lines_includes_tool_name_and_args() -> None:
    entry = ToolOutputEntry(
        tool_call_id="call-1",
        tool_name="execute",
        preview_content="preview",
        full_content="full output\nline 2",
        tool_args={"command": "ls -la"},
    )
    lines = ToolOutputHandler._build_body_lines(entry, width=120)
    assert lines[0] == "Tool: execute"
    assert "Args:" in lines
    assert "  command: ls -la" in lines
    assert "Output:" in lines


def test_build_body_lines_uses_full_content_when_expanded() -> None:
    entry = ToolOutputEntry(
        tool_call_id="call-1",
        tool_name="run",
        preview_content="preview",
        full_content="expanded detailed output",
        tool_args={},
    )
    entry.expanded = True
    lines = ToolOutputHandler._build_body_lines(entry, width=120)
    assert "expanded detailed output" in lines


def test_build_body_lines_shows_empty_output_placeholder_when_no_content() -> None:
    entry = ToolOutputEntry(
        tool_call_id="call-1",
        tool_name="test",
        preview_content="",
        full_content="",
    )
    lines = ToolOutputHandler._build_body_lines(entry, width=120)
    assert "(empty output)" in lines