import { isEnoent, logger, ptree, untilAborted } from "@oh-my-pi/pi-utils";
import { ToolAbortError, throwIfAborted } from "../tools/tool-errors";
import { applyWorkspaceEdit } from "./edits";
import { getLspmuxCommand, isLspmuxSupported } from "./lspmux";
import type {
LspClient,
LspJsonRpcNotification,
LspJsonRpcRequest,
LspJsonRpcResponse,
PublishDiagnosticsParams,
ServerConfig,
WorkspaceEdit,
} from "./types";
import { detectLanguageId, fileToUri } from "./utils";
const clients = new Map<string, LspClient>();
const clientLocks = new Map<string, Promise<LspClient>>();
const fileOperationLocks = new Map<string, Promise<void>>();
let idleTimeoutMs: number | null = null;
let idleCheckInterval: NodeJS.Timeout | null = null;
const IDLE_CHECK_INTERVAL_MS = 60 * 1000;
* Configure the idle timeout for LSP clients.
* @param ms - Timeout in milliseconds, or null/undefined to disable
*/
export function setIdleTimeout(ms: number | null | undefined): void {
idleTimeoutMs = ms ?? null;
if (idleTimeoutMs && idleTimeoutMs > 0) {
startIdleChecker();
} else {
stopIdleChecker();
}
}
function startIdleChecker(): void {
if (idleCheckInterval) return;
idleCheckInterval = setInterval(() => {
if (!idleTimeoutMs) return;
const now = Date.now();
for (const [key, client] of Array.from(clients.entries())) {
if (now - client.lastActivity > idleTimeoutMs) {
void shutdownClient(key);
}
}
}, IDLE_CHECK_INTERVAL_MS);
}
function stopIdleChecker(): void {
if (idleCheckInterval) {
clearInterval(idleCheckInterval);
idleCheckInterval = null;
}
}
const CLIENT_CAPABILITIES = {
textDocument: {
synchronization: {
didSave: true,
dynamicRegistration: false,
willSave: false,
willSaveWaitUntil: false,
},
hover: {
contentFormat: ["markdown", "plaintext"],
dynamicRegistration: false,
},
definition: {
dynamicRegistration: false,
linkSupport: true,
},
typeDefinition: {
dynamicRegistration: false,
linkSupport: true,
},
implementation: {
dynamicRegistration: false,
linkSupport: true,
},
references: {
dynamicRegistration: false,
},
documentSymbol: {
dynamicRegistration: false,
hierarchicalDocumentSymbolSupport: true,
symbolKind: {
valueSet: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26],
},
},
rename: {
dynamicRegistration: false,
prepareSupport: true,
},
codeAction: {
dynamicRegistration: false,
codeActionLiteralSupport: {
codeActionKind: {
valueSet: [
"quickfix",
"refactor",
"refactor.extract",
"refactor.inline",
"refactor.rewrite",
"source",
"source.organizeImports",
"source.fixAll",
],
},
},
resolveSupport: {
properties: ["edit"],
},
},
formatting: {
dynamicRegistration: false,
},
rangeFormatting: {
dynamicRegistration: false,
},
publishDiagnostics: {
relatedInformation: true,
versionSupport: true,
tagSupport: { valueSet: [1, 2] },
codeDescriptionSupport: true,
dataSupport: true,
},
},
window: {
workDoneProgress: true,
},
workspace: {
applyEdit: true,
workspaceEdit: {
documentChanges: true,
resourceOperations: ["create", "rename", "delete"],
failureHandling: "textOnlyTransactional",
},
configuration: true,
symbol: {
dynamicRegistration: false,
symbolKind: {
valueSet: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26],
},
},
fileOperations: {
dynamicRegistration: false,
willCreate: false,
didCreate: false,
willRename: true,
didRename: true,
willDelete: false,
didDelete: false,
},
},
experimental: {
snippetTextEdit: true,
},
};
* Parse a single LSP message from a buffer.
* Returns the parsed message and remaining buffer, or null if incomplete.
*/
function parseMessage(
buffer: Buffer,
): { message: LspJsonRpcResponse | LspJsonRpcNotification; remaining: Buffer } | null {
const headerEndIndex = findHeaderEnd(buffer);
if (headerEndIndex === -1) return null;
const headerText = new TextDecoder().decode(buffer.slice(0, headerEndIndex));
const contentLengthMatch = headerText.match(/Content-Length: (\d+)/i);
if (!contentLengthMatch) return null;
const contentLength = Number.parseInt(contentLengthMatch[1], 10);
const messageStart = headerEndIndex + 4;
const messageEnd = messageStart + contentLength;
if (buffer.length < messageEnd) return null;
const messageBytes = buffer.subarray(messageStart, messageEnd);
const messageText = new TextDecoder().decode(messageBytes);
const remaining = buffer.subarray(messageEnd);
return {
message: JSON.parse(messageText),
remaining,
};
}
* Find the end of the header section (before \r\n\r\n)
*/
function findHeaderEnd(buffer: Uint8Array): number {
for (let i = 0; i < buffer.length - 3; i++) {
if (buffer[i] === 13 && buffer[i + 1] === 10 && buffer[i + 2] === 13 && buffer[i + 3] === 10) {
return i;
}
}
return -1;
}
async function writeMessage(
sink: Bun.FileSink,
message: LspJsonRpcRequest | LspJsonRpcNotification | LspJsonRpcResponse,
): Promise<void> {
const content = JSON.stringify(message);
sink.write(`Content-Length: ${Buffer.byteLength(content, "utf-8")}\r\n\r\n${content}`);
await sink.flush();
}
function queueWriteMessage(
client: LspClient,
message: LspJsonRpcRequest | LspJsonRpcNotification | LspJsonRpcResponse,
): Promise<void> {
const write = client.writeQueue.catch(() => {}).then(() => writeMessage(client.proc.stdin, message));
client.writeQueue = write.catch(() => {});
return write;
}
* Start background message reader for a client.
* Routes responses to pending requests and handles notifications.
*/
async function startMessageReader(client: LspClient): Promise<void> {
if (client.isReading) return;
client.isReading = true;
const reader = (client.proc.stdout as ReadableStream<Uint8Array>).getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
const currentBuffer: Buffer = Buffer.concat([client.messageBuffer, value]);
client.messageBuffer = currentBuffer;
let workingBuffer = currentBuffer;
let parsed = parseMessage(workingBuffer);
while (parsed) {
const { message, remaining } = parsed;
workingBuffer = remaining;
if ("id" in message && message.id !== undefined) {
const pending = client.pendingRequests.get(message.id);
if (pending) {
client.pendingRequests.delete(message.id);
if ("error" in message && message.error) {
pending.reject(new Error(`LSP error: ${message.error.message}`));
} else {
pending.resolve(message.result);
}
} else if ("method" in message) {
await handleServerRequest(client, message as LspJsonRpcRequest);
}
} else if ("method" in message) {
if (message.method === "textDocument/publishDiagnostics" && message.params) {
const params = message.params as PublishDiagnosticsParams;
client.diagnostics.set(params.uri, {
diagnostics: params.diagnostics,
version: params.version ?? null,
});
client.diagnosticsVersion += 1;
} else if (message.method === "$/progress" && message.params) {
const params = message.params as { token: string | number; value?: { kind?: string } };
if (params.value?.kind === "begin") {
client.activeProgressTokens.add(params.token);
} else if (params.value?.kind === "end") {
client.activeProgressTokens.delete(params.token);
if (client.activeProgressTokens.size === 0) {
client.resolveProjectLoaded();
}
}
}
}
parsed = parseMessage(workingBuffer);
}
client.messageBuffer = workingBuffer;
}
} catch (err) {
for (const pending of Array.from(client.pendingRequests.values())) {
pending.reject(new Error(`LSP connection closed: ${err}`));
}
client.pendingRequests.clear();
} finally {
reader.releaseLock();
client.isReading = false;
}
}
* Handle workspace/configuration requests from the server.
*/
async function handleConfigurationRequest(client: LspClient, message: LspJsonRpcRequest): Promise<void> {
if (typeof message.id !== "number") return;
const params = message.params as { items?: Array<{ section?: string }> };
const items = params?.items ?? [];
const result = items.map(item => {
const section = item.section ?? "";
return client.config.settings?.[section] ?? {};
});
await sendResponse(client, message.id, result, "workspace/configuration");
}
* Handle workspace/applyEdit requests from the server.
*/
async function handleApplyEditRequest(client: LspClient, message: LspJsonRpcRequest): Promise<void> {
if (typeof message.id !== "number") return;
const params = message.params as { edit?: WorkspaceEdit };
if (!params?.edit) {
await sendResponse(
client,
message.id,
{ applied: false, failureReason: "No edit provided" },
"workspace/applyEdit",
);
return;
}
try {
await applyWorkspaceEdit(params.edit, client.cwd);
await sendResponse(client, message.id, { applied: true }, "workspace/applyEdit");
} catch (err) {
await sendResponse(client, message.id, { applied: false, failureReason: String(err) }, "workspace/applyEdit");
}
}
* Respond to a server-initiated request.
*/
async function handleServerRequest(client: LspClient, message: LspJsonRpcRequest): Promise<void> {
if (message.method === "workspace/configuration") {
await handleConfigurationRequest(client, message);
return;
}
if (message.method === "workspace/applyEdit") {
await handleApplyEditRequest(client, message);
return;
}
if (message.method === "window/workDoneProgress/create") {
if (typeof message.id === "number") {
await sendResponse(client, message.id, null, message.method);
}
return;
}
if (typeof message.id !== "number") return;
await sendResponse(client, message.id, null, message.method, {
code: -32601,
message: `Method not found: ${message.method}`,
});
}
* Send an LSP response to the server.
*/
async function sendResponse(
client: LspClient,
id: number,
result: unknown,
method: string,
error?: { code: number; message: string; data?: unknown },
): Promise<void> {
const response: LspJsonRpcResponse = {
jsonrpc: "2.0",
id,
...(error ? { error } : { result }),
};
try {
await queueWriteMessage(client, response);
} catch (err) {
logger.error("LSP failed to respond.", { method, error: String(err) });
}
}
export const WARMUP_TIMEOUT_MS = 5000;
const PROJECT_LOAD_TIMEOUT_MS = 15_000;
const SHUTDOWN_TIMEOUT_MS = 5_000;
const EXIT_TIMEOUT_MS = 1_000;
* Get or create an LSP client for the given server configuration and working directory.
* @param config - Server configuration
* @param cwd - Working directory
* @param initTimeoutMs - Optional timeout for the initialize request (defaults to 30s)
*/
export async function getOrCreateClient(config: ServerConfig, cwd: string, initTimeoutMs?: number): Promise<LspClient> {
const key = `${config.command}:${cwd}`;
const existingClient = clients.get(key);
if (existingClient) {
existingClient.lastActivity = Date.now();
return existingClient;
}
const existingLock = clientLocks.get(key);
if (existingLock) {
return existingLock;
}
const clientPromise = (async () => {
const baseCommand = config.resolvedCommand ?? config.command;
const baseArgs = config.args ?? [];
const { command, args, env } = isLspmuxSupported(baseCommand)
? await getLspmuxCommand(baseCommand, baseArgs)
: { command: baseCommand, args: baseArgs };
const proc = ptree.spawn([command, ...args], {
cwd,
stdin: "pipe",
env: env ? { ...Bun.env, ...env } : undefined,
});
let resolveProjectLoaded!: () => void;
const projectLoaded = new Promise<void>(resolve => {
resolveProjectLoaded = resolve;
});
const projectLoadTimeout = setTimeout(resolveProjectLoaded, PROJECT_LOAD_TIMEOUT_MS);
const originalResolve = resolveProjectLoaded;
resolveProjectLoaded = () => {
clearTimeout(projectLoadTimeout);
originalResolve();
};
const client: LspClient = {
name: key,
cwd,
proc,
config,
requestId: 0,
diagnostics: new Map(),
diagnosticsVersion: 0,
openFiles: new Map(),
pendingRequests: new Map(),
messageBuffer: new Uint8Array(0),
isReading: false,
lastActivity: Date.now(),
writeQueue: Promise.resolve(),
activeProgressTokens: new Set(),
projectLoaded,
resolveProjectLoaded,
};
clients.set(key, client);
proc.exited.then(() => {
clients.delete(key);
clientLocks.delete(key);
client.resolveProjectLoaded();
if (client.pendingRequests.size > 0) {
const rawStderr = proc.peekStderr().trim();
const stderr = rawStderr
.split("\n")
.filter(line => !/^\[\d{2}:\d{2}:\d{2} (?:INF|DBG|VRB)\]/.test(line))
.join("\n")
.trim();
const code = proc.exitCode;
const err = new Error(
stderr ? `LSP server exited (code ${code}): ${stderr}` : `LSP server exited unexpectedly (code ${code})`,
);
for (const pending of client.pendingRequests.values()) {
pending.reject(err);
}
client.pendingRequests.clear();
}
});
startMessageReader(client);
try {
const initResult = (await sendRequest(
client,
"initialize",
{
processId: process.pid,
rootUri: fileToUri(cwd),
rootPath: cwd,
capabilities: CLIENT_CAPABILITIES,
initializationOptions: config.initOptions ?? {},
workspaceFolders: [{ uri: fileToUri(cwd), name: cwd.split("/").pop() ?? "workspace" }],
},
undefined,
initTimeoutMs,
)) as { capabilities?: unknown };
if (!initResult) {
throw new Error("Failed to initialize LSP: no response");
}
client.serverCapabilities = initResult.capabilities as LspClient["serverCapabilities"];
await sendNotification(client, "initialized", {});
return client;
} catch (err) {
clients.delete(key);
clientLocks.delete(key);
proc.kill();
throw err;
} finally {
clientLocks.delete(key);
}
})();
clientLocks.set(key, clientPromise);
return clientPromise;
}
* Ensure a file is opened in the LSP client.
* Sends didOpen notification if the file is not already tracked.
*/
export async function ensureFileOpen(client: LspClient, filePath: string, signal?: AbortSignal): Promise<void> {
throwIfAborted(signal);
const uri = fileToUri(filePath);
const lockKey = `${client.name}:${uri}`;
if (client.openFiles.has(uri)) {
return;
}
const existingLock = fileOperationLocks.get(lockKey);
if (existingLock) {
await untilAborted(signal, () => existingLock);
return;
}
const openPromise = (async () => {
throwIfAborted(signal);
if (client.openFiles.has(uri)) {
return;
}
let content: string;
try {
content = await Bun.file(filePath).text();
throwIfAborted(signal);
} catch (err) {
if (isEnoent(err)) return;
throw err;
}
const languageId = detectLanguageId(filePath);
throwIfAborted(signal);
await sendNotification(client, "textDocument/didOpen", {
textDocument: {
uri,
languageId,
version: 1,
text: content,
},
});
client.openFiles.set(uri, { version: 1, languageId });
client.lastActivity = Date.now();
})();
fileOperationLocks.set(lockKey, openPromise);
try {
await openPromise;
} finally {
fileOperationLocks.delete(lockKey);
}
}
* Wait for the server's initial project loading to complete.
* Races the server's $/progress tracking against the abort signal.
* Returns immediately if loading already completed or timed out.
*/
export async function waitForProjectLoaded(client: LspClient, signal?: AbortSignal): Promise<void> {
if (signal?.aborted) return;
await Promise.race([
client.projectLoaded,
...(signal
? [new Promise<void>(resolve => signal.addEventListener("abort", () => resolve(), { once: true }))]
: []),
]);
}
* Sync in-memory content to the LSP client without reading from disk.
* Use this to provide instant feedback during edits before the file is saved.
*/
export async function syncContent(
client: LspClient,
filePath: string,
content: string,
signal?: AbortSignal,
): Promise<void> {
const uri = fileToUri(filePath);
const lockKey = `${client.name}:${uri}`;
throwIfAborted(signal);
const existingLock = fileOperationLocks.get(lockKey);
if (existingLock) {
await untilAborted(signal, () => existingLock);
}
const syncPromise = (async () => {
client.diagnostics.delete(uri);
const info = client.openFiles.get(uri);
if (!info) {
const languageId = detectLanguageId(filePath);
throwIfAborted(signal);
await sendNotification(client, "textDocument/didOpen", {
textDocument: {
uri,
languageId,
version: 1,
text: content,
},
});
client.openFiles.set(uri, { version: 1, languageId });
client.lastActivity = Date.now();
return;
}
const version = ++info.version;
throwIfAborted(signal);
await sendNotification(client, "textDocument/didChange", {
textDocument: { uri, version },
contentChanges: [{ text: content }],
});
client.lastActivity = Date.now();
})();
fileOperationLocks.set(lockKey, syncPromise);
try {
await syncPromise;
} finally {
fileOperationLocks.delete(lockKey);
}
}
* Notify LSP that a file was saved.
* Assumes content was already synced via syncContent - just sends didSave.
*/
export async function notifySaved(client: LspClient, filePath: string, signal?: AbortSignal): Promise<void> {
const uri = fileToUri(filePath);
const info = client.openFiles.get(uri);
if (!info) return;
throwIfAborted(signal);
await sendNotification(client, "textDocument/didSave", {
textDocument: { uri },
});
client.lastActivity = Date.now();
}
* Refresh a file in the LSP client.
* Increments version, sends didChange and didSave notifications.
*/
export async function refreshFile(client: LspClient, filePath: string, signal?: AbortSignal): Promise<void> {
throwIfAborted(signal);
const uri = fileToUri(filePath);
const lockKey = `${client.name}:${uri}`;
const existingLock = fileOperationLocks.get(lockKey);
if (existingLock) {
await untilAborted(signal, () => existingLock);
}
const refreshPromise = (async () => {
throwIfAborted(signal);
client.diagnostics.delete(uri);
const info = client.openFiles.get(uri);
if (!info) {
await ensureFileOpen(client, filePath, signal);
return;
}
let content: string;
try {
content = await Bun.file(filePath).text();
throwIfAborted(signal);
} catch (err) {
if (isEnoent(err)) return;
throw err;
}
const version = ++info.version;
throwIfAborted(signal);
await sendNotification(client, "textDocument/didChange", {
textDocument: { uri, version },
contentChanges: [{ text: content }],
});
throwIfAborted(signal);
await sendNotification(client, "textDocument/didSave", {
textDocument: { uri },
text: content,
});
client.lastActivity = Date.now();
})();
fileOperationLocks.set(lockKey, refreshPromise);
try {
await refreshPromise;
} finally {
fileOperationLocks.delete(lockKey);
}
}
async function waitForExit(client: LspClient, timeoutMs: number): Promise<boolean> {
return await Promise.race([
client.proc.exited.then(
() => true,
() => true,
),
Bun.sleep(timeoutMs).then(() => false),
]);
}
* Shutdown a specific client instance using the LSP shutdown/exit handshake.
*/
async function shutdownClientInstance(client: LspClient): Promise<void> {
const err = new Error("LSP client shutdown");
for (const pending of Array.from(client.pendingRequests.values())) {
pending.reject(err);
}
client.pendingRequests.clear();
const shutdownCompleted = await sendRequest(client, "shutdown", null, undefined, SHUTDOWN_TIMEOUT_MS).then(
() => true,
() => false,
);
if (shutdownCompleted) {
await sendNotification(client, "exit", undefined).catch(() => {});
if (await waitForExit(client, EXIT_TIMEOUT_MS)) return;
}
client.proc.kill();
await waitForExit(client, EXIT_TIMEOUT_MS);
}
* Shutdown a specific client by key.
*/
export async function shutdownClient(key: string): Promise<void> {
const client = clients.get(key);
if (!client) return;
clients.delete(key);
await shutdownClientInstance(client);
}
const DEFAULT_REQUEST_TIMEOUT_MS = 30000;
* Send an LSP request and wait for response.
*/
export async function sendRequest(
client: LspClient,
method: string,
params: unknown,
signal?: AbortSignal,
timeoutMs: number = DEFAULT_REQUEST_TIMEOUT_MS,
): Promise<unknown> {
const id = ++client.requestId;
if (signal?.aborted) {
const reason = signal.reason instanceof Error ? signal.reason : new ToolAbortError();
return Promise.reject(reason);
}
const request: LspJsonRpcRequest = {
jsonrpc: "2.0",
id,
method,
params,
};
client.lastActivity = Date.now();
const { promise, resolve, reject } = Promise.withResolvers<unknown>();
let timeout: NodeJS.Timeout | undefined;
const cleanup = () => {
if (signal) {
signal.removeEventListener("abort", abortHandler);
}
};
const abortHandler = () => {
if (client.pendingRequests.has(id)) {
client.pendingRequests.delete(id);
}
void sendNotification(client, "$/cancelRequest", { id }).catch(() => {});
if (timeout) clearTimeout(timeout);
cleanup();
const reason = signal?.reason instanceof Error ? signal.reason : new ToolAbortError();
reject(reason);
};
timeout = setTimeout(() => {
if (client.pendingRequests.has(id)) {
client.pendingRequests.delete(id);
const err = new Error(`LSP request ${method} timed out after ${timeoutMs}ms`);
cleanup();
reject(err);
}
}, timeoutMs);
if (signal) {
signal.addEventListener("abort", abortHandler, { once: true });
if (signal.aborted) {
abortHandler();
return promise;
}
}
client.pendingRequests.set(id, {
resolve: result => {
if (timeout) clearTimeout(timeout);
cleanup();
resolve(result);
},
reject: err => {
if (timeout) clearTimeout(timeout);
cleanup();
reject(err);
},
method,
});
queueWriteMessage(client, request).catch(err => {
if (timeout) clearTimeout(timeout);
client.pendingRequests.delete(id);
cleanup();
reject(err);
});
return promise;
}
* Send an LSP notification (no response expected).
*/
export async function sendNotification(client: LspClient, method: string, params: unknown): Promise<void> {
const notification: LspJsonRpcNotification = {
jsonrpc: "2.0",
method,
params,
};
client.lastActivity = Date.now();
await queueWriteMessage(client, notification);
}
* Shutdown all LSP clients.
*/
export async function shutdownAll(): Promise<void> {
const clientsToShutdown = Array.from(clients.values());
clients.clear();
await Promise.allSettled(clientsToShutdown.map(client => shutdownClientInstance(client)));
}
export interface LspServerStatus {
name: string;
status: "connecting" | "ready" | "error";
fileTypes: string[];
error?: string;
}
* Get status of all active LSP clients.
*/
export function getActiveClients(): LspServerStatus[] {
return Array.from(clients.values()).map(client => ({
name: client.config.command,
status: "ready" as const,
fileTypes: client.config.fileTypes,
}));
}
if (typeof process !== "undefined") {
process.on("beforeExit", () => {
void shutdownAll();
});
process.on("SIGINT", () => {
void (async () => {
await shutdownAll();
process.exit(0);
})();
});
process.on("SIGTERM", () => {
void (async () => {
await shutdownAll();
process.exit(0);
})();
});
}