import json
import logging
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from openjiuwen.core.common.constants.constant import INTERACTIVE_INPUT
from openjiuwen.core.context_engine.base import ModelContext
from openjiuwen.core.session.node import Session
from openjiuwen_deepsearch.framework.openjiuwen.agent.main_graph_nodes import (
OutlineInteractionNode,
DependencyOutlineInteractionNode,
)
from openjiuwen_deepsearch.framework.openjiuwen.agent.search_context import OutlineInteraction
from openjiuwen_deepsearch.utils.constants_utils.node_constants import NodeId
from openjiuwen_deepsearch.utils.common_utils.stream_utils import StreamEvent
logger = logging.getLogger(__name__)
@pytest.fixture
def outline_interaction_node():
    """Provide a newly constructed ``OutlineInteractionNode`` to the requesting test."""
    node = OutlineInteractionNode()
    return node
@pytest.fixture
def mock_session():
    """Provide a ``Session``-spec'd mock with its state and I/O hooks replaced.

    ``get_global_state`` returns ``None`` until a test installs a side effect;
    ``interact`` and ``write_custom_stream`` are ``AsyncMock`` objects so they can
    be awaited.
    """
    fake_session = MagicMock(spec=Session)
    fake_session.get_global_state = MagicMock(return_value=None)
    # Plain (synchronous) state writers.
    for sync_hook in ("update_global_state", "update_state"):
        setattr(fake_session, sync_hook, MagicMock())
    # Hooks that the code under test awaits.
    for async_hook in ("interact", "write_custom_stream"):
        setattr(fake_session, async_hook, AsyncMock())
    return fake_session
@pytest.fixture
def mock_context():
    """Provide a ``MagicMock`` restricted to the ``ModelContext`` spec."""
    context = MagicMock(spec=ModelContext)
    return context
@pytest.fixture
def default_config():
    """Provide the baseline config: ``cmd`` feedback, interaction enabled, 5 rounds."""
    return dict(
        feedback_mode="cmd",
        outline_interaction_enabled=True,
        outline_interaction_max_rounds=5,
    )
def setup_session_state(
    session, config=None, history_outlines=None, current_outline=None, outline_interactions=None
):
    """Wire ``session.get_global_state`` to answer from the supplied test data.

    Args:
        session: Object whose ``get_global_state.side_effect`` is replaced.
        config: Optional mapping with ``feedback_mode``,
            ``outline_interaction_enabled`` and ``outline_interaction_max_rounds``.
            A falsy mapping or a missing key yields ``"cmd"``, ``True`` and ``5``
            respectively.
        history_outlines: Served for ``search_context.history_outlines``
            (``[]`` when falsy).
        current_outline: Served unchanged for ``search_context.current_outline``.
        outline_interactions: Served for ``search_context.outline_interactions``
            (``[]`` when falsy).

    Any key other than the six handled here resolves to ``None``.
    """

    def from_config(option, fallback):
        # A falsy config (None or empty) always answers with the fallback.
        if not config:
            return fallback
        return config.get(option, fallback)

    # (state key, zero-argument producer) pairs; producers run on every lookup so
    # each call sees the current contents of the captured arguments.
    routes = (
        ("config.workflow_feedback_mode", lambda: from_config("feedback_mode", "cmd")),
        (
            "config.outline_interaction_enabled",
            lambda: from_config("outline_interaction_enabled", True),
        ),
        (
            "config.outline_interaction_max_rounds",
            lambda: from_config("outline_interaction_max_rounds", 5),
        ),
        ("search_context.history_outlines", lambda: history_outlines or []),
        ("search_context.current_outline", lambda: current_outline),
        ("search_context.outline_interactions", lambda: outline_interactions or []),
    )

    def get_state_side_effect(key):
        for state_key, produce in routes:
            if key == state_key:
                return produce()
        return None

    session.get_global_state.side_effect = get_state_side_effect
class TestOutlineInteractionNodeCoreFlow:
    """Core routing tests for ``OutlineInteractionNode._do_invoke``.

    Every test seeds the mocked session through ``setup_session_state``, awaits
    ``_do_invoke`` and checks the ``next_node`` entry of the returned mapping.
    """

    @pytest.mark.asyncio
    async def test_interaction_disabled_skip_to_editor_team(
        self, outline_interaction_node, mock_session, mock_context
    ):
        """Interaction disabled: expect EDITOR_TEAM and no call to ``session.interact``."""
        setup_session_state(
            mock_session,
            config=dict(
                feedback_mode="cmd",
                outline_interaction_enabled=False,
                outline_interaction_max_rounds=5,
            ),
        )

        outcome = await outline_interaction_node._do_invoke({}, mock_session, mock_context)

        assert outcome["next_node"] == NodeId.EDITOR_TEAM.value
        mock_session.interact.assert_not_called()

    @pytest.mark.asyncio
    async def test_max_rounds_reached_skip_to_editor_team(
        self, outline_interaction_node, mock_session, mock_context
    ):
        """Round limit reached: expect a stream notification and EDITOR_TEAM."""
        # Three recorded rounds against a limit of three.
        previous_rounds = [
            OutlineInteraction(feedback=text, interaction_mode="revise_comment")
            for text in ("comment1", "comment2", "comment3")
        ]
        setup_session_state(
            mock_session,
            config=dict(
                feedback_mode="cmd",
                outline_interaction_enabled=True,
                outline_interaction_max_rounds=3,
            ),
            outline_interactions=previous_rounds,
        )

        outcome = await outline_interaction_node._do_invoke({}, mock_session, mock_context)

        assert outcome["next_node"] == NodeId.EDITOR_TEAM.value
        stream = mock_session.write_custom_stream
        stream.assert_called()
        last_payload = stream.call_args.args[0]
        assert "Maximum interaction rounds reached" in last_payload["content"]

    @pytest.mark.asyncio
    async def test_user_accepts_outline(
        self, outline_interaction_node, mock_session, mock_context
    ):
        """``accepted`` feedback: expect EDITOR_TEAM and no interaction record written."""
        setup_session_state(
            mock_session,
            config=dict(
                feedback_mode="web",
                outline_interaction_enabled=True,
                outline_interaction_max_rounds=5,
            ),
        )
        mock_session.interact.return_value = json.dumps(
            {"interrupt_feedback": "accepted", "feedback": ""}
        )

        outcome = await outline_interaction_node._do_invoke({}, mock_session, mock_context)

        assert outcome["next_node"] == NodeId.EDITOR_TEAM.value
        # Look for the state key in the textual form of every recorded call.
        offending_updates = [
            update
            for update in mock_session.update_global_state.call_args_list
            if "search_context.outline_interactions" in str(update)
        ]
        assert not offending_updates, "接受大纲时不应保存交互记录"

    @pytest.mark.asyncio
    async def test_user_revise_with_comments(
        self, outline_interaction_node, mock_session, mock_context
    ):
        """``revise_comment`` feedback: expect OUTLINE and one stored interaction."""
        outline = {"title": "测试大纲", "sections": []}
        setup_session_state(
            mock_session,
            config=dict(
                feedback_mode="web",
                outline_interaction_enabled=True,
                outline_interaction_max_rounds=5,
            ),
            current_outline=outline,
        )
        mock_session.interact.return_value = json.dumps(
            {"interrupt_feedback": "revise_comment", "feedback": "请增加更多细节"}
        )

        outcome = await outline_interaction_node._do_invoke({}, mock_session, mock_context)

        assert outcome["next_node"] == NodeId.OUTLINE.value
        # Gather every interaction list handed to update_global_state.
        saved_batches = [
            update.args[0]["search_context.outline_interactions"]
            for update in mock_session.update_global_state.call_args_list
            if "search_context.outline_interactions" in update.args[0]
        ]
        for batch in saved_batches:
            assert len(batch) == 1
            assert batch[0].feedback == "请增加更多细节"
            assert batch[0].interaction_mode == "revise_comment"
            assert batch[0].outline_before == outline
        assert saved_batches, "应该保存交互记录"

    @pytest.mark.asyncio
    async def test_user_revise_outline_directly(
        self, outline_interaction_node, mock_session, mock_context
    ):
        """``revise_outline`` feedback: expect OUTLINE and one stored interaction."""
        outline = {"title": "旧大纲", "sections": []}
        setup_session_state(
            mock_session,
            config=dict(
                feedback_mode="web",
                outline_interaction_enabled=True,
                outline_interaction_max_rounds=5,
            ),
            current_outline=outline,
        )
        mock_session.interact.return_value = json.dumps(
            {
                "interrupt_feedback": "revise_outline",
                "feedback": "1. 新章节1\n2. 新章节2",
            }
        )

        outcome = await outline_interaction_node._do_invoke({}, mock_session, mock_context)

        assert outcome["next_node"] == NodeId.OUTLINE.value
        # Gather every interaction list handed to update_global_state.
        saved_batches = [
            update.args[0]["search_context.outline_interactions"]
            for update in mock_session.update_global_state.call_args_list
            if "search_context.outline_interactions" in update.args[0]
        ]
        for batch in saved_batches:
            assert len(batch) == 1
            assert batch[0].feedback == "1. 新章节1\n2. 新章节2"
            assert batch[0].interaction_mode == "revise_outline"
            assert batch[0].outline_before == outline
        assert saved_batches, "应该保存交互记录"

    @pytest.mark.asyncio
    async def test_no_user_input_goto_end(
        self, outline_interaction_node, mock_session, mock_context
    ):
        """Empty user input: expect END."""
        setup_session_state(
            mock_session,
            config=dict(
                feedback_mode="web",
                outline_interaction_enabled=True,
                outline_interaction_max_rounds=5,
            ),
            history_outlines=[],
        )
        mock_session.interact.return_value = ""

        outcome = await outline_interaction_node._do_invoke({}, mock_session, mock_context)

        assert outcome["next_node"] == NodeId.END.value

    @pytest.mark.asyncio
    async def test_unknown_action_goto_editor_team(
        self, outline_interaction_node, mock_session, mock_context
    ):
        """Unrecognised action: expect END.

        The method name mentions EDITOR_TEAM, but the assertion below checks for
        ``NodeId.END``.
        """
        setup_session_state(
            mock_session,
            config=dict(
                feedback_mode="web",
                outline_interaction_enabled=True,
                outline_interaction_max_rounds=5,
            ),
            history_outlines=[],
        )
        mock_session.interact.return_value = json.dumps(
            {"interrupt_feedback": "unknown_action", "feedback": "一些反馈"}
        )

        outcome = await outline_interaction_node._do_invoke({}, mock_session, mock_context)

        assert outcome["next_node"] == NodeId.END.value
class TestOutlineInteractionNodeExceptionHandling:
    """Error-path tests for ``OutlineInteractionNode._do_invoke``."""

    @pytest.mark.asyncio
    async def test_json_parse_error_handling(
        self, outline_interaction_node, mock_session, mock_context
    ):
        """Non-JSON user input: expect END and an ``exception_info`` state update."""
        setup_session_state(
            mock_session,
            config=dict(
                feedback_mode="web",
                outline_interaction_enabled=True,
                outline_interaction_max_rounds=5,
            ),
            history_outlines=[],
        )
        mock_session.interact.return_value = "这不是有效的JSON"

        outcome = await outline_interaction_node._do_invoke({}, mock_session, mock_context)

        assert outcome["next_node"] == NodeId.END.value
        # Look for the state key in the textual form of every recorded call.
        exception_updates = [
            update
            for update in mock_session.update_global_state.call_args_list
            if "search_context.final_result.exception_info" in str(update)
        ]
        assert exception_updates, "应该记录异常信息"
class TestOutlineInteractionNodeHelperMethods:
    """Tests for the node's helper methods plus round-count boundary cases."""

    @staticmethod
    def _serve_outline_state(session, outline, interactions):
        """Make ``get_global_state`` return the given outline and interaction list.

        Only ``search_context.current_outline`` and
        ``search_context.outline_interactions`` are answered; other keys give ``None``.
        """
        canned = (
            ("search_context.current_outline", outline),
            ("search_context.outline_interactions", interactions),
        )

        def lookup(key):
            for state_key, value in canned:
                if key == state_key:
                    return value
            return None

        session.get_global_state.side_effect = lookup

    def test_save_history_with_comments(self, outline_interaction_node, mock_session):
        """``_save_history`` stores feedback, mode and the outline it was given for."""
        outline = {"title": "当前大纲", "sections": [{"title": "章节1"}]}
        self._serve_outline_state(mock_session, outline, [])

        outline_interaction_node._save_history(mock_session, "这是反馈", "revise_comment")

        mock_session.update_global_state.assert_called_once()
        written = mock_session.update_global_state.call_args.args[0]
        assert "search_context.history_outlines" not in written
        saved = written["search_context.outline_interactions"]
        assert len(saved) == 1
        entry = saved[0]
        assert entry.feedback == "这是反馈"
        assert entry.interaction_mode == "revise_comment"
        assert entry.outline_before == outline

    def test_save_history_without_comments(
        self, outline_interaction_node, mock_session
    ):
        """``_save_history`` also stores the feedback in ``revise_outline`` mode."""
        outline = {"title": "当前大纲", "sections": []}
        self._serve_outline_state(mock_session, outline, [])

        outline_interaction_node._save_history(mock_session, "新大纲JSON", "revise_outline")

        written = mock_session.update_global_state.call_args.args[0]
        assert "search_context.history_outlines" not in written
        saved = written["search_context.outline_interactions"]
        assert len(saved) == 1
        entry = saved[0]
        assert entry.feedback == "新大纲JSON"
        assert entry.interaction_mode == "revise_outline"
        assert entry.outline_before == outline

    @pytest.mark.asyncio
    async def test_notify_user_input_ended(
        self, outline_interaction_node, mock_session
    ):
        """``_notify_user`` writes one stream payload carrying the event and content."""
        await outline_interaction_node._notify_user(
            mock_session, "操作完成", StreamEvent.USER_INPUT_ENDED
        )

        stream = mock_session.write_custom_stream
        stream.assert_called_once()
        sent = stream.call_args.args[0]
        expected_fields = (
            ("event", StreamEvent.USER_INPUT_ENDED.value),
            ("content", "操作完成"),
            ("message_type", "message_chunk"),
        )
        for field, expected in expected_fields:
            assert sent[field] == expected

    @pytest.mark.asyncio
    async def test_get_user_input_web_mode(
        self, outline_interaction_node, mock_session
    ):
        """Web mode: input comes from ``session.interact`` and is returned parsed."""
        raw_reply = '{"interrupt_feedback": "accepted"}'
        mock_session.interact.return_value = raw_reply

        parsed = await outline_interaction_node._get_user_input("web", "1", mock_session)

        mock_session.interact.assert_called_once()
        # The test also expects the interactive-input slot to be reset to None.
        mock_session.update_state.assert_called_once_with({INTERACTIVE_INPUT: None})
        assert parsed == {"interrupt_feedback": "accepted"}

    @pytest.mark.asyncio
    async def test_get_user_input_cmd_mode(
        self, outline_interaction_node, mock_session
    ):
        """Cmd mode: with ``builtins.input`` patched, its text is returned parsed."""
        typed_reply = '{"interrupt_feedback": "accepted"}'
        with patch("builtins.input", return_value=typed_reply):
            parsed = await outline_interaction_node._get_user_input(
                "cmd", "1", mock_session
            )
        assert parsed == {"interrupt_feedback": "accepted"}

    @pytest.mark.asyncio
    async def test_max_rounds_boundary(
        self, outline_interaction_node, mock_session, mock_context
    ):
        """Two recorded rounds with a limit of three: the user is still prompted."""
        previous_rounds = [
            OutlineInteraction(feedback=text, interaction_mode="revise_comment")
            for text in ("comment1", "comment2")
        ]
        setup_session_state(
            mock_session,
            config=dict(
                feedback_mode="web",
                outline_interaction_enabled=True,
                outline_interaction_max_rounds=3,
            ),
            outline_interactions=previous_rounds,
        )
        mock_session.interact.return_value = '{"interrupt_feedback": "accepted"}'

        outcome = await outline_interaction_node._do_invoke({}, mock_session, mock_context)

        assert outcome["next_node"] == NodeId.EDITOR_TEAM.value
        mock_session.interact.assert_called_once()

    @pytest.mark.asyncio
    async def test_empty_outline_interactions(
        self, outline_interaction_node, mock_session, mock_context
    ):
        """No recorded interactions: ``_pre_handle`` reports ``current_round`` 0."""
        setup_session_state(
            mock_session,
            config=dict(
                feedback_mode="web",
                outline_interaction_enabled=True,
                outline_interaction_max_rounds=5,
            ),
        )
        mock_session.interact.return_value = '{"interrupt_feedback": "accepted"}'

        # _pre_handle is called without await here, exactly as a plain function.
        prepared = outline_interaction_node._pre_handle({}, mock_session, mock_context)

        assert prepared["current_round"] == 0
class TestDependencyOutlineInteractionNode:
    """Routing tests for ``DependencyOutlineInteractionNode``."""

    @pytest.fixture
    def dependency_node(self):
        """Provide a newly constructed ``DependencyOutlineInteractionNode``."""
        node = DependencyOutlineInteractionNode()
        return node

    @pytest.mark.asyncio
    async def test_inherits_from_outline_interaction_node(self, dependency_node):
        """The dependency node is an instance of ``OutlineInteractionNode``."""
        assert isinstance(dependency_node, OutlineInteractionNode)

    @pytest.mark.asyncio
    async def test_accepted_redirects_to_dependency_reasoning_team(
        self, dependency_node, mock_session, mock_context
    ):
        """``accepted`` feedback: expect DEPENDENCY_EDITOR_TEAM."""
        setup_session_state(
            mock_session,
            config=dict(
                feedback_mode="web",
                outline_interaction_enabled=True,
                outline_interaction_max_rounds=5,
            ),
            history_outlines=[],
        )
        mock_session.interact.return_value = '{"interrupt_feedback": "accepted"}'

        outcome = await dependency_node._do_invoke({}, mock_session, mock_context)

        assert outcome["next_node"] == NodeId.DEPENDENCY_EDITOR_TEAM.value

    @pytest.mark.asyncio
    async def test_interaction_disabled_redirects_to_dependency_reasoning_team(
        self, dependency_node, mock_session, mock_context
    ):
        """Interaction disabled: expect DEPENDENCY_EDITOR_TEAM."""
        setup_session_state(
            mock_session,
            config=dict(
                feedback_mode="cmd",
                outline_interaction_enabled=False,
                outline_interaction_max_rounds=5,
            ),
        )

        outcome = await dependency_node._do_invoke({}, mock_session, mock_context)

        assert outcome["next_node"] == NodeId.DEPENDENCY_EDITOR_TEAM.value

    @pytest.mark.asyncio
    async def test_revise_comment_still_goes_to_outline(
        self, dependency_node, mock_session, mock_context
    ):
        """``revise_comment`` feedback: expect OUTLINE."""
        outline = {"title": "测试大纲"}
        setup_session_state(
            mock_session,
            config=dict(
                feedback_mode="web",
                outline_interaction_enabled=True,
                outline_interaction_max_rounds=5,
            ),
            history_outlines=[],
            current_outline=outline,
        )
        mock_session.interact.return_value = (
            '{"interrupt_feedback": "revise_comment", "feedback": "修改意见"}'
        )

        outcome = await dependency_node._do_invoke({}, mock_session, mock_context)

        assert outcome["next_node"] == NodeId.OUTLINE.value
class TestOutlineInteractionNodeIntegration:
    """End-to-end style tests that drive ``_do_invoke`` against a mocked session."""

    @pytest.mark.asyncio
    async def test_full_flow_accepted(
        self, outline_interaction_node, mock_session, mock_context
    ):
        """Full flow with ``accepted`` feedback.

        Expects EDITOR_TEAM as the next node, at least one stream write and
        exactly one call to ``session.interact``.
        """
        config = {
            "feedback_mode": "web",
            "outline_interaction_enabled": True,
            "outline_interaction_max_rounds": 5,
        }
        current_outline = {
            "title": "测试报告大纲",
            "sections": [
                {"title": "背景介绍", "description": "介绍背景"},
                {"title": "分析内容", "description": "详细分析"},
            ],
        }
        setup_session_state(
            mock_session,
            config=config,
            history_outlines=[],
            current_outline=current_outline,
        )
        mock_session.interact.return_value = '{"interrupt_feedback": "accepted"}'

        result = await outline_interaction_node._do_invoke(
            {}, mock_session, mock_context
        )

        assert result["next_node"] == NodeId.EDITOR_TEAM.value
        assert mock_session.write_custom_stream.call_count >= 1
        mock_session.interact.assert_called_once()

    @pytest.mark.asyncio
    async def test_multiple_rounds_history_accumulation(
        self, outline_interaction_node, mock_session, mock_context, caplog
    ):
        """A second ``revise_comment`` round is appended to the existing history.

        Starts with one recorded interaction and expects the saved list to hold
        two entries, the new one last, and ``history_outlines`` to stay untouched.
        """
        # Runs with log capture at INFO; no assertion is made on the captured records.
        with caplog.at_level(logging.INFO):
            config = {
                "feedback_mode": "web",
                "outline_interaction_enabled": True,
                "outline_interaction_max_rounds": 5,
            }
            outline_interactions = [
                OutlineInteraction(feedback="第一轮反馈", interaction_mode="revise_comment")
            ]
            current_outline = {"title": "大纲v2"}
            setup_session_state(
                mock_session,
                config=config,
                current_outline=current_outline,
                outline_interactions=outline_interactions,
            )
            mock_session.interact.return_value = (
                '{"interrupt_feedback": "revise_comment", "feedback": "继续修改"}'
            )

            result = await outline_interaction_node._do_invoke(
                {}, mock_session, mock_context
            )

            assert result["next_node"] == NodeId.OUTLINE.value
            calls = mock_session.update_global_state.call_args_list
            interactions_updated = False
            for call in calls:
                state = call[0][0]
                if "search_context.outline_interactions" in state:
                    interactions = state["search_context.outline_interactions"]
                    assert len(interactions) == 2
                    assert interactions[1].feedback == "继续修改"
                    assert interactions[1].interaction_mode == "revise_comment"
                    assert interactions[1].outline_before == current_outline
                    interactions_updated = True
                if "search_context.history_outlines" in state:
                    # pytest.fail instead of `assert False`: asserts are removed
                    # under `python -O`, which would silently disable this check.
                    pytest.fail("大纲交互阶段不应保存到 history_outlines")
            assert interactions_updated