import { setTimeout as sleep } from "node:timers/promises";
import {
applyModelEventToAssembler,
assembleAssistantMessage,
cloneMessages,
createModelMessageAssemblerState,
type CanonicalToolCall,
type CanonicalToolSchema,
PROMPT_TOO_LONG_ANTHROPIC_PATTERN,
PROMPT_TOO_LONG_OPENAI_PATTERN,
REQUEST_TOO_LARGE_PATTERN,
type CanonicalMessage,
type CanonicalModelError,
type CanonicalModelRequest,
type CanonicalUsage,
} from "../../model/index.js";
import type {
PilotDeckReadFileStateMap,
PilotDeckSubagentForkApi,
PilotDeckToolResult,
PilotDeckToolRuntimeContext,
PilotDeckWriteSnapshotMap,
} from "../../tool/index.js";
import {
SUBAGENT_DEFINITIONS,
getSubagentDefinition,
} from "../sub/builtinSubagentTypes.js";
import { buildPlanModeAgentToolSchema } from "../../tool/builtin/agent.js";
import { agentError } from "../protocol/errors.js";
import type { AgentEvent } from "../protocol/events.js";
import type { AgentPermissionDenial, AgentTurnResult } from "../protocol/result.js";
import type { AgentRuntimeConfig } from "../runtime/AgentRuntimeConfig.js";
import type { AgentRuntimeDependencies } from "../runtime/AgentRuntimeDependencies.js";
import type { LifecycleDispatchResult } from "../../lifecycle/index.js";
import type { PilotDeckHookEvent } from "../../extension/hooks/protocol/events.js";
import { NullContextRuntime } from "../../context/NullContextRuntime.js";
import type { AgentContextRuntime } from "../../context/ContextRuntime.js";
import type { ContextRecoveryDecision } from "../../context/index.js";
import type { PermissionMode, PermissionRule, PermissionRuleSet } from "../../permission/index.js";
import { collectToolCalls } from "./collectToolCalls.js";
import { createMissingToolResult, ensureToolResultPairing } from "./ensureToolResultPairing.js";
import { projectToolResults } from "./projectToolResults.js";
const TOOL_EVENT_PUMP_INTERVAL_MS = 500;
const SUBAGENT_STATUS_HEARTBEAT_MS = 2_000;
type ActiveSubagentStatus = {
subagentId: string;
subagentType?: string;
startedAtMs: number;
lastHeartbeatMs: number;
currentToolCallId?: string;
currentToolName?: string;
};
export type AgentLoopInput = {
sessionId: string;
turnId: string;
messages: CanonicalMessage[];
maxTurns?: number;
permissionMode?: PermissionMode;
permissionRules?: Partial<PermissionRuleSet>;
abortSignal?: AbortSignal;
onDurableMessage?: (message: CanonicalMessage) => void | Promise<void>;
};
export type AgentLoopRunResult = {
result: AgentTurnResult;
messages: CanonicalMessage[];
};
export type AgentLoopSeedState = {
readFileState?: PilotDeckReadFileStateMap;
writeSnapshots?: PilotDeckWriteSnapshotMap;
};
export class AgentLoop {
private readonly readFileState: PilotDeckReadFileStateMap;
private readonly writeSnapshots: PilotDeckWriteSnapshotMap;
constructor(
private readonly config: AgentRuntimeConfig,
private readonly dependencies: AgentRuntimeDependencies,
seedState?: AgentLoopSeedState,
) {
this.readFileState = cloneReadFileStateMap(seedState?.readFileState);
this.writeSnapshots = cloneWriteSnapshotMap(seedState?.writeSnapshots);
}
snapshotFileState(): AgentLoopSeedState {
return {
readFileState: cloneReadFileStateMap(this.readFileState),
writeSnapshots: cloneWriteSnapshotMap(this.writeSnapshots),
};
}
async *run(input: AgentLoopInput): AsyncGenerator<AgentEvent, AgentLoopRunResult, unknown> {
this.applyPermissionOverrides(input.permissionMode, input.permissionRules);
const startedAt = this.now().toISOString();
let messages = [...input.messages];
let turnCount = 1;
let usage: CanonicalUsage = {};
let permissionDenials: AgentPermissionDenial[] = [];
let structuredOutput: unknown;
let finalMessage: CanonicalMessage | undefined;
const captureTurn = async (errored: boolean): Promise<void> => {
const hook = this.dependencies.context?.captureTurn;
if (!hook) return;
try {
await hook.call(this.dependencies.context, {
sessionId: input.sessionId,
turnId: input.turnId,
messages,
errored,
});
} catch {
}
};
* Single-shot reactive truncate-and-retry guard. Set true after the loop
* already truncated for a `prompt_too_long` once; subsequent PTL errors
* fall through to fallback / fail (legacy single-shot semantics).
*/
let hasAttemptedCompact = false;
* Single-shot guard for `max_output_reached` retries. The loop bumps
* `config.maxOutputTokens` (capped at `OUTPUT_TOKEN_RETRY_CEILING`) once
* and retries; a second hit falls through to the recovery policy.
*/
let hasAttemptedOutputRetry = false;
const MAX_JSON_SELF_CORRECT_RETRIES = 3;
let jsonSelfCorrectCount = 0;
* Circuit breaker: consecutive turns where ALL tool calls are
* `invalid_tool_input` errors. When the model is stuck in a loop
* (e.g. qwen repeatedly emitting empty-param bash calls), terminate
* early instead of burning tokens. Resets on any turn with at least
* one successful tool call.
*/
const MAX_CONSECUTIVE_ALL_INVALID_TURNS = 3;
let consecutiveAllInvalidTurns = 0;
const stickyInfo = this.dependencies.router.invalidateSticky?.(input.sessionId);
let previousTier: string | undefined = stickyInfo?.previousTier;
while (true) {
if (input.abortSignal?.aborted) {
const result = this.createTurnResult(input, {
type: "aborted",
stopReason: "aborted_streaming",
usage,
permissionDenials,
turns: turnCount,
startedAt,
finalMessage,
});
await captureTurn(result.type === "error");
yield { type: "turn_completed", sessionId: input.sessionId, turnId: input.turnId, result };
return { result, messages };
}
const ctx = this.dependencies.context;
if (ctx?.tryAutoCompact) {
try {
const compact = await ctx.tryAutoCompact({
messages,
abortSignal: input.abortSignal,
});
if (compact.type === "compacted") {
messages = compact.messages;
yield {
type: "turn_continued",
sessionId: input.sessionId,
turnId: input.turnId,
reason: "auto_compact",
};
}
yield {
type: "context_budget",
sessionId: input.sessionId,
turnId: input.turnId,
snapshot: compact.snapshot,
};
} catch {
}
yield* this.drainEventBuffer();
}
let request = await this.createModelRequest(messages, input);
if (input.abortSignal?.aborted) {
const result = this.createTurnResult(input, {
type: "aborted",
stopReason: "aborted_streaming",
usage,
permissionDenials,
turns: turnCount,
startedAt,
finalMessage,
});
await captureTurn(result.type === "error");
yield { type: "turn_completed", sessionId: input.sessionId, turnId: input.turnId, result };
return { result, messages };
}
this.dispatchLifecycle(input, "PreModelRequest", {
provider: request.provider,
model: request.model,
}).catch(() => {});
yield {
type: "model_request_started",
sessionId: input.sessionId,
turnId: input.turnId,
model: request.model,
provider: request.provider,
};
const decision = await this.dependencies.router.decide({
request,
sessionId: input.sessionId,
isMainAgent: !this.config.isSubagent,
metadata: previousTier ? { previousTier } : undefined,
});
const getMaxCtx = this.dependencies.getModelMaxContextTokens;
const agentMaxCtx = this.config.maxContextTokens;
if (ctx?.tryAutoCompact && getMaxCtx && agentMaxCtx) {
const routedMaxCtx = getMaxCtx(decision.provider, decision.model);
if (routedMaxCtx !== undefined && routedMaxCtx < agentMaxCtx) {
try {
const recompact = await ctx.tryAutoCompact({
messages,
abortSignal: input.abortSignal,
maxContextTokens: routedMaxCtx,
});
if (recompact.type === "compacted") {
messages = recompact.messages;
request = await this.createModelRequest(messages, input);
yield {
type: "turn_continued",
sessionId: input.sessionId,
turnId: input.turnId,
reason: "auto_compact",
};
}
yield {
type: "context_budget",
sessionId: input.sessionId,
turnId: input.turnId,
snapshot: recompact.snapshot,
};
} catch {
}
}
}
const assembler = createModelMessageAssemblerState();
try {
for await (const event of this.dependencies.router.execute(decision, request, {
sessionId: input.sessionId,
turnId: input.turnId,
projectPath: this.config.cwd,
abortSignal: input.abortSignal,
})) {
yield { type: "model_event", sessionId: input.sessionId, turnId: input.turnId, event };
applyModelEventToAssembler(assembler, event);
if (event.type === "error") {
break;
}
}
if (!stickyInfo?.orchestrating) previousTier = undefined;
} catch (error) {
if (input.abortSignal?.aborted) {
const result = this.createTurnResult(input, {
type: "aborted",
stopReason: "aborted_streaming",
usage,
permissionDenials,
turns: turnCount,
startedAt,
finalMessage,
});
await captureTurn(result.type === "error");
yield { type: "turn_completed", sessionId: input.sessionId, turnId: input.turnId, result };
return { result, messages };
}
const stopFailureMsg = error instanceof Error ? error.message : String(error);
await this.dispatchLifecycle(input, "StopFailure", { error: stopFailureMsg });
yield { type: "stop_failure", sessionId: input.sessionId, turnId: input.turnId, error: stopFailureMsg };
const result = this.createTurnResult(input, {
type: "error",
stopReason: "model_error",
usage,
permissionDenials,
turns: turnCount,
startedAt,
finalMessage,
errors: [agentError("agent_model_error", stopFailureMsg)],
});
yield { type: "turn_failed", sessionId: input.sessionId, turnId: input.turnId, error: result.errors![0]! };
await captureTurn(result.type === "error");
yield { type: "turn_completed", sessionId: input.sessionId, turnId: input.turnId, result };
return { result, messages };
}
if (input.abortSignal?.aborted) {
const result = this.createTurnResult(input, {
type: "aborted",
stopReason: "aborted_streaming",
usage,
permissionDenials,
turns: turnCount,
startedAt,
finalMessage,
});
await captureTurn(result.type === "error");
yield { type: "turn_completed", sessionId: input.sessionId, turnId: input.turnId, result };
return { result, messages };
}
const assembled = assembleAssistantMessage(assembler);
usage = mergeUsage(usage, assembled.usage);
finalMessage = assembled.message;
messages.push(assembled.message);
yield { type: "assistant_message", sessionId: input.sessionId, turnId: input.turnId, message: assembled.message };
await input.onDurableMessage?.(assembled.message);
const toolCalls = collectToolCalls(assembled.message);
if (assembled.error) {
if (toolCalls.length > 0) {
const projected = projectToolResults(
toolCalls.map((call) => createMissingToolResult(call, this.now, "Model error interrupted tool execution.")),
);
messages.push(...projected);
yield { type: "tool_results_projected", sessionId: input.sessionId, turnId: input.turnId, message: projected[0]! };
for (const msg of projected) {
await input.onDurableMessage?.(msg);
}
}
if (
this.config.jsonSelfCorrect &&
assembled.error.code === "invalid_tool_arguments" &&
jsonSelfCorrectCount < MAX_JSON_SELF_CORRECT_RETRIES
) {
jsonSelfCorrectCount++;
messages.push({
role: "user",
content: [{
type: "text",
text: "Your previous tool call contained invalid JSON in the arguments and could not be parsed. "
+ "Please retry with valid JSON. Common issues: missing quotes around keys/values, "
+ "trailing commas, unescaped special characters in strings.",
}],
metadata: { synthetic: true, purpose: "json_self_correct" },
});
yield {
type: "turn_continued",
sessionId: input.sessionId,
turnId: input.turnId,
reason: "model_error",
};
continue;
}
const reactive = await this.tryReactiveRecover(input, assembled.error, messages, hasAttemptedCompact);
if (reactive && reactive.type === "truncate_head_and_retry") {
messages = stripTrailingErrorPair(messages);
messages = truncateHeadKeepRatio(messages, reactive.keepRatio);
hasAttemptedCompact = true;
yield {
type: "turn_continued",
sessionId: input.sessionId,
turnId: input.turnId,
reason: "model_error",
};
continue;
}
if (reactive && reactive.type === "strip_images_and_retry") {
messages = stripTrailingErrorPair(messages);
messages = stripImagesFromMessages(messages);
yield {
type: "turn_continued",
sessionId: input.sessionId,
turnId: input.turnId,
reason: "model_error",
};
continue;
}
if (
assembled.error.code === "max_output_reached" &&
!hasAttemptedOutputRetry
) {
messages = stripTrailingErrorPair(messages);
const previous = this.config.maxOutputTokens ?? OUTPUT_TOKEN_RETRY_DEFAULT;
this.config.maxOutputTokens = Math.min(previous * 2, OUTPUT_TOKEN_RETRY_CEILING);
hasAttemptedOutputRetry = true;
yield {
type: "turn_continued",
sessionId: input.sessionId,
turnId: input.turnId,
reason: "model_error",
};
continue;
}
const classified = classifyModelError(assembled.error);
await this.dispatchLifecycle(input, "StopFailure", { error: assembled.error });
yield { type: "stop_failure", sessionId: input.sessionId, turnId: input.turnId, error: typeof assembled.error === "string" ? assembled.error : JSON.stringify(assembled.error) };
const result = this.createTurnResult(input, {
type: "error",
stopReason: classified.stopReason,
usage,
permissionDenials,
turns: turnCount,
startedAt,
finalMessage,
errors: [classified.error],
});
yield { type: "turn_failed", sessionId: input.sessionId, turnId: input.turnId, error: result.errors![0]! };
await captureTurn(result.type === "error");
yield { type: "turn_completed", sessionId: input.sessionId, turnId: input.turnId, result };
return { result, messages };
}
if (toolCalls.length === 0) {
const stopHooks = await this.dispatchLifecycle(input, "Stop", {
stopHookActive: false,
lastAssistantMessage: textFromMessage(assembled.message),
});
yield { type: "stop_requested", sessionId: input.sessionId, turnId: input.turnId };
messages.push(...stopHooks.messages);
const stopBlock = findLifecycleBlock(stopHooks);
if (stopBlock) {
const result = this.createTurnResult(input, {
type: "error",
stopReason: "tool_error",
usage,
permissionDenials,
turns: turnCount,
startedAt,
finalMessage,
structuredOutput,
errors: [agentError("agent_unsupported_feature", stopBlock.reason)],
});
yield { type: "turn_failed", sessionId: input.sessionId, turnId: input.turnId, error: result.errors![0]! };
await captureTurn(result.type === "error");
yield { type: "turn_completed", sessionId: input.sessionId, turnId: input.turnId, result };
return { result, messages };
}
const result = this.createTurnResult(input, {
type: "success",
stopReason: "completed",
usage,
permissionDenials,
turns: turnCount,
startedAt,
finalMessage,
structuredOutput,
});
await captureTurn(result.type === "error");
yield { type: "turn_completed", sessionId: input.sessionId, turnId: input.turnId, result };
return { result, messages };
}
yield { type: "tool_calls_detected", sessionId: input.sessionId, turnId: input.turnId, calls: toolCalls };
if (input.abortSignal?.aborted) {
const result = this.createTurnResult(input, {
type: "aborted",
stopReason: "aborted_streaming",
usage,
permissionDenials,
turns: turnCount,
startedAt,
finalMessage,
});
await captureTurn(result.type === "error");
yield { type: "turn_completed", sessionId: input.sessionId, turnId: input.turnId, result };
return { result, messages };
}
let results: PilotDeckToolResult[];
try {
const toolContext = this.createToolContext(input, messages);
if (assembled.finishReason === "length") {
toolContext.outputTruncated = true;
}
results = yield* this.executeToolsWithEventPump(
toolCalls,
toolContext,
input,
);
} catch (error) {
results = toolCalls.map((call) =>
createMissingToolResult(call, this.now, error instanceof Error ? error.message : String(error)),
);
}
if (input.abortSignal?.aborted) {
const result = this.createTurnResult(input, {
type: "aborted",
stopReason: "aborted_streaming",
usage,
permissionDenials,
turns: turnCount,
startedAt,
finalMessage,
});
await captureTurn(result.type === "error");
yield { type: "turn_completed", sessionId: input.sessionId, turnId: input.turnId, result };
return { result, messages };
}
yield* this.drainEventBuffer();
const pairedResults = ensureToolResultPairing(toolCalls, results, this.now);
permissionDenials = [...permissionDenials, ...collectPermissionDenials(pairedResults)];
for (const result of pairedResults) {
if (result.type === "success" && result.metadata?.structuredOutput) {
structuredOutput = result.data;
}
const requestedMode = readRequestedMode(result.type === "success" ? result.data : undefined);
if (requestedMode) {
let effectiveMode = requestedMode;
if (requestedMode === "plan" && this.config.permissionMode !== "plan") {
this.config.permissionModeBeforePlan = this.config.permissionMode;
} else if (this.config.permissionMode === "plan" && requestedMode !== "plan") {
if (this.config.permissionModeBeforePlan) {
effectiveMode = this.config.permissionModeBeforePlan;
this.config.permissionModeBeforePlan = undefined;
}
}
this.config.permissionMode = effectiveMode;
this.config.permissionContext.mode = effectiveMode;
yield { type: "mode_change_requested", sessionId: input.sessionId, turnId: input.turnId, mode: effectiveMode };
}
yield { type: "tool_result", sessionId: input.sessionId, turnId: input.turnId, result };
}
const projected = projectToolResults(pairedResults);
const [toolResultMsg, ...supplementalMsgs] = projected;
const ctxApply = this.dependencies.context?.applyToolResults;
if (ctxApply) {
try {
const applied = await ctxApply.call(this.dependencies.context, {
sessionId: input.sessionId,
turnId: input.turnId,
toolResultMessage: toolResultMsg,
messages,
});
messages = applied.messages;
} catch {
messages.push(toolResultMsg);
}
} else {
messages.push(toolResultMsg);
}
for (const supplemental of supplementalMsgs) {
messages.push(supplemental);
}
yield { type: "tool_results_projected", sessionId: input.sessionId, turnId: input.turnId, message: toolResultMsg };
await input.onDurableMessage?.(toolResultMsg);
for (const supplemental of supplementalMsgs) {
await input.onDurableMessage?.(supplemental);
}
const lifecycleBlock = findToolLifecycleBlock(pairedResults);
if (lifecycleBlock) {
const result = this.createTurnResult(input, {
type: "error",
stopReason: "tool_error",
usage,
permissionDenials,
turns: turnCount,
startedAt,
finalMessage,
structuredOutput,
errors: [agentError("agent_unsupported_feature", lifecycleBlock.reason)],
});
yield { type: "turn_failed", sessionId: input.sessionId, turnId: input.turnId, error: result.errors![0]! };
await captureTurn(result.type === "error");
yield { type: "turn_completed", sessionId: input.sessionId, turnId: input.turnId, result };
return { result, messages };
}
const allInvalid = pairedResults.length > 0 && pairedResults.every(
(r) => r.type === "error" && r.error.code === "invalid_tool_input",
);
if (allInvalid) {
consecutiveAllInvalidTurns++;
if (consecutiveAllInvalidTurns >= MAX_CONSECUTIVE_ALL_INVALID_TURNS) {
const result = this.createTurnResult(input, {
type: "error",
stopReason: "tool_error",
usage,
permissionDenials,
turns: turnCount,
startedAt,
finalMessage,
structuredOutput,
errors: [agentError(
"agent_tool_error_loop",
`Terminated: ${consecutiveAllInvalidTurns} consecutive turns with all tool calls failing input validation. The model appears stuck in a loop.`,
)],
});
yield { type: "turn_failed", sessionId: input.sessionId, turnId: input.turnId, error: result.errors![0]! };
await captureTurn(result.type === "error");
yield { type: "turn_completed", sessionId: input.sessionId, turnId: input.turnId, result };
return { result, messages };
}
} else {
consecutiveAllInvalidTurns = 0;
}
if (this.config.stopOnStructuredOutput && structuredOutput !== undefined) {
const result = this.createTurnResult(input, {
type: "success",
stopReason: "completed",
usage,
permissionDenials,
turns: turnCount,
startedAt,
finalMessage,
structuredOutput,
});
await captureTurn(result.type === "error");
yield { type: "turn_completed", sessionId: input.sessionId, turnId: input.turnId, result };
return { result, messages };
}
const nextTurnCount = turnCount + 1;
if (input.maxTurns && nextTurnCount > input.maxTurns) {
const result = this.createTurnResult(input, {
type: "max_turns",
stopReason: "max_turns",
usage,
permissionDenials,
turns: nextTurnCount,
startedAt,
finalMessage,
structuredOutput,
errors: [agentError("agent_max_turns_reached", `Reached maximum number of turns (${input.maxTurns}).`)],
});
await captureTurn(result.type === "error");
yield { type: "turn_completed", sessionId: input.sessionId, turnId: input.turnId, result };
return { result, messages };
}
turnCount = nextTurnCount;
yield { type: "turn_continued", sessionId: input.sessionId, turnId: input.turnId, reason: "next_turn" };
}
}
private async tryReactiveRecover(
input: AgentLoopInput,
error: CanonicalModelError,
messages: CanonicalMessage[],
hasAttemptedCompact: boolean,
): Promise<ContextRecoveryDecision | undefined> {
const ctx: AgentContextRuntime | undefined = this.dependencies.context;
if (!ctx?.recoverFromModelError) {
return undefined;
}
try {
return await ctx.recoverFromModelError({
sessionId: input.sessionId,
turnId: input.turnId,
error,
messages,
hasAttemptedCompact,
});
} catch {
return undefined;
}
}
private async createModelRequest(
messages: CanonicalMessage[],
input: AgentLoopInput,
): Promise<CanonicalModelRequest> {
const contextRuntime = this.dependencies.context ?? new NullContextRuntime();
const planTodo = this.dependencies.planTodoManager?.forSession(input.sessionId);
let tools = this.dependencies.tools.registry.toCanonicalSchemas();
if (this.config.permissionMode === "plan") {
tools = applyPlanModeToolOverrides(tools);
}
const prepared = await contextRuntime.prepareForModel({
sessionId: input.sessionId,
turnId: input.turnId,
cwd: this.config.cwd,
provider: this.config.provider,
model: this.config.model,
permissionMode: this.config.permissionMode,
additionalWorkingDirectories: this.config.permissionContext.additionalWorkingDirectories,
messages: cloneMessages(messages),
tools,
maxMessages: this.config.maxContextMessages,
customSystemPrompt: this.config.systemPrompt,
appendSystemPrompt: planTodo?.buildPromptAddendum(),
abortSignal: input.abortSignal,
});
this.dispatchLifecycle(input, "InstructionsLoaded", {
hasSystemPrompt: !!prepared.systemPrompt,
}).catch(() => {});
this.dependencies.eventEmitter?.({
type: "instructions_loaded",
sessionId: input.sessionId,
turnId: input.turnId,
hasSystemPrompt: !!prepared.systemPrompt,
});
return {
provider: this.config.provider,
model: this.config.model,
messages: prepared.messages,
systemPrompt: prepared.systemPrompt ?? this.config.systemPrompt,
tools: prepared.tools,
toolChoice: this.config.toolChoice,
maxOutputTokens: this.config.maxOutputTokens,
temperature: this.config.temperature,
thinking: this.config.thinking,
stream: true,
metadata: this.config.metadata,
cacheBreakpoints: prepared.cacheBreakpoints,
};
}
private createToolContext(
input: AgentLoopInput,
messages: CanonicalMessage[],
): PilotDeckToolRuntimeContext {
const planDirectoryPath = this.dependencies.planFileManager?.getPlanDirectoryPath();
const planTodo = this.dependencies.planTodoManager?.forSession(input.sessionId);
const permissionContext = {
...this.config.permissionContext,
cwd: this.config.cwd,
...(planDirectoryPath ? { planDirectoryPath } : {}),
};
return {
sessionId: input.sessionId,
turnId: input.turnId,
messageId: input.turnId,
cwd: this.config.cwd,
abortSignal: input.abortSignal,
subagentTimeoutMs: this.config.subagentTimeoutMs,
permissionMode: this.config.permissionMode,
permissionContext,
auditRecorder: this.dependencies.auditRecorder,
now: this.now,
env: this.config.env,
maxResultBytes: this.config.maxResultBytes,
model: {
stream: (request, signal) =>
this.dependencies.router.stream(request, {
sessionId: input.sessionId,
turnId: input.turnId,
projectPath: this.config.cwd,
abortSignal: signal,
isMainAgent: false,
}),
},
elicitation: this.dependencies.elicitation,
fileHistory: this.dependencies.fileHistory,
subagentDepth: this.config.subagentDepth ?? 0,
subagent: this.buildSubagentForkApi(input, messages),
modelMultimodal: this.config.modelMultimodal,
maxOutputTokens: this.config.maxOutputTokens,
readFileState: this.readFileState,
writeSnapshots: this.writeSnapshots,
fileUpdateNotifier: this.dependencies.fileUpdateNotifier,
...(planTodo ? { planTodo } : {}),
...(planDirectoryPath
? {
planDirectory: {
path: planDirectoryPath,
resolve: (filePath: string) =>
this.dependencies.planFileManager?.resolvePlanFilePath(filePath, this.config.cwd),
read: (filePath: string) =>
this.dependencies.planFileManager?.readPlanFile(filePath, this.config.cwd),
},
}
: {}),
};
}
private buildSubagentForkApi(
input: AgentLoopInput,
messages: CanonicalMessage[],
): PilotDeckSubagentForkApi {
const depth = this.config.subagentDepth ?? 0;
const maxDepth = this.config.maxSubagentDepth ?? 1;
return {
depth,
maxSubagentDepth: maxDepth,
listDefinitions: () =>
Object.values(SUBAGENT_DEFINITIONS).map((d) => ({
id: d.id,
description: d.description,
})),
isAllowedDefinition: (id: string) => getSubagentDefinition(id) !== undefined,
fork: async ({ definitionId, directive, subagentId, abortSignal, timeoutMs }) => {
const { SubAgentSession } = await import("../sub/SubAgentSession.js");
const def = getSubagentDefinition(definitionId);
if (!def) throw new Error(`Unknown subagent type: ${definitionId}`);
const composedAbort = composeAbortSignal({
parent: abortSignal,
timeoutMs,
});
const subagentSessionId = `${this.config.cwd}::sub::${subagentId}`;
const transcriptHooks = this.dependencies.subagentTranscript;
const sidechain = transcriptHooks?.subagentTranscriptResolver?.(subagentId);
const transcriptRelativePath = sidechain?.transcriptRelativePath ?? "";
await transcriptHooks?.recordSubagentStarted?.({
sessionId: input.sessionId,
turnId: input.turnId,
subagentId,
subagentType: def.id,
prompt: directive,
transcriptRelativePath,
subagentSessionId,
});
await this.dispatchLifecycle(input, "SubagentStart", {
subagentId,
subagentType: def.id,
});
this.dependencies.eventEmitter?.({
type: "subagent_started",
sessionId: input.sessionId,
turnId: input.turnId,
subagentId,
subagentType: def.id,
});
const subSession = new SubAgentSession({
definition: def,
directive,
parentMessages: messages,
parentConfig: {
...this.config,
subagentDepth: depth + 1,
isSubagent: true,
},
parentDependencies: this.dependencies,
parentReadFileState: this.readFileState,
parentWriteSnapshots: this.writeSnapshots,
parentSessionId: input.sessionId,
parentTurnId: input.turnId,
subagentSessionId,
subagentId,
abortSignal: composedAbort.signal,
sidechainTranscript: sidechain
? {
recordAcceptedInput: sidechain.recordAcceptedInput.bind(sidechain),
recordDurableMessage: sidechain.recordDurableMessage.bind(sidechain),
}
: undefined,
});
let report;
let errored = false;
try {
report = await subSession.run();
if (composedAbort.timedOut()) {
throw new Error(`Subagent timed out after ${timeoutMs}ms.`);
}
} catch (err) {
composedAbort.cleanup();
errored = true;
await transcriptHooks?.recordSubagentCompleted?.({
sessionId: input.sessionId,
turnId: input.turnId,
subagentId,
subagentType: def.id,
summary: err instanceof Error ? err.message : String(err),
turns: 0,
durationMs: 0,
errored: true,
});
await this.dispatchLifecycle(input, "SubagentStop", {
subagentId,
subagentType: def.id,
success: false,
});
this.dependencies.eventEmitter?.({
type: "subagent_completed",
sessionId: input.sessionId,
turnId: input.turnId,
subagentId,
subagentType: def.id,
success: false,
durationMs: 0,
});
throw err;
}
composedAbort.cleanup();
await transcriptHooks?.recordSubagentCompleted?.({
sessionId: input.sessionId,
turnId: input.turnId,
subagentId,
subagentType: def.id,
summary: report.markdown,
usage: report.usage,
turns: report.turns,
durationMs: report.durationMs,
errored,
});
await this.dispatchLifecycle(input, "SubagentStop", {
subagentId,
subagentType: def.id,
success: !errored,
});
this.dependencies.eventEmitter?.({
type: "subagent_completed",
sessionId: input.sessionId,
turnId: input.turnId,
subagentId,
subagentType: def.id,
success: !errored,
durationMs: report.durationMs,
});
return {
markdown: report.markdown,
usage: report.usage,
turns: report.turns,
durationMs: report.durationMs,
parsed: report.parsed as unknown as Record<string, string> | undefined,
};
},
};
}
private async dispatchLifecycle(
input: AgentLoopInput,
event: PilotDeckHookEvent,
payload: Record<string, unknown>,
): Promise<LifecycleDispatchResult> {
return this.dependencies.lifecycle?.dispatch({
event,
baseInput: {
sessionId: input.sessionId,
transcriptPath: "",
cwd: this.config.cwd,
permissionMode: this.config.permissionMode,
},
payload,
matchQuery: event,
signal: input.abortSignal,
env: this.config.env,
}) ?? {
effects: [],
messages: [],
events: [],
blockingErrors: [],
nonBlockingErrors: [],
};
}
private *drainEventBuffer(): Generator<AgentEvent> {
const events = this.dependencies.drainEvents?.() ?? [];
for (const event of events) {
yield event;
}
}
private async *executeToolsWithEventPump(
toolCalls: CanonicalToolCall[],
context: PilotDeckToolRuntimeContext,
input: AgentLoopInput,
): AsyncGenerator<AgentEvent, PilotDeckToolResult[], unknown> {
const activeSubagents = new Map<string, ActiveSubagentStatus>();
let results: PilotDeckToolResult[] | undefined;
let error: unknown;
let settled = false;
const execution = this.dependencies.tools.scheduler.executeAll(toolCalls, context)
.then((value) => {
results = value;
}, (err) => {
error = err;
})
.finally(() => {
settled = true;
});
while (!settled) {
await Promise.race([execution, sleep(TOOL_EVENT_PUMP_INTERVAL_MS)]);
yield* this.drainToolEventBufferForSubagentStatus(input, activeSubagents);
if (!settled) {
yield* this.emitSubagentHeartbeats(input, activeSubagents);
}
}
yield* this.drainToolEventBufferForSubagentStatus(input, activeSubagents);
if (error) throw error;
return results ?? [];
}
private *drainToolEventBufferForSubagentStatus(
input: AgentLoopInput,
activeSubagents: Map<string, ActiveSubagentStatus>,
): Generator<AgentEvent> {
const events = this.dependencies.drainEvents?.() ?? [];
for (const event of events) {
const statusEvent = this.updateSubagentStatusFromEvent(input, activeSubagents, event);
yield event;
if (statusEvent) {
yield statusEvent;
}
}
}
private updateSubagentStatusFromEvent(
input: AgentLoopInput,
activeSubagents: Map<string, ActiveSubagentStatus>,
event: AgentEvent,
): AgentEvent | undefined {
if (event.type === "subagent_started") {
const nowMs = this.now().getTime();
activeSubagents.set(event.subagentId, {
subagentId: event.subagentId,
subagentType: event.subagentType,
startedAtMs: nowMs,
lastHeartbeatMs: nowMs,
});
return undefined;
}
if (event.type === "subagent_completed") {
activeSubagents.delete(event.subagentId);
return undefined;
}
if (event.type !== "pre_tool_execute" && event.type !== "post_tool_execute") {
return undefined;
}
const subagentId = subagentIdFromSessionId(event.sessionId);
if (!subagentId) {
return undefined;
}
const nowMs = this.now().getTime();
const state = activeSubagents.get(subagentId) ?? {
subagentId,
startedAtMs: nowMs,
lastHeartbeatMs: nowMs,
};
if (event.type === "pre_tool_execute") {
state.currentToolCallId = event.toolCallId;
state.currentToolName = event.toolName;
} else {
state.currentToolCallId = undefined;
state.currentToolName = undefined;
}
state.lastHeartbeatMs = nowMs;
activeSubagents.set(subagentId, state);
return {
type: "subagent_status",
sessionId: input.sessionId,
turnId: input.turnId,
subagentId,
subagentType: state.subagentType,
status: event.type === "pre_tool_execute" ? "tool_started" : "tool_completed",
toolCallId: event.toolCallId,
toolName: event.toolName,
...(event.type === "post_tool_execute" ? { success: event.success } : {}),
durationMs: Math.max(0, nowMs - state.startedAtMs),
};
}
private *emitSubagentHeartbeats(
input: AgentLoopInput,
activeSubagents: Map<string, ActiveSubagentStatus>,
): Generator<AgentEvent> {
const nowMs = this.now().getTime();
for (const state of activeSubagents.values()) {
if (nowMs - state.lastHeartbeatMs < SUBAGENT_STATUS_HEARTBEAT_MS) {
continue;
}
state.lastHeartbeatMs = nowMs;
yield {
type: "subagent_status",
sessionId: input.sessionId,
turnId: input.turnId,
subagentId: state.subagentId,
subagentType: state.subagentType,
status: state.currentToolName ? "running" : "waiting_model",
toolCallId: state.currentToolCallId,
toolName: state.currentToolName,
durationMs: Math.max(0, nowMs - state.startedAtMs),
};
}
}
private createTurnResult(
input: AgentLoopInput,
options: Omit<AgentTurnResult, "sessionId" | "turnId" | "completedAt">,
): AgentTurnResult {
return {
...options,
sessionId: input.sessionId,
turnId: input.turnId,
completedAt: this.now().toISOString(),
};
}
private applyPermissionOverrides(
permissionMode?: PermissionMode,
permissionRules?: Partial<PermissionRuleSet>,
): void {
if (permissionMode) {
this.config.permissionMode = permissionMode;
this.config.permissionContext.mode = permissionMode;
}
if (!permissionRules) return;
mergeUserRules(this.config.permissionContext.rules.allow, permissionRules.allow);
mergeUserRules(this.config.permissionContext.rules.deny, permissionRules.deny);
mergeUserRules(this.config.permissionContext.rules.ask, permissionRules.ask);
}
private readonly now = (): Date => this.dependencies.now?.() ?? new Date();
}
function applyPlanModeToolOverrides(tools: CanonicalToolSchema[]): CanonicalToolSchema[] {
const override = buildPlanModeAgentToolSchema();
return tools.map((tool) => {
if (tool.name !== "agent") return tool;
return { ...tool, description: override.description, inputSchema: override.inputSchema };
});
}
function mergeUserRules(target: PermissionRule[], userRules: PermissionRule[] | undefined): void {
const nonUserRules = target.filter((rule) => rule.source !== "user");
target.splice(0, target.length, ...nonUserRules, ...(userRules ?? []));
}
function findLifecycleBlock(result: LifecycleDispatchResult): { reason: string; stopReason?: string } | undefined {
return result.effects.find(
(effect): effect is { type: "block"; reason: string; stopReason?: string } => effect.type === "block",
);
}
function findToolLifecycleBlock(results: PilotDeckToolResult[]): { reason: string; stopReason?: string } | undefined {
for (const result of results) {
const lifecycle = result.metadata?.lifecycle;
if (isRecord(lifecycle) && isRecord(lifecycle.blocked) && typeof lifecycle.blocked.reason === "string") {
return {
reason: lifecycle.blocked.reason,
stopReason: typeof lifecycle.blocked.stopReason === "string" ? lifecycle.blocked.stopReason : undefined,
};
}
}
return undefined;
}
function textFromMessage(message: CanonicalMessage): string {
return message.content
.filter((block) => block.type === "text")
.map((block) => block.text)
.join("\n");
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
function cloneReadFileStateMap(
state: PilotDeckReadFileStateMap | undefined,
): PilotDeckReadFileStateMap {
const out: PilotDeckReadFileStateMap = new Map();
if (!state) return out;
for (const [key, value] of state.entries()) {
out.set(key, { ...value });
}
return out;
}
function cloneWriteSnapshotMap(
state: PilotDeckWriteSnapshotMap | undefined,
): PilotDeckWriteSnapshotMap {
const out: PilotDeckWriteSnapshotMap = new Map();
if (!state) return out;
for (const [key, value] of state.entries()) {
out.set(key, { ...value });
}
return out;
}
function subagentIdFromSessionId(sessionId: string): string | undefined {
const marker = "::sub::";
const index = sessionId.lastIndexOf(marker);
if (index < 0) return undefined;
const subagentId = sessionId.slice(index + marker.length).trim();
return subagentId.length > 0 ? subagentId : undefined;
}
const OUTPUT_TOKEN_RETRY_DEFAULT = 4_096;
const OUTPUT_TOKEN_RETRY_CEILING = 64_000;
function truncateHeadKeepRatio(messages: CanonicalMessage[], keepRatio: number): CanonicalMessage[] {
const ratio = Math.max(0.05, Math.min(1, keepRatio));
const keep = Math.max(1, Math.floor(messages.length * ratio));
return messages.slice(-keep);
}
* Drop the trailing `[assistant_message_with_partial_tool_call,
* synthetic_tool_result]` pair the loop just appended on a model error so a
* retry doesn't replay an unfinished tool call. Safe no-op if the trailing
* shape doesn't match.
*/
function stripTrailingErrorPair(messages: CanonicalMessage[]): CanonicalMessage[] {
const out = [...messages];
const last = out[out.length - 1];
if (
last &&
last.role === "user" &&
last.content.every((block) => block.type === "tool_result")
) {
out.pop();
}
const newLast = out[out.length - 1];
if (newLast && newLast.role === "assistant") {
out.pop();
}
return out;
}
* Strip all image blocks from messages, replacing them with a text placeholder.
* Used as a recovery strategy when a multimodal processor fails on corrupted images.
*/
function stripImagesFromMessages(messages: CanonicalMessage[]): CanonicalMessage[] {
return messages.map((msg) => {
const newContent = msg.content.map((block) => {
if (block.type === "image") {
return { type: "text" as const, text: "[Image removed: multimodal processor error recovery]" };
}
if (block.type === "tool_result" && block.content.some((c) => c.type === "image")) {
return {
...block,
content: block.content.map((c) =>
c.type === "image"
? { type: "text" as const, text: "[Image removed: multimodal processor error recovery]" }
: c,
),
};
}
return block;
});
return { ...msg, content: newContent };
});
}
function collectPermissionDenials(results: PilotDeckToolResult[]): AgentPermissionDenial[] {
return results.flatMap((result) => {
if (
result.type === "error" &&
(result.error.code === "permission_denied" ||
result.error.code === "permission_required" ||
result.error.code === "permission_cancelled")
) {
return [
{
toolCallId: result.toolCallId,
toolName: result.toolName,
errorCode: result.error.code,
},
];
}
return [];
});
}
function mergeUsage(first: CanonicalUsage, second: CanonicalUsage | undefined): CanonicalUsage {
if (!second) {
return first;
}
return {
inputTokens: add(first.inputTokens, second.inputTokens),
outputTokens: add(first.outputTokens, second.outputTokens),
cacheReadTokens: add(first.cacheReadTokens, second.cacheReadTokens),
cacheWriteTokens: add(first.cacheWriteTokens, second.cacheWriteTokens),
totalTokens: add(first.totalTokens, second.totalTokens),
};
}
function add(first: number | undefined, second: number | undefined): number | undefined {
if (first === undefined && second === undefined) {
return undefined;
}
return (first ?? 0) + (second ?? 0);
}
function readRequestedMode(value: unknown): AgentRuntimeConfig["permissionMode"] | undefined {
if (!value || typeof value !== "object") {
return undefined;
}
const requestedMode = (value as Record<string, unknown>).requestedMode;
return isPermissionMode(requestedMode) ? requestedMode : undefined;
}
function isPermissionMode(value: unknown): value is AgentRuntimeConfig["permissionMode"] {
return (
value === "default" ||
value === "plan" ||
value === "acceptEdits" ||
value === "bypassPermissions" ||
value === "dontAsk"
);
}
function classifyModelError(error: CanonicalModelError): {
stopReason: AgentTurnResult["stopReason"];
error: ReturnType<typeof agentError>;
} {
if (isPromptTooLong(error)) {
return {
stopReason: "prompt_too_long",
error: agentError("agent_prompt_too_long", error.message, error),
};
}
return {
stopReason: "model_error",
error: agentError("agent_model_error", error.message, error),
};
}
function isPromptTooLong(error: CanonicalModelError): boolean {
if (error.code === "prompt_too_long" || error.recoverableViaCompact) {
return true;
}
if (PROMPT_TOO_LONG_ANTHROPIC_PATTERN.test(error.message)) {
return true;
}
if (PROMPT_TOO_LONG_OPENAI_PATTERN.test(error.message)) {
return true;
}
if (REQUEST_TOO_LARGE_PATTERN.test(error.message)) {
return true;
}
return false;
}
function composeAbortSignal(args: {
parent?: AbortSignal;
timeoutMs?: number;
}): { signal: AbortSignal | undefined; cleanup: () => void; timedOut: () => boolean } {
const { parent, timeoutMs } = args;
if (!parent && (!timeoutMs || timeoutMs <= 0)) {
return { signal: undefined, cleanup: () => {}, timedOut: () => false };
}
const controller = new AbortController();
const cleanupFns: Array<() => void> = [];
let timedOut = false;
if (parent) {
if (parent.aborted) {
controller.abort(parent.reason);
} else {
const onAbort = () => controller.abort(parent.reason);
parent.addEventListener("abort", onAbort, { once: true });
cleanupFns.push(() => parent.removeEventListener("abort", onAbort));
}
}
let timeout: ReturnType<typeof setTimeout> | undefined;
if (timeoutMs && timeoutMs > 0 && !controller.signal.aborted) {
timeout = setTimeout(() => {
timedOut = true;
controller.abort(new Error(`Subagent timed out after ${timeoutMs}ms.`));
}, timeoutMs);
cleanupFns.push(() => clearTimeout(timeout));
}
return {
signal: controller.signal,
cleanup: () => {
for (const fn of cleanupFns) fn();
},
timedOut: () => timedOut,
};
}