* MCP HTTP transport (Streamable HTTP).
*
* Implements JSON-RPC 2.0 over HTTP POST with optional SSE streaming.
* Based on MCP spec 2025-03-26.
*/
import { logger, readSseJson, Snowflake } from "@oh-my-pi/pi-utils";
import type {
JsonRpcError,
JsonRpcMessage,
JsonRpcRequest,
JsonRpcResponse,
MCPHttpServerConfig,
MCPRequestOptions,
MCPSseServerConfig,
MCPTransport,
} from "../../mcp/types";
import { toJsonRpcError } from "../../mcp/types";
import { createMCPTimeout, getNeverAbortSignal, isMCPTimeoutEnabled, resolveMCPTimeoutMs } from "../timeout";
const HTTP_SSE_CONNECT_TIMEOUT_MS = 1_000;
* Best-effort startup deadline for the optional Streamable HTTP GET SSE listener.
*
* Returns `0` (disabled) when the operator has explicitly disabled MCP client-side
* timeouts via `timeout: 0` or `OMP_MCP_TIMEOUT_MS=0`, mirroring the rest of the
* MCP timeout surface. Otherwise caps the wait at one second and scales below
* short request timeouts so connect-time never exceeds the request budget.
*/
export function resolveSSEConnectTimeoutMs(configTimeout?: number): number {
const requestTimeout = resolveMCPTimeoutMs(configTimeout);
if (!isMCPTimeoutEnabled(requestTimeout)) return 0;
const boundedTimeout = Math.min(HTTP_SSE_CONNECT_TIMEOUT_MS, Math.floor(requestTimeout / 4));
return Math.max(1, boundedTimeout);
}
* HTTP transport for MCP servers.
* Uses POST for requests, supports SSE responses.
*/
export class HttpTransport implements MCPTransport {
#connected = false;
#sessionId: string | null = null;
#sseConnection: AbortController | null = null;
onClose?: () => void;
onError?: (error: Error) => void;
onNotification?: (method: string, params: unknown) => void;
onRequest?: (method: string, params: unknown) => Promise<unknown>;
onAuthError?: () => Promise<Record<string, string> | null>;
constructor(private config: MCPHttpServerConfig | MCPSseServerConfig) {}
get connected(): boolean {
return this.#connected;
}
get url(): string {
return this.config.url;
}
* Mark transport as connected.
* HTTP doesn't need persistent connection, but we track state.
*/
async connect(): Promise<void> {
if (this.#connected) return;
this.#connected = true;
}
* Start SSE listener for server-initiated messages.
* Resolves once the SSE connection is established (or fails/unsupported).
* Message reading continues in the background.
*/
async startSSEListener(): Promise<void> {
if (!this.#connected) return;
if (this.#sseConnection) return;
this.#sseConnection = new AbortController();
const headers: Record<string, string> = {
Accept: "text/event-stream",
...this.config.headers,
};
if (this.#sessionId) {
headers["Mcp-Session-Id"] = this.#sessionId;
}
let response: Response;
let timedOut = false;
const startupTimeoutMs = resolveSSEConnectTimeoutMs(this.config.timeout);
const timeoutId =
startupTimeoutMs > 0
? setTimeout(() => {
timedOut = true;
this.#sseConnection?.abort();
}, startupTimeoutMs)
: null;
try {
response = await fetch(this.config.url, {
method: "GET",
headers,
signal: this.#sseConnection.signal,
});
} catch (error) {
this.#sseConnection = null;
if (error instanceof Error && error.name !== "AbortError" && !timedOut) {
this.onError?.(error);
}
return;
} finally {
if (timeoutId !== null) clearTimeout(timeoutId);
}
if (response.status === 405 || !response.ok || !response.body) {
await response.body?.cancel();
this.#sseConnection = null;
return;
}
const signal = this.#sseConnection.signal;
void this.#readSSEStream(response.body!, signal).finally(() => {
const wasConnected = this.#connected;
this.#sseConnection = null;
if (wasConnected) this.onClose?.();
});
}
async #readSSEStream(body: ReadableStream<Uint8Array>, signal: AbortSignal): Promise<void> {
try {
for await (const message of readSseJson<JsonRpcMessage>(body, signal)) {
if (!this.#connected) break;
this.#dispatchSSEMessage(message);
}
} catch (error) {
if (error instanceof Error && error.name !== "AbortError") {
logger.debug("HTTP SSE stream error", { url: this.config.url, error: error.message });
this.onError?.(error);
}
}
}
#dispatchSSEMessage(message: JsonRpcMessage | JsonRpcMessage[]): void {
if (Array.isArray(message)) {
for (const m of message) this.#dispatchSSEMessage(m);
return;
}
if ("method" in message && "id" in message && message.id != null) {
void this.#handleServerRequest(message as JsonRpcRequest);
return;
}
if ("method" in message && !("id" in message)) {
this.onNotification?.(message.method, message.params);
}
}
async request<T = unknown>(
method: string,
params?: Record<string, unknown>,
options?: MCPRequestOptions,
): Promise<T> {
try {
return await this.#executeRequest<T>(method, params, options);
} catch (error) {
if (this.onAuthError && error instanceof Error && /^HTTP (401|403):/.test(error.message)) {
const newHeaders = await this.onAuthError();
if (newHeaders) {
this.config = { ...this.config, headers: newHeaders };
return this.#executeRequest<T>(method, params, options);
}
}
throw error;
}
}
async #executeRequest<T>(
method: string,
params: Record<string, unknown> | undefined,
options: MCPRequestOptions | undefined,
): Promise<T> {
if (!this.#connected) {
throw new Error("Transport not connected");
}
const id = Snowflake.next();
const body = {
jsonrpc: "2.0" as const,
id,
method,
params: params ?? {},
};
const headers: Record<string, string> = {
"Content-Type": "application/json",
Accept: "application/json, text/event-stream",
...this.config.headers,
};
if (this.#sessionId) {
headers["Mcp-Session-Id"] = this.#sessionId;
}
const timeout = resolveMCPTimeoutMs(this.config.timeout);
const operation = createMCPTimeout(timeout, options?.signal);
try {
const response = await fetch(this.config.url, {
method: "POST",
headers,
body: JSON.stringify(body),
signal: operation.signal,
});
operation.clear();
const newSessionId = response.headers.get("Mcp-Session-Id");
if (newSessionId) {
this.#sessionId = newSessionId;
}
if (!response.ok) {
const text = await response.text();
const wwwAuthenticate = response.headers.get("WWW-Authenticate");
const mcpAuthServer = response.headers.get("Mcp-Auth-Server");
const authHints = [
wwwAuthenticate ? `WWW-Authenticate: ${wwwAuthenticate}` : null,
mcpAuthServer ? `Mcp-Auth-Server: ${mcpAuthServer}` : null,
]
.filter(Boolean)
.join("; ");
const suffix = authHints ? ` [${authHints}]` : "";
throw new Error(`HTTP ${response.status}: ${text}${suffix}`);
}
const contentType = response.headers.get("Content-Type") ?? "";
if (contentType.includes("text/event-stream")) {
return this.#parseSSEResponse<T>(response, id, options);
}
const result = (await response.json()) as JsonRpcResponse;
if (result.error) {
throw new Error(`MCP error ${result.error.code}: ${result.error.message}`);
}
return result.result as T;
} catch (error) {
operation.clear();
if (operation.isTimeoutAbort(error)) {
throw new Error(`Request timeout after ${timeout}ms`);
}
throw error;
}
}
#parseSSEResponse<T>(response: Response, expectedId: string | number, options?: MCPRequestOptions): Promise<T> {
if (!response.body) {
throw new Error("No response body");
}
const timeout = resolveMCPTimeoutMs(this.config.timeout);
const operation = createMCPTimeout(timeout, options?.signal);
const signal = operation.signal ?? getNeverAbortSignal();
const { promise, resolve, reject } = Promise.withResolvers<T>();
let captured = false;
const drain = async (): Promise<void> => {
try {
for await (const raw of readSseJson<JsonRpcMessage | JsonRpcMessage[]>(response.body!, signal)) {
const messages = Array.isArray(raw) ? raw : [raw];
for (const message of messages) {
if (
!captured &&
"id" in message &&
message.id === expectedId &&
("result" in message || "error" in message)
) {
captured = true;
operation.clear();
if (message.error) {
reject(new Error(`MCP error ${message.error.code}: ${message.error.message}`));
} else {
resolve(message.result as T);
}
continue;
}
if (!this.#connected) continue;
this.#dispatchSSEMessage(message);
}
}
if (!captured) {
reject(new Error(`No response received for request ID ${expectedId}`));
}
} catch (error) {
if (captured) return;
if (operation.isTimeoutAbort(error)) {
reject(new Error(`SSE response timeout after ${timeout}ms`));
} else {
reject(error as Error);
}
} finally {
operation.clear();
}
};
void drain();
return promise;
}
async #handleServerRequest(request: JsonRpcRequest): Promise<void> {
if (!this.onRequest) {
await this.#sendServerResponse(request.id, undefined, { code: -32601, message: "Method not found" });
return;
}
try {
const result = await this.onRequest(request.method, request.params);
await this.#sendServerResponse(request.id, result);
} catch (error) {
await this.#sendServerResponse(request.id, undefined, toJsonRpcError(error));
}
}
async #sendServerResponse(id: string | number, result?: unknown, error?: JsonRpcError): Promise<void> {
if (!this.#connected) return;
const body = error
? { jsonrpc: "2.0" as const, id, error }
: { jsonrpc: "2.0" as const, id, result: result ?? {} };
const headers: Record<string, string> = {
"Content-Type": "application/json",
Accept: "application/json, text/event-stream",
...this.config.headers,
};
if (this.#sessionId) {
headers["Mcp-Session-Id"] = this.#sessionId;
}
const timeout = resolveMCPTimeoutMs(this.config.timeout);
let operation = createMCPTimeout(timeout);
try {
const resp = await fetch(this.config.url, {
method: "POST",
headers,
body: JSON.stringify(body),
signal: operation.signal,
});
operation.clear();
if (this.onAuthError && (resp.status === 401 || resp.status === 403)) {
await resp.body?.cancel();
const newHeaders = await this.onAuthError();
if (newHeaders) {
this.config.headers ??= {};
Object.assign(this.config.headers, newHeaders);
Object.assign(headers, newHeaders);
operation = createMCPTimeout(timeout);
const retry = await fetch(this.config.url, {
method: "POST",
headers,
body: JSON.stringify(body),
signal: operation.signal,
});
operation.clear();
await retry.body?.cancel();
return;
}
}
await resp.body?.cancel();
} catch {
operation.clear();
}
}
async notify(method: string, params?: Record<string, unknown>): Promise<void> {
if (!this.#connected) {
throw new Error("Transport not connected");
}
const body = {
jsonrpc: "2.0" as const,
method,
params: params ?? {},
};
const headers: Record<string, string> = {
"Content-Type": "application/json",
Accept: "application/json, text/event-stream",
...this.config.headers,
};
if (this.#sessionId) {
headers["Mcp-Session-Id"] = this.#sessionId;
}
const timeout = resolveMCPTimeoutMs(this.config.timeout);
const operation = createMCPTimeout(timeout);
try {
const response = await fetch(this.config.url, {
method: "POST",
headers,
body: JSON.stringify(body),
signal: operation.signal,
});
operation.clear();
if (!response.ok && response.status !== 202) {
const text = await response.text();
throw new Error(`HTTP ${response.status}: ${text}`);
}
const contentType = response.headers.get("Content-Type") ?? "";
if (contentType.includes("text/event-stream") && response.body) {
if (this.#sseConnection) {
void this.#readSSEStream(response.body, this.#sseConnection.signal);
} else {
const readOperation = createMCPTimeout(timeout);
const signal = readOperation.signal ?? getNeverAbortSignal();
void this.#readSSEStream(response.body, signal).finally(() => readOperation.clear());
}
} else {
await response.body?.cancel();
}
} catch (error) {
operation.clear();
if (operation.isTimeoutAbort(error)) {
throw new Error(`Notify timeout after ${timeout}ms`);
}
throw error;
}
}
async close(): Promise<void> {
if (!this.#connected) return;
this.#connected = false;
if (this.#sseConnection) {
this.#sseConnection.abort();
this.#sseConnection = null;
}
if (this.#sessionId) {
const timeout = resolveMCPTimeoutMs(this.config.timeout);
const operation = createMCPTimeout(timeout);
try {
const headers: Record<string, string> = {
...this.config.headers,
"Mcp-Session-Id": this.#sessionId,
};
await fetch(this.config.url, {
method: "DELETE",
headers,
signal: operation.signal,
});
operation.clear();
} catch {
operation.clear();
}
this.#sessionId = null;
}
this.onClose?.();
this.onClose = undefined;
}
}
* Create and connect an HTTP transport.
*/
export async function createHttpTransport(config: MCPHttpServerConfig | MCPSseServerConfig): Promise<HttpTransport> {
const transport = new HttpTransport(config);
await transport.connect();
return transport;
}