/**
 * Session-keyed message store.
 *
 * Holds per-session state in a Map keyed by sessionId.
 * Session switch = change activeSessionId pointer. No clearing. Old data stays.
 * WebSocket handler = store.appendRealtime(msg.sessionId, msg). One line.
 * No localStorage for messages. Backend JSONL is the source of truth.
 */

import { useCallback, useMemo, useRef, useState } from 'react';
import type { SessionProvider } from '../types/app';
import { authenticatedFetch } from '../utils/api';

// ─── NormalizedMessage (mirrors server/adapters/types.js) ────────────────────

export type MessageKind =
  | 'text'
  | 'tool_use'
  | 'tool_result'
  | 'thinking'
  | 'stream_delta'
  | 'stream_end'
  | 'error'
  | 'complete'
  | 'status'
  | 'permission_request'
  | 'permission_cancelled'
  | 'session_created'
  | 'interactive_prompt'
  | 'task_notification'
  | 'interrupted'
  | 'compact_boundary'
  | 'agent_activity'
  | 'agent_activity_summary';

export interface CompactProgress {
  level: number;
  stage: string;
  label: string;
  state: 'started' | 'running' | 'failed' | 'completed';
  pre_tokens?: number;
  reason?: string;
}

export interface NormalizedMessage {
  id: string;
  sessionId: string;
  timestamp: string;
  provider: SessionProvider;
  kind: MessageKind;

  // kind-specific fields (flat for simplicity)
  role?: 'user' | 'assistant';
  content?: string;
  images?: string[];
  attachments?: Array<{
    name: string;
    path?: string;
    size?: number;
    mimeType?: string;
  }>;
  toolName?: string;
  toolInput?: unknown;
  toolId?: string;
  toolResult?: { content: string; isError: boolean; toolUseResult?: unknown } | null;
  /**
   * Inline image payloads attached to a `tool_result` frame (e.g. `read_file`
   * on a PNG). Object shape with `data` (data URL) and optional `mimeType` —
   * distinct from `images?: string[]` above, which carries user-message
   * upload data URLs. The bridge wraps gateway base64 as data URLs upstream
   * so the UI can drop these straight into `<img src>` without re-parsing.
   */
  toolResultImages?: Array<{ data: string; mimeType?: string; name?: string }>;
  isError?: boolean;
  /**
   * `PilotDeckToolErrorCode` from the gateway when `kind === 'tool_result'`
   * and `isError === true` — flat on the frame because the bridge merges
   * `tool_call_finished.errorCode` here verbatim. See
   * `pilotdeck-bridge.js#tool_call_finished` and `chatPermissions.ts`.
   */
  errorCode?: string;
  text?: string;
  tokens?: number;
  canInterrupt?: boolean;
  compactProgress?: CompactProgress;
  tokenBudget?: unknown;
  requestId?: string;
  input?: unknown;
  context?: unknown;
  newSessionId?: string;
  status?: string;
  summary?: string;
  exitCode?: number;
  actualSessionId?: string;
  parentToolUseId?: string;
  subagentTools?: unknown[];
  taskId?: string;
  outputFile?: string;
  taskResult?: string;
  trigger?: string;
  preTokens?: number;
  compactLevel?: number;
  compactStage?: string;
  compactStageLabel?: string;
  compactMetadata?: unknown;
  runId?: string;
  activityId?: string;
  phase?: string;
  state?: string;
  title?: string;
  detail?: string;
  startedAt?: string;
  endedAt?: string | null;
  durationMs?: number | null;
  severity?: string;
  toolCallCount?: number;
  toolErrorCount?: number;
  ragSearchCount?: number;
  compactCount?: number;
  editedFileCount?: number;
  exploredFileCount?: number;
  commandCount?: number;
  subagentCount?: number;
  thinkingCount?: number;
  otherToolCount?: number;
  keySteps?: unknown[];
  isFinal?: boolean;
  // Cursor-specific ordering
  sequence?: number;
  rowid?: number;
  // Streaming-only: id of slot.serverMessages tail at the moment the
  // streaming row was created. computeMerged uses this for an id-based
  // same-turn-snapshot test instead of a timestamp window.
  serverTailIdAtStart?: string;
}

// ─── Per-session slot ────────────────────────────────────────────────────────

export type SessionStatus = 'idle' | 'loading' | 'streaming' | 'error';

export interface SessionSlot {
  serverMessages: NormalizedMessage[];
  realtimeMessages: NormalizedMessage[];
  activityMessages: NormalizedMessage[];
  merged: NormalizedMessage[];
  /** @internal Cache-invalidation refs for computeMerged */
  _lastServerRef: NormalizedMessage[];
  _lastRealtimeRef: NormalizedMessage[];
  status: SessionStatus;
  fetchedAt: number;
  lastError: string | null;
  total: number;
  hasMore: boolean;
  offset: number;
  tokenUsage: unknown;
}

const EMPTY: NormalizedMessage[] = [];

function createEmptySlot(): SessionSlot {
  return {
    serverMessages: EMPTY,
    realtimeMessages: EMPTY,
    activityMessages: EMPTY,
    merged: EMPTY,
    _lastServerRef: EMPTY,
    _lastRealtimeRef: EMPTY,
    status: 'idle',
    fetchedAt: 0,
    lastError: null,
    total: 0,
    hasMore: false,
    offset: 0,
    tokenUsage: null,
  };
}

function normalizeRealtimeText(value?: string): string {
  return typeof value === 'string' ? value.replace(/\s+/g, ' ').trim() : '';
}

function parseTimestampMs(value?: string): number | null {
  if (!value) return null;
  const parsed = Date.parse(value);
  return Number.isFinite(parsed) ? parsed : null;
}

function isConfirmedUserMessageDuplicate(
  realtimeMessage: NormalizedMessage,
  serverMessages: NormalizedMessage[],
): boolean {
  if (
    realtimeMessage.kind !== 'text'
    || realtimeMessage.role !== 'user'
    || !realtimeMessage.id.startsWith('local_')
  ) {
    return false;
  }

  const realtimeText = normalizeRealtimeText(realtimeMessage.content);
  if (!realtimeText) return false;

  const realtimeTimestamp = parseTimestampMs(realtimeMessage.timestamp);

  return serverMessages.some((serverMessage) => {
    if (serverMessage.kind !== 'text' || serverMessage.role !== 'user') {
      return false;
    }

    if (normalizeRealtimeText(serverMessage.content) !== realtimeText) {
      return false;
    }

    if (realtimeTimestamp == null) {
      return true;
    }

    const serverTimestamp = parseTimestampMs(serverMessage.timestamp);
    if (serverTimestamp == null) {
      return true;
    }

    return Math.abs(serverTimestamp - realtimeTimestamp) <= 10_000;
  });
}

/**
 * The backend pushes a synthetic `interrupted` notice the moment abort fires

 * "[Request interrupted by user]" entry into the JSONL during the next user
 * turn. Once that JSONL entry is replayed via the server, drop the locally
 * pushed one to avoid stacking two dividers in the conversation.
 */
function isLocalInterruptDuplicate(
  realtimeMessage: NormalizedMessage,
  serverMessages: NormalizedMessage[],
): boolean {
  if (
    realtimeMessage.kind !== 'interrupted'
    || !realtimeMessage.id.startsWith('local_interrupt_')
  ) {
    return false;
  }

  const realtimeTimestamp = parseTimestampMs(realtimeMessage.timestamp);

  return serverMessages.some((serverMessage) => {
    if (serverMessage.kind !== 'interrupted') return false;
    if (realtimeTimestamp == null) return true;
    const serverTimestamp = parseTimestampMs(serverMessage.timestamp);
    if (serverTimestamp == null) return true;
    // Be generous on the window — the JSONL timestamp is when the SDK wrote
    // it on the next turn, which can be many minutes after the actual abort.
    return Math.abs(serverTimestamp - realtimeTimestamp) <= 30 * 60_000;
  });
}

/**
 * Compute merged messages: server + realtime, deduped by id.
 * Server messages take priority (they're the persisted source of truth).
 * Realtime messages that aren't yet in server stay (in-flight streaming).
 */
function computeMerged(server: NormalizedMessage[], realtime: NormalizedMessage[]): NormalizedMessage[] {
  if (realtime.length === 0) return server;
  if (server.length === 0) return realtime;
  const serverIds = new Set(server.map(m => m.id));
  const extra = realtime.filter((message) => {
    if (serverIds.has(message.id)) return false;
    if (isConfirmedUserMessageDuplicate(message, server)) return false;
    if (isLocalInterruptDuplicate(message, server)) return false;
    return true;
  });
  if (extra.length === 0) return server;

  // Structural dedup: if there's an active __streaming_ message in extras
  // AND the server's last message is an assistant text whose id is NEW
  // (different from the id captured when streaming started), the server
  // wrote a mid-stream snapshot of the in-progress turn. Drop the server
  // snapshot in favor of the live streaming version.
  //
  // We compare ids (not timestamps) so the test is immune to NTP drift /
  // burst-turn scenarios where the previous turn's assistant message
  // finished writing within milliseconds of the next turn's first
  // stream_delta — a timestamp window can't distinguish those cases,
  // but an id comparison can: the previous turn's tail id was already
  // captured into `serverTailIdAtStart`, so a `lastServer.id ===
  // streamMsg.serverTailIdAtStart` match means "still the same tail
  // that was there at turn start" → don't dedup.
  const streamIdx = extra.findIndex(m => m.id.startsWith('__streaming_'));
  if (streamIdx >= 0 && server.length > 0) {
    const lastServer = server[server.length - 1];
    const streamMsg = extra[streamIdx];
    const isAssistantText = lastServer.kind === 'text' && lastServer.role === 'assistant';
    const tailIdChanged = streamMsg.serverTailIdAtStart !== undefined
      && lastServer.id !== streamMsg.serverTailIdAtStart;
    if (isAssistantText && tailIdChanged) {
      return [...server.slice(0, -1), ...extra];
    }
  }

  return [...server, ...extra];
}

function upsertRealtimeMessages(
  existing: NormalizedMessage[],
  incoming: NormalizedMessage[],
): NormalizedMessage[] {
  if (incoming.length === 0) return existing;
  const updated = [...existing];
  const indexById = new Map(updated.map((message, index) => [message.id, index]));
  for (const message of incoming) {
    const existingIndex = indexById.get(message.id);
    if (existingIndex === undefined) {
      indexById.set(message.id, updated.length);
      updated.push(message);
    } else {
      updated[existingIndex] = message;
    }
  }
  return updated;
}

/**
 * Recompute slot.merged only when the input arrays have actually changed
 * (by reference). Returns true if merged was recomputed.
 */
function recomputeMergedIfNeeded(slot: SessionSlot): boolean {
  if (slot.serverMessages === slot._lastServerRef && slot.realtimeMessages === slot._lastRealtimeRef) {
    return false;
  }
  slot._lastServerRef = slot.serverMessages;
  slot._lastRealtimeRef = slot.realtimeMessages;
  slot.merged = computeMerged(slot.serverMessages, slot.realtimeMessages);
  return true;
}

// ─── Stale threshold ─────────────────────────────────────────────────────────

const STALE_THRESHOLD_MS = 30_000;

const MAX_REALTIME_MESSAGES = 500;

// ─── Hook ────────────────────────────────────────────────────────────────────

export function useSessionStore() {
  const storeRef = useRef(new Map<string, SessionSlot>());
  const activeSessionIdRef = useRef<string | null>(null);
  // Bump to force re-render — only when the active session's data changes
  const [, setTick] = useState(0);
  const notify = useCallback((sessionId: string) => {
    if (sessionId === activeSessionIdRef.current) {
      setTick(n => n + 1);
    }
  }, []);

  const setActiveSession = useCallback((sessionId: string | null) => {
    const changed = activeSessionIdRef.current !== sessionId;
    activeSessionIdRef.current = sessionId;
    if (changed) {
      setTick(n => n + 1);
    }
  }, []);

  const getSlot = useCallback((sessionId: string): SessionSlot => {
    const store = storeRef.current;
    if (!store.has(sessionId)) {
      store.set(sessionId, createEmptySlot());
    }
    return store.get(sessionId)!;
  }, []);

  const has = useCallback((sessionId: string) => storeRef.current.has(sessionId), []);

  /**
   * Fetch messages from the unified endpoint and populate serverMessages.
   */
  const fetchFromServer = useCallback(async (
    sessionId: string,
    opts: {
      provider?: SessionProvider;
      projectName?: string;
      projectPath?: string;
      sessionKind?: string;
      parentSessionId?: string;
      relativeTranscriptPath?: string;
      limit?: number | null;
      offset?: number;
    } = {},
  ) => {
    const slot = getSlot(sessionId);
    slot.status = 'loading';
    notify(sessionId);

    const fetchStartedAt = Date.now();

    try {
      const params = new URLSearchParams();
      if (opts.provider) params.append('provider', opts.provider);
      if (opts.projectName) params.append('projectName', opts.projectName);
      if (opts.projectPath) params.append('projectPath', opts.projectPath);
      if (opts.sessionKind) params.append('sessionKind', opts.sessionKind);
      if (opts.parentSessionId) params.append('parentSessionId', opts.parentSessionId);
      if (opts.relativeTranscriptPath) {
        params.append('relativeTranscriptPath', opts.relativeTranscriptPath);
      }
      if (opts.limit !== null && opts.limit !== undefined) {
        params.append('limit', String(opts.limit));
        params.append('offset', String(opts.offset ?? 0));
      }

      const qs = params.toString();
      const url = `/api/sessions/${encodeURIComponent(sessionId)}/messages${qs ? `?${qs}` : ''}`;
      const response = await authenticatedFetch(url);

      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }

      const data = await response.json();
      const messages: NormalizedMessage[] = data.messages || [];

      slot.serverMessages = messages;
      slot.total = data.total ?? messages.length;
      slot.hasMore = Boolean(data.hasMore);
      slot.offset = (opts.offset ?? 0) + messages.length;
      slot.fetchedAt = Date.now();
      slot.status = 'idle';
      slot.lastError = null;

      // Prune realtime messages covered by server data.  Use the later of
      // fetchStartedAt and the latest server message timestamp as watermark
      // so that messages finalized DURING the fetch (race window) are also
      // pruned when the server response already includes them.
      if (slot.realtimeMessages.length > 0 && messages.length > 0) {
        const latestServerTs = messages.reduce(
          (max, m) => Math.max(max, Date.parse(m.timestamp) || 0), 0,
        );
        const watermark = Math.max(fetchStartedAt, latestServerTs);
        slot.realtimeMessages = slot.realtimeMessages.filter(m => {
          if (m.id.startsWith('__streaming_')) return true;
          return (Date.parse(m.timestamp) || 0) > watermark;
        });
      }

      recomputeMergedIfNeeded(slot);
      if (data.tokenUsage) {
        slot.tokenUsage = data.tokenUsage;
      }

      notify(sessionId);
      return slot;
    } catch (error) {
      console.error(`[SessionStore] fetch failed for ${sessionId}:`, error);
      slot.status = 'error';
      slot.lastError = error instanceof Error ? error.message : 'Unknown error';
      notify(sessionId);
      return slot;
    }
  }, [getSlot, notify]);

  /**
   * Load older (paginated) messages and prepend to serverMessages.
   */
  const fetchMore = useCallback(async (
    sessionId: string,
    opts: {
      provider?: SessionProvider;
      projectName?: string;
      projectPath?: string;
      sessionKind?: string;
      parentSessionId?: string;
      relativeTranscriptPath?: string;
      limit?: number;
    } = {},
  ) => {
    const slot = getSlot(sessionId);
    if (!slot.hasMore) return slot;

    const params = new URLSearchParams();
    if (opts.provider) params.append('provider', opts.provider);
    if (opts.projectName) params.append('projectName', opts.projectName);
    if (opts.projectPath) params.append('projectPath', opts.projectPath);
    if (opts.sessionKind) params.append('sessionKind', opts.sessionKind);
    if (opts.parentSessionId) params.append('parentSessionId', opts.parentSessionId);
    if (opts.relativeTranscriptPath) {
      params.append('relativeTranscriptPath', opts.relativeTranscriptPath);
    }
    const limit = opts.limit ?? 20;
    params.append('limit', String(limit));
    params.append('offset', String(slot.offset));

    const qs = params.toString();
    const url = `/api/sessions/${encodeURIComponent(sessionId)}/messages${qs ? `?${qs}` : ''}`;

    try {
      const response = await authenticatedFetch(url);
      if (!response.ok) throw new Error(`HTTP ${response.status}`);
      const data = await response.json();
      const olderMessages: NormalizedMessage[] = data.messages || [];

      // Prepend older messages (they're earlier in the conversation)
      slot.serverMessages = [...olderMessages, ...slot.serverMessages];
      slot.hasMore = Boolean(data.hasMore);
      slot.offset = slot.offset + olderMessages.length;
      recomputeMergedIfNeeded(slot);
      notify(sessionId);
      return slot;
    } catch (error) {
      console.error(`[SessionStore] fetchMore failed for ${sessionId}:`, error);
      return slot;
    }
  }, [getSlot, notify]);

  /**
   * Append a realtime (WebSocket) message to the correct session slot.
   * This works regardless of which session is actively viewed.
   */
  const appendRealtime = useCallback((sessionId: string, msg: NormalizedMessage) => {
    const slot = getSlot(sessionId);
    let updated = upsertRealtimeMessages(slot.realtimeMessages, [msg]);
    if (updated.length > MAX_REALTIME_MESSAGES) {
      updated = updated.slice(-MAX_REALTIME_MESSAGES);
    }
    slot.realtimeMessages = updated;
    recomputeMergedIfNeeded(slot);
    notify(sessionId);
  }, [getSlot, notify]);

  const upsertActivity = useCallback((sessionId: string, msg: NormalizedMessage) => {
    const slot = getSlot(sessionId);
    const key = msg.activityId || msg.id;
    const existingIndex = slot.activityMessages.findIndex((activity) =>
      (activity.activityId || activity.id) === key
    );

    if (existingIndex >= 0) {
      const updated = [...slot.activityMessages];
      updated[existingIndex] = msg;
      slot.activityMessages = updated;
    } else {
      slot.activityMessages = [...slot.activityMessages, msg];
    }

    notify(sessionId);
  }, [getSlot, notify]);

  const setActivities = useCallback((sessionId: string, msgs: NormalizedMessage[]) => {
    const slot = getSlot(sessionId);
    const byKey = new Map<string, NormalizedMessage>();

    for (const msg of msgs) {
      if (msg.kind !== 'agent_activity') continue;
      byKey.set(msg.activityId || msg.id, msg);
    }

    slot.activityMessages = Array.from(byKey.values());
    notify(sessionId);
  }, [getSlot, notify]);

  /**
   * Append multiple realtime messages at once (batch).
   */
  const appendRealtimeBatch = useCallback((sessionId: string, msgs: NormalizedMessage[]) => {
    if (msgs.length === 0) return;
    const slot = getSlot(sessionId);
    let updated = upsertRealtimeMessages(slot.realtimeMessages, msgs);
    if (updated.length > MAX_REALTIME_MESSAGES) {
      updated = updated.slice(-MAX_REALTIME_MESSAGES);
    }
    slot.realtimeMessages = updated;
    recomputeMergedIfNeeded(slot);
    notify(sessionId);
  }, [getSlot, notify]);

  /**
   * Re-fetch serverMessages from the unified endpoint (e.g., on projects_updated).
   */
  const refreshFromServer = useCallback(async (
    sessionId: string,
    opts: {
      provider?: SessionProvider;
      projectName?: string;
      projectPath?: string;
      sessionKind?: string;
      parentSessionId?: string;
      relativeTranscriptPath?: string;
    } = {},
  ) => {
    const slot = getSlot(sessionId);
    try {
      const params = new URLSearchParams();
      if (opts.provider) params.append('provider', opts.provider);
      if (opts.projectName) params.append('projectName', opts.projectName);
      if (opts.projectPath) params.append('projectPath', opts.projectPath);
      if (opts.sessionKind) params.append('sessionKind', opts.sessionKind);
      if (opts.parentSessionId) params.append('parentSessionId', opts.parentSessionId);
      if (opts.relativeTranscriptPath) {
        params.append('relativeTranscriptPath', opts.relativeTranscriptPath);
      }

      const qs = params.toString();
      const url = `/api/sessions/${encodeURIComponent(sessionId)}/messages${qs ? `?${qs}` : ''}`;
      const response = await authenticatedFetch(url);

      if (!response.ok) throw new Error(`HTTP ${response.status}`);
      const data = await response.json();

      slot.serverMessages = data.messages || [];
      slot.total = data.total ?? slot.serverMessages.length;
      slot.hasMore = Boolean(data.hasMore);
      slot.fetchedAt = Date.now();
      // drop realtime messages that the server has caught up with to prevent unbounded growth.
      slot.realtimeMessages = [];
      recomputeMergedIfNeeded(slot);
      notify(sessionId);
    } catch (error) {
      console.error(`[SessionStore] refresh failed for ${sessionId}:`, error);
    }
  }, [getSlot, notify]);

  /**
   * Update session status.
   */
  const setStatus = useCallback((sessionId: string, status: SessionStatus) => {
    const slot = getSlot(sessionId);
    slot.status = status;
    notify(sessionId);
  }, [getSlot, notify]);

  /**
   * Check if a session's data is stale (>30s old).
   */
  const isStale = useCallback((sessionId: string) => {
    const slot = storeRef.current.get(sessionId);
    if (!slot) return true;
    return Date.now() - slot.fetchedAt > STALE_THRESHOLD_MS;
  }, []);

  /**
   * Update or create a streaming message (accumulated text so far).
   * Uses a well-known ID so subsequent calls replace the same message.
   */
  const updateStreaming = useCallback((sessionId: string, accumulatedText: string, msgProvider: SessionProvider) => {
    const slot = getSlot(sessionId);
    const streamId = `__streaming_${sessionId}`;
    const idx = slot.realtimeMessages.findIndex(m => m.id === streamId);
    if (idx >= 0) {
      // Subsequent delta — preserve the original turn-start timestamp so
      // computeMerged can tell which server snapshots belong to this turn.
      const existing = slot.realtimeMessages[idx];
      slot.realtimeMessages = [...slot.realtimeMessages];
      slot.realtimeMessages[idx] = {
        ...existing,
        provider: msgProvider,
        content: accumulatedText,
      };
    } else {
      // Record the id of server's tail message at the moment this turn
      // started streaming. computeMerged uses this for an id-based
      // dedup check that's immune to NTP drift / burst-turn time
      // windows: only delete the server tail if it's a NEW message
      // (a real mid-stream snapshot) rather than the previous turn's
      // legitimate trailing assistant message.
      const serverTailId = slot.serverMessages.length > 0
        ? slot.serverMessages[slot.serverMessages.length - 1].id
        : null;
      const msg: NormalizedMessage = {
        id: streamId,
        sessionId,
        timestamp: new Date().toISOString(),
        provider: msgProvider,
        kind: 'stream_delta',
        content: accumulatedText,
        serverTailIdAtStart: serverTailId ?? undefined,
      };
      slot.realtimeMessages = [...slot.realtimeMessages, msg];
    }
    recomputeMergedIfNeeded(slot);
    notify(sessionId);
  }, [getSlot, notify]);

  /**
   * Finalize streaming: convert the streaming message to a regular text message.
   * The well-known streaming ID is replaced with a unique text message ID.
   */
  const finalizeStreaming = useCallback((sessionId: string) => {
    const slot = storeRef.current.get(sessionId);
    if (!slot) return;
    const streamId = `__streaming_${sessionId}`;
    const idx = slot.realtimeMessages.findIndex(m => m.id === streamId);
    if (idx >= 0) {
      const stream = slot.realtimeMessages[idx];
      const newId = `text_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
      slot.realtimeMessages = [...slot.realtimeMessages];
      slot.realtimeMessages[idx] = {
        ...stream,
        id: newId,
        kind: 'text',
        role: 'assistant',
      };
      recomputeMergedIfNeeded(slot);
      notify(sessionId);
    }
  }, [notify]);

  /**
   * Update or create a streaming thinking message (accumulated thinking so far).
   * Mirrors updateStreaming but uses kind='thinking' and a separate well-known ID.
   */
  const updateStreamingThinking = useCallback((sessionId: string, accumulatedText: string, msgProvider: SessionProvider) => {
    const slot = getSlot(sessionId);
    const streamId = `__streaming_thinking_${sessionId}`;
    const idx = slot.realtimeMessages.findIndex(m => m.id === streamId);
    if (idx >= 0) {
      const existing = slot.realtimeMessages[idx];
      slot.realtimeMessages = [...slot.realtimeMessages];
      slot.realtimeMessages[idx] = {
        ...existing,
        provider: msgProvider,
        content: accumulatedText,
      };
    } else {
      const msg: NormalizedMessage = {
        id: streamId,
        sessionId,
        timestamp: new Date().toISOString(),
        provider: msgProvider,
        kind: 'thinking',
        content: accumulatedText,
      };
      slot.realtimeMessages = [...slot.realtimeMessages, msg];
    }
    recomputeMergedIfNeeded(slot);
    notify(sessionId);
  }, [getSlot, notify]);

  /**
   * Finalize streaming thinking: replace the well-known streaming thinking ID
   * with a unique ID so subsequent thinking blocks don't overwrite it.
   */
  const finalizeStreamingThinking = useCallback((sessionId: string) => {
    const slot = storeRef.current.get(sessionId);
    if (!slot) return;
    const streamId = `__streaming_thinking_${sessionId}`;
    const idx = slot.realtimeMessages.findIndex(m => m.id === streamId);
    if (idx >= 0) {
      const stream = slot.realtimeMessages[idx];
      const newId = `thinking_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
      slot.realtimeMessages = [...slot.realtimeMessages];
      slot.realtimeMessages[idx] = {
        ...stream,
        id: newId,
      };
      recomputeMergedIfNeeded(slot);
      notify(sessionId);
    }
  }, [notify]);

  /**
   * Clear realtime messages for a session (e.g., after stream completes and server fetch catches up).
   */
  const clearRealtime = useCallback((sessionId: string) => {
    const slot = storeRef.current.get(sessionId);
    if (slot) {
      slot.realtimeMessages = [];
      recomputeMergedIfNeeded(slot);
      notify(sessionId);
    }
  }, [notify]);

  /**
   * Get merged messages for a session (for rendering).
   */
  const getMessages = useCallback((sessionId: string): NormalizedMessage[] => {
    return storeRef.current.get(sessionId)?.merged ?? [];
  }, []);

  const getActivityMessages = useCallback((sessionId: string): NormalizedMessage[] => {
    return storeRef.current.get(sessionId)?.activityMessages ?? [];
  }, []);

  /**
   * Get session slot (for status, pagination info, etc.).
   */
  const getSessionSlot = useCallback((sessionId: string): SessionSlot | undefined => {
    return storeRef.current.get(sessionId);
  }, []);

  return useMemo(() => ({
    getSlot,
    has,
    fetchFromServer,
    fetchMore,
    appendRealtime,
    upsertActivity,
    setActivities,
    appendRealtimeBatch,
    refreshFromServer,
    setActiveSession,
    setStatus,
    isStale,
    updateStreaming,
    finalizeStreaming,
    updateStreamingThinking,
    finalizeStreamingThinking,
    clearRealtime,
    getMessages,
    getActivityMessages,
    getSessionSlot,
  }), [
    getSlot, has, fetchFromServer, fetchMore,
    appendRealtime, upsertActivity, setActivities, appendRealtimeBatch, refreshFromServer,
    setActiveSession, setStatus, isStale, updateStreaming, finalizeStreaming,
    updateStreamingThinking, finalizeStreamingThinking,
    clearRealtime, getMessages, getActivityMessages, getSessionSlot,
  ]);
}

export type SessionStore = ReturnType<typeof useSessionStore>;