import { Database } from "bun:sqlite";
import { afterEach, beforeEach, describe, expect, it } from "bun:test";
import * as fs from "node:fs/promises";
import * as os from "node:os";
import * as path from "node:path";
import { getAgentDir, getSessionsDir, getStatsDbPath, setAgentDir, TempDir } from "@oh-my-pi/pi-utils";
import { syncAllSessions } from "../src/aggregator";
import { closeDb, getOverallStats, getRecentRequests } from "../src/db";
import { parseSessionFile } from "../src/parser";
const originalConfigDir = process.env.PI_CONFIG_DIR;
const originalAgentDir = getAgentDir();
let tempDir: TempDir | null = null;
beforeEach(() => {
tempDir = TempDir.createSync("@pi-stats-priority-");
const configDir = path.relative(os.homedir(), tempDir.join("config"));
process.env.PI_CONFIG_DIR = configDir;
setAgentDir(path.join(os.homedir(), configDir, "agent"));
});
afterEach(() => {
closeDb();
if (originalConfigDir === undefined) {
delete process.env.PI_CONFIG_DIR;
} else {
process.env.PI_CONFIG_DIR = originalConfigDir;
}
setAgentDir(originalAgentDir);
tempDir?.removeSync();
tempDir = null;
});
interface SessionLines {
lines: Array<Record<string, unknown>>;
}
async function writeSession(folder: string, name: string, { lines }: SessionLines): Promise<string> {
const dir = path.join(getSessionsDir(), folder);
await fs.mkdir(dir, { recursive: true });
const filePath = path.join(dir, name);
const text = `${lines.map(line => JSON.stringify(line)).join("\n")}\n`;
await fs.writeFile(filePath, text);
return filePath;
}
function assistantEntry(opts: {
id: string;
parentId?: string | null;
provider: string;
premiumRequests?: number;
}): Record<string, unknown> {
return {
type: "message",
id: opts.id,
parentId: opts.parentId ?? null,
timestamp: new Date().toISOString(),
message: {
role: "assistant",
content: [{ type: "text", text: "ok" }],
api: "openai-responses",
provider: opts.provider,
model: "gpt-5.4",
stopReason: "stop",
timestamp: Date.now(),
usage: {
input: 10,
output: 5,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 15,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
...(opts.premiumRequests !== undefined ? { premiumRequests: opts.premiumRequests } : {}),
},
},
};
}
describe("priority service-tier premium-request backfill", () => {
it("derives premium_requests from service_tier_change entries for OpenAI traffic", async () => {
await writeSession("--tmp--proj", "01.jsonl", {
lines: [
{ type: "session", version: 1, id: "s1", timestamp: new Date().toISOString(), cwd: "/tmp/proj" },
{ type: "service_tier_change", id: "stc1", timestamp: new Date().toISOString(), serviceTier: "priority" },
assistantEntry({ id: "a1", provider: "openai" }),
assistantEntry({ id: "a2", provider: "openai-codex" }),
assistantEntry({ id: "a3", provider: "anthropic" }),
{ type: "service_tier_change", id: "stc2", timestamp: new Date().toISOString(), serviceTier: null },
assistantEntry({ id: "a4", provider: "openai" }),
],
});
await syncAllSessions();
const overall = await getOverallStats();
expect(overall.totalRequests).toBe(4);
expect(overall.totalPremiumRequests).toBe(2);
});
it("preserves an existing non-zero premiumRequests value (Copilot multiplier) even under priority tier", async () => {
await writeSession("--tmp--proj", "02.jsonl", {
lines: [
{ type: "session", version: 1, id: "s2", timestamp: new Date().toISOString(), cwd: "/tmp/proj" },
{ type: "service_tier_change", id: "stc", timestamp: new Date().toISOString(), serviceTier: "priority" },
assistantEntry({ id: "b1", provider: "github-copilot", premiumRequests: 0.33 }),
],
});
await syncAllSessions();
const request = getRecentRequests(1)[0];
expect(request?.usage.premiumRequests).toBeCloseTo(0.33, 6);
});
it("re-derives premium_requests on re-sync via UPSERT for sessions ingested before the fix", async () => {
const sessionFile = await writeSession("--tmp--proj", "03.jsonl", {
lines: [
{ type: "session", version: 1, id: "s3", timestamp: new Date().toISOString(), cwd: "/tmp/proj" },
{ type: "service_tier_change", id: "stc", timestamp: new Date().toISOString(), serviceTier: "priority" },
assistantEntry({ id: "c1", provider: "openai" }),
],
});
await syncAllSessions();
closeDb();
const sessionStats = await fs.stat(sessionFile);
const raw = new Database(getStatsDbPath());
raw.exec("DELETE FROM messages");
raw.exec("DELETE FROM file_offsets");
raw.exec("DELETE FROM meta WHERE key = 'premium_requests_priority_v1'");
raw.prepare(
`INSERT INTO messages (
session_file, entry_id, folder, model, provider, api, timestamp,
duration, ttft, stop_reason, error_message,
input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, total_tokens, premium_requests,
cost_input, cost_output, cost_cache_read, cost_cache_write, cost_total
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
).run(
sessionFile,
"c1",
"/tmp/proj",
"gpt-5.4",
"openai",
"openai-responses",
Date.now(),
1000,
100,
"stop",
null,
10,
5,
0,
0,
15,
0,
0,
0,
0,
0,
0,
);
raw.prepare("INSERT INTO file_offsets (session_file, offset, last_modified) VALUES (?, ?, ?)").run(
sessionFile,
sessionStats.size,
sessionStats.mtimeMs,
);
raw.close();
await syncAllSessions();
const request = getRecentRequests(1)[0];
expect(request?.entryId).toBe("c1");
expect(request?.usage.premiumRequests).toBe(1);
});
it("carries the active service tier across incremental parseSessionFile calls", async () => {
const sessionFile = await writeSession("--tmp--proj", "04.jsonl", {
lines: [
{ type: "session", version: 1, id: "s4", timestamp: new Date().toISOString(), cwd: "/tmp/proj" },
{ type: "service_tier_change", id: "stc", timestamp: new Date().toISOString(), serviceTier: "priority" },
assistantEntry({ id: "d1", provider: "openai" }),
],
});
const bytes = await fs.readFile(sessionFile);
const tierLineEnd = bytes.indexOf(0x0a, bytes.indexOf(Buffer.from("service_tier_change"))) + 1;
expect(tierLineEnd).toBeGreaterThan(0);
const second = await parseSessionFile(sessionFile, tierLineEnd);
expect(second.stats).toHaveLength(1);
expect(second.stats[0]?.entryId).toBe("d1");
expect(second.stats[0]?.usage.premiumRequests).toBe(1);
});
});