* Time Traveling Stream Rules (TTSR) Manager
*
* Manages rules that get injected mid-stream when their condition pattern matches
* the agent's output. When a match occurs, the stream is aborted, the rule is
* injected as a system reminder, and the request is retried.
*/
import { logger } from "@oh-my-pi/pi-utils";
import type { Rule } from "../capability/rule";
import type { TtsrSettings } from "../config/settings";
export type TtsrMatchSource = "text" | "thinking" | "tool";
export interface TtsrMatchContext {
source: TtsrMatchSource;
toolName?: string;
filePaths?: string[];
streamKey?: string;
}
interface ToolScope {
toolName?: string;
pathGlob?: Bun.Glob;
pathPattern?: string;
}
interface TtsrScope {
allowText: boolean;
allowThinking: boolean;
allowAnyTool: boolean;
toolScopes: ToolScope[];
}
interface TtsrEntry {
rule: Rule;
conditions: RegExp[];
scope: TtsrScope;
globalPathGlobs?: Bun.Glob[];
}
interface InjectionRecord {
lastInjectedAt: number;
}
const DEFAULT_SETTINGS: Required<TtsrSettings> = {
enabled: true,
contextMode: "discard",
interruptMode: "always",
repeatMode: "once",
repeatGap: 10,
builtinRules: true,
disabledRules: [],
};
const DEFAULT_SCOPE: TtsrScope = {
allowText: true,
allowThinking: false,
allowAnyTool: true,
toolScopes: [],
};
export class TtsrManager {
readonly #settings: Required<TtsrSettings>;
readonly #rules = new Map<string, TtsrEntry>();
readonly #injectionRecords = new Map<string, InjectionRecord>();
readonly #buffers = new Map<string, string>();
#messageCount = 0;
constructor(settings?: TtsrSettings) {
this.#settings = { ...DEFAULT_SETTINGS, ...settings };
}
#canTrigger(ruleName: string): boolean {
const record = this.#injectionRecords.get(ruleName);
if (!record) {
return true;
}
if (this.#settings.repeatMode === "once") {
return false;
}
const gap = this.#messageCount - record.lastInjectedAt;
return gap >= this.#settings.repeatGap;
}
#compileConditions(rule: Rule): RegExp[] {
const compiled: RegExp[] = [];
for (const pattern of rule.condition ?? []) {
try {
compiled.push(new RegExp(pattern));
} catch (error) {
logger.warn("TTSR condition has invalid regex pattern, skipping condition", {
ruleName: rule.name,
pattern,
error: error instanceof Error ? error.message : String(error),
});
}
}
return compiled;
}
#compileGlobalPathGlobs(globs: Rule["globs"]): Bun.Glob[] | undefined {
if (!globs || globs.length === 0) {
return undefined;
}
const compiled = globs
.map(glob => glob.trim())
.filter(glob => glob.length > 0)
.map(glob => new Bun.Glob(glob));
return compiled.length > 0 ? compiled : undefined;
}
#parseToolScopeToken(token: string): ToolScope | undefined {
const match = /^(?:(?<prefix>tool)(?::(?<tool>[a-z0-9_-]+))?|(?<bare>[a-z0-9_-]+))(?:\((?<path>[^)]+)\))?$/i.exec(
token,
);
if (!match) {
return undefined;
}
const groups = match.groups;
const hasToolPrefix = groups?.prefix !== undefined;
const toolName = (groups?.tool ?? (hasToolPrefix ? undefined : groups?.bare))?.trim().toLowerCase();
const pathPattern = groups?.path?.trim();
if (!pathPattern) {
return { toolName };
}
return {
toolName,
pathPattern,
pathGlob: new Bun.Glob(pathPattern),
};
}
#buildScope(rule: Rule): TtsrScope {
if (!rule.scope || rule.scope.length === 0) {
return {
allowText: DEFAULT_SCOPE.allowText,
allowThinking: DEFAULT_SCOPE.allowThinking,
allowAnyTool: DEFAULT_SCOPE.allowAnyTool,
toolScopes: [...DEFAULT_SCOPE.toolScopes],
};
}
const scope: TtsrScope = {
allowText: false,
allowThinking: false,
allowAnyTool: false,
toolScopes: [],
};
for (const rawToken of rule.scope) {
const token = rawToken.trim();
const normalizedToken = token.toLowerCase();
if (token.length === 0) {
continue;
}
if (normalizedToken === "text") {
scope.allowText = true;
continue;
}
if (normalizedToken === "thinking") {
scope.allowThinking = true;
continue;
}
if (normalizedToken === "tool" || normalizedToken === "toolcall") {
scope.allowAnyTool = true;
continue;
}
const toolScope = this.#parseToolScopeToken(token);
if (!toolScope) {
logger.warn("TTSR scope token is invalid, skipping token", {
ruleName: rule.name,
token: rawToken,
});
continue;
}
if (!toolScope.toolName && !toolScope.pathGlob) {
scope.allowAnyTool = true;
continue;
}
scope.toolScopes.push(toolScope);
}
return scope;
}
#hasReachableScope(scope: TtsrScope): boolean {
return scope.allowText || scope.allowThinking || scope.allowAnyTool || scope.toolScopes.length > 0;
}
#bufferKey(context: TtsrMatchContext): string {
if (context.streamKey && context.streamKey.trim().length > 0) {
return context.streamKey;
}
if (context.source !== "tool") {
return context.source;
}
const toolName = context.toolName?.trim().toLowerCase();
return toolName ? `tool:${toolName}` : "tool";
}
#normalizePath(pathValue: string): string {
return pathValue.replaceAll("\\", "/");
}
#matchesGlob(glob: Bun.Glob, filePaths: string[] | undefined): boolean {
if (!filePaths || filePaths.length === 0) {
return false;
}
for (const filePath of filePaths) {
const normalized = this.#normalizePath(filePath);
if (glob.match(normalized)) {
return true;
}
const slashIndex = normalized.lastIndexOf("/");
const basename = slashIndex === -1 ? normalized : normalized.slice(slashIndex + 1);
if (basename !== normalized && glob.match(basename)) {
return true;
}
}
return false;
}
#matchesGlobalPaths(entry: TtsrEntry, context: TtsrMatchContext): boolean {
if (!entry.globalPathGlobs || entry.globalPathGlobs.length === 0) {
return true;
}
for (const glob of entry.globalPathGlobs) {
if (this.#matchesGlob(glob, context.filePaths)) {
return true;
}
}
return false;
}
#matchesScope(entry: TtsrEntry, context: TtsrMatchContext): boolean {
if (context.source === "text") {
return entry.scope.allowText;
}
if (context.source === "thinking") {
return entry.scope.allowThinking;
}
if (entry.scope.allowAnyTool) {
return true;
}
const toolName = context.toolName?.trim().toLowerCase();
for (const toolScope of entry.scope.toolScopes) {
if (toolScope.toolName && toolScope.toolName !== toolName) {
continue;
}
if (toolScope.pathGlob && !this.#matchesGlob(toolScope.pathGlob, context.filePaths)) {
continue;
}
return true;
}
return false;
}
#matchesCondition(entry: TtsrEntry, streamBuffer: string): boolean {
for (const condition of entry.conditions) {
condition.lastIndex = 0;
if (condition.test(streamBuffer)) {
return true;
}
}
return false;
}
addRule(rule: Rule): boolean {
if (this.#rules.has(rule.name)) {
return false;
}
const conditions = this.#compileConditions(rule);
if (conditions.length === 0) {
return false;
}
const scope = this.#buildScope(rule);
if (!this.#hasReachableScope(scope)) {
logger.warn("TTSR scope excludes all streams, skipping rule", {
ruleName: rule.name,
scope: rule.scope,
});
return false;
}
const globalPathGlobs = this.#compileGlobalPathGlobs(rule.globs);
this.#rules.set(rule.name, {
rule,
conditions,
scope,
globalPathGlobs,
});
logger.debug("TTSR rule registered", {
ruleName: rule.name,
conditions: rule.condition,
scope: rule.scope,
globs: rule.globs,
});
return true;
}
* Add a stream chunk to its scoped buffer and return matching rules.
*
* Buffers are isolated by source/tool key so matches don't bleed across
* assistant prose, thinking text, and unrelated tool argument streams.
*/
checkDelta(delta: string, context: TtsrMatchContext): Rule[] {
const bufferKey = this.#bufferKey(context);
const nextBuffer = `${this.#buffers.get(bufferKey) ?? ""}${delta}`;
this.#buffers.set(bufferKey, nextBuffer);
const matches: Rule[] = [];
for (const [name, entry] of this.#rules) {
if (!this.#canTrigger(name)) {
continue;
}
if (!this.#matchesScope(entry, context)) {
continue;
}
if (!this.#matchesGlobalPaths(entry, context)) {
continue;
}
if (!this.#matchesCondition(entry, nextBuffer)) {
continue;
}
matches.push(entry.rule);
logger.debug("TTSR condition matched", {
ruleName: name,
conditions: entry.rule.condition,
source: context.source,
toolName: context.toolName,
filePaths: context.filePaths,
});
}
return matches;
}
markInjected(rulesToMark: Rule[]): void {
this.markInjectedByNames(rulesToMark.map(rule => rule.name));
}
markInjectedByNames(ruleNames: string[]): void {
for (const rawName of ruleNames) {
const ruleName = rawName.trim();
if (ruleName.length === 0) {
continue;
}
const record = this.#injectionRecords.get(ruleName);
if (!record) {
this.#injectionRecords.set(ruleName, { lastInjectedAt: this.#messageCount });
} else {
record.lastInjectedAt = this.#messageCount;
}
logger.debug("TTSR rule marked as injected", {
ruleName,
messageCount: this.#messageCount,
repeatMode: this.#settings.repeatMode,
});
}
}
getInjectedRuleNames(): string[] {
return Array.from(this.#injectionRecords.keys());
}
restoreInjected(ruleNames: string[]): void {
for (const name of ruleNames) {
this.#injectionRecords.set(name, { lastInjectedAt: 0 });
}
if (ruleNames.length > 0) {
logger.debug("TTSR injected state restored", { ruleNames });
}
}
resetBuffer(): void {
this.#buffers.clear();
}
hasRules(): boolean {
return this.#rules.size > 0;
}
incrementMessageCount(): void {
this.#messageCount++;
}
getMessageCount(): number {
return this.#messageCount;
}
getSettings(): Required<TtsrSettings> {
return this.#settings;
}
}