from __future__ import annotations
import copy
import json
import logging
from typing import Any, Dict
from openjiuwen.core.workflow.workflow import Workflow as InvokableWorkflow
from openjiuwen_studio.core.executor.workflow.context import Context
from openjiuwen_studio.core.executor.workflow.workflow import Workflow as ExecutorWorkflow
from openjiuwen_studio.schemas.workflow import WorkflowBase
from .workflow_builder import (
build_plugin_tool_config_map,
extract_inputs_and_outputs_from_canvas,
inject_api_keys_into_model_references,
workflow_convert_for_export,
)
from .config_adapter import ConfigAdapter
from .workflow_dependency_loader import (
DependencyWorkflowLoader,
collect_workflow_registry,
looks_like_dsl_workflow_export,
unwrap_workflow_document,
workflow_dict_to_dl_workflow,
)
logger = logging.getLogger(__name__)
class RuntimeWorkflowRunner:
"""Runtime-side workflow manager aligned with studio Agent executor semantics."""
def __init__(
self,
export_config: Dict[str, Any],
*,
current_user: Dict[str, Any] | None = None,
space_id: str = "default",
plugin_mgr: Any = None,
) -> None:
self._export_config = export_config or {}
self._space_id = space_id
self._current_user = current_user or {"user_id": "runtime", "space_id": space_id}
self._plugin_mgr = plugin_mgr
self._model_references = self._export_config.get("model_references", {})
self._plugin_tool_configs = build_plugin_tool_config_map(
self._export_config.get("dependencies", {}).get("plugins") or []
)
self._registry = collect_workflow_registry(self._export_config)
def _resolve_workflow(self, workflow_id: str, version: str) -> Dict[str, Any]:
version = str(version or "draft").strip() or "draft"
workflow_id = str(workflow_id or "").strip()
suffix = f"_{version}"
if workflow_id.endswith(suffix):
workflow_id = workflow_id[:-len(suffix)]
workflow = self._registry.get((workflow_id, version))
if workflow is None and version != "draft":
workflow = self._registry.get((workflow_id, "draft"))
if workflow is None:
raise ValueError(f"Workflow not found in export config: id={workflow_id}, version={version}")
return copy.deepcopy(workflow)
@staticmethod
def _disable_end_stream_output(canvas: Dict[str, Any]) -> Dict[str, Any]:
"""Agent runtime consumes workflow results as tool outputs, not as workflow UI streams.
Leaving end-node `stream_output=true` forces openjiuwen workflow invoke() down the
internal stream-actor path even for agent tool calls, which has proven flaky in the
runtime deployment environment and can stall before the first frame is emitted.
"""
if not isinstance(canvas, dict):
return canvas
nodes = canvas.get("nodes")
if not isinstance(nodes, list):
return canvas
patched = False
for node in nodes:
if not isinstance(node, dict):
continue
if str(node.get("type")) != "2":
continue
data = node.get("data")
if not isinstance(data, dict):
continue
inputs = data.get("inputs")
if isinstance(inputs, dict) and inputs.get("streaming") is True:
inputs["streaming"] = False
patched = True
if patched:
logger.info("RuntimeWorkflowRunner disabled end-node stream_output for agent workflow execution")
return canvas
def _build_dl_workflow(self, workflow_dict: Dict[str, Any], *, space_id: str) -> Any:
workflow_dict = copy.deepcopy(workflow_dict)
source = unwrap_workflow_document(workflow_dict)
if looks_like_dsl_workflow_export(source):
return workflow_dict_to_dl_workflow(source)
canvas_schema = workflow_dict.get("schema", {})
if isinstance(canvas_schema, str):
canvas = json.loads(canvas_schema or "{}")
elif isinstance(canvas_schema, dict):
canvas = copy.deepcopy(canvas_schema)
else:
canvas = {}
canvas = self._disable_end_stream_output(canvas)
input_parameters, output_parameters = extract_inputs_and_outputs_from_canvas(canvas)
workflow_base = WorkflowBase(
workflow_id=str(workflow_dict.get("workflow_id") or workflow_dict.get("id") or ""),
space_id=space_id,
workflow_version=str(workflow_dict.get("workflow_version") or workflow_dict.get("version") or "draft"),
name=str(workflow_dict.get("workflow_name") or workflow_dict.get("name") or "Workflow"),
desc=str(workflow_dict.get("description") or ""),
schema=json.dumps(canvas, ensure_ascii=False),
input_parameters=input_parameters,
output_parameters=output_parameters,
)
processed_model_references = ConfigAdapter.preprocess_model_references(
workflow_dict.get("schema", {}),
inject_api_keys_into_model_references(self._model_references),
)
return workflow_convert_for_export(
workflow_base,
skip_validation=True,
model_references=processed_model_references,
plugin_tool_configs=self._plugin_tool_configs,
)
async def get_flow(
self,
workflow_id: str,
version: str,
space_id: str,
current_user: Dict[str, Any],
) -> ExecutorWorkflow:
logger.info(
"RuntimeWorkflowRunner.get_flow start: workflow_id=%s, version=%s, space_id=%s",
workflow_id,
version,
space_id or self._space_id,
)
workflow_dict = self._resolve_workflow(workflow_id, version)
actual_space_id = space_id or self._space_id
actual_user = current_user or self._current_user
dl_workflow = self._build_dl_workflow(workflow_dict, space_id=actual_space_id)
flow = ExecutorWorkflow(
dl_workflow,
actual_space_id,
actual_user,
plugin_mgr=self._plugin_mgr,
workflow_mgr=self,
)
logger.info(
"RuntimeWorkflowRunner.get_flow done: workflow_id=%s, version=%s, flow_name=%s",
workflow_id,
version,
getattr(flow, "name", None),
)
return flow
async def get_compiled_workflow(
self,
context: Context,
workflow_id: str,
version: str,
space_id: str,
current_user: Dict[str, Any],
) -> InvokableWorkflow:
logger.info(
"RuntimeWorkflowRunner.get_compiled_workflow start: workflow_id=%s, version=%s, space_id=%s",
workflow_id,
version,
space_id or self._space_id,
)
workflow = await self.get_flow(workflow_id, version, space_id, current_user)
actual_space_id = space_id or self._space_id
actual_user = current_user or self._current_user
loader = DependencyWorkflowLoader(
self._registry,
actual_space_id,
actual_user,
)
compiled = await workflow.compile(context, loader=loader)
logger.info(
"RuntimeWorkflowRunner.get_compiled_workflow done: workflow_id=%s, version=%s",
workflow_id,
version,
)
return compiled