AI Workflow & Agentic Engine
The workflow module provides a graph-based orchestration engine for building AI pipelines and autonomous agents on top of the csilk AI unified interface.
Architecture
```mermaid
flowchart TB
    subgraph "Public API (csilk/app/workflow.h)"
        WF_NEW["csilk_wf_new()"]
        WF_ADD["csilk_wf_add()"]
        WF_ADD_AI["csilk_wf_add_ai()"]
        WF_RUN["csilk_wf_run()"]
        WF_BIND["csilk_wf_bind/on/on_loop()"]
        WF_ROUTE["csilk_wf_route()"]
        WF_REG_TOOL["csilk_wf_register_tool()"]
    end
    subgraph "Execution Engine (workflow.c)"
        SCHED["DAG Scheduler\n(libuv thread pool)"]
        ARENA["Per-execution Arena\n(zero-free memory)"]
        TEMPLATE["Template Engine\n({{node.value.path}})"]
        AGENT["Agent Loop\n(tool calling + LLM)"]
        TRACE["Tracing System\n(nanosecond precision)"]
        MONITOR["WebSocket Monitor\n(live broadcast)"]
        BUDGET["Budget Enforcer\n(token cap)"]
    end
    subgraph "Persistence (workflow_wal.c)"
        WAL["Binary WAL\n(fdatasync)"]
        RESUME["Resume Engine\n(replay + continue)"]
    end
    subgraph "Declarative Layer (workflow_loader.c)"
        YAML["YAML Loader\n(libyaml parser)"]
        JSON["JSON Loader\n(cJSON parser)"]
        REG["Global Handler Registry"]
    end
    WF_ADD_AI --> AGENT
    WF_RUN --> SCHED
    SCHED --> ARENA
    SCHED --> TEMPLATE
    SCHED --> TRACE
    SCHED --> MONITOR
    SCHED --> BUDGET
    WF_RUN --> WAL
    WAL --> RESUME
    YAML --> JSON
    JSON --> WF_NEW
    JSON --> WF_ADD
    JSON --> WF_ADD_AI
    REG --> JSON
```
Core Concepts
Workflow Graph
A workflow is a directed acyclic graph (DAG) where:
- Nodes are execution units (C functions or built-in AI nodes).
- Edges define data flow and control flow between nodes.
- Entry nodes start execution. These are nodes with 0 incoming edges, or nodes marked explicitly with `csilk_wf_node_set_entry()`.
- Data flows along edges as `csilk_data_t` containers (type + value + metadata).
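```c
#include "csilk/app/workflow.h"

// Assumes search_handler, ai_config, input and on_complete are defined elsewhere.
csilk_wf_t* wf = csilk_wf_new("ResearchPipeline");

// Add nodes
csilk_wf_node_t* n1 = csilk_wf_add(wf, "search", search_handler, NULL);
csilk_wf_node_t* n2 = csilk_wf_add_ai(wf, "summarize", &ai_config);

// Connect: "summarize" runs after "search" completes
csilk_wf_bind(n1, n2);

// Run: n1 is the entry node because it has no incoming edges
csilk_wf_run(wf, &input, on_complete);
```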
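The graph semantics above come down to dependency counting. A node becomes ready once every predecessor has finished, and entry nodes are the ones with no incoming edges. What follows is a minimal sketch of that idea, using Kahn's topological sort over a small adjacency matrix. `toy_graph_t` and `toy_topo_order()` are hypothetical names and are not part of the csilk API. The sketch also runs nodes one after another, whereas the real scheduler hands ready nodes to the libuv thread pool.

```c
#define TOY_MAX_NODES 8

/* Hypothetical toy graph: edge[i][j] != 0 means "node i feeds node j". */
typedef struct {
    int n;
    int edge[TOY_MAX_NODES][TOY_MAX_NODES];
} toy_graph_t;

/* Writes a dependency-respecting execution order into out[] and returns the
 * number of nodes written. A result smaller than g->n means the graph has a
 * cycle, i.e. it is not a DAG. */
int toy_topo_order(const toy_graph_t* g, int out[]) {
    int indeg[TOY_MAX_NODES] = {0};
    int queue[TOY_MAX_NODES];
    int head = 0, tail = 0, count = 0;

    /* Count incoming edges per node. */
    for (int i = 0; i < g->n; i++)
        for (int j = 0; j < g->n; j++)
            if (g->edge[i][j]) indeg[j]++;

    /* Entry nodes: zero incoming edges. */
    for (int j = 0; j < g->n; j++)
        if (indeg[j] == 0) queue[tail++] = j;

    while (head < tail) {
        int u = queue[head++];
        out[count++] = u;                     /* "run" node u */
        for (int v = 0; v < g->n; v++)
            if (g->edge[u][v] && --indeg[v] == 0)
                queue[tail++] = v;            /* all predecessors finished */
    }
    return count;
}
```

Consider a diamond where `search` feeds both `summarize` and `extract`, and both of those feed `report`. The order then starts with `search` and ends with `report`. In a cycle, no node ever reaches zero incoming edges, so the returned count comes back short.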
Node Types
| Type | Creation API | Behavior |
|---|---|---|
| Custom Handler | `csilk_wf_add()` | Executes a `csilk_wf_handler_t` C function |
| AI Node | `csilk_wf_add_ai()` | Built-in handler that calls the LLM with template injection and a tool-calling loop |
| Entry Node | `csilk_wf_node_set_entry()` | Marks a node where execution starts (nodes with 0 incoming edges are detected automatically) |
AI Nodes and Template Injection
AI nodes accept a `csilk_ai_config_t` with template-based prompts:

```c
csilk_wf_node_t* n = csilk_wf_add_ai(wf, "formatter", &(csilk_ai_config_t){
    .model = "gpt-4",
    .system_msg = "You are a technical writer.",
    .prompt = "Summarize: {{search.value}}. Previous draft: {{draft.value.content}}",
    .temperature = 0.3,
    .max_tokens = 2048
});
```
Template syntax:
- `{{node_id.value}}`: inserts the full output value of another node.
- `{{node_id.value.path.to.field}}`: JSONPath extraction from a node's JSON output.
- `{{input.value}}` / `{{input.value.path}}`: references the initial workflow input.
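The following is a minimal resolver that shows how placeholder injection could work. It scans for `{{...}}`, looks up the key and splices in the result. It rests on several assumptions:

- `toy_resolve()`, `toy_lookup_fn` and `toy_table_lookup()` are hypothetical names.
- The lookup is a flat key/value table.
- The JSONPath extraction that `resolve_templates()` performs for nested paths is left to the lookup callback.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical lookup: returns the text for a key such as "search.value",
 * or NULL if the key is unknown. */
typedef const char* (*toy_lookup_fn)(const char* key, void* ud);

/* Copies tmpl into out (capacity cap), replacing each {{key}} with lookup(key).
 * Unknown keys expand to the empty string. Returns 0 on success, or -1 if the
 * output would not fit or a placeholder is unterminated. */
int toy_resolve(const char* tmpl, toy_lookup_fn lookup, void* ud,
                char* out, size_t cap) {
    size_t len = 0;
    const char* p = tmpl;
    if (cap == 0) return -1;
    while (*p) {
        const char* beg = strstr(p, "{{");
        size_t lit = beg ? (size_t)(beg - p) : strlen(p);
        if (len + lit >= cap) return -1;
        memcpy(out + len, p, lit);               /* copy literal text */
        len += lit;
        if (!beg) break;
        const char* end = strstr(beg + 2, "}}");
        if (!end) return -1;                     /* unterminated placeholder */
        char key[128];
        size_t klen = (size_t)(end - (beg + 2));
        if (klen >= sizeof key) return -1;
        memcpy(key, beg + 2, klen);
        key[klen] = '\0';
        const char* val = lookup(key, ud);
        size_t vlen = val ? strlen(val) : 0;
        if (len + vlen >= cap) return -1;
        if (vlen) memcpy(out + len, val, vlen);  /* splice in the value */
        len += vlen;
        p = end + 2;
    }
    out[len] = '\0';
    return 0;
}

/* Example lookup over a {NULL, NULL}-terminated key/value array. */
typedef struct { const char* key; const char* value; } toy_kv_t;

const char* toy_table_lookup(const char* key, void* ud) {
    for (const toy_kv_t* kv = ud; kv->key; kv++)
        if (strcmp(kv->key, key) == 0) return kv->value;
    return NULL;
}
```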
Edge Types
| API | Edge Type | Behavior |
|---|---|---|
| `csilk_wf_bind(from, to)` | Default | Always triggers `to` when `from` completes |
| `csilk_wf_on(from, "condition", to)` | Conditional | Triggers `to` only when the output type of `from` matches `condition` |
| `csilk_wf_on_loop(from, "cond", to)` | Loop Back | Same as conditional, but does NOT increment the target's incoming-edge count, so an AND join on `to` never waits on the back-edge (this prevents deadlock) |
| `csilk_wf_on_error(from, to)` | Error Fallback | Triggers `to` if the `from` handler returns `NULL` |
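The firing rules in the table can be expressed as a single predicate, evaluated when the source node finishes. This sketch uses hypothetical names (`toy_edge_t`, `toy_edge_fires()`) and makes two assumptions that the text above does not spell out:

- A `NULL` handler result counts as a failure, so only error edges fire in that case.
- A condition matches when it equals the output type exactly.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

typedef enum { TOY_EDGE_DEFAULT, TOY_EDGE_COND, TOY_EDGE_LOOP, TOY_EDGE_ERROR } toy_edge_kind_t;

typedef struct {
    toy_edge_kind_t kind;
    const char* condition;   /* used by TOY_EDGE_COND and TOY_EDGE_LOOP */
} toy_edge_t;

/* out_type is the finished node's output type, or NULL if its handler returned NULL. */
bool toy_edge_fires(const toy_edge_t* e, const char* out_type) {
    bool failed = (out_type == NULL);
    switch (e->kind) {
    case TOY_EDGE_DEFAULT:
        return !failed;
    case TOY_EDGE_COND:
    case TOY_EDGE_LOOP:
        return !failed && e->condition && strcmp(e->condition, out_type) == 0;
    case TOY_EDGE_ERROR:
        return failed;
    }
    return false;
}

/* Loop-back edges are excluded from the target's incoming-edge count. */
bool toy_edge_counts_toward_join(const toy_edge_t* e) {
    return e->kind != TOY_EDGE_LOOP;
}
```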
Join Policies
When a node has multiple incoming edges:
| Policy | API | Behavior |
|---|---|---|
| AND (default) | `CSILK_WF_JOIN_AND` | Execute only after ALL predecessors complete |
| OR | `CSILK_WF_JOIN_OR` | Execute after ANY predecessor completes |

```c
csilk_wf_node_set_join(node, CSILK_WF_JOIN_OR);
```
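A join can be modeled as a per-node counter of completed predecessors. In this hypothetical sketch, `expected` is the number of incoming edges, excluding loop-back edges. The sketch also assumes the node runs at most once per activation. Whether csilk's OR join fires again on later arrivals is not specified here.

```c
#include <stdbool.h>

typedef enum { TOY_JOIN_AND, TOY_JOIN_OR } toy_join_policy_t;

typedef struct {
    toy_join_policy_t policy;
    int expected;   /* incoming edges, excluding loop-back edges */
    int arrived;    /* predecessors that have completed so far */
    bool fired;     /* assumption: the node runs once per activation */
} toy_join_t;

/* Records one predecessor completion; returns true if the node should run now. */
bool toy_join_arrive(toy_join_t* j) {
    j->arrived++;
    if (j->fired) return false;
    bool ready = (j->policy == TOY_JOIN_OR) ? true : (j->arrived >= j->expected);
    if (ready) j->fired = true;
    return ready;
}
```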
Agentic Workflows
Tool Calling Loop
The built-in AI node handler (`ai_node_handler()`) implements an autonomous agent loop:
- Send the prompt and the registered tools to the LLM.
- If the response contains `tool_calls`, execute them in parallel on the libuv thread pool.
- Inject the tool results back as new messages.
- Repeat (up to 10 iterations) until the LLM responds with content.
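```c
// Register a tool: name, description, JSON Schema for its arguments, callback, user data.
// weather_fn is the application's tool implementation (not shown).
csilk_wf_register_tool(wf, "get_weather",
    "Get current weather for a location",
    "{\"type\":\"object\",\"properties\":{\"location\":{\"type\":\"string\"}}}",
    weather_fn, NULL);

// AI node will automatically use registered tools
csilk_wf_add_ai(wf, "agent", &ai_config);
```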
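The iteration cap is what stops the agent loop from running forever when the model keeps requesting tools. The sketch below shows only that control flow. The LLM call and the tool execution sit behind callbacks. `toy_agent_loop()`, `toy_reply_t` and the mock functions are hypothetical stand-ins, not the real `ai_node_handler()` or `csilk_ai_chat()`.

```c
#include <stddef.h>

#define TOY_MAX_AGENT_ITERS 10   /* mirrors the documented 10-iteration cap */

/* Hypothetical result of one LLM call: either final text or tool requests. */
typedef struct {
    const char* content;   /* set when the model answered directly */
    int n_tool_calls;      /* > 0 when the model requested tools */
} toy_reply_t;

typedef toy_reply_t (*toy_llm_fn)(int turn, void* ud);
typedef void (*toy_tools_fn)(int n_calls, void* ud);   /* runs tools, appends results */

/* Returns the final content, or NULL if the cap is reached first. */
const char* toy_agent_loop(toy_llm_fn llm, toy_tools_fn run_tools, void* ud, int* turns_used) {
    for (int turn = 0; turn < TOY_MAX_AGENT_ITERS; turn++) {
        toy_reply_t r = llm(turn, ud);
        if (turns_used) *turns_used = turn + 1;
        if (r.n_tool_calls == 0) return r.content;   /* plain answer: done */
        run_tools(r.n_tool_calls, ud);               /* feed results back next turn */
    }
    return NULL;
}

/* Demo mocks: the "model" asks for one tool on turns 0 and 1, then answers. */
toy_reply_t toy_mock_llm(int turn, void* ud) {
    (void)ud;
    if (turn < 2) return (toy_reply_t){NULL, 1};
    return (toy_reply_t){"It is sunny.", 0};
}

/* A model that never stops requesting tools, to exercise the cap. */
toy_reply_t toy_stubborn_llm(int turn, void* ud) {
    (void)turn;
    (void)ud;
    return (toy_reply_t){NULL, 1};
}

/* Counts executed tool calls in an int pointed to by ud. */
void toy_mock_tools(int n_calls, void* ud) {
    *(int*)ud += n_calls;
}
```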
Dynamic Routing
Functional routing lets a callback decide at runtime which node runs next:

```c
#include <string.h>

// Returns the ID of the node to run next, chosen from this node's output.
const char* my_router(csilk_data_t* output) {
    if (strstr(output->value, "ERROR")) return "fallback";
    if (strstr(output->value, "REJECT")) return "retry_node";
    return "next_node";
}

csilk_wf_route(node, my_router);
```
Agentic Loop with Conditional Edges
```mermaid
flowchart LR
    N1["n1: Action"] --> N2["n2: Check"]
    N2 -- "fail" --> N1
    N2 -- "pass" --> N3["n3: Done"]
```

```c
csilk_wf_bind(n1, n2);
csilk_wf_on_loop(n2, "fail", n1); // loop back without incrementing n1's incoming count
csilk_wf_on(n2, "pass", n3);
```
Memory Management
Each workflow execution creates a per-execution arena allocator:
- All node outputs, string duplications, and metadata are arena-allocated.
- The entire arena is freed at once when execution completes.
- No manual `free()` is needed inside handlers.
- The arena is thread-safe (mutex-protected for parallel tool calls).

```c
csilk_data_t* my_handler(csilk_wf_ctx_t* ctx, csilk_data_t* input, void* user_data) {
    (void)input;
    (void)user_data;
    // Both allocations live in the execution arena; do not free() them.
    char* result = csilk_wf_strdup(ctx, "hello");
    return csilk_wf_data_new(ctx, "text/plain", result);
}
```
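Here is a minimal chunked bump allocator guarded by a mutex, to make the arena model concrete. Allocation only advances a cursor, and destruction releases every block in one pass. It is a sketch, not csilk's allocator. `toy_arena_t` and its functions are hypothetical, whereas the real `csilk_wf_strdup()` and `csilk_wf_data_new()` take the execution context.

```c
#include <pthread.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical arena: a linked list of blocks, each filled by bumping `used`. */
typedef struct toy_block {
    struct toy_block* next;
    size_t used, cap;
    unsigned char* mem;
} toy_block_t;

typedef struct {
    pthread_mutex_t lock;   /* parallel tool calls may allocate concurrently */
    toy_block_t* head;      /* newest block first */
    size_t block_size;
} toy_arena_t;

void toy_arena_init(toy_arena_t* a, size_t block_size) {
    pthread_mutex_init(&a->lock, NULL);
    a->head = NULL;
    a->block_size = block_size;
}

void* toy_arena_alloc(toy_arena_t* a, size_t n) {
    const size_t align = _Alignof(max_align_t);
    n = (n + align - 1) & ~(align - 1);   /* round up so every pointer stays aligned */
    pthread_mutex_lock(&a->lock);
    toy_block_t* b = a->head;
    if (!b || b->cap - b->used < n) {     /* current block too small: start a new one */
        size_t cap = n > a->block_size ? n : a->block_size;
        b = malloc(sizeof *b);
        unsigned char* mem = b ? malloc(cap) : NULL;
        if (!mem) {
            free(b);
            pthread_mutex_unlock(&a->lock);
            return NULL;
        }
        b->mem = mem;
        b->used = 0;
        b->cap = cap;
        b->next = a->head;
        a->head = b;
    }
    void* p = b->mem + b->used;
    b->used += n;
    pthread_mutex_unlock(&a->lock);
    return p;
}

char* toy_arena_strdup(toy_arena_t* a, const char* s) {
    size_t len = strlen(s) + 1;
    char* d = toy_arena_alloc(a, len);
    if (d) memcpy(d, s, len);
    return d;
}

/* The single "free everything" step at the end of an execution. */
void toy_arena_destroy(toy_arena_t* a) {
    toy_block_t* b = a->head;
    while (b) {
        toy_block_t* next = b->next;
        free(b->mem);
        free(b);
        b = next;
    }
    a->head = NULL;
    pthread_mutex_destroy(&a->lock);
}
```

The design trades a little wasted space at the tail of each block for allocation without per-object bookkeeping. Handlers therefore never need to pair allocations with frees.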
Safety & Guardrails
Token Budget
Enforces a maximum total token consumption (prompt + completion) across all AI nodes:

```c
csilk_wf_set_budget(wf, 10000); // stop after 10K total tokens
```
Node Timeout
A per-node timeout, in milliseconds, keeps a single handler from hanging the run:

```c
csilk_wf_node_set_timeout(node, 5000); // 5 second timeout
```
Workflow TTL
A global time-to-live, in seconds, bounds the entire execution:

```c
csilk_wf_set_ttl(wf, 30); // 30 second TTL
```
Step Limit
A hard safety cap, `MAX_WORKFLOW_STEPS` (1000), limits the total number of node executions per run.
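The run-wide guardrails can be read as checks performed before another node is dispatched. This hypothetical sketch (`toy_guard_t`, `toy_guard_check()`) assumes that a zero budget or TTL means "unlimited" and that the caller tracks elapsed time. The per-node timeout is left out because it applies to a single handler rather than to the whole run.

```c
#define TOY_MAX_WORKFLOW_STEPS 1000   /* mirrors MAX_WORKFLOW_STEPS */

typedef enum { TOY_OK, TOY_STOP_BUDGET, TOY_STOP_TTL, TOY_STOP_STEPS } toy_stop_t;

typedef struct {
    long token_budget;   /* 0 = unlimited (assumption) */
    long tokens_used;    /* prompt + completion across all AI nodes */
    double ttl_s;        /* 0 = no TTL (assumption) */
    double elapsed_s;    /* maintained by the caller */
    int steps;           /* node executions so far */
} toy_guard_t;

/* Called before dispatching another node. */
toy_stop_t toy_guard_check(const toy_guard_t* g) {
    if (g->steps >= TOY_MAX_WORKFLOW_STEPS) return TOY_STOP_STEPS;
    if (g->token_budget > 0 && g->tokens_used >= g->token_budget) return TOY_STOP_BUDGET;
    if (g->ttl_s > 0 && g->elapsed_s >= g->ttl_s) return TOY_STOP_TTL;
    return TOY_OK;
}

/* Called after an AI node finishes. */
void toy_guard_add_usage(toy_guard_t* g, long prompt_tokens, long completion_tokens) {
    g->tokens_used += prompt_tokens + completion_tokens;
}
```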
Observability
Execution Tracing
Every node execution is recorded with nanosecond precision:

```c
// The callback receives the trace:
// void on_complete_with_trace(csilk_data_t* result, csilk_wf_trace_t* trace)
csilk_wf_run_traced(wf, &input, on_complete_with_trace);
```

Trace fields per node: `node_id`, `start_time`, `end_time`, `duration_us`, `input_dump`, `output_dump`, `model`, `prompt_tokens`, `completion_tokens`, `error`.

Export trace to JSON:

```c
char* json = csilk_wf_trace_to_json(trace);
```
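A trace record needs little more than a monotonic clock and a serializer. The sketch below measures a duration in microseconds from `CLOCK_MONOTONIC` nanosecond timestamps, then emits a subset of the documented fields as JSON. `toy_trace_node_t` and its helpers are hypothetical. The real `csilk_wf_trace_to_json()` covers all fields and the whole trace.

```c
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <string.h>
#include <time.h>

typedef struct {
    const char* node_id;
    long long start_ns, end_ns;
    long prompt_tokens, completion_tokens;
} toy_trace_node_t;

/* Monotonic time is unaffected by wall-clock adjustments, so durations stay valid. */
long long toy_now_ns(void) {
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (long long)ts.tv_sec * 1000000000LL + ts.tv_nsec;
}

long long toy_duration_us(const toy_trace_node_t* t) {
    return (t->end_ns - t->start_ns) / 1000;
}

/* Serializes one trace node; node_id is assumed not to need JSON escaping.
 * Returns the snprintf result (the length it wanted to write). */
int toy_trace_node_to_json(const toy_trace_node_t* t, char* buf, size_t cap) {
    return snprintf(buf, cap,
                    "{\"node_id\":\"%s\",\"duration_us\":%lld,"
                    "\"prompt_tokens\":%ld,\"completion_tokens\":%ld}",
                    t->node_id, toy_duration_us(t),
                    t->prompt_tokens, t->completion_tokens);
}
```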
Admin Dashboard Integration
Live workflow execution status is exposed via the unified admin dashboard:
- `GET /admin/stats`: returns workflow metrics (`workflow_count`, `node_count`, `active_executions`) as JSON.
- `GET /admin/ws`: WebSocket endpoint broadcasting real-time workflow lifecycle events.
WebSocket Monitoring
Live workflow events are broadcast via WebSocket:

```c
csilk_wf_register_monitor(wf, upgraded_ws_context);
```

Events: `workflow_start`, `node_queued`, `node_start`, `node_finish`, `workflow_end`.
Mermaid Visualization
Export the workflow graph as a Mermaid diagram string:

```c
char* mermaid = csilk_wf_to_mermaid(wf);
// Output: "graph TD\n search --> summarize\n summarize -. error .-> fallback\n"
```
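Producing the Mermaid text is a straightforward walk over the edge list. The hypothetical `toy_to_mermaid()` below emits a solid arrow for ordinary edges and a dotted, labeled arrow for error edges, in the same style as the output shown above. It is not the real `csilk_wf_to_mermaid()`. This version writes into a caller-supplied buffer and reports truncation.

```c
#include <stdio.h>
#include <string.h>

typedef struct {
    const char* from;
    const char* to;
    int is_error;   /* nonzero for csilk_wf_on_error-style edges */
} toy_link_t;

/* Writes a Mermaid flowchart into buf; returns 0, or -1 if buf is too small. */
int toy_to_mermaid(const toy_link_t* links, int n, char* buf, size_t cap) {
    int w = snprintf(buf, cap, "graph TD\n");
    if (w < 0 || (size_t)w >= cap) return -1;
    size_t len = (size_t)w;
    for (int i = 0; i < n; i++) {
        w = snprintf(buf + len, cap - len,
                     links[i].is_error ? "    %s -. error .-> %s\n" : "    %s --> %s\n",
                     links[i].from, links[i].to);
        if (w < 0 || (size_t)w >= cap - len) return -1;   /* output truncated */
        len += (size_t)w;
    }
    return 0;
}
```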
Persistence & Resilience
Write-Ahead Log (WAL)
Each workflow execution records a binary WAL file:
| Event Type | Recorded Data |
|---|---|
| `WF_EV_START` | Initial input |
| `WF_EV_NODE_START` | Node ID |
| `WF_EV_NODE_FINISH` | Node ID + output type + output value |
| `WF_EV_END` | (empty) |
WAL record format (binary, packed header; the numbers are field sizes in bytes):

```
[MAGIC:4][TYPE:1][TIMESTAMP:4][PAYLOAD_LEN:4][PAYLOAD...]
```
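The header layout above totals 13 bytes (4 + 1 + 4 + 4) with no padding. The sketch below encodes and decodes one record byte by byte, which avoids relying on compiler-specific struct packing. The magic value, the event-type numbers and the little-endian byte order are all assumptions made for illustration. The actual definitions live in `workflow_wal.h`.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define TOY_WAL_HDR_LEN 13u          /* 4 + 1 + 4 + 4, no padding */
#define TOY_WAL_MAGIC 0x4357414Cu    /* hypothetical magic value */

static void put_u32le(uint8_t* p, uint32_t v) {
    p[0] = (uint8_t)v;
    p[1] = (uint8_t)(v >> 8);
    p[2] = (uint8_t)(v >> 16);
    p[3] = (uint8_t)(v >> 24);
}

static uint32_t get_u32le(const uint8_t* p) {
    return (uint32_t)p[0] | (uint32_t)p[1] << 8 | (uint32_t)p[2] << 16 | (uint32_t)p[3] << 24;
}

/* Encodes one record into buf; returns bytes written, or 0 if buf is too small. */
size_t toy_wal_encode(uint8_t* buf, size_t cap, uint8_t type, uint32_t ts,
                      const void* payload, uint32_t len) {
    if (cap < TOY_WAL_HDR_LEN + (size_t)len) return 0;
    put_u32le(buf, TOY_WAL_MAGIC);
    buf[4] = type;
    put_u32le(buf + 5, ts);
    put_u32le(buf + 9, len);
    if (len) memcpy(buf + TOY_WAL_HDR_LEN, payload, len);
    return TOY_WAL_HDR_LEN + (size_t)len;
}

/* Decodes one record; returns 0 on success, -1 on short input or bad magic. */
int toy_wal_decode(const uint8_t* buf, size_t n, uint8_t* type, uint32_t* ts,
                   const uint8_t** payload, uint32_t* len) {
    if (n < TOY_WAL_HDR_LEN || get_u32le(buf) != TOY_WAL_MAGIC) return -1;
    *type = buf[4];
    *ts = get_u32le(buf + 5);
    *len = get_u32le(buf + 9);
    if (n - TOY_WAL_HDR_LEN < *len) return -1;   /* payload truncated */
    *payload = buf + TOY_WAL_HDR_LEN;
    return 0;
}
```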
Resume
Interrupted executions can be resumed from the WAL:
```c
csilk_wf_set_persistence(wf, "/var/log/workflows");
// ... later, after a crash:
csilk_wf_resume(wf, "execution-uuid", on_complete);
```
The resume engine replays the WAL to reconstruct state, then continues unfinished nodes.
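Resume can be understood as a replay pass. Any node with a `WF_EV_NODE_FINISH` record is done. Everything else must run again, including a node that started but never finished. The hypothetical `toy_wal_replay()` shows only that bookkeeping. A full resume would also need to restore each finished node's recorded output so that downstream templates can reference it.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

typedef enum { TOY_EV_START, TOY_EV_NODE_START, TOY_EV_NODE_FINISH, TOY_EV_END } toy_ev_type_t;

typedef struct {
    toy_ev_type_t type;
    const char* node_id;   /* NULL for START/END */
} toy_ev_t;

/* Replays events and sets finished[i] for each ids[i] that has a NODE_FINISH
 * record. Returns true if the log already contains an END event. */
bool toy_wal_replay(const toy_ev_t* evs, int n_evs,
                    const char* const* ids, int n_ids, bool finished[]) {
    bool ended = false;
    for (int i = 0; i < n_ids; i++) finished[i] = false;
    for (int e = 0; e < n_evs; e++) {
        if (evs[e].type == TOY_EV_END) ended = true;
        if (evs[e].type != TOY_EV_NODE_FINISH) continue;   /* a START alone is not "done" */
        for (int i = 0; i < n_ids; i++)
            if (strcmp(ids[i], evs[e].node_id) == 0) finished[i] = true;
    }
    return ended;
}
```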
Declarative Workflow Definitions
JSON Format
Workflows can be defined declaratively and loaded at runtime:
```json
{
  "name": "ResearchAssistant",
  "steps": [
    {"id": "search", "type": "handler", "handler": "web_search", "entry": true},
    {"id": "summarize", "type": "ai", "config": {
      "model": "gpt-4",
      "system_msg": "Summarize the following.",
      "prompt": "{{search.value}}"
    }},
    {"id": "fallback", "type": "handler", "handler": "default_response"}
  ],
  "connections": [
    {"from": "search", "to": "summarize"},
    {"from": "summarize", "to": "fallback", "condition": null}
  ]
}
```
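```c
// Register every handler name the definition references before loading it.
// web_search_handler and default_response_handler are the application's handlers.
csilk_wf_register_handler("web_search", web_search_handler);
csilk_wf_register_handler("default_response", default_response_handler);

csilk_wf_t* wf = csilk_wf_from_json(json_string);
csilk_wf_run(wf, &input, on_complete);
```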
YAML Format
The same structure can be loaded from a YAML file:

```c
csilk_wf_t* wf = csilk_wf_load_yaml("workflow.yaml");
```
Implementation Details
Scheduler
The scheduler in `execute_node()` runs each handler via `uv_queue_work()` on libuv's thread pool:
- Pre-execution: broadcast `node_queued`, log the WAL start event, and create the trace node.
- Execution: `worker_cb` runs the handler on a thread-pool thread.
- Post-execution: `after_worker_cb`, which libuv invokes back on the event-loop thread, processes the output, checks the budget, evaluates edges, and schedules downstream nodes.
Agent Loop
The `ai_node_handler()` at `src/app/workflow.c:505` implements:
- Resolve template placeholders via `resolve_templates()`.
- Build the message array (system + user).
- Call `csilk_ai_chat()`.
- If `tool_calls` are present:
  - Dispatch each tool call to `uv_queue_work()` for parallel execution.
  - Wait for all of them to complete via `uv_cond_wait()`.
  - Inject the tool results as new messages.
  - Repeat (up to 10 iterations).
- Return the final content as a `csilk_data_t`.
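The wait-for-all step is a countdown latch. libuv is not assumed to be available here, so this sketch substitutes POSIX threads for `uv_queue_work()` and `pthread_cond_wait()` for `uv_cond_wait()`. `toy_latch_t`, `toy_run_tools_parallel()` and the squaring "tool" are hypothetical.

```c
#include <pthread.h>
#include <stddef.h>

#define TOY_MAX_TOOL_CALLS 16

typedef struct {
    pthread_mutex_t mu;
    pthread_cond_t cv;
    int pending;   /* tool calls still running */
} toy_latch_t;

void toy_latch_init(toy_latch_t* l, int n) {
    pthread_mutex_init(&l->mu, NULL);
    pthread_cond_init(&l->cv, NULL);
    l->pending = n;
}

/* Called once by each tool worker when it finishes. */
void toy_latch_done(toy_latch_t* l) {
    pthread_mutex_lock(&l->mu);
    if (--l->pending == 0) pthread_cond_signal(&l->cv);
    pthread_mutex_unlock(&l->mu);
}

/* Called by the agent loop; the while loop guards against spurious wakeups. */
void toy_latch_wait(toy_latch_t* l) {
    pthread_mutex_lock(&l->mu);
    while (l->pending > 0)
        pthread_cond_wait(&l->cv, &l->mu);
    pthread_mutex_unlock(&l->mu);
}

void toy_latch_destroy(toy_latch_t* l) {
    pthread_cond_destroy(&l->cv);
    pthread_mutex_destroy(&l->mu);
}

typedef struct {
    toy_latch_t* latch;
    int arg;
    int result;
} toy_tool_job_t;

static void* toy_tool_worker(void* p) {
    toy_tool_job_t* job = p;
    job->result = job->arg * job->arg;   /* stand-in for a real tool call */
    toy_latch_done(job->latch);
    return NULL;
}

/* Runs n jobs in parallel and blocks until all have reported back. */
int toy_run_tools_parallel(toy_tool_job_t* jobs, int n) {
    pthread_t tids[TOY_MAX_TOOL_CALLS];
    int started[TOY_MAX_TOOL_CALLS] = {0};
    toy_latch_t latch;
    if (n < 0 || n > TOY_MAX_TOOL_CALLS) return -1;
    toy_latch_init(&latch, n);
    for (int i = 0; i < n; i++) {
        jobs[i].latch = &latch;
        if (pthread_create(&tids[i], NULL, toy_tool_worker, &jobs[i]) == 0)
            started[i] = 1;
        else
            toy_tool_worker(&jobs[i]);   /* could not spawn a thread: run inline */
    }
    toy_latch_wait(&latch);
    for (int i = 0; i < n; i++)          /* join so no worker still touches the latch */
        if (started[i]) pthread_join(tids[i], NULL);
    toy_latch_destroy(&latch);
    return 0;
}
```

With a thread pool there is no per-call `join`, which is why the latch alone carries the "all tools done" signal. Here the threads are joined only so that the latch can be destroyed safely.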
WAL Append
The `_wf_wal_append()` function in `workflow_wal.c` uses raw POSIX I/O (`open`/`write`/`fdatasync`/`close`) with `O_APPEND` for crash-safe sequential logging.
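Here is a minimal sketch of that append path, assuming the record has already been encoded:

- `O_APPEND` makes each `write()` land at the current end of the file.
- The loop retries short writes and `EINTR`.
- `fdatasync()` flushes the data before the function reports success.

`toy_wal_append()` is a hypothetical name, not the real `_wf_wal_append()`.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <fcntl.h>
#include <stddef.h>
#include <unistd.h>

/* Appends one encoded record and forces it to stable storage.
 * Returns 0 on success, -1 on error (errno is set). */
int toy_wal_append(const char* path, const void* rec, size_t len) {
    int fd = open(path, O_WRONLY | O_CREAT | O_APPEND, 0644);
    if (fd < 0) return -1;
    const char* p = rec;
    while (len > 0) {
        ssize_t w = write(fd, p, len);
        if (w < 0) {
            if (errno == EINTR) continue;    /* interrupted by a signal: retry */
            int saved = errno;
            close(fd);
            errno = saved;
            return -1;
        }
        p += w;                              /* handle short writes */
        len -= (size_t)w;
    }
    if (fdatasync(fd) != 0) {                /* data must reach disk before we return */
        int saved = errno;
        close(fd);
        errno = saved;
        return -1;
    }
    return close(fd);
}
```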
File Layout
| File | Purpose |
|---|---|
| `include/csilk/app/workflow.h` | Public API (355 lines) |
| `include/csilk/app/workflow_wal.h` | WAL types and header format |
| `src/app/workflow.c` | Core engine (1163 lines) |
| `src/app/workflow_wal.c` | WAL implementation (44 lines) |
| `src/app/workflow_loader.c` | Declarative loader (268 lines) |
| `tests/test_workflow_agentic.c` | Agentic loop test |
| `tests/test_workflow_monitor.c` | Monitor integration test |
| `examples/example_ai_workflow.c` | Full workflow example |