"""Unit tests for DeepAgent public APIs."""
from __future__ import annotations
import asyncio
from pathlib import Path
from typing import Any, AsyncIterator, Dict, List, Optional, Tuple
from unittest.mock import AsyncMock, call, patch
import pytest
from openjiuwen.core.context_engine import ContextEngineConfig
from openjiuwen.core.foundation.llm import (
AssistantMessage,
Model,
ModelClientConfig,
ModelRequestConfig,
UsageMetadata,
UserMessage,
)
from openjiuwen.core.foundation.tool import Tool, ToolCard, McpServerConfig
from openjiuwen.core.foundation.tool.schema import ToolInfo
from openjiuwen.core.runner.resources_manager.base import Ok
from openjiuwen.core.session.stream.base import OutputSchema, StreamMode
from openjiuwen.core.single_agent.agents.react_agent import ReActAgentConfig
from openjiuwen.core.single_agent.rail.base import (
AgentCallbackContext,
AgentCallbackEvent,
AgentRail,
)
from openjiuwen.core.runner import Runner
from openjiuwen.core.session.agent import Session
from openjiuwen.core.single_agent.schema.agent_card import AgentCard
from openjiuwen.harness import create_deep_agent, Workspace
from openjiuwen.harness.deep_agent import DeepAgent
from openjiuwen.harness.rails.sys_operation_rail import SysOperationRail
from openjiuwen.harness.schema.config import DeepAgentConfig, SubAgentConfig
from openjiuwen.harness.subagents import (
build_code_agent_config,
build_research_agent_config,
create_code_agent,
)
from openjiuwen.harness.subagents.code_agent import (
CODE_AGENT_FACTORY_NAME,
DEFAULT_CODE_AGENT_SYSTEM_PROMPT,
)
from openjiuwen.harness.subagents.research_agent import (
DEFAULT_RESEARCH_AGENT_SYSTEM_PROMPT,
RESEARCH_AGENT_FACTORY_NAME,
)
from openjiuwen.harness.task_loop.task_loop_event_handler import TaskLoopEventHandler
from openjiuwen.harness.task_loop.loop_coordinator import LoopCoordinator
from openjiuwen.harness.tools import WebFreeSearchTool
def _create_dummy_model() -> Model:
    """Build a ``Model`` from placeholder client and request settings.

    Returns:
        A ``Model`` configured with the "OpenAI" provider, a fake API key and
        base URL, SSL verification disabled, and the request model name
        "test-model".
    """
    return Model(
        model_client_config=ModelClientConfig(
            client_provider="OpenAI",
            api_key="test-key",
            api_base="http://test-base",
            verify_ssl=False,
        ),
        model_config=ModelRequestConfig(model="test-model"),
    )
class FakeInnerCallbackManager:
    """Stand-in callback manager that only records ``unregister_rail`` calls."""

    def __init__(self) -> None:
        # One (rail, agent) pair per ``unregister_rail`` call, in call order.
        self.unregister_calls: List[Tuple[AgentRail, Any]] = []

    async def unregister_rail(self, rail: AgentRail, agent: Any) -> None:
        """Remember the (rail, agent) pair instead of unregistering anything."""
        recorded = (rail, agent)
        self.unregister_calls.append(recorded)
class FakeReactAgent:
    """In-memory double for the inner ReAct agent used by the ``DeepAgent`` tests.

    Each public method records its arguments on the instance so tests can
    assert on what was forwarded, and returns canned "echo" data.
    """

    def __init__(self) -> None:
        # One {"inputs", "session"} dict per ``invoke`` call.
        self.invoke_calls: List[Dict[str, Any]] = []
        # One {"inputs", "session", "stream_modes"} dict per ``stream`` call.
        self.stream_calls: List[Dict[str, Any]] = []
        # (event, callback, priority) triples passed to ``register_callback``.
        self.registered_callbacks: List[Tuple[AgentCallbackEvent, Any, int]] = []
        self.agent_callback_manager = FakeInnerCallbackManager()
        self.config = ReActAgentConfig()
        self.prompt_builder = None
        self.system_prompt_builder = None

    async def register_callback(
        self,
        event: AgentCallbackEvent,
        callback: Any,
        priority: int,
    ) -> None:
        """Record a callback registration without wiring anything up."""
        self.registered_callbacks.append((event, callback, priority))

    async def invoke(
        self,
        inputs: Dict[str, Any],
        session: Optional[Any] = None,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        """Record the call and echo ``inputs["query"]`` back as an answer.

        Args:
            inputs: Must contain a "query" key.
            session: Stored alongside the inputs; otherwise unused.
            **kwargs: Accepted and ignored.

        Returns:
            ``{"output": "echo:<query>", "result_type": "answer"}``.

        Raises:
            KeyError: If ``inputs`` has no "query" key.
        """
        self.invoke_calls.append({"inputs": inputs, "session": session})
        return {
            "output": f"echo:{inputs['query']}",
            "result_type": "answer",
        }

    async def stream(
        self,
        inputs: Dict[str, Any],
        session: Optional[Any] = None,
        stream_modes: Optional[List[StreamMode]] = None,
    ) -> AsyncIterator[Dict[str, Any]]:
        """Record the call, then yield two numbered chunks carrying the query."""
        self.stream_calls.append(
            {
                "inputs": inputs,
                "session": session,
                "stream_modes": stream_modes,
            }
        )
        for chunk_number in (1, 2):
            yield {"chunk": chunk_number, "query": inputs["query"]}

    async def write_invoke_result_to_stream(
        self,
        result: Dict[str, Any],
        session: Optional[Any] = None,
    ) -> None:
        """Write ``result`` to ``session`` as a single "answer" chunk.

        Does nothing when ``session`` is ``None``. Missing "output" or
        "result_type" keys are written as empty strings.
        """
        if session is None:
            return
        # OutputSchema comes from the module-level import; no local import needed.
        await session.write_stream(
            OutputSchema(
                type="answer",
                index=0,
                payload={
                    "output": result.get("output", ""),
                    "result_type": result.get("result_type", ""),
                },
            )
        )

    def configure(self, config: ReActAgentConfig) -> None:
        """Replace the stored config."""
        self.config = config
class CountingRail(AgentRail):
    """Rail that counts the callbacks it receives and owns a "rail_tool" ability."""

    def __init__(self) -> None:
        super().__init__()
        self.before_invoke_count = 0
        self.after_invoke_count = 0
        self.before_tool_call_count = 0
        # Whatever ``ctx.inputs.result`` held at the last ``after_invoke``.
        self.after_invoke_result: Optional[Dict[str, Any]] = None

    def init(self, agent):
        """Add a "rail_tool" card to the agent's ability manager."""
        agent.ability_manager.add(_build_tool_card("rail_tool"))

    def uninit(self, agent):
        """Remove the "rail_tool" ability from the agent."""
        agent.ability_manager.remove("rail_tool")

    async def before_invoke(self, ctx: AgentCallbackContext) -> None:
        """Count one ``before_invoke`` callback; the context is not inspected."""
        del ctx
        self.before_invoke_count += 1

    async def after_invoke(self, ctx: AgentCallbackContext) -> None:
        """Count one ``after_invoke`` callback and capture ``ctx.inputs.result``."""
        self.after_invoke_count += 1
        # ``None`` is stored when the inputs object has no ``result`` attribute.
        self.after_invoke_result = getattr(ctx.inputs, "result", None)

    async def before_tool_call(self, ctx: AgentCallbackContext) -> None:
        """Count one ``before_tool_call`` callback; the context is not inspected."""
        del ctx
        self.before_tool_call_count += 1
def _build_tool_card(name: str) -> ToolCard:
    """Return a ``ToolCard`` called *name* whose description is "<name> tool"."""
    description = f"{name} tool"
    return ToolCard(name=name, description=description)
class DummyTool(Tool):
    """Minimal ``Tool`` whose ``invoke`` and ``stream`` echo their inputs."""

    def __init__(self, name: str, tool_id: Optional[str] = None) -> None:
        # A falsy ``tool_id`` falls back to the tool name as the card id.
        card = ToolCard(id=tool_id or name, name=name, description=f"{name} tool")
        super().__init__(card)

    async def invoke(self, inputs: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]:
        """Return ``{"inputs": inputs}``; keyword arguments are ignored."""
        del kwargs
        return {"inputs": inputs}

    async def stream(self, inputs: Dict[str, Any], **kwargs: Any) -> AsyncIterator[Dict[str, Any]]:
        """Yield a single ``{"inputs": inputs}`` item; keyword arguments are ignored."""
        del kwargs
        yield {"inputs": inputs}
def test_configure_set_react_agent_and_is_initialized() -> None:
    """``configure`` and ``set_react_agent`` return the agent; the latter sets ``is_initialized``."""
    agent = DeepAgent(AgentCard(name="deep", description="test"))
    config = DeepAgentConfig(enable_task_loop=False, max_iterations=3)

    # configure() is fluent and does not mark the agent as initialized.
    assert agent.configure(config) is agent
    assert agent.is_initialized is False

    # Installing an inner agent with initialized=True flips the flag.
    inner = FakeReactAgent()
    assert agent.set_react_agent(inner, initialized=True) is agent
    assert agent.is_initialized is True
    assert agent.loop_coordinator is None
@pytest.mark.asyncio
async def test_add_rail_lazy_register_on_first_invoke() -> None:
    """A rail queued with ``add_rail`` receives callbacks from the first ``invoke``."""
    deep = DeepAgent(AgentCard(name="deep", description="test"))
    agent = deep.configure(DeepAgentConfig(enable_task_loop=False))
    inner = FakeReactAgent()
    agent.set_react_agent(inner, initialized=False)
    counting_rail = CountingRail()
    assert agent.add_rail(counting_rail) is agent

    outcome = await agent.invoke({"query": "hello", "conversation_id": "c1"})

    assert outcome["output"] == "echo:hello"
    assert counting_rail.before_invoke_count == 1
    assert counting_rail.after_invoke_count == 1
    assert agent.is_initialized is True
    # The inner agent saw the inputs exactly as they were passed in.
    forwarded = inner.invoke_calls[0]["inputs"]
    assert forwarded == {
        "query": "hello",
        "conversation_id": "c1",
    }

    # BEFORE_TOOL_CALL is registered on the inner agent; BEFORE_INVOKE is not.
    bridged_events = [event for event, *_ in inner.registered_callbacks]
    assert AgentCallbackEvent.BEFORE_TOOL_CALL in bridged_events
    assert AgentCallbackEvent.BEFORE_INVOKE not in bridged_events
@pytest.mark.asyncio
async def test_register_and_unregister_rail() -> None:
    """Rail callbacks fire while registered and stop after ``unregister_rail``."""
    deep = DeepAgent(AgentCard(name="deep", description="test"))
    agent = deep.configure(DeepAgentConfig(enable_task_loop=False))
    inner = FakeReactAgent()
    agent.set_react_agent(inner, initialized=True)
    tracked = CountingRail()

    await agent.register_rail(tracked)
    await agent.invoke("round1")
    assert tracked.before_invoke_count == 1
    assert tracked.after_invoke_count == 1

    await agent.unregister_rail(tracked)
    await agent.invoke("round2")
    # Counters did not move: the rail no longer sees invoke callbacks.
    assert tracked.before_invoke_count == 1
    assert tracked.after_invoke_count == 1

    # The inner callback manager was asked exactly once to drop this rail.
    recorded = inner.agent_callback_manager.unregister_calls
    assert len(recorded) == 1
    assert recorded[0][0] is tracked
def test_find_rails_by_type_returns_matching_rails() -> None:
    """``find_rails_by_type`` returns queued rails matching the given type tuple."""
    deep = DeepAgent(AgentCard(name="deep", description="test"))
    agent = deep.configure(DeepAgentConfig(enable_task_loop=False))
    queued = CountingRail()
    agent.add_rail(queued)

    matching = agent.find_rails_by_type((CountingRail,))
    assert matching == [queued]
    # A type that was never queued, and an empty type tuple, both match nothing.
    unrelated = agent.find_rails_by_type((SysOperationRail,))
    assert unrelated == []
    no_types = agent.find_rails_by_type(())
    assert no_types == []
@pytest.mark.asyncio
async def test_invoke_runtime_error_when_not_configured() -> None:
    """Invoking before ``configure`` raises with "DeepAgent not configured"."""
    unconfigured = DeepAgent(AgentCard(name="deep", description="test"))
    expect_failure = pytest.raises(Exception, match="DeepAgent not configured")
    with expect_failure:
        await unconfigured.invoke({"query": "hello"})
@pytest.mark.asyncio
async def test_invoke_invalid_input_type_error() -> None:
    """An integer input is rejected with an "Input must be dict" error."""
    deep = DeepAgent(AgentCard(name="deep", description="test"))
    agent = deep.configure(DeepAgentConfig(enable_task_loop=False))
    agent.set_react_agent(FakeReactAgent(), initialized=True)

    expect_failure = pytest.raises(Exception, match="Input must be dict")
    with expect_failure:
        await agent.invoke(123)
@pytest.mark.asyncio
async def test_invoke_task_loop_requires_session() -> None:
    """With the task loop enabled, ``invoke`` without a session raises."""
    deep = DeepAgent(AgentCard(name="deep", description="test"))
    agent = deep.configure(DeepAgentConfig(enable_task_loop=True))
    agent.set_react_agent(FakeReactAgent(), initialized=True)

    expect_failure = pytest.raises(Exception, match="session is required")
    with expect_failure:
        await agent.invoke("no_session")
@pytest.mark.asyncio
async def test_invoke_task_loop_delegates_to_event_queue() -> None:
    """Task-loop ``invoke`` with a session returns the inner agent's echo.

    After the call completes, ``loop_coordinator`` is expected to be ``None``.
    """
    deep = DeepAgent(AgentCard(name="deep", description="test"))
    agent = deep.configure(DeepAgentConfig(enable_task_loop=True))
    inner = FakeReactAgent()
    agent.set_react_agent(inner, initialized=True)

    loop_session = Session(session_id="s1")
    outcome = await agent.invoke("loop_input", session=loop_session)

    assert outcome["output"] == "echo:loop_input"
    assert agent.loop_coordinator is None
@pytest.mark.asyncio
async def test_stream_single_round_branch() -> None:
    """Without the task loop, ``stream`` relays the inner agent's chunks in order."""
    deep = DeepAgent(AgentCard(name="deep", description="test"))
    agent = deep.configure(DeepAgentConfig(enable_task_loop=False))
    inner = FakeReactAgent()
    agent.set_react_agent(inner, initialized=True)

    received = []
    async for item in agent.stream("stream_input"):
        received.append(item)

    chunk_numbers = [item["chunk"] for item in received]
    assert chunk_numbers == [1, 2]
    # The plain string input reached the inner agent wrapped as {"query": ...}.
    assert inner.stream_calls[0]["inputs"] == {"query": "stream_input"}
@pytest.mark.asyncio
async def test_stream_sets_result_before_after_invoke() -> None:
    """``after_invoke`` sees the final streamed answer as ``ctx.inputs.result``."""

    class AnswerStreamingReactAgent(FakeReactAgent):
        """Fake inner agent that streams two LLM chunks followed by an answer."""

        async def stream(
            self,
            inputs: Dict[str, Any],
            session: Optional[Any] = None,
            stream_modes: Optional[List[StreamMode]] = None,
        ) -> AsyncIterator[OutputSchema]:
            self.stream_calls.append(
                {
                    "inputs": inputs,
                    "session": session,
                    "stream_modes": stream_modes,
                }
            )
            # (type, index, payload) for each chunk, in emission order.
            scripted_chunks = (
                ("llm_output", 0, {"content": "hello ", "result_type": "answer"}),
                ("llm_output", 1, {"content": "world", "result_type": "answer"}),
                ("answer", 0, {"output": "hello world", "result_type": "answer"}),
            )
            for chunk_type, chunk_index, chunk_payload in scripted_chunks:
                yield OutputSchema(
                    type=chunk_type,
                    index=chunk_index,
                    payload=chunk_payload,
                )

    deep = DeepAgent(AgentCard(name="deep", description="test"))
    agent = deep.configure(DeepAgentConfig(enable_task_loop=False))
    observer = CountingRail()
    agent.add_rail(observer)
    agent.set_react_agent(AnswerStreamingReactAgent(), initialized=False)

    received = []
    async for item in agent.stream("stream_input"):
        received.append(item)

    emitted_types = [item.type for item in received]
    assert emitted_types == ["llm_output", "llm_output", "answer"]
    assert observer.after_invoke_count == 1
    assert observer.after_invoke_result == {
        "output": "hello world",
        "result_type": "answer",
    }
@pytest.mark.asyncio
async def test_stream_task_loop_yields_result() -> None:
    """Streaming via ``Runner`` with the task loop enabled yields the echoed answer.

    The runner is started for the duration of the test and always stopped
    afterwards, even when an assertion fails.
    """
    await Runner.start()
    try:
        agent = DeepAgent(
            AgentCard(name="deep", description="test")
        ).configure(
            DeepAgentConfig(enable_task_loop=True)
        )
        fake_react = FakeReactAgent()
        agent.set_react_agent(fake_react, initialized=True)

        chunks = [
            chunk
            async for chunk in Runner.run_agent_streaming(
                agent, {"query": "loop_input"}
            )
        ]

        # OutputSchema is the module-level import; no local re-import needed.
        assert len(chunks) >= 1
        assert isinstance(chunks[-1], OutputSchema)
        answer_chunks = [c for c in chunks if c.type == "answer"]
        assert len(answer_chunks) >= 1
        assert answer_chunks[0].payload["output"] == "echo:loop_input"
    finally:
        await Runner.stop()
@pytest.mark.asyncio
async def test_follow_up_steer_noop_without_queue() -> None:
    """``follow_up`` and ``steer`` complete without creating a loop coordinator."""
    deep = DeepAgent(AgentCard(name="deep", description="test"))
    agent = deep.configure(DeepAgentConfig(enable_task_loop=False))

    # Neither call should raise when the task loop is disabled.
    await agent.follow_up("continue", task_id="task_1")
    await agent.steer("change strategy")

    assert agent.loop_coordinator is None
@pytest.mark.asyncio
async def test_get_context_usage_prefers_model_usage_metadata() -> None:
    """``get_context_usage`` reports the token total from message usage metadata."""
    engine_config = ContextEngineConfig(context_window_tokens=1000)
    deep = DeepAgent(AgentCard(name="deep", description="test"))
    agent = deep.configure(
        DeepAgentConfig(
            enable_task_loop=False,
            context_engine_config=engine_config,
        )
    )
    usage_session = Session(session_id="ctx_usage")
    context = await agent.react_agent.context_engine.create_context(
        session=usage_session
    )
    # Only the assistant message carries usage metadata (250 total tokens).
    history = [
        UserMessage(content="hello"),
        AssistantMessage(
            content="world",
            usage_metadata=UsageMetadata(total_tokens=250),
        ),
    ]
    await context.add_messages(history)

    usage = agent.get_context_usage(session_id="ctx_usage")

    expected_fields = {
        "session_id": "ctx_usage",
        "total_tokens": 250,
        "context_window_tokens": 1000,
        "usage_ratio": 0.25,
        "usage_percent": 25.0,
    }
    for field_name, expected_value in expected_fields.items():
        assert usage[field_name] == expected_value
    assert usage["stats"]["total_tokens"] == 250
@pytest.mark.asyncio
async def test_get_current_context_returns_messages() -> None:
    """``get_current_context`` returns the messages stored for the session."""
    deep = DeepAgent(AgentCard(name="deep", description="test"))
    agent = deep.configure(DeepAgentConfig(enable_task_loop=False))
    message_session = Session(session_id="ctx_messages")
    context = await agent.react_agent.context_engine.create_context(
        session=message_session
    )
    await context.add_messages(UserMessage(content="current"))

    stored = agent.get_current_context(session_id="ctx_messages")

    assert len(stored) == 1
    only_message = stored[0]
    assert only_message.content == "current"
@pytest.mark.asyncio
async def test_create_new_context_engine_returns_session_id_and_keeps_existing_context() -> None:
    """``create_new_context_engine`` adds a context without replacing the engine."""
    deep = DeepAgent(AgentCard(name="deep", description="test"))
    agent = deep.configure(DeepAgentConfig(enable_task_loop=False))
    engine_before = agent.react_agent.context_engine
    await engine_before.create_context(session=Session(session_id="old_ctx"))

    returned_id = await agent.create_new_context_engine("new_ctx")

    assert returned_id == "new_ctx"
    # Same engine object, and both the old and the new contexts are reachable.
    engine_after = agent.react_agent.context_engine
    assert engine_after is engine_before
    assert engine_before.get_context(session_id="old_ctx") is not None
    assert agent.react_agent.context_engine.get_context(session_id="new_ctx") is not None
@pytest.mark.asyncio
async def test_create_new_context_engine_seeds_messages() -> None:
    """A string passed in ``messages`` is stored as a single system message."""
    deep = DeepAgent(AgentCard(name="deep", description="test"))
    agent = deep.configure(DeepAgentConfig(enable_task_loop=False))

    await agent.create_new_context_engine("seeded_ctx", messages=["seed prompt"])

    engine = agent.react_agent.context_engine
    seeded = engine.get_context(session_id="seeded_ctx").get_messages()
    assert len(seeded) == 1
    first = seeded[0]
    assert first.role == "system"
    assert first.content == "seed prompt"
@pytest.mark.asyncio
async def test_new_context_engine_accepts_messages() -> None:
    """``new_context_engine`` accepts keyword ``session_id`` and ``messages``."""
    deep = DeepAgent(AgentCard(name="deep", description="test"))
    agent = deep.configure(DeepAgentConfig(enable_task_loop=False))

    returned_id = await agent.new_context_engine(
        session_id="alias_ctx",
        messages=["alias prompt"],
    )

    engine = agent.react_agent.context_engine
    context = engine.get_context(session_id="alias_ctx")
    assert returned_id == "alias_ctx"
    first_message = context.get_messages()[0]
    assert first_message.content == "alias prompt"
@pytest.mark.asyncio
async def test_abort_sets_coordinator_flag() -> None:
    """``abort`` flags the coordinator and resolves the pending round as aborted."""
    deep = DeepAgent(AgentCard(name="deep", description="test"))
    agent = deep.configure(DeepAgentConfig(enable_task_loop=True))
    agent.set_react_agent(FakeReactAgent(), initialized=True)

    loop_coordinator = LoopCoordinator()
    loop_coordinator.reset()
    round_handler = TaskLoopEventHandler(agent)

    class FakeController:
        """Minimal Controller stub."""

        def __init__(self) -> None:
            self.event_handler = round_handler
            self.event_queue = None

        async def stop(self) -> None:
            return None

    # Install the loop state directly on the agent's private attributes.
    agent._loop_coordinator = loop_coordinator
    agent._loop_controller = FakeController()
    agent._loop_session = None
    round_handler.prepare_round()

    await agent.abort()

    assert loop_coordinator.is_aborted is True
    completion = await round_handler.wait_completion(timeout=1.0)
    assert completion == {"error": "aborted"}
@pytest.mark.asyncio
async def test_create_deep_agent_factory_public_api() -> None:
    """``create_deep_agent`` wires tools and rails into a usable ``DeepAgent``."""
    observer = CountingRail()
    factory_tool = _build_tool_card("factory_tool")
    sub_card = AgentCard(name="subagent_a", description="sub")

    agent = create_deep_agent(
        model=_create_dummy_model(),
        system_prompt="factory prompt",
        tools=[factory_tool],
        subagents=[sub_card],
        rails=[observer],
        auto_create_workspace=False,
        enable_task_loop=False,
        max_iterations=4,
    )

    assert isinstance(agent, DeepAgent)
    assert agent.card.name == "deep_agent"
    assert agent.ability_manager.get("factory_tool") is factory_tool

    # Swap in the fake inner agent and check the rail sees one invoke round.
    agent.set_react_agent(FakeReactAgent(), initialized=False)
    outcome = await agent.invoke("factory_call")
    assert outcome["output"] == "echo:factory_call"
    assert observer.before_invoke_count == 1
    assert observer.after_invoke_count == 1
def test_create_deep_agent_registers_tool_instances() -> None:
    """A ``Tool`` instance passed to the factory is registered by card and by id."""
    dummy = DummyTool("factory_tool_instance")
    try:
        agent = create_deep_agent(
            model=_create_dummy_model(),
            tools=[dummy],
            auto_create_workspace=False,
        )
        assert isinstance(agent, DeepAgent)
        # The ability manager holds the card; the resource manager holds the tool.
        registered_card = agent.ability_manager.get(dummy.card.name)
        assert registered_card is dummy.card
        assert Runner.resource_mgr.get_tool(dummy.card.id) is not None
    finally:
        # Remove the tool from the Runner's resource manager after the test.
        Runner.resource_mgr.remove_tool(dummy.card.id)
def test_create_deep_agent_skips_free_search_when_all_free_engines_disabled(monkeypatch) -> None:
    """With both free-search engine flags "false", the tool is not registered."""
    for engine_flag in ("FREE_SEARCH_DDG_ENABLED", "FREE_SEARCH_BING_ENABLED"):
        monkeypatch.setenv(engine_flag, "false")
    search_tool = WebFreeSearchTool(language="cn", agent_id="disabled")

    agent = create_deep_agent(
        model=_create_dummy_model(),
        tools=[search_tool],
        auto_create_workspace=False,
    )

    assert agent.ability_manager.get("free_search") is None
    assert Runner.resource_mgr.get_tool(search_tool.card.id) is None
def test_deep_agent_hot_reload_removes_and_restores_free_search(monkeypatch) -> None:
    """Reconfiguring after env-flag changes removes, then restores, free search."""
    monkeypatch.setenv("FREE_SEARCH_DDG_ENABLED", "true")
    monkeypatch.setenv("FREE_SEARCH_BING_ENABLED", "false")
    search_tool = WebFreeSearchTool(language="cn", agent_id="hot_reload")
    agent = create_deep_agent(
        model=_create_dummy_model(),
        tools=[search_tool],
        auto_create_workspace=False,
    )
    try:
        # Phase 1: one engine enabled, so the tool is registered.
        assert agent.ability_manager.get("free_search") is search_tool.card
        assert Runner.resource_mgr.get_tool(search_tool.card.id) is not None

        # Phase 2: all engines disabled, reconfigure drops the tool.
        for engine_flag in ("FREE_SEARCH_DDG_ENABLED", "FREE_SEARCH_BING_ENABLED"):
            monkeypatch.setenv(engine_flag, "false")
        agent.configure(DeepAgentConfig(tools=[search_tool.card]))
        assert agent.ability_manager.get("free_search") is None
        assert Runner.resource_mgr.get_tool(search_tool.card.id) is None

        # Phase 3: re-enable one engine, reconfigure brings the tool back.
        monkeypatch.setenv("FREE_SEARCH_DDG_ENABLED", "true")
        agent.configure(DeepAgentConfig(tools=[search_tool.card], language="cn"))
        assert agent.ability_manager.get("free_search") is search_tool.card
        assert Runner.resource_mgr.get_tool(search_tool.card.id) is not None
    finally:
        # The tool may or may not be registered depending on where a failure hit.
        still_registered = Runner.resource_mgr.get_tool(search_tool.card.id) is not None
        if still_registered:
            Runner.resource_mgr.remove_tool(search_tool.card.id)
def test_create_deep_agent_reuses_same_tool_instance_across_agents() -> None:
    """One ``Tool`` instance can be handed to two agents created back to back."""
    shared = DummyTool("shared_tool_instance", tool_id="shared_tool_instance_id")
    try:
        agents = []
        for _ in range(2):
            agents.append(
                create_deep_agent(
                    model=_create_dummy_model(),
                    tools=[shared],
                    auto_create_workspace=False,
                )
            )
        first_agent, second_agent = agents
        assert isinstance(first_agent, DeepAgent)
        assert isinstance(second_agent, DeepAgent)
        # The second agent resolves the tool name to the shared card object.
        assert second_agent.ability_manager.get(shared.card.name) is shared.card
    finally:
        Runner.resource_mgr.remove_tool(shared.card.id)
def test_create_deep_agent_qualifies_conflicting_tool_ids_per_agent() -> None:
    """Two tools sharing an id end up with distinct ids suffixed by their agent's id."""
    tool_a = DummyTool("tool_a", tool_id="shared_tool_id")
    tool_b = DummyTool("tool_b", tool_id="shared_tool_id")
    agent_a = create_deep_agent(
        model=_create_dummy_model(),
        tools=[tool_a],
        auto_create_workspace=False,
    )
    agent_b = create_deep_agent(
        model=_create_dummy_model(),
        tools=[tool_b],
        auto_create_workspace=False,
    )
    try:
        assert agent_a.ability_manager.get("tool_a") is tool_a.card
        assert agent_b.ability_manager.get("tool_b") is tool_b.card
        # The ids started out identical; after creation they must differ and
        # each must end with the owning agent's card id.
        assert tool_a.card.id != tool_b.card.id
        assert tool_a.card.id.endswith(agent_a.card.id)
        assert tool_b.card.id.endswith(agent_b.card.id)
    finally:
        for registered in (tool_a, tool_b):
            Runner.resource_mgr.remove_tool(registered.card.id)
@pytest.mark.asyncio
async def test_create_deep_agent_registers_mcps_on_first_invoke() -> None:
    """MCP servers given to the factory are registered lazily, on first ``invoke``."""
    server = McpServerConfig(
        server_name="test_mcp_server",
        server_id="mcp_server_001",
        server_path="http://127.0.0.1:8930/mcp",
        client_type="streamable-http",
    )
    lookup_schema = {
        "type": "object",
        "properties": {"query": {"type": "string"}},
        "required": ["query"],
    }
    remote_tool = ToolInfo(
        name="mcp_lookup",
        description="lookup through mcp",
        parameters=lookup_schema,
    )
    # Captured up front, before the agent gets a chance to touch the ToolInfo.
    raw_tool_name = remote_tool.name

    mock_add_mcp_server = AsyncMock(return_value=Ok(server.server_id))
    mock_get_mcp_tool_infos = AsyncMock(return_value=[remote_tool])
    with patch.object(Runner.resource_mgr, "add_mcp_server", new=mock_add_mcp_server):
        with patch.object(Runner.resource_mgr, "get_mcp_tool_infos", new=mock_get_mcp_tool_infos):
            agent = create_deep_agent(
                model=_create_dummy_model(),
                mcps=[server],
                auto_create_workspace=False,
                enable_task_loop=False,
            )
            agent.set_react_agent(FakeReactAgent(), initialized=False)
            # Nothing is registered until the first invoke.
            assert mock_add_mcp_server.await_count == 0

            outcome = await agent.invoke("factory_call")

            assert outcome["output"] == "echo:factory_call"
            mock_add_mcp_server.assert_awaited_once_with(
                server,
                tag=agent.card.id,
            )
            assert agent.ability_manager.get(server.server_name) is server

            # The MCP tool is exposed under "mcp_<server>_<tool>".
            expected_mcp_tool_name = f"mcp_{server.server_name}_{raw_tool_name}"
            listed = await agent.ability_manager.list_tool_info()
            listed_names = [info.name for info in listed]
            assert expected_mcp_tool_name in listed_names
            mcp_tool_card = agent.ability_manager.get(expected_mcp_tool_name)
            assert isinstance(mcp_tool_card, ToolCard)
            assert mcp_tool_card.input_params == remote_tool.parameters
            mock_get_mcp_tool_infos.assert_awaited()
@pytest.mark.asyncio
async def test_create_deep_agent_reuses_registered_mcps_with_same_config() -> None:
    """An already-registered MCP server with the same config is tagged, not re-added."""
    server = McpServerConfig(
        server_name="test_mcp_server",
        server_id="mcp_server_001",
        server_path="http://127.0.0.1:8930/mcp",
        client_type="streamable-http",
    )
    remote_tool = ToolInfo(
        name="mcp_lookup",
        description="lookup through mcp",
        parameters={
            "type": "object",
            "properties": {"query": {"type": "string"}},
            "required": ["query"],
        },
    )
    remote_tool_id = f"{server.server_id}.{server.server_name}.{remote_tool.name}"

    # The resource manager reports the server as registered with this config.
    existing_config_patch = patch.object(
        Runner.resource_mgr, "get_mcp_server_config", return_value=server
    )
    mock_add_mcp_server = AsyncMock()
    add_server_patch = patch.object(
        Runner.resource_mgr, "add_mcp_server", new=mock_add_mcp_server
    )
    tool_ids_patch = patch.object(
        Runner.resource_mgr, "get_mcp_tool_ids", return_value=[remote_tool_id]
    )
    add_tag_patch = patch.object(
        Runner.resource_mgr, "add_resource_tag", return_value=Ok(["deep_agent_id"])
    )
    tool_infos_patch = patch.object(
        Runner.resource_mgr, "get_mcp_tool_infos", new=AsyncMock(return_value=[remote_tool])
    )

    with existing_config_patch, add_server_patch, tool_ids_patch, \
            add_tag_patch as mock_add_resource_tag, tool_infos_patch:
        agent = create_deep_agent(
            model=_create_dummy_model(),
            mcps=[server],
            auto_create_workspace=False,
            enable_task_loop=False,
        )
        agent.set_react_agent(FakeReactAgent(), initialized=False)

        outcome = await agent.invoke("factory_call")

        assert outcome["output"] == "echo:factory_call"
        mock_add_mcp_server.assert_not_awaited()
        # Both the server and its tool are tagged with this agent's id.
        expected_tag_calls = [
            call(server.server_id, agent.card.id),
            call(remote_tool_id, agent.card.id),
        ]
        mock_add_resource_tag.assert_has_calls(expected_tag_calls)
        assert agent.ability_manager.get(server.server_name) is server
@pytest.mark.asyncio
async def test_create_deep_agent_rejects_conflicting_registered_mcp_config() -> None:
    """A registered MCP server with a different config makes ``invoke`` raise."""
    requested = McpServerConfig(
        server_name="test_mcp_server",
        server_id="mcp_server_001",
        server_path="http://127.0.0.1:8930/mcp",
        client_type="streamable-http",
    )
    # Same server, but the copy reported as registered points at another path.
    already_registered = requested.model_copy(
        update={"server_path": "http://127.0.0.1:8940/mcp"}
    )
    registered_lookup = patch.object(
        Runner.resource_mgr,
        "get_mcp_server_config",
        return_value=already_registered,
    )
    with registered_lookup:
        agent = create_deep_agent(
            model=_create_dummy_model(),
            mcps=[requested],
            auto_create_workspace=False,
            enable_task_loop=False,
        )
        agent.set_react_agent(FakeReactAgent(), initialized=False)
        with pytest.raises(Exception, match="different config"):
            await agent.invoke("factory_call")
def test_create_deep_agent_with_custom_card() -> None:
    """A caller-supplied ``AgentCard`` is used as-is by the created agent."""
    supplied_card = AgentCard(name="custom_deep", description="custom")

    agent = create_deep_agent(
        model=_create_dummy_model(),
        card=supplied_card,
        auto_create_workspace=False,
    )

    assert isinstance(agent, DeepAgent)
    # Identity, not equality: the factory must not copy the card.
    assert agent.card is supplied_card
def test_create_deep_agent_auto_add_task_planning_rail() -> None:
    """Test that TaskPlanningRail is auto-added when enable_task_planning=True."""
    agent = create_deep_agent(
        model=_create_dummy_model(),
        auto_create_workspace=False,
        enable_task_planning=True,
    )
    pending_rails = agent._pending_rails
    assert len(pending_rails) > 0
    # Compare by class name; None entries in the pending list are skipped.
    rail_types = [type(rail).__name__ for rail in pending_rails if rail is not None]
    assert "TaskPlanningRail" in rail_types
@pytest.mark.asyncio
async def test_hot_reconfigure_preserves_task_tool_from_subagent_rail() -> None:
    """The "task_tool" ability is still present after reconfiguring the agent."""
    factory_tool = _build_tool_card("factory_tool")
    browser_subagent = SubAgentConfig(
        agent_card=AgentCard(name="browser_agent", description="browser subagent"),
        system_prompt="browser prompt",
        model=_create_dummy_model(),
    )
    agent = create_deep_agent(
        model=_create_dummy_model(),
        tools=[factory_tool],
        subagents=[browser_subagent],
        auto_create_workspace=False,
        enable_task_loop=False,
    )
    agent.set_react_agent(FakeReactAgent(), initialized=False)

    # The first invoke initializes the agent; "task_tool" must exist afterwards.
    await agent.invoke("initialize subagent rail")
    assert agent.ability_manager.get("task_tool") is not None

    reconfigured = DeepAgentConfig(
        model=_create_dummy_model(),
        tools=[factory_tool],
        subagents=[browser_subagent],
        rails=[],
        enable_task_loop=False,
        system_prompt="updated prompt",
    )
    agent.configure(reconfigured)

    assert agent.ability_manager.get("task_tool") is not None
def test_create_deep_agent_auto_add_skill_rail(tmp_path) -> None:
    """A non-empty ``skills`` argument queues a SkillUseRail rooted in the workspace."""
    # NOTE(review): this is a flat list of four strings, not a name/description
    # mapping -- confirm that is the intended shape for ``skills``.
    skills = ["name", "test_skill", "description", "test"]
    workspace_root = tmp_path / "team_member_workspace"

    agent = create_deep_agent(
        model=_create_dummy_model(),
        skills=skills,
        workspace=Workspace(root_path=str(workspace_root)),
    )

    queued = agent._pending_rails
    assert len(queued) > 0
    # Match by class name; None entries in the pending list are skipped.
    skill_rails = [
        rail
        for rail in queued
        if rail is not None and type(rail).__name__ == "SkillUseRail"
    ]
    assert skill_rails
    skill_rail = skill_rails[0]
    assert isinstance(skill_rail.skills_dir, list)
    assert Path(skill_rail.skills_dir[0]) == workspace_root / "skills"
    assert skill_rail.enabled_skills == set()
def test_create_deep_agent_does_not_add_skill_rail_when_skills_empty(tmp_path) -> None:
    """An empty ``skills`` list does not queue a SkillUseRail."""
    workspace_root = tmp_path / "team_member_workspace"

    agent = create_deep_agent(
        model=_create_dummy_model(),
        skills=[],
        workspace=Workspace(root_path=str(workspace_root)),
    )

    # Collect class names of the queued rails, skipping None entries.
    queued_names = {
        type(rail).__name__ for rail in agent._pending_rails if rail is not None
    }
    assert "SkillUseRail" not in queued_names
def test_create_deep_agent_auto_add_skill_rail_when_skill_discovery_enabled(tmp_path) -> None:
    """``enable_skill_discovery=True`` queues a SkillUseRail even with no skills."""
    workspace_root = tmp_path / "team_member_workspace"

    agent = create_deep_agent(
        model=_create_dummy_model(),
        skills=[],
        workspace=Workspace(root_path=str(workspace_root)),
        enable_skill_discovery=True,
    )

    # Match by class name; None entries in the pending list are skipped.
    skill_rails = [
        rail
        for rail in agent._pending_rails
        if rail is not None and type(rail).__name__ == "SkillUseRail"
    ]
    assert skill_rails
    skill_rail = skill_rails[0]
    assert isinstance(skill_rail.skills_dir, list)
    assert Path(skill_rail.skills_dir[0]) == workspace_root / "skills"
    assert skill_rail.enabled_skills == set()
def test_create_deep_agent_no_duplicate_task_planning_rail() -> None:
    """A manually supplied TaskPlanningRail is the only one queued."""
    from openjiuwen.harness.rails import TaskPlanningRail

    supplied_rail = TaskPlanningRail()
    # NOTE(review): this passes enable_task_loop=True while the sibling
    # auto-add/subclass tests pass enable_task_planning=True -- confirm this
    # flag actually exercises the auto-add path being de-duplicated.
    agent = create_deep_agent(
        model=_create_dummy_model(),
        auto_create_workspace=False,
        enable_task_loop=True,
        rails=[supplied_rail],
    )

    planning_rails = [
        rail for rail in agent._pending_rails if isinstance(rail, TaskPlanningRail)
    ]
    task_planning_count = len(planning_rails)
    assert task_planning_count == 1, f"Expected 1 TaskPlanningRail, but found {task_planning_count}"
def test_create_deep_agent_no_duplicate_skill_rail() -> None:
    """A manually supplied SkillUseRail is the only one queued."""
    from openjiuwen.harness.rails import SkillUseRail

    supplied_rail = SkillUseRail(skills_dir="./", skill_mode="all")
    requested_skills = [{"name": "test_skill", "description": "test"}]

    agent = create_deep_agent(
        model=_create_dummy_model(),
        skills=requested_skills,
        rails=[supplied_rail],
        auto_create_workspace=False,
    )

    queued_skill_rails = [
        rail for rail in agent._pending_rails if isinstance(rail, SkillUseRail)
    ]
    skill_rail_count = len(queued_skill_rails)
    assert skill_rail_count == 1, f"Expected 1 SkillUseRail, but found {skill_rail_count}"
def test_create_deep_agent_subclass_skill_rail_not_duplicated() -> None:
    """A SkillUseRail subclass counts as the skill rail; no default one is added."""
    from openjiuwen.harness.rails import SkillUseRail

    class _CustomSkillRail(SkillUseRail):
        """Subclass with no overrides, used only to test isinstance matching."""

    subclass_rail = _CustomSkillRail(skills_dir="./", skill_mode="all")

    agent = create_deep_agent(
        model=_create_dummy_model(),
        skills=["some_skill"],
        rails=[subclass_rail],
        auto_create_workspace=False,
    )

    queued_skill_rails = [
        rail for rail in agent._pending_rails if isinstance(rail, SkillUseRail)
    ]
    skill_rail_count = len(queued_skill_rails)
    assert skill_rail_count == 1, (
        f"Subclass should suppress default SkillUseRail, but found {skill_rail_count}"
    )
def test_create_deep_agent_subclass_task_planning_rail_not_duplicated() -> None:
    """A TaskPlanningRail subclass counts as the planning rail; no default is added."""
    from openjiuwen.harness.rails import TaskPlanningRail

    class _CustomTaskPlanningRail(TaskPlanningRail):
        """Subclass with no overrides, used only to test isinstance matching."""

    subclass_rail = _CustomTaskPlanningRail()

    agent = create_deep_agent(
        model=_create_dummy_model(),
        auto_create_workspace=False,
        enable_task_planning=True,
        rails=[subclass_rail],
    )

    queued_planning_rails = [
        rail for rail in agent._pending_rails if isinstance(rail, TaskPlanningRail)
    ]
    task_plan_count = len(queued_planning_rails)
    assert task_plan_count == 1, (
        f"Subclass should suppress default TaskPlanningRail, but found {task_plan_count}"
    )
def test_create_code_agent_injects_default_code_tool_and_fs_rail() -> None:
    """``create_code_agent`` yields a "code_agent" with a SysOperationRail queued."""
    agent = create_code_agent(model=_create_dummy_model())

    assert isinstance(agent, DeepAgent)
    assert agent.card.name == "code_agent"
    sys_operation_rails = [
        rail for rail in agent._pending_rails if isinstance(rail, SysOperationRail)
    ]
    assert len(sys_operation_rails) > 0
def test_create_code_agent_accepts_explicit_mcps() -> None:
    """MCP configs passed to ``create_code_agent`` land in ``deep_config.mcps``."""
    wrapper_server = McpServerConfig(
        server_name="wrapper_mcp",
        server_id="wrapper_mcp_001",
        server_path="http://127.0.0.1:8930/mcp",
    )

    agent = create_code_agent(model=_create_dummy_model(), mcps=[wrapper_server])

    stored_config = agent.deep_config
    assert stored_config is not None
    assert agent.deep_config.mcps == [wrapper_server]
def test_build_code_agent_config_uses_code_factory() -> None:
    """The code-agent spec carries the code factory name and English default prompt."""
    code_spec = build_code_agent_config(_create_dummy_model(), language="en")

    assert isinstance(code_spec, SubAgentConfig)
    assert code_spec.agent_card.name == "code_agent"
    expected_prompt = DEFAULT_CODE_AGENT_SYSTEM_PROMPT["en"]
    assert code_spec.system_prompt == expected_prompt
    assert code_spec.factory_name == CODE_AGENT_FACTORY_NAME
    # Tools and rails are left unset on the spec.
    assert code_spec.tools is None
    assert code_spec.rails is None
def test_build_research_agent_config_uses_research_factory() -> None:
    """The research-agent spec carries the research factory name and English prompt."""
    research_spec = build_research_agent_config(_create_dummy_model(), language="en")

    assert isinstance(research_spec, SubAgentConfig)
    assert research_spec.agent_card.name == "research_agent"
    expected_prompt = DEFAULT_RESEARCH_AGENT_SYSTEM_PROMPT["en"]
    assert research_spec.system_prompt == expected_prompt
    assert research_spec.factory_name == RESEARCH_AGENT_FACTORY_NAME
    # Tools and rails are left unset on the spec.
    assert research_spec.tools is None
    assert research_spec.rails is None
def test_create_subagent_uses_code_agent_factory(tmp_path) -> None:
    """create_subagent("code_agent", ...) should call the code-agent factory and return its result.

    The factory is patched with a mock so the test can check the keyword
    arguments it receives: the sub-agent card, tools/rails left as None, and a
    workspace whose root path ends with the sub-session id.
    """
    parent_root = tmp_path / "parent_workspace"
    parent_agent = create_deep_agent(
        model=_create_dummy_model(),
        card=AgentCard(name="parent", description="parent"),
        system_prompt="parent prompt",
        workspace=Workspace(root_path=str(parent_root)),
        subagents=[build_code_agent_config(_create_dummy_model(), language="en")],
    )

    # Unique sentinel so identity comparison proves the factory result is returned as-is.
    sentinel = object()
    factory_patch = patch(
        "openjiuwen.harness.subagents.code_agent.create_code_agent",
        return_value=sentinel,
    )
    with factory_patch as factory_mock:
        created = parent_agent.create_subagent("code_agent", "sub_session_id")

    assert created is sentinel
    factory_mock.assert_called_once()

    passed = factory_mock.call_args.kwargs
    assert passed["card"].name == "code_agent"
    assert passed["tools"] is None
    assert passed["rails"] is None
    assert Path(passed["workspace"].root_path).name == "sub_session_id"
def test_create_subagent_uses_research_agent_factory(tmp_path) -> None:
    """create_subagent("research_agent", ...) should call the research-agent factory and return its result.

    The factory is patched with a mock so the test can check the keyword
    arguments it receives: the sub-agent card, tools/rails left as None, and a
    workspace whose root path ends with the sub-session id.
    """
    parent_root = tmp_path / "parent_workspace"
    parent_agent = create_deep_agent(
        model=_create_dummy_model(),
        card=AgentCard(name="parent", description="parent"),
        system_prompt="parent prompt",
        workspace=Workspace(root_path=str(parent_root)),
        subagents=[build_research_agent_config(_create_dummy_model(), language="en")],
    )

    # Unique sentinel so identity comparison proves the factory result is returned as-is.
    sentinel = object()
    factory_patch = patch(
        "openjiuwen.harness.subagents.research_agent.create_research_agent",
        return_value=sentinel,
    )
    with factory_patch as factory_mock:
        created = parent_agent.create_subagent("research_agent", "sub_session_id")

    assert created is sentinel
    factory_mock.assert_called_once()

    passed = factory_mock.call_args.kwargs
    assert passed["card"].name == "research_agent"
    assert passed["tools"] is None
    assert passed["rails"] is None
    assert Path(passed["workspace"].root_path).name == "sub_session_id"
@pytest.mark.asyncio
async def test_create_deep_agent_with_restrict_to_work_dir_enabled(tmp_path) -> None:
    """Check that restrict_to_work_dir=False yields a sys operation with no sandbox.

    NOTE(review): the function name ends in "_enabled", yet the agent is built
    with restrict_to_work_dir=False and the assertions check the unrestricted
    configuration. The name is kept so the test id stays stable; consider
    renaming it separately.
    """
    unrestricted_agent = create_deep_agent(
        model=_create_dummy_model(),
        workspace=Workspace(root_path=str(tmp_path)),
        restrict_to_work_dir=False,
    )

    assert unrestricted_agent.deep_config is not None
    assert unrestricted_agent.deep_config.sys_operation is not None

    # The sys operation is looked up in the Runner resource manager under "<card name>_<card id>".
    resource_key = f"{unrestricted_agent.card.name}_{unrestricted_agent.card.id}"
    registered_op = Runner.resource_mgr.get_sys_operation(resource_key)
    assert registered_op is not None

    run_config = registered_op._run_config
    assert run_config.sandbox_root is None
    assert run_config.shell_allowlist is None
    assert run_config.restrict_to_sandbox is False
@pytest.mark.asyncio
async def test_stream_cancel_waits_for_cleanup() -> None:
    """Cancelling a streaming run should leave the agent with its per-run state cleared.

    A streaming run is started as a task, cancelled after a short delay, and
    awaited. Once that await returns, the agent's bound session id and loop
    controller must both be None.
    """
    await Runner.start()
    try:
        card = AgentCard(name="deep", description="test")
        deep_agent = DeepAgent(card).configure(DeepAgentConfig(enable_task_loop=True))
        deep_agent.set_react_agent(FakeReactAgent(), initialized=True)

        async def _drain_stream():
            # Consume the whole stream; the collected chunks are not inspected by this test.
            return [
                piece
                async for piece in Runner.run_agent_streaming(deep_agent, {"query": "test"})
            ]

        consumer = asyncio.create_task(_drain_stream())
        # Give the stream time to start before cancelling it.
        await asyncio.sleep(0.2)
        consumer.cancel()
        try:
            await consumer
        except asyncio.CancelledError:
            # Expected outcome of cancelling the task; the assertions below are what matter.
            pass

        assert deep_agent._bound_session_id is None
        assert deep_agent._loop_controller is None
    finally:
        await Runner.stop()
def test_create_subagent_inherits_parent_restrict_to_work_dir(tmp_path) -> None:
    """A sub-agent's restrict_to_work_dir must never be weaker than its parent's.

    When the parent sets restrict_to_work_dir=True (the default), the created
    sub-agent must inherit the parent's restriction even if its SubAgentConfig
    specifies restrict_to_work_dir=False (as explore_agent does). This stops a
    sub-agent from being used to bypass the sandbox.
    """
    from openjiuwen.harness.subagents.explore_agent import build_explore_agent_config

    parent_root = tmp_path / "parent_workspace"
    restricted_parent = create_deep_agent(
        model=_create_dummy_model(),
        card=AgentCard(name="parent", description="parent"),
        workspace=Workspace(root_path=str(parent_root)),
        subagents=[build_explore_agent_config(model=_create_dummy_model())],
        restrict_to_work_dir=True,
    )
    assert restricted_parent.deep_config.restrict_to_work_dir is True

    # Intercept the generic factory so the kwargs forwarded to it can be inspected.
    factory_patch = patch("openjiuwen.harness.factory.create_deep_agent", return_value=object())
    with factory_patch as factory_mock:
        restricted_parent.create_subagent("explore_agent", "sub_session_id")

    factory_mock.assert_called_once()
    forwarded = factory_mock.call_args.kwargs
    assert forwarded["restrict_to_work_dir"] is True
def test_create_subagent_respects_subagent_restrict_when_parent_unrestricted(tmp_path) -> None:
    """A sub-agent's own restrict_to_work_dir=True still applies under an unrestricted parent.

    The explore sub-agent spec is explicitly set to restrict_to_work_dir=True
    while the parent is created with restrict_to_work_dir=False. The factory
    call that builds the sub-agent must still receive restrict_to_work_dir=True.
    """
    from openjiuwen.harness.subagents.explore_agent import build_explore_agent_config

    workspace_root = tmp_path / "parent_workspace"
    explore_spec = build_explore_agent_config(model=_create_dummy_model())
    explore_spec.restrict_to_work_dir = True
    parent = create_deep_agent(
        model=_create_dummy_model(),
        card=AgentCard(name="parent", description="parent"),
        workspace=Workspace(root_path=str(workspace_root)),
        subagents=[explore_spec],
        restrict_to_work_dir=False,
    )
    assert parent.deep_config.restrict_to_work_dir is False

    with patch("openjiuwen.harness.factory.create_deep_agent", return_value=object()) as mock_create:
        parent.create_subagent("explore_agent", "sub_session_id")

    # Fail clearly if the factory was never invoked: call_args would otherwise be
    # None and the kwargs lookup below would raise an unrelated AttributeError.
    mock_create.assert_called_once()
    call_kwargs = mock_create.call_args.kwargs
    assert call_kwargs["restrict_to_work_dir"] is True
def test_create_subagent_unrestricted_when_both_unrestricted(tmp_path) -> None:
    """With parent and sub-agent both unrestricted, the sub-agent may run without a sandbox.

    This is the permissive CLI scenario: the parent is created with
    restrict_to_work_dir=False, the explore sub-agent spec is left unmodified,
    and the factory call must receive restrict_to_work_dir=False.
    """
    from openjiuwen.harness.subagents.explore_agent import build_explore_agent_config

    workspace_root = tmp_path / "parent_workspace"
    parent = create_deep_agent(
        model=_create_dummy_model(),
        card=AgentCard(name="parent", description="parent"),
        workspace=Workspace(root_path=str(workspace_root)),
        subagents=[build_explore_agent_config(model=_create_dummy_model())],
        restrict_to_work_dir=False,
    )

    with patch("openjiuwen.harness.factory.create_deep_agent", return_value=object()) as mock_create:
        parent.create_subagent("explore_agent", "sub_session_id")

    # Fail clearly if the factory was never invoked: call_args would otherwise be
    # None and the kwargs lookup below would raise an unrelated AttributeError.
    mock_create.assert_called_once()
    call_kwargs = mock_create.call_args.kwargs
    assert call_kwargs["restrict_to_work_dir"] is False