# -*- coding: UTF-8 -*-
# Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.

import asyncio
from contextlib import contextmanager
from typing import (
    Any,
    AsyncIterator,
    Optional,
    TYPE_CHECKING,
    Union,
)

import anyio
from openjiuwen.core.common.exception.codes import StatusCode
from openjiuwen.core.common.exception.errors import build_error
from openjiuwen.core.common.logging import LogEventType, runner_logger as logger
from openjiuwen.core.context_engine import ModelContext
from openjiuwen.core.runner.callback import AsyncCallbackFramework
from openjiuwen.core.runner.drunner.dmessage_queue.dsubscription.reply_topic_subscription import ReplyTopicSubscription
from openjiuwen.core.runner.drunner.dmessage_queue.message_queue_factory import MessageQueueFactory
from openjiuwen.core.runner.message_queue_base import LocalMessageQueue
from openjiuwen.core.runner.resources_manager.resource_manager import ResourceMgr
from openjiuwen.core.runner.runner_config import (
    DEFAULT_RUNNER_CONFIG,
    get_runner_config,
    RunnerConfig,
    set_runner_config,
)
from openjiuwen.core.runner.team_runner import (
    _TeamRunnerClassMixin,
    _TeamRunnerMixin,
)
from openjiuwen.core.session import Config
from openjiuwen.core.session.checkpointer import CheckpointerFactory
from openjiuwen.core.session.stream import BaseStreamMode
from openjiuwen.core.runner.spawn import (
    MessageType,
    SpawnAgentConfig,
    SpawnConfig,
    SpawnedProcessHandle,
    spawn_process,
)
from openjiuwen.core.single_agent import (
    BaseAgent,
    create_agent_session,
    LegacyBaseAgent,
    Session as AgentSession,
)
from openjiuwen.core.single_agent.schema.agent_card import AgentCard
from openjiuwen.core.workflow import (
    create_workflow_session,
    generate_workflow_key,
    Session as WorkflowSession,
    Workflow,
)

if TYPE_CHECKING:
    from openjiuwen.agent_teams.runtime import TeamRuntimeManager


class _RunnerImpl(_TeamRunnerMixin):
    """
    Runner implementation class.
    """
    _DEFAULT_RUNNER_ID = "global"

    _DEFAULT_AGENT_SESSION_ID = "default_session"

    _AGENT_CONVERSATION_ID = "conversation_id"

    @staticmethod
    def _get_spawn_logging_config() -> dict[str, Any]:
        from openjiuwen.core.common.logging.log_config import get_log_config_snapshot

        return get_log_config_snapshot()

    def __init__(self, runner_id: str = _DEFAULT_RUNNER_ID, config: RunnerConfig = None):
        """
        Initialize the Runner with configuration.

        Args:
            runner_id: Runner unique id.
            config: Configuration for the runner. If None, defaults will be used.
        """
        self._runner_id = runner_id
        self._resource_manager = ResourceMgr()
        self._message_queue = LocalMessageQueue()
        if config is not None:
            set_runner_config(config)
        else:
            set_runner_config(DEFAULT_RUNNER_CONFIG)
        # Distributed system related components
        self.system_reply_sub: ReplyTopicSubscription | None = None
        self._distribute_message_queue = None
        self._callback_framework = AsyncCallbackFramework()
        self._root_task_group = None
        self._root_task_group_owner = None
        self._root_task_group_ready = None
        self._root_task_group_stop = None
        self._team_runtime_manager: Optional["TeamRuntimeManager"] = None

    @property
    def resource_mgr(self) -> ResourceMgr:
        """Get the resource manager for workflow, agent, agent_team, tool, model, prompt..."""
        return self._resource_manager

    @property
    def pubsub(self):
        """Get the local message queue for publish/subscribe communication."""
        return self._message_queue

    @property
    def dist_pubsub(self):
        """Get the distributed message queue for cross-process communication."""
        return self._distribute_message_queue

    @property
    def callback_framework(self) -> AsyncCallbackFramework:
        """Get the callback framework for asynchronous callbacks."""
        return self._callback_framework

    @staticmethod
    def _is_remote_agent(agent_instance: Any) -> bool:
        try:
            from openjiuwen.core.runner.drunner.remote_client.remote_agent import RemoteAgent
        except ModuleNotFoundError as exc:
            if exc.name != "a2a":
                raise
            return False
        return isinstance(agent_instance, RemoteAgent)

    def get_root_task_group(self):
        """Get the runner-owned root task group, if Runner has been started."""
        return self._root_task_group

    async def _root_task_group_owner_loop(
        self,
        ready: asyncio.Event,
        stop: asyncio.Event,
    ) -> None:
        try:
            async with anyio.create_task_group() as task_group:
                self._root_task_group = task_group
                ready.set()
                await stop.wait()
                task_group.cancel_scope.cancel()
        finally:
            self._root_task_group = None
            ready.set()

    async def _ensure_root_task_group(self) -> None:
        if self._root_task_group is not None and self._root_task_group_owner is not None:
            return
        # Import the manager so lower layers can schedule via manager.create_task().
        from openjiuwen.core.common.task_manager.manager import get_task_manager

        get_task_manager()
        self._root_task_group_ready = asyncio.Event()
        self._root_task_group_stop = asyncio.Event()
        self._root_task_group_owner = asyncio.create_task(
            self._root_task_group_owner_loop(
                self._root_task_group_ready,
                self._root_task_group_stop,
            )
        )
        await self._root_task_group_ready.wait()
        if self._root_task_group is None and self._root_task_group_owner.done():
            await self._root_task_group_owner

    async def _close_root_task_group(self) -> None:
        if self._root_task_group_owner is None:
            return
        from openjiuwen.core.common.task_manager.manager import get_task_manager

        owner = self._root_task_group_owner
        try:
            await get_task_manager().cancel_all()
            if self._root_task_group_stop is not None:
                self._root_task_group_stop.set()
            if self._root_task_group is not None:
                self._root_task_group.cancel_scope.cancel()
            with anyio.move_on_after(5, shield=True):
                await owner
            if not owner.done():
                owner.cancel()
                try:
                    with anyio.move_on_after(1, shield=True):
                        await owner
                except asyncio.CancelledError:
                    pass
        except Exception as e:
            logger.warning(
                "Failed to close runner root task group",
                event_type=LogEventType.RUNNER_STOP,
                runner_id=self._runner_id,
                exception=e,
            )
        finally:
            self._root_task_group = None
            self._root_task_group_owner = None
            self._root_task_group_ready = None
            self._root_task_group_stop = None

    @contextmanager
    def _root_task_group_scope(self):
        if self._root_task_group is None:
            yield
            return
        from openjiuwen.core.common.task_manager.context import get_task_group, reset_task_group, set_task_group

        if get_task_group() is not None:
            yield
            return
        token = set_task_group(self._root_task_group)
        try:
            yield
        finally:
            reset_task_group(token)

    def _enter_root_task_group_context(self):
        """Bind :attr:`_root_task_group` into ContextVar; return token for :meth:`_exit_root_task_group_context`.

        Async generators must use this with ``try``/``finally`` instead of :meth:`_root_task_group_scope`:
        ``contextlib``'s sync ``yield`` + ``GeneratorExit`` can call ``ContextVar.reset`` in an invalid
        context (e.g. A2A streaming on a uvicorn worker thread).
        """
        if self._root_task_group is None:
            return None
        from openjiuwen.core.common.task_manager.context import get_task_group, set_task_group

        if get_task_group() is not None:
            return None
        return set_task_group(self._root_task_group)

    def _exit_root_task_group_context(self, token) -> None:
        if token is None:
            return
        from openjiuwen.core.common.task_manager.context import reset_task_group

        try:
            reset_task_group(token)
        except ValueError:
            logger.debug(
                "ContextVar reset skipped for root task group (invalid token context); "
                "runner_id=%s",
                self._runner_id,
            )

    def set_config(self, config: RunnerConfig):
        """Set the runner configuration with provided config object.

        Args:
            config: The RunnerConfig object containing configuration settings
        """
        logger.info(f"set runner {self._runner_id} config {config}")
        set_runner_config(config)

    def get_config(self):
        """Retrieve the current runner configuration.

        Returns:
            RunnerConfig: The current configuration object
        """
        return get_runner_config()

    async def start(self) -> bool:
        """Start the runner and its associated components, such as message queue."""
        result = True
        logger.info("Begin to start runner", event_type=LogEventType.RUNNER_START, runner_id=self._runner_id)
        await self._ensure_root_task_group()

        try:
            with self._root_task_group_scope():
                # Initialize checkpointer if configured
                checkpointer_config = get_runner_config().checkpointer_config
                if checkpointer_config is not None:
                    logger.info(f"Begin to initializing checkpointer with type: {checkpointer_config.type}",
                                event_type=LogEventType.RUNNER_START, runner_id=self._runner_id)
                    try:
                        # Lazy import checkpointer providers based on type
                        if checkpointer_config.type == "redis":
                            try:
                                # Import Redis checkpointer provider to ensure it's registered
                                from openjiuwen.extensions.checkpointer.redis import checkpointer as _  # noqa: F401
                            except ImportError as e:
                                logger.error("Redis checkpointer not available. "
                                             "Please install redis dependencies",
                                             event_type=LogEventType.RUNNER_START,
                                             runner_id=self._runner_id, exception=e)
                                raise

                        checkpointer = await CheckpointerFactory.create(checkpointer_config)
                        CheckpointerFactory.set_default_checkpointer(checkpointer)
                        logger.info(f"Succeed to initializing checkpointer with type: {checkpointer_config.type}",
                                    event_type=LogEventType.RUNNER_START, runner_id=self._runner_id)
                    except Exception as e:
                        logger.error(f"Failed to initializing checkpointer with type: {checkpointer_config.type}",
                                     event_type=LogEventType.RUNNER_START, runner_id=self._runner_id, exception=e)
                        logger.error("Failed to start runner",
                                     event_type=LogEventType.RUNNER_START, runner_id=self._runner_id, exception=e)
                        raise

                if get_runner_config().distributed_mode:
                    # start dmq
                    self._distribute_message_queue = MessageQueueFactory.create(
                        get_runner_config().distributed_config.message_queue_config)
                    self._distribute_message_queue.start()
                    # start reply topic sub
                    self.system_reply_sub = ReplyTopicSubscription(self._distribute_message_queue)
                    self.system_reply_sub.activate()
                    result = await self._message_queue.start()
                if result:
                    logger.info("Succeed to start runner",
                                event_type=LogEventType.RUNNER_START, runner_id=self._runner_id)
                else:
                    logger.error("Failed to start runner, message queue start failed",
                                 event_type=LogEventType.RUNNER_START, runner_id=self._runner_id)
                return result
        except Exception:
            await self._close_root_task_group()
            raise

    async def stop(self):
        """Stop the runner and clean up resources."""
        logger.info("Begin to stop runner", event_type=LogEventType.RUNNER_STOP, runner_id=self._runner_id)
        try:
            with self._root_task_group_scope():
                if get_runner_config().distributed_mode:
                    # 1. Stop ReplyTopicSubscription, clean up collector
                    if self.system_reply_sub:
                        await self.system_reply_sub.deactivate()
                        self.system_reply_sub = None
                    # 2. Stop MQ
                    if self._distribute_message_queue:
                        await self._distribute_message_queue.stop()
                        self._distribute_message_queue = None

                result = await self._message_queue.stop()
            logger.info("Succeed to stop runner", event_type=LogEventType.RUNNER_STOP, runner_id=self._runner_id)
            return result
        except Exception as e:
            logger.warning("Failed to stop runner", event_type=LogEventType.RUNNER_STOP, runner_id=self._runner_id,
                           exception=e)
            return False
        finally:
            await self._resource_manager.release()
            await self._close_root_task_group()

    async def run_workflow(self,
                           workflow: str | Workflow,
                           inputs: Any,
                           *,
                           session: Optional[str | WorkflowSession | AgentSession] = None,
                           context: ModelContext = None,
                           envs: Optional[dict[str, Any]] = None):
        """
        Execute a workflow with given inputs.

        Args:
            workflow: Workflow name or Workflow instance to execute
            inputs: Input data for the workflow
            session: Existing session ID or Session instance for context persistence
            context: model context
            envs: Environment variables or configuration overrides,
        """
        with self._root_task_group_scope():
            workflow_instance, workflow_session = await self._prepare_workflow(workflow, session)
            return await workflow_instance.invoke(inputs, session=workflow_session, context=context)

    async def run_workflow_streaming(self,
                                     workflow: str | Workflow,
                                     inputs: Any,
                                     *,
                                     session: Optional[str | WorkflowSession | AgentSession] = None,
                                     context: ModelContext = None,
                                     stream_modes: list[BaseStreamMode] = None,
                                     envs: Optional[dict[str, Any]] = None):
        """
        Execute a workflow with streaming output support.

        Args:
            workflow: Workflow name or Workflow instance to execute
            inputs: Input data for the workflow
            session: Existing session ID or Session instance for context persistence
            context: model context
            stream_modes: Types of streaming data to output
            envs: Environment variables or configuration overrides
        """
        token = self._enter_root_task_group_context()
        try:
            workflow_instance, workflow_session = await self._prepare_workflow(workflow, session)
            async for chunk in workflow_instance.stream(inputs, session=workflow_session,
                                                        stream_modes=stream_modes, context=context):
                yield chunk
        finally:
            self._exit_root_task_group_context(token)

    async def run_agent(self,
                        agent: str | BaseAgent | LegacyBaseAgent,
                        inputs: Any,
                        *,
                        session: Optional[str | AgentSession] = None,
                        context: ModelContext = None,
                        envs: Optional[dict[str, Any]] = None,
                        ):
        """
        Execute a single agent with given inputs.

        Args:
            agent: Agent name or BaseAgent instance to execute
            inputs: Input data for the agent
            session: Existing session ID or Session instance for context persistence
            context: model context
            envs: Environment variables or configuration overrides
        """
        with self._root_task_group_scope():
            agent_instance, agent_session = await self._prepare_agent(agent, inputs, session)
            if self._is_remote_agent(agent_instance):
                res = await agent_instance.invoke(inputs)
            elif isinstance(agent_instance, LegacyBaseAgent):
                # ControllerAgent handles its own session lifecycle
                res = await agent_instance.invoke(inputs, session=None)
            else:
                res = await agent_instance.invoke(inputs, agent_session)
                await agent_session.post_run()
            return res

    async def run_agent_streaming(self,
                                  agent: str | BaseAgent | LegacyBaseAgent,
                                  inputs: Any,
                                  *,
                                  session: Optional[str | AgentSession] = None,
                                  context: ModelContext = None,
                                  stream_modes: list[BaseStreamMode] = None,
                                  envs: Optional[dict[str, Any]] = None):
        """
           Execute a single agent with streaming output support.

           Args:
               agent: Agent name or BaseAgent instance to execute
               inputs: Input data for the agent
               session: Existing session ID or Session instance for context persistence
               context: model context
               stream_modes: Types of streaming data to output
               envs: Environment variables or configuration override
        """
        token = self._enter_root_task_group_context()
        try:
            agent_instance, agent_session = await self._prepare_agent(agent, inputs, session)
            if self._is_remote_agent(agent_instance):
                async for chunk in agent_instance.stream(inputs):
                    yield chunk
            elif isinstance(agent_instance, LegacyBaseAgent):
                # ControllerAgent handles its own session lifecycle
                async for chunk in agent_instance.stream(inputs, session=None):
                    yield chunk
            else:
                async for chunk in agent_instance.stream(inputs, session=agent_session):
                    yield chunk
                await agent_session.post_run()
        finally:
            self._exit_root_task_group_context(token)

    async def release(self, session_id: str, *, force: bool = False):
        """
        Release resources associated with a session.

        For agent team sessions, this automatically cleans up dynamic
        tables (tasks, messages, etc.) in addition to releasing the
        checkpoint; non-team sessions take the simple checkpoint-only
        path.

        Args:
            session_id: ID of the session to clean up.
            force: When ``True``, stop any team still active on this
                session before cleaning. Default ``False`` raises
                ``AGENT_TEAM_BUSY_INVALID`` so callers explicitly choose
                between graceful and forced teardown.
        """
        if await self._maybe_release_team_session(session_id, force=force):
            return
        await CheckpointerFactory.get_checkpointer().release(session_id)

    @classmethod
    def _is_called_by_agent(cls, session: AgentSession) -> bool:
        return session and isinstance(session, AgentSession)

    @classmethod
    def _create_workflow_session(cls, session):
        # Convert workflow session
        if not session:
            workflow_session = create_workflow_session()
        elif isinstance(session, str):
            workflow_session = create_workflow_session(session_id=session)
        elif isinstance(session, AgentSession):
            workflow_session = session.create_workflow_session()
        else:
            workflow_session = session
        return workflow_session

    async def _prepare_agent(self, agent: Union[str, BaseAgent], inputs: Any,
                             session: Optional[str | AgentSession] = None):
        if isinstance(session, AgentSession):
            if isinstance(agent, str):
                agent_instance = await self._resource_manager.get_agent(agent_id=agent)
                if agent_instance is None:
                    raise build_error(StatusCode.RUNNER_RUN_AGENT_ERROR, agent=agent, reason="agent not exist")
                await session.pre_run(inputs=inputs)
                return agent_instance, session
            await session.pre_run(inputs=inputs)
            return agent, session
        session_id = inputs.get(self._AGENT_CONVERSATION_ID,
                                session if isinstance(session, str) else self._DEFAULT_AGENT_SESSION_ID)
        if isinstance(agent, str):
            agent_instance = await self._resource_manager.get_agent(agent_id=agent)
            if agent_instance is None:
                raise build_error(StatusCode.RUNNER_RUN_AGENT_ERROR, agent=agent, reason="agent not exist")
            if self._is_remote_agent(agent_instance):
                if self._AGENT_CONVERSATION_ID not in inputs:
                    inputs[self._AGENT_CONVERSATION_ID] = session_id
                return agent_instance, None

            agent_session = self._create_agent_session(agent_instance, session_id)
            await agent_session.pre_run(inputs=inputs)
            return agent_instance, agent_session

        agent_session = self._create_agent_session(agent, session_id)
        await agent_session.pre_run(inputs=inputs)
        return agent, agent_session

    async def spawn_agent(
        self,
        agent_config: SpawnAgentConfig,
        inputs: Any,
        *,
        session: Optional[str | AgentSession] = None,
        context: ModelContext = None,
        envs: Optional[dict[str, Any]] = None,
        spawn_config: Optional[SpawnConfig] = None,
    ) -> SpawnedProcessHandle:
        """
        Spawn a child process to run an agent.

        Args:
            agent: Agent name or BaseAgent instance to execute
            inputs: Input data for the agent
            session: Existing session ID or Session instance for context persistence
            context: model context
            envs: Environment variables or configuration overrides
            spawn_config: Configuration for spawned process management

        Returns:
            SpawnedProcessHandle for managing the spawned process
        """
        if not isinstance(agent_config, SpawnAgentConfig):
            raise TypeError("Runner.spawn_agent now requires SpawnAgentConfig.")
        normalized_inputs = inputs if isinstance(inputs, dict) else {"data": inputs}
        session_id = normalized_inputs.get(
            self._AGENT_CONVERSATION_ID,
            session if isinstance(session, str) else self._DEFAULT_AGENT_SESSION_ID,
        )
        logging_config = (
            agent_config.logging_config
            if agent_config.logging_config is not None
            else self._get_spawn_logging_config()
        )
        spawn_payload = agent_config.model_copy(update={"session_id": session_id, "logging_config": logging_config})
        handle = await spawn_process(
            agent_config=spawn_payload.model_dump(mode="json"),
            inputs=normalized_inputs,
            config=spawn_config,
        )
        if spawn_config is not None:
            await handle.start_health_check()
        return handle

    async def spawn_agent_streaming(
        self,
        agent_config: SpawnAgentConfig,
        inputs: Any,
        *,
        session: Optional[str | AgentSession] = None,
        context: ModelContext = None,
        stream_modes: list[BaseStreamMode] = None,
        envs: Optional[dict[str, Any]] = None,
        spawn_config: Optional[SpawnConfig] = None,
    ) -> AsyncIterator[tuple[SpawnedProcessHandle, Any]]:
        """
        Spawn a child process to run an agent with streaming output.

        Args:
            agent: Agent name or BaseAgent instance to execute
            inputs: Input data for the agent
            session: Existing session ID or Session instance for context persistence
            context: model context
            stream_modes: Types of streaming data to output
            envs: Environment variables or configuration overrides
            spawn_config: Configuration for spawned process management

        Yields:
            Tuples of (SpawnedProcessHandle, message) as messages arrive
        """
        if not isinstance(agent_config, SpawnAgentConfig):
            raise TypeError("Runner.spawn_agent_streaming now requires SpawnAgentConfig.")
        normalized_inputs = inputs if isinstance(inputs, dict) else {"data": inputs}
        session_id = normalized_inputs.get(
            self._AGENT_CONVERSATION_ID,
            session if isinstance(session, str) else self._DEFAULT_AGENT_SESSION_ID,
        )
        logging_config = (
            agent_config.logging_config
            if agent_config.logging_config is not None
            else self._get_spawn_logging_config()
        )
        spawn_payload = agent_config.model_copy(update={"session_id": session_id, "logging_config": logging_config})

        handle = await spawn_process(
            agent_config=spawn_payload.model_dump(mode="json"),
            inputs=normalized_inputs,
            config=spawn_config,
        )

        yield handle, None

        while handle.is_alive:
            message = await handle.receive_message()
            if message is None:
                break

            if message.type == MessageType.STREAM_CHUNK:
                yield handle, message.payload
            elif message.type == MessageType.DONE:
                yield handle, message.payload
                break
            elif message.type == MessageType.ERROR:
                yield handle, message.payload
                break
            elif message.type == MessageType.OUTPUT:
                yield handle, message.payload

    async def _prepare_workflow(self, workflow: Union[str, Workflow],
                                session: str | AgentSession | WorkflowSession) -> tuple[Workflow, WorkflowSession]:
        if isinstance(workflow, str):
            workflow_key = workflow
        else:
            workflow_key = generate_workflow_key(workflow.card.id, workflow.card.version)

        workflow_session = self._create_workflow_session(session)
        if isinstance(workflow, str):
            workflow_instance = await self._resource_manager.get_workflow(workflow_id=workflow_key,
                                                                          session=workflow_session)
        else:
            workflow_instance = workflow
        return workflow_instance, workflow_session

    @staticmethod
    def _create_agent_session(agent, session_id):
        envs = None
        if hasattr(agent, "card"):
            config = agent.config
            card = agent.card
        else:
            # LegacyBaseAgent does not have card attribute
            config = agent.config()
            card = AgentCard(id=config.get_agent_config().id)
        if isinstance(config, Config):
            envs = getattr(config, "_env", None)
        agent_session = create_agent_session(session_id=session_id, envs=envs, card=card)
        return agent_session


# Global runner instance
GLOBAL_RUNNER = _RunnerImpl(config=DEFAULT_RUNNER_CONFIG)


class _ClassProperty:
    """Descriptor for class-level properties."""
    
    def __init__(self, name: str):
        self.name = name
    
    def __get__(self, obj, objtype=None):
        return getattr(GLOBAL_RUNNER, self.name)


class Runner(_TeamRunnerClassMixin):
    """
    Runner singleton class that proxies all calls to the global runner instance.

    This class provides a singleton interface for accessing the global runner instance.
    All method calls and property accesses are automatically proxied to GLOBAL_RUNNER.

    Example:
        >>> from openjiuwen.core.runner import Runner
        >>> await Runner.start()
        >>> resource_mgr = Runner.resource_mgr
        >>> await Runner.run_agent(agent, inputs)
    """
    
    # Properties
    resource_mgr: ResourceMgr = _ClassProperty("resource_mgr")  # type: ignore[assignment]
    """Get the resource manager for workflow, agent, agent_team, tool, model, prompt..."""
    
    pubsub = _ClassProperty("pubsub")
    """Get the local message queue for publish/subscribe communication."""
    
    dist_pubsub = _ClassProperty("dist_pubsub")
    """Get the distributed message queue for cross-process communication."""

    system_reply_sub: ReplyTopicSubscription = _ClassProperty("system_reply_sub")
    """Get the reply topic subscription for distributed system reply messages."""
    
    callback_framework: AsyncCallbackFramework = _ClassProperty("callback_framework")  # type: ignore[assignment]
    """Get the callback framework for asynchronous callbacks."""
    
    # Methods
    @classmethod
    def get_root_task_group(cls):
        """Get the runner-owned root task group."""
        return GLOBAL_RUNNER.get_root_task_group()

    @classmethod
    def set_config(cls, config: RunnerConfig) -> None:
        """Set the runner configuration with provided config object.

        Args:
            config: The RunnerConfig object containing configuration settings
        """
        GLOBAL_RUNNER.set_config(config)
    
    @classmethod
    def get_config(cls) -> RunnerConfig:
        """Retrieve the current runner configuration.

        Returns:
            RunnerConfig: The current configuration object
        """
        return GLOBAL_RUNNER.get_config()
    
    @classmethod
    async def start(cls) -> bool:
        """Start the runner and its associated components, such as message queue."""
        return await GLOBAL_RUNNER.start()
    
    @classmethod
    async def stop(cls):
        """Stop the runner and clean up resources."""
        return await GLOBAL_RUNNER.stop()
    
    @classmethod
    async def run_workflow(
        cls,
        workflow: str | Workflow,
        inputs: Any,
        *,
            session: Optional[str | WorkflowSession | AgentSession] = None,
        context: Optional[ModelContext] = None,
        envs: Optional[dict[str, Any]] = None
    ) -> Any:
        """
        Execute a workflow with given inputs.

        Args:
            workflow: Workflow name or Workflow instance to execute
            inputs: Input data for the workflow
            session: Existing session ID or Session instance for context persistence
            context: model context
            envs: Environment variables or configuration overrides
        """
        return await GLOBAL_RUNNER.run_workflow(
            workflow=workflow,
            inputs=inputs,
            session=session,
            context=context,
            envs=envs
        )
    
    @classmethod
    async def run_workflow_streaming(
        cls,
        workflow: str | Workflow,
        inputs: Any,
        *,
            session: Optional[str | WorkflowSession | AgentSession] = None,
        context: Optional[ModelContext] = None,
        stream_modes: Optional[list[BaseStreamMode]] = None,
        envs: Optional[dict[str, Any]] = None
    ) -> AsyncIterator[Any]:
        """
        Execute a workflow with streaming output support.

        Args:
            workflow: Workflow name or Workflow instance to execute
            inputs: Input data for the workflow
            session: Existing session ID or Session instance for context persistence
            context: model context
            stream_modes: Types of streaming data to output
            envs: Environment variables or configuration overrides
        """
        async for chunk in GLOBAL_RUNNER.run_workflow_streaming(
            workflow=workflow,
            inputs=inputs,
            session=session,
            context=context,
            stream_modes=stream_modes,
            envs=envs
        ):
            yield chunk
    
    @classmethod
    async def run_agent(
        cls,
        agent: str | BaseAgent | LegacyBaseAgent,
        inputs: Any,
        *,
            session: Optional[str | AgentSession] = None,
        context: Optional[ModelContext] = None,
        envs: Optional[dict[str, Any]] = None,
    ) -> Any:
        """
        Execute a single agent with given inputs.

        Args:
            agent: Agent name or BaseAgent instance to execute
            inputs: Input data for the agent
            session: Existing session ID or Session instance for context persistence
            context: model context
            envs: Environment variables or configuration overrides
        """
        return await GLOBAL_RUNNER.run_agent(
            agent=agent,
            inputs=inputs,
            session=session,
            context=context,
            envs=envs
        )
    
    @classmethod
    async def run_agent_streaming(
        cls,
        agent: str | BaseAgent | LegacyBaseAgent,
        inputs: Any,
        *,
            session: Optional[str | AgentSession] = None,
        context: Optional[ModelContext] = None,
        stream_modes: Optional[list[BaseStreamMode]] = None,
        envs: Optional[dict[str, Any]] = None
    ) -> AsyncIterator[Any]:
        """
        Execute a single agent with streaming output support.

        Args:
            agent: Agent name or BaseAgent instance to execute
            inputs: Input data for the agent
            session: Existing session ID or Session instance for context persistence
            context: model context
            stream_modes: Types of streaming data to output
            envs: Environment variables or configuration override
        """
        async for chunk in GLOBAL_RUNNER.run_agent_streaming(
            agent=agent,
            inputs=inputs,
            session=session,
            context=context,
            stream_modes=stream_modes,
            envs=envs
        ):
            yield chunk

    @classmethod
    async def spawn_agent(
        cls,
        agent_config: SpawnAgentConfig,
        inputs: Any,
        *,
        session: Optional[str | AgentSession] = None,
        context: Optional[ModelContext] = None,
        envs: Optional[dict[str, Any]] = None,
        spawn_config: Optional[SpawnConfig] = None,
    ) -> SpawnedProcessHandle:
        """
        Spawn a child process to run an agent.

        Args:
            agent: Agent name or BaseAgent instance to execute
            inputs: Input data for the agent
            session: Existing session ID or Session instance for context persistence
            context: model context
            envs: Environment variables or configuration overrides
            spawn_config: Configuration for spawned process management

        Returns:
            SpawnedProcessHandle for managing the spawned process
        """
        return await GLOBAL_RUNNER.spawn_agent(
            agent_config=agent_config,
            inputs=inputs,
            session=session,
            context=context,
            envs=envs,
            spawn_config=spawn_config,
        )

    @classmethod
    async def spawn_agent_streaming(
        cls,
        agent_config: SpawnAgentConfig,
        inputs: Any,
        *,
        session: Optional[str | AgentSession] = None,
        context: Optional[ModelContext] = None,
        stream_modes: Optional[list[BaseStreamMode]] = None,
        envs: Optional[dict[str, Any]] = None,
        spawn_config: Optional[SpawnConfig] = None,
    ) -> AsyncIterator[tuple[SpawnedProcessHandle, Any]]:
        """
        Spawn a child process to run an agent with streaming output.

        Args:
            agent: Agent name or BaseAgent instance to execute
            inputs: Input data for the agent
            session: Existing session ID or Session instance for context persistence
            context: model context
            stream_modes: Types of streaming data to output
            envs: Environment variables or configuration overrides
            spawn_config: Configuration for spawned process management

        Yields:
            Tuples of (SpawnedProcessHandle, message) as messages arrive
        """
        async for handle, message in GLOBAL_RUNNER.spawn_agent_streaming(
            agent_config=agent_config,
            inputs=inputs,
            session=session,
            context=context,
            stream_modes=stream_modes,
            envs=envs,
            spawn_config=spawn_config,
        ):
            yield handle, message

    @classmethod
    async def release(cls, session_id: str, *, force: bool = False) -> None:
        """
        Release resources associated with a session.

        Args:
            session_id: ID of the session to clean up.
            force: When ``True``, stop any team still active on this
                session before cleaning. See ``_RunnerImpl.release``.
        """
        await GLOBAL_RUNNER.release(session_id, force=force)