"""Two-phase candidate extraction.

Phase 1 — Span Identification: Cheap JSON-mode LLM call identifies which
user message ranges contain extractable information.

Phase 2 — Span Structuring: For each span, focused tool-call extraction
with +/- 3 turns context.  Parallelized across spans (max_workers=4).

Falls back to a single full-conversation span when Phase 1 returns no spans
(and there is enough non-assistant content), preserving backward compatibility
with mock / test scenarios.  Phase 1 errors are logged and re-raised.
"""

from __future__ import annotations

import logging
import hashlib
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional

from core.interfaces import LLM
from core.models import RequestContext, CandidateMemory

from extraction.schemas.registry import SchemaRegistry
from extraction.tool_builder import (
    build_extraction_tools,
    build_tool_to_category,
    parse_tool_call,
)
from extraction.prefetch import MemoryPrefetcher, PrefetchResult

logger = logging.getLogger(__name__)

# Lazily-initialized module-level defaults.  All three stay at their empty
# values until _get_default_schema_registry() runs successfully.
_DEFAULT_SCHEMA_REGISTRY: SchemaRegistry | None = None
# Tool definitions produced by build_extraction_tools() for the default registry.
EXTRACTION_TOOLS: list[dict] = []
# Mapping produced by build_tool_to_category() for the default registry.
TOOL_TO_CATEGORY: dict[str, tuple[str, str]] = {}


def _get_default_schema_registry() -> SchemaRegistry:
    """Return the shared default SchemaRegistry, building it on first use.

    The first successful call also fills the module-level EXTRACTION_TOOLS
    and TOOL_TO_CATEGORY from the new registry.  Later calls return the
    cached registry without rebuilding anything.

    Returns:
        The cached default registry.

    Raises:
        Exception: Whatever SchemaRegistry() or the tool builders raise.  The
            failure is logged and re-raised, and no module-level state is
            modified, so the next call retries from scratch.
    """
    global _DEFAULT_SCHEMA_REGISTRY, EXTRACTION_TOOLS, TOOL_TO_CATEGORY
    if _DEFAULT_SCHEMA_REGISTRY is not None:
        return _DEFAULT_SCHEMA_REGISTRY
    try:
        registry = SchemaRegistry()
        tools = build_extraction_tools(registry)
        tool_to_category = build_tool_to_category(registry)
    except Exception:
        logger.exception("Default schema registry initialization failed")
        raise
    # Publish only after every step succeeded so a failure can never leave
    # EXTRACTION_TOOLS populated while the registry itself is still unset.
    EXTRACTION_TOOLS = tools
    TOOL_TO_CATEGORY = tool_to_category
    _DEFAULT_SCHEMA_REGISTRY = registry
    return registry


# ---------------------------------------------------------------------------
# Internal data
# ---------------------------------------------------------------------------

@dataclass
class _Span:
    """A contiguous message range identified as containing extractable info.

    Instances are created from Phase 1 output, or as a single
    full-conversation fallback span when Phase 1 returns nothing.
    """
    # Index of the first message in the range (0-based position in `messages`).
    start: int
    # Index of the last message in the range, inclusive.
    end: int
    # Free-text explanation of what the span contains.
    reason: str = ""
    # Category names for the span; passed to the prefetcher to scope lookups.
    categories: list[str] = field(default_factory=list)
    # "id" values of the messages inside the range; used to build provenance.
    message_ids: list[str] = field(default_factory=list)


@dataclass
class _PreparedSpan:
    """A span with its final Phase 2 prompt prepared for tool-calling."""
    # The span this prompt was built for.
    span: _Span
    # Full prompt text sent to the LLM for this span.
    prompt: str
    # Hex SHA-256 of `prompt`; used to detect spans that yield identical prompts.
    prompt_sha: str


# JSON schema for Phase 1 span identification.
# The response must be an object with a "spans" array; each item needs integer
# "start" and "end" fields, while "reason" and "categories" are optional.
_SPAN_SCHEMA: dict = {
    "type": "object",
    "properties": {
        "spans": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "start": {"type": "integer"},
                    "end": {"type": "integer"},
                    "reason": {"type": "string"},
                    "categories": {"type": "array", "items": {"type": "string"}},
                },
                "required": ["start", "end"],
            },
        },
    },
    "required": ["spans"],
}


def _prompt_sha(prompt: str) -> str:
    return hashlib.sha256(prompt.encode("utf-8", errors="replace")).hexdigest()


def _short_sha(sha: str) -> str:
    return sha[:12]


def _merge_unique_categories(primary: list[str], extra: list[str]) -> list[str]:
    """Merge categories in order so deduped prompts keep all prefetch scopes."""
    merged: list[str] = []
    seen: set[str] = set()
    for category in [*primary, *extra]:
        if not category or category in seen:
            continue
        merged.append(category)
        seen.add(category)
    return merged


# Phase 1 prompt — lightweight, no tool definitions.
# Rendered with str.format(): {session_summary_block} and {conversation} are
# the only placeholders; literal braces in the JSON example are doubled.
_SPAN_PROMPT = """\
Analyze this conversation and identify contiguous message ranges (spans) \
that contain personal information worth remembering.

Rules:
- ONLY identify spans from non-assistant messages. Ignore lines starting \
with "[N] assistant:".
- Each span should contain a coherent unit of extractable information.
- Merge adjacent user messages about the same topic into one span.
- Skip pure greetings, acknowledgments, or general knowledge.
- If a "Previously Extracted Context" section is present, DO NOT re-identify \
content already listed there.

For each span, specify:
- start: message index (the [N] prefix number)
- end: message index (inclusive)
- reason: what extractable information this span contains
- categories: likely categories from: profile, preference, entity, event, \
case, pattern, skill, tool

{session_summary_block}\
Conversation:
{conversation}

Return JSON: {{"spans": [{{"start": int, "end": int, "reason": str, \
"categories": [str]}}]}}
If nothing is extractable: {{"spans": []}}"""


# ---------------------------------------------------------------------------
# Extractor
# ---------------------------------------------------------------------------

class Extractor:
    """Two-phase candidate memory extractor.

    Phase 1 uses a cheap JSON-mode LLM call to identify which user message
    ranges contain extractable information.  Phase 2 runs focused tool-call
    extraction per span with surrounding context, parallelized across spans.
    """

    def __init__(
        self,
        llm: LLM,
        prompt_manager: Any | None = None,
        schema_registry: SchemaRegistry | None = None,
        fs: Any | None = None,
        vector_index: Any | None = None,
        embedder: Any | None = None,
        uri_resolver: Any | None = None,
        mode: str = "eager",
        react_timeout_seconds: float = 30.0,
        internal_tool_usage_tracker: Any | None = None,
    ):
        """Initialize the Extractor.

        Args:
            llm: LLM instance with complete_with_tools() and complete_json().
            prompt_manager: Optional PromptManager for template-based prompts.
                          If None, a PromptManager is created with its defaults.
            schema_registry: Optional SchemaRegistry override for tests or
                           custom memory types. Tool definitions are always
                           generated from SchemaRegistry; no legacy fallback
                           exists. If falsy, the shared default registry is used.
            fs: Optional ContextFS for prefetching existing memories.
            vector_index: Optional VectorIndex for prefetching existing memories.
            embedder: Optional Embedder for prefetching existing memories.
            uri_resolver: Optional URIResolver for prefetching existing memories.
            mode: Extraction mode — "eager" (prefetch + single-shot) or
                  "lazy" (ReAct loop with read tools). Defaults to "eager".
                  Lazy mode is only taken when fs, the schema registry and
                  uri_resolver are all truthy (see _use_lazy_mode).
            react_timeout_seconds: Maximum seconds for ReAct loop before timeout. Defaults to 30.0.
            internal_tool_usage_tracker: Optional object stored as-is and
                  forwarded to the lazy-mode ReAct loop.

        Raises:
            ValueError: If the schema registry yields no extraction tools.
        """
        self._llm = llm
        if prompt_manager is None:
            # Imported lazily so the module only loads when no manager is given.
            from extraction.prompts import PromptManager as _PM
            prompt_manager = _PM()
        self._prompt_manager = prompt_manager
        self._schema_registry = schema_registry or _get_default_schema_registry()
        self._fs = fs
        self._vector_index = vector_index
        self._embedder = embedder
        self._uri_resolver = uri_resolver
        self._mode = mode
        self._react_timeout_seconds = react_timeout_seconds
        self._internal_tool_usage_tracker = internal_tool_usage_tracker
        # The prefetcher is only built when every one of its dependencies is
        # supplied; otherwise prefetching is silently disabled.
        self._prefetcher: Optional[MemoryPrefetcher] = None
        if fs and vector_index and embedder and uri_resolver and self._schema_registry:
            self._prefetcher = MemoryPrefetcher(fs, vector_index, embedder, self._schema_registry, uri_resolver)
            logger.info(f"Extractor: prefetcher enabled ({mode} mode)")

        # Tools are built per instance from this instance's registry.
        self._extraction_tools = build_extraction_tools(self._schema_registry)
        if not self._extraction_tools:
            incompatible = [
                f"{schema.memory_type}:{schema.version}"
                for schema in self._schema_registry.list_incompatible_enabled()
            ]
            detail = f" incompatible={incompatible}" if incompatible else ""
            raise ValueError(f"No compatible extraction tools available.{detail}")
        self._tool_to_category = build_tool_to_category(self._schema_registry)
        # Binds the registry so callers only pass the tool name and its input.
        self._parse_tool_call_fn = lambda name, inp: parse_tool_call(name, inp, self._schema_registry)
        logger.info(f"Extractor using SchemaRegistry with {len(self._extraction_tools)} tools")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def extract(
        self,
        messages: list[dict],
        ctx: RequestContext,
        session_time: Any | None = None,
        session_summary: str = "",
        tool_stats_text: str = "",
        archive_id: str | None = None,
    ) -> list[CandidateMemory]:
        """Extract candidate memories from conversation messages.

        Args:
            messages: List of message dicts with "role" and "content"
            ctx: RequestContext for this extraction
            session_time: Optional datetime for temporal resolution
            session_summary: Optional summary of previously extracted content
            tool_stats_text: Optional tool usage statistics text
            archive_id: Optional archive_id for provenance tracking

        Returns:
            List of CandidateMemory with confidence >= 0.5
        """
        # No user content to extract from
        user_indices = [
            i for i, m in enumerate(messages)
            if m.get("role") != "assistant"
        ]
        if not user_indices:
            return []

        # Detect language once
        language = self._detect_language(messages)

        # Phase 1: identify spans worth extracting
        spans = self._identify_spans(messages, session_time, session_summary)
        if not spans:
            user_content_len = sum(
                len(m.get("content", "") or "")
                for m in messages if m.get("role") != "assistant"
            )
            _MIN_FALLBACK_LEN = 200
            if user_content_len >= _MIN_FALLBACK_LEN:
                spans = [_Span(
                    start=0,
                    end=len(messages) - 1,
                    reason="fallback: Phase 1 returned no spans",
                    categories=[],
                )]
                logger.info(
                    "Phase 1 empty → fallback full-conversation span [0, %d] "
                    "(user_content_len=%d)",
                    len(messages) - 1, user_content_len,
                )
            else:
                logger.info(
                    "Phase 1 empty, user content too short (%d chars < %d) → skip Phase 2",
                    user_content_len, _MIN_FALLBACK_LEN,
                )
                return []

        # Phase 2: structure each span
        candidates = self._structure_spans(
            spans, messages, language, session_time, session_summary, tool_stats_text, ctx, archive_id,
        )

        confidence_threshold = 0.5
        filtered = [c for c in candidates if c.confidence >= confidence_threshold]
        dropped = len(candidates) - len(filtered)
        if dropped > 0:
            for c in candidates:
                if c.confidence < confidence_threshold:
                    logger.info(
                        "Dropped candidate (confidence=%.2f < %.1f): category=%s routing_key=%s abstract=%.80s",
                        c.confidence, confidence_threshold, c.category, c.routing_key, c.abstract,
                    )
            logger.info(
                "Phase 2 confidence filter: %d kept, %d dropped (< %.1f)",
                len(filtered), dropped, confidence_threshold,
            )
        return filtered

    # ------------------------------------------------------------------
    # Phase 1 — Span Identification
    # ------------------------------------------------------------------

    def _identify_spans(
        self,
        messages: list[dict],
        session_time: Any | None,
        session_summary: str,
    ) -> list[_Span]:
        """Use a cheap JSON-mode call to find extractable message ranges."""
        conversation = self._format_indexed_conversation(messages, session_time)

        summary_block = ""
        if session_summary:
            summary_block = (
                "Previously Extracted Context "
                "(DO NOT re-identify these — they are already saved):\n"
                f"{session_summary}\n\n"
            )

        prompt = _SPAN_PROMPT.format(
            conversation=conversation,
            session_summary_block=summary_block,
        )

        try:
            result = self._llm.complete_json(prompt, schema=_SPAN_SCHEMA)
            spans: list[_Span] = []
            n = len(messages)
            for s in result.get("spans", []):
                if n <= 1:
                    span_start = 0
                    span_end = 0
                else:
                    span_start = max(0, int(s.get("start", 0)))
                    span_end = min(n - 1, int(s.get("end", 0)))
                span_message_ids = [
                    m.get("id", "")
                    for m in messages[span_start:span_end + 1]
                    if m.get("id")
                ]
                span = _Span(
                    start=span_start,
                    end=span_end,
                    reason=s.get("reason", ""),
                    categories=s.get("categories", []),
                    message_ids=span_message_ids,
                )
                if span.start <= span.end:
                    spans.append(span)
            if not spans:
                logger.warning("Phase 1 returned 0 valid spans for %d messages", len(messages))
            return spans
        except Exception as exc:
            logger.error("Phase 1 span identification FAILED: %s", exc, exc_info=True)
            raise

    # ------------------------------------------------------------------
    # Phase 2 — Span Structuring
    # ------------------------------------------------------------------

    def _structure_spans(
        self,
        spans: list[_Span],
        messages: list[dict],
        language: str,
        session_time: Any | None,
        session_summary: str,
        tool_stats_text: str,
        ctx: RequestContext,
        archive_id: str | None = None,
    ) -> list[CandidateMemory]:
        """Run focused tool-call extraction for each span (dual-run with merge)."""
        prepared_spans = self._prepare_unique_spans(
            spans, messages, language,
            session_time, session_summary, tool_stats_text, ctx,
        )
        if not prepared_spans:
            return []

        all_candidates: list[CandidateMemory] = []

        # Single span → skip thread pool overhead
        if len(prepared_spans) == 1:
            c1 = self._structure_prepared_span(prepared_spans[0], ctx, archive_id)
            c2 = self._structure_prepared_span(prepared_spans[0], ctx, archive_id, temperature=0)
            all_candidates.extend(c1)
            all_candidates.extend(c2)
            return self._merge_dual_run(all_candidates)

        with ThreadPoolExecutor(max_workers=4) as pool:
            futures = {}
            for prepared in prepared_spans:
                f1 = pool.submit(self._structure_prepared_span, prepared, ctx, archive_id)
                f2 = pool.submit(self._structure_prepared_span, prepared, ctx, archive_id, temperature=0)
                futures[f1] = prepared.span
                futures[f2] = prepared.span

            for future in as_completed(futures):
                span = futures[future]
                try:
                    result = future.result()
                except Exception as exc:
                    logger.error(
                        "Span %d-%d structuring FAILED: %s",
                        span.start, span.end, exc, exc_info=True,
                    )
                    raise
                all_candidates.extend(result)

        return self._merge_dual_run(all_candidates)

    @staticmethod
    def _merge_dual_run(candidates: list[CandidateMemory]) -> list[CandidateMemory]:
        """Merge dual-run results by (category, routing_key).

        For each group, keep the candidate with higher confidence and richer
        content. Merges when/who/where fields by taking non-empty values.
        """
        merged: dict[tuple[str, str], CandidateMemory] = {}
        for c in candidates:
            key = (c.category, c.routing_key)
            if key not in merged:
                merged[key] = c
                continue
            existing = merged[key]
            # Pick higher confidence; if equal, pick richer content
            if c.confidence > existing.confidence:
                # Keep existing content if it's richer
                if len(existing.content) > len(c.content):
                    c.content = existing.content
                if not c.when and existing.when:
                    c.when = existing.when
                if not c.who and existing.who:
                    c.who = existing.who
                if not c.where and existing.where:
                    c.where = existing.where
                merged[key] = c
            elif len(c.content) > len(existing.content) * 1.5:
                # New content is significantly richer → upgrade
                if not c.when and existing.when:
                    c.when = existing.when
                if not c.who and existing.who:
                    c.who = existing.who
                if not c.where and existing.where:
                    c.where = existing.where
                merged[key] = c
        return list(merged.values())

    def _prepare_unique_spans(
        self,
        spans: list[_Span],
        messages: list[dict],
        language: str,
        session_time: Any | None,
        session_summary: str,
        tool_stats_text: str,
        ctx: RequestContext,
    ) -> list[_PreparedSpan]:
        """Build final Phase 2 prompts and drop duplicate prompt_sha work."""
        prepared_spans: list[_PreparedSpan] = []
        seen: dict[str, _PreparedSpan] = {}
        skipped = 0

        for span in spans:
            prepared = self._prepare_span(
                span, messages, language,
                session_time, session_summary, tool_stats_text, ctx,
            )
            existing = seen.get(prepared.prompt_sha)
            if existing is not None:
                if existing.prompt == prepared.prompt:
                    original_categories = list(existing.span.categories)
                    existing.span.categories = _merge_unique_categories(
                        original_categories,
                        prepared.span.categories,
                    )
                    skipped += 1
                    logger.info(
                        "Phase 2 duplicate prompt skipped: session=%s trace=%s "
                        "prompt_sha=%s duplicate_span=[%d-%d] original_span=[%d-%d] "
                        "duplicate_categories=%s original_categories=%s merged_categories=%s",
                        getattr(ctx, "session_id", ""),
                        getattr(ctx, "trace_id", ""),
                        _short_sha(prepared.prompt_sha),
                        prepared.span.start,
                        prepared.span.end,
                        existing.span.start,
                        existing.span.end,
                        prepared.span.categories,
                        original_categories,
                        existing.span.categories,
                    )
                    continue
                logger.warning(
                    "Phase 2 prompt_sha collision detected; keeping both spans: "
                    "session=%s trace=%s prompt_sha=%s span=[%d-%d] existing_span=[%d-%d]",
                    getattr(ctx, "session_id", ""),
                    getattr(ctx, "trace_id", ""),
                    _short_sha(prepared.prompt_sha),
                    prepared.span.start,
                    prepared.span.end,
                    existing.span.start,
                    existing.span.end,
                )

            seen[prepared.prompt_sha] = prepared
            prepared_spans.append(prepared)

        if skipped:
            logger.info(
                "Phase 2 prompt dedupe: session=%s trace=%s spans=%d unique=%d skipped=%d",
                getattr(ctx, "session_id", ""),
                getattr(ctx, "trace_id", ""),
                len(spans),
                len(prepared_spans),
                skipped,
            )
        return prepared_spans

    def _prepare_span(
        self,
        span: _Span,
        messages: list[dict],
        language: str,
        session_time: Any | None,
        session_summary: str,
        tool_stats_text: str,
        ctx: RequestContext,
    ) -> _PreparedSpan:
        """Prepare the final prompt used for one Phase 2 extraction request."""
        prompt = self._build_span_prompt(
            span, messages, language,
            session_time, session_summary, tool_stats_text, ctx,
        )
        if not self._use_lazy_mode():
            prompt = self._with_prefetch_context(span, prompt, ctx)
        prompt_sha = _prompt_sha(prompt)
        return _PreparedSpan(span=span, prompt=prompt, prompt_sha=prompt_sha)

    def _use_lazy_mode(self) -> bool:
        """Return whether this extractor can run the lazy ReAct path."""
        return bool(self._mode == "lazy" and self._fs and self._schema_registry and self._uri_resolver)

    def _structure_prepared_span(
        self,
        prepared: _PreparedSpan,
        ctx: RequestContext,
        archive_id: str | None = None,
        temperature: float | None = None,
    ) -> list[CandidateMemory]:
        """Extract structured candidates from an already prepared span prompt."""
        span = prepared.span
        if self._use_lazy_mode():
            return self._structure_span_lazy(span, prepared.prompt, ctx, archive_id)
        return self._structure_span_eager(span, prepared.prompt, ctx, prepared.prompt_sha, archive_id, temperature=temperature)

    def _build_span_prompt(
        self,
        span: _Span,
        messages: list[dict],
        language: str,
        session_time: Any | None,
        session_summary: str,
        tool_stats_text: str,
        ctx: RequestContext,
    ) -> str:
        """Build the base Phase 2 prompt for a span before eager prefetch."""
        # Focused context: +/- 3 turns around the span
        ctx_start = max(0, span.start - 3)
        ctx_end = min(len(messages) - 1, span.end + 3)
        focused = messages[ctx_start: ctx_end + 1]

        conversation = self._format_conversation(
            focused, session_time,
            session_summary="",
            tool_stats_text=(
                tool_stats_text if span.end >= len(messages) - 3 else ""
            ),
        )
        return self._build_prompt(language, conversation, session_summary, ctx=ctx)

    def _structure_span_eager(
        self,
        span: _Span,
        prompt: str,
        ctx: RequestContext,
        prompt_sha: str,
        archive_id: str | None = None,
        temperature: float | None = None,
    ) -> list[CandidateMemory]:
        """Eager mode: prefetch + single-shot extraction."""
        tool_calls = self._llm.complete_with_tools(
            prompt=prompt,
            tools=self._extraction_tools,
            tool_choice="required",
            temperature=temperature,
        )

        if not tool_calls:
            logger.warning(
                "Phase 2 zero tool_calls: session=%s trace=%s span=[%d-%d] "
                "categories=%s prompt_len=%d prompt_sha=%s",
                getattr(ctx, "session_id", ""),
                getattr(ctx, "trace_id", ""),
                span.start,
                span.end,
                span.categories,
                len(prompt),
                _short_sha(prompt_sha),
            )

        candidates: list[CandidateMemory] = []
        for call in tool_calls:
            c = self._to_candidate(call)
            if c is not None:
                candidates.append(c)
        self._assign_provenance(candidates, span, archive_id)
        return candidates

    def _structure_span_lazy(
        self,
        span: _Span,
        prompt: str,
        ctx: RequestContext,
        archive_id: str | None = None,
        focused_messages: list[dict] | None = None,
    ) -> list[CandidateMemory]:
        """Lazy mode: ReAct loop with read tools, fallback to eager on failure."""
        try:
            from extraction.react_loop import ExtractionReActLoop

            prefetch_result = None
            if self._prefetcher is not None:
                prefetch_result = self._collect_prefetch_context(
                    span.categories, ctx, focused_messages=focused_messages,
                )

            loop = ExtractionReActLoop(
                llm=self._llm,
                fs=self._fs,
                registry=self._schema_registry,
                uri_resolver=self._uri_resolver,
                prefetcher=self._prefetcher,
                timeout_seconds=self._react_timeout_seconds,
                internal_tool_usage_tracker=self._internal_tool_usage_tracker,
            )
            result = loop.run(prompt, ctx, prefetch_result)
            logger.info(
                f"ReAct loop completed: {len(result.candidates)} candidates, "
                f"{result.iterations} iterations, {len(result.read_uris)} reads"
            )
            candidates = result.candidates
            self._assign_provenance(candidates, span, archive_id)
            return candidates
        except Exception as e:
            logger.error(f"ReAct loop failed, falling back to eager mode: {e}")
            prompt = self._with_prefetch_context(span, prompt, ctx)
            prompt_sha = _prompt_sha(prompt)
            return self._structure_span_eager(span, prompt, ctx, prompt_sha, archive_id)

    def _with_prefetch_context(
        self,
        span: _Span,
        prompt: str,
        ctx: RequestContext,
    ) -> str:
        """Inject eager prefetch context into a Phase 2 prompt when available."""
        if self._prefetcher is None:
            return prompt
        prefetch_result = self._collect_prefetch_context(span.categories, ctx)
        if not prefetch_result or not prefetch_result.messages:
            return prompt
        prefetch_block = (
            "== Existing Memories (DO NOT re-extract these — update or extend them) ==\n"
            + "\n".join(prefetch_result.messages)
            + "\n==\n"
        )
        return prefetch_block + "\n" + prompt

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def _collect_prefetch_context(
        self,
        categories: list[str],
        ctx: RequestContext,
        *,
        focused_messages: list[dict] | None = None,
    ) -> PrefetchResult | None:
        """Collect prefetch context for given categories."""
        if self._prefetcher is None:
            return None
        # Build conversation text from focused messages for semantic prefetch
        conv_text: str | None = None
        if focused_messages:
            parts = []
            for msg in focused_messages:
                role = msg.get("role", "")
                content = msg.get("content", "")
                if isinstance(content, str) and content.strip():
                    parts.append(content.strip())
            if parts:
                conv_text = " ".join(parts)[:500]
        try:
            return self._prefetcher.prefetch_for_span(categories, ctx, conversation_text=conv_text)
        except Exception as e:
            logger.error(f"Prefetch failed, continuing without: {e}")
            return None

    def _build_prompt(
        self,
        language: str,
        conversation: str,
        session_summary: str = "",
        ctx: RequestContext | None = None,
    ) -> str:
        """Build extraction prompt from YAML template."""
        system_prompt = self._prompt_manager.render(
            "extraction", "system_prompt",
            session_summary=session_summary,
        )
        examples = self._prompt_manager.render("extraction", "examples")
        output_instruction = self._prompt_manager.render(
            "extraction", "output_instruction",
            output_language=language,
        )

        # Identity anchoring: tell the LLM who the user is
        identity_block = ""
        if ctx is not None:
            identity_block = (
                f"\n\n== IDENTITY ANCHOR ==\n"
                f"The user (the human you serve) is identified as '{ctx.user_id}'.\n"
                f"In the conversation, this user may appear as:\n"
                f"  - A first-person 'I' / 'me' in direct dialogue\n"
                f"  - Their name in a [Name]: prefix if it matches '{ctx.user_id}'\n"
                f"  - Any alias or nickname that resolves to '{ctx.user_id}'\n"
                f"Profile facts MUST be BY this user AND ABOUT this user.\n"
                f"Statements by OTHER named speakers → extract_entity, NOT extract_profile.\n"
                f"If unsure whether a speaker is the user → extract_entity.\n==\n"
            )

        return (
            f"{system_prompt}{identity_block}\n\n{examples}\n\n"
            f"{output_instruction}\n\nConversation:\n{conversation}"
        )

    @staticmethod
    def _content_to_str(content) -> str:
        """Normalize message content to plain string."""
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            parts: list[str] = []
            for block in content:
                if isinstance(block, dict):
                    parts.append(block.get("text", ""))
                elif isinstance(block, str):
                    parts.append(block)
            return " ".join(parts).strip()
        return str(content) if content else ""

    def _detect_language(self, messages: list[dict]) -> str:
        """Detect language from user messages."""
        from core.language import detect_language

        user_text = " ".join(
            self._content_to_str(m.get("content", ""))
            for m in messages
            if m.get("role") == "user"
        )
        return detect_language(user_text)

    def _format_conversation(
        self,
        messages: list[dict],
        session_time: Any | None = None,
        session_summary: str = "",
        tool_stats_text: str = "",
    ) -> str:
        """Format messages into conversation text with session time header."""
        if session_time is None:
            session_time = datetime.now()

        time_str = session_time.strftime("%Y-%m-%d %H:%M")
        day_of_week = session_time.strftime("%A")

        lines = [f"**Session Time:** {time_str} ({day_of_week})"]
        lines.append(
            "Relative times (e.g., 'last week', 'next month') are based "
            "on Session Time, not today."
        )

        if session_summary:
            lines.append("")
            lines.append(
                "## Previously Extracted Context "
                "(DO NOT re-extract these — they are already saved)"
            )
            lines.append(session_summary)

        if tool_stats_text:
            lines.append("")
            lines.append(tool_stats_text)

        lines.append("")

        for msg in messages:
            role = msg.get("role", "unknown")
            content = self._content_to_str(msg.get("content", ""))
            if role == "assistant":
                lines.append(
                    f"assistant [do not extract from this]: {content}"
                )
            else:
                lines.append(f"{role}: {content}")
        return "\n".join(lines)

    def _format_indexed_conversation(
        self,
        messages: list[dict],
        session_time: Any | None = None,
    ) -> str:
        """Format messages with [N] index prefixes for span identification."""
        if session_time is None:
            session_time = datetime.now()

        time_str = session_time.strftime("%Y-%m-%d %H:%M")
        day_of_week = session_time.strftime("%A")

        lines = [f"Session Time: {time_str} ({day_of_week})"]
        lines.append("")

        for i, msg in enumerate(messages):
            role = msg.get("role", "unknown")
            content = self._content_to_str(msg.get("content", ""))
            if role == "assistant":
                lines.append(f"[{i}] assistant: {content}")
            else:
                lines.append(f"[{i}] {role}: {content}")
        return "\n".join(lines)

    def _assign_provenance(
        self,
        candidates: list[CandidateMemory],
        span: _Span | None,
        archive_id: str | None,
    ) -> None:
        """Assign provenance ID to all candidates from a single span."""
        if not archive_id or not span or not span.message_ids:
            return
        from core.provenance_resolver import ProvenanceResolver
        prov_id = ProvenanceResolver.build_id(
            "archive", archive_id, span.message_ids
        )
        for c in candidates:
            c.provenance_ids.append(prov_id)

    def _to_candidate(self, tool_call: dict) -> CandidateMemory | None:
        """Convert tool call result to CandidateMemory."""
        tool_name = tool_call.get("tool", "")
        input_data = tool_call.get("input", {})
        if "raw" in input_data and len(input_data) == 1:
            logger.warning(
                "Skipping tool call with unparsed raw arguments: tool=%s raw=%.200s",
                tool_name, input_data["raw"],
            )
            return None
        result = self._parse_tool_call_fn(tool_name, input_data)
        if result is None:
            return None
        _, _, candidate = result
        return candidate