/**
 * MCP Server Manager.
 *
 * Discovers, connects to, and manages MCP servers.
 * Handles tool loading and lifecycle.
 */
import * as path from "node:path";
import * as url from "node:url";
import type { TSchema } from "@oh-my-pi/pi-ai";
import { logger } from "@oh-my-pi/pi-utils";
import type { SourceMeta } from "../capability/types";
import { resolveConfigValue } from "../config/resolve-config-value";
import type { CustomTool } from "../extensibility/custom-tools/types";
import type { AuthStorage } from "../session/auth-storage";
import {
connectToServer,
disconnectServer,
getPrompt,
listPrompts,
listResources,
listResourceTemplates,
listTools,
readResource,
serverSupportsPrompts,
serverSupportsResources,
subscribeToResources,
unsubscribeFromResources,
} from "./client";
import { loadAllMCPConfigs, validateServerConfig } from "./config";
import { refreshMCPOAuthToken } from "./oauth-flow";
import type { MCPToolDetails } from "./tool-bridge";
import { DeferredMCPTool, MCPTool } from "./tool-bridge";
import type { MCPToolCache } from "./tool-cache";
import { HttpTransport } from "./transports/http";
import type {
MCPGetPromptResult,
MCPPrompt,
MCPRequestOptions,
MCPResource,
MCPResourceReadResult,
MCPResourceTemplate,
MCPServerConfig,
MCPServerConnection,
MCPToolDefinition,
} from "./types";
import { MCPNotificationMethods } from "./types";
// Result of a completed tool load: the connection that was established and the
// tool definitions `listTools` returned for it.
type ToolLoadResult = {
connection: MCPServerConnection;
serverTools: MCPToolDefinition[];
};
// A promise paired with a synchronously readable snapshot of its settlement state.
// `value` is set once the promise fulfills, `reason` once it rejects (see trackPromise).
type TrackedPromise<T> = {
promise: Promise<T>;
status: "pending" | "fulfilled" | "rejected";
value?: T;
reason?: unknown;
};
// How long connectServers waits for all new connection attempts to settle before it
// falls back to cached tool definitions for the ones still pending.
const STARTUP_TIMEOUT_MS = 250;
/**
 * Wrap a promise in a record whose settlement state can be inspected synchronously.
 *
 * The record starts as "pending" and is updated in place when the promise settles.
 * Both a fulfillment and a rejection handler are attached here.
 *
 * @param promise Promise to observe; it is stored unchanged on the returned record.
 * @returns The mutable tracking record.
 */
function trackPromise<T>(promise: Promise<T>): TrackedPromise<T> {
	const record: TrackedPromise<T> = { promise, status: "pending" };
	const onFulfilled = (result: T): void => {
		record.status = "fulfilled";
		record.value = result;
	};
	const onRejected = (cause: unknown): void => {
		record.status = "rejected";
		record.reason = cause;
	};
	promise.then(onFulfilled, onRejected);
	return record;
}
/**
 * Resolve after `ms` milliseconds. Thin wrapper over `Bun.sleep`.
 *
 * @param ms Delay passed straight through to `Bun.sleep`.
 */
function delay(ms: number): Promise<void> {
	const sleeping = Bun.sleep(ms);
	return sleeping;
}
/**
 * Stable, total ordering on MCP tools by name.
 *
 * Anthropic prompt caching keys on byte-identical tool definitions: any reorder
 * of the tools array invalidates the tools cache breakpoint and forces a full
 * prefix rebuild on the next request. MCP servers connect/reconnect at arbitrary
 * times, so the natural "insertion order" of `#tools` is non-deterministic.
 * Sorting after every mutation makes the array bytes independent of connection
 * sequence.
 */
export function sortMCPToolsByName<T extends { name: string }>(tools: T[]): T[] {
	// Sorts `tools` in place and returns that same array; callers in this file ignore
	// the return value and rely on the mutation. Names are compared with the plain
	// `<` / `>` string operators, so the order does not depend on locale settings.
	const byName = (left: T, right: T): number => {
		if (left.name < right.name) return -1;
		if (left.name > right.name) return 1;
		return 0;
	};
	return tools.sort(byName);
}
/**
 * Decide what to do with a resource subscription once the subscribe call has completed.
 *
 * @param notificationsEnabled Whether notifications are enabled right now.
 * @param currentEpoch The notification epoch at decision time.
 * @param subscriptionEpoch The notification epoch captured when the subscribe was issued.
 * @returns "rollback" when notifications are now disabled, "ignore" when they are
 *   enabled but the epoch has moved on since the subscribe was issued, and "apply"
 *   when the subscription is still current.
 */
export function resolveSubscriptionPostAction(
	notificationsEnabled: boolean,
	currentEpoch: number,
	subscriptionEpoch: number,
): "rollback" | "ignore" | "apply" {
	if (notificationsEnabled) {
		return currentEpoch === subscriptionEpoch ? "apply" : "ignore";
	}
	return "rollback";
}
// Result of discovering/connecting MCP servers.
export interface MCPLoadResult {
// Tools made available by the load (connectServers returns them sorted by name).
tools: CustomTool<TSchema, MCPToolDetails>[];
// Error message per server name, for servers that failed validation or connection.
errors: Map<string, string>;
// Names of servers that were connected when the load returned.
connectedServers: string[];
// Filled in by discoverAndConnect from loadAllMCPConfigs; connectServers itself returns [].
exaApiKeys: string[];
}
// Options for MCPManager.discoverAndConnect.
export interface MCPDiscoverOptions {
// Forwarded unchanged to loadAllMCPConfigs.
enableProjectConfig?: boolean;
// Forwarded unchanged to loadAllMCPConfigs (presumably filters out Exa servers; confirm in ./config).
filterExa?: boolean;
// Forwarded unchanged to loadAllMCPConfigs (presumably filters out browser servers; confirm in ./config).
filterBrowser?: boolean;
// Called once with the names of the servers for which a new connection attempt was started.
onConnecting?: (serverNames: string[]) => void;
}
/**
 * MCP Server Manager.
 *
 * Manages connections to MCP servers and provides tools to the agent.
 */
export class MCPManager {
/** Process-wide manager registered through `setInstance`, if any. */
static #instance: MCPManager | undefined;
/**
 * Return the manager previously registered with `setInstance`.
 *
 * @returns The registered manager, or `undefined` when none is registered.
 */
static instance(): MCPManager | undefined {
	const registered = MCPManager.#instance;
	return registered;
}
/**
 * Register (or, with `undefined`, clear) the manager returned by `instance()`.
 *
 * @param value Manager to register, or `undefined` to clear the registration.
 */
static setInstance(value: MCPManager | undefined): void {
	MCPManager.#instance = value;
}
/** Clear the registered manager so each test starts without one. */
static resetForTests(): void {
	MCPManager.setInstance(undefined);
}
// Established connections, keyed by server name.
#connections = new Map<string, MCPServerConnection>();
// Current tool list across all servers; re-sorted by name whenever a server's tools are replaced.
#tools: CustomTool<TSchema, MCPToolDetails>[] = [];
// In-flight initial connection attempts started by connectServers.
#pendingConnections = new Map<string, Promise<MCPServerConnection>>();
// In-flight "connect, then list tools" chains started by connectServers.
#pendingToolLoads = new Map<string, Promise<ToolLoadResult>>();
// Source metadata per server name, recorded by connectServers.
#sources = new Map<string, SourceMeta>();
// Credential store used to resolve OAuth auth; null until setAuthStorage is called.
#authStorage: AuthStorage | null = null;
// Optional consumer callbacks registered through the setOn* methods.
#onNotification?: (serverName: string, method: string, params: unknown) => void;
#onToolsChanged?: (tools: CustomTool<TSchema, MCPToolDetails>[]) => void;
#onResourcesChanged?: (serverName: string, uri: string) => void;
#onPromptsChanged?: (serverName: string) => void;
// Whether resource subscriptions are currently wanted (see setNotificationsEnabled).
#notificationsEnabled = false;
// Incremented on every enable/disable toggle so late subscribe results can be recognised as stale.
#notificationsEpoch = 0;
// Resource URIs recorded as subscribed, per server name.
#subscribedResources = new Map<string, Set<string>>();
// In-flight resource refresh per server, with the connection it was started for.
#pendingResourceRefresh = new Map<string, { connection: MCPServerConnection; promise: Promise<void> }>();
// In-flight reconnect attempt per server, shared by concurrent reconnectServer callers.
#pendingReconnections = new Map<string, Promise<MCPServerConnection | null>>();
// Config as passed to connectServers, per server name; used as a fallback config source when reconnecting.
#serverConfigs = new Map<string, MCPServerConfig>();
// Incremented by disconnectAll; reconnect attempts abort when it changes under them.
#epoch = 0;
// cwd: passed to loadAllMCPConfigs and reported to servers as the single root (see #getRoots).
// toolCache: optional cache of tool definitions; connectServers reads it for servers that
// are still connecting after the startup timeout, and successful tool loads write to it.
constructor(
private cwd: string,
private toolCache: MCPToolCache | null = null,
) {}
/**
 * Register the callback that receives every server notification.
 *
 * The callback runs after the manager's own handling of the notification and
 * replaces any previously registered callback.
 *
 * @param handler Receives the server name, the notification method, and its raw params.
 */
setOnNotification(handler: (serverName: string, method: string, params: unknown) => void): void {
	this.#onNotification = handler;
}
/**
 * Register the callback fired whenever the manager's tool list changes.
 *
 * Replaces any previously registered callback.
 *
 * @param handler Receives the manager's current tool array (the internal array, not a copy).
 */
setOnToolsChanged(handler: (tools: CustomTool<TSchema, MCPToolDetails>[]) => void): void {
	this.#onToolsChanged = handler;
}
/**
 * Register the callback fired when a subscribed resource is reported as updated.
 *
 * Replaces any previously registered callback.
 *
 * @param handler Receives the server name and the URI of the updated resource.
 */
setOnResourcesChanged(handler: (serverName: string, uri: string) => void): void {
	this.#onResourcesChanged = handler;
}
/**
 * Register the callback fired when a server's prompts change.
 *
 * The callback is also invoked immediately, once per already-connected server
 * that currently has at least one prompt, so a late subscriber still sees them.
 *
 * @param handler Receives the name of the server whose prompts changed.
 */
setOnPromptsChanged(handler: (serverName: string) => void): void {
	this.#onPromptsChanged = handler;
	this.#connections.forEach((connection, serverName) => {
		if (connection.prompts?.length) handler(serverName);
	});
}
/**
 * Subscribe to `uris` on `connection` in the background and record the result.
 *
 * When the subscribe call completes, the outcome depends on the notification state
 * at that moment: the subscription is undone if notifications were disabled
 * meanwhile, silently dropped if the notification epoch has moved on, and otherwise
 * recorded as the server's subscribed URI set. Failures are logged at debug level.
 *
 * @param name Server name, used for bookkeeping and log paths.
 * @param connection Connection to subscribe on.
 * @param uris Resource URIs to subscribe to.
 * @param notificationEpoch Notification epoch captured by the caller when it decided to subscribe.
 */
#subscribeAndTrack(name: string, connection: MCPServerConnection, uris: string[], notificationEpoch: number): void {
	const logPath = `mcp:${name}`;
	const onSubscribed = (): void => {
		const action = resolveSubscriptionPostAction(
			this.#notificationsEnabled,
			this.#notificationsEpoch,
			notificationEpoch,
		);
		switch (action) {
			case "rollback":
				// Notifications were switched off while the subscribe was in flight.
				void unsubscribeFromResources(connection, uris).catch(error => {
					logger.debug("Failed to rollback stale MCP resource subscription", {
						path: logPath,
						error,
					});
				});
				return;
			case "ignore":
				return;
			case "apply":
				this.#subscribedResources.set(name, new Set(uris));
				return;
		}
	};
	const onFailure = (error: unknown): void => {
		logger.debug("Failed to subscribe to MCP resources", { path: logPath, error });
	};
	void subscribeToResources(connection, uris).then(onSubscribed).catch(onFailure);
}
/**
 * Turn resource-update notifications on or off.
 *
 * A call that does not change the current state is a no-op. Otherwise the
 * notification epoch is advanced and, when enabling, every connection that supports
 * resource subscriptions and has a loaded resource list is subscribed in the
 * background; when disabling, every recorded subscription is cancelled (best
 * effort) and the subscription bookkeeping is cleared.
 *
 * @param enabled Desired notification state.
 */
setNotificationsEnabled(enabled: boolean): void {
	const previous = this.#notificationsEnabled;
	this.#notificationsEnabled = enabled;
	if (previous === enabled) return;
	const notificationEpoch = ++this.#notificationsEpoch;

	if (!enabled) {
		for (const [name, connection] of this.#connections) {
			const tracked = this.#subscribedResources.get(name);
			if (!tracked || tracked.size === 0) continue;
			void unsubscribeFromResources(connection, Array.from(tracked)).catch(error => {
				logger.debug("Failed to unsubscribe MCP resources", { path: `mcp:${name}`, error });
			});
		}
		this.#subscribedResources.clear();
		return;
	}

	for (const [name, connection] of this.#connections) {
		if (!connection.capabilities.resources?.subscribe || !connection.resources) continue;
		const uris = connection.resources.map(resource => resource.uri);
		this.#subscribeAndTrack(name, connection, uris, notificationEpoch);
	}
}
/**
 * Provide the credential store used to resolve OAuth credentials for servers.
 *
 * Until this is called, OAuth auth entries in server configs are left unresolved.
 *
 * @param authStorage Store to read OAuth credentials from and write refreshed ones to.
 */
setAuthStorage(authStorage: AuthStorage): void {
	this.#authStorage = authStorage;
}
/**
 * Load every MCP server config visible from the manager's working directory and
 * connect to the servers found.
 *
 * @param options Discovery filters and an optional `onConnecting` callback.
 * @returns The result of `connectServers`, with `exaApiKeys` filled in from the loaded configs.
 */
async discoverAndConnect(options?: MCPDiscoverOptions): Promise<MCPLoadResult> {
	const loaded = await loadAllMCPConfigs(this.cwd, {
		enableProjectConfig: options?.enableProjectConfig,
		filterExa: options?.filterExa,
		filterBrowser: options?.filterBrowser,
	});
	const loadResult = await this.connectServers(loaded.configs, loaded.sources, options?.onConnecting);
	loadResult.exaApiKeys = loaded.exaApiKeys;
	return loadResult;
}
/**
 * Connect to specific MCP servers.
 * Connections are made in parallel for faster startup.
 */
async connectServers(
configs: Record<string, MCPServerConfig>,
sources: Record<string, SourceMeta>,
onConnecting?: (serverNames: string[]) => void,
): Promise<MCPLoadResult> {
// Bookkeeping for one connection attempt started by this call.
type ConnectionTask = {
name: string;
config: MCPServerConfig;
tracked: TrackedPromise<ToolLoadResult>;
toolsPromise: Promise<ToolLoadResult>;
};
const errors = new Map<string, string>();
const connectedServers = new Set<string>();
const allTools: CustomTool<TSchema, MCPToolDetails>[] = [];
// Servers whose failure is already in `errors`, so the background handler does not log them again.
const reportedErrors = new Set<string>();
// Stays false until this call has assembled its result; background failures that
// settle before then are not logged by the background handler.
let allowBackgroundLogging = false;
const connectionTasks: ConnectionTask[] = [];
for (const [name, config] of Object.entries(configs)) {
// Record (or refresh) source metadata, including on an already-established connection.
if (sources[name]) {
this.#sources.set(name, sources[name]);
const existing = this.#connections.get(name);
if (existing) {
existing._source = sources[name];
}
}
// Already connected: report it as connected, but start nothing new.
if (this.#connections.has(name)) {
connectedServers.add(name);
continue;
}
// An attempt for this server is already in flight; leave it alone.
if (
this.#pendingConnections.has(name) ||
this.#pendingToolLoads.has(name) ||
this.#pendingReconnections.has(name)
) {
continue;
}
const validationErrors = validateServerConfig(name, config);
if (validationErrors.length > 0) {
errors.set(name, validationErrors.join("; "));
reportedErrors.add(name);
continue;
}
this.#serverConfigs.set(name, config);
// Resolve auth and value substitutions, then connect with handlers bound to this server name.
const connectionPromise = (async () => {
const resolvedConfig = await this.#resolveAuthConfig(config);
return connectToServer(name, resolvedConfig, {
onNotification: (method, params) => {
this.#handleServerNotification(name, method, params);
},
onRequest: (method, params) => {
return this.#handleServerRequest(method, params);
},
});
})().then(
connection => {
// Keep the unresolved config on the connection, not the resolved one.
connection.config = config;
this.#serverConfigs.set(name, config);
if (sources[name]) {
connection._source = sources[name];
}
// Publish the connection only if this attempt is still the registered one
// (disconnectServer, disconnectAll and #doReconnect clear the pending entry).
// NOTE(review): when it is no longer registered, the connection is still wired
// below and returned without being closed, whereas #connectAndWireServer closes
// the transport in the comparable case. Confirm this is intended.
if (this.#pendingConnections.get(name) === connectionPromise) {
this.#pendingConnections.delete(name);
this.#connections.set(name, connection);
}
// For OAuth-backed HTTP transports, supply freshly resolved headers via onAuthError
// (presumably invoked by the transport on an auth failure; see ./transports/http).
if (connection.transport instanceof HttpTransport && config.auth?.type === "oauth") {
connection.transport.onAuthError = async () => {
const refreshed = await this.#resolveAuthConfig(config, true);
if (refreshed.type === "http" || refreshed.type === "sse") {
return refreshed.headers ?? null;
}
return null;
};
}
connection.transport.onClose = () => {
logger.debug("MCP transport lost, triggering reconnect", { path: `mcp:${name}` });
void this.reconnectServer(name);
};
return connection;
},
error => {
if (this.#pendingConnections.get(name) === connectionPromise) {
this.#pendingConnections.delete(name);
}
throw error;
},
);
this.#pendingConnections.set(name, connectionPromise);
const toolsPromise = connectionPromise.then(async connection => {
const serverTools = await listTools(connection);
return { connection, serverTools };
});
this.#pendingToolLoads.set(name, toolsPromise);
const tracked = trackPromise(toolsPromise);
connectionTasks.push({ name, config, tracked, toolsPromise });
// Background continuation: runs whenever the tool load settles, possibly after this
// method has returned. It is skipped if a newer attempt has replaced this one.
void toolsPromise
.then(async ({ connection, serverTools }) => {
if (this.#pendingToolLoads.get(name) !== toolsPromise) return;
this.#pendingToolLoads.delete(name);
const reconnect = () => this.reconnectServer(name);
const customTools = MCPTool.fromTools(connection, serverTools, reconnect);
this.#replaceServerTools(name, customTools);
this.#onToolsChanged?.(this.#tools);
void this.toolCache?.set(name, config, serverTools);
await this.#loadServerResourcesAndPrompts(name, connection);
})
.catch(error => {
if (this.#pendingToolLoads.get(name) !== toolsPromise) return;
this.#pendingToolLoads.delete(name);
if (!allowBackgroundLogging || reportedErrors.has(name)) return;
const message = error instanceof Error ? error.message : String(error);
logger.error("MCP tool load failed", { path: `mcp:${name}`, error: message });
});
}
if (connectionTasks.length > 0 && onConnecting) {
onConnecting(connectionTasks.map(task => task.name));
}
if (connectionTasks.length > 0) {
// Wait until every new attempt has settled or the startup timeout elapses, whichever is first.
await Promise.race([
Promise.allSettled(connectionTasks.map(task => task.tracked.promise)),
delay(STARTUP_TIMEOUT_MS),
]);
const cachedTools = new Map<string, MCPToolDefinition[]>();
const pendingTasks = connectionTasks.filter(task => task.tracked.status === "pending");
if (pendingTasks.length > 0) {
// For attempts still pending, look for cached tool definitions to offer in the meantime.
if (this.toolCache) {
await Promise.all(
pendingTasks.map(async task => {
const cached = await this.toolCache?.get(task.name, task.config);
if (cached) {
cachedTools.set(task.name, cached);
}
}),
);
}
// Attempts with no cache entry are awaited to completion before returning.
const pendingWithoutCache = pendingTasks.filter(task => !cachedTools.has(task.name));
if (pendingWithoutCache.length > 0) {
await Promise.allSettled(pendingWithoutCache.map(task => task.tracked.promise));
}
}
// Assemble the result from each task's current state (it may have settled while the cache was read).
for (const task of connectionTasks) {
const { name } = task;
if (task.tracked.status === "fulfilled") {
const value = task.tracked.value;
if (!value) continue;
const { connection, serverTools } = value;
connectedServers.add(name);
const reconnect = () => this.reconnectServer(name);
allTools.push(...MCPTool.fromTools(connection, serverTools, reconnect));
} else if (task.tracked.status === "rejected") {
const message =
task.tracked.reason instanceof Error ? task.tracked.reason.message : String(task.tracked.reason);
errors.set(name, message);
reportedErrors.add(name);
} else {
// Still connecting: expose deferred tools built from the cache that wait for the connection.
const cached = cachedTools.get(name);
if (cached) {
const source = this.#sources.get(name);
const reconnect = () => this.reconnectServer(name);
allTools.push(
...DeferredMCPTool.fromTools(name, cached, () => this.waitForConnection(name), source, reconnect),
);
}
}
}
}
sortMCPToolsByName(allTools);
// NOTE(review): this replaces the whole tool list. Servers skipped above as already
// connected or already in flight add nothing to `allTools`, so a later call appears
// to drop their tools from `#tools` and from the returned `tools`. Confirm against callers.
this.#tools = allTools;
allowBackgroundLogging = true;
return {
tools: allTools,
errors,
connectedServers: Array.from(connectedServers),
exaApiKeys: [],
};
}
/**
 * Swap out every tool belonging to server `name` for `tools` and re-sort the list.
 *
 * Ownership is decided by the `mcp__<name>_` name prefix.
 * NOTE(review): a server whose name is a prefix of another's (for example "foo" and
 * "foo_bar") would also match the other server's tools. Confirm how tool names are
 * built in ./tool-bridge.
 *
 * @param name Server whose tools are replaced.
 * @param tools New tools for that server; may be empty.
 */
#replaceServerTools(name: string, tools: CustomTool<TSchema, MCPToolDetails>[]): void {
	const ownedPrefix = `mcp__${name}_`;
	const kept = this.#tools.filter(tool => !tool.name.startsWith(ownedPrefix));
	this.#tools = sortMCPToolsByName([...kept, ...tools]);
}
/**
 * Start the refresh that matches a list-changed notification without awaiting it.
 *
 * A failed refresh is logged at debug level and otherwise ignored.
 *
 * @param serverName Server that sent the notification.
 * @param kind Which list to refresh.
 */
#triggerNotificationRefresh(serverName: string, kind: "tools" | "resources" | "prompts"): void {
	const refreshers: Record<typeof kind, () => Promise<void>> = {
		tools: () => this.refreshServerTools(serverName),
		resources: () => this.refreshServerResources(serverName),
		prompts: () => this.refreshServerPrompts(serverName),
	};
	const refresh = refreshers[kind]();
	void refresh.catch(error => {
		logger.debug("Failed MCP notification refresh", { path: `mcp:${serverName}`, kind, error });
	});
}
/**
 * Handle one notification from a server.
 *
 * List-changed notifications start the matching background refresh. A
 * resource-updated notification is forwarded to the resources-changed callback only
 * when its URI is recorded as subscribed for that server. Every notification,
 * recognised or not, is finally passed to the generic notification callback.
 *
 * @param serverName Server that sent the notification.
 * @param method Notification method name.
 * @param params Raw notification params.
 */
#handleServerNotification(serverName: string, method: string, params: unknown): void {
	logger.debug("MCP notification received", { path: `mcp:${serverName}`, method });
	if (method === MCPNotificationMethods.TOOLS_LIST_CHANGED) {
		this.#triggerNotificationRefresh(serverName, "tools");
	} else if (method === MCPNotificationMethods.RESOURCES_LIST_CHANGED) {
		this.#triggerNotificationRefresh(serverName, "resources");
	} else if (method === MCPNotificationMethods.RESOURCES_UPDATED) {
		const updatedUri = (params as { uri?: string })?.uri;
		if (updatedUri && this.#subscribedResources.get(serverName)?.has(updatedUri)) {
			this.#onResourcesChanged?.(serverName, updatedUri);
		}
	} else if (method === MCPNotificationMethods.PROMPTS_LIST_CHANGED) {
		this.#triggerNotificationRefresh(serverName, "prompts");
	}
	this.#onNotification?.(serverName, method, params);
}
/**
 * Answer a request initiated by a server.
 *
 * @param method Request method; only "ping" and "roots/list" are supported.
 * @param _params Request params (unused).
 * @returns `{}` for "ping", the roots listing for "roots/list".
 * @throws Error carrying `code: -32601` for any other method.
 */
async #handleServerRequest(method: string, _params: unknown): Promise<unknown> {
	if (method === "ping") return {};
	if (method === "roots/list") return this.#getRoots();
	const unsupported = new Error(`Unsupported server request: ${method}`);
	throw Object.assign(unsupported, { code: -32601 });
}
/**
 * Build the "roots/list" response: a single root for the manager's working directory,
 * identified by its file URL and named after its last path segment.
 */
#getRoots(): { roots: Array<{ uri: string; name: string }> } {
	const workspaceRoot = {
		uri: url.pathToFileURL(this.cwd).href,
		name: path.basename(this.cwd),
	};
	return { roots: [workspaceRoot] };
}
/**
 * Get all loaded tools.
 *
 * @returns The manager's current tool array itself (not a copy).
 */
getTools(): CustomTool<TSchema, MCPToolDetails>[] {
	const current = this.#tools;
	return current;
}
/**
 * Look up the established connection for a server.
 *
 * @param name Server name.
 * @returns The connection, or `undefined` when the server is not currently connected.
 */
getConnection(name: string): MCPServerConnection | undefined {
	const connection = this.#connections.get(name);
	return connection;
}
/**
 * Report the connection status of a server.
 *
 * @param name Server name.
 * @returns "connected" when a connection is established; "connecting" while an
 *   initial connection, tool load, or reconnect is in flight; otherwise "disconnected".
 */
getConnectionStatus(name: string): "connected" | "connecting" | "disconnected" {
	if (this.#connections.has(name)) return "connected";
	const inFlight =
		this.#pendingConnections.has(name) || this.#pendingToolLoads.has(name) || this.#pendingReconnections.has(name);
	return inFlight ? "connecting" : "disconnected";
}
/**
 * Get the source metadata for a server.
 *
 * Prefers the metadata recorded by the manager and falls back to the metadata stored
 * on the server's connection.
 *
 * @param name Server name.
 * @returns The source metadata, or `undefined` when none is known.
 */
getSource(name: string): SourceMeta | undefined {
	const recorded = this.#sources.get(name);
	return recorded ?? this.#connections.get(name)?._source;
}
/**
 * Wait for a server's connection to become available.
 *
 * Resolution order: an established connection, then an in-flight initial connection,
 * then an in-flight reconnect.
 *
 * @param name Server name.
 * @returns The connection once available.
 * @throws Error when the server is neither connected nor connecting, or when the
 *   awaited reconnect ends without a connection. A failing initial connection rejects
 *   with that connection's own error.
 */
async waitForConnection(name: string): Promise<MCPServerConnection> {
	const live = this.#connections.get(name);
	if (live) return live;

	const connecting = this.#pendingConnections.get(name);
	if (connecting) return connecting;

	const reconnecting = this.#pendingReconnections.get(name);
	const reconnected = reconnecting ? await reconnecting : null;
	if (reconnected) return reconnected;

	throw new Error(`MCP server not connected: ${name}`);
}
/**
 * Resolve auth and value substitutions in a server config before connecting.
 *
 * Public entry point to the same resolution the manager applies to its own connections.
 *
 * @param config Config to resolve; it is not modified.
 * @returns A resolved copy of the config.
 */
async prepareConfig(config: MCPServerConfig): Promise<MCPServerConfig> {
	return this.#resolveAuthConfig(config);
}
/**
 * Get the names of all servers with an established connection.
 *
 * @returns A new array of server names.
 */
getConnectedServers(): string[] {
	return [...this.#connections.keys()];
}
/**
 * Get all known server names (connected, connecting, reconnecting, or discovered).
 *
 * Covers every server that `getConnectionStatus` reports as "connected" or
 * "connecting", plus servers known only through recorded source metadata. Names are
 * de-duplicated and keep first-seen order across the maps listed below.
 *
 * @returns A new array of unique server names.
 */
getAllServerNames(): string[] {
	const names = new Set<string>([
		...this.#sources.keys(),
		...this.#connections.keys(),
		...this.#pendingConnections.keys(),
		// Tool loads and reconnects also count as "connecting" in getConnectionStatus.
		// Without them, a server that has no source entry disappears from this list
		// while it reconnects, because its connection is removed for the duration.
		...this.#pendingToolLoads.keys(),
		...this.#pendingReconnections.keys(),
	]);
	return Array.from(names);
}
/**
 * Disconnect from a specific server and forget everything the manager knows about it.
 *
 * Pending attempts, source metadata, the stored config, and subscription bookkeeping
 * are dropped first. The live connection, if any, is then closed. The server's tools
 * are removed and the tools/prompts callbacks are notified even when closing the
 * connection fails, so the manager never keeps a half-removed server.
 *
 * @param name Server name.
 * @throws Whatever the underlying disconnect rejected with, rethrown after cleanup.
 */
async disconnectServer(name: string): Promise<void> {
	this.#pendingConnections.delete(name);
	this.#pendingToolLoads.delete(name);
	this.#pendingReconnections.delete(name);
	this.#sources.delete(name);
	this.#serverConfigs.delete(name);
	this.#pendingResourceRefresh.delete(name);

	const connection = this.#connections.get(name);
	const subscribedUris = this.#subscribedResources.get(name);
	if (subscribedUris && subscribedUris.size > 0 && connection) {
		// Best effort: the connection is about to be closed anyway.
		void unsubscribeFromResources(connection, Array.from(subscribedUris)).catch(() => {});
	}
	this.#subscribedResources.delete(name);

	let disconnectFailure: { error: unknown } | undefined;
	if (connection) {
		// Detach first so closing the transport does not trigger an automatic reconnect.
		connection.transport.onClose = undefined;
		try {
			await disconnectServer(connection);
		} catch (error) {
			// Remember the failure; cleanup below must still run before it is rethrown.
			disconnectFailure = { error };
		} finally {
			this.#connections.delete(name);
		}
	}

	const toolPrefix = `mcp__${name}_`;
	const hadTools = this.#tools.some(tool => tool.name.startsWith(toolPrefix));
	this.#tools = this.#tools.filter(tool => !tool.name.startsWith(toolPrefix));
	if (hadTools) this.#onToolsChanged?.(this.#tools);
	if (connection?.prompts?.length) this.#onPromptsChanged?.(name);

	if (disconnectFailure) throw disconnectFailure.error;
}
/**
 * Disconnect from all servers and reset the manager's per-server state.
 *
 * The epoch is advanced first so in-flight reconnect loops notice and stop. Close
 * handlers are detached before disconnecting so no automatic reconnect is triggered.
 * Individual disconnect failures are ignored.
 */
async disconnectAll(): Promise<void> {
	this.#epoch++;
	const live = Array.from(this.#connections.values());
	for (const conn of live) {
		conn.transport.onClose = undefined;
	}
	await Promise.allSettled(live.map(conn => disconnectServer(conn)));

	this.#pendingConnections.clear();
	this.#pendingToolLoads.clear();
	this.#pendingReconnections.clear();
	this.#pendingResourceRefresh.clear();
	this.#sources.clear();
	this.#serverConfigs.clear();
	this.#connections.clear();
	this.#tools = [];
	this.#subscribedResources.clear();
}
/**
 * Reconnect to a server after a connection failure.
 * Tears down the stale connection, re-resolves auth, establishes a new
 * connection, reloads tools, and notifies consumers.
 * Concurrent calls for the same server share one reconnection attempt.
 *
 * @param name Server name.
 * @returns The new connection, or null if reconnection failed or was aborted.
 */
async reconnectServer(name: string): Promise<MCPServerConnection | null> {
	const pending = this.#pendingReconnections.get(name);
	if (pending) return pending;
	const attempt = this.#doReconnect(name);
	this.#pendingReconnections.set(name, attempt);
	return attempt.finally(() => {
		// Only clear our own registration. The entry may have been removed by
		// disconnectServer/disconnectAll and then replaced by a newer attempt, and
		// deleting that one would stop later callers from sharing it.
		if (this.#pendingReconnections.get(name) === attempt) {
			this.#pendingReconnections.delete(name);
		}
	});
}
/**
 * Perform one reconnection run for `name`: tear down the old connection, then try to
 * connect up to five times with backoff delays of 500, 1000, 2000 and 4000 ms between
 * attempts.
 *
 * The run stops early, returning null, when `disconnectAll` advanced the epoch or when
 * the server's stored config has been removed (as `disconnectServer` does). In the
 * latter case `#connectAndWireServer` can only reject, so retrying would open and
 * close a connection per attempt for nothing.
 *
 * @param name Server name.
 * @returns The new connection, or null when no config is known, the run was aborted,
 *   or every attempt failed.
 */
async #doReconnect(name: string): Promise<MCPServerConnection | null> {
	const oldConnection = this.#connections.get(name);
	const config = oldConnection?.config ?? this.#serverConfigs.get(name);
	const source = this.#sources.get(name) ?? oldConnection?._source;
	if (!config) return null;
	logger.debug("MCP reconnecting", { path: `mcp:${name}` });
	const reconnectEpoch = this.#epoch;
	if (oldConnection) {
		// Detach the close handler first so closing the stale transport does not re-enter reconnect.
		oldConnection.transport.onClose = undefined;
		void oldConnection.transport.close().catch(() => {});
		this.#connections.delete(name);
	}
	this.#pendingConnections.delete(name);
	this.#pendingToolLoads.delete(name);
	const delays = [500, 1000, 2000, 4000];
	for (let attempt = 0; attempt <= delays.length; attempt++) {
		if (this.#epoch !== reconnectEpoch) {
			logger.debug("MCP reconnect aborted before attempt after configuration changed", {
				path: `mcp:${name}`,
				storedEpoch: reconnectEpoch,
				currentEpoch: this.#epoch,
			});
			return null;
		}
		if (!this.#serverConfigs.has(name)) {
			logger.debug("MCP reconnect aborted before attempt after server was disconnected", {
				path: `mcp:${name}`,
			});
			return null;
		}
		try {
			const connection = await this.#connectAndWireServer(name, config, source, reconnectEpoch);
			logger.debug("MCP reconnected", { path: `mcp:${name}`, tools: connection.tools?.length ?? 0 });
			return connection;
		} catch (error) {
			if (this.#epoch !== reconnectEpoch) {
				logger.debug("MCP reconnect aborted after configuration changed", {
					path: `mcp:${name}`,
					storedEpoch: reconnectEpoch,
					currentEpoch: this.#epoch,
				});
				return null;
			}
			if (!this.#serverConfigs.has(name)) {
				logger.debug("MCP reconnect aborted after server was disconnected", { path: `mcp:${name}` });
				return null;
			}
			const msg = error instanceof Error ? error.message : String(error);
			if (attempt < delays.length) {
				logger.debug("MCP reconnect attempt failed, retrying", {
					path: `mcp:${name}`,
					attempt: attempt + 1,
					error: msg,
				});
				await Bun.sleep(delays[attempt]);
			} else {
				logger.error("MCP reconnect failed after retries", { path: `mcp:${name}`, error: msg });
			}
		}
	}
	return null;
}
// Single reconnect attempt: connect, register the connection, wire transport handlers,
// then load tools. Resolves with the new connection. Rejects, after closing the
// transport, when the server was deregistered or the epoch changed during the connect,
// or when listing tools fails.
async #connectAndWireServer(
name: string,
config: MCPServerConfig,
source: SourceMeta | undefined,
reconnectEpoch: number,
): Promise<MCPServerConnection> {
const resolvedConfig = await this.#resolveAuthConfig(config);
const connection = await connectToServer(name, resolvedConfig, {
onNotification: (method, params) => {
this.#handleServerNotification(name, method, params);
},
onRequest: (method, params) => {
return this.#handleServerRequest(method, params);
},
});
// Keep the unresolved config on the connection, not the resolved one.
connection.config = config;
if (source) connection._source = source;
// The server may have been removed (disconnectServer) or everything reset (disconnectAll)
// while connecting; in that case discard the fresh connection instead of registering it.
if (!this.#serverConfigs.has(name) || this.#epoch !== reconnectEpoch) {
await connection.transport.close().catch(() => {});
throw new Error(`Server "${name}" was disconnected during reconnection`);
}
this.#connections.set(name, connection);
// For OAuth-backed HTTP transports, supply freshly resolved headers via onAuthError
// (presumably invoked by the transport on an auth failure; see ./transports/http).
if (connection.transport instanceof HttpTransport && config.auth?.type === "oauth") {
connection.transport.onAuthError = async () => {
const refreshed = await this.#resolveAuthConfig(config, true);
if (refreshed.type === "http" || refreshed.type === "sse") {
return refreshed.headers ?? null;
}
return null;
};
}
connection.transport.onClose = () => {
logger.debug("MCP transport lost, triggering reconnect", { path: `mcp:${name}` });
void this.reconnectServer(name);
};
try {
const serverTools = await listTools(connection);
const reconnect = () => this.reconnectServer(name);
const customTools = MCPTool.fromTools(connection, serverTools, reconnect);
void this.toolCache?.set(name, config, serverTools);
this.#replaceServerTools(name, customTools);
this.#onToolsChanged?.(this.#tools);
// Resources and prompts load in the background; the reconnect does not wait for them.
void this.#loadServerResourcesAndPrompts(name, connection);
return connection;
} catch (error) {
// Tool listing failed: undo the registration so the caller can retry from a clean state.
connection.transport.onClose = undefined;
await connection.transport.close().catch(() => {});
this.#connections.delete(name);
throw error;
}
}
/**
 * Best-effort loading of resources, resource subscriptions, and prompts for a connection.
 * Shared between initial connection and reconnection.
 *
 * Each step runs only if the server advertises the capability. A failure is logged at
 * debug level and does not prevent the following step. Subscriptions are started only
 * while notifications are enabled and the server supports subscribing.
 *
 * @param name Server name, used for bookkeeping and log paths.
 * @param connection Connection to load from.
 */
async #loadServerResourcesAndPrompts(name: string, connection: MCPServerConnection): Promise<void> {
	const logPath = `mcp:${name}`;

	if (serverSupportsResources(connection.capabilities)) {
		try {
			const [resourceList] = await Promise.all([listResources(connection), listResourceTemplates(connection)]);
			const wantsSubscriptions = this.#notificationsEnabled && connection.capabilities.resources?.subscribe;
			if (wantsSubscriptions) {
				const uris = resourceList.map(resource => resource.uri);
				this.#subscribeAndTrack(name, connection, uris, this.#notificationsEpoch);
			}
		} catch (error) {
			logger.debug("Failed to load MCP resources", { path: logPath, error });
		}
	}

	if (serverSupportsPrompts(connection.capabilities)) {
		try {
			await listPrompts(connection);
			this.#onPromptsChanged?.(name);
		} catch (error) {
			logger.debug("Failed to load MCP prompts", { path: logPath, error });
		}
	}
}
/**
 * Refresh tools from a specific server.
 *
 * Does nothing when the server is not connected. If the connection was replaced or
 * removed while the tool list was being fetched, the result is discarded, so a late
 * response cannot overwrite newer tools or re-add tools of a disconnected server.
 *
 * @param name Server name.
 * @throws Whatever `listTools` rejects with.
 */
async refreshServerTools(name: string): Promise<void> {
	const connection = this.#connections.get(name);
	if (!connection) return;
	// Clear the tool list stored on the connection before re-listing
	// (presumably so listTools does not return it; confirm in ./client).
	connection.tools = undefined;
	const serverTools = await listTools(connection);
	if (this.#connections.get(name) !== connection) {
		logger.debug("Discarding MCP tool refresh from a replaced connection", { path: `mcp:${name}` });
		return;
	}
	const reconnect = () => this.reconnectServer(name);
	const customTools = MCPTool.fromTools(connection, serverTools, reconnect);
	void this.toolCache?.set(name, connection.config, serverTools);
	this.#replaceServerTools(name, customTools);
	this.#onToolsChanged?.(this.#tools);
}
/**
 * Refresh tools from every connected server in parallel.
 *
 * Individual refresh failures are ignored; the returned promise always resolves.
 */
async refreshAllTools(): Promise<void> {
	const serverNames = [...this.#connections.keys()];
	await Promise.allSettled(serverNames.map(serverName => this.refreshServerTools(serverName)));
}
/**
 * Refresh resources from a specific server.
 */
async refreshServerResources(name: string): Promise<void> {
const connection = this.#connections.get(name);
if (!connection || !serverSupportsResources(connection.capabilities)) return;
// Share an in-flight refresh, but only if it was started for this same connection.
const existing = this.#pendingResourceRefresh.get(name);
if (existing && existing.connection === connection) return existing.promise;
const doRefresh = async (): Promise<void> => {
// Clear the lists stored on the connection before re-listing
// (presumably so the list calls do not return them; confirm in ./client).
connection.resources = undefined;
connection.resourceTemplates = undefined;
const [resources] = await Promise.all([listResources(connection), listResourceTemplates(connection)]);
if (this.#notificationsEnabled && connection.capabilities.resources?.subscribe) {
const newUris = new Set(resources.map(r => r.uri));
const oldUris = this.#subscribedResources.get(name);
// Captured before the awaits below so a toggle during them can be detected afterwards.
const notificationEpoch = this.#notificationsEpoch;
if (oldUris) {
// Drop subscriptions for resources that no longer exist.
const removed = [...oldUris].filter(uri => !newUris.has(uri));
if (removed.length > 0) {
try {
await unsubscribeFromResources(connection, removed);
} catch (error) {
logger.debug("Failed to unsubscribe stale MCP resources", { path: `mcp:${name}`, error });
}
}
}
try {
// Subscribes to the full new set, including URIs that were already subscribed.
const allUris = [...newUris];
await subscribeToResources(connection, allUris);
const action = resolveSubscriptionPostAction(
this.#notificationsEnabled,
this.#notificationsEpoch,
notificationEpoch,
);
if (action === "rollback") {
await unsubscribeFromResources(connection, allUris).catch(error => {
logger.debug("Failed to rollback stale MCP resource subscription", { path: `mcp:${name}`, error });
});
return;
}
if (action === "ignore") {
return;
}
this.#subscribedResources.set(name, newUris);
} catch (error) {
logger.debug("Failed to re-subscribe to MCP resources", { path: `mcp:${name}`, error });
}
}
};
// Deregister on completion, but only if this refresh is still the registered one.
const promise = doRefresh().finally(() => {
const pending = this.#pendingResourceRefresh.get(name);
if (pending?.promise === promise) {
this.#pendingResourceRefresh.delete(name);
}
});
this.#pendingResourceRefresh.set(name, { connection, promise });
return promise;
}
/**
 * Refresh prompts from a specific server and notify the prompts-changed callback.
 *
 * Does nothing when the server is not connected or does not advertise prompt support.
 *
 * @param name Server name.
 * @throws Whatever `listPrompts` rejects with.
 */
async refreshServerPrompts(name: string): Promise<void> {
	const connection = this.#connections.get(name);
	const supported = connection !== undefined && serverSupportsPrompts(connection.capabilities);
	if (!connection || !supported) return;
	// Clear the prompt list stored on the connection before re-listing.
	connection.prompts = undefined;
	await listPrompts(connection);
	this.#onPromptsChanged?.(name);
}
/**
 * Get the resources and resource templates currently stored for a server.
 *
 * @param name Server name.
 * @returns The stored lists (empty arrays when nothing is loaded), or `undefined`
 *   when the server is not connected.
 */
getServerResources(name: string): { resources: MCPResource[]; templates: MCPResourceTemplate[] } | undefined {
	const connection = this.#connections.get(name);
	return connection
		? { resources: connection.resources ?? [], templates: connection.resourceTemplates ?? [] }
		: undefined;
}
/**
 * Read a specific resource from a server.
 *
 * @param name Server name.
 * @param uri URI of the resource to read.
 * @param options Request options passed through to `readResource`.
 * @returns The read result, or `undefined` when the server is not connected.
 */
async readServerResource(
	name: string,
	uri: string,
	options?: MCPRequestOptions,
): Promise<MCPResourceReadResult | undefined> {
	const connection = this.#connections.get(name);
	return connection ? readResource(connection, uri, options) : undefined;
}
/**
 * Get the prompts currently stored for a server.
 *
 * @param name Server name.
 * @returns The stored prompts (an empty array when none are loaded), or `undefined`
 *   when the server is not connected.
 */
getServerPrompts(name: string): MCPPrompt[] | undefined {
	const connection = this.#connections.get(name);
	return connection ? (connection.prompts ?? []) : undefined;
}
/**
 * Fetch a specific prompt from a server, passing the given arguments.
 *
 * @param name Server name.
 * @param promptName Name of the prompt to fetch.
 * @param args Prompt arguments passed through to `getPrompt`.
 * @param options Request options passed through to `getPrompt`.
 * @returns The prompt result, or `undefined` when the server is not connected.
 */
async executePrompt(
	name: string,
	promptName: string,
	args?: Record<string, string>,
	options?: MCPRequestOptions,
): Promise<MCPGetPromptResult | undefined> {
	const connection = this.#connections.get(name);
	return connection ? getPrompt(connection, promptName, args, options) : undefined;
}
/**
 * Collect the instructions text of every connected server that provides one
 * (for system prompt injection).
 *
 * @returns A new map from server name to its non-empty instructions string.
 */
getServerInstructions(): Map<string, string> {
	const instructionsByServer = new Map<string, string>();
	this.#connections.forEach((connection, serverName) => {
		if (connection.instructions) {
			instructionsByServer.set(serverName, connection.instructions);
		}
	});
	return instructionsByServer;
}
/**
 * Get notification state for display.
 *
 * @returns Whether notifications are enabled, plus the manager's own subscription
 *   map (a live reference typed as read-only sets, not a copy).
 */
getNotificationState(): { enabled: boolean; subscriptions: Map<string, ReadonlySet<string>> } {
	const enabled = this.#notificationsEnabled;
	const subscriptions = this.#subscribedResources as Map<string, ReadonlySet<string>>;
	return { enabled, subscriptions };
}
/**
 * Produce a copy of `config` with OAuth credentials injected and config values resolved.
 *
 * OAuth: when the config names an OAuth credential and an auth storage is set, the
 * stored access token is injected as an `Authorization: Bearer` header for http/sse
 * configs, or as the `OAUTH_ACCESS_TOKEN` environment variable otherwise. The token is
 * refreshed first when `forceRefresh` is set or it expires within five minutes,
 * provided a refresh token and token URL are available. A failed refresh falls back to
 * the existing token, and any other credential failure is logged and skipped.
 *
 * Values: every header (http/sse) or environment variable (other types) is passed
 * through `resolveConfigValue`; entries that resolve to a falsy value are dropped.
 *
 * @param config Config to resolve; it is not modified.
 * @param forceRefresh Refresh the OAuth token even if it has not expired.
 * @returns The resolved copy.
 */
async #resolveAuthConfig(config: MCPServerConfig, forceRefresh = false): Promise<MCPServerConfig> {
	// Resolve each value in order, keeping only entries with a truthy result.
	const resolveEntries = async (entries: Record<string, string>): Promise<Record<string, string>> => {
		const output: Record<string, string> = {};
		for (const [key, raw] of Object.entries(entries)) {
			const substituted = await resolveConfigValue(raw);
			if (substituted) output[key] = substituted;
		}
		return output;
	};

	let resolved: MCPServerConfig = { ...config };
	const auth = config.auth;

	if (auth?.type === "oauth" && auth.credentialId && this.#authStorage) {
		const credentialId = auth.credentialId;
		try {
			let credential = this.#authStorage.get(credentialId);
			if (credential?.type === "oauth") {
				const REFRESH_BUFFER_MS = 5 * 60_000;
				const nearExpiry = credential.expires && Date.now() >= credential.expires - REFRESH_BUFFER_MS;
				const shouldRefresh = forceRefresh || nearExpiry;
				if (shouldRefresh && credential.refresh && auth.tokenUrl) {
					try {
						const refreshed = await refreshMCPOAuthToken(
							auth.tokenUrl,
							credential.refresh,
							auth.clientId,
							auth.clientSecret,
						);
						const refreshedCredential = { type: "oauth" as const, ...refreshed };
						await this.#authStorage.set(credentialId, refreshedCredential);
						credential = refreshedCredential;
					} catch (refreshError) {
						logger.warn("MCP OAuth refresh failed, using existing token", {
							credentialId,
							error: refreshError,
						});
					}
				}
				if (resolved.type === "http" || resolved.type === "sse") {
					const headers = { ...resolved.headers, Authorization: `Bearer ${credential.access}` };
					resolved = { ...resolved, headers };
				} else {
					const env = { ...resolved.env, OAUTH_ACCESS_TOKEN: credential.access };
					resolved = { ...resolved, env };
				}
			}
		} catch (error) {
			logger.warn("Failed to resolve OAuth credential", { credentialId, error });
		}
	}

	if (resolved.type === "http" || resolved.type === "sse") {
		if (resolved.headers) {
			const nextHeaders = await resolveEntries(resolved.headers);
			resolved = { ...resolved, headers: nextHeaders };
		}
	} else if (resolved.env) {
		const nextEnv = await resolveEntries(resolved.env);
		resolved = { ...resolved, env: nextEnv };
	}
	return resolved;
}
}
/**
 * Create an MCP manager for `cwd` and immediately discover and connect its servers.
 * Convenience function for quick setup; the manager is created without a tool cache.
 *
 * @param cwd Working directory used for config discovery.
 * @param options Discovery options passed to `discoverAndConnect`.
 * @returns The new manager together with the result of the initial load.
 */
export async function createMCPManager(
	cwd: string,
	options?: MCPDiscoverOptions,
): Promise<{
	manager: MCPManager;
	result: MCPLoadResult;
}> {
	const manager = new MCPManager(cwd);
	const loadResult = await manager.discoverAndConnect(options);
	return { manager, result: loadResult };
}