* Session-keyed message store.
*
* Holds per-session state in a Map keyed by sessionId.
* Session switch = change activeSessionId pointer. No clearing. Old data stays.
* WebSocket handler = store.appendRealtime(msg.sessionId, msg). One line.
* No localStorage for messages. Backend JSONL is the source of truth.
*/
import { useCallback, useMemo, useRef, useState } from 'react';
import type { SessionProvider } from '../types/app';
import { authenticatedFetch } from '../utils/api';
export type MessageKind =
| 'text'
| 'tool_use'
| 'tool_result'
| 'thinking'
| 'stream_delta'
| 'stream_end'
| 'error'
| 'complete'
| 'status'
| 'permission_request'
| 'permission_cancelled'
| 'session_created'
| 'interactive_prompt'
| 'task_notification'
| 'interrupted'
| 'compact_boundary'
| 'agent_activity'
| 'agent_activity_summary';
export interface CompactProgress {
level: number;
stage: string;
label: string;
state: 'started' | 'running' | 'failed' | 'completed';
pre_tokens?: number;
reason?: string;
}
export interface NormalizedMessage {
id: string;
sessionId: string;
timestamp: string;
provider: SessionProvider;
kind: MessageKind;
role?: 'user' | 'assistant';
content?: string;
images?: string[];
attachments?: Array<{
name: string;
path?: string;
size?: number;
mimeType?: string;
}>;
toolName?: string;
toolInput?: unknown;
toolId?: string;
toolResult?: { content: string; isError: boolean; toolUseResult?: unknown } | null;
* Inline image payloads attached to a `tool_result` frame (e.g. `read_file`
* on a PNG). Object shape with `data` (data URL) and optional `mimeType` —
* distinct from `images?: string[]` above, which carries user-message
* upload data URLs. The bridge wraps gateway base64 as data URLs upstream
* so the UI can drop these straight into `<img src>` without re-parsing.
*/
toolResultImages?: Array<{ data: string; mimeType?: string; name?: string }>;
isError?: boolean;
* `PilotDeckToolErrorCode` from the gateway when `kind === 'tool_result'`
* and `isError === true` — flat on the frame because the bridge merges
* `tool_call_finished.errorCode` here verbatim. See
* `pilotdeck-bridge.js#tool_call_finished` and `chatPermissions.ts`.
*/
errorCode?: string;
text?: string;
tokens?: number;
canInterrupt?: boolean;
compactProgress?: CompactProgress;
tokenBudget?: unknown;
requestId?: string;
input?: unknown;
context?: unknown;
newSessionId?: string;
status?: string;
summary?: string;
exitCode?: number;
actualSessionId?: string;
parentToolUseId?: string;
subagentTools?: unknown[];
taskId?: string;
outputFile?: string;
taskResult?: string;
trigger?: string;
preTokens?: number;
compactLevel?: number;
compactStage?: string;
compactStageLabel?: string;
compactMetadata?: unknown;
runId?: string;
activityId?: string;
phase?: string;
state?: string;
title?: string;
detail?: string;
startedAt?: string;
endedAt?: string | null;
durationMs?: number | null;
severity?: string;
toolCallCount?: number;
toolErrorCount?: number;
ragSearchCount?: number;
compactCount?: number;
editedFileCount?: number;
exploredFileCount?: number;
commandCount?: number;
subagentCount?: number;
thinkingCount?: number;
otherToolCount?: number;
keySteps?: unknown[];
isFinal?: boolean;
sequence?: number;
rowid?: number;
serverTailIdAtStart?: string;
}
export type SessionStatus = 'idle' | 'loading' | 'streaming' | 'error';
export interface SessionSlot {
serverMessages: NormalizedMessage[];
realtimeMessages: NormalizedMessage[];
activityMessages: NormalizedMessage[];
merged: NormalizedMessage[];
_lastServerRef: NormalizedMessage[];
_lastRealtimeRef: NormalizedMessage[];
status: SessionStatus;
fetchedAt: number;
lastError: string | null;
total: number;
hasMore: boolean;
offset: number;
tokenUsage: unknown;
}
const EMPTY: NormalizedMessage[] = [];
function createEmptySlot(): SessionSlot {
return {
serverMessages: EMPTY,
realtimeMessages: EMPTY,
activityMessages: EMPTY,
merged: EMPTY,
_lastServerRef: EMPTY,
_lastRealtimeRef: EMPTY,
status: 'idle',
fetchedAt: 0,
lastError: null,
total: 0,
hasMore: false,
offset: 0,
tokenUsage: null,
};
}
function normalizeRealtimeText(value?: string): string {
return typeof value === 'string' ? value.replace(/\s+/g, ' ').trim() : '';
}
function parseTimestampMs(value?: string): number | null {
if (!value) return null;
const parsed = Date.parse(value);
return Number.isFinite(parsed) ? parsed : null;
}
function isConfirmedUserMessageDuplicate(
realtimeMessage: NormalizedMessage,
serverMessages: NormalizedMessage[],
): boolean {
if (
realtimeMessage.kind !== 'text'
|| realtimeMessage.role !== 'user'
|| !realtimeMessage.id.startsWith('local_')
) {
return false;
}
const realtimeText = normalizeRealtimeText(realtimeMessage.content);
if (!realtimeText) return false;
const realtimeTimestamp = parseTimestampMs(realtimeMessage.timestamp);
return serverMessages.some((serverMessage) => {
if (serverMessage.kind !== 'text' || serverMessage.role !== 'user') {
return false;
}
if (normalizeRealtimeText(serverMessage.content) !== realtimeText) {
return false;
}
if (realtimeTimestamp == null) {
return true;
}
const serverTimestamp = parseTimestampMs(serverMessage.timestamp);
if (serverTimestamp == null) {
return true;
}
return Math.abs(serverTimestamp - realtimeTimestamp) <= 10_000;
});
}
* The backend pushes a synthetic `interrupted` notice the moment abort fires
* "[Request interrupted by user]" entry into the JSONL during the next user
* turn. Once that JSONL entry is replayed via the server, drop the locally
* pushed one to avoid stacking two dividers in the conversation.
*/
function isLocalInterruptDuplicate(
realtimeMessage: NormalizedMessage,
serverMessages: NormalizedMessage[],
): boolean {
if (
realtimeMessage.kind !== 'interrupted'
|| !realtimeMessage.id.startsWith('local_interrupt_')
) {
return false;
}
const realtimeTimestamp = parseTimestampMs(realtimeMessage.timestamp);
return serverMessages.some((serverMessage) => {
if (serverMessage.kind !== 'interrupted') return false;
if (realtimeTimestamp == null) return true;
const serverTimestamp = parseTimestampMs(serverMessage.timestamp);
if (serverTimestamp == null) return true;
return Math.abs(serverTimestamp - realtimeTimestamp) <= 30 * 60_000;
});
}
* Compute merged messages: server + realtime, deduped by id.
* Server messages take priority (they're the persisted source of truth).
* Realtime messages that aren't yet in server stay (in-flight streaming).
*/
function computeMerged(server: NormalizedMessage[], realtime: NormalizedMessage[]): NormalizedMessage[] {
if (realtime.length === 0) return server;
if (server.length === 0) return realtime;
const serverIds = new Set(server.map(m => m.id));
const extra = realtime.filter((message) => {
if (serverIds.has(message.id)) return false;
if (isConfirmedUserMessageDuplicate(message, server)) return false;
if (isLocalInterruptDuplicate(message, server)) return false;
return true;
});
if (extra.length === 0) return server;
const streamIdx = extra.findIndex(m => m.id.startsWith('__streaming_'));
if (streamIdx >= 0 && server.length > 0) {
const lastServer = server[server.length - 1];
const streamMsg = extra[streamIdx];
const isAssistantText = lastServer.kind === 'text' && lastServer.role === 'assistant';
const tailIdChanged = streamMsg.serverTailIdAtStart !== undefined
&& lastServer.id !== streamMsg.serverTailIdAtStart;
if (isAssistantText && tailIdChanged) {
return [...server.slice(0, -1), ...extra];
}
}
return [...server, ...extra];
}
function upsertRealtimeMessages(
existing: NormalizedMessage[],
incoming: NormalizedMessage[],
): NormalizedMessage[] {
if (incoming.length === 0) return existing;
const updated = [...existing];
const indexById = new Map(updated.map((message, index) => [message.id, index]));
for (const message of incoming) {
const existingIndex = indexById.get(message.id);
if (existingIndex === undefined) {
indexById.set(message.id, updated.length);
updated.push(message);
} else {
updated[existingIndex] = message;
}
}
return updated;
}
* Recompute slot.merged only when the input arrays have actually changed
* (by reference). Returns true if merged was recomputed.
*/
function recomputeMergedIfNeeded(slot: SessionSlot): boolean {
if (slot.serverMessages === slot._lastServerRef && slot.realtimeMessages === slot._lastRealtimeRef) {
return false;
}
slot._lastServerRef = slot.serverMessages;
slot._lastRealtimeRef = slot.realtimeMessages;
slot.merged = computeMerged(slot.serverMessages, slot.realtimeMessages);
return true;
}
const STALE_THRESHOLD_MS = 30_000;
const MAX_REALTIME_MESSAGES = 500;
export function useSessionStore() {
const storeRef = useRef(new Map<string, SessionSlot>());
const activeSessionIdRef = useRef<string | null>(null);
const [, setTick] = useState(0);
const notify = useCallback((sessionId: string) => {
if (sessionId === activeSessionIdRef.current) {
setTick(n => n + 1);
}
}, []);
const setActiveSession = useCallback((sessionId: string | null) => {
const changed = activeSessionIdRef.current !== sessionId;
activeSessionIdRef.current = sessionId;
if (changed) {
setTick(n => n + 1);
}
}, []);
const getSlot = useCallback((sessionId: string): SessionSlot => {
const store = storeRef.current;
if (!store.has(sessionId)) {
store.set(sessionId, createEmptySlot());
}
return store.get(sessionId)!;
}, []);
const has = useCallback((sessionId: string) => storeRef.current.has(sessionId), []);
* Fetch messages from the unified endpoint and populate serverMessages.
*/
const fetchFromServer = useCallback(async (
sessionId: string,
opts: {
provider?: SessionProvider;
projectName?: string;
projectPath?: string;
sessionKind?: string;
parentSessionId?: string;
relativeTranscriptPath?: string;
limit?: number | null;
offset?: number;
} = {},
) => {
const slot = getSlot(sessionId);
slot.status = 'loading';
notify(sessionId);
const fetchStartedAt = Date.now();
try {
const params = new URLSearchParams();
if (opts.provider) params.append('provider', opts.provider);
if (opts.projectName) params.append('projectName', opts.projectName);
if (opts.projectPath) params.append('projectPath', opts.projectPath);
if (opts.sessionKind) params.append('sessionKind', opts.sessionKind);
if (opts.parentSessionId) params.append('parentSessionId', opts.parentSessionId);
if (opts.relativeTranscriptPath) {
params.append('relativeTranscriptPath', opts.relativeTranscriptPath);
}
if (opts.limit !== null && opts.limit !== undefined) {
params.append('limit', String(opts.limit));
params.append('offset', String(opts.offset ?? 0));
}
const qs = params.toString();
const url = `/api/sessions/${encodeURIComponent(sessionId)}/messages${qs ? `?${qs}` : ''}`;
const response = await authenticatedFetch(url);
if (!response.ok) {
throw new Error(`HTTP ${response.status}`);
}
const data = await response.json();
const messages: NormalizedMessage[] = data.messages || [];
slot.serverMessages = messages;
slot.total = data.total ?? messages.length;
slot.hasMore = Boolean(data.hasMore);
slot.offset = (opts.offset ?? 0) + messages.length;
slot.fetchedAt = Date.now();
slot.status = 'idle';
slot.lastError = null;
if (slot.realtimeMessages.length > 0 && messages.length > 0) {
const latestServerTs = messages.reduce(
(max, m) => Math.max(max, Date.parse(m.timestamp) || 0), 0,
);
const watermark = Math.max(fetchStartedAt, latestServerTs);
slot.realtimeMessages = slot.realtimeMessages.filter(m => {
if (m.id.startsWith('__streaming_')) return true;
return (Date.parse(m.timestamp) || 0) > watermark;
});
}
recomputeMergedIfNeeded(slot);
if (data.tokenUsage) {
slot.tokenUsage = data.tokenUsage;
}
notify(sessionId);
return slot;
} catch (error) {
console.error(`[SessionStore] fetch failed for ${sessionId}:`, error);
slot.status = 'error';
slot.lastError = error instanceof Error ? error.message : 'Unknown error';
notify(sessionId);
return slot;
}
}, [getSlot, notify]);
* Load older (paginated) messages and prepend to serverMessages.
*/
const fetchMore = useCallback(async (
sessionId: string,
opts: {
provider?: SessionProvider;
projectName?: string;
projectPath?: string;
sessionKind?: string;
parentSessionId?: string;
relativeTranscriptPath?: string;
limit?: number;
} = {},
) => {
const slot = getSlot(sessionId);
if (!slot.hasMore) return slot;
const params = new URLSearchParams();
if (opts.provider) params.append('provider', opts.provider);
if (opts.projectName) params.append('projectName', opts.projectName);
if (opts.projectPath) params.append('projectPath', opts.projectPath);
if (opts.sessionKind) params.append('sessionKind', opts.sessionKind);
if (opts.parentSessionId) params.append('parentSessionId', opts.parentSessionId);
if (opts.relativeTranscriptPath) {
params.append('relativeTranscriptPath', opts.relativeTranscriptPath);
}
const limit = opts.limit ?? 20;
params.append('limit', String(limit));
params.append('offset', String(slot.offset));
const qs = params.toString();
const url = `/api/sessions/${encodeURIComponent(sessionId)}/messages${qs ? `?${qs}` : ''}`;
try {
const response = await authenticatedFetch(url);
if (!response.ok) throw new Error(`HTTP ${response.status}`);
const data = await response.json();
const olderMessages: NormalizedMessage[] = data.messages || [];
slot.serverMessages = [...olderMessages, ...slot.serverMessages];
slot.hasMore = Boolean(data.hasMore);
slot.offset = slot.offset + olderMessages.length;
recomputeMergedIfNeeded(slot);
notify(sessionId);
return slot;
} catch (error) {
console.error(`[SessionStore] fetchMore failed for ${sessionId}:`, error);
return slot;
}
}, [getSlot, notify]);
* Append a realtime (WebSocket) message to the correct session slot.
* This works regardless of which session is actively viewed.
*/
const appendRealtime = useCallback((sessionId: string, msg: NormalizedMessage) => {
const slot = getSlot(sessionId);
let updated = upsertRealtimeMessages(slot.realtimeMessages, [msg]);
if (updated.length > MAX_REALTIME_MESSAGES) {
updated = updated.slice(-MAX_REALTIME_MESSAGES);
}
slot.realtimeMessages = updated;
recomputeMergedIfNeeded(slot);
notify(sessionId);
}, [getSlot, notify]);
const upsertActivity = useCallback((sessionId: string, msg: NormalizedMessage) => {
const slot = getSlot(sessionId);
const key = msg.activityId || msg.id;
const existingIndex = slot.activityMessages.findIndex((activity) =>
(activity.activityId || activity.id) === key
);
if (existingIndex >= 0) {
const updated = [...slot.activityMessages];
updated[existingIndex] = msg;
slot.activityMessages = updated;
} else {
slot.activityMessages = [...slot.activityMessages, msg];
}
notify(sessionId);
}, [getSlot, notify]);
const setActivities = useCallback((sessionId: string, msgs: NormalizedMessage[]) => {
const slot = getSlot(sessionId);
const byKey = new Map<string, NormalizedMessage>();
for (const msg of msgs) {
if (msg.kind !== 'agent_activity') continue;
byKey.set(msg.activityId || msg.id, msg);
}
slot.activityMessages = Array.from(byKey.values());
notify(sessionId);
}, [getSlot, notify]);
* Append multiple realtime messages at once (batch).
*/
const appendRealtimeBatch = useCallback((sessionId: string, msgs: NormalizedMessage[]) => {
if (msgs.length === 0) return;
const slot = getSlot(sessionId);
let updated = upsertRealtimeMessages(slot.realtimeMessages, msgs);
if (updated.length > MAX_REALTIME_MESSAGES) {
updated = updated.slice(-MAX_REALTIME_MESSAGES);
}
slot.realtimeMessages = updated;
recomputeMergedIfNeeded(slot);
notify(sessionId);
}, [getSlot, notify]);
* Re-fetch serverMessages from the unified endpoint (e.g., on projects_updated).
*/
const refreshFromServer = useCallback(async (
sessionId: string,
opts: {
provider?: SessionProvider;
projectName?: string;
projectPath?: string;
sessionKind?: string;
parentSessionId?: string;
relativeTranscriptPath?: string;
} = {},
) => {
const slot = getSlot(sessionId);
try {
const params = new URLSearchParams();
if (opts.provider) params.append('provider', opts.provider);
if (opts.projectName) params.append('projectName', opts.projectName);
if (opts.projectPath) params.append('projectPath', opts.projectPath);
if (opts.sessionKind) params.append('sessionKind', opts.sessionKind);
if (opts.parentSessionId) params.append('parentSessionId', opts.parentSessionId);
if (opts.relativeTranscriptPath) {
params.append('relativeTranscriptPath', opts.relativeTranscriptPath);
}
const qs = params.toString();
const url = `/api/sessions/${encodeURIComponent(sessionId)}/messages${qs ? `?${qs}` : ''}`;
const response = await authenticatedFetch(url);
if (!response.ok) throw new Error(`HTTP ${response.status}`);
const data = await response.json();
slot.serverMessages = data.messages || [];
slot.total = data.total ?? slot.serverMessages.length;
slot.hasMore = Boolean(data.hasMore);
slot.fetchedAt = Date.now();
slot.realtimeMessages = [];
recomputeMergedIfNeeded(slot);
notify(sessionId);
} catch (error) {
console.error(`[SessionStore] refresh failed for ${sessionId}:`, error);
}
}, [getSlot, notify]);
* Update session status.
*/
const setStatus = useCallback((sessionId: string, status: SessionStatus) => {
const slot = getSlot(sessionId);
slot.status = status;
notify(sessionId);
}, [getSlot, notify]);
* Check if a session's data is stale (>30s old).
*/
const isStale = useCallback((sessionId: string) => {
const slot = storeRef.current.get(sessionId);
if (!slot) return true;
return Date.now() - slot.fetchedAt > STALE_THRESHOLD_MS;
}, []);
* Update or create a streaming message (accumulated text so far).
* Uses a well-known ID so subsequent calls replace the same message.
*/
const updateStreaming = useCallback((sessionId: string, accumulatedText: string, msgProvider: SessionProvider) => {
const slot = getSlot(sessionId);
const streamId = `__streaming_${sessionId}`;
const idx = slot.realtimeMessages.findIndex(m => m.id === streamId);
if (idx >= 0) {
const existing = slot.realtimeMessages[idx];
slot.realtimeMessages = [...slot.realtimeMessages];
slot.realtimeMessages[idx] = {
...existing,
provider: msgProvider,
content: accumulatedText,
};
} else {
const serverTailId = slot.serverMessages.length > 0
? slot.serverMessages[slot.serverMessages.length - 1].id
: null;
const msg: NormalizedMessage = {
id: streamId,
sessionId,
timestamp: new Date().toISOString(),
provider: msgProvider,
kind: 'stream_delta',
content: accumulatedText,
serverTailIdAtStart: serverTailId ?? undefined,
};
slot.realtimeMessages = [...slot.realtimeMessages, msg];
}
recomputeMergedIfNeeded(slot);
notify(sessionId);
}, [getSlot, notify]);
* Finalize streaming: convert the streaming message to a regular text message.
* The well-known streaming ID is replaced with a unique text message ID.
*/
const finalizeStreaming = useCallback((sessionId: string) => {
const slot = storeRef.current.get(sessionId);
if (!slot) return;
const streamId = `__streaming_${sessionId}`;
const idx = slot.realtimeMessages.findIndex(m => m.id === streamId);
if (idx >= 0) {
const stream = slot.realtimeMessages[idx];
const newId = `text_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
slot.realtimeMessages = [...slot.realtimeMessages];
slot.realtimeMessages[idx] = {
...stream,
id: newId,
kind: 'text',
role: 'assistant',
};
recomputeMergedIfNeeded(slot);
notify(sessionId);
}
}, [notify]);
* Update or create a streaming thinking message (accumulated thinking so far).
* Mirrors updateStreaming but uses kind='thinking' and a separate well-known ID.
*/
const updateStreamingThinking = useCallback((sessionId: string, accumulatedText: string, msgProvider: SessionProvider) => {
const slot = getSlot(sessionId);
const streamId = `__streaming_thinking_${sessionId}`;
const idx = slot.realtimeMessages.findIndex(m => m.id === streamId);
if (idx >= 0) {
const existing = slot.realtimeMessages[idx];
slot.realtimeMessages = [...slot.realtimeMessages];
slot.realtimeMessages[idx] = {
...existing,
provider: msgProvider,
content: accumulatedText,
};
} else {
const msg: NormalizedMessage = {
id: streamId,
sessionId,
timestamp: new Date().toISOString(),
provider: msgProvider,
kind: 'thinking',
content: accumulatedText,
};
slot.realtimeMessages = [...slot.realtimeMessages, msg];
}
recomputeMergedIfNeeded(slot);
notify(sessionId);
}, [getSlot, notify]);
* Finalize streaming thinking: replace the well-known streaming thinking ID
* with a unique ID so subsequent thinking blocks don't overwrite it.
*/
const finalizeStreamingThinking = useCallback((sessionId: string) => {
const slot = storeRef.current.get(sessionId);
if (!slot) return;
const streamId = `__streaming_thinking_${sessionId}`;
const idx = slot.realtimeMessages.findIndex(m => m.id === streamId);
if (idx >= 0) {
const stream = slot.realtimeMessages[idx];
const newId = `thinking_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
slot.realtimeMessages = [...slot.realtimeMessages];
slot.realtimeMessages[idx] = {
...stream,
id: newId,
};
recomputeMergedIfNeeded(slot);
notify(sessionId);
}
}, [notify]);
* Clear realtime messages for a session (e.g., after stream completes and server fetch catches up).
*/
const clearRealtime = useCallback((sessionId: string) => {
const slot = storeRef.current.get(sessionId);
if (slot) {
slot.realtimeMessages = [];
recomputeMergedIfNeeded(slot);
notify(sessionId);
}
}, [notify]);
* Get merged messages for a session (for rendering).
*/
const getMessages = useCallback((sessionId: string): NormalizedMessage[] => {
return storeRef.current.get(sessionId)?.merged ?? [];
}, []);
const getActivityMessages = useCallback((sessionId: string): NormalizedMessage[] => {
return storeRef.current.get(sessionId)?.activityMessages ?? [];
}, []);
* Get session slot (for status, pagination info, etc.).
*/
const getSessionSlot = useCallback((sessionId: string): SessionSlot | undefined => {
return storeRef.current.get(sessionId);
}, []);
return useMemo(() => ({
getSlot,
has,
fetchFromServer,
fetchMore,
appendRealtime,
upsertActivity,
setActivities,
appendRealtimeBatch,
refreshFromServer,
setActiveSession,
setStatus,
isStale,
updateStreaming,
finalizeStreaming,
updateStreamingThinking,
finalizeStreamingThinking,
clearRealtime,
getMessages,
getActivityMessages,
getSessionSlot,
}), [
getSlot, has, fetchFromServer, fetchMore,
appendRealtime, upsertActivity, setActivities, appendRealtimeBatch, refreshFromServer,
setActiveSession, setStatus, isStale, updateStreaming, finalizeStreaming,
updateStreamingThinking, finalizeStreamingThinking,
clearRealtime, getMessages, getActivityMessages, getSessionSlot,
]);
}
export type SessionStore = ReturnType<typeof useSessionStore>;