import type { Plugin } from "@opencode-ai/plugin";
import path from "node:path";
import { fileURLToPath } from "node:url";
/** Optional fields the lint hook script may report under `hookSpecificOutput`. */
type HookSpecificOutput = {
  /** Extra text to surface alongside the tool output. */
  additionalContext?: string;
  /** Verdict of the hook; absent is handled as "allow" by the parser in this file. */
  decision?: "allow" | "block";
  /** Explanation accompanying the verdict. */
  reason?: string;
};

/** Shape of the JSON document the lint hook script is expected to print. */
type HookOutput = {
  hookSpecificOutput?: HookSpecificOutput;
};
/**
 * Appends `message` to a tool result's text, separated from any existing text by a blank line.
 *
 * Mutates `output.output` in place. A missing `output.output` is treated as empty, and the
 * combined text is trimmed of leading and trailing whitespace.
 *
 * @param output  Tool result object whose `output` field is rewritten.
 * @param message Text to append.
 */
function appendMessage(output: { output?: string }, message: string): void {
  const existing = output.output ?? "";
  const combined = [existing, message].join("\n\n");
  output.output = combined.trim();
}
/**
 * Builds the plugin-error line reported when an automatic lint check itself fails.
 *
 * @param scope Label of the step that failed (e.g. "post-edit").
 * @param error Thrown value; `Error` instances contribute their `message`, anything else is
 *              converted with `String()`.
 * @returns The formatted, prefixed error line.
 */
function formatPluginError(scope: string, error: unknown): string {
  let detail: string;
  if (error instanceof Error) {
    detail = error.message;
  } else {
    detail = String(error);
  }
  return `[pypto-op-lint plugin-error] ${scope} 自动检查失败:${detail}`;
}
/**
 * Parses the text printed by the lint hook script into a normalised result.
 *
 * - Blank output means "nothing to report": empty context, decision "allow".
 * - The payload must be a JSON object. Malformed JSON and non-object JSON (null, numbers,
 *   strings, arrays) are reported through `additionalContext` as a plugin error while the
 *   decision stays "allow", so a broken hook never blocks the tool call by itself.
 * - Fields are validated at runtime: `additionalContext` and `reason` are kept only when they
 *   are strings, and any `decision` other than the literal "block" is normalised to "allow".
 *
 * @param raw Raw stdout text of the hook script.
 * @returns Normalised context text, decision and reason.
 */
function parseHookOutput(raw: string): { additionalContext: string; decision: "allow" | "block"; reason: string } {
  if (!raw.trim()) return { additionalContext: "", decision: "allow", reason: "" };
  try {
    const parsed: unknown = JSON.parse(raw);
    if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
      // Routed through the catch below so every malformed payload is reported the same way.
      throw new TypeError("hook output is not a JSON object");
    }
    const specific: unknown = (parsed as HookOutput).hookSpecificOutput;
    const fields: Record<string, unknown> =
      typeof specific === "object" && specific !== null
        ? (specific as Record<string, unknown>)
        : {};
    const { additionalContext, decision, reason } = fields;
    return {
      additionalContext: typeof additionalContext === "string" ? additionalContext : "",
      decision: decision === "block" ? "block" : "allow",
      reason: typeof reason === "string" ? reason : "",
    };
  } catch (error) {
    return {
      additionalContext: formatPluginError("post-edit 输出解析", error),
      decision: "allow",
      reason: "",
    };
  }
}
/**
 * Turns a tool-supplied file path into an absolute path.
 *
 * @param baseDir  Directory that relative paths are resolved against.
 * @param filePath Path taken from the tool arguments; may be empty.
 * @returns "" for an empty path, the path unchanged when it is already absolute, otherwise
 *          the path resolved against `baseDir`.
 */
function resolvePath(baseDir: string, filePath: string): string {
  if (!filePath) {
    return "";
  }
  if (path.isAbsolute(filePath)) {
    return filePath;
  }
  return path.resolve(baseDir, filePath);
}
/**
 * Heuristically decides whether a shell command may modify `.orchestrator_state.json`.
 *
 * A command that never mentions the state file is not a write. Otherwise it is treated as a
 * write unless every `&&` / `||` / `;` segment that mentions the file starts (after an
 * optional `sudo`) with a known read-only command.
 *
 * The leading command alone is not enough to prove a read, so two cases are checked first:
 * - output redirection onto the state file (`cat new.json > .orchestrator_state.json`,
 *   `cat > .orchestrator_state.json <<EOF`);
 * - a pipeline stage that writes its input to the state file
 *   (`cat new.json | tee .orchestrator_state.json`).
 *
 * This is a text heuristic, not a shell parser; it errs on the side of reporting a write.
 *
 * @param command Raw shell command line.
 * @returns true when the command should be treated as writing the state file.
 */
function isStateWriteCommand(command: string): boolean {
  if (!command.includes(".orchestrator_state.json")) return false;

  // `>` / `>>` whose target ends in the state file name is a write whatever command leads.
  // The lookahead keeps sibling files such as `.orchestrator_state.json.bak` out of this check.
  if (/>>?\s*\S*\.orchestrator_state\.json(?![\w.-])/.test(command)) return true;

  // A read-only first stage can still feed a writer further down the pipeline.
  if (/\|\s*(?:sudo\s+)?(?:tee|dd|sponge)\b[^|]*\.orchestrator_state\.json/.test(command)) {
    return true;
  }

  const readOnly = /^(cat|ls|stat|test|head|tail|grep|wc|file|find|read)\b/;
  const stateSegs = command
    .split(/&&|\|\||;/)
    .map((s) => s.trim())
    .filter((s) => s.includes(".orchestrator_state.json"));
  const allReadOnly =
    stateSegs.length > 0 && stateSegs.every((s) => readOnly.test(s.replace(/^sudo\s+/, "")));
  return !allReadOnly;
}
/** Matches operator artifact file names: `*_impl.py`, `*_golden.py` and `test_*.py`. */
const OP_ARTIFACT_RE = /\b\w+_(?:impl|golden)\.py\b|\btest_\w+\.py\b/;

/**
 * Heuristically decides whether a shell command writes to an operator artifact file.
 *
 * Checks, in order:
 * 1. the command mentions an artifact file at all;
 * 2. output redirection (`>` / `>>`) onto an artifact file;
 * 3. a `python -c` one-liner containing a file-mutating call;
 * 4. any individual command (separated by `&&`, `||`, `;`, `|` or a newline) that mentions
 *    an artifact and starts, after an optional `sudo`, with a file-mutating command or
 *    `sed ... -i`.
 *
 * Newlines separate commands just like `;`, so multi-line scripts are split per line;
 * backslash-newline continuations are folded first so a continued command stays whole.
 * This is a text heuristic, not a shell parser.
 *
 * @param command Raw shell command line (possibly multi-line).
 * @returns true when the command looks like it writes an operator artifact.
 */
function isBashWriteToOpArtifact(command: string): boolean {
  if (!OP_ARTIFACT_RE.test(command)) return false;

  if (/>>?\s*\S*(?:\w+_(?:impl|golden)\.py|test_\w+\.py)/.test(command)) {
    return true;
  }

  if (/\bpython3?\b\s+(?:-\w+\s+)*-c\b/.test(command)) {
    const writeIndicators = [
      /\.write\w*\(/,
      /\.truncate\(/,
      /open\([^)]*['"][wax]b?\+?['"]/,
      /Path\([^)]*\)\.unlink\(/,
      /shutil\.(copy\w*|move|rmtree)\(/,
      /os\.(remove|unlink|rename)\(/,
    ];
    if (writeIndicators.some((p) => p.test(command))) return true;
  }

  const segments = command
    .replace(/\\\r?\n/g, " ") // fold line continuations so `cp a \<newline> b` stays one command
    .split(/&&|\|\||;|\||\r?\n/)
    .map((s) => s.trim());
  for (const raw of segments) {
    if (!OP_ARTIFACT_RE.test(raw)) continue;
    const seg = raw.replace(/^sudo\s+/, "");
    // `python -c` was already judged by the write-indicator scan above.
    if (/^python3?\b.*\s-c\b/.test(seg)) continue;
    if (/^(tee|cp|mv|rm|install|ln)\b/.test(seg)) return true;
    if (/^sed\b.*-i\b/.test(seg)) return true;
  }
  return false;
}
/**
 * opencode plugin that connects the pypto operator lint script to tool execution.
 *
 * Returned hooks:
 * - `chat.message` / `chat.params`: remember which agent owns each session.
 * - `tool.execute.before`: reject bash/shell commands that write operator artifacts or
 *   `.orchestrator_state.json`, reject write-tool edits of the state file, and run the
 *   `pre-edit-backup` hook before a write tool touches a `*_impl.py` file.
 * - `tool.execute.after`: run the `post-edit` hook after a write tool touched an operator
 *   artifact (throwing when the hook answers `decision: "block"`), and run the `post-bash`
 *   hook after bash/shell executed a `test_*.py` script, appending hook context to the output.
 *
 * Both tool hooks are skipped for sessions whose remembered agent is in `LINT_SKIP_AGENTS`.
 */
export const PyptoOpLintPlugin: Plugin = async (input) => {
  const $ = input.$;
  const client = input.client;
  // Directory that relative tool paths are resolved against.
  const baseDir = input.worktree || input.directory || process.cwd();

  // Use fileURLToPath instead of URL#pathname: pathname stays percent-encoded (spaces,
  // non-ASCII directory names) and keeps a leading slash in front of Windows drive letters,
  // so python3 would be handed a path that does not exist on disk.
  const lintScript = fileURLToPath(
    new URL("../../.agents/hooks/pypto-op-lint/pypto_op_lint.py", import.meta.url),
  );

  /**
   * Runs the lint script for one hook and resolves with its stdout text.
   * The payload is passed as JSON in the PYPTO_OP_LINT_HOOK_INPUT environment variable.
   */
  async function execHookJson(hook: string, payload: unknown): Promise<string> {
    return await $`python3 ${lintScript} --hook ${hook}`
      .env({
        PYPTO_OP_LINT_HOOK_INPUT: JSON.stringify(payload),
      })
      .quiet()
      .text();
  }

  const WRITE_TOOLS = new Set(["write", "edit", "multiedit"]);
  const BASH_TOOLS = new Set(["bash", "shell"]);

  /** Lower-cased tool name taken from a plain string or from an object's string `name`. */
  function normTool(t: unknown): string {
    return typeof t === "string"
      ? t.toLowerCase()
      : typeof (t as { name?: unknown })?.name === "string"
        ? String((t as { name?: unknown }).name).toLowerCase()
        : "";
  }

  /** First present path-like argument among the supported spellings, or "" when none. */
  function extractFilePath(args: any): string {
    return String(
      args?.file_path ?? args?.filePath ?? args?.path ?? args?.target_file ?? ""
    );
  }

  // Tracing is opt-in through PYPTO_OP_LINT_TRACE=1 and goes to stderr.
  const trace = process.env.PYPTO_OP_LINT_TRACE === "1";
  function tdebug(label: string, payload: Record<string, unknown>): void {
    if (!trace) return;
    try {
      console.error(`[pypto-op-lint:trace] ${label}`, JSON.stringify(payload));
    } catch {
      // Tracing is best-effort: a payload that cannot be serialised must not break the hook.
    }
  }

  // Agents for which the tool hooks do nothing.
  const LINT_SKIP_AGENTS = new Set(["build", "plan", "general", "explore", "scout"]);
  const agentBySession = new Map<string, string>();

  /** Records the agent of a session when both identifiers are strings. */
  function rememberAgent(input: { sessionID?: unknown; agent?: unknown }): void {
    if (typeof input.sessionID !== "string" || typeof input.agent !== "string") return;
    agentBySession.set(input.sessionID, input.agent);
  }

  /** Lint runs unless the session's remembered agent is skipped; unknown sessions are linted. */
  function shouldRunLint(input: { sessionID?: unknown }): boolean {
    const sessionID = typeof input.sessionID === "string" ? input.sessionID : "";
    const agent = sessionID ? agentBySession.get(sessionID) : undefined;
    const run = agent === undefined || !LINT_SKIP_AGENTS.has(agent);
    tdebug("gate", { sessionID, agent, run });
    return run;
  }

  return {
    "chat.message": async (chatInput, _output) => {
      rememberAgent(chatInput);
    },
    "chat.params": async (chatInput, _output) => {
      rememberAgent(chatInput);
    },
    "tool.execute.after": async (rawArgs, output) => {
      if (!shouldRunLint(rawArgs)) return;
      const { tool, args } = rawArgs as { tool: unknown; args: any };
      const toolName = normTool(tool);
      const filePath = extractFilePath(args);
      tdebug("after.fired", { rawTool: tool, toolName, filePath });

      // Write tools on an operator artifact: run the post-edit lint gate.
      if (
        WRITE_TOOLS.has(toolName) &&
        (filePath.endsWith("_impl.py") ||
          filePath.endsWith("_golden.py") ||
          /test_\w+\.py$/.test(filePath))
      ) {
        tdebug("after.match", { toolName, filePath });
        try {
          const raw = await execHookJson("post-edit", {
            tool_input: { file_path: filePath },
          });
          const parsed = parseHookOutput(raw);
          if (parsed.additionalContext) appendMessage(output, parsed.additionalContext);
          if (parsed.decision === "block") {
            throw new Error(parsed.reason || "[pypto-op-lint] 产物写入后门禁未通过");
          }
        } catch (error) {
          // Error instances (the block verdict above, or a failed hook run) propagate as-is;
          // only non-Error throwables are wrapped so callers always receive an Error.
          if (error instanceof Error) {
            throw error;
          }
          throw new Error(formatPluginError("post-edit", error));
        }
        return;
      }

      // bash/shell that ran a test_*.py script: let the hook annotate the result.
      if (
        BASH_TOOLS.has(toolName) &&
        /python3?\s+.*test_\w+\.py/.test(args?.command ?? "")
      ) {
        try {
          const result = output as Record<string, unknown>;
          const metadata = (result.metadata ?? {}) as Record<string, unknown>;
          const combinedOutput = String(result.output ?? result.stdout ?? result.text ?? "");
          const stderrText = String(result.stderr ?? "");
          // NOTE(review): opencode's bash tool appears to report the exit status under
          // `metadata.exit` — confirm against the installed version. The fallback is only
          // consulted when none of the top-level fields is a number, so it cannot change
          // results that were already available.
          const exitCode =
            typeof result.exit_code === "number" ? result.exit_code
            : typeof result.code === "number" ? result.code
            : typeof result.exitCode === "number" ? result.exitCode
            : typeof metadata.exit === "number" ? metadata.exit
            : 0;
          const raw = await execHookJson("post-bash", {
            tool_input: { command: args?.command ?? "" },
            tool_result: {
              stdout: combinedOutput,
              stderr: stderrText,
              exit_code: exitCode,
            },
          });
          const context = parseHookOutput(raw).additionalContext;
          if (context) appendMessage(output, context);
        } catch (error) {
          // A failing post-bash hook is reported in the tool output instead of failing the call.
          appendMessage(output, formatPluginError("post-bash", error));
        }
      }
      return;
    },
    "tool.execute.before": async (rawArgs, output) => {
      if (!shouldRunLint(rawArgs)) return;
      const { tool } = rawArgs as { tool: unknown };
      const toolName = normTool(tool);
      const command = String(output.args?.command ?? "");
      tdebug("before.fired", { rawTool: tool, toolName, hasCmd: !!command });

      if (BASH_TOOLS.has(toolName) && isBashWriteToOpArtifact(command)) {
        throw new Error(
          "[pypto-op-lint] 禁止通过 bash/shell 写入算子产物文件 (_impl.py / _golden.py / test_*.py)。\n"
          + "原因: bash 写入不会触发 PostToolUse lint hook,违规不会以 in-band block 反馈给当前 agent,可能拖到 Verifier 阶段才暴露。\n"
          + "处理方式: 请使用 Write / Edit / MultiEdit 工具直接写入;这些工具会经 tool.execute.after 触发 post-edit hook,并把 S0/S1 violation 作为 decision: block 返回。\n"
          + "如果是查看文件 (cat/grep/diff/stat), 请改用对应只读命令; 这条规则只拦截写入。"
        );
      }
      if (BASH_TOOLS.has(toolName) && isStateWriteCommand(command)) {
        throw new Error(
          "[pypto-op-lint] 禁止通过 bash/shell 直接写入 .orchestrator_state.json。"
          + "请使用 state_transition 工具更新状态文件。",
        );
      }

      if (!WRITE_TOOLS.has(toolName)) return;
      const filePath = extractFilePath(output.args ?? {});
      const absPath = resolvePath(baseDir, filePath);
      if (!absPath) return;
      if (absPath.endsWith(`${path.sep}.orchestrator_state.json`)) {
        throw new Error(
          "[pypto-op-lint] 禁止直接修改 .orchestrator_state.json。"
          + "请使用 state_transition 工具进行阶段状态迁移。",
        );
      }

      // Only *_impl.py files are handed to the pre-edit-backup hook.
      if (!absPath.endsWith("_impl.py")) return;
      try {
        await execHookJson("pre-edit-backup", {
          tool_input: { file_path: absPath },
        });
      } catch (error) {
        // A failed backup is logged as a warning and does not stop the edit.
        await client.app.log({
          body: {
            service: "pypto-op-lint",
            level: "warn",
            message: formatPluginError("pre-edit-backup", error),
            extra: { tool, filePath },
          },
        });
      }
    },
  };
};