Workflow (LangGraph)
1. Overview
The Workflow module provides LangGraph-based workflow orchestration for AKG Agents. It offers domain-agnostic base classes that can be extended for any scenario.
Key components:
- BaseState — Generic workflow state (TypedDict)
- BaseWorkflow — Abstract workflow base class
- BaseLangGraphTask — Abstract task execution base class
- Router utilities — Step limit and agent repeat limit checks
- WorkflowRegistry — Workflow registration and discovery
- WorkflowVisualizer — Mermaid diagram generation
2. BaseState
BaseState is a TypedDict defining the minimal state fields for any LangGraph workflow.
class BaseState(TypedDict, total=False):
# Task identification
task_id: str
task_label: str
session_id: str
# Flow control
iteration: int
step_count: int
max_iterations: int
# History (auto-accumulating)
agent_history: Annotated[List[str], add]
# Results
success: bool
error_message: Optional[str]
Extending State
Domain-specific workflows should extend BaseState:
class MyDomainState(BaseState, total=False):
document_content: str
analysis_result: dict
3. BaseWorkflow
BaseWorkflow is the abstract base class for all workflows. Subclasses implement build_graph() to define the graph structure.
class BaseWorkflow(ABC, Generic[StateType]):
def __init__(self, config: dict, trace=None):
"""
Args:
config: Configuration dict (includes max_step, etc.)
trace: Optional Trace instance for execution recording
"""
@abstractmethod
def build_graph(self) -> StateGraph:
"""Build the workflow graph. Returns an uncompiled StateGraph."""
def compile(self):
"""Compile the graph. Returns a LangGraph app for ainvoke()."""
def visualize(self) -> str:
"""Generate a Mermaid diagram string."""
Example
from langgraph.graph import StateGraph, END
from akg_agents.core_v2.langgraph_base import BaseWorkflow, BaseState
class MyWorkflow(BaseWorkflow[BaseState]):
def __init__(self, agents: dict, config: dict):
super().__init__(config)
self.agents = agents
def build_graph(self) -> StateGraph:
workflow = StateGraph(BaseState)
workflow.add_node("agent_a", self.agents["a"].run)
workflow.add_node("agent_b", self.agents["b"].run)
workflow.add_edge("agent_a", "agent_b")
workflow.add_edge("agent_b", END)
workflow.set_entry_point("agent_a")
return workflow
4. BaseLangGraphTask
BaseLangGraphTask provides the task execution framework. Subclasses implement workflow initialization and state preparation.
class BaseLangGraphTask(ABC):
def __init__(self, task_id: str, config: dict, workflow_name: str = "default"):
@abstractmethod
def _init_workflow(self):
"""Initialize self.workflow and self.app."""
@abstractmethod
def _prepare_initial_state(self, init_info: Optional[dict]) -> Dict[str, Any]:
"""Return the initial state dict for the workflow."""
async def run(self, init_info=None) -> Tuple[bool, dict]:
"""Execute the task. Returns (success, final_state)."""
Example
from akg_agents.core_v2.langgraph_base import BaseLangGraphTask
class MyTask(BaseLangGraphTask):
def __init__(self, task_id: str, config: dict):
super().__init__(task_id, config)
self._init_workflow()
def _init_workflow(self):
self.workflow = MyWorkflow(agents={...}, config=self.config)
self.app = self.workflow.compile()
def _prepare_initial_state(self, init_info):
return {
"task_id": self.task_id,
"iteration": 0,
"step_count": 0,
"agent_history": [],
}
5. Router Utilities
Router functions help control workflow execution flow.
| Function | Description |
|---|---|
check_step_limit(step_count, max_step) |
Returns True if step count exceeds the limit. |
check_agent_repeat_limit(agent_history, agent_name, max_repeats) |
Returns True if an agent has been called consecutively more than max_repeats times (default: 3). |
get_illegal_agents(step_count, max_step, agent_history, repeat_limits) |
Returns a set of forbidden agent names. Returns {"*"} if all agents are forbidden (step limit exceeded). |
Usage in Routers
from akg_agents.core_v2.langgraph_base import get_illegal_agents
def my_router(state):
illegal = get_illegal_agents(
state["step_count"], 20,
state["agent_history"],
{"coder": 3, "designer": 2}
)
if "*" in illegal:
return END
# Route to next agent...
6. WorkflowRegistry
WorkflowRegistry manages workflow classes with scope support.
from akg_agents.core_v2.workflows import WorkflowRegistry, register_workflow
@register_workflow
class MyWorkflow(BaseWorkflow):
...
# With scope
@register_workflow(scopes=["op"])
class KernelWorkflow(BaseWorkflow):
...
API Reference
| Method | Description |
|---|---|
WorkflowRegistry.register(cls, name, scopes) |
Register a workflow class. |
WorkflowRegistry.get_workflow_class(name) |
Get a workflow class by name. |
WorkflowRegistry.list_workflows(scope) |
List registered workflows, optionally filtered by scope. |
WorkflowRegistry.get_tool_config(workflow_name) |
Get tool configuration for a workflow (for ToolExecutor dispatch). |
WorkflowRegistry.is_registered(name, scope) |
Check if a workflow is registered (in a given scope). |
WorkflowRegistry.clear() |
Remove all registered workflows. |
7. WorkflowVisualizer
Generate Mermaid diagrams from compiled workflows:
from akg_agents.core_v2.langgraph_base import WorkflowVisualizer
mermaid_str = workflow.visualize()
print(mermaid_str)
8. Node Tracking
The track_node utility helps track node execution within workflows:
from akg_agents.core_v2.langgraph_base import track_node
# Used within workflow nodes to track execution