#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
工作流构建器 - 从低码工作流 JSON 构建可执行的 Workflow

支持两种输入:
1. 画布 IR:nodes、edges、model_references、plugins。
2. DSL 导出:components、connections,以及 dependencies.workflows 内嵌子工作流
"""

from __future__ import annotations

import copy
import json
import os
from typing import Any, Dict, List, Optional

from pydantic import ValidationError

from openjiuwen.core.common.logging import logger
from openjiuwen_studio.core.common import dsl as studio_dsl
from openjiuwen_studio.core.common.exceptions import JiuWenComponentException
from openjiuwen_studio.core.common.status_code import StatusCode
from openjiuwen_studio.core.executor.workflow.context import Context
from openjiuwen_studio.core.executor.workflow.workflow import Workflow as ExecutorWorkflow
from openjiuwen_studio.core.manager.convertor.components.code import code_convert
from openjiuwen_studio.core.manager.convertor.components.common import (
    exception_config_convert,
    input_params_convert,
    outputs_convert,
)
from openjiuwen_studio.core.manager.convertor.components.empty import empty_convert
from openjiuwen_studio.core.manager.convertor.components.end import end_convert
from openjiuwen_studio.core.manager.convertor.components.input import input_convert
from openjiuwen_studio.core.manager.convertor.components.intent import (
    _intent_inputs_convert,
    _intent_outputs_convert,
)
from openjiuwen_studio.core.manager.convertor.components.llm import _llm_output_config_convert
from openjiuwen_studio.core.manager.convertor.components.loop import (
    loop_break_convert,
    loop_continue_convert,
    loop_convert,
)
from openjiuwen_studio.core.manager.convertor.components.output import output_convert
from openjiuwen_studio.core.manager.convertor.components.questioner import _output_and_extract_field_convert
from openjiuwen_studio.core.manager.convertor.components.set_variable import set_variable_convert
from openjiuwen_studio.core.manager.convertor.components.start import start_convert
from openjiuwen_studio.core.manager.convertor.components.sub_workflow import sub_workflow_convert
from openjiuwen_studio.core.manager.convertor.components.switch import switch_convert
from openjiuwen_studio.core.manager.convertor.components.text_editor import text_editor_convert
from openjiuwen_studio.core.manager.convertor.components.variable_merge import variable_merge_convert
from openjiuwen_studio.core.manager.convertor.connection import connection_convert
from openjiuwen_studio.core.manager.convertor.validators import validate_canvas_nodes
from openjiuwen_studio.core.manager.convertor.workflow import (
    _friendly_validation_message,
    extract_inputs_and_outputs_from_canvas,
)
from openjiuwen_studio.core.manager.internal.workflow import WorkflowCanvas
from openjiuwen_studio.core.manager.utils.utils import convert_to_properties_format
from openjiuwen_studio.core.utils.exception import log_exception
from openjiuwen_studio.schemas.node import BaseValue, Node
from openjiuwen_studio.schemas.workflow import WorkflowBase

from .workflow_dependency_loader import (
    DependencyWorkflowLoader,
    collect_workflow_registry,
    unwrap_workflow_document,
    workflow_dict_to_dl_workflow,
    looks_like_dsl_workflow_export,
)


_PARAM_TYPE_IR = {
    "1": "string",
    "2": "integer",
    "3": "number",
    "4": "boolean",
    "5": "object",
    "6": "array",
    "7": "array",
    "string": "string",
    "int": "integer",
    "integer": "integer",
    "float": "number",
    "number": "number",
    "bool": "boolean",
    "boolean": "boolean",
    "object": "object",
    "array": "array",
}

_HTTP_METHOD_IR = {
    "1": "GET",
    "2": "POST",
    "3": "PUT",
    "4": "DELETE",
    "get": "GET",
    "post": "POST",
    "put": "PUT",
    "delete": "DELETE",
}

_SEND_METHOD_IR = {
    "0": "",
    "1": "header",
    "2": "query",
    "3": "body",
    "header": "header",
    "query": "query",
    "body": "body",
}


def _get_bool_env(name: str, default: bool = False) -> bool:
    raw = (os.environ.get(name, "true" if default else "false") or "").strip().lower()
    return raw in {"1", "true", "yes", "on"}


def _resolve_llm_api_key_from_env(base_url: str) -> str:
    import re
    from urllib.parse import urlparse
    
    url = (base_url or "").strip().strip('"').strip("'")
    if not url:
        return ""
    parsed = urlparse(url)
    host = (parsed.hostname or "").replace(".", "_")
    path = (parsed.path or "").strip("/").replace("/", "_")
    parts = [part for part in (host, path) if part]
    raw = "_".join(parts) if parts else url
    slug = re.sub(r"[^A-Za-z0-9]+", "_", raw).strip("_").upper()
    if not slug:
        return ""
    env_key = f"LLM_KEY__{slug}"
    return (os.environ.get(env_key) or "").strip()


def _parse_runtime_userdata_api_keys() -> Dict[str, str]:
    """
    从 RUNTIME_USERDATA 环境变量中解析 API keys

    支持格式:
    1. JSON 字符串:'{"api_keys": {"qwen":"sk-abcdefg", "openai":"sk-123456"}}'
    2. Python 字典字符串:"{'api_keys': {'qwen':'sk-abcdefg'}}"

    Returns:
        API keys 字典,如 {"qwen": "sk-abcdefg", "openai": "sk-123456"}
    """

    userdata_str = os.environ.get("RUNTIME_USERDATA")
    if not userdata_str:
        return {}

    try:
        userdata = json.loads(userdata_str)
        if isinstance(userdata, dict) and "api_keys" in userdata:
            api_keys = userdata["api_keys"]
            if isinstance(api_keys, dict):
                return api_keys
    except (json.JSONDecodeError, TypeError):
        try:
            normalized_str = userdata_str.replace("'", '"')
            userdata = json.loads(normalized_str)
            if isinstance(userdata, dict) and "api_keys" in userdata:
                api_keys = userdata["api_keys"]
                if isinstance(api_keys, dict):
                    return api_keys
        except Exception as e:
            logger.debug(f"Failed to parse RUNTIME_USERDATA with normalized string: {e}")

    return {}


def _is_key_match(key_name_lower: str, target_lower: str, require_non_empty: bool = True) -> bool:
    """检查 key_name 是否与目标字符串匹配。

    匹配规则:
    1. 完全匹配(不区分大小写)
    2. key_name 包含在 target 中
    3. target 包含在 key_name 中
    """
    if require_non_empty and not target_lower:
        return False
    if key_name_lower == target_lower:
        return True
    if key_name_lower in target_lower:
        return True
    if target_lower in key_name_lower:
        return True
    return False


def _match_api_key_from_runtime_keys_for_model_reference(
    model_type: str,
    provider: str,
    ref_key: str,
    runtime_api_keys: Dict[str, str],
) -> tuple[Optional[str], str]:
    """根据模型信息从 runtime_api_keys 中匹配 API key。

    匹配优先级:
    1. model_type 完全匹配或部分匹配
    2. model_type 的最后一部分匹配(如 "qwen" 匹配 "qwen/max")
    3. provider 匹配
    4. ref_key(引用键)匹配
    5. ref_key 的 provider 部分匹配

    Returns:
        (matched_key_name, api_key) 元组,如果未匹配则返回 (None, "")
    """
    model_type_lower = model_type.lower()
    provider_lower = provider.lower()
    ref_key_lower = ref_key.lower()
    ref_key_provider_lower = ref_key.split("/", 1)[0].lower() if ref_key else ""

    for key_name, key_value in runtime_api_keys.items():
        key_name_lower = key_name.lower()

        # 检查 model_type 匹配(包括最后一部分匹配)
        if _is_key_match(key_name_lower, model_type_lower):
            return key_name, str(key_value).strip()
        if key_name_lower == model_type_lower.split("/")[-1]:
            return key_name, str(key_value).strip()

        # 检查 provider 匹配
        if provider_lower and _is_key_match(key_name_lower, provider_lower):
            return key_name, str(key_value).strip()

        # 检查 ref_key 匹配
        if ref_key_lower and _is_key_match(key_name_lower, ref_key_lower):
            return key_name, str(key_value).strip()

        # 检查 ref_key 的 provider 部分匹配
        if ref_key_provider_lower and _is_key_match(key_name_lower, ref_key_provider_lower):
            return key_name, str(key_value).strip()

    return None, ""


def inject_api_keys_into_model_references(model_references: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    runtime_api_keys = _parse_runtime_userdata_api_keys()
    logger.info(f"inject_api_keys_into_model_references: runtime_api_keys keys={list(runtime_api_keys.keys())}")
    out: Dict[str, Any] = {}
    for key, ref in (model_references or {}).items():
        if not isinstance(ref, dict):
            continue
        cell = dict(ref)
        base_url = str(cell.get("base_url") or "").strip()
        provider = str(cell.get("provider") or cell.get("client_provider") or "").strip()
        ref_key = str(key or "").strip()
        ref_key_provider = ref_key.split("/", 1)[0].strip() if ref_key else ""
        
        api_key = str(cell.get("api_key") or "").strip()
        original_api_key = api_key
        
        if not api_key and base_url:
            env_api_key = _resolve_llm_api_key_from_env(base_url)
            if env_api_key:
                api_key = env_api_key
                logger.info(f"inject_api_keys: got api_key from env for {key}")
        
        if not api_key:
            model_type = str(cell.get("model_type") or cell.get("name") or "").strip()
            if model_type and runtime_api_keys:
                matched_key_name, api_key = _match_api_key_from_runtime_keys_for_model_reference(
                    model_type, provider, ref_key, runtime_api_keys
                )
                if api_key:
                    logger.info(f"inject_api_keys: matched api_key for {key} using key_name={matched_key_name}")
        
        if api_key:
            cell["api_key"] = api_key
            if not original_api_key:
                logger.info(f"inject_api_keys: successfully injected api_key for model_reference key={key}")
        else:
            model_type_str = cell.get('model_type') or cell.get('name')
            logger.warning(
                f"inject_api_keys: no api_key found for model_reference key={key}, "
                f"provider={provider}, model_type={model_type_str}"
            )
        out[str(key)] = cell
    return out


def _normalize_plugin_type(plugin_type: Any) -> str:
    v = str(plugin_type or "").lower()
    if v in {"1", "service", "api", "cloud_api", "plugin_type_cloud_api"}:
        return studio_dsl.PluginType.SERVICE
    if v in {"2", "code", "cloud_code", "plugin_type_cloud_code"}:
        return studio_dsl.PluginType.CODE
    return studio_dsl.PluginType.CODE if "code" in v else studio_dsl.PluginType.SERVICE


def _build_param(param: Dict[str, Any]) -> studio_dsl.Param:
    return studio_dsl.Param(
        name=param.get("name") or "",
        description=param.get("desc") or param.get("description") or "",
        type=_PARAM_TYPE_IR.get(str(param.get("type") or "").lower(), "string"),
        required=bool(param.get("is_required") or param.get("required")),
        method=_SEND_METHOD_IR.get(str(param.get("method") or "").lower(), ""),
        default_value=param.get("value") if param.get("value") is not None else param.get("default_value"),
        runtime=bool(param.get("is_runtime", True)),
    )


def _build_plugin_code_config(plugin: Dict[str, Any], tool: Dict[str, Any]) -> Dict[str, Any]:
    return studio_dsl.ToolCompConfig(
        type=studio_dsl.PluginType.CODE,
        tool=studio_dsl.PluginCodeConfig(
            tool_id=str(tool.get("tool_id") or tool.get("id") or ""),
            name=tool.get("name") or "",
            description=tool.get("desc") or tool.get("description") or "",
            language=tool.get("language") or "python",
            code=tool.get("code") or "",
            input_params=[_build_param(p) for p in (tool.get("request_params") or tool.get("inputs") or [])],
            output_params=[
                studio_dsl.ParamConfig(
                    name=p.get("name") or "",
                    type=_PARAM_TYPE_IR.get(str(p.get("type") or "").lower(), "string"),
                )
                for p in (tool.get("response_params") or tool.get("outputs") or [])
            ],
        ).model_dump(),
    ).model_dump()


def _build_plugin_service_config(plugin: Dict[str, Any], tool: Dict[str, Any]) -> Dict[str, Any]:
    base_url = str(plugin.get("url") or plugin.get("base_url") or "").rstrip("/")
    path = str(tool.get("path") or tool.get("url") or "")
    if base_url and path and not path.startswith(("http://", "https://")):
        path = f"{base_url}/{path.lstrip('/')}"

    headers = {}
    for header in tool.get("headers") or []:
        hn = header.get("name")
        if hn:
            headers[hn] = header.get("value")

    request_params = list(tool.get("request_params") or [])
    plugin_default_params = list(plugin.get("inputs") or plugin.get("request_params") or [])
    merged: Dict[str, Dict[str, Any]] = {str(p.get("name")): p for p in request_params if p.get("name")}
    for param in plugin_default_params:
        name = param.get("name")
        if name and name not in merged:
            merged[name] = param

    method = _HTTP_METHOD_IR.get(str(tool.get("method") or "").lower(), str(tool.get("method") or "GET").upper())

    return studio_dsl.ToolCompConfig(
        type=studio_dsl.PluginType.SERVICE,
        tool=studio_dsl.RestfulApiSchema(
            tool_id=str(tool.get("tool_id") or tool.get("id") or ""),
            name=tool.get("name") or "",
            description=tool.get("desc") or tool.get("description") or "",
            path=path,
            method=method,
            params=[_build_param(p) for p in merged.values()],
            response=[_build_param(p) for p in (tool.get("response_params") or [])],
            headers=headers,
        ).model_dump(),
    ).model_dump()


def build_plugin_tool_config_map(dependency_plugins: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    result: Dict[str, Dict[str, Any]] = {}
    for plugin in dependency_plugins or []:
        if not isinstance(plugin, dict):
            continue
        plugin_id = str(plugin.get("plugin_id") or plugin.get("id") or "")
        plugin_version = str(plugin.get("plugin_version") or plugin.get("version") or "draft")
        ptype = _normalize_plugin_type(plugin.get("plugin_type"))
        tools = plugin.get("tools") or plugin.get("tool_list") or []
        ver = plugin_version or "draft"
        for tool in tools:
            if not isinstance(tool, dict):
                continue
            tool_id = str(tool.get("tool_id") or tool.get("id") or "")
            if not tool_id:
                continue
            config = (
                _build_plugin_service_config(plugin, tool)
                if ptype == studio_dsl.PluginType.SERVICE
                else _build_plugin_code_config(plugin, tool)
            )
            for key in (f"{plugin_id}:{tool_id}:{ver}", f"{plugin_id}:{tool_id}", tool_id):
                result[key] = config
    return result


def _model_config_from_references(
    model_id: str,
    canvas_model_type: str,
    model_references: Dict[str, Any],
    node_label: str,
) -> studio_dsl.ModelConfig:
    mid = str(model_id).strip()
    ref: Optional[Dict[str, Any]] = None
    if mid and model_references:
        if mid in model_references:
            r = model_references[mid]
            ref = r if isinstance(r, dict) else None
        if ref is None:
            for r in model_references.values():
                if isinstance(r, dict) and str(r.get("model_id")) == mid:
                    ref = r
                    break
    if not ref:
        raise ValueError(f"model_references 缺少模型 id={mid!r},无法实例化节点 {node_label!r}")

    pr = ref.get("parameters")
    temp, top_p = 0.7, 0.9
    if isinstance(pr, dict):
        temp = float(pr.get("temperature", temp))
        top_p = float(pr.get("top_p", top_p))
    timeout = int(ref.get("timeout") or 60)
    api_key = str(ref.get("api_key") or "").strip()
    base_url = str(ref.get("base_url") or "").strip()
    provider = str(ref.get("provider") or ref.get("model_provider") or "openai")
    model_name = canvas_model_type or str(ref.get("model_type") or ref.get("name") or "")
    
    if not api_key:
        logger.warning(
            f"_model_config_from_references: API key is empty for "
            f"model_id={mid}, node={node_label}, provider={provider}, base_url={base_url}"
        )
    
    return studio_dsl.ModelConfig(
        model_client_config=studio_dsl.ModelClientConfig(
            client_provider=provider,
            api_key=api_key,
            api_base=base_url,
            timeout=timeout,
            verify_ssl=_get_bool_env("LLM_SSL_VERIFY", True),
        ),
        request_config=studio_dsl.ModelRequestConfig(
            model_name=model_name,
            temperature=temp,
            top_p=top_p,
        ),
    )


def _llm_convert_export(node: Any, space_id: str, model_references: Dict[str, Any]) -> studio_dsl.Component:
    data = node.data
    inputs = data.inputs
    if inputs is None or inputs.input_parameters is None:
        raise ValueError("llm node missing inputs.input_parameters")
    if inputs.llm_param is None:
        raise ValueError("llm node missing llm_param")
    if data.outputs is None:
        raise ValueError("llm node missing outputs")

    llm_params = inputs.llm_param
    model_cfg = _model_config_from_references(
        str(llm_params.model.id), llm_params.model.type, model_references, node.id
    )
    llm_configs = studio_dsl.LLMConfig(
        model=model_cfg,
        response_format_type=data.output_format,
        output_config=_llm_output_config_convert(data.outputs),
        template_content=[
            {"role": "system", "content": llm_params.system_prompt.content},
            {"role": "user", "content": llm_params.prompt.content},
        ],
        enable_history=inputs.enable_history,
    )
    return studio_dsl.Component(
        id=getattr(node, "id", ""),
        type=studio_dsl.ComponentType.COMPONENT_TYPE_LLM,
        type_version="1.0.0",
        inputs=input_params_convert(inputs.input_parameters),
        outputs=outputs_convert(data.outputs),
        configs=llm_configs.model_dump(),
        name=data.title,
    )


def _questioner_convert_export(node: Any, space_id: str, model_references: Dict[str, Any]) -> studio_dsl.Component:
    data = node.data
    inputs = data.inputs
    if inputs is None or inputs.input_parameters is None or inputs.llm_param is None:
        raise ValueError("questioner node missing inputs")
    if data.outputs is None:
        raise ValueError("questioner node missing outputs")

    llm_params = inputs.llm_param
    model_cfg = _model_config_from_references(
        str(llm_params.model.id), llm_params.model.type, model_references, node.id
    )
    converted_outputs, converted_fields = _output_and_extract_field_convert(data.outputs)
    questioner_configs = studio_dsl.QuestionerConfig(
        model=model_cfg,
        field_names=converted_fields,
        max_response=inputs.max_response,
        with_chat_history=inputs.enable_history,
    )
    return studio_dsl.Component(
        id=getattr(node, "id", ""),
        type=studio_dsl.ComponentType.COMPONENT_TYPE_QUESTION,
        type_version="1.0.0",
        inputs=input_params_convert(inputs.input_parameters),
        outputs=converted_outputs,
        configs=questioner_configs.model_dump(),
        name=data.title,
    )


def _intent_convert_export(node: Any, space_id: str, model_references: Dict[str, Any]) -> studio_dsl.Component:
    data = node.data
    inputs = data.inputs
    if inputs is None or inputs.llm_param is None:
        raise ValueError("intent node missing llm_param")
    if data.outputs is None:
        raise ValueError("intent node missing outputs")

    llm_params = inputs.llm_param
    user_prompt = llm_params.prompt.content if llm_params.prompt and llm_params.prompt.content else ""
    intents = inputs.intents or []
    category_list = [f"分类{i}" for i in range(1, len(intents) + 1)]
    category_name_list = [intent.name for intent in intents]

    model_cfg = _model_config_from_references(
        str(llm_params.model.id), llm_params.model.type, model_references, node.id
    )
    converted_configs = studio_dsl.IntentDetectionConfig(
        user_prompt=user_prompt,
        category_list=category_list,
        category_name_list=category_name_list,
        model=model_cfg,
        enable_history=inputs.enable_history,
    )
    branches = [studio_dsl.Branch(branch_id=inputs.default_intent)]
    branches.extend(studio_dsl.Branch(branch_id=intent.id) for intent in intents)

    return studio_dsl.Component(
        id=node.id,
        name=data.title,
        type=studio_dsl.ComponentType.COMPONENT_TYPE_INTENT,
        type_version="1.0.0",
        inputs=_intent_inputs_convert(inputs),
        outputs=_intent_outputs_convert(data.outputs),
        configs=converted_configs.model_dump(),
        branches=branches,
    )


def _plugin_convert_export(
    node: Any,
    space_id: str,
    plugin_tool_configs: Dict[str, Dict[str, Any]],
) -> studio_dsl.Component:
    data = node.data
    inputs = data.inputs
    if inputs is None:
        raise TypeError("plugin node inputs is none")

    convert_inputs: Dict[str, Any] = {}
    if inputs.input_parameters is not None:
        fixed: Dict[str, BaseValue] = {}
        for key, value in inputs.input_parameters.items():
            bv = BaseValue.model_validate(value) if isinstance(value, dict) else value
            if bv.type == "constant" and bv.schema is not None and bv.schema.type in ("object", "array"):
                c = bv.content
                if c is None or (isinstance(c, str) and not c.strip()):
                    bv = bv.model_copy(
                        update={"content": "{}" if bv.schema.type == "object" else "[]"}
                    )
            fixed[key] = bv
        convert_inputs = input_params_convert(fixed)

    exception_conf = studio_dsl.ExceptConfig()
    if data.exception_config is not None:
        exception_conf = exception_config_convert(data.exception_config)
    pp = data.inputs.plugin_param
    lookup_keys = (
        f"{pp.plugin_id}:{pp.tool_id}:{pp.plugin_version or 'draft'}",
        f"{pp.plugin_id}:{pp.tool_id}",
        str(pp.tool_id),
    )
    tool_dump = next((plugin_tool_configs[k] for k in lookup_keys if k in plugin_tool_configs), None)
    if not tool_dump:
        raise ValueError(f"IR plugins 中未找到插件工具,已尝试 keys={list(lookup_keys)}(节点 {node.id})")

    configs = studio_dsl.ToolCompConfig(
        type=tool_dump.get("type"),
        tool=tool_dump.get("tool") or {},
        exception_config=exception_conf,
    ).model_dump()

    return studio_dsl.Component(
        id=node.id,
        type=studio_dsl.ComponentType.COMPONENT_TYPE_PLUGIN,
        type_version="1.0.0",
        description="",
        inputs=convert_inputs,
        configs=configs,
        name=data.title,
    )


_CONVERT_ERROR_CODES = {
    studio_dsl.ComponentType.COMPONENT_TYPE_START: StatusCode.START_COMPONENT_CONVERT_FAILED.code,
    studio_dsl.ComponentType.COMPONENT_TYPE_LLM: StatusCode.LLM_COMPONENT_CONVERT_FAILED.code,
    studio_dsl.ComponentType.COMPONENT_TYPE_END: StatusCode.END_COMPONENT_CONVERT_FAILED.code,
    studio_dsl.ComponentType.COMPONENT_TYPE_IF: StatusCode.IF_COMPONENT_CONVERT_FAILED.code,
    studio_dsl.ComponentType.COMPONENT_TYPE_LOOP: StatusCode.LOOP_COMPONENT_CONVERT_FAILED.code,
    studio_dsl.ComponentType.COMPONENT_TYPE_INPUT: StatusCode.INPUT_COMPONENT_CONVERT_FAILED.code,
    studio_dsl.ComponentType.COMPONENT_TYPE_OUTPUT: StatusCode.OUTPUT_COMPONENT_CONVERT_FAILED.code,
    studio_dsl.ComponentType.COMPONENT_TYPE_QUESTION: StatusCode.QUESTION_COMPONENT_CONVERT_FAILED.code,
    studio_dsl.ComponentType.COMPONENT_TYPE_CONTINUE: StatusCode.CONTINUE_COMPONENT_CONVERT_FAILED.code,
    studio_dsl.ComponentType.COMPONENT_TYPE_BREAK: StatusCode.BREAK_COMPONENT_CONVERT_FAILED.code,
    studio_dsl.ComponentType.COMPONENT_TYPE_TEXT_EDITOR: StatusCode.TEXTEDITOR_COMPONENT_CONVERT_FAILED.code,
    studio_dsl.ComponentType.COMPONENT_TYPE_INTENT: StatusCode.INTENT_COMPONENT_CONVERT_FAILED.code,
    studio_dsl.ComponentType.COMPONENT_TYPE_SUB_WORKFLOW: StatusCode.SUBWORKFLOW_COMPONENT_CONVERT_FAILED.code,
    studio_dsl.ComponentType.COMPONENT_TYPE_EMPTY_START: StatusCode.EMPTY_START_COMPONENT_CONVERT_FAILED.code,
    studio_dsl.ComponentType.COMPONENT_TYPE_EMPTY_END: StatusCode.EMPTY_END_COMPONENT_CONVERT_FAILED.code,
    studio_dsl.ComponentType.COMPONENT_TYPE_CODE: StatusCode.CODE_COMPONENT_CONVERT_FAILED.code,
    studio_dsl.ComponentType.COMPONENT_TYPE_VARIABLE_MERGE: StatusCode.VARIABLE_MERGE_COMPONENT_CONVERT_FAILED.code,
    studio_dsl.ComponentType.COMPONENT_TYPE_SET_VARIABLE: StatusCode.SET_VARIABLE_COMPONENT_CONVERT_FAILED.code,
    studio_dsl.ComponentType.COMPONENT_TYPE_PLUGIN: StatusCode.PLUGIN_COMPONENT_CONVERT_FAILED.code,
}


def component_convert_for_export(
    edges: List,
    nodes: List,
    space_id: str,
    sub_convert: bool,
    *,
    model_references: Dict[str, Any],
    plugin_tool_configs: Dict[str, Dict[str, Any]],
) -> List[studio_dsl.Component]:
    def convert_loop(n: Any, s: str, sub: bool) -> studio_dsl.Component:
        if sub:
            raise TypeError("loop component can not contain sub loop component")
        c = loop_convert(n)
        blocks = n.blocks
        if not blocks:
            raise ValueError("loop blocks is empty")
        sub_nodes = [Node(**block) for block in blocks]
        sub_comps = component_convert_for_export(
            n.edges,
            sub_nodes,
            s,
            True,
            model_references=model_references,
            plugin_tool_configs=plugin_tool_configs,
        )
        start_id: List[str] = []
        end_id: List[str] = []
        for block in n.blocks:
            sub_node = Node(**block)
            nt = int(sub_node.type)
            if nt == studio_dsl.ComponentType.COMPONENT_TYPE_EMPTY_START:
                start_id.append(sub_node.id)
            elif nt == studio_dsl.ComponentType.COMPONENT_TYPE_EMPTY_END:
                end_id.append(sub_node.id)
        if not n.edges:
            raise ValueError("loop edges is empty")
        sub_edges = connection_convert(n.edges)
        c.configs = studio_dsl.LoopConfig(
            loop_body=studio_dsl.BaseFlow(
                start_id=start_id,
                end_id=end_id,
                components=sub_comps,
                connections=sub_edges,
            )
        ).model_dump()
        return c

    converters = {
        studio_dsl.ComponentType.COMPONENT_TYPE_START: lambda n, s, sub: start_convert(n),
        studio_dsl.ComponentType.COMPONENT_TYPE_LLM: lambda n, s, sub: _llm_convert_export(n, s, model_references),
        studio_dsl.ComponentType.COMPONENT_TYPE_END: lambda n, s, sub: end_convert(n),
        studio_dsl.ComponentType.COMPONENT_TYPE_IF: lambda n, s, sub: switch_convert(n),
        studio_dsl.ComponentType.COMPONENT_TYPE_LOOP: convert_loop,
        studio_dsl.ComponentType.COMPONENT_TYPE_INPUT: lambda n, s, sub: input_convert(n),
        studio_dsl.ComponentType.COMPONENT_TYPE_OUTPUT: lambda n, s, sub: output_convert(n),
        studio_dsl.ComponentType.COMPONENT_TYPE_QUESTION: lambda n, s, sub: _questioner_convert_export(
            n, s, model_references
        ),
        studio_dsl.ComponentType.COMPONENT_TYPE_CONTINUE: lambda n, s, sub: loop_continue_convert(n),
        studio_dsl.ComponentType.COMPONENT_TYPE_BREAK: lambda n, s, sub: loop_break_convert(n),
        studio_dsl.ComponentType.COMPONENT_TYPE_TEXT_EDITOR: lambda n, s, sub: text_editor_convert(n),
        studio_dsl.ComponentType.COMPONENT_TYPE_INTENT: lambda n, s, sub: _intent_convert_export(
            n, s, model_references
        ),
        studio_dsl.ComponentType.COMPONENT_TYPE_SUB_WORKFLOW: lambda n, s, sub: sub_workflow_convert(n, s),
        studio_dsl.ComponentType.COMPONENT_TYPE_EMPTY_START: lambda n, s, sub: empty_convert(n),
        studio_dsl.ComponentType.COMPONENT_TYPE_EMPTY_END: lambda n, s, sub: empty_convert(n),
        studio_dsl.ComponentType.COMPONENT_TYPE_CODE: lambda n, s, sub: code_convert(n),
        studio_dsl.ComponentType.COMPONENT_TYPE_VARIABLE_MERGE: lambda n, s, sub: variable_merge_convert(n),
        studio_dsl.ComponentType.COMPONENT_TYPE_SET_VARIABLE: lambda n, s, sub: set_variable_convert(n),
        studio_dsl.ComponentType.COMPONENT_TYPE_PLUGIN: lambda n, s, sub: _plugin_convert_export(
            n, s, plugin_tool_configs
        ),
    }

    try:
        components: List[studio_dsl.Component] = []
        for node in nodes:
            node_type = int(node.type)
            try:
                converter = converters.get(node_type)
                if not converter:
                    msg = f"不支持的画布组件类型: {node_type}"
                    logger.error(msg)
                    raise JiuWenComponentException(
                        code=StatusCode.COMPONENT_CONVERT_FAILED.code,
                        message=StatusCode.COMPONENT_CONVERT_FAILED.errmsg.format(msg=msg),
                        component_id=node.id,
                        component_type=node_type,
                        error_stage="convert",
                    )
                comp = converter(node, space_id, sub_convert)
            except Exception as ce:
                code = _CONVERT_ERROR_CODES.get(node_type, StatusCode.COMPONENT_CONVERT_FAILED.code)
                raise JiuWenComponentException(
                    code=code,
                    message=str(ce),
                    component_id=node.id,
                    component_type=node_type,
                    error_stage="convert",
                ) from ce
            if comp:
                components.append(comp)
        return components
    except (TypeError, ValueError, AttributeError) as e:
        log_exception(e)
        raise ValueError(f"Invalid workflow schema or input: {e}") from e


def workflow_convert_for_export(
    workflow_info: Any,
    *,
    skip_validation: bool,
    model_references: Dict[str, Any],
    plugin_tool_configs: Dict[str, Dict[str, Any]],
) -> studio_dsl.Workflow:
    try:
        workflow_schema = json.loads(getattr(workflow_info, "workflow_schema", "{}") or "{}")
        try:
            canvas = WorkflowCanvas(**workflow_schema)
        except ValidationError as e:
            errs = e.errors() or []
            err = errs[0] if errs else {"msg": str(e), "loc": []}
            hint, _, component_type, component_id = _friendly_validation_message(err, workflow_schema)
            code = StatusCode.COMPONENT_CONFIG_INVALID.code
            if component_type == studio_dsl.ComponentType.COMPONENT_TYPE_LLM:
                code = StatusCode.LLM_COMPONENT_CONFIG_INVALID.code
            raise JiuWenComponentException(
                code=code,
                message=StatusCode.COMPONENT_CONFIG_INVALID.errmsg.format(msg=hint),
                component_id=component_id,
                component_type=component_type,
                error_stage="validate",
            ) from e
        if not skip_validation:
            validate_canvas_nodes(canvas)

        components = component_convert_for_export(
            canvas.edges,
            canvas.nodes,
            workflow_info.space_id,
            False,
            model_references=model_references,
            plugin_tool_configs=plugin_tool_configs,
        )

        input_properties, input_requires = convert_to_properties_format(workflow_info.input_parameters)
        inputs = {"type": "object", "properties": input_properties, "required": input_requires}
        output_properties, _ = convert_to_properties_format(workflow_info.output_parameters)

        start_id: List[str] = []
        end_id: List[str] = []
        for c in components:
            if c.type == studio_dsl.ComponentType.COMPONENT_TYPE_START:
                start_id.append(c.id)
            elif c.type == studio_dsl.ComponentType.COMPONENT_TYPE_END:
                end_id.append(c.id)

        version = getattr(workflow_info, "workflow_version", "1.0.0") or "1.0.0"
        return studio_dsl.Workflow(
            inputs=inputs,
            outputs=output_properties,
            start_id=start_id,
            end_id=end_id,
            id=getattr(workflow_info, "workflow_id", ""),
            name=getattr(workflow_info, "name", "Unnamed Workflow"),
            version=version,
            description=getattr(workflow_info, "desc", "") or "",
            components=components,
            connections=connection_convert(canvas.edges),
        )
    except (json.JSONDecodeError, TypeError, AttributeError) as e:
        raise ValueError(f"Invalid workflow schema or input: {str(e)}") from e


def _apply_dsl_top_level_overrides(
    ir: Dict[str, Any],
    workflow_id: Optional[str],
    name: Optional[str],
    desc: Optional[str],
) -> Dict[str, Any]:
    out = dict(ir)
    if workflow_id:
        out["id"] = workflow_id
    if name is not None:
        out["name"] = name
    if desc is not None:
        out["description"] = desc
    return out


async def build_core_workflow_from_dsl_dict(
    ir: Dict[str, Any],
    *,
    workflow_id: Optional[str] = None,
    name: Optional[str] = None,
    desc: Optional[str] = None,
    space_id: str = "default",
    current_user: Optional[Dict[str, Any]] = None,
    skip_validation: bool = True,
) -> Any:
    """由 DSL 形态 JSON(含 dependencies.workflows)构建可运行 Workflow,子工作流不查库。"""
    del skip_validation
    ir = unwrap_workflow_document(ir)
    if not looks_like_dsl_workflow_export(ir):
        raise ValueError("build_core_workflow_from_dsl_dict 需要 components + connections 的 DSL 导出")
    registry = collect_workflow_registry(ir)
    ir2 = _apply_dsl_top_level_overrides(ir, workflow_id, name, desc)
    dl_workflow = workflow_dict_to_dl_workflow(ir2)
    user = current_user or {"user_id": "local", "space_id": space_id}
    loader = DependencyWorkflowLoader(registry, space_id, user)
    executor_wf = ExecutorWorkflow(dl_workflow, space_id, user)
    return await executor_wf.compile(Context(), loader=loader)


async def build_core_workflow_from_ir_dict(
    ir: Dict[str, Any],
    *,
    workflow_id: Optional[str] = None,
    name: Optional[str] = None,
    desc: Optional[str] = None,
    space_id: str = "default",
    current_user: Optional[Dict[str, Any]] = None,
    skip_validation: bool = True,
) -> Any:
    """从 IR 字典构建可执行的 Workflow"""
    ir_snapshot = copy.deepcopy(ir)
    ir0 = unwrap_workflow_document(ir_snapshot)
    if looks_like_dsl_workflow_export(ir0):
        return await build_core_workflow_from_dsl_dict(
            ir0,
            workflow_id=workflow_id,
            name=name,
            desc=desc,
            space_id=space_id,
            current_user=current_user,
            skip_validation=skip_validation,
        )

    src = ir0
    wid = (workflow_id or src.get("workflow_id") or src.get("id") or "").strip() or "local_workflow"
    wname = (name or src.get("name") or src.get("workflow_name") or "Workflow from IR").strip() or "Workflow"
    wdesc = (desc or src.get("description") or src.get("desc") or "").strip() or ""
    wver = str(src.get("workflow_version") or src.get("version") or "draft")

    nodes_snapshot = copy.deepcopy(src.get("nodes") or [])
    edges_snapshot = copy.deepcopy(src.get("edges") or [])
    canvas = {"nodes": nodes_snapshot, "edges": edges_snapshot}
    input_parameters, output_parameters = extract_inputs_and_outputs_from_canvas(canvas)
    refs = inject_api_keys_into_model_references(copy.deepcopy(src.get("model_references")))
    plugin_map = build_plugin_tool_config_map(src.get("plugins") or [])

    wf_base = WorkflowBase(
        workflow_id=wid,
        space_id=space_id,
        workflow_version=wver,
        name=wname,
        desc=wdesc or None,
        schema=json.dumps(canvas, ensure_ascii=False),
        input_parameters=input_parameters,
        output_parameters=output_parameters,
    )

    dl_workflow = workflow_convert_for_export(
        wf_base,
        skip_validation=skip_validation,
        model_references=refs,
        plugin_tool_configs=plugin_map,
    )

    user = current_user or {"user_id": "local", "space_id": space_id}
    executor_wf = ExecutorWorkflow(dl_workflow, space_id, user)
    return await executor_wf.compile(Context(), loader=None)