import { afterEach, beforeEach, describe, expect, test, vi } from "bun:test";
import * as fs from "node:fs/promises";
import * as os from "node:os";
import * as path from "node:path";
import {
AuthBrokerClient,
type AuthBrokerServerHandle,
AuthStorage,
REMOTE_REFRESH_SENTINEL,
RemoteAuthCredentialStore,
SqliteAuthCredentialStore,
startAuthBroker,
} from "../src";
/**
 * Environment variables that every test in this file removes from
 * `process.env` before running and puts back afterwards — presumably so that
 * ambient Anthropic credentials cannot influence the storage under test
 * (TODO confirm against AuthStorage).
 */
const ANTHROPIC_ENV = ["ANTHROPIC_API_KEY", "ANTHROPIC_OAUTH_TOKEN"] as const;

/** Name of one of the variables listed in `ANTHROPIC_ENV`. */
type AnthropicEnvKey = (typeof ANTHROPIC_ENV)[number];

/**
 * Pre-test values of the `ANTHROPIC_ENV` variables, keyed by variable name.
 * An `undefined` entry means the variable was not set when it was captured.
 */
const savedEnv: { [Key in AnthropicEnvKey]?: string | undefined } = {};
/**
 * Builds a deterministic OAuth credential fixture.
 *
 * Every string field is derived from `suffix`, so two fixtures minted with
 * different suffixes can be told apart by any of their fields.
 *
 * @param suffix - Tag embedded in the access token, refresh token, account id and email.
 * @param expires - Value stored verbatim in the credential's `expires` field.
 * @returns A plain object whose `type` is the literal `"oauth"`.
 */
function mintOAuthCredential(suffix: string, expires: number) {
	const access = `access-${suffix}`;
	const refresh = `refresh-${suffix}`;
	const accountId = `account-${suffix}`;
	const email = `${suffix}@example.com`;

	return { type: "oauth" as const, access, refresh, expires, accountId, email };
}
/**
 * Polls `predicate` until it returns `true` or the timeout elapses.
 *
 * The predicate is evaluated once per polling round, with a 10 ms pause
 * between rounds, and one final time after the deadline has passed so that a
 * condition which became true during the last pause is still honoured.
 *
 * @param predicate - Synchronous condition to poll.
 * @param timeoutMs - Maximum time to keep polling, in milliseconds (default 2000).
 * @throws Error with message "waitUntil timeout" when the predicate is still
 *   false after the deadline.
 */
async function waitUntil(predicate: () => boolean, timeoutMs = 2_000): Promise<void> {
	const pollIntervalMs = 10;
	const expiresAt = Date.now() + timeoutMs;

	for (let now = Date.now(); now < expiresAt; now = Date.now()) {
		if (predicate()) {
			return;
		}
		await Bun.sleep(pollIntervalMs);
	}

	// Last chance: the condition may have flipped during the final pause.
	if (predicate()) {
		return;
	}
	throw new Error("waitUntil timeout");
}
// Integration suite: a RemoteAuthCredentialStore is attached to a real auth
// broker started on an ephemeral loopback port, and the test checks that the
// remote snapshot follows changes made to the broker-side AuthStorage without
// the test ever asking the remote store to refresh.
describe("RemoteAuthCredentialStore SSE integration", () => {
	// Per-test fixtures. They are created in beforeEach and released (and reset)
	// in afterEach, so they stay `undefined` / "" if setup fails part-way.
	let tempDir = "";
	let store: SqliteAuthCredentialStore | undefined;
	let storage: AuthStorage | undefined;
	let handle: AuthBrokerServerHandle | undefined;
	let remote: RemoteAuthCredentialStore | undefined;
	const token = "remote-store-bearer";

	beforeEach(async () => {
		// Remember and remove the Anthropic variables for the duration of the test.
		for (const key of ANTHROPIC_ENV) {
			savedEnv[key] = process.env[key];
			delete process.env[key];
		}

		// Fresh on-disk database seeded with a single OAuth credential ("a").
		tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "auth-broker-remote-store-"));
		store = await SqliteAuthCredentialStore.open(path.join(tempDir, "agent.db"));
		store.saveOAuth("anthropic", mintOAuthCredential("a", Date.now() + 60_000));
		storage = new AuthStorage(store);
		await storage.reload();

		// Port 0 requests an ephemeral port; the test reads the address from handle.url.
		handle = startAuthBroker({
			storage,
			bind: "127.0.0.1:0",
			bearerTokens: [token],
			disableRefresher: true,
		});
	});

	afterEach(async () => {
		try {
			vi.restoreAllMocks();
			remote?.close();
			await handle?.close();
			storage?.close();
			store?.close();
			// tempDir is still "" when setup failed before mkdtemp; nothing to remove then.
			if (tempDir !== "") {
				await fs.rm(tempDir, { recursive: true, force: true });
			}
		} finally {
			// Runs even when a cleanup step above throws, so a failing teardown can
			// neither leak the cleared environment variables into later tests nor
			// leave stale fixtures to be closed a second time.
			remote = undefined;
			handle = undefined;
			storage = undefined;
			store = undefined;
			tempDir = "";
			for (const key of ANTHROPIC_ENV) {
				if (savedEnv[key] === undefined) delete process.env[key];
				else process.env[key] = savedEnv[key];
			}
		}
	});

	test("consumes initial snapshot, upsert, and removal over SSE without manual refresh", async () => {
		if (!handle || !storage) {
			throw new Error("beforeEach did not initialise the auth broker fixtures");
		}
		const broker = handle;
		const authStorage = storage;

		const client = new AuthBrokerClient({ url: broker.url, token });
		const remoteStore = new RemoteAuthCredentialStore({ client });
		// Publish to the suite-level variable so afterEach closes it.
		remote = remoteStore;

		// 1. Initial snapshot: the seeded credential shows up on its own.
		await waitUntil(() => remoteStore.snapshot.credentials.length === 1);
		const initialEntry = remoteStore.snapshot.credentials[0];
		expect(initialEntry.provider).toBe("anthropic");
		expect(initialEntry.credential.type).toBe("oauth");
		if (initialEntry.credential.type === "oauth") {
			expect(initialEntry.credential.access).toBe("access-a");
			// The remote view is expected to carry the sentinel instead of the
			// stored refresh token ("refresh-a").
			expect(initialEntry.credential.refresh).toBe(REMOTE_REFRESH_SENTINEL);
		}
		const initialGeneration = remoteStore.snapshot.generation;

		// 2. Upsert on the broker side: a second credential appears remotely and
		//    the snapshot generation moves forward.
		authStorage.upsertCredential("anthropic", mintOAuthCredential("b", Date.now() + 120_000));
		await waitUntil(() => remoteStore.snapshot.credentials.length === 2);
		expect(remoteStore.snapshot.generation).toBeGreaterThan(initialGeneration);
		const accessTokens = remoteStore.snapshot.credentials
			.flatMap(entry => (entry.credential.type === "oauth" ? [entry.credential.access] : []))
			.sort();
		expect(accessTokens).toEqual(["access-a", "access-b"]);

		// 3. Removal: disabling credential "b" on the broker side drops it from
		//    the remote snapshot.
		const bId = remoteStore.snapshot.credentials.find(
			entry => entry.credential.type === "oauth" && entry.credential.access === "access-b",
		)?.id;
		expect(bId).toBeDefined();
		const disabled = authStorage.disableCredentialById(bId!, "revoked by test");
		expect(disabled).toBe(true);
		await waitUntil(() => remoteStore.snapshot.credentials.length === 1);
		expect(remoteStore.snapshot.credentials[0].id).not.toBe(bId);
	});
});