from __future__ import annotations

import json
import sys
import importlib.util
from pathlib import Path
from types import ModuleType, SimpleNamespace
from unittest.mock import AsyncMock

import pytest

pytestmark = [pytest.mark.integration, pytest.mark.system]

_BOOTSTRAP_PATH = (
        Path(__file__).resolve().parents[2]
        / "jiuwenswarm"
        / "agents"
        / "harness"
        / "team"
        / "remote_member_bootstrap.py"
)
_BOOTSTRAP_SPEC = importlib.util.spec_from_file_location(
    "test_remote_member_bootstrap_module",
    _BOOTSTRAP_PATH,
)
assert _BOOTSTRAP_SPEC is not None and _BOOTSTRAP_SPEC.loader is not None
bootstrap_module = importlib.util.module_from_spec(_BOOTSTRAP_SPEC)
_BOOTSTRAP_SPEC.loader.exec_module(bootstrap_module)


def _install_fake_openjiuwen_schema(monkeypatch: pytest.MonkeyPatch) -> None:
    import openjiuwen.agent_teams.schema.events as real_events
    import openjiuwen.agent_teams.schema.team as real_team

    events_module = ModuleType("openjiuwen.agent_teams.schema.events")
    events_module.__dict__.update(real_events.__dict__)

    team_module = ModuleType("openjiuwen.agent_teams.schema.team")
    team_module.__dict__.update(real_team.__dict__)

    monkeypatch.setitem(sys.modules, "openjiuwen.agent_teams.schema.events", events_module)
    monkeypatch.setitem(sys.modules, "openjiuwen.agent_teams.schema.team", team_module)


def _bootstrap_event(*, message_id: str, from_member: str, to_member: str) -> SimpleNamespace:
    return SimpleNamespace(
        event_type="message",
        payload={
            "message_id": message_id,
            "from_member_name": from_member,
            "to_member_name": to_member,
        },
    )


def _bootstrap_envelope_json(*, member_name: str, dataset: str, service_id: str) -> str:
    return json.dumps(
        {
            "type": "jiuwen.remote_teammate_bootstrap",
            "version": 1,
            "member_name": member_name,
            "leader_member_name": "leader_1",
            "leader_agent_id": "leader-agent-id",
            "leader_direct_addr": "tcp://127.0.0.1:18555",
            # These two fields are intentionally present but should be ignored
            # since runtime now reads dataset/service_id only from local deep_agent.
            "a2x_dataset": dataset,
            "a2x_service_id": service_id,
        },
        ensure_ascii=False,
    )


def _make_team_agent(
        *,
        deep_agent: object,
        envelope_content: str,
        target_member: str,
) -> tuple[SimpleNamespace, list]:
    listeners: list = []
    mm = SimpleNamespace(
        mark_message_read=AsyncMock(),
        send_message=AsyncMock(return_value="ack-1"),
    )
    team_agent = SimpleNamespace(
        role="teammate",
        deep_agent=deep_agent,
        _member_name=lambda: "teammate_local",
        _team_name=lambda: "team_demo",
        _messager=SimpleNamespace(register_peer=lambda _cfg: None),
        team_backend=SimpleNamespace(
            db=SimpleNamespace(
                get_message=AsyncMock(return_value=SimpleNamespace(content=envelope_content))
            )
        ),
        message_manager=mm,
    )

    def _add_event_listener(cb):
        listeners.append(cb)

    team_agent.add_event_listener = _add_event_listener
    return team_agent, listeners


@pytest.mark.asyncio
async def test_teammate_bootstrap_replaces_card_using_local_dataset_service_id(
        monkeypatch: pytest.MonkeyPatch,
) -> None:
    _install_fake_openjiuwen_schema(monkeypatch)
    monkeypatch.setattr(bootstrap_module, "processed_message_ids", set(), raising=False)
    monkeypatch.setattr(
        "jiuwenswarm.common.config.get_config",
        lambda: {"team": {"runtime": {"mode": "distributed", "role": "teammate"}}},
    )
    monkeypatch.setattr(bootstrap_module, "_apply_leader_route_from_envelope", lambda *_a, **_k: True)

    client = SimpleNamespace(replace_agent_card=AsyncMock(return_value=SimpleNamespace(service_id="sid-local")))
    deep_agent = SimpleNamespace(
        _jiuwen_a2x_client=client,
        _jiuwen_a2x_blank_dataset="team_pool_local",
        _jiuwen_a2x_blank_service_id="sid-local",
    )
    target_member = "teammate_1"
    team_agent, listeners = _make_team_agent(
        deep_agent=deep_agent,
        envelope_content=_bootstrap_envelope_json(
            member_name=target_member,
            dataset="envelope_ds_should_be_ignored",
            service_id="envelope_sid_should_be_ignored",
        ),
        target_member=target_member,
    )

    bootstrap_module.attach_remote_teammate_bootstrap_listener(
        team_agent,
        session_id="sess_bootstrap_replace",
    )
    assert len(listeners) == 1

    await listeners[0](
        _bootstrap_event(
            message_id="msg-1",
            from_member="leader_1",
            to_member=target_member,
        )
    )

    client.replace_agent_card.assert_awaited_once()
    call_args = client.replace_agent_card.await_args
    assert call_args.args[0] == "team_pool_local"
    assert call_args.args[1] == "sid-local"
    assert call_args.args[2]["name"] == target_member


@pytest.mark.asyncio
async def test_teammate_bootstrap_raises_when_local_dataset_service_id_missing(
        monkeypatch: pytest.MonkeyPatch,
) -> None:
    _install_fake_openjiuwen_schema(monkeypatch)
    monkeypatch.setattr(bootstrap_module, "processed_message_ids", set(), raising=False)
    monkeypatch.setattr(
        "jiuwenswarm.common.config.get_config",
        lambda: {"team": {"runtime": {"mode": "distributed", "role": "teammate"}}},
    )
    monkeypatch.setattr(bootstrap_module, "_apply_leader_route_from_envelope", lambda *_a, **_k: True)

    client = SimpleNamespace(replace_agent_card=AsyncMock())
    deep_agent = SimpleNamespace(
        _jiuwen_a2x_client=client,
        _jiuwen_a2x_blank_dataset="",
        _jiuwen_a2x_blank_service_id="",
    )
    target_member = "teammate_1"
    team_agent, listeners = _make_team_agent(
        deep_agent=deep_agent,
        envelope_content=_bootstrap_envelope_json(
            member_name=target_member,
            dataset="ignored_ds",
            service_id="ignored_sid",
        ),
        target_member=target_member,
    )

    bootstrap_module.attach_remote_teammate_bootstrap_listener(team_agent, session_id="sess_missing_ids")
    assert len(listeners) == 1

    with pytest.raises(ValueError, match="missing required dataset/service_id/member_name"):
        await listeners[0](
            _bootstrap_event(
                message_id="msg-2",
                from_member="leader_1",
                to_member=target_member,
            )
        )


@pytest.mark.asyncio
async def test_teammate_bootstrap_raises_when_local_a2x_client_missing(
        monkeypatch: pytest.MonkeyPatch,
) -> None:
    _install_fake_openjiuwen_schema(monkeypatch)
    monkeypatch.setattr(bootstrap_module, "processed_message_ids", set(), raising=False)
    monkeypatch.setattr(
        "jiuwenswarm.common.config.get_config",
        lambda: {"team": {"runtime": {"mode": "distributed", "role": "teammate"}}},
    )
    monkeypatch.setattr(bootstrap_module, "_apply_leader_route_from_envelope", lambda *_a, **_k: True)

    deep_agent = SimpleNamespace(
        _jiuwen_a2x_client=None,
        _jiuwen_a2x_blank_dataset="team_pool_local",
        _jiuwen_a2x_blank_service_id="sid-local",
    )
    target_member = "teammate_1"
    team_agent, listeners = _make_team_agent(
        deep_agent=deep_agent,
        envelope_content=_bootstrap_envelope_json(
            member_name=target_member,
            dataset="ignored_ds",
            service_id="ignored_sid",
        ),
        target_member=target_member,
    )

    bootstrap_module.attach_remote_teammate_bootstrap_listener(team_agent, session_id="sess_missing_client")
    assert len(listeners) == 1

    with pytest.raises(RuntimeError, match="missing A2X client"):
        await listeners[0](
            _bootstrap_event(
                message_id="msg-3",
                from_member="leader_1",
                to_member=target_member,
            )
        )


@pytest.mark.asyncio
async def test_shutdown_cleanup_scheduler_deletes_team_session_and_pushes_notice(
        monkeypatch: pytest.MonkeyPatch,
) -> None:
    cleanup_tasks = getattr(bootstrap_module, "".join(["_SHUTDOWN", "_CLEANUP_TASKS"]))
    cleanup_tasks.clear()

    async def fake_sleep(_delay: float) -> None:
        return None

    manager = SimpleNamespace(delete_session_runtime=AsyncMock(return_value=True))
    notices: list[dict] = []

    async def fake_push_shutdown_cleanup_notice(**kwargs) -> None:
        notices.append(kwargs)

    monkeypatch.setattr(
        "jiuwenswarm.common.config.get_config",
        lambda: {"team": {"runtime": {"mode": "distributed", "role": "leader"}}},
    )
    monkeypatch.setattr(bootstrap_module.asyncio, "sleep", fake_sleep)
    monkeypatch.setattr(
        "jiuwenswarm.agents.harness.team.get_team_manager",
        lambda channel_id: manager,
    )
    monkeypatch.setattr(
        bootstrap_module,
        "".join(["_push", "_shutdown_cleanup_notice"]),
        fake_push_shutdown_cleanup_notice,
    )

    from openjiuwen.agent_teams.schema.status import MemberStatus
    from openjiuwen.agent_teams.schema.team import TeamRole
    from openjiuwen.core.runner import Runner

    class _Result:
        success = True

    class _ShutdownMemberTool:
        async def invoke(self, inputs, **kwargs):
            return _Result()

    tool = _ShutdownMemberTool()
    monkeypatch.setattr(
        Runner,
        "resource_mgr",
        SimpleNamespace(get_tool=lambda *_args, **_kwargs: tool),
    )
    team_agent = SimpleNamespace(
        role=TeamRole.LEADER,
        deep_agent=SimpleNamespace(
            ability_manager=SimpleNamespace(
                list=lambda: [SimpleNamespace(id="team.shutdown_member", name="shutdown_member")]
            ),
            card=SimpleNamespace(id="leader-card"),
        ),
        team_backend=SimpleNamespace(
            list_members=AsyncMock(
                return_value=[
                    SimpleNamespace(
                        member_name="teammate-1",
                        status=MemberStatus.SHUTDOWN.value,
                    )
                ]
            )
        ),
    )

    try:
        bootstrap_module.attach_shutdown_member_remote_cleanup_wrapper(
            team_agent,
            session_id="sess-shutdown",
            channel_id="web",
        )
        await tool.invoke({"member_name": "teammate-1"})
        cleaned = await bootstrap_module.wait_for_pending_shutdown_cleanup_for_session(
            "sess-shutdown",
            timeout=1.0,
        )
    finally:
        cleanup_tasks.clear()

    assert cleaned is True
    manager.delete_session_runtime.assert_awaited_once_with(
        "sess-shutdown",
        reason="team.shutdown_all_members: ",
    )
    assert notices == [
        {
            "session_id": "sess-shutdown",
            "channel_id": "web",
            "deleted": True,
        }
    ]