* MCP Client.
*
* Handles connection initialization, tool listing, and tool calling.
*/
import * as path from "node:path";
import * as url from "node:url";
import { getProjectDir, logger, withTimeout } from "@oh-my-pi/pi-utils";
import { describeMCPTimeout, isMCPTimeoutEnabled, resolveMCPTimeoutMs } from "./timeout";
import { createHttpTransport } from "./transports/http";
import { createStdioTransport } from "./transports/stdio";
import type {
MCPGetPromptParams,
MCPGetPromptResult,
MCPHttpServerConfig,
MCPInitializeParams,
MCPInitializeResult,
MCPPrompt,
MCPPromptsListResult,
MCPRequestOptions,
MCPResource,
MCPResourceReadParams,
MCPResourceReadResult,
MCPResourceSubscribeParams,
MCPResourcesListResult,
MCPResourceTemplate,
MCPResourceTemplatesListResult,
MCPServerCapabilities,
MCPServerConfig,
MCPServerConnection,
MCPSseServerConfig,
MCPStdioServerConfig,
MCPToolCallParams,
MCPToolCallResult,
MCPToolDefinition,
MCPToolsListResult,
MCPTransport,
} from "./types";
const PROTOCOL_VERSION = "2025-03-26";
const CLIENT_INFO = {
name: "omp-coding-agent",
version: "1.0.0",
};
* Default handler for standard MCP server-to-client requests.
* Handles `ping` and `roots/list`; rejects unknown methods with -32601.
* Reads getProjectDir() at call time so the root stays stable even if
* the process cwd changes during tool execution.
*/
async function defaultRequestHandler(method: string, _params: unknown): Promise<unknown> {
switch (method) {
case "ping":
return {};
case "roots/list": {
const cwd = getProjectDir();
return {
roots: [{ uri: url.pathToFileURL(cwd).href, name: path.basename(cwd) }],
};
}
default:
throw Object.assign(new Error(`Unsupported server request: ${method}`), { code: -32601 });
}
}
* Create a transport for the given server config.
*/
async function createTransport(config: MCPServerConfig): Promise<MCPTransport> {
const serverType = config.type ?? "stdio";
switch (serverType) {
case "stdio":
return createStdioTransport(config as MCPStdioServerConfig);
case "http":
case "sse":
return createHttpTransport(config as MCPHttpServerConfig | MCPSseServerConfig);
default:
throw new Error(`Unknown server type: ${serverType}`);
}
}
* Initialize connection with MCP server.
*/
async function initializeConnection(
transport: MCPTransport,
options?: {
signal?: AbortSignal;
/** Called after the initialize response (which sets the session ID) but before notifications/initialized. */
onInitialized?: () => void | Promise<void>;
},
): Promise<MCPInitializeResult> {
const params: MCPInitializeParams = {
protocolVersion: PROTOCOL_VERSION,
capabilities: {
roots: { listChanged: false },
},
clientInfo: CLIENT_INFO,
};
const result = await transport.request<MCPInitializeResult>(
"initialize",
params as unknown as Record<string, unknown>,
{ signal: options?.signal },
);
if (options?.signal?.aborted) {
throw options.signal.reason instanceof Error ? options.signal.reason : new Error("Aborted");
}
await options?.onInitialized?.();
await transport.notify("notifications/initialized");
return result;
}
* Connect to an MCP server.
* Has a 30 second timeout by default to prevent blocking startup.
* Set OMP_MCP_TIMEOUT_MS=0 to disable MCP client-side timeouts.
*/
export async function connectToServer(
name: string,
config: MCPServerConfig,
options?: {
signal?: AbortSignal;
onNotification?: (method: string, params: unknown) => void;
onRequest?: (method: string, params: unknown) => Promise<unknown>;
},
): Promise<MCPServerConnection> {
const timeoutMs = resolveMCPTimeoutMs(config.timeout);
let transport: MCPTransport | undefined;
const connect = async (): Promise<MCPServerConnection> => {
transport = await createTransport(config);
if (options?.onNotification) {
transport.onNotification = options.onNotification;
}
transport.onRequest = options?.onRequest ?? defaultRequestHandler;
try {
const initResult = await initializeConnection(transport, {
signal: options?.signal,
async onInitialized() {
if ("startSSEListener" in transport! && typeof transport!.startSSEListener === "function") {
await (transport as { startSSEListener(): Promise<void> }).startSSEListener();
}
},
});
return {
name,
config,
transport,
serverInfo: initResult.serverInfo,
capabilities: initResult.capabilities,
instructions: initResult.instructions,
};
} catch (error) {
await transport.close();
throw error;
}
};
try {
if (!isMCPTimeoutEnabled(timeoutMs)) {
return await connect();
}
return await withTimeout(
connect(),
timeoutMs,
`Connection to MCP server "${name}" timed out after ${describeMCPTimeout(timeoutMs)}`,
options?.signal,
);
} catch (error) {
if (transport) {
void transport.close().catch(() => {});
}
throw error;
}
}
* List tools from a connected server.
*/
export async function listTools(
connection: MCPServerConnection,
options?: { signal?: AbortSignal },
): Promise<MCPToolDefinition[]> {
if (!connection.capabilities.tools) {
return [];
}
if (connection.tools) {
return connection.tools;
}
const allTools: MCPToolDefinition[] = [];
let cursor: string | undefined;
do {
const params: Record<string, unknown> = {};
if (cursor) {
params.cursor = cursor;
}
const result = await connection.transport.request<MCPToolsListResult>("tools/list", params, options);
allTools.push(...result.tools);
cursor = result.nextCursor;
} while (cursor);
connection.tools = allTools;
return allTools;
}
* Call a tool on a connected server.
*/
export async function callTool(
connection: MCPServerConnection,
toolName: string,
args: Record<string, unknown> = {},
options?: MCPRequestOptions,
): Promise<MCPToolCallResult> {
const params: MCPToolCallParams = {
name: toolName,
arguments: args,
};
return connection.transport.request<MCPToolCallResult>(
"tools/call",
params as unknown as Record<string, unknown>,
options,
);
}
* Disconnect from a server.
*/
export async function disconnectServer(connection: MCPServerConnection): Promise<void> {
await connection.transport.close();
}
* Check if a server supports tools.
*/
export function serverSupportsTools(capabilities: MCPServerCapabilities): boolean {
return capabilities.tools !== undefined;
}
* List resources from a connected server.
*/
export async function listResources(
connection: MCPServerConnection,
options?: { signal?: AbortSignal },
): Promise<MCPResource[]> {
if (!connection.capabilities.resources) {
return [];
}
if (connection.resources) {
return connection.resources;
}
const allResources: MCPResource[] = [];
let cursor: string | undefined;
do {
const params: Record<string, unknown> = {};
if (cursor) {
params.cursor = cursor;
}
const result = await connection.transport.request<MCPResourcesListResult>("resources/list", params, options);
allResources.push(...result.resources);
cursor = result.nextCursor;
} while (cursor);
connection.resources = allResources;
return allResources;
}
* List resource templates from a connected server.
*/
export async function listResourceTemplates(
connection: MCPServerConnection,
options?: { signal?: AbortSignal },
): Promise<MCPResourceTemplate[]> {
if (!connection.capabilities.resources) {
return [];
}
if (connection.resourceTemplates) {
return connection.resourceTemplates;
}
const allTemplates: MCPResourceTemplate[] = [];
let cursor: string | undefined;
do {
const params: Record<string, unknown> = {};
if (cursor) {
params.cursor = cursor;
}
const result = await connection.transport.request<MCPResourceTemplatesListResult>(
"resources/templates/list",
params,
options,
);
allTemplates.push(...result.resourceTemplates);
cursor = result.nextCursor;
} while (cursor);
connection.resourceTemplates = allTemplates;
return allTemplates;
}
* Read a resource from a connected server.
*/
export async function readResource(
connection: MCPServerConnection,
uri: string,
options?: MCPRequestOptions,
): Promise<MCPResourceReadResult> {
const params: MCPResourceReadParams = { uri };
return connection.transport.request<MCPResourceReadResult>(
"resources/read",
params as unknown as Record<string, unknown>,
options,
);
}
* Subscribe to resource update notifications.
*/
export async function subscribeToResources(
connection: MCPServerConnection,
uris: string[],
options?: MCPRequestOptions,
): Promise<void> {
if (uris.length === 0 || !connection.capabilities.resources?.subscribe) return;
const results = await Promise.allSettled(
uris.map(uri => {
const params: MCPResourceSubscribeParams = { uri };
return connection.transport.request(
"resources/subscribe",
params as unknown as Record<string, unknown>,
options,
);
}),
);
for (const result of results) {
if (result.status === "rejected") {
logger.warn("Failed to subscribe to MCP resource", { error: result.reason });
}
}
}
* Unsubscribe from resource update notifications.
*/
export async function unsubscribeFromResources(
connection: MCPServerConnection,
uris: string[],
options?: MCPRequestOptions,
): Promise<void> {
if (uris.length === 0 || !connection.capabilities.resources?.subscribe) return;
const results = await Promise.allSettled(
uris.map(uri => {
const params: MCPResourceSubscribeParams = { uri };
return connection.transport.request(
"resources/unsubscribe",
params as unknown as Record<string, unknown>,
options,
);
}),
);
for (const result of results) {
if (result.status === "rejected") {
logger.warn("Failed to unsubscribe from MCP resource", { error: result.reason });
}
}
}
* Check if a server supports resource subscriptions.
*/
export function serverSupportsResourceSubscriptions(capabilities: MCPServerCapabilities): boolean {
return capabilities.resources?.subscribe === true;
}
* Check if a server supports resources.
*/
export function serverSupportsResources(capabilities: MCPServerCapabilities): boolean {
return capabilities.resources !== undefined;
}
* List prompts from a connected server.
*/
export async function listPrompts(
connection: MCPServerConnection,
options?: { signal?: AbortSignal },
): Promise<MCPPrompt[]> {
if (!connection.capabilities.prompts) {
return [];
}
if (connection.prompts) {
return connection.prompts;
}
const allPrompts: MCPPrompt[] = [];
let cursor: string | undefined;
do {
const params: Record<string, unknown> = {};
if (cursor) {
params.cursor = cursor;
}
const result = await connection.transport.request<MCPPromptsListResult>("prompts/list", params, options);
allPrompts.push(...result.prompts);
cursor = result.nextCursor;
} while (cursor);
connection.prompts = allPrompts;
return allPrompts;
}
* Get a specific prompt from a connected server.
*/
export async function getPrompt(
connection: MCPServerConnection,
name: string,
args?: Record<string, string>,
options?: MCPRequestOptions,
): Promise<MCPGetPromptResult> {
const params: MCPGetPromptParams = { name };
if (args && Object.keys(args).length > 0) {
params.arguments = args;
}
return connection.transport.request<MCPGetPromptResult>(
"prompts/get",
params as unknown as Record<string, unknown>,
options,
);
}
* Check if a server supports prompts.
*/
export function serverSupportsPrompts(capabilities: MCPServerCapabilities): boolean {
return capabilities.prompts !== undefined;
}