"""DeepAgent implementation."""
from __future__ import annotations
import importlib
import os
import sys
import uuid
from typing import (
Any, AsyncIterator, Dict, List, Optional,
TYPE_CHECKING, Tuple, cast,
)
import asyncio
from pathlib import Path
from openjiuwen.core.common.exception.codes import StatusCode
from openjiuwen.core.common.exception.errors import build_error
from openjiuwen.core.common.logging import logger
from openjiuwen.core.common.security.user_config import UserConfig
from openjiuwen.core.runner import Runner
from openjiuwen.core.session.agent import Session
from openjiuwen.core.session.interaction.interactive_input import InteractiveInput
from openjiuwen.core.session.stream.base import StreamMode
from openjiuwen.core.single_agent.agents.react_agent import ReActAgent, ReActAgentConfig
from openjiuwen.core.single_agent.base import BaseAgent
from openjiuwen.core.single_agent.rail.base import (
AgentCallbackContext,
AgentCallbackEvent,
AgentRail,
InvokeInputs,
RunKind,
RunContext,
)
from openjiuwen.core.single_agent.schema.agent_card import AgentCard
from openjiuwen.harness.rails import DeepAgentRail
from openjiuwen.harness.rails.task_completion_rail import (
TaskCompletionRail,
)
from openjiuwen.core.foundation.tool import ToolCard
from openjiuwen.core.foundation.llm import BaseMessage, SystemMessage
from openjiuwen.harness.schema.config import DeepAgentConfig
from openjiuwen.core.controller.config import ControllerConfig
from openjiuwen.core.context_engine import ContextEngine
from openjiuwen.core.context_engine.context.context_utils import ContextUtils
from openjiuwen.core.controller.schema.event import (
TaskInteractionEvent,
FollowUpEvent,
)
from openjiuwen.harness.task_loop.task_loop_event_handler import (
TaskLoopEventHandler,
)
from openjiuwen.harness.task_loop.task_loop_event_executor import (
DEEP_TASK_TYPE,
build_deep_executor,
)
from openjiuwen.harness.task_loop.loop_coordinator import (
LoopCoordinator,
)
from openjiuwen.harness.schema.state import (
DeepAgentState,
_SESSION_RUNTIME_ATTR,
_SESSION_STATE_KEY,
)
from openjiuwen.harness.task_loop.loop_queues import (
LoopQueues,
)
from openjiuwen.harness.harness_config.loader import (
HarnessConfigLoader,
)
from openjiuwen.harness.task_loop.task_loop_controller import (
TaskLoopController,
)
from openjiuwen.harness.rails.progressive_tool_rail import ProgressiveToolRail
from openjiuwen.harness.security.factory import build_permission_interrupt_rail
from openjiuwen.harness.tools import SessionToolkit, is_free_search_enabled, is_paid_search_enabled
if TYPE_CHECKING:
from openjiuwen.core.controller.modules.event_queue import (
EventQueue,
)
from openjiuwen.harness.schema.config import SubAgentConfig
from openjiuwen.harness.prompts import (
resolve_language,
resolve_mode,
PromptSection,
SystemPromptBuilder,
)
from openjiuwen.harness.prompts.sections import SectionName
from openjiuwen.harness.prompts.sections.identity import build_identity_section
from openjiuwen.harness.workspace.workspace import Workspace
_BRIDGE_EVENTS = frozenset(
{
AgentCallbackEvent.BEFORE_MODEL_CALL,
AgentCallbackEvent.AFTER_MODEL_CALL,
AgentCallbackEvent.ON_MODEL_EXCEPTION,
AgentCallbackEvent.BEFORE_TOOL_CALL,
AgentCallbackEvent.AFTER_TOOL_CALL,
AgentCallbackEvent.ON_TOOL_EXCEPTION,
}
)
_OUTER_ONLY_EVENTS = frozenset(
{
AgentCallbackEvent.BEFORE_INVOKE,
AgentCallbackEvent.AFTER_INVOKE,
}
)
_DEEP_EVENTS = frozenset(
{
AgentCallbackEvent.BEFORE_TASK_ITERATION,
AgentCallbackEvent.AFTER_TASK_ITERATION,
}
)
_SUB_AGENTS_DIR = "sub_agents"
class DeepAgent(BaseAgent):
"""High-level agent that delegates to an internal ReActAgent."""
def __init__(self, card: AgentCard):
self._deep_config: Optional[DeepAgentConfig] = None
self._react_agent: Optional[ReActAgent] = None
self._pending_rails: List[AgentRail] = []
self._stale_rails: List[AgentRail] = []
self._registered_rails: List[AgentRail] = []
self._loop_coordinator: Optional[LoopCoordinator] = None
self._loop_controller: Optional[TaskLoopController] = None
self._loop_session: Optional[Session] = None
self._task_completion_rail: Optional[TaskCompletionRail] = None
self._initialized = False
self.system_prompt_builder: Optional[SystemPromptBuilder] = None
self._invoke_active: bool = False
self._stream_process_task: Optional[asyncio.Task] = None
self._auto_invoke_scheduled: bool = False
self._bound_session_id: Optional[str] = None
self._session_toolkit: SessionToolkit | None = None
self._pending_harness_configs: List[str] = []
super().__init__(card)
def set_session_toolkit(self, toolkit: SessionToolkit | None) -> None:
"""Attach or clear the session toolkit (wired by SubagentRail async)."""
self._session_toolkit = toolkit
def configure(self, config: DeepAgentConfig) -> "DeepAgent":
"""Apply configuration and rebuild the internal ReActAgent."""
self._filter_disabled_tools(config)
if self._deep_config is None:
self._initial_configure(config)
else:
self._hot_reconfigure(config)
self._initialized = False
return self
@staticmethod
def _filter_disabled_tools(config: DeepAgentConfig) -> None:
if config.tools is None:
return
disabled_tool_names: set[str] = set()
if not is_free_search_enabled():
disabled_tool_names.add("free_search")
if not is_paid_search_enabled():
disabled_tool_names.add("paid_search")
if not disabled_tool_names:
return
config.tools = [
card for card in config.tools
if not (isinstance(card, ToolCard) and card.name in disabled_tool_names)
]
def _unregister_tool_resource(self, card: ToolCard) -> None:
if card.name not in {"free_search", "paid_search"}:
return
if not getattr(card, "id", None):
return
if Runner.resource_mgr.get_tool(card.id) is None:
return
tags = Runner.resource_mgr.get_resource_tag(card.id) or []
if self.card.id in tags and len(tags) > 1:
result = Runner.resource_mgr.remove_resource_tag(
card.id,
self.card.id,
skip_if_tag_not_exists=True,
)
if result.is_err():
logger.warning(
"[DeepAgent] Failed to remove tool tag during hot reload: %s",
result.msg(),
)
return
if self.card.id in tags or not tags:
result = Runner.resource_mgr.remove_tool(card.id)
if result.is_err():
logger.warning(
"[DeepAgent] Failed to unregister tool during hot reload: %s",
result.msg(),
)
def _ensure_builtin_tool_resource(self, card: ToolCard, config: DeepAgentConfig) -> None:
if card.name not in {"free_search", "paid_search"}:
return
existing_tool = Runner.resource_mgr.get_tool(card.id)
if existing_tool is not None:
tag_result = Runner.resource_mgr.add_resource_tag(card.id, self.card.id)
if tag_result.is_err():
logger.warning(
"[DeepAgent] Failed to tag existing free_search during hot reload: %s",
tag_result.msg(),
)
return
from openjiuwen.harness.tools import WebFreeSearchTool, WebPaidSearchTool
tool_cls = WebPaidSearchTool if card.name == "paid_search" else WebFreeSearchTool
tool = tool_cls(language=resolve_language(config.language), card=card)
result = Runner.resource_mgr.add_tool(tool, tag=self.card.id)
if result.is_err():
logger.warning(
"[DeepAgent] Failed to register free_search during hot reload: %s",
result.msg(),
)
def _initial_configure(self, config: DeepAgentConfig) -> None:
"""First-time setup: persist config, create the inner ReActAgent, and queue rails."""
self._deep_config = config
if config.card is not None:
self.card = config.card
self._react_agent = self._create_react_agent()
self._queue_pending_rails(config)
def _hot_reconfigure(self, config: DeepAgentConfig) -> None:
"""Hot-reconfigure an already-running agent without restarting it."""
previous_config = self._deep_config
self._deep_config = config
if config.card is not None:
self.card = config.card
self._hot_reload_rails(config)
if config.model is not None:
self._hot_reload_model(config)
if config.tools is not None and self._react_agent is not None:
self._hot_reload_tools(
config,
previous_tools=(previous_config.tools if previous_config is not None else None),
)
if config.system_prompt is not None and self._react_agent is not None:
self._hot_reload_system_prompt(config)
self._queue_pending_rails(config)
self._sync_builder_to_active_rails()
def _hot_reload_rails(self, config: DeepAgentConfig) -> None:
"""Cycle stale rails out and prepare replacement rails during hot-reconfigure.
config.rails is not None - partial update: only rails whose type appears
in the new list are retired; others are retained.
config.rails is None - full replacement: all existing rails become stale.
config.rails == [] - nothing to replace; keep all existing rails.
"""
if config.rails is not None:
replacing_types = {type(r) for r in config.rails}
retained = []
for rail in self._registered_rails:
if type(rail) in replacing_types:
self._stale_rails.append(rail)
else:
retained.append(rail)
self._registered_rails = retained
self._pending_rails = [
rail for rail in self._pending_rails
if type(rail) not in replacing_types
]
else:
self._stale_rails.extend(self._registered_rails)
self._registered_rails.clear()
self._pending_rails.clear()
def _hot_reload_model(self, config: DeepAgentConfig) -> None:
"""Apply updated model/iteration config to the live inner ReActAgent.
Mutates a copy of the existing ReActAgentConfig with the new model
fields, then drives the update through react_agent.configure() so that
all side-effects (LLM reset, context-engine rebuild, etc.) fire correctly.
"""
if self._react_agent is None:
return
new_react_config = self._react_agent.config.model_copy()
if config.model is not None:
new_react_config.model_client_config = config.model.model_client_config
new_react_config.model_config_obj = config.model.model_config
if config.model.model_config is not None and config.model.model_config.model_name:
new_react_config.model_name = config.model.model_config.model_name
if config.model.model_client_config is not None:
client_cfg = config.model.model_client_config
new_react_config.api_base = client_cfg.api_base
new_react_config.api_key = client_cfg.api_key
new_react_config.model_provider = str(
client_cfg.client_provider.value
if hasattr(client_cfg.client_provider, "value")
else client_cfg.client_provider
)
new_react_config.max_iterations = (
sys.maxsize if config.enable_task_loop else config.max_iterations
)
if config.context_engine_config is not None:
new_react_config.context_engine_config = config.context_engine_config
self._react_agent.configure(new_react_config)
logger.info("[DeepAgent] Model configuration hot reloaded")
def _hot_reload_tools(
self,
config: DeepAgentConfig,
*,
previous_tools: Optional[List[ToolCard]] = None,
) -> None:
"""Sync tool cards in the shared AbilityManager during hot-reconfigure.
Tools are matched by id: a card whose id already exists in the
AbilityManager is left untouched. Cards with a new id replace any
existing entry with the same name, or are added fresh. Tools present
in the AbilityManager but absent from config.tools are removed.
MCP server registrations and other ability types are not affected.
"""
new_by_name = {card.name: card for card in (config.tools or [])}
previous_by_name = {
card.name: card for card in (previous_tools or []) if isinstance(card, ToolCard)
}
managed_tool_names = set(previous_by_name)
if not managed_tool_names:
managed_tool_names = set(new_by_name)
stale = [name for name in managed_tool_names if name not in new_by_name]
if stale:
for name in stale:
card = previous_by_name.get(name)
if card is not None:
self._unregister_tool_resource(card)
self.ability_manager.remove(stale)
for name, card in new_by_name.items():
existing = self.ability_manager.get(name)
existing_tool = existing if isinstance(existing, ToolCard) else None
if existing_tool is not None and existing_tool.id == card.id:
self._ensure_builtin_tool_resource(card, config)
continue
if existing_tool is not None:
self._unregister_tool_resource(existing_tool)
self.ability_manager.remove(name)
self.ability_manager.add(card)
self._ensure_builtin_tool_resource(card, config)
self.ability_manager.reorder_tools(list(new_by_name))
def _hot_reload_system_prompt(self, config: DeepAgentConfig) -> None:
"""Rebuild the SystemPromptBuilder and update both agents during hot-reconfigure.
The same builder object must be referenced by both DeepAgent.system_prompt_builder
and react_agent.system_prompt_builder so that rails mutating the builder before
model calls see a consistent view.
"""
language = resolve_language(config.language)
mode = resolve_mode(config.prompt_mode)
prompt_builder = SystemPromptBuilder(language=language, mode=mode)
if config.system_prompt:
prompt_builder.add_section(PromptSection(
name=SectionName.IDENTITY,
content={"cn": config.system_prompt, "en": config.system_prompt},
priority=10,
))
else:
prompt_builder.add_section(build_identity_section(language))
prompt = prompt_builder.build()
new_react_config = self._react_agent.config.model_copy()
new_react_config.prompt_template = [{"role": "system", "content": prompt}]
self._react_agent.configure(new_react_config)
self.system_prompt_builder = prompt_builder
self._react_agent.prompt_builder = prompt_builder
self._react_agent.system_prompt_builder = prompt_builder
logger.info("[DeepAgent] System prompt hot reloaded")
def _sync_builder_to_active_rails(self) -> None:
"""Push the current system_prompt_builder reference to active rails.
Syncs to _registered_rails and _stale_rails only. _pending_rails do not
need syncing because they will acquire the correct builder from the agent
during init() in _ensure_initialized.
"""
for rail in (*self._registered_rails, *self._stale_rails):
if hasattr(rail, "system_prompt_builder"):
rail.system_prompt_builder = self.system_prompt_builder
def _queue_pending_rails(self, config: DeepAgentConfig) -> None:
"""Append config-driven rails to _pending_rails for lazy registration."""
if config.rails is not None:
self._pending_rails.extend(config.rails)
self._task_completion_rail = None
if config.progressive_tool_enabled:
self._pending_rails.append(
ProgressiveToolRail(config=config)
)
if config.enable_task_loop:
self._pending_rails.append(TaskCompletionRail())
if isinstance(config.permissions, dict) and config.permissions.get("enabled"):
ws_root = None
if config.workspace is not None:
ws_root = Path(config.workspace.root_path).resolve()
model_name = None
if config.model is not None:
model_name = getattr(config.model, "model_name", None)
prail = build_permission_interrupt_rail(
permissions=config.permissions,
llm=config.model,
model_name=model_name,
host=config.permission_host,
workspace_root=ws_root,
)
if prail is not None:
self._pending_rails.append(prail)
def set_react_agent(
self,
react_agent: Any,
initialized: bool = False,
) -> "DeepAgent":
"""Inject an inner agent implementation for runtime wiring/tests."""
self._react_agent = cast(ReActAgent, react_agent)
self._initialized = initialized
return self
@property
def is_initialized(self) -> bool:
"""Whether lazy rail initialization is completed."""
return self._initialized
@property
def is_invoke_active(self) -> bool:
"""Whether is invoke_active."""
return self._invoke_active
@property
def is_auto_invoke_scheduled(self) -> bool:
"""Whether is auto_invoke_scheduled."""
return self._auto_invoke_scheduled
def set_auto_invoke_scheduled(self, is_scheduled: bool) -> None:
"""Set auto_invoke_scheduled."""
self._auto_invoke_scheduled = is_scheduled
@property
def deep_config(self) -> DeepAgentConfig:
"""deep_config carried by this agent."""
return self._deep_config
@property
def react_agent(self) -> Optional[ReActAgent]:
"""The internal ReActAgent instance."""
return self._react_agent
def _resolve_context_session_id(self, session_id: Optional[str] = None) -> str:
"""Resolve the session id used by context inspection APIs."""
resolved = (
session_id
or self._bound_session_id
or (
self._loop_session.get_session_id()
if self._loop_session is not None
else None
)
)
if not resolved:
raise build_error(
StatusCode.DEEPAGENT_CONTEXT_PARAM_ERROR,
error_msg="session_id is required when no context is bound.",
)
return resolved
def _get_context_or_error(
self,
session_id: Optional[str] = None,
context_id: str = "default_context_id",
):
"""Return the inner ReAct context for a session, or raise."""
if self._react_agent is None:
raise build_error(
StatusCode.DEEPAGENT_RUNTIME_ERROR,
error_msg="DeepAgent not configured. Call configure() first.",
)
resolved_session_id = self._resolve_context_session_id(session_id)
context = self._react_agent.context_engine.get_context(
context_id=context_id,
session_id=resolved_session_id,
)
if context is None:
raise build_error(
StatusCode.DEEPAGENT_CONTEXT_PARAM_ERROR,
error_msg=(
f"cannot find context '{context_id}' "
f"in session '{resolved_session_id}'"
),
)
return context
def _get_react_config(self) -> ReActAgentConfig:
"""Return inner ReActAgent config for real and test agents."""
if self._react_agent is None:
raise build_error(
StatusCode.DEEPAGENT_RUNTIME_ERROR,
error_msg="DeepAgent not configured. Call configure() first.",
)
config = getattr(
self._react_agent,
"config",
getattr(self._react_agent, "_config", None),
)
if config is None:
raise build_error(
StatusCode.DEEPAGENT_RUNTIME_ERROR,
error_msg="Inner ReActAgent config is not available.",
)
return config
def _resolve_context_window_tokens(self) -> int:
"""Resolve the configured model context window size."""
config = self._get_react_config().context_engine_config
model_name = None
if self._react_agent is not None:
model_name = getattr(self._get_react_config(), "model_name", None)
if config is not None and getattr(config, "model_name", None):
model_name = config.model_name
return ContextUtils.resolve_context_max(
model_name=model_name,
fallback_context_window_tokens=(
config.context_window_tokens if config is not None else None
),
model_context_window_tokens=(
config.model_context_window_tokens if config is not None else None
),
)
def get_context_usage(
self,
session_id: Optional[str] = None,
context_id: str = "default_context_id",
) -> Dict[str, Any]:
"""Get current context occupancy statistics.
The underlying ``ContextStats`` already prefers the latest
``AssistantMessage.usage_metadata`` returned by the real model when
present; otherwise it falls back to local token counting.
"""
context = self._get_context_or_error(session_id, context_id)
resolved_session_id = self._resolve_context_session_id(session_id)
stats = context.statistic()
context_window_tokens = self._resolve_context_window_tokens()
usage_ratio = (
stats.total_tokens / context_window_tokens
if context_window_tokens > 0
else 0.0
)
return {
"session_id": resolved_session_id,
"context_id": context.context_id(),
"total_tokens": stats.total_tokens,
"context_window_tokens": context_window_tokens,
"usage_ratio": usage_ratio,
"usage_percent": round(usage_ratio * 100, 2),
"stats": stats.model_dump(),
}
def get_context_occupancy(
self,
session_id: Optional[str] = None,
context_id: str = "default_context_id",
) -> Dict[str, Any]:
"""Alias for ``get_context_usage``."""
return self.get_context_usage(session_id, context_id)
def get_current_context(
self,
session_id: Optional[str] = None,
context_id: str = "default_context_id",
) -> List[Any]:
"""Return current context messages for the given session/context."""
context = self._get_context_or_error(session_id, context_id)
return context.get_messages()
@staticmethod
def _normalize_context_messages(
messages: Optional[List[Any] | Any] = None,
) -> List[BaseMessage]:
"""Normalize initial message inputs for a fresh context."""
if messages is None:
return []
raw_messages = (
messages
if isinstance(messages, list)
else [messages]
)
normalized: List[BaseMessage] = []
for message in raw_messages:
if isinstance(message, BaseMessage):
normalized.append(message)
elif isinstance(message, str):
normalized.append(SystemMessage(content=message))
elif isinstance(message, dict):
normalized.append(SystemMessage(**message))
else:
raise build_error(
StatusCode.DEEPAGENT_CONTEXT_PARAM_ERROR,
error_msg=(
"messages must be a string, dict, "
"BaseMessage, or a list of those values."
),
)
return normalized
async def create_new_context_engine(
self,
session_id: Optional[str] = None,
messages: Optional[List[Any] | Any] = None,
) -> str:
"""Create a fresh context in the inner ContextEngine and return its session id."""
if self._react_agent is None:
raise build_error(
StatusCode.DEEPAGENT_RUNTIME_ERROR,
error_msg="DeepAgent not configured. Call configure() first.",
)
new_session_id = session_id or str(uuid.uuid4())
normalized_messages = self._normalize_context_messages(
messages
)
await self._react_agent.context_engine.create_context(
session=Session(session_id=new_session_id, card=self.card),
history_messages=normalized_messages,
)
return new_session_id
async def new_context_engine(
self,
session_id: Optional[str] = None,
messages: Optional[List[Any] | Any] = None,
) -> str:
"""Alias for ``create_new_context_engine``."""
return await self.create_new_context_engine(
session_id,
messages,
)
@property
def loop_coordinator(self) -> Optional[LoopCoordinator]:
"""LoopCoordinator for the outer task loop."""
return self._loop_coordinator
@property
def loop_controller(self) -> Optional[TaskLoopController]:
"""LoopController for the outer task loop."""
return self._loop_controller
@property
def event_queue(self) -> Optional["EventQueue"]:
"""EventQueue for the current task loop."""
if self._loop_controller is None:
return None
return self._loop_controller.event_queue
@property
def event_handler(self) -> Optional[TaskLoopEventHandler]:
"""EventHandler for the current task loop."""
if self._loop_controller is None:
return None
return cast(
TaskLoopEventHandler,
self._loop_controller.event_handler,
)
def _create_react_agent(self) -> ReActAgent:
"""Build the internal ReActAgent from current DeepAgentConfig."""
cfg = self._deep_config
if cfg is None:
raise build_error(
StatusCode.DEEPAGENT_CONFIG_PARAM_ERROR,
error_msg="DeepAgentConfig is required. Call configure() first.",
)
inner_card = AgentCard(
name=f"{self.card.name}_react",
description=self.card.description or "",
)
react_config = ReActAgentConfig()
react_config.max_iterations = (
sys.maxsize
if cfg.enable_task_loop
else cfg.max_iterations
)
if cfg.context_engine_config is not None:
react_config.context_engine_config = cfg.context_engine_config
react_config.workspace = cfg.workspace
language = resolve_language(cfg.language)
mode = resolve_mode(cfg.prompt_mode)
prompt_builder = SystemPromptBuilder(language=language, mode=mode)
if cfg.system_prompt:
prompt_builder.add_section(PromptSection(
name=SectionName.IDENTITY,
content={"cn": cfg.system_prompt, "en": cfg.system_prompt},
priority=10,
))
else:
prompt_builder.add_section(build_identity_section(language))
prompt = prompt_builder.build()
react_config.prompt_template = [{"role": "system", "content": prompt}]
if cfg.model is not None:
model = cfg.model
if model.model_client_config is not None:
react_config.model_client_config = model.model_client_config
if model.model_config is not None:
react_config.model_config_obj = model.model_config
if model.model_config.model_name:
react_config.model_name = model.model_config.model_name
agent = ReActAgent(inner_card)
agent.configure(react_config)
self.system_prompt_builder = prompt_builder
agent.prompt_builder = prompt_builder
agent.system_prompt_builder = prompt_builder
agent.ability_manager = self.ability_manager
if cfg.model is not None:
agent.set_llm(cfg.model)
return agent
async def _register_pending_mcps(self) -> None:
"""Register configured MCP servers before rails initialize."""
cfg = self._deep_config
if cfg is None or not cfg.mcps:
return
for mcp_config in cfg.mcps:
existing_config = Runner.resource_mgr.get_mcp_server_config(mcp_config.server_id)
if existing_config is None:
result = await Runner.resource_mgr.add_mcp_server(
mcp_config,
tag=self.card.id,
)
if result.is_err():
raise result.msg()
else:
if existing_config.model_dump() != mcp_config.model_dump():
raise build_error(
StatusCode.RESOURCE_MCP_SERVER_ADD_ERROR,
server_config=mcp_config,
reason=(
f"server_id '{mcp_config.server_id}' is already registered "
"with a different config"
),
)
tag_result = Runner.resource_mgr.add_resource_tag(
mcp_config.server_id,
self.card.id,
)
if tag_result.is_err():
raise tag_result.msg()
for tool_id in Runner.resource_mgr.get_mcp_tool_ids(mcp_config.server_id):
tag_result = Runner.resource_mgr.add_resource_tag(tool_id, self.card.id)
if tag_result.is_err():
raise tag_result.msg()
self.ability_manager.add(mcp_config)
async def _ensure_initialized(self) -> None:
"""Perform lazy async initialization."""
if self._initialized:
return
if self._deep_config and self._deep_config.workspace:
from openjiuwen.core.sys_operation.cwd import init_cwd
init_root = self._deep_config.workspace.root_path or os.getcwd()
init_cwd(
init_root,
workspace=self._deep_config.workspace.root_path,
)
await self._register_pending_mcps()
if self._needs_workspace_init():
await self.init_workspace()
for stale_rail in self._stale_rails:
if stale_rail not in self._pending_rails:
await self.unregister_rail(stale_rail)
self._stale_rails.clear()
for rail_inst in self._pending_rails:
if isinstance(rail_inst, TaskCompletionRail):
self._task_completion_rail = rail_inst
if isinstance(rail_inst, DeepAgentRail):
rail_inst.set_sys_operation(self._deep_config.sys_operation)
rail_inst.set_workspace(self._deep_config.workspace)
rail_inst.init(self)
await self._register_rail_selective(rail_inst)
self._pending_rails.clear()
self._initialized = True
def _needs_workspace_init(self) -> bool:
"""Check if workspace initialization is needed."""
config = self._deep_config
if config:
return (
config.workspace is not None
and config.sys_operation is not None
and config.auto_create_workspace
)
return False
async def ensure_initialized(self) -> None:
"""Perform lazy initialization. For testing only."""
await self._ensure_initialized()
async def init_workspace(self) -> None:
"""Initialize the workspace with directory structure and default content.
Only creates directories/files if they don't exist, so it's safe to call
multiple times.
"""
from openjiuwen.harness.workspace.directory_builder import DirectoryBuilder
if self._deep_config and self._deep_config.workspace:
root = Path(self._deep_config.workspace.root_path)
if (root / ".workspace").exists():
return
builder = DirectoryBuilder(
sys_operation=self._deep_config.sys_operation,
root_path=self._deep_config.workspace.root_path,
)
await builder.build(self._deep_config.workspace.directories)
@property
def loop_session(self) -> Optional[Session]:
"""The active loop session, or None if no session is running."""
return self._loop_session
@property
def react_config(self) -> ReActAgentConfig:
"""React agent config. For testing only."""
return self._react_agent.config
def create_subagent(self, subagent_type: str, subsession_id: str) -> "DeepAgent":
"""Create a subagent instance (shared by TaskTool and SessionSpawnExecutor).
Args:
subagent_type: Type of subagent to create (e.g., "general-purpose").
subsession_id: The session id for the subagent.
Returns:
Configured DeepAgent instance.
Raises:
BaseError: If subagent spec not found.
"""
spec = self._find_subagent_spec(subagent_type)
if spec is None:
raise build_error(
StatusCode.DEEPAGENT_CREATE_SUBAGENT_NOT_FOUND,
error_msg=f"Subagent spec not found: {subagent_type}",
)
if isinstance(spec, DeepAgent):
logger.info("already get deepagent instance, return it")
return spec
if not self._deep_config.workspace or isinstance(self._deep_config.workspace, str):
workspace_path = (
str(Path(self._deep_config.workspace) / _SUB_AGENTS_DIR / subsession_id)
if self._deep_config.workspace
else str(Path(".") / _SUB_AGENTS_DIR / subsession_id)
)
workspace = Workspace(
root_path=workspace_path,
language=self._deep_config.language
)
else:
workspace = Workspace(
root_path=Path(self._deep_config.workspace.root_path) / _SUB_AGENTS_DIR / subsession_id,
language=self._deep_config.language
)
create_kwargs = {
"model": spec.model or self._deep_config.model,
"card": spec.agent_card,
"system_prompt": spec.system_prompt,
"tools": spec.tools,
"mcps": spec.mcps,
"rails": spec.rails,
"enable_task_loop": spec.enable_task_loop,
"max_iterations": (
spec.max_iterations
if spec.max_iterations is not None
else self._deep_config.max_iterations
),
"workspace": (
spec.workspace
if spec.workspace is not None
else workspace
),
"skills": spec.skills,
"backend": (
spec.backend
if spec.backend is not None
else self._deep_config.backend
),
"sys_operation": (
spec.sys_operation
if spec.sys_operation is not None and spec.workspace is not None
else None
),
"language": (
spec.language
if spec.language is not None
else self._deep_config.language
),
"prompt_mode": (
spec.prompt_mode
if spec.prompt_mode is not None
else self._deep_config.prompt_mode
),
"subagents": None,
"enable_async_subagent": False,
"add_general_purpose_agent": False,
"enable_plan_mode": spec.enable_plan_mode,
"restrict_to_work_dir": spec.restrict_to_work_dir or self._deep_config.restrict_to_work_dir,
}
if spec.factory_name:
normalized_factory = (spec.factory_name or "").strip().lower()
if normalized_factory in {"browser_agent", "browser_runtime"}:
from openjiuwen.harness.subagents.browser_agent import (
create_browser_agent,
)
return create_browser_agent(
**create_kwargs,
**dict(spec.factory_kwargs or {}),
)
if normalized_factory == "code_agent":
from openjiuwen.harness.subagents.code_agent import (
create_code_agent,
)
return create_code_agent(
**create_kwargs,
**dict(spec.factory_kwargs or {}),
)
if normalized_factory == "research_agent":
from openjiuwen.harness.subagents.research_agent import (
create_research_agent,
)
return create_research_agent(
**create_kwargs,
**dict(spec.factory_kwargs or {}),
)
if normalized_factory in {"mobile_gui_agent", "mobile_agent"}:
from openjiuwen.harness.subagents.mobile_gui_agent import (
create_mobile_gui_agent,
)
return create_mobile_gui_agent(
**create_kwargs,
**dict(spec.factory_kwargs or {}),
)
raise build_error(
StatusCode.DEEPAGENT_CREATE_SUBAGENT_NOT_FOUND,
error_msg=f"Unsupported subagent factory: {spec.factory_name}",
)
from openjiuwen.harness.factory import create_deep_agent
return create_deep_agent(**create_kwargs, **dict(spec.factory_kwargs or {}))
def _find_subagent_spec(self, subagent_type: str) -> Optional["SubAgentConfig | DeepAgent"]:
"""Find SubAgentConfig matching subagent_type.
Args:
subagent_type: Type of subagent to find.
Returns:
Matching SubAgentConfig or DeepAgent instance, or None.
"""
if self._deep_config is None:
return None
from openjiuwen.harness.schema.config import SubAgentConfig
for spec in self._deep_config.subagents or []:
if isinstance(spec, SubAgentConfig) and spec.agent_card.name == subagent_type:
return spec
if isinstance(spec, DeepAgent):
card = getattr(spec, "card", None)
if getattr(card, "name", None) == subagent_type:
return spec
return None
def _normalize_inputs(self, inputs: Any) -> InvokeInputs:
"""Parse user inputs into typed InvokeInputs."""
if isinstance(inputs, dict):
query = inputs.get("query", "")
conversation_id = inputs.get("conversation_id")
run = inputs.get("run", {})
run_kind = None
run_context = None
if run:
kind = run.get("kind", "normal")
run_kind = RunKind(kind)
context_data = run.get("context")
if context_data:
run_context = RunContext(**context_data)
elif isinstance(inputs, str):
query = inputs
conversation_id = None
run_kind = None
run_context = None
elif isinstance(inputs, InteractiveInput):
query = inputs
conversation_id = None
run_kind = None
run_context = None
else:
raise build_error(
StatusCode.DEEPAGENT_INPUT_PARAM_ERROR,
error_msg="Input must be dict with 'query', str, or InteractiveInput.",
)
return InvokeInputs(
query=query,
conversation_id=conversation_id,
run_kind=run_kind,
run_context=run_context
)
@staticmethod
def _to_effective_inputs(invoke_inputs: InvokeInputs) -> Dict[str, Any]:
"""Convert typed invoke inputs to ReAct input dict."""
effective_inputs: Dict[str, Any] = {"query": invoke_inputs.query}
if invoke_inputs.conversation_id is not None:
effective_inputs["conversation_id"] = invoke_inputs.conversation_id
if invoke_inputs.run_kind is not None:
effective_inputs["run_kind"] = invoke_inputs.run_kind
if invoke_inputs.run_context is not None:
effective_inputs["run_context"] = invoke_inputs.run_context
return effective_inputs
@staticmethod
def _is_resume_input(invoke_inputs: InvokeInputs) -> bool:
"""Return True if the query is an InteractiveInput (interrupt resume)."""
return isinstance(invoke_inputs.query, InteractiveInput)
@staticmethod
def _result_from_stream_chunk(chunk: Any, output_parts: List[str]) -> Optional[Dict[str, Any]]:
"""Build an invoke-style result from emitted stream chunks."""
chunk_type = getattr(chunk, "type", None)
payload = getattr(chunk, "payload", None)
if isinstance(chunk, dict):
chunk_type = chunk.get("type", chunk_type)
payload = chunk.get("payload", payload)
if not isinstance(payload, dict):
return None
result: Optional[Dict[str, Any]] = None
if chunk_type == "llm_output":
content = payload.get("content")
if isinstance(content, str):
output_parts.append(content)
elif chunk_type == "answer":
result = dict(payload)
result.setdefault("result_type", "answer")
if "output" not in result:
content = result.get("content")
if isinstance(content, str):
output_parts.append(content)
result["output"] = content
else:
result = None
return result
def add_rail(self, rail: AgentRail) -> "DeepAgent":
"""Synchronously queue a rail for registration.
When a TaskCompletionRail is added it replaces any
previously queued one (default or user-provided).
"""
if isinstance(rail, TaskCompletionRail):
self._pending_rails = [
r for r in self._pending_rails
if not isinstance(r, TaskCompletionRail)
]
self._pending_rails.append(rail)
return self
def find_rails_by_type(self, rail_types: tuple[type, ...]) -> List[AgentRail]:
"""Return queued + registered rails matching any of the given types.
Public counterpart to ``strip_rails_by_type`` for callers that need to
locate a rail instance without touching DeepAgent's internal rail lists
directly.
"""
if not rail_types:
return []
return [rail for rail in (*self._pending_rails, *self._registered_rails) if isinstance(rail, rail_types)]
def strip_rails_by_type(self, rail_types: tuple[type, ...]) -> int:
"""Remove queued rails by type and mark registered ones as stale.
This provides a public API for callers that need to replace built-in rails
without touching DeepAgent's internal rail lists directly.
"""
if not rail_types:
return 0
removed = 0
before = len(self._pending_rails)
self._pending_rails = [r for r in self._pending_rails if not isinstance(r, rail_types)]
removed += before - len(self._pending_rails)
for rail in list(self._registered_rails):
if isinstance(rail, rail_types):
self._stale_rails.append(rail)
removed += 1
return removed
async def register_rail(self, rail: AgentRail) -> "DeepAgent":
"""Register a rail with selective routing."""
if isinstance(rail, TaskCompletionRail):
self._task_completion_rail = rail
if isinstance(rail, DeepAgentRail):
rail.set_sys_operation(self.deep_config.sys_operation)
rail.set_workspace(self.deep_config.workspace)
rail.init(self)
await self._register_rail_selective(rail)
return self
async def unregister_rail(self, rail: AgentRail) -> "DeepAgent":
"""Unregister a rail from pending/outer/inner."""
self._pending_rails = [queued for queued in self._pending_rails if queued is not rail]
self._registered_rails = [r for r in self._registered_rails if r is not rail]
if isinstance(rail, TaskCompletionRail) and self._task_completion_rail is rail:
self._task_completion_rail = None
await self._agent_callback_manager.unregister_rail(rail, self)
if self._react_agent is not None:
await self._react_agent.agent_callback_manager.unregister_rail(
rail, self._react_agent
)
rail.uninit(self)
return self
async def load_harness_config(
self,
config_path: str,
) -> list[str]:
"""Hot-load resources from a harness_config.yaml.
Parses the config and registers any declared rails,
tools, and skills onto this agent instance.
Args:
config_path: Absolute path to harness_config.yaml.
Returns:
List of human-readable names of loaded resources.
"""
from openjiuwen.harness.harness_config.builder import (
_resolve_rails,
_resolve_tools,
)
resolved = HarnessConfigLoader.load(config_path)
resources = resolved.config.resources
if not resources:
return []
loaded: list[str] = []
config_path_obj = Path(config_path).resolve()
runtime_ext = self._runtime_extension_artifact_for_config(
config_path_obj,
resources,
)
if resources.rails:
if runtime_ext is not None:
from openjiuwen.auto_harness.infra.runtime_extension_loader import (
load_runtime_rails,
)
rail_classes = load_runtime_rails(
runtime_ext,
session_id=self._runtime_extension_session_id(),
)
rails = [rail_cls() for rail_cls in rail_classes]
else:
rails = _resolve_rails(resources)
for rail in rails:
await self.register_rail(rail)
loaded.append(
f"rail:{type(rail).__name__}"
)
if resources.tools:
if runtime_ext is not None:
from openjiuwen.auto_harness.infra.runtime_extension_loader import (
load_runtime_tools,
)
tool_classes = load_runtime_tools(
runtime_ext,
session_id=self._runtime_extension_session_id(),
)
tools = [tool_cls() for tool_cls in tool_classes]
else:
sys_op = self.deep_config.sys_operation
tools = _resolve_tools(
resources, sys_op
)
for tool in tools:
existing = Runner.resource_mgr.get_tool(
tool.card.id
)
if existing is None:
Runner.resource_mgr.add_tool(tool)
self.ability_manager.add(tool.card)
loaded.append(
f"tool:{type(tool).__name__}"
)
if resources.skills and resources.skills.dirs:
from openjiuwen.harness.rails.skills.skill_use_rail import (
SkillUseRail,
)
if runtime_ext is not None:
from openjiuwen.auto_harness.infra.runtime_extension_loader import (
load_runtime_skill_dirs,
)
skill_dirs = load_runtime_skill_dirs(runtime_ext)
else:
source_dir = config_path_obj.parent
skill_dirs = [
str((source_dir / d).resolve())
for d in resources.skills.dirs
]
existing_skill_rail: (
SkillUseRail | None
) = None
for r in (*self._registered_rails, *self._pending_rails):
if isinstance(r, SkillUseRail):
existing_skill_rail = r
break
if existing_skill_rail is not None:
cur = existing_skill_rail.skills_dir
if isinstance(cur, str):
existing_skill_rail.skills_dir = [
*skill_dirs,
cur,
]
else:
existing = list(cur)
existing = [
d
for d in existing
if d not in skill_dirs
]
existing_skill_rail.skills_dir = [
*skill_dirs,
*existing,
]
existing_skill_rail.enable_cache = False
if existing_skill_rail.sys_operation is None:
existing_skill_rail.set_sys_operation(
self.deep_config.sys_operation
)
await existing_skill_rail.reload_skills()
else:
mode = (
resources.skills.mode or "all"
)
new_rail = SkillUseRail(
skills_dir=skill_dirs,
skill_mode=mode,
)
await self.register_rail(new_rail)
await new_rail.reload_skills()
for sd in skill_dirs:
loaded.append(f"skill_dir:{sd}")
return loaded
async def unload_harness_config(
self,
config_path: str,
) -> list[str]:
"""Unload resources declared in a harness_config.yaml.
Parses the config and removes rails, tools, and skill directories
that match the declarations in the config file.
This method does NOT track which resources were previously loaded.
Instead, it re-parses the config file and removes matching resources
by type/name/module. This means:
- Rails are removed by matching type (class name and module)
- Tools are removed by matching card.id and card.name
- Skill dirs are removed from SkillUseRail if present
Args:
config_path: Absolute path to harness_config.yaml.
Returns:
List of human-readable names of unloaded resources.
Raises:
FileNotFoundError: If the config file does not exist.
ValueError: If parsing the config fails.
"""
config_path_obj = Path(config_path).resolve()
if not config_path_obj.exists():
raise FileNotFoundError(
f"Harness config file not found: {config_path_obj}"
)
resolved = HarnessConfigLoader.load(config_path)
resources = resolved.config.resources
if not resources:
return []
unloaded: list[str] = []
runtime_ext = self._runtime_extension_artifact_for_config(
config_path_obj,
resources,
)
if resources.rails:
rail_types_to_remove: set[type] = set()
if runtime_ext is not None:
from openjiuwen.auto_harness.infra.runtime_extension_loader import (
load_runtime_rails,
)
rail_classes = load_runtime_rails(
runtime_ext,
session_id=self._runtime_extension_session_id(),
)
rail_types_to_remove = set(rail_classes)
else:
for spec in resources.rails:
if spec.type == "builtin" and spec.name:
from openjiuwen.harness.harness_config.builder import (
_BUILTIN_RAIL_REGISTRY,
_load_dotted_path,
)
dotted = _BUILTIN_RAIL_REGISTRY.get(spec.name)
if dotted:
rail_types_to_remove.add(_load_dotted_path(dotted))
elif spec.type == "package" and spec.module and spec.class_name:
dotted = f"{spec.module}.{spec.class_name}"
from openjiuwen.harness.harness_config.builder import (
_load_dotted_path,
)
rail_types_to_remove.add(_load_dotted_path(dotted))
elif spec.type == "entry_point" and spec.name:
from openjiuwen.harness.harness_config.builder import (
_load_from_entry_point,
)
rail_types_to_remove.add(
_load_from_entry_point(spec.name, "openjiuwen.rail")
)
for rail in list(self._registered_rails):
if type(rail) in rail_types_to_remove:
await self.unregister_rail(rail)
unloaded.append(f"rail:{type(rail).__name__}")
if resources.tools:
tool_ids_to_remove: list[str] = []
tool_names_to_remove: list[str] = []
if runtime_ext is not None:
from openjiuwen.auto_harness.infra.runtime_extension_loader import (
load_runtime_tools,
)
tool_classes = load_runtime_tools(
runtime_ext,
session_id=self._runtime_extension_session_id(),
)
for cls in tool_classes:
tool_instance = cls()
tool_ids_to_remove.append(tool_instance.card.id)
tool_names_to_remove.append(tool_instance.card.name)
else:
for spec in resources.tools:
if spec.type == "builtin":
from openjiuwen.harness.harness_config.builder import (
_BUILTIN_TOOL_GROUPS,
)
names = spec.names or ([spec.name] if spec.name else [])
for group in names:
entry = _BUILTIN_TOOL_GROUPS.get(group)
if entry:
module_path, class_names, _ = entry
mod = importlib.import_module(module_path)
for cls_name in class_names:
cls = getattr(mod, cls_name)
tool_instance = cls()
tool_ids_to_remove.append(tool_instance.card.id)
tool_names_to_remove.append(tool_instance.card.name)
elif spec.type == "package" and spec.module and spec.class_name:
from openjiuwen.harness.harness_config.builder import (
_load_dotted_path,
)
cls = _load_dotted_path(f"{spec.module}.{spec.class_name}")
tool_instance = cls()
tool_ids_to_remove.append(tool_instance.card.id)
tool_names_to_remove.append(tool_instance.card.name)
elif spec.type == "entry_point" and spec.name:
from openjiuwen.harness.harness_config.builder import (
_load_from_entry_point,
)
cls = _load_from_entry_point(spec.name, "openjiuwen.tool")
tool_instance = cls()
tool_ids_to_remove.append(tool_instance.card.id)
tool_names_to_remove.append(tool_instance.card.name)
for tool_id in tool_ids_to_remove:
Runner.resource_mgr.remove_tool(tool_id)
unloaded.append(f"tool_id:{tool_id}")
for tool_name in tool_names_to_remove:
self.ability_manager.remove(tool_name)
unloaded.append(f"tool:{tool_name}")
if resources.skills and resources.skills.dirs:
from openjiuwen.harness.rails.skills.skill_use_rail import (
SkillUseRail,
)
if runtime_ext is not None:
from openjiuwen.auto_harness.infra.runtime_extension_loader import (
load_runtime_skill_dirs,
)
skill_dirs = load_runtime_skill_dirs(runtime_ext)
else:
source_dir = config_path_obj.parent
skill_dirs = [
str((source_dir / d).resolve())
for d in resources.skills.dirs
]
for r in self._registered_rails:
if isinstance(r, SkillUseRail):
cur = r.skills_dir
if isinstance(cur, str):
if cur in skill_dirs:
r.skills_dir = []
else:
remaining = [
d for d in cur if d not in skill_dirs
]
r.skills_dir = remaining
if not r.skills_dir:
r.clear_skills()
else:
original_cache = r.enable_cache
r.enable_cache = False
try:
await r.reload_skills()
finally:
r.enable_cache = original_cache
break
for sd in skill_dirs:
unloaded.append(f"skill_dir:{sd}")
return unloaded
def _runtime_extension_session_id(self) -> str:
"""Return a stable namespace key for runtime extension imports."""
return (
self._bound_session_id
or self.card.id
or self.card.name
or "deep_agent"
)
@staticmethod
def _runtime_extension_artifact_for_config(
config_path: Path,
resources: Any,
) -> Any | None:
"""Build runtime extension metadata for auto-harness package configs.
Runtime extensions are stored outside the installed Python package but
still declare modules under ``openjiuwen.extensions.harness.<name>``.
The normal harness_config package resolver cannot import those modules,
so they must be loaded through the runtime extension loader.
"""
extension_name = config_path.parent.name
prefix = f"openjiuwen.extensions.harness.{extension_name}"
modules: list[str] = []
for spec in [
*(resources.rails or []),
*(resources.tools or []),
]:
module = getattr(spec, "module", None)
if (
getattr(spec, "type", None) == "package"
and isinstance(module, str)
):
modules.append(module)
if not any(
module == prefix or module.startswith(f"{prefix}.")
for module in modules
):
return None
from openjiuwen.auto_harness.schema import RuntimeExtensionArtifact
return RuntimeExtensionArtifact(
extension_name=extension_name,
runtime_path=str(config_path.parent),
config_path=str(config_path),
)
def enqueue_harness_config(
self, config_path: str,
) -> None:
"""Schedule a harness_config.yaml for loading on next stream() call."""
self._pending_harness_configs.append(config_path)
async def _drain_pending_harness_configs(
self,
) -> None:
"""Load any enqueued harness configs before processing a query."""
while self._pending_harness_configs:
path = self._pending_harness_configs.pop(0)
try:
loaded = await self.load_harness_config(path)
logger.info(
"Auto-loaded harness config %s: %s",
path,
loaded,
)
except Exception:
logger.exception(
"Failed to load harness config: %s",
path,
)
async def _register_rail_selective(self, rail: AgentRail) -> None:
"""Route rail callbacks to the correct agent and record the rail as registered."""
callbacks = rail.get_callbacks()
for event, callback in callbacks.items():
if event in _BRIDGE_EVENTS:
if self._react_agent is not None:
await self._react_agent.register_callback(event, callback, rail.priority)
continue
if event in _OUTER_ONLY_EVENTS or event in _DEEP_EVENTS:
await self.register_callback(event, callback, rail.priority)
continue
logger.warning(
f"Unknown rail event {event}, registering on outer DeepAgent"
)
await self.register_callback(event, callback, rail.priority)
self._registered_rails.append(rail)
async def _run_single_round_invoke(
self, ctx: AgentCallbackContext, session: Optional[Session]
) -> Dict[str, Any]:
"""Invoke inner ReActAgent exactly once."""
modified = ctx.inputs
if not isinstance(modified, InvokeInputs):
raise build_error(
StatusCode.DEEPAGENT_CONTEXT_PARAM_ERROR,
error_msg="ctx.inputs must be InvokeInputs for single-round invoke.",
)
if self._react_agent is None:
raise build_error(
StatusCode.DEEPAGENT_RUNTIME_ERROR,
error_msg="DeepAgent not configured. Call configure() first.",
)
return await self._react_agent.invoke(
self._to_effective_inputs(modified),
session,
)
async def _setup_task_loop(
self,
session: Session,
) -> Tuple[LoopCoordinator, TaskLoopController]:
"""Create or reuse Controller infrastructure for a task loop.
Sets instance attributes and returns the key objects
needed by the loop body.
Args:
session: Current session.
Returns:
(coordinator, controller)
"""
session_id = session.get_session_id()
if (
self._loop_controller is not None
and self._bound_session_id == session_id
):
coordinator = self._loop_coordinator
if coordinator is None:
evaluators = (
self._task_completion_rail.build_evaluators()
if self._task_completion_rail is not None
else []
)
coordinator = LoopCoordinator(evaluators=evaluators)
self._loop_coordinator = coordinator
coordinator.reset()
return coordinator, self._loop_controller
if self._loop_controller is not None:
await self._force_cleanup_controller()
evaluators = (
self._task_completion_rail.build_evaluators()
if self._task_completion_rail is not None
else []
)
coordinator = LoopCoordinator(evaluators=evaluators)
coordinator.reset()
queues = LoopQueues()
config = ControllerConfig(suppress_completion_signal=True)
context_engine = ContextEngine()
controller = TaskLoopController()
controller.init(
card=self.card,
config=config,
ability_manager=self.ability_manager,
context_engine=context_engine,
)
from openjiuwen.harness.task_loop.session_spawn_executor import (
SESSION_SPAWN_TASK_TYPE,
build_session_spawn_executor,
)
controller.add_task_executor(
DEEP_TASK_TYPE, build_deep_executor(self),
).add_task_executor(
SESSION_SPAWN_TASK_TYPE, build_session_spawn_executor(self)
)
handler = TaskLoopEventHandler(self)
handler.interaction_queues = queues
controller.set_event_handler(handler)
toolkit = self._session_toolkit
if toolkit is not None and hasattr(handler, "set_session_toolkit"):
handler.set_session_toolkit(toolkit)
self._loop_coordinator = coordinator
self._loop_controller = controller
self._loop_session = session
return coordinator, controller
@staticmethod
def _read_runtime_state(
session: Session,
) -> Optional[DeepAgentState]:
"""Read the cached runtime state from session."""
state = getattr(session, _SESSION_RUNTIME_ATTR, None)
if state is None:
return None
if not isinstance(state, DeepAgentState):
raise build_error(
StatusCode.DEEPAGENT_CONTEXT_PARAM_ERROR,
error_msg=(
"Invalid deepagent runtime state "
"type on session."
),
)
return state
@staticmethod
def _write_runtime_state(
session: Session,
state: DeepAgentState,
) -> None:
"""Write runtime state to session cache."""
setattr(session, _SESSION_RUNTIME_ATTR, state)
@staticmethod
def _clear_runtime_state(session: Session) -> None:
"""Clear runtime state from session cache."""
if hasattr(session, _SESSION_RUNTIME_ATTR):
delattr(session, _SESSION_RUNTIME_ATTR)
def load_state(self, session: Session) -> DeepAgentState:
"""Load deepagent runtime state from session.
Returns the cached runtime object when available;
otherwise loads from persisted session state and
primes the cache.
Args:
session: Current session.
Returns:
Current DeepAgentState (never None).
"""
state = self._read_runtime_state(session)
if state is not None:
return state
data = session.get_state(_SESSION_STATE_KEY)
loaded = (
DeepAgentState.from_session_dict(data)
if isinstance(data, dict)
else DeepAgentState()
)
self._write_runtime_state(session, loaded)
return loaded
def save_state(
self,
session: Session,
state: Optional[DeepAgentState] = None,
) -> None:
"""Persist deepagent state to session.
When ``state`` is provided it becomes the new
runtime snapshot; otherwise the currently cached
state is persisted.
Args:
session: Current session.
state: State to save; if None the cached
state is used.
"""
target = (
state
if state is not None
else self._read_runtime_state(session)
)
if target is None:
return
self._write_runtime_state(session, target)
session.update_state(
{_SESSION_STATE_KEY: target.to_session_dict()}
)
def clear_state(
self,
session: Session,
clear_persisted: bool = False,
) -> None:
"""Clear deepagent runtime cache from session.
Args:
session: Current session.
clear_persisted: When True, also clears the
persisted session snapshot.
"""
self._clear_runtime_state(session)
if clear_persisted:
session.update_state({_SESSION_STATE_KEY: None})
def switch_mode(self, session: Session, mode: str) -> None:
"""Switch agent mode, updating session-scoped PlanModeState.
Args:
session: Current session.
mode: Target mode string — ``"plan"`` or ``"auto"``.
"""
state = self.load_state(session)
if state.plan_mode.mode == mode:
state.plan_mode.pre_plan_mode = state.plan_mode.mode
logger.info("[DeepAgent] mode in state is the same, no need to switch, mode: {}".format(mode))
return
state.plan_mode.pre_plan_mode = state.plan_mode.mode
state.plan_mode.mode = mode
self.save_state(session, state)
def restore_mode_after_plan_exit(self, session: Session) -> None:
"""Restore the mode that was active before entering plan mode.
Does **not** clear ``plan_slug`` so that the execution phase can
still reference the plan file.
Args:
session: Current session.
"""
state = self.load_state(session)
state.plan_mode.mode = state.plan_mode.pre_plan_mode or "normal"
state.plan_mode.pre_plan_mode = None
self.save_state(session, state)
def get_plan_file_path(self, session: Session) -> Path | None:
"""Derive the plan file path from the slug stored in session state.
Args:
session: Current session.
Returns:
Resolved ``Path`` to the plan Markdown file, or ``None`` if
no slug has been set or the workspace is unavailable.
"""
from openjiuwen.harness.tools import resolve_plan_file_path
state = self.load_state(session)
slug = state.plan_mode.plan_slug
if not slug or not self._deep_config or not self._deep_config.workspace:
return None
return resolve_plan_file_path(
self._deep_config.workspace.root_path, slug
)
def _has_remaining_tasks(self, session: Session) -> bool:
"""Check whether the task plan still has pending tasks."""
st = self.load_state(session)
if st.task_plan is None:
return False
return st.task_plan.get_next_task() is not None
def _has_pending_session_spawn(self) -> bool:
"""Check if there are pending SESSION_SPAWN tasks.
Returns:
True if there are SUBMITTED/WORKING SESSION_SPAWN tasks.
"""
if self._session_toolkit is None:
return False
pending_tasks = self._session_toolkit.list_all()
return any(r.status == "running" for r in pending_tasks)
async def _force_cleanup_controller(self) -> None:
"""Force cleanup of existing controller (on session switch)."""
if self._loop_controller is None:
return
self._log_loop("forcing controller cleanup due to session switch")
if self._loop_session is not None:
try:
await self._loop_controller.unbind_session(self._loop_session)
except Exception as e:
logger.warning(f"unbind_session failed during force cleanup: {e}")
try:
await self._loop_controller.stop()
except Exception as e:
logger.warning(f"controller.stop() failed during force cleanup: {e}")
self._loop_coordinator = None
self._loop_controller = None
self._loop_session = None
self._bound_session_id = None
@staticmethod
def _log_loop(msg: str, detail: str = "") -> None:
"""Log an outer-loop event respecting sensitivity."""
if UserConfig.is_sensitive():
logger.info(f"[OuterLoop] {msg}")
else:
logger.info(f"[OuterLoop] {msg}{detail}")
async def schedule_auto_invoke_on_spawn_done(
self,
query: str,
delay: float = 0.5,
) -> None:
"""Schedule delayed auto-invoke after SESSION_SPAWN completes (no active invoke).
Called by TaskLoopEventHandler. Waits ``delay`` to merge concurrent completions,
then invokes with a summary prompt if still idle and ``_loop_session`` is set.
Args:
query: The query to invoke with.
delay: Delay in seconds to merge multiple concurrent spawn completions.
"""
await asyncio.sleep(delay)
self._auto_invoke_scheduled = False
if self._invoke_active:
return
if self._loop_session is None:
logger.warning("[AutoInvoke] session was cleaned up during delay, skipping")
return
try:
await self.invoke(
{"query": query},
session=self._loop_session,
)
except Exception as e:
logger.error(f"[AutoInvoke] auto-invoke failed: {e}", exc_info=True)
async def _run_task_loop(
self, ctx: AgentCallbackContext, session: Session,
) -> AsyncIterator[Dict[str, Any]]:
"""Async generator for the outer task loop. Shared by invoke and stream.
Args:
ctx: Callback context with InvokeInputs.
session: Current session.
Yields:
Result dict from each iteration.
"""
modified = ctx.inputs
if not isinstance(modified, InvokeInputs):
raise build_error(
StatusCode.DEEPAGENT_CONTEXT_PARAM_ERROR,
error_msg="ctx.inputs must be InvokeInputs for task-loop.",
)
coordinator, controller = await self._setup_task_loop(session)
timeout = self._deep_config.completion_timeout if self._deep_config else 600.0
session_id = session.get_session_id()
if self._bound_session_id != session_id:
await controller.bind_session(session)
self._bound_session_id = session_id
try:
current_query = modified.query
outer_round = 0
while coordinator.should_continue():
outer_round += 1
new_follow_ups = controller.drain_follow_up()
_state = self.load_state(session)
if new_follow_ups:
_state.pending_follow_ups.extend(
new_follow_ups
)
is_follow_up = bool(
_state.pending_follow_ups
)
if _state.pending_follow_ups:
current_query = (
_state.pending_follow_ups.pop(0)
)
self.save_state(session, _state)
query_preview = str(current_query)[:120]
self._log_loop(
f"round={outer_round} started",
f", query={query_preview}",
)
await controller.submit_round(
session, current_query,
is_follow_up=is_follow_up,
run_kind=modified.run_kind,
run_context=modified.run_context,
)
result = await controller.wait_round_completion(timeout=timeout)
result_type = result.get("result_type", "N/A")
output_preview = str(result.get("output", ""))[:200]
self._log_loop(
f"round={outer_round} completed, result_type={result_type}",
f", output={output_preview}",
)
yield result
coordinator.increment_iteration()
coordinator.set_last_result(result)
_state = self.load_state(session)
_state.stop_condition_state = coordinator.get_state()
self.save_state(session, _state)
if result.get("result_type") == "interrupt":
self._log_loop(
f"round={outer_round} interrupted"
)
break
if coordinator.is_aborted:
self._log_loop(f"round={outer_round} aborted")
break
if (
controller.has_follow_up()
or _state.pending_follow_ups
):
continue
if not self._has_remaining_tasks(session):
self._log_loop("no remaining tasks, loop finished")
break
current_query = modified.query
stop_reason = coordinator.stop_reason
if stop_reason:
self._log_loop(
f"loop stopped by: {stop_reason}"
)
finally:
_state = self.load_state(session)
_state.stop_condition_state = None
self.save_state(session, _state)
if self._has_pending_session_spawn():
self._log_loop("pending SESSION_SPAWN tasks, controller kept alive")
else:
await controller.unbind_session(session)
await controller.stop()
self._loop_coordinator = None
self._loop_controller = None
self._loop_session = None
self._bound_session_id = None
self._log_loop("all tasks completed, controller cleaned up")
async def _run_task_loop_invoke(
self,
ctx: AgentCallbackContext,
session: Optional[Session],
) -> Dict[str, Any]:
"""Run the outer task loop, return last result.
Args:
ctx: Callback context with InvokeInputs.
session: Current session.
Returns:
Result dict from the last iteration.
"""
if session is None:
raise build_error(
StatusCode.DEEPAGENT_RUNTIME_ERROR,
error_msg=(
"session is required for "
"task-loop mode."
),
)
last_result: Dict[str, Any] = {}
try:
async for result in self._run_task_loop(
ctx, session
):
last_result = result
except Exception as e:
logger.error(f"Task loop invoke error: {e}", exc_info=True)
raise
return last_result
async def _run_task_loop_stream(
self,
ctx: AgentCallbackContext,
session: Optional[Session],
stream_modes: Optional[List[StreamMode]],
) -> AsyncIterator[Any]:
"""Stream the outer task loop with per-token chunks.
Uses the same coroutine pattern as ReActAgent.stream():
background task runs _run_task_loop (which triggers
invoke with _streaming=True), foreground reads from
session.stream_iterator().
Session lifecycle (pre_run/post_run) is managed by the
caller (Runner or direct caller), not by this method.
Only the stream emitter is closed here to send END_FRAME
and unblock stream_iterator().
Args:
ctx: Callback context with InvokeInputs.
session: Current session.
stream_modes: Stream mode filters.
Yields:
OutputSchema chunks (llm_reasoning / llm_output / answer).
"""
_ = stream_modes
if session is None:
raise build_error(
StatusCode.DEEPAGENT_RUNTIME_ERROR,
error_msg=(
"session is required for "
"task-loop mode."
),
)
async def _stream_process() -> None:
try:
async for result in self._run_task_loop(ctx, session):
await self._write_round_result_to_stream(result, session)
except Exception as e:
logger.error(f"Task loop stream error: {e}", exc_info=True)
error_result = {"output": str(e), "result_type": "error"}
await self._write_round_result_to_stream(error_result, session)
finally:
await session.close_stream()
task = asyncio.create_task(_stream_process())
self._stream_process_task = task
try:
async for chunk in session.stream_iterator():
yield chunk
await task
except asyncio.CancelledError:
await self._cancel_stream_process_task()
raise
finally:
if self._stream_process_task is task:
self._stream_process_task = None
async def _write_round_result_to_stream(
self,
result: Dict[str, Any],
session: Session,
) -> None:
"""Write a task-loop round result to the session stream.
Delegates to the inner ReActAgent which already handles
normal answers, HITL interrupts, and workflow interrupts
consistently.
Args:
result: Result dict from the inner ReActAgent.
session: Current session to write into.
"""
if self._react_agent is not None:
await self._react_agent.write_invoke_result_to_stream(
result, session
)
async def _run_single_round_stream(
self,
ctx: AgentCallbackContext,
session: Optional[Session],
stream_modes: Optional[List[StreamMode]],
) -> AsyncIterator[Any]:
"""Stream from inner ReActAgent exactly once."""
modified = ctx.inputs
if not isinstance(modified, InvokeInputs):
raise build_error(
StatusCode.DEEPAGENT_CONTEXT_PARAM_ERROR,
error_msg="ctx.inputs must be InvokeInputs for single-round stream.",
)
if self._react_agent is None:
raise build_error(
StatusCode.DEEPAGENT_RUNTIME_ERROR,
error_msg="DeepAgent not configured. Call configure() first.",
)
async for chunk in self._react_agent.stream(
self._to_effective_inputs(modified),
session,
stream_modes,
):
yield chunk
async def invoke(
self,
inputs: Any,
session: Optional[Session] = None,
) -> Dict[str, Any]:
"""Execute DeepAgent in single-round or task-loop mode."""
await self._ensure_initialized()
if self._react_agent is None:
raise build_error(
StatusCode.DEEPAGENT_RUNTIME_ERROR,
error_msg="DeepAgent not configured. Call configure() first.",
)
invoke_inputs = self._normalize_inputs(inputs)
ctx = AgentCallbackContext(agent=self, inputs=invoke_inputs, session=session)
self._invoke_active = True
try:
result: Dict[str, Any]
async with ctx.lifecycle(
AgentCallbackEvent.BEFORE_INVOKE,
AgentCallbackEvent.AFTER_INVOKE,
):
if (
self._deep_config is not None
and self._deep_config.enable_task_loop
and not self._is_resume_input(invoke_inputs)
):
result = await self._run_task_loop_invoke(ctx, session)
else:
result = await self._run_single_round_invoke(ctx, session)
invoke_inputs.result = result
if session is not None:
self.save_state(session)
self.clear_state(session)
return result
finally:
self._invoke_active = False
async def stream(
self,
inputs: Any,
session: Optional[Session] = None,
stream_modes: Optional[List[StreamMode]] = None,
) -> AsyncIterator[Any]:
"""Stream execute DeepAgent in single-round or task-loop mode."""
await self._ensure_initialized()
await self._drain_pending_harness_configs()
if self._react_agent is None:
raise build_error(
StatusCode.DEEPAGENT_RUNTIME_ERROR,
error_msg="DeepAgent not configured. Call configure() first.",
)
invoke_inputs = self._normalize_inputs(inputs)
ctx = AgentCallbackContext(agent=self, inputs=invoke_inputs, session=session)
self._invoke_active = True
try:
stream_result: Optional[Dict[str, Any]] = None
stream_output_parts: List[str] = []
async with ctx.lifecycle(
AgentCallbackEvent.BEFORE_INVOKE,
AgentCallbackEvent.AFTER_INVOKE,
):
if (
self._deep_config is not None
and self._deep_config.enable_task_loop
and not self._is_resume_input(invoke_inputs)
):
async for chunk in self._run_task_loop_stream(
ctx, session, stream_modes
):
chunk_result = self._result_from_stream_chunk(
chunk, stream_output_parts
)
if chunk_result is not None:
stream_result = chunk_result
yield chunk
else:
async for chunk in self._run_single_round_stream(
ctx, session, stream_modes
):
chunk_result = self._result_from_stream_chunk(
chunk, stream_output_parts
)
if chunk_result is not None:
stream_result = chunk_result
yield chunk
if stream_result is None and stream_output_parts:
stream_result = {
"output": "".join(stream_output_parts),
"result_type": "answer",
}
if stream_result is not None:
invoke_inputs.result = stream_result
if session is not None:
self.save_state(session)
self.clear_state(session)
finally:
self._invoke_active = False
async def follow_up(
self,
msg: str,
task_id: Optional[str] = None,
session: Optional[Session] = None,
) -> None:
"""Publish a FollowUpEvent to the FOLLOW_UP topic.
The event is handled concurrently with INPUT
and does not block the current iteration.
The handler pushes the message into
LoopQueues.follow_up, which the outer
loop drains after each iteration.
Args:
msg: Follow-up message text.
task_id: Optional task to associate.
session: Current session.
"""
controller = self._loop_controller
if controller is None:
return
sess = session or self._loop_session
if sess is None:
return
event = FollowUpEvent.from_text(msg)
if task_id:
event.metadata = {"task_id": task_id}
await controller.event_queue.publish_event_async(
self.card.id, sess, event
)
async def steer(
self,
msg: str,
session: Optional[Session] = None,
) -> None:
"""Publish a TaskInteractionEvent to TASK_INTERACTION topic.
The event is handled concurrently with INPUT.
The handler pushes the message into
LoopQueues.steering, which the executor
drains before the next inner invoke.
Args:
msg: Steering instruction text.
session: Current session.
"""
controller = self._loop_controller
if controller is None:
return
sess = session or self._loop_session
if sess is None:
return
from openjiuwen.core.controller.schema.dataframe import (
TextDataFrame,
)
event = TaskInteractionEvent(
interaction=[TextDataFrame(text=msg)]
)
await controller.event_queue.publish_event_async(
self.card.id, sess, event
)
async def _cancel_stream_process_task(self) -> None:
"""Cancel the in-flight task-loop stream background task, if any."""
task = self._stream_process_task
if task is None or task.done():
return
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
except Exception:
logger.debug(
"stream process task raised during cancel",
exc_info=True,
)
async def abort(
self,
session: Optional[Session] = None,
) -> None:
"""Request immediate abort of the task loop.
Sets the abort flag on the coordinator and
calls on_abort() on the event handler so the
outer loop can exit promptly. Also cancels the
background ``_stream_process`` task when streaming,
so consumer disconnect / interrupt stops promptly
instead of leaving a zombie ReAct loop running.
Args:
session: Current session (unused).
"""
_ = session
coordinator = self._loop_coordinator
controller = self._loop_controller
if coordinator is not None and controller is not None:
coordinator.request_abort()
handler = cast(
TaskLoopEventHandler,
controller.event_handler,
)
await handler.on_abort()
await self._cancel_stream_process_task()
__all__ = [
"DeepAgent",
"DeepAgentConfig",
]