"""Directory Event Handler: Process UPSERT_DIRECTORY events with DAG execution.

Implements DAG execution for directory summaries:
1. Accept UPSERT_DIRECTORY(root_dir) event
2. Trigger DAG execution for bottom-up processing
3. Wait for subdirectory summaries before generating parent summaries
4. Use fallback summaries when LLM fails
5. Vectorize and store to database after summary generation

This ensures summaries are generated bottom-up:
    leaf files → subdirs → parent dirs → root
"""

import asyncio
import logging
from dataclasses import dataclass, field

from core.interfaces import LLM, ContextFS, Embedder, VectorIndex
from core.models import ContextNode, IndexRecord, OutboxEvent, RequestContext
from index.directory_summarizer import (
    ChildSummary,
    DirectorySummarizer,
    DirectorySummary,
    is_directory_uri,
)
from index.index_record_builder import build_single_record

# Module-level logger named after this module; used by all handlers below.
logger = logging.getLogger(__name__)


@dataclass
class DirNode:
    """Directory node state for DAG execution.

    One instance exists per dispatched directory. ``pending`` counts the
    children (files and subdirectories) that have not yet reported
    completion; when it reaches zero the directory's summary is scheduled.
    """

    # Directory URI; also the key of this node in the handler's node map.
    uri: str
    # Subdirectory URIs, in listing order.
    children_dirs: list[str]
    # Non-directory child URIs, in listing order.
    file_children: list[str]
    # Maps each subdirectory URI to its slot in ``children_summaries``.
    child_index: dict[str, int]
    # Per-subdirectory summary slots, filled in as subdirectories finish;
    # None means not finished yet, or finished without a summary.
    children_summaries: list[ChildSummary | None]
    # Number of children that have not reported completion yet.
    pending: int
    # URI of the directory that dispatched this one; None for the root.
    parent_uri: str | None = None
    # Set once the summary task is spawned so it is scheduled at most once.
    summary_scheduled: bool = False
    # Guards updates to ``pending`` / ``children_summaries``. Excluded from the
    # generated __eq__/__repr__: it is a synchronization primitive, not node
    # state, and compared only by identity.
    lock: asyncio.Lock = field(
        default_factory=asyncio.Lock, repr=False, compare=False
    )


@dataclass
class DagStats:
    """Counters and outputs accumulated over one directory-DAG run."""

    # Directories whose listing succeeded and that became DAG nodes.
    total_dirs: int = 0
    # Registered directories whose summary task has not finished yet.
    pending_dirs: int = 0
    # Directories whose summary was written and vectorized without error.
    completed_dirs: int = 0
    # Directories that could not be listed, or whose summary
    # generation / write / vectorization raised.
    failed_dirs: int = 0
    # IDs of the index records upserted into the vector index.
    record_ids: list = field(default_factory=list)
    # URIs of directories whose summary node was written to the filesystem.
    written_dir_uris: list[str] = field(default_factory=list)


@dataclass
class DirectoryEventResult:
    """Outcome of handling a single UPSERT_DIRECTORY event."""

    # Whether the event was processed successfully.
    success: bool
    # Directory URI taken from the event.
    root_uri: str
    # Statistics collected for this run.
    stats: DagStats
    # Human-readable failure reason; left empty when not set by a failure path.
    error_message: str = ""
    # Index record IDs upserted during the run.
    record_ids: list = field(default_factory=list)
    # Directory URIs whose summary nodes were written during the run.
    written_dir_uris: list[str] = field(default_factory=list)


class DirectoryEventHandler:
    """Handle UPSERT_DIRECTORY events with DAG-style bottom-up processing.

    Flow:
    1. Receive UPSERT_DIRECTORY(root_dir) event
    2. Recursively dispatch all subdirectories
    3. Wait for subdirectory summaries (pending == 0)
    4. Generate parent summary with complete child information
    5. Use fallback when LLM fails
    6. Vectorize and store to database

    Per-run DAG state (nodes, parent map, stats, in-flight tasks) is stored on
    the instance and reset at the start of every run, so a single handler must
    not process two events at the same time (e.g. from two threads).

    Usage:
        handler = DirectoryEventHandler(fs, llm, embedder, vector_index)
        result = handler.process_directory_event(event, ctx)
    """

    def __init__(
        self,
        fs: ContextFS,
        llm: LLM,
        embedder: Embedder,
        vector_index: VectorIndex,
        max_children: int = 50,
        max_concurrent_llm: int = 10,
    ):
        """Initialize handler.

        Args:
            fs: ContextFS for listing, reading and writing nodes
            llm: LLM for generating summaries (used via DirectorySummarizer)
            embedder: Embedder for vectorization
            vector_index: VectorIndex for upsert
            max_children: Max children taken from each directory listing;
                further entries are dropped with a warning
            max_concurrent_llm: Max concurrent summary-generation calls per run
        """
        self._fs = fs
        self._llm = llm
        self._embedder = embedder
        self._vector_index = vector_index
        self._max_children = max_children
        self._max_concurrent_llm = max_concurrent_llm
        self._summarizer = DirectorySummarizer(fs, llm, max_children)

    def process_directory_event(
        self,
        event: OutboxEvent,
        ctx: RequestContext,
    ) -> DirectoryEventResult:
        """Process UPSERT_DIRECTORY event synchronously.

        The DAG runs on a fresh event loop created by ``asyncio.run``. Calling
        this from inside a running event loop therefore fails; that error is
        reported through the returned result like any other failure.

        Args:
            event: OutboxEvent whose ``uri`` is the root directory
            ctx: RequestContext for access control

        Returns:
            DirectoryEventResult with success status and stats. ``success`` is
            False when the URI is not a directory, when the root directory
            could not be listed or summarized, or on an unexpected error.
        """
        root_uri = event.uri

        if not is_directory_uri(root_uri):
            return DirectoryEventResult(
                success=False,
                root_uri=root_uri,
                stats=DagStats(),
                error_message=f"URI {root_uri} is not a directory",
            )

        dag = self._run_dag(root_uri, ctx)
        try:
            return asyncio.run(dag)
        except Exception as e:
            # No-op if the coroutine already ran; if asyncio.run refused to start
            # it (e.g. a loop is already running) this avoids a
            # "coroutine was never awaited" warning.
            dag.close()
            logger.error("Failed to process directory event %s: %s", root_uri, e, exc_info=True)
            return DirectoryEventResult(
                success=False,
                root_uri=root_uri,
                stats=DagStats(),
                error_message=str(e),
            )

    async def _run_dag(
        self,
        root_uri: str,
        ctx: RequestContext,
    ) -> DirectoryEventResult:
        """Run the DAG for ``root_uri`` and wait until the root is finished.

        Resets all per-run state, dispatches the root directory, then waits on
        ``_root_done``. That event is set when the root's summary task ends
        (successfully or not), when the root cannot be listed, or when a
        spawned task dies with an unexpected exception.

        Returns:
            DirectoryEventResult whose ``success`` is False only when the root
            directory itself failed; subdirectory failures are tolerated and
            only counted in ``stats``.
        """
        self._nodes: dict[str, DirNode] = {}
        self._parent_map: dict[str, str | None] = {}
        self._stats = DagStats()
        self._llm_sem = asyncio.Semaphore(self._max_concurrent_llm)
        self._root_done = asyncio.Event()
        self._root_error: str | None = None
        # The event loop only keeps weak references to tasks; hold strong ones
        # so fire-and-forget tasks are not garbage-collected mid-run.
        self._tasks: set[asyncio.Task] = set()

        await self._dispatch_dir(root_uri, parent_uri=None, ctx=ctx)
        await self._root_done.wait()

        stats = self._stats
        return DirectoryEventResult(
            success=self._root_error is None,
            root_uri=root_uri,
            stats=DagStats(
                total_dirs=stats.total_dirs,
                pending_dirs=stats.pending_dirs,
                completed_dirs=stats.completed_dirs,
                failed_dirs=stats.failed_dirs,
                record_ids=list(stats.record_ids),
                written_dir_uris=list(stats.written_dir_uris),
            ),
            error_message=self._root_error or "",
            record_ids=list(stats.record_ids),
            written_dir_uris=list(stats.written_dir_uris),
        )

    def _spawn(self, coro) -> asyncio.Task:
        """Start ``coro`` as a task that stays referenced until it finishes."""
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._on_task_finished)
        return task

    def _on_task_finished(self, task: asyncio.Task) -> None:
        """Done-callback for spawned tasks: drop the reference, surface crashes.

        The DAG coroutines handle their expected failures themselves. An
        exception reaching this point may leave a parent whose ``pending``
        count is never decremented. Rather than let ``_run_dag`` wait forever,
        record the error and release the root waiter.
        """
        self._tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is None:
            return
        # Record the failure and wake the waiter before logging, so the run
        # terminates even if logging itself misbehaves.
        if self._root_error is None:
            self._root_error = str(exc) or type(exc).__name__
        self._root_done.set()
        logger.error("Unexpected error in directory DAG task: %s", exc, exc_info=exc)

    async def _dispatch_dir(
        self,
        dir_uri: str,
        parent_uri: str | None,
        ctx: RequestContext,
    ) -> None:
        """Register a directory in the DAG and fan out work for its children.

        Lists the directory (deduplicated, capped at ``max_children``), splits
        the entries into subdirectories and files, and records a DirNode whose
        ``pending`` counter covers all of them. It then spawns one task per
        child. A directory without children has its summary scheduled
        immediately.

        A directory that was already dispatched is not processed again. This
        happens when it is listed under more than one parent, or when a
        listing points back at itself or an ancestor. The extra parent is
        notified right away with no summary, so its ``pending`` counter can
        still reach zero instead of waiting forever.

        If listing fails, the directory is counted as failed and its parent
        is notified with no summary. For the root, the run is finished with an
        error instead.
        """
        if dir_uri in self._parent_map:
            if parent_uri is not None:
                await self._on_child_done(parent_uri, dir_uri, None, ctx)
            return

        self._parent_map[dir_uri] = parent_uri

        try:
            # dict.fromkeys drops repeated URIs while keeping listing order; a
            # repeat would be counted in `pending` twice but finish only once.
            children = list(dict.fromkeys(self._fs.list_children(dir_uri, ctx)))
        except Exception as e:
            logger.error("Failed to dispatch directory %s: %s", dir_uri, e, exc_info=True)
            self._stats.failed_dirs += 1
            if parent_uri is not None:
                await self._on_child_done(parent_uri, dir_uri, None, ctx)
            else:
                self._root_error = f"Failed to list directory {dir_uri}: {e}"
                self._root_done.set()
            return

        if len(children) > self._max_children:
            logger.warning(
                "Directory %s has %d children, limiting to %d",
                dir_uri, len(children), self._max_children
            )
            children = children[:self._max_children]

        children_dirs: list[str] = []
        file_children: list[str] = []
        for child_uri in children:
            if is_directory_uri(child_uri):
                children_dirs.append(child_uri)
            else:
                file_children.append(child_uri)

        node = DirNode(
            uri=dir_uri,
            children_dirs=children_dirs,
            file_children=file_children,
            child_index={uri: idx for idx, uri in enumerate(children_dirs)},
            children_summaries=[None] * len(children_dirs),
            pending=len(children_dirs) + len(file_children),
            parent_uri=parent_uri,
        )
        self._nodes[dir_uri] = node
        self._stats.total_dirs += 1
        self._stats.pending_dirs += 1

        if node.pending == 0:
            self._schedule_summary(dir_uri, ctx)
            return

        for file_uri in file_children:
            self._spawn(self._file_task(dir_uri, file_uri, ctx))

        for child_uri in children_dirs:
            self._spawn(self._dispatch_dir(child_uri, dir_uri, ctx))

    async def _file_task(
        self,
        parent_uri: str,
        file_uri: str,
        ctx: RequestContext,
    ) -> None:
        """Process a file child.

        Files need no processing of their own and report completion
        immediately; their abstracts are read later by
        ``_collect_file_summaries``.
        """
        await self._on_file_done(parent_uri, file_uri, ctx)

    async def _on_file_done(
        self,
        parent_uri: str,
        file_uri: str,
        ctx: RequestContext,
    ) -> None:
        """Handle file completion by decrementing the parent's pending count."""
        node = self._nodes.get(parent_uri)
        if node is None:
            return

        async with node.lock:
            self._decrement_pending(node, ctx)

    async def _on_child_done(
        self,
        parent_uri: str,
        child_uri: str,
        child_summary: ChildSummary | None,
        ctx: RequestContext,
    ) -> None:
        """Handle child directory completion.

        Args:
            parent_uri: Parent directory URI
            child_uri: Child directory URI that just completed
            child_summary: Summary of the child (or None if failed / duplicate)
            ctx: RequestContext
        """
        node = self._nodes.get(parent_uri)
        if node is None:
            return

        async with node.lock:
            idx = node.child_index.get(child_uri)
            if idx is not None:
                node.children_summaries[idx] = child_summary
            self._decrement_pending(node, ctx)

    def _decrement_pending(self, node: DirNode, ctx: RequestContext) -> None:
        """Count one finished child of ``node``; schedule its summary at zero."""
        node.pending -= 1
        if node.pending == 0:
            self._schedule_summary(node.uri, ctx)

    def _schedule_summary(self, dir_uri: str, ctx: RequestContext) -> None:
        """Spawn the summary task for ``dir_uri`` unless already scheduled."""
        node = self._nodes.get(dir_uri)
        if node is None or node.summary_scheduled:
            return

        node.summary_scheduled = True
        self._spawn(self._summary_task(dir_uri, ctx))

    @staticmethod
    def _dir_name(uri: str) -> str:
        """Return the last path segment of a directory URI (trailing '/' ignored)."""
        return uri.rstrip("/").split("/")[-1]

    def _collect_file_summaries(
        self,
        node: DirNode,
        ctx: RequestContext,
    ) -> list[ChildSummary]:
        """Collect summaries from file children that have an abstract.

        Files that cannot be read are logged and skipped; files without an
        abstract are skipped silently.
        """
        summaries: list[ChildSummary] = []

        for file_uri in node.file_children:
            try:
                file_node = self._fs.read_node(file_uri, ctx)
            except Exception as e:
                logger.warning("Failed to read file %s: %s", file_uri, e)
                continue
            if file_node and file_node.abstract:
                summaries.append(ChildSummary(
                    uri=file_uri,
                    name=file_uri.split("/")[-1],
                    abstract=file_node.abstract,
                    category=file_node.category or "unknown",
                    is_directory=False,
                    has_abstract=True,
                ))

        return summaries

    def _collect_children_summaries(self, node: DirNode) -> list[ChildSummary]:
        """Collect summaries from child directories (already completed).

        A subdirectory that finished without a summary gets a placeholder
        entry with ``has_abstract=False``.
        """
        summaries: list[ChildSummary] = []

        for child_uri, item in zip(node.children_dirs, node.children_summaries):
            if item is not None:
                summaries.append(item)
            else:
                summaries.append(ChildSummary(
                    uri=child_uri,
                    name=self._dir_name(child_uri),
                    abstract="",
                    category="directory",
                    is_directory=True,
                    has_abstract=False,
                ))

        return summaries

    async def _summary_task(self, dir_uri: str, ctx: RequestContext) -> None:
        """Generate, store and vectorize the summary of one directory.

        Scheduled only once all of the directory's children (files and
        subdirectories) have reported completion. A failure in summary
        generation, the node write or vectorization is logged and counted as
        a failed directory. Afterwards the parent is notified with this
        directory's summary, or None on failure. For the root directory the
        run is marked finished instead, recording the error if it failed.
        """
        node = self._nodes.get(dir_uri)
        if node is None:
            return

        file_summaries = self._collect_file_summaries(node, ctx)
        children_summaries = self._collect_children_summaries(node)
        # Only children that actually carry an abstract feed the summarizer.
        valid_summaries = [
            s for s in file_summaries + children_summaries if s.has_abstract
        ]

        parent_uri = self._parent_map.get(dir_uri)
        child_summary: ChildSummary | None = None

        try:
            async with self._llm_sem:
                summary = await asyncio.to_thread(
                    self._generate_summary_with_fallback,
                    dir_uri,
                    valid_summaries,
                )

            dir_name = self._dir_name(dir_uri)
            dir_node = ContextNode(
                uri=dir_uri,
                context_type="MEMORY",
                category=dir_name or "directory",
                level=0,
                owner_space=self._extract_owner_space_from_uri(dir_uri, ctx),
                abstract=summary.abstract,
                overview=summary.overview,
                content="",
                metadata={
                    "node_type": "directory",
                    "child_count": summary.child_count,
                    "categories": summary.categories,
                },
            )

            await asyncio.to_thread(self._fs.write_node, dir_node, ctx)
            self._stats.written_dir_uris.append(dir_uri)
            logger.info("Generated summary for directory %s", dir_uri)

            await self._vectorize_directory(dir_uri, dir_node, summary, ctx)

            self._stats.completed_dirs += 1
            child_summary = ChildSummary(
                uri=dir_uri,
                name=dir_name,
                abstract=summary.abstract,
                category="directory",
                is_directory=True,
                has_abstract=True,
            )
        except Exception as e:
            logger.error("Failed to generate summary for %s: %s", dir_uri, e, exc_info=True)
            self._stats.failed_dirs += 1
            if parent_uri is None:
                self._root_error = f"Failed to summarize directory {dir_uri}: {e}"

        self._stats.pending_dirs = max(0, self._stats.pending_dirs - 1)

        if parent_uri is None:
            self._root_done.set()
            return

        await self._on_child_done(parent_uri, dir_uri, child_summary, ctx)

    def _extract_owner_space_from_uri(self, uri: str, ctx: RequestContext) -> str:
        """Extract owner_space from URI.

        URI patterns:
        - ctx://{account}/users/{user}/memories/... → "user:{user}"
        - ctx://{account}/agents/{agent}/memories/... → "agent:{agent}"

        Args:
            uri: ContextNode URI
            ctx: RequestContext; its ``user_space_name()`` is used when the URI
                does not match either pattern or the user/agent segment is empty

        Returns:
            owner_space string
        """
        # "ctx://acc/users/u/..".split("/") -> ["ctx:", "", "acc", "users", "u", ...]
        parts = uri.split("/")
        if len(parts) >= 5 and parts[4]:
            if parts[3] == "users":
                return f"user:{parts[4]}"
            if parts[3] == "agents":
                return f"agent:{parts[4]}"

        return ctx.user_space_name()

    def _generate_summary_with_fallback(
        self,
        dir_uri: str,
        child_summaries: list[ChildSummary],
    ) -> DirectorySummary:
        """Generate summary with fallback when LLM fails.

        Reuses DirectorySummarizer's fallback methods for consistency.

        Args:
            dir_uri: Directory URI
            child_summaries: List of child summaries (only those with abstracts)

        Returns:
            DirectorySummary (from LLM or fallback)
        """
        if not child_summaries:
            # "空目录" means "empty directory".
            return DirectorySummary(
                abstract="空目录",
                overview="",
                child_count=0,
                categories=[],
            )

        summaries_dict = [
            {
                "uri": s.uri,
                "abstract": s.abstract,
                "category": s.category,
            }
            for s in child_summaries
        ]

        try:
            return self._summarizer._generate_summary(dir_uri, summaries_dict)
        except Exception as e:
            logger.warning("LLM failed for %s, using fallback: %s", dir_uri, e)
            abstract = self._summarizer._fallback_abstract(summaries_dict)
            overview = self._summarizer._fallback_overview(summaries_dict)
            # dict.fromkeys keeps first-seen order, so stored metadata is
            # deterministic (a set's order varies with string hash seeds).
            categories = list(dict.fromkeys(s.category for s in child_summaries))
            return DirectorySummary(
                abstract=abstract[:200],
                overview=overview,
                child_count=len(child_summaries),
                categories=categories,
            )

    @staticmethod
    def _with_metadata(record: IndexRecord, extra: dict) -> IndexRecord:
        """Return a copy of ``record`` whose metadata is extended with ``extra``."""
        metadata = dict(record.metadata)
        metadata.update(extra)
        return IndexRecord(
            id=record.id,
            uri=record.uri,
            level=record.level,
            text=record.text,
            filters=record.filters,
            metadata=metadata,
        )

    async def _vectorize_directory(
        self,
        dir_uri: str,
        dir_node: ContextNode,
        summary: DirectorySummary,
        ctx: RequestContext,
    ) -> None:
        """Vectorize directory summary and upsert to index.

        Builds a level-0 record from ``summary.abstract`` and, when an overview
        exists, a level-1 record from ``summary.overview``. Both carry
        directory metadata; only the level-0 record also carries the child
        categories. Embeddings are attached under the ``_embedding`` metadata
        key before the upsert, and the upserted record IDs are added to the
        run statistics.

        Raises:
            ValueError: if the embedder returns a different number of
                embeddings than texts it was given.
        """
        owner_space = dir_node.owner_space
        common_metadata = {
            "node_type": "directory",
            "parent_uri": dir_node.parent_uri,
            "child_count": summary.child_count,
        }

        l0_record = build_single_record(
            uri=dir_uri,
            level=0,
            text=summary.abstract,
            account_id=ctx.account_id,
            owner_space=owner_space,
            category=dir_node.category,
            context_type=dir_node.context_type,
        )
        records: list[IndexRecord] = [
            self._with_metadata(
                l0_record, {**common_metadata, "categories": summary.categories}
            )
        ]

        if summary.overview:
            l1_record = build_single_record(
                uri=dir_uri,
                level=1,
                text=summary.overview,
                account_id=ctx.account_id,
                owner_space=owner_space,
                category=dir_node.category,
                context_type=dir_node.context_type,
            )
            records.append(self._with_metadata(l1_record, common_metadata))

        texts = [r.text for r in records]
        embeddings = await asyncio.to_thread(self._embedder.embed_texts, texts)

        # strict=True: a short embedding batch must fail loudly instead of
        # silently dropping records from the upsert.
        augmented_records = [
            self._with_metadata(record, {"_embedding": embedding})
            for record, embedding in zip(records, embeddings, strict=True)
        ]

        await asyncio.to_thread(self._vector_index.upsert, augmented_records)
        self._stats.record_ids.extend(r.id for r in augmented_records)
        logger.info("Vectorized directory %s with %d records", dir_uri, len(records))