import asyncio
from contextlib import contextmanager
from typing import (
Any,
AsyncIterator,
Optional,
TYPE_CHECKING,
Union,
)
import anyio
from openjiuwen.core.common.exception.codes import StatusCode
from openjiuwen.core.common.exception.errors import build_error
from openjiuwen.core.common.logging import LogEventType, runner_logger as logger
from openjiuwen.core.context_engine import ModelContext
from openjiuwen.core.runner.callback import AsyncCallbackFramework
from openjiuwen.core.runner.drunner.dmessage_queue.dsubscription.reply_topic_subscription import ReplyTopicSubscription
from openjiuwen.core.runner.drunner.dmessage_queue.message_queue_factory import MessageQueueFactory
from openjiuwen.core.runner.message_queue_base import LocalMessageQueue
from openjiuwen.core.runner.resources_manager.resource_manager import ResourceMgr
from openjiuwen.core.runner.runner_config import (
DEFAULT_RUNNER_CONFIG,
get_runner_config,
RunnerConfig,
set_runner_config,
)
from openjiuwen.core.runner.team_runner import (
_TeamRunnerClassMixin,
_TeamRunnerMixin,
)
from openjiuwen.core.session import Config
from openjiuwen.core.session.checkpointer import CheckpointerFactory
from openjiuwen.core.session.stream import BaseStreamMode
from openjiuwen.core.runner.spawn import (
MessageType,
SpawnAgentConfig,
SpawnConfig,
SpawnedProcessHandle,
spawn_process,
)
from openjiuwen.core.single_agent import (
BaseAgent,
create_agent_session,
LegacyBaseAgent,
Session as AgentSession,
)
from openjiuwen.core.single_agent.schema.agent_card import AgentCard
from openjiuwen.core.workflow import (
create_workflow_session,
generate_workflow_key,
Session as WorkflowSession,
Workflow,
)
if TYPE_CHECKING:
from openjiuwen.agent_teams.runtime import TeamRuntimeManager
class _RunnerImpl(_TeamRunnerMixin):
"""
Runner implementation class.
"""
_DEFAULT_RUNNER_ID = "global"
_DEFAULT_AGENT_SESSION_ID = "default_session"
_AGENT_CONVERSATION_ID = "conversation_id"
@staticmethod
def _get_spawn_logging_config() -> dict[str, Any]:
from openjiuwen.core.common.logging.log_config import get_log_config_snapshot
return get_log_config_snapshot()
def __init__(self, runner_id: str = _DEFAULT_RUNNER_ID, config: RunnerConfig = None):
"""
Initialize the Runner with configuration.
Args:
runner_id: Runner unique id.
config: Configuration for the runner. If None, defaults will be used.
"""
self._runner_id = runner_id
self._resource_manager = ResourceMgr()
self._message_queue = LocalMessageQueue()
if config is not None:
set_runner_config(config)
else:
set_runner_config(DEFAULT_RUNNER_CONFIG)
self.system_reply_sub: ReplyTopicSubscription | None = None
self._distribute_message_queue = None
self._callback_framework = AsyncCallbackFramework()
self._root_task_group = None
self._root_task_group_owner = None
self._root_task_group_ready = None
self._root_task_group_stop = None
self._team_runtime_manager: Optional["TeamRuntimeManager"] = None
@property
def resource_mgr(self) -> ResourceMgr:
"""Get the resource manager for workflow, agent, agent_team, tool, model, prompt..."""
return self._resource_manager
@property
def pubsub(self):
"""Get the local message queue for publish/subscribe communication."""
return self._message_queue
@property
def dist_pubsub(self):
"""Get the distributed message queue for cross-process communication."""
return self._distribute_message_queue
@property
def callback_framework(self) -> AsyncCallbackFramework:
"""Get the callback framework for asynchronous callbacks."""
return self._callback_framework
@staticmethod
def _is_remote_agent(agent_instance: Any) -> bool:
try:
from openjiuwen.core.runner.drunner.remote_client.remote_agent import RemoteAgent
except ModuleNotFoundError as exc:
if exc.name != "a2a":
raise
return False
return isinstance(agent_instance, RemoteAgent)
def get_root_task_group(self):
"""Get the runner-owned root task group, if Runner has been started."""
return self._root_task_group
async def _root_task_group_owner_loop(
self,
ready: asyncio.Event,
stop: asyncio.Event,
) -> None:
try:
async with anyio.create_task_group() as task_group:
self._root_task_group = task_group
ready.set()
await stop.wait()
task_group.cancel_scope.cancel()
finally:
self._root_task_group = None
ready.set()
async def _ensure_root_task_group(self) -> None:
if self._root_task_group is not None and self._root_task_group_owner is not None:
return
from openjiuwen.core.common.task_manager.manager import get_task_manager
get_task_manager()
self._root_task_group_ready = asyncio.Event()
self._root_task_group_stop = asyncio.Event()
self._root_task_group_owner = asyncio.create_task(
self._root_task_group_owner_loop(
self._root_task_group_ready,
self._root_task_group_stop,
)
)
await self._root_task_group_ready.wait()
if self._root_task_group is None and self._root_task_group_owner.done():
await self._root_task_group_owner
async def _close_root_task_group(self) -> None:
if self._root_task_group_owner is None:
return
from openjiuwen.core.common.task_manager.manager import get_task_manager
owner = self._root_task_group_owner
try:
await get_task_manager().cancel_all()
if self._root_task_group_stop is not None:
self._root_task_group_stop.set()
if self._root_task_group is not None:
self._root_task_group.cancel_scope.cancel()
with anyio.move_on_after(5, shield=True):
await owner
if not owner.done():
owner.cancel()
try:
with anyio.move_on_after(1, shield=True):
await owner
except asyncio.CancelledError:
pass
except Exception as e:
logger.warning(
"Failed to close runner root task group",
event_type=LogEventType.RUNNER_STOP,
runner_id=self._runner_id,
exception=e,
)
finally:
self._root_task_group = None
self._root_task_group_owner = None
self._root_task_group_ready = None
self._root_task_group_stop = None
@contextmanager
def _root_task_group_scope(self):
if self._root_task_group is None:
yield
return
from openjiuwen.core.common.task_manager.context import get_task_group, reset_task_group, set_task_group
if get_task_group() is not None:
yield
return
token = set_task_group(self._root_task_group)
try:
yield
finally:
reset_task_group(token)
def _enter_root_task_group_context(self):
"""Bind :attr:`_root_task_group` into ContextVar; return token for :meth:`_exit_root_task_group_context`.
Async generators must use this with ``try``/``finally`` instead of :meth:`_root_task_group_scope`:
``contextlib``'s sync ``yield`` + ``GeneratorExit`` can call ``ContextVar.reset`` in an invalid
context (e.g. A2A streaming on a uvicorn worker thread).
"""
if self._root_task_group is None:
return None
from openjiuwen.core.common.task_manager.context import get_task_group, set_task_group
if get_task_group() is not None:
return None
return set_task_group(self._root_task_group)
def _exit_root_task_group_context(self, token) -> None:
if token is None:
return
from openjiuwen.core.common.task_manager.context import reset_task_group
try:
reset_task_group(token)
except ValueError:
logger.debug(
"ContextVar reset skipped for root task group (invalid token context); "
"runner_id=%s",
self._runner_id,
)
def set_config(self, config: RunnerConfig):
"""Set the runner configuration with provided config object.
Args:
config: The RunnerConfig object containing configuration settings
"""
logger.info(f"set runner {self._runner_id} config {config}")
set_runner_config(config)
def get_config(self):
"""Retrieve the current runner configuration.
Returns:
RunnerConfig: The current configuration object
"""
return get_runner_config()
async def start(self) -> bool:
"""Start the runner and its associated components, such as message queue."""
result = True
logger.info("Begin to start runner", event_type=LogEventType.RUNNER_START, runner_id=self._runner_id)
await self._ensure_root_task_group()
try:
with self._root_task_group_scope():
checkpointer_config = get_runner_config().checkpointer_config
if checkpointer_config is not None:
logger.info(f"Begin to initializing checkpointer with type: {checkpointer_config.type}",
event_type=LogEventType.RUNNER_START, runner_id=self._runner_id)
try:
if checkpointer_config.type == "redis":
try:
from openjiuwen.extensions.checkpointer.redis import checkpointer as _
except ImportError as e:
logger.error("Redis checkpointer not available. "
"Please install redis dependencies",
event_type=LogEventType.RUNNER_START,
runner_id=self._runner_id, exception=e)
raise
checkpointer = await CheckpointerFactory.create(checkpointer_config)
CheckpointerFactory.set_default_checkpointer(checkpointer)
logger.info(f"Succeed to initializing checkpointer with type: {checkpointer_config.type}",
event_type=LogEventType.RUNNER_START, runner_id=self._runner_id)
except Exception as e:
logger.error(f"Failed to initializing checkpointer with type: {checkpointer_config.type}",
event_type=LogEventType.RUNNER_START, runner_id=self._runner_id, exception=e)
logger.error("Failed to start runner",
event_type=LogEventType.RUNNER_START, runner_id=self._runner_id, exception=e)
raise
if get_runner_config().distributed_mode:
self._distribute_message_queue = MessageQueueFactory.create(
get_runner_config().distributed_config.message_queue_config)
self._distribute_message_queue.start()
self.system_reply_sub = ReplyTopicSubscription(self._distribute_message_queue)
self.system_reply_sub.activate()
result = await self._message_queue.start()
if result:
logger.info("Succeed to start runner",
event_type=LogEventType.RUNNER_START, runner_id=self._runner_id)
else:
logger.error("Failed to start runner, message queue start failed",
event_type=LogEventType.RUNNER_START, runner_id=self._runner_id)
return result
except Exception:
await self._close_root_task_group()
raise
async def stop(self):
"""Stop the runner and clean up resources."""
logger.info("Begin to stop runner", event_type=LogEventType.RUNNER_STOP, runner_id=self._runner_id)
try:
with self._root_task_group_scope():
if get_runner_config().distributed_mode:
if self.system_reply_sub:
await self.system_reply_sub.deactivate()
self.system_reply_sub = None
if self._distribute_message_queue:
await self._distribute_message_queue.stop()
self._distribute_message_queue = None
result = await self._message_queue.stop()
logger.info("Succeed to stop runner", event_type=LogEventType.RUNNER_STOP, runner_id=self._runner_id)
return result
except Exception as e:
logger.warning("Failed to stop runner", event_type=LogEventType.RUNNER_STOP, runner_id=self._runner_id,
exception=e)
return False
finally:
await self._resource_manager.release()
await self._close_root_task_group()
async def run_workflow(self,
workflow: str | Workflow,
inputs: Any,
*,
session: Optional[str | WorkflowSession | AgentSession] = None,
context: ModelContext = None,
envs: Optional[dict[str, Any]] = None):
"""
Execute a workflow with given inputs.
Args:
workflow: Workflow name or Workflow instance to execute
inputs: Input data for the workflow
session: Existing session ID or Session instance for context persistence
context: model context
envs: Environment variables or configuration overrides,
"""
with self._root_task_group_scope():
workflow_instance, workflow_session = await self._prepare_workflow(workflow, session)
return await workflow_instance.invoke(inputs, session=workflow_session, context=context)
async def run_workflow_streaming(self,
workflow: str | Workflow,
inputs: Any,
*,
session: Optional[str | WorkflowSession | AgentSession] = None,
context: ModelContext = None,
stream_modes: list[BaseStreamMode] = None,
envs: Optional[dict[str, Any]] = None):
"""
Execute a workflow with streaming output support.
Args:
workflow: Workflow name or Workflow instance to execute
inputs: Input data for the workflow
session: Existing session ID or Session instance for context persistence
context: model context
stream_modes: Types of streaming data to output
envs: Environment variables or configuration overrides
"""
token = self._enter_root_task_group_context()
try:
workflow_instance, workflow_session = await self._prepare_workflow(workflow, session)
async for chunk in workflow_instance.stream(inputs, session=workflow_session,
stream_modes=stream_modes, context=context):
yield chunk
finally:
self._exit_root_task_group_context(token)
async def run_agent(self,
agent: str | BaseAgent | LegacyBaseAgent,
inputs: Any,
*,
session: Optional[str | AgentSession] = None,
context: ModelContext = None,
envs: Optional[dict[str, Any]] = None,
):
"""
Execute a single agent with given inputs.
Args:
agent: Agent name or BaseAgent instance to execute
inputs: Input data for the agent
session: Existing session ID or Session instance for context persistence
context: model context
envs: Environment variables or configuration overrides
"""
with self._root_task_group_scope():
agent_instance, agent_session = await self._prepare_agent(agent, inputs, session)
if self._is_remote_agent(agent_instance):
res = await agent_instance.invoke(inputs)
elif isinstance(agent_instance, LegacyBaseAgent):
res = await agent_instance.invoke(inputs, session=None)
else:
res = await agent_instance.invoke(inputs, agent_session)
await agent_session.post_run()
return res
async def run_agent_streaming(self,
agent: str | BaseAgent | LegacyBaseAgent,
inputs: Any,
*,
session: Optional[str | AgentSession] = None,
context: ModelContext = None,
stream_modes: list[BaseStreamMode] = None,
envs: Optional[dict[str, Any]] = None):
"""
Execute a single agent with streaming output support.
Args:
agent: Agent name or BaseAgent instance to execute
inputs: Input data for the agent
session: Existing session ID or Session instance for context persistence
context: model context
stream_modes: Types of streaming data to output
envs: Environment variables or configuration override
"""
token = self._enter_root_task_group_context()
try:
agent_instance, agent_session = await self._prepare_agent(agent, inputs, session)
if self._is_remote_agent(agent_instance):
async for chunk in agent_instance.stream(inputs):
yield chunk
elif isinstance(agent_instance, LegacyBaseAgent):
async for chunk in agent_instance.stream(inputs, session=None):
yield chunk
else:
async for chunk in agent_instance.stream(inputs, session=agent_session):
yield chunk
await agent_session.post_run()
finally:
self._exit_root_task_group_context(token)
async def release(self, session_id: str, *, force: bool = False):
"""
Release resources associated with a session.
For agent team sessions, this automatically cleans up dynamic
tables (tasks, messages, etc.) in addition to releasing the
checkpoint; non-team sessions take the simple checkpoint-only
path.
Args:
session_id: ID of the session to clean up.
force: When ``True``, stop any team still active on this
session before cleaning. Default ``False`` raises
``AGENT_TEAM_BUSY_INVALID`` so callers explicitly choose
between graceful and forced teardown.
"""
if await self._maybe_release_team_session(session_id, force=force):
return
await CheckpointerFactory.get_checkpointer().release(session_id)
@classmethod
def _is_called_by_agent(cls, session: AgentSession) -> bool:
return session and isinstance(session, AgentSession)
@classmethod
def _create_workflow_session(cls, session):
if not session:
workflow_session = create_workflow_session()
elif isinstance(session, str):
workflow_session = create_workflow_session(session_id=session)
elif isinstance(session, AgentSession):
workflow_session = session.create_workflow_session()
else:
workflow_session = session
return workflow_session
async def _prepare_agent(self, agent: Union[str, BaseAgent], inputs: Any,
session: Optional[str | AgentSession] = None):
if isinstance(session, AgentSession):
if isinstance(agent, str):
agent_instance = await self._resource_manager.get_agent(agent_id=agent)
if agent_instance is None:
raise build_error(StatusCode.RUNNER_RUN_AGENT_ERROR, agent=agent, reason="agent not exist")
await session.pre_run(inputs=inputs)
return agent_instance, session
await session.pre_run(inputs=inputs)
return agent, session
session_id = inputs.get(self._AGENT_CONVERSATION_ID,
session if isinstance(session, str) else self._DEFAULT_AGENT_SESSION_ID)
if isinstance(agent, str):
agent_instance = await self._resource_manager.get_agent(agent_id=agent)
if agent_instance is None:
raise build_error(StatusCode.RUNNER_RUN_AGENT_ERROR, agent=agent, reason="agent not exist")
if self._is_remote_agent(agent_instance):
if self._AGENT_CONVERSATION_ID not in inputs:
inputs[self._AGENT_CONVERSATION_ID] = session_id
return agent_instance, None
agent_session = self._create_agent_session(agent_instance, session_id)
await agent_session.pre_run(inputs=inputs)
return agent_instance, agent_session
agent_session = self._create_agent_session(agent, session_id)
await agent_session.pre_run(inputs=inputs)
return agent, agent_session
async def spawn_agent(
self,
agent_config: SpawnAgentConfig,
inputs: Any,
*,
session: Optional[str | AgentSession] = None,
context: ModelContext = None,
envs: Optional[dict[str, Any]] = None,
spawn_config: Optional[SpawnConfig] = None,
) -> SpawnedProcessHandle:
"""
Spawn a child process to run an agent.
Args:
agent: Agent name or BaseAgent instance to execute
inputs: Input data for the agent
session: Existing session ID or Session instance for context persistence
context: model context
envs: Environment variables or configuration overrides
spawn_config: Configuration for spawned process management
Returns:
SpawnedProcessHandle for managing the spawned process
"""
if not isinstance(agent_config, SpawnAgentConfig):
raise TypeError("Runner.spawn_agent now requires SpawnAgentConfig.")
normalized_inputs = inputs if isinstance(inputs, dict) else {"data": inputs}
session_id = normalized_inputs.get(
self._AGENT_CONVERSATION_ID,
session if isinstance(session, str) else self._DEFAULT_AGENT_SESSION_ID,
)
logging_config = (
agent_config.logging_config
if agent_config.logging_config is not None
else self._get_spawn_logging_config()
)
spawn_payload = agent_config.model_copy(update={"session_id": session_id, "logging_config": logging_config})
handle = await spawn_process(
agent_config=spawn_payload.model_dump(mode="json"),
inputs=normalized_inputs,
config=spawn_config,
)
if spawn_config is not None:
await handle.start_health_check()
return handle
async def spawn_agent_streaming(
self,
agent_config: SpawnAgentConfig,
inputs: Any,
*,
session: Optional[str | AgentSession] = None,
context: ModelContext = None,
stream_modes: list[BaseStreamMode] = None,
envs: Optional[dict[str, Any]] = None,
spawn_config: Optional[SpawnConfig] = None,
) -> AsyncIterator[tuple[SpawnedProcessHandle, Any]]:
"""
Spawn a child process to run an agent with streaming output.
Args:
agent: Agent name or BaseAgent instance to execute
inputs: Input data for the agent
session: Existing session ID or Session instance for context persistence
context: model context
stream_modes: Types of streaming data to output
envs: Environment variables or configuration overrides
spawn_config: Configuration for spawned process management
Yields:
Tuples of (SpawnedProcessHandle, message) as messages arrive
"""
if not isinstance(agent_config, SpawnAgentConfig):
raise TypeError("Runner.spawn_agent_streaming now requires SpawnAgentConfig.")
normalized_inputs = inputs if isinstance(inputs, dict) else {"data": inputs}
session_id = normalized_inputs.get(
self._AGENT_CONVERSATION_ID,
session if isinstance(session, str) else self._DEFAULT_AGENT_SESSION_ID,
)
logging_config = (
agent_config.logging_config
if agent_config.logging_config is not None
else self._get_spawn_logging_config()
)
spawn_payload = agent_config.model_copy(update={"session_id": session_id, "logging_config": logging_config})
handle = await spawn_process(
agent_config=spawn_payload.model_dump(mode="json"),
inputs=normalized_inputs,
config=spawn_config,
)
yield handle, None
while handle.is_alive:
message = await handle.receive_message()
if message is None:
break
if message.type == MessageType.STREAM_CHUNK:
yield handle, message.payload
elif message.type == MessageType.DONE:
yield handle, message.payload
break
elif message.type == MessageType.ERROR:
yield handle, message.payload
break
elif message.type == MessageType.OUTPUT:
yield handle, message.payload
async def _prepare_workflow(self, workflow: Union[str, Workflow],
session: str | AgentSession | WorkflowSession) -> tuple[Workflow, WorkflowSession]:
if isinstance(workflow, str):
workflow_key = workflow
else:
workflow_key = generate_workflow_key(workflow.card.id, workflow.card.version)
workflow_session = self._create_workflow_session(session)
if isinstance(workflow, str):
workflow_instance = await self._resource_manager.get_workflow(workflow_id=workflow_key,
session=workflow_session)
else:
workflow_instance = workflow
return workflow_instance, workflow_session
@staticmethod
def _create_agent_session(agent, session_id):
envs = None
if hasattr(agent, "card"):
config = agent.config
card = agent.card
else:
config = agent.config()
card = AgentCard(id=config.get_agent_config().id)
if isinstance(config, Config):
envs = getattr(config, "_env", None)
agent_session = create_agent_session(session_id=session_id, envs=envs, card=card)
return agent_session
GLOBAL_RUNNER = _RunnerImpl(config=DEFAULT_RUNNER_CONFIG)
class _ClassProperty:
"""Descriptor for class-level properties."""
def __init__(self, name: str):
self.name = name
def __get__(self, obj, objtype=None):
return getattr(GLOBAL_RUNNER, self.name)
class Runner(_TeamRunnerClassMixin):
"""
Runner singleton class that proxies all calls to the global runner instance.
This class provides a singleton interface for accessing the global runner instance.
All method calls and property accesses are automatically proxied to GLOBAL_RUNNER.
Example:
>>> from openjiuwen.core.runner import Runner
>>> await Runner.start()
>>> resource_mgr = Runner.resource_mgr
>>> await Runner.run_agent(agent, inputs)
"""
resource_mgr: ResourceMgr = _ClassProperty("resource_mgr")
"""Get the resource manager for workflow, agent, agent_team, tool, model, prompt..."""
pubsub = _ClassProperty("pubsub")
"""Get the local message queue for publish/subscribe communication."""
dist_pubsub = _ClassProperty("dist_pubsub")
"""Get the distributed message queue for cross-process communication."""
system_reply_sub: ReplyTopicSubscription = _ClassProperty("system_reply_sub")
"""Get the reply topic subscription for distributed system reply messages."""
callback_framework: AsyncCallbackFramework = _ClassProperty("callback_framework")
"""Get the callback framework for asynchronous callbacks."""
@classmethod
def get_root_task_group(cls):
"""Get the runner-owned root task group."""
return GLOBAL_RUNNER.get_root_task_group()
@classmethod
def set_config(cls, config: RunnerConfig) -> None:
"""Set the runner configuration with provided config object.
Args:
config: The RunnerConfig object containing configuration settings
"""
GLOBAL_RUNNER.set_config(config)
@classmethod
def get_config(cls) -> RunnerConfig:
"""Retrieve the current runner configuration.
Returns:
RunnerConfig: The current configuration object
"""
return GLOBAL_RUNNER.get_config()
@classmethod
async def start(cls) -> bool:
"""Start the runner and its associated components, such as message queue."""
return await GLOBAL_RUNNER.start()
@classmethod
async def stop(cls):
"""Stop the runner and clean up resources."""
return await GLOBAL_RUNNER.stop()
@classmethod
async def run_workflow(
cls,
workflow: str | Workflow,
inputs: Any,
*,
session: Optional[str | WorkflowSession | AgentSession] = None,
context: Optional[ModelContext] = None,
envs: Optional[dict[str, Any]] = None
) -> Any:
"""
Execute a workflow with given inputs.
Args:
workflow: Workflow name or Workflow instance to execute
inputs: Input data for the workflow
session: Existing session ID or Session instance for context persistence
context: model context
envs: Environment variables or configuration overrides
"""
return await GLOBAL_RUNNER.run_workflow(
workflow=workflow,
inputs=inputs,
session=session,
context=context,
envs=envs
)
@classmethod
async def run_workflow_streaming(
cls,
workflow: str | Workflow,
inputs: Any,
*,
session: Optional[str | WorkflowSession | AgentSession] = None,
context: Optional[ModelContext] = None,
stream_modes: Optional[list[BaseStreamMode]] = None,
envs: Optional[dict[str, Any]] = None
) -> AsyncIterator[Any]:
"""
Execute a workflow with streaming output support.
Args:
workflow: Workflow name or Workflow instance to execute
inputs: Input data for the workflow
session: Existing session ID or Session instance for context persistence
context: model context
stream_modes: Types of streaming data to output
envs: Environment variables or configuration overrides
"""
async for chunk in GLOBAL_RUNNER.run_workflow_streaming(
workflow=workflow,
inputs=inputs,
session=session,
context=context,
stream_modes=stream_modes,
envs=envs
):
yield chunk
@classmethod
async def run_agent(
cls,
agent: str | BaseAgent | LegacyBaseAgent,
inputs: Any,
*,
session: Optional[str | AgentSession] = None,
context: Optional[ModelContext] = None,
envs: Optional[dict[str, Any]] = None,
) -> Any:
"""
Execute a single agent with given inputs.
Args:
agent: Agent name or BaseAgent instance to execute
inputs: Input data for the agent
session: Existing session ID or Session instance for context persistence
context: model context
envs: Environment variables or configuration overrides
"""
return await GLOBAL_RUNNER.run_agent(
agent=agent,
inputs=inputs,
session=session,
context=context,
envs=envs
)
@classmethod
async def run_agent_streaming(
cls,
agent: str | BaseAgent | LegacyBaseAgent,
inputs: Any,
*,
session: Optional[str | AgentSession] = None,
context: Optional[ModelContext] = None,
stream_modes: Optional[list[BaseStreamMode]] = None,
envs: Optional[dict[str, Any]] = None
) -> AsyncIterator[Any]:
"""
Execute a single agent with streaming output support.
Args:
agent: Agent name or BaseAgent instance to execute
inputs: Input data for the agent
session: Existing session ID or Session instance for context persistence
context: model context
stream_modes: Types of streaming data to output
envs: Environment variables or configuration override
"""
async for chunk in GLOBAL_RUNNER.run_agent_streaming(
agent=agent,
inputs=inputs,
session=session,
context=context,
stream_modes=stream_modes,
envs=envs
):
yield chunk
@classmethod
async def spawn_agent(
cls,
agent_config: SpawnAgentConfig,
inputs: Any,
*,
session: Optional[str | AgentSession] = None,
context: Optional[ModelContext] = None,
envs: Optional[dict[str, Any]] = None,
spawn_config: Optional[SpawnConfig] = None,
) -> SpawnedProcessHandle:
"""
Spawn a child process to run an agent.
Args:
agent: Agent name or BaseAgent instance to execute
inputs: Input data for the agent
session: Existing session ID or Session instance for context persistence
context: model context
envs: Environment variables or configuration overrides
spawn_config: Configuration for spawned process management
Returns:
SpawnedProcessHandle for managing the spawned process
"""
return await GLOBAL_RUNNER.spawn_agent(
agent_config=agent_config,
inputs=inputs,
session=session,
context=context,
envs=envs,
spawn_config=spawn_config,
)
@classmethod
async def spawn_agent_streaming(
cls,
agent_config: SpawnAgentConfig,
inputs: Any,
*,
session: Optional[str | AgentSession] = None,
context: Optional[ModelContext] = None,
stream_modes: Optional[list[BaseStreamMode]] = None,
envs: Optional[dict[str, Any]] = None,
spawn_config: Optional[SpawnConfig] = None,
) -> AsyncIterator[tuple[SpawnedProcessHandle, Any]]:
"""
Spawn a child process to run an agent with streaming output.
Args:
agent: Agent name or BaseAgent instance to execute
inputs: Input data for the agent
session: Existing session ID or Session instance for context persistence
context: model context
stream_modes: Types of streaming data to output
envs: Environment variables or configuration overrides
spawn_config: Configuration for spawned process management
Yields:
Tuples of (SpawnedProcessHandle, message) as messages arrive
"""
async for handle, message in GLOBAL_RUNNER.spawn_agent_streaming(
agent_config=agent_config,
inputs=inputs,
session=session,
context=context,
stream_modes=stream_modes,
envs=envs,
spawn_config=spawn_config,
):
yield handle, message
@classmethod
async def release(cls, session_id: str, *, force: bool = False) -> None:
"""
Release resources associated with a session.
Args:
session_id: ID of the session to clean up.
force: When ``True``, stop any team still active on this
session before cleaning. See ``_RunnerImpl.release``.
"""
await GLOBAL_RUNNER.release(session_id, force=force)