# Copyright (c) Huawei Technologies Co., Ltd. 2026. All rights reserved.

"""System tests for StructuredAskUserRail and interrupt_helpers questions extraction.

Tests the integration of:
1. StructuredAskUserRail — init/uninit lifecycle, tool card schema, resolve_interrupt
2. interrupt_helpers._extract_questions_from_value — extraction from tool_args
3. interrupt_helpers.convert_interactions_to_ask_user_question — full conversion pipeline
4. init.prompts.ts prompt text — structured ask_user instructions present
"""

from __future__ import annotations

import json
from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest

from openjiuwen.core.foundation.llm.schema.tool_call import ToolCall
from openjiuwen.core.single_agent.interrupt.response import (
    ToolCallInterruptRequest,
)
from openjiuwen.harness.rails.interrupt.ask_user_rail import AskUserPayload

from jiuwenswarm.agents.harness.common.rails.ask_user_rail import (
    EXTENDED_INPUT_PARAMS_CN,
    EXTENDED_INPUT_PARAMS_EN,
    StructuredAskUserRail,
    StructuredAskUserTool,
)
from jiuwenswarm.agents.harness.common.rails.interrupt.interrupt_helpers import (
    _extract_questions_from_value,
    convert_interactions_to_ask_user_question,
)

pytestmark = [pytest.mark.integration, pytest.mark.system]


# =====================================================================
# Helpers
# =====================================================================

def _make_tool_call(
    tool_call_id: str = "tc_001",
    arguments: dict | str | None = None,
) -> ToolCall:
    """Create a ToolCall with given arguments."""
    if arguments is None:
        arguments = {"query": "Update?"}
    if isinstance(arguments, dict):
        arguments = json.dumps(arguments)
    return ToolCall(id=tool_call_id, type="function", name="ask_user", arguments=arguments)


def _make_tcir(
    message: str = "Update?",
    tool_args: dict | str | None = None,
) -> ToolCallInterruptRequest:
    """Create a ToolCallInterruptRequest for testing."""
    if tool_args is None:
        tool_args = {"query": message}
    return ToolCallInterruptRequest(
        message=message,
        payload_schema={},
        tool_name="ask_user",
        tool_call_id="tc_001",
        tool_args=tool_args,
    )


def _make_mock_agent() -> MagicMock:
    """Create a mock agent with ability_manager and card."""
    agent = MagicMock()
    agent.ability_manager = MagicMock()
    agent.card = MagicMock()
    agent.card.id = "test_agent_001"
    return agent


# =====================================================================
# 1. StructuredAskUserTool Schema Tests
# =====================================================================

class TestStructuredAskUserToolSchema:
    """Verify the extended tool card schema for ask_user."""

    @staticmethod
    def test_en_schema_has_query_and_questions():
        """English schema must include both `query` and `questions` properties."""
        props = EXTENDED_INPUT_PARAMS_EN["properties"]
        assert "query" in props
        assert "questions" in props
        assert props["query"]["type"] == "string"
        assert props["questions"]["type"] == "array"

    @staticmethod
    def test_cn_schema_has_query_and_questions():
        """Chinese schema must include both `query` and `questions` properties."""
        props = EXTENDED_INPUT_PARAMS_CN["properties"]
        assert "query" in props
        assert "questions" in props

    @staticmethod
    def test_required_fields_only_query():
        """Only `query` is required; `questions` is optional."""
        assert EXTENDED_INPUT_PARAMS_EN["required"] == ["query"]
        assert EXTENDED_INPUT_PARAMS_CN["required"] == ["query"]

    @staticmethod
    def test_questions_item_schema_structure():
        """Each question item must have `question` (required) and optional
        `header`, `options`, `multi_select`.
        """

        from jiuwenswarm.agents.harness.common.rails.ask_user_rail import (
            _QUESTIONS_ITEM_SCHEMA,
        )
        props = _QUESTIONS_ITEM_SCHEMA["properties"]
        assert "question" in props
        assert "header" in props
        assert "options" in props
        assert "multi_select" in props
        assert _QUESTIONS_ITEM_SCHEMA["required"] == ["question"]

    @staticmethod
    def test_tool_card_name_is_ask_user():
        """Tool card name must be `ask_user` (same as original for compat)."""
        tool = StructuredAskUserTool(language="en")
        assert tool.card.name == "ask_user"

    @staticmethod
    def test_tool_card_input_params_match_en():
        """Tool card input_params should match EXTENDED_INPUT_PARAMS_EN."""
        tool = StructuredAskUserTool(language="en")
        assert tool.card.input_params == EXTENDED_INPUT_PARAMS_EN

    @staticmethod
    def test_tool_card_input_params_match_cn():
        """Tool card input_params should match EXTENDED_INPUT_PARAMS_CN."""
        tool = StructuredAskUserTool(language="cn")
        assert tool.card.input_params == EXTENDED_INPUT_PARAMS_CN


# =====================================================================
# 2. StructuredAskUserRail Lifecycle Tests
# =====================================================================

class TestStructuredAskUserRailLifecycle:
    """Verify rail init/uninit lifecycle with mock agent."""

    @staticmethod
    def test_init_registers_tool_in_ability_manager():
        """init() must register the tool card in agent.ability_manager."""
        rail = StructuredAskUserRail()
        agent = _make_mock_agent()

        with patch("openjiuwen.harness.rails.interrupt.ask_user_rail.resolve_language", return_value="en"):
            rail.init(agent)

        agent.ability_manager.add.assert_called_once()
        added_card = agent.ability_manager.add.call_args[0][0]
        assert added_card.name == "ask_user"

    @staticmethod
    def test_uninit_removes_tool_from_ability_manager():
        """uninit() must remove the tool from agent.ability_manager."""
        rail = StructuredAskUserRail()
        agent = _make_mock_agent()

        with patch("openjiuwen.harness.rails.interrupt.ask_user_rail.resolve_language", return_value="en"):
            rail.init(agent)
            rail.uninit(agent)

        agent.ability_manager.remove.assert_called_once_with("ask_user")

    @staticmethod
    def test_init_uninit_clears_structured_tools():
        """uninit() must clear the internal _structured_tools list."""
        rail = StructuredAskUserRail()
        agent = _make_mock_agent()

        with patch("openjiuwen.harness.rails.interrupt.ask_user_rail.resolve_language", return_value="en"):
            rail.init(agent)
            assert len(rail.get_structured_tools()) == 1

            rail.uninit(agent)
            assert len(rail.get_structured_tools()) == 0

    @staticmethod
    def test_tool_names_default_is_ask_user():
        """Default tool_names should be {'ask_user'}."""
        rail = StructuredAskUserRail()
        assert rail.get_tools() == {"ask_user"}


# =====================================================================
# 3. StructuredAskUserRail _extract_questions Tests
# =====================================================================

class TestStructuredAskUserRailExtractQuestions:
    """Verify _extract_questions method parses tool call arguments correctly."""

    @staticmethod
    def test_extract_questions_from_dict_args():
        """Should extract questions from dict arguments."""
        rail = StructuredAskUserRail()
        tc = _make_tool_call(arguments={
            "query": "Update?",
            "questions": [
                {"question": "Apply update?", "header": "Update",
                 "options": [{"label": "Apply", "description": "apply"}]},
            ],
        })
        result = rail.extract_questions(tc)
        assert result is not None
        assert len(result) == 1
        assert result[0]["question"] == "Apply update?"

    @staticmethod
    def test_extract_questions_from_string_args():
        """Should extract questions from JSON string arguments."""
        rail = StructuredAskUserRail()
        tc = _make_tool_call(arguments=json.dumps({
            "query": "Update?",
            "questions": [{"question": "Q1", "header": "H1"}],
        }))
        result = rail.extract_questions(tc)
        assert result is not None
        assert len(result) == 1

    @staticmethod
    def test_extract_questions_returns_none_for_plain_query():
        """Should return None for a plain query (no questions)."""
        rail = StructuredAskUserRail()
        tc = _make_tool_call(arguments={"query": "What is your role?"})
        result = rail.extract_questions(tc)
        assert result is None

    @staticmethod
    def test_extract_questions_returns_none_for_none_tool_call():
        """Should return None when tool_call is None."""
        rail = StructuredAskUserRail()
        result = rail.extract_questions(None)
        assert result is None


# =====================================================================
# 4. interrupt_helpers._extract_questions_from_value Tests
# =====================================================================

class TestExtractQuestionsFromValue:
    """Verify _extract_questions_from_value handles all extraction paths."""

    @staticmethod
    def test_dict_value_obj_with_questions():
        """Should extract questions from a dict value_obj."""
        result = _extract_questions_from_value({
            "questions": [{"question": "Q1", "header": "H1"}],
        })
        assert result is not None
        assert len(result) == 1

    @staticmethod
    def test_tcir_with_questions_in_tool_args_dict():
        """Should extract questions from ToolCallInterruptRequest.tool_args (dict)."""
        tcir = _make_tcir(tool_args={
            "query": "Update?",
            "questions": [
                {"question": "Apply?", "header": "Update",
                 "options": [{"label": "Apply", "description": "apply"}]},
            ],
        })
        result = _extract_questions_from_value(tcir)
        assert result is not None
        assert result[0]["question"] == "Apply?"

    @staticmethod
    def test_tcir_with_questions_in_tool_args_json_string():
        """Should extract questions from ToolCallInterruptRequest.tool_args (JSON string)."""
        tcir = _make_tcir(tool_args=json.dumps({
            "query": "Update?",
            "questions": [{"question": "Apply?", "header": "Update"}],
        }))
        result = _extract_questions_from_value(tcir)
        assert result is not None
        assert result[0]["question"] == "Apply?"

    @staticmethod
    def test_tcir_plain_query_returns_none():
        """Should return None for plain query (no questions in tool_args)."""
        tcir = _make_tcir(tool_args={"query": "What is your role?"})
        result = _extract_questions_from_value(tcir)
        assert result is None

    @staticmethod
    def test_tcir_invalid_json_string_returns_none():
        """Should return None for tool_args that is invalid JSON."""
        tcir = _make_tcir(tool_args="not valid json{{{")
        result = _extract_questions_from_value(tcir)
        assert result is None

    @staticmethod
    def test_tcir_json_string_without_questions_returns_none():
        """Should return None for JSON string tool_args without questions field."""
        tcir = _make_tcir(tool_args=json.dumps({"query": "role?"}))
        result = _extract_questions_from_value(tcir)
        assert result is None

    @staticmethod
    def test_empty_questions_list_returns_none():
        """Should return None for an empty questions list."""
        tcir = _make_tcir(tool_args={"query": "Q?", "questions": []})
        result = _extract_questions_from_value(tcir)
        assert result is None

    @staticmethod
    def test_direct_questions_attribute_on_object():
        """Should extract questions from hasattr path (questions attribute)."""
        obj = MagicMock()
        obj.questions = [{"question": "Q1", "header": "H1"}]
        # Remove tool_args to ensure it goes through the hasattr path
        del obj.tool_args
        result = _extract_questions_from_value(obj)
        assert result is not None
        assert len(result) == 1

    @staticmethod
    def test_tool_args_takes_priority_over_direct_attribute():
        """If both direct questions and tool_args.questions exist, direct path wins."""
        tcir = _make_tcir(tool_args={
            "query": "Q?",
            "questions": [{"question": "from_tool_args", "header": "TA"}],
        })
        # ToolCallInterruptRequest does NOT have .questions attribute
        # so only tool_args path will be hit
        result = _extract_questions_from_value(tcir)
        assert result is not None
        assert result[0]["question"] == "from_tool_args"


# =====================================================================
# 5. convert_interactions_to_ask_user_question Full Pipeline
# =====================================================================

class TestConvertInteractionsToAskUserQuestion:
    """Verify the full conversion pipeline from TCIR to frontend event."""

    @staticmethod
    def test_structured_questions_produce_ask_user_interrupt():
        """Structured questions in tool_args should produce source=ask_user_interrupt."""
        tcir = _make_tcir(tool_args={
            "query": "Update?",
            "questions": [
                {"question": "Apply update?", "header": "Update",
                 "options": [{"label": "Apply", "description": "apply"},
                             {"label": "Skip", "description": "skip"}],
                 "multi_select": False},
            ],
        })

        # Wrap in InteractionOutput-like structure
        interaction = MagicMock()
        interaction.id = "req_001"
        interaction.value = tcir

        result = convert_interactions_to_ask_user_question([interaction])
        assert result is not None
        assert result["event_type"] == "chat.ask_user_question"
        assert result["source"] == "ask_user_interrupt"
        assert len(result["questions"]) == 1
        q = result["questions"][0]
        assert q["question"] == "Apply update?"
        assert q["header"] == "Update"
        # Options should include original 2 + "Other" appended by _build_multi_questions
        assert len(q["options"]) == 3
        assert q["options"][0]["label"] == "Apply"
        assert q["options"][1]["label"] == "Skip"
        assert q["options"][2]["label"] == "Other"

    @staticmethod
    def test_plain_query_produce_permission_interrupt():
        """Plain query (no questions) should produce source=permission_interrupt."""
        tcir = _make_tcir(tool_args={"query": "What is your role?"})

        interaction = MagicMock()
        interaction.id = "req_002"
        interaction.value = tcir

        result = convert_interactions_to_ask_user_question([interaction])
        assert result is not None
        assert result["source"] == "permission_interrupt"

    @staticmethod
    def test_empty_state_outputs_returns_none():
        """Empty state_outputs should return None."""
        result = convert_interactions_to_ask_user_question([])
        assert result is None

    @staticmethod
    def test_dict_interaction_with_questions_in_value():
        """Dict-format interaction should also work."""
        result = convert_interactions_to_ask_user_question([
            {
                "id": "req_003",
                "value": {
                    "query": "Update?",
                    "questions": [{"question": "Apply?", "header": "Upd",
                                   "options": [{"label": "Yes"}]}],
                },
            }
        ])
        assert result is not None
        assert result["source"] == "ask_user_interrupt"


# =====================================================================
# 6. StructuredAskUserRail resolve_interrupt Tests
# =====================================================================

class TestStructuredAskUserRailResolveInterrupt:
    """Verify resolve_interrupt handles structured and plain answers."""

    @staticmethod
    @pytest.mark.asyncio
    async def test_none_user_input_returns_interrupt():
        """When user_input is None, should return interrupt (first-time call)."""
        rail = StructuredAskUserRail()
        tc = _make_tool_call(arguments={
            "query": "Update?",
            "questions": [{"question": "Apply?", "header": "Upd",
                          "options": [{"label": "Apply"}]}],
        })
        ctx = MagicMock()

        decision = await rail.resolve_interrupt(ctx, tc, None)

        # Should be an InterruptResult
        from openjiuwen.harness.rails.interrupt.interrupt_base import InterruptResult
        assert isinstance(decision, InterruptResult)

    @staticmethod
    @pytest.mark.asyncio
    async def test_structured_answer_dict_returns_reject():
        """Structured answer as dict should return RejectResult with formatted text."""
        rail = StructuredAskUserRail()
        tc = _make_tool_call(arguments={
            "query": "Update?",
            "questions": [{"question": "Apply update?", "header": "Update",
                          "options": [{"label": "Apply update"}, {"label": "Skip"}]}],
        })
        ctx = MagicMock()

        # Simulate user selecting "Apply update"
        user_input = {"answers": {"Apply update?": "Apply update"}}
        decision = await rail.resolve_interrupt(ctx, tc, user_input)

        from openjiuwen.harness.rails.interrupt.interrupt_base import RejectResult
        assert isinstance(decision, RejectResult)
        assert "Apply update" in decision.tool_result

    @staticmethod
    @pytest.mark.asyncio
    async def test_structured_answer_string_fallback():
        """String answer for a structured question should be handled as free-text."""
        rail = StructuredAskUserRail()
        tc = _make_tool_call(arguments={
            "query": "Update?",
            "questions": [{"question": "Apply?", "header": "Upd"}],
        })
        ctx = MagicMock()

        decision = await rail.resolve_interrupt(ctx, tc, "I want to customize")

        from openjiuwen.harness.rails.interrupt.interrupt_base import RejectResult
        assert isinstance(decision, RejectResult)
        assert "I want to customize" in decision.tool_result

    @staticmethod
    @pytest.mark.asyncio
    async def test_plain_query_delegates_to_parent():
        """Plain query (no questions) should delegate to parent AskUserRail."""
        rail = StructuredAskUserRail()
        tc = _make_tool_call(arguments={"query": "What is your role?"})
        ctx = MagicMock()

        # AskUserPayload changed: answer (str) → answers (dict)
        # Construct payload compatible with both old and new upstream versions
        if "answer" in AskUserPayload.model_fields:
            user_input = AskUserPayload(answer="I am a developer")
        else:
            user_input = AskUserPayload(answers={"What is your role?": "I am a developer"})
        decision = await rail.resolve_interrupt(ctx, tc, user_input)

        from openjiuwen.harness.rails.interrupt.interrupt_base import RejectResult
        assert isinstance(decision, RejectResult)
        assert "I am a developer" in decision.tool_result

    @staticmethod
    @pytest.mark.asyncio
    async def test_structured_answer_with_multiple_questions():
        """Multiple structured questions answered should format all answers."""
        rail = StructuredAskUserRail()
        tc = _make_tool_call(arguments={
            "query": "Setup info",
            "questions": [
                {"question": "Branch naming?", "header": "Branch"},
                {"question": "Test runner?", "header": "Test"},
            ],
        })
        ctx = MagicMock()

        user_input = {
            "answers": {
                "Branch naming?": "feature/*",
                "Test runner?": "pytest",
            },
        }
        decision = await rail.resolve_interrupt(ctx, tc, user_input)

        from openjiuwen.harness.rails.interrupt.interrupt_base import RejectResult
        assert isinstance(decision, RejectResult)
        assert "Branch naming?" in decision.tool_result
        assert "feature/*" in decision.tool_result
        assert "Test runner?" in decision.tool_result
        assert "pytest" in decision.tool_result


# =====================================================================
# 7. init.prompts.ts Prompt Text Tests (read source file directly)
# =====================================================================

_INIT_PROMPTS_TS_PATH = (
    Path(__file__).parent.parent.parent
    / "jiuwenswarm"
    / "cli"
    / "src"
    / "core"
    / "commands"
    / "builtins"
    / "init.prompts.ts"
)


class TestInitPromptStructuredAskUser:
    """Verify the /init prompt text instructs structured ask_user usage.

    These tests read the TypeScript source file directly rather than importing,
    since init.prompts.ts is a TypeScript module not importable by Python.
    """

    @staticmethod
    def _read_prompts_ts() -> str:
        """Read the init.prompts.ts source file."""
        if not _INIT_PROMPTS_TS_PATH.exists():
            pytest.skip("init.prompts.ts not found at expected path")
        return _INIT_PROMPTS_TS_PATH.read_text(encoding="utf-8")

    def test_en_prompt_contains_ask_user_questions_parameter(self):
        """EN prompt must instruct LLM to use `ask_user` with `questions`."""
        content = self._read_prompts_ts()
        assert "questions" in content
        assert "ask_user" in content
        # Must NOT contain the old conditional language
        assert "If `ask_user` supports" not in content

    def test_zh_prompt_contains_ask_user_questions_parameter(self):
        """ZH prompt must instruct LLM to use `ask_user` with `questions`."""
        content = self._read_prompts_ts()
        assert "questions" in content
        assert "ask_user" in content
        # Must NOT contain the old conditional language
        assert "若 `ask_user` 支持" not in content

    def test_en_prompt_contains_apply_update_skip_options(self):
        """EN prompt must mention 'Apply update' / 'Skip' as concrete options."""
        content = self._read_prompts_ts()
        assert "Apply update" in content
        assert "Skip (keep current)" in content

    def test_zh_prompt_contains_apply_update_skip_options(self):
        """ZH prompt must mention '应用更新' / '跳过' as concrete options."""
        content = self._read_prompts_ts()
        assert "应用更新" in content
        assert "跳过" in content

    def test_en_step3_has_questions_usage_example(self):
        """EN Step 3 must include ask_user questions usage example."""
        content = self._read_prompts_ts()
        assert "multi_select" in content
        # The example should show the questions parameter structure
        assert "header" in content

    def test_zh_step3_has_questions_usage_example(self):
        """ZH Step 3 must include ask_user questions usage example."""
        content = self._read_prompts_ts()
        assert "multi_select" in content
        assert "header" in content