"""IndexRecordBuilder: Expand ContextNode into three-level IndexRecords.
Each ContextNode expands into per-level IndexRecords for layered retrieval:
L0 (abstract) is always emitted, and L2 (content) is emitted when the node has content.
Directory summaries (via DirectoryEventHandler) keep L0/L1 for hierarchical retrieval.
CRITICAL: All IndexRecords MUST contain account_id + owner_space in filters.
This is the LAST layer of tenant isolation at the vector level.
Missing filters cause SILENT cross-tenant leakage (no runtime error).
"""
import re
from typing import Final
from core.models import ContextNode, IndexRecord
# Captures the first path segment after the "ctx://" scheme. The segment must
# be non-empty and followed by "/"; it is used as the account (tenant) id.
_URI_ACCOUNT_PATTERN: Final[re.Pattern[str]] = re.compile(r"^ctx://([^/]+)/")

# Filter keys that every IndexRecord must carry for tenant isolation.
_REQUIRED_FILTER_KEYS: Final[tuple[str, ...]] = (
    "account_id",
    "owner_space",
)
class FilterValidationError(Exception):
    """Signal that an IndexRecord lacks mandatory tenant-isolation filters.

    Attributes:
        missing_keys: The required filter keys reported as missing.
    """

    def __init__(self, missing_keys: tuple[str, ...]) -> None:
        """Record the offending keys and compose the error message.

        Args:
            missing_keys: Names of the required filter keys that are missing.
        """
        message = (
            f"IndexRecord.filters missing required keys: {missing_keys}. "
            "This causes SILENT cross-tenant leakage!"
        )
        super().__init__(message)
        self.missing_keys = missing_keys
def _extract_account_id(uri: str) -> str:
    """Return the account (tenant) segment of a ContextNode URI.

    The account is the first path segment after the ``ctx://`` scheme, as
    captured by ``_URI_ACCOUNT_PATTERN``.

    Args:
        uri: ContextNode URI like "ctx://acme/users/alice/memories/profile"

    Returns:
        Account ID (e.g., "acme")

    Raises:
        ValueError: If the URI does not match the expected pattern
    """
    parsed = _URI_ACCOUNT_PATTERN.match(uri)
    if parsed is not None:
        # Group 1 is the segment between "ctx://" and the next "/".
        return parsed[1]
    raise ValueError(f"Invalid URI format, cannot extract account_id: {uri}")
def build_index_records(node: ContextNode) -> list[IndexRecord]:
    """Expand a ContextNode into its per-level IndexRecords.

    Records emitted:
    - Level 0: abstract (L0) - first-pass recall; always emitted.
    - Level 2: content (L2) - deep reading layer; emitted only when
      ``node.content`` is truthy, under the URI ``<node.uri>/content.md``
      with ``parent_uri`` metadata pointing back at the node itself.

    NOTE(review): no level-1 (overview) record is built here, although
    ``has_overview`` is recorded in metadata - confirm this is intentional.

    CRITICAL: All records include account_id + owner_space in filters
    to prevent cross-tenant leakage at the vector index level.

    Args:
        node: Source node; its URI must look like "ctx://<account>/...".

    Returns:
        The L0 record, followed by the L2 record when the node has content.

    Raises:
        ValueError: If the account ID cannot be extracted from ``node.uri``.
        FilterValidationError: If ``node.owner_space`` is empty, or a built
            record is missing a required filter key.
    """
    account_id = _extract_account_id(node.uri)

    # Key presence alone does not isolate tenants: an empty owner_space would
    # be indexed without an effective filter. build_single_record rejects
    # empty values the same way.
    if not node.owner_space:
        raise FilterValidationError(("owner_space",))

    base_filters: dict[str, str | int | float | bool] = {
        "account_id": account_id,
        "owner_space": node.owner_space,
        "category": node.category,
        "context_type": node.context_type,
    }
    base_metadata: dict[str, str | bool] = {
        "category": node.category,
        "context_type": node.context_type,
        "parent_uri": node.parent_uri,
        "has_overview": bool(node.overview),
        "has_content": bool(node.content),
    }
    # Copy optional descriptive fields only when they carry a truthy value.
    for key in ("when", "who", "where", "routing_key"):
        val = node.metadata.get(key)
        if val:
            base_metadata[key] = val

    # Each record gets its own copy of the filters so later mutation of one
    # record cannot affect another.
    records: list[IndexRecord] = [
        IndexRecord(
            id=IndexRecord.generate_id(node.uri, 0),
            uri=node.uri,
            level=0,
            text=node.abstract,
            filters=dict(base_filters),
            metadata={"level": "abstract", **base_metadata},
        )
    ]

    if node.content:
        l2_uri = node.uri.rstrip("/") + "/content.md"
        # The content record is addressed as a child of the node, so its
        # parent_uri is the node's own URI rather than the node's parent.
        l2_metadata = {
            **base_metadata,
            "parent_uri": node.uri,
        }
        records.append(
            IndexRecord(
                id=IndexRecord.generate_id(l2_uri, 2),
                uri=l2_uri,
                level=2,
                text=node.content,
                filters=dict(base_filters),
                metadata={"level": "content", **l2_metadata},
            )
        )

    _validate_filters(records)
    return records
def _validate_filters(records: list[IndexRecord]) -> None:
    """Validate that all IndexRecords carry usable required filter values.

    This is the LAST line of defense against cross-tenant leakage.
    Missing filters cause SILENT leakage (no runtime error in vector search).

    A required key counts as missing when it is absent from ``record.filters``
    or when its value is ``None`` or the empty string; a present-but-empty
    value isolates nothing, and build_single_record rejects it as well.

    Args:
        records: IndexRecords to validate

    Raises:
        FilterValidationError: If any record is missing required keys. The
            error lists the missing keys of the first offending record.
    """
    for record in records:
        missing_keys = tuple(
            key
            for key in _REQUIRED_FILTER_KEYS
            if record.filters.get(key) in (None, "")
        )
        if missing_keys:
            raise FilterValidationError(missing_keys)
def build_single_record(
    uri: str,
    level: int,
    text: str,
    account_id: str,
    owner_space: str,
    category: str,
    context_type: str,
) -> IndexRecord:
    """Build a single IndexRecord with explicit parameters.

    Utility function for building individual records.
    Used by OutboxWorker when processing events.

    NOTE(review): ``metadata["level"]`` is the stringified numeric level here
    (e.g. "0"), whereas build_index_records uses the labels "abstract" and
    "content" - confirm which form consumers expect.

    Args:
        uri: ContextNode URI
        level: Index level (0, 1, or 2)
        text: Text content for embedding
        account_id: Tenant ID (REQUIRED for isolation)
        owner_space: user_space or agent_space (REQUIRED for isolation)
        category: Memory category (profile, preference, etc.)
        context_type: MEMORY | SKILL | RESOURCE

    Returns:
        IndexRecord with mandatory filters

    Raises:
        FilterValidationError: If account_id or owner_space is empty. When
            both are empty, both keys are reported in ``missing_keys``.
    """
    # Collect every empty isolation key so the caller sees the full problem
    # in one error instead of fixing them one at a time.
    required = (("account_id", account_id), ("owner_space", owner_space))
    missing_keys = tuple(key for key, value in required if not value)
    if missing_keys:
        raise FilterValidationError(missing_keys)

    return IndexRecord(
        id=IndexRecord.generate_id(uri, level),
        uri=uri,
        level=level,
        text=text,
        filters={
            "account_id": account_id,
            "owner_space": owner_space,
            "category": category,
            "context_type": context_type,
        },
        metadata={
            "level": str(level),
            "category": category,
            "context_type": context_type,
        },
    )
def build_record_id(uri: str, level: int) -> str:
    """Compute the IndexRecord ID for a URI at a given level.

    Thin wrapper that delegates to ``IndexRecord.generate_id``.

    Args:
        uri: ContextNode URI
        level: Index level (0, 1, or 2)

    Returns:
        IndexRecord ID as produced by ``IndexRecord.generate_id``
        (described by the file as a sha256 hash)
    """
    record_id = IndexRecord.generate_id(uri, level)
    return record_id