import { describe, expect, it } from "bun:test";
import { Effort } from "../src/model-thinking";
import { encodeStream, formatError, parseRequest } from "../src/providers/pi-native-server";
import type {
AssistantMessage,
AssistantMessageEvent,
AssistantMessageEventStream,
Context,
Usage,
} from "../src/types";
function makeEventStream(events: AssistantMessageEvent[], final: AssistantMessage): AssistantMessageEventStream {
async function* iter() {
for (const e of events) yield e;
}
const stream = iter() as unknown as AssistantMessageEventStream;
(stream as { result(): Promise<AssistantMessage> }).result = async () => final;
return stream;
}
async function collectSse(stream: ReadableStream<Uint8Array>): Promise<string[]> {
const reader = stream.getReader();
const decoder = new TextDecoder();
let buf = "";
for (;;) {
const { value, done } = await reader.read();
if (done) break;
buf += decoder.decode(value, { stream: true });
}
buf += decoder.decode();
return buf.split("\n\n").filter(s => s.length > 0);
}
function parseSseLine(line: string): unknown {
const stripped = line.replace(/^data: /, "");
if (stripped === "[DONE]") return "[DONE]";
return JSON.parse(stripped);
}
const ZERO_USAGE: Usage = {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
};
function baseAssistant(overrides?: Partial<AssistantMessage>): AssistantMessage {
return {
role: "assistant",
content: [],
api: "anthropic-messages",
provider: "anthropic",
model: "claude-sonnet-4-5",
usage: ZERO_USAGE,
stopReason: "stop",
timestamp: 0,
...overrides,
};
}
const baseContext: Context = {
systemPrompt: ["you are helpful"],
messages: [{ role: "user", content: "hi", timestamp: 0 }],
};
describe("pi-native parseRequest", () => {
it("accepts modelId + context and returns canonical shape", () => {
const parsed = parseRequest({
modelId: "claude-sonnet-4-5",
context: baseContext,
options: { temperature: 0.5, reasoning: Effort.High },
stream: false,
});
expect(parsed.modelId).toBe("claude-sonnet-4-5");
expect(parsed.context).toEqual(baseContext);
expect(parsed.options.temperature).toBe(0.5);
expect(parsed.options.reasoning).toBe(Effort.High);
expect(parsed.stream).toBe(false);
});
it("falls back to model.id when modelId is absent (streamProxy compat)", () => {
const parsed = parseRequest({
model: { id: "claude-opus-4-1", provider: "anthropic", api: "anthropic-messages" },
context: baseContext,
});
expect(parsed.modelId).toBe("claude-opus-4-1");
});
it("accepts top-level string `model` as the id (extra compat)", () => {
const parsed = parseRequest({
model: "gpt-5",
context: baseContext,
});
expect(parsed.modelId).toBe("gpt-5");
});
it("defaults stream to true when omitted", () => {
const parsed = parseRequest({ modelId: "x", context: baseContext });
expect(parsed.stream).toBe(true);
});
it("drops server-controlled and unknown option keys", () => {
const parsed = parseRequest({
modelId: "x",
context: baseContext,
options: {
temperature: 0.2,
apiKey: "should-be-stripped",
signal: {},
fetch: () => {},
onPayload: () => {},
onResponse: () => {},
onSseEvent: () => {},
execHandlers: {},
providerSessionState: new Map(),
notARealField: "ignored",
},
});
expect(parsed.options).toEqual({ temperature: 0.2 });
expect("apiKey" in parsed.options).toBe(false);
expect("signal" in parsed.options).toBe(false);
expect("fetch" in parsed.options).toBe(false);
expect("onPayload" in parsed.options).toBe(false);
expect("notARealField" in parsed.options).toBe(false);
});
it("preserves headers, metadata, sessionId, thinkingBudgets", () => {
const parsed = parseRequest({
modelId: "x",
context: baseContext,
options: {
headers: { "x-foo": "bar" },
metadata: { user_id: "u" },
sessionId: "explicit-session",
thinkingBudgets: { high: 8192 },
stopSequences: ["\n\n"],
toolChoice: "required",
serviceTier: "priority",
cacheRetention: "long",
},
});
expect(parsed.options.headers).toEqual({ "x-foo": "bar" });
expect(parsed.options.metadata).toEqual({ user_id: "u" });
expect(parsed.options.sessionId).toBe("explicit-session");
expect(parsed.options.thinkingBudgets).toEqual({ high: 8192 });
expect(parsed.options.stopSequences).toEqual(["\n\n"]);
expect(parsed.options.toolChoice).toBe("required");
expect(parsed.options.serviceTier).toBe("priority");
expect(parsed.options.cacheRetention).toBe("long");
});
it("rejects missing required fields", () => {
expect(() => parseRequest({ context: baseContext })).toThrow(/modelId/);
expect(() => parseRequest({ modelId: "x" })).toThrow(/context/);
expect(() => parseRequest({ modelId: "x", context: { systemPrompt: [] } })).toThrow(/messages/);
});
it("rejects non-object body", () => {
expect(() => parseRequest(null)).toThrow();
expect(() => parseRequest("hello")).toThrow();
expect(() => parseRequest([])).toThrow();
});
it("validates systemPrompt and tools shape", () => {
expect(() => parseRequest({ modelId: "x", context: { systemPrompt: "not array", messages: [] } })).toThrow(
/systemPrompt/,
);
expect(() => parseRequest({ modelId: "x", context: { messages: [], tools: "not array" } })).toThrow(/tools/);
});
it("skips null and undefined option values", () => {
const parsed = parseRequest({
modelId: "x",
context: baseContext,
options: { temperature: null, topP: undefined, maxTokens: 100 },
});
expect("temperature" in parsed.options).toBe(false);
expect("topP" in parsed.options).toBe(false);
expect(parsed.options.maxTokens).toBe(100);
});
});
describe("pi-native encodeStream", () => {
it("ships every AssistantMessageEvent verbatim, terminated by [DONE]", async () => {
const finalMessage = baseAssistant({
content: [{ type: "text", text: "hi" }],
usage: { ...ZERO_USAGE, input: 4, output: 2, totalTokens: 6 },
});
const partialAfterDelta: AssistantMessage = baseAssistant({
content: [{ type: "text", text: "hi" }],
});
const events: AssistantMessageEvent[] = [
{ type: "start", partial: baseAssistant() },
{ type: "text_start", contentIndex: 0, partial: baseAssistant({ content: [{ type: "text", text: "" }] }) },
{ type: "text_delta", contentIndex: 0, delta: "hi", partial: partialAfterDelta },
{ type: "text_end", contentIndex: 0, content: "hi", partial: partialAfterDelta },
{ type: "done", reason: "stop", message: finalMessage },
];
const chunks = await collectSse(encodeStream(makeEventStream(events, finalMessage)));
const parsed = chunks.map(parseSseLine);
expect(parsed.length).toBe(events.length + 1);
for (let i = 0; i < events.length; i++) {
expect(parsed[i]).toEqual(JSON.parse(JSON.stringify(events[i])));
}
expect(parsed[parsed.length - 1]).toBe("[DONE]");
});
it("preserves the rolling `partial` on every delta (sanity: no shrink)", async () => {
const final = baseAssistant({ content: [{ type: "text", text: "abc" }] });
const events: AssistantMessageEvent[] = [
{ type: "text_delta", contentIndex: 0, delta: "abc", partial: final },
{ type: "done", reason: "stop", message: final },
];
const parsed = (await collectSse(encodeStream(makeEventStream(events, final)))).map(parseSseLine) as Array<
Record<string, unknown>
>;
expect(parsed[0]).toHaveProperty("partial");
expect((parsed[0] as { partial: AssistantMessage }).partial.content).toEqual([{ type: "text", text: "abc" }]);
});
it("stops streaming after a terminal `done` and emits [DONE] once", async () => {
const final = baseAssistant();
const events: AssistantMessageEvent[] = [
{ type: "done", reason: "stop", message: final },
{ type: "text_delta", contentIndex: 0, delta: "ghost", partial: final },
];
const parsed = (await collectSse(encodeStream(makeEventStream(events, final)))).map(parseSseLine);
expect(parsed.length).toBe(2);
expect((parsed[0] as { type: string }).type).toBe("done");
expect(parsed[1]).toBe("[DONE]");
});
it("forwards `error` events verbatim, then closes with [DONE]", async () => {
const errored = baseAssistant({
stopReason: "error",
errorMessage: "upstream blew up",
usage: { ...ZERO_USAGE, input: 3 },
});
const events: AssistantMessageEvent[] = [{ type: "error", reason: "error", error: errored }];
const parsed = (await collectSse(encodeStream(makeEventStream(events, errored)))).map(parseSseLine);
expect(parsed[0]).toEqual({ type: "error", reason: "error", error: JSON.parse(JSON.stringify(errored)) });
expect(parsed[1]).toBe("[DONE]");
});
it("emits a synthetic error envelope when the source iterator throws", async () => {
const broken = (async function* () {
yield { type: "start", partial: baseAssistant() } satisfies AssistantMessageEvent;
throw new Error("connection reset");
})() as unknown as AssistantMessageEventStream;
(broken as { result(): Promise<AssistantMessage> }).result = async () => baseAssistant();
const parsed = (await collectSse(encodeStream(broken))).map(parseSseLine);
expect((parsed[0] as { type: string }).type).toBe("start");
expect(parsed[1]).toEqual({ type: "error", reason: "error", errorMessage: "connection reset" });
expect(parsed[2]).toBe("[DONE]");
});
});
describe("pi-native formatError", () => {
it("emits { error: { type, message } } with the given status", async () => {
const res = formatError(401, "authentication_error", "no credential");
expect(res.status).toBe(401);
expect(res.headers.get("Content-Type")).toBe("application/json; charset=utf-8");
expect(await res.json()).toEqual({ error: { type: "authentication_error", message: "no credential" } });
});
});