工作流(LangGraph)
1. 概述
工作流模块提供基于 LangGraph 的工作流编排能力。它提供领域无关的基类,可扩展用于任何场景。
核心组件:
- BaseState — 通用工作流状态(TypedDict)
- BaseWorkflow — 抽象工作流基类
- BaseLangGraphTask — 抽象任务执行基类
- 路由工具 — 步数限制和 Agent 重复限制检查
- WorkflowRegistry — 工作流注册与发现
- WorkflowVisualizer — Mermaid 图表生成
2. BaseState
BaseState 是一个 TypedDict,定义了任何 LangGraph 工作流的最小状态字段。
class BaseState(TypedDict, total=False):
# 任务标识
task_id: str
task_label: str
session_id: str
# 流程控制
iteration: int
step_count: int
max_iterations: int
# 历史记录(自动累积)
agent_history: Annotated[List[str], add]
# 结果
success: bool
error_message: Optional[str]
扩展状态
领域专用工作流应继承 BaseState:
class MyDomainState(BaseState, total=False):
document_content: str
analysis_result: dict
3. BaseWorkflow
BaseWorkflow 是所有工作流的抽象基类。子类实现 build_graph() 定义图结构。
class BaseWorkflow(ABC, Generic[StateType]):
def __init__(self, config: dict, trace=None):
"""
Args:
config: 配置字典(包含 max_step 等)
trace: 可选的 Trace 实例,用于记录执行过程
"""
@abstractmethod
def build_graph(self) -> StateGraph:
"""构建工作流图。返回未编译的 StateGraph。"""
def compile(self):
"""编译图。返回可直接调用 ainvoke() 的 LangGraph 应用。"""
def visualize(self) -> str:
"""生成 Mermaid 格式的流程图。"""
示例
from langgraph.graph import StateGraph, END
from akg_agents.core_v2.langgraph_base import BaseWorkflow, BaseState
class MyWorkflow(BaseWorkflow[BaseState]):
def __init__(self, agents: dict, config: dict):
super().__init__(config)
self.agents = agents
def build_graph(self) -> StateGraph:
workflow = StateGraph(BaseState)
workflow.add_node("agent_a", self.agents["a"].run)
workflow.add_node("agent_b", self.agents["b"].run)
workflow.add_edge("agent_a", "agent_b")
workflow.add_edge("agent_b", END)
workflow.set_entry_point("agent_a")
return workflow
4. BaseLangGraphTask
BaseLangGraphTask 提供任务执行框架。子类实现工作流初始化和状态准备。
class BaseLangGraphTask(ABC):
def __init__(self, task_id: str, config: dict, workflow_name: str = "default"):
@abstractmethod
def _init_workflow(self):
"""初始化 self.workflow 和 self.app。"""
@abstractmethod
def _prepare_initial_state(self, init_info: Optional[dict]) -> Dict[str, Any]:
"""返回工作流的初始状态字典。"""
async def run(self, init_info=None) -> Tuple[bool, dict]:
"""执行任务。返回 (是否成功, 最终状态)。"""
示例
from akg_agents.core_v2.langgraph_base import BaseLangGraphTask
class MyTask(BaseLangGraphTask):
def __init__(self, task_id: str, config: dict):
super().__init__(task_id, config)
self._init_workflow()
def _init_workflow(self):
self.workflow = MyWorkflow(agents={...}, config=self.config)
self.app = self.workflow.compile()
def _prepare_initial_state(self, init_info):
return {
"task_id": self.task_id,
"iteration": 0,
"step_count": 0,
"agent_history": [],
}
5. 路由工具
路由函数用于控制工作流执行流程。
| 函数 | 说明 |
|---|---|
check_step_limit(step_count, max_step) |
步数超限时返回 True。 |
check_agent_repeat_limit(agent_history, agent_name, max_repeats) |
Agent 连续调用超过 max_repeats 次时返回 True(默认 3 次)。 |
get_illegal_agents(step_count, max_step, agent_history, repeat_limits) |
返回被禁止的 Agent 名称集合。返回 {"*"} 表示全部禁止(步数超限)。 |
在路由中使用
from akg_agents.core_v2.langgraph_base import get_illegal_agents
def my_router(state):
illegal = get_illegal_agents(
state["step_count"], 20,
state["agent_history"],
{"coder": 3, "designer": 2}
)
if "*" in illegal:
return END
# 路由到下一个 Agent...
6. WorkflowRegistry
WorkflowRegistry 管理工作流类,支持 scope。
from akg_agents.core_v2.workflows import WorkflowRegistry, register_workflow
@register_workflow
class MyWorkflow(BaseWorkflow):
...
# 带 scope
@register_workflow(scopes=["op"])
class KernelWorkflow(BaseWorkflow):
...
API 参考
| 方法 | 说明 |
|---|---|
WorkflowRegistry.register(cls, name, scopes) |
注册工作流类。 |
WorkflowRegistry.get_workflow_class(name) |
根据名称获取工作流类。 |
WorkflowRegistry.list_workflows(scope) |
列出已注册工作流,可按 scope 过滤。 |
WorkflowRegistry.get_tool_config(workflow_name) |
获取工作流的工具配置(用于 ToolExecutor 分发)。 |
WorkflowRegistry.is_registered(name, scope) |
检查工作流是否已注册(在指定 scope 中)。 |
WorkflowRegistry.clear() |
移除所有已注册工作流。 |
7. WorkflowVisualizer
从编译后的工作流生成 Mermaid 图表:
from akg_agents.core_v2.langgraph_base import WorkflowVisualizer
mermaid_str = workflow.visualize()
print(mermaid_str)
8. 节点追踪
track_node 工具用于在工作流中追踪节点执行:
from akg_agents.core_v2.langgraph_base import track_node
# 在工作流节点中使用,追踪执行过程