"""Implementation of :meth:`AIAgent.__init__` — extracted as a module function.
``AIAgent.__init__`` is one of the longest methods in the codebase (60+
parameters, ~1,400 lines of attribute initialization, provider
auto-detection, credential resolution, context-engine bootstrap, etc.).
Keeping it in ``run_agent.py`` bloats that file with code that's mostly
"setup state, then forget".
After this extraction the body lives here as ``init_agent(agent, ...)``
and :meth:`AIAgent.__init__` is a thin wrapper that calls
``init_agent(self, ...)``. All imports the body needs at module-load
time are listed below; the body also performs many lazy imports inside
its own scope that come along unchanged.
Symbols that tests patch on ``run_agent.*`` (``OpenAI``, ``cleanup_vm``,
etc.) are resolved through :func:`_ra` so the patch contract is
preserved.
"""
from __future__ import annotations
import logging
import os
import re
import sys
import threading
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse, parse_qs, urlunparse
from agent.context_compressor import ContextCompressor
from agent.iteration_budget import IterationBudget
from agent.memory_manager import StreamingContextScrubber
from agent.model_metadata import (
MINIMUM_CONTEXT_LENGTH,
fetch_model_metadata,
get_model_context_length,
is_local_endpoint,
query_ollama_num_ctx,
)
from agent.process_bootstrap import _install_safe_stdio
from agent.subdirectory_hints import SubdirectoryHintTracker
from agent.think_scrubber import StreamingThinkScrubber
from agent.tool_guardrails import (
ToolCallGuardrailConfig,
ToolCallGuardrailController,
ToolGuardrailDecision,
)
from hermes_cli.config import cfg_get
from hermes_cli.timeouts import get_provider_request_timeout
from hermes_constants import get_hermes_home
from model_tools import check_toolset_requirements, get_tool_definitions
from utils import base_url_host_matches
logger = logging.getLogger("run_agent")
def _ra():
    """Return the ``run_agent`` module, importing it at call time.

    The import happens inside the function on every call instead of at
    module load, so attributes that callers patch on ``run_agent`` (for
    example ``run_agent.OpenAI`` or ``run_agent.cleanup_vm``) are looked up
    on the module object at the moment they are used, and the patched
    values reach this code path.
    """
    import run_agent as patched_target

    return patched_target
def init_agent(
agent,
base_url: str = None,
api_key: str = None,
provider: str = None,
api_mode: str = None,
acp_command: str = None,
acp_args: list[str] | None = None,
command: str = None,
args: list[str] | None = None,
model: str = "",
max_iterations: int = 90,
tool_delay: float = 1.0,
enabled_toolsets: List[str] = None,
disabled_toolsets: List[str] = None,
save_trajectories: bool = False,
verbose_logging: bool = False,
quiet_mode: bool = False,
ephemeral_system_prompt: str = None,
log_prefix_chars: int = 100,
log_prefix: str = "",
providers_allowed: List[str] = None,
providers_ignored: List[str] = None,
providers_order: List[str] = None,
provider_sort: str = None,
provider_require_parameters: bool = False,
provider_data_collection: str = None,
openrouter_min_coding_score: Optional[float] = None,
session_id: str = None,
tool_progress_callback: callable = None,
tool_start_callback: callable = None,
tool_complete_callback: callable = None,
thinking_callback: callable = None,
reasoning_callback: callable = None,
clarify_callback: callable = None,
step_callback: callable = None,
stream_delta_callback: callable = None,
interim_assistant_callback: callable = None,
tool_gen_callback: callable = None,
status_callback: callable = None,
max_tokens: int = None,
reasoning_config: Dict[str, Any] = None,
service_tier: str = None,
request_overrides: Dict[str, Any] = None,
prefill_messages: List[Dict[str, Any]] = None,
platform: str = None,
user_id: str = None,
user_name: str = None,
chat_id: str = None,
chat_name: str = None,
chat_type: str = None,
thread_id: str = None,
gateway_session_key: str = None,
skip_context_files: bool = False,
load_soul_identity: bool = False,
skip_memory: bool = False,
session_db=None,
parent_session_id: str = None,
iteration_budget: "IterationBudget" = None,
fallback_model: Dict[str, Any] = None,
credential_pool=None,
checkpoints_enabled: bool = False,
checkpoint_max_snapshots: int = 20,
checkpoint_max_total_size_mb: int = 500,
checkpoint_max_file_size_mb: int = 10,
pass_session_id: bool = False,
):
"""
Initialize the AI Agent.
Args:
base_url (str): Base URL for the model API (optional)
api_key (str): API key for authentication (optional, uses env var if not provided)
provider (str): Provider identifier (optional; used for telemetry/routing hints)
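api_mode (str): API mode override. Recognized values: "chat_completions", "codex_responses",
"anthropic_messages", "bedrock_converse", "codex_app_server". Any other value (including None)
falls back to auto-detection from the provider and base URL.
model (str): Model name to use (default: "" — empty string)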
max_iterations (int): Maximum number of tool calling iterations (default: 90)
tool_delay (float): Delay between tool calls in seconds (default: 1.0)
enabled_toolsets (List[str]): Only enable tools from these toolsets (optional)
disabled_toolsets (List[str]): Disable tools from these toolsets (optional)
save_trajectories (bool): Whether to save conversation trajectories to JSONL files (default: False)
verbose_logging (bool): Enable verbose logging for debugging (default: False)
quiet_mode (bool): Suppress progress output for clean CLI experience (default: False)
ephemeral_system_prompt (str): System prompt used during agent execution but NOT saved to trajectories (optional)
log_prefix_chars (int): Number of characters to show in log previews for tool calls/responses (default: 100)
log_prefix (str): Prefix to add to all log messages for identification in parallel processing (default: "")
providers_allowed (List[str]): OpenRouter providers to allow (optional)
providers_ignored (List[str]): OpenRouter providers to ignore (optional)
providers_order (List[str]): OpenRouter providers to try in order (optional)
provider_sort (str): Sort providers by price/throughput/latency (optional)
openrouter_min_coding_score (float): Coding-score floor (0.0-1.0) for the
openrouter/pareto-code router. Only applied when model == "openrouter/pareto-code".
None or empty = let OpenRouter pick the strongest available coder.
session_id (str): Pre-generated session ID for logging (optional, auto-generated if not provided)
tool_progress_callback (callable): Callback function(tool_name, args_preview) for progress notifications
clarify_callback (callable): Callback function(question, choices) -> str for interactive user questions.
Provided by the platform layer (CLI or gateway). If None, the clarify tool returns an error.
max_tokens (int): Maximum tokens for model responses (optional, uses model default if not set)
reasoning_config (Dict): OpenRouter reasoning configuration override (e.g. {"effort": "none"} to disable thinking).
If None, defaults to {"enabled": True, "effort": "medium"} for OpenRouter. Set to disable/customize reasoning.
prefill_messages (List[Dict]): Messages to prepend to conversation history as prefilled context.
Useful for injecting a few-shot example or priming the model's response style.
Example: [{"role": "user", "content": "Hi!"}, {"role": "assistant", "content": "Hello!"}]
NOTE: Anthropic Sonnet 4.6+ and Opus 4.6+ reject a conversation that ends on an
assistant-role message (400 error). For those models use structured outputs or
output_config.format instead of a trailing-assistant prefill.
platform (str): The interface platform the user is on (e.g. "cli", "telegram", "discord", "whatsapp").
Used to inject platform-specific formatting hints into the system prompt.
skip_context_files (bool): If True, skip auto-injection of SOUL.md, AGENTS.md, and .cursorrules
into the system prompt. Use this for batch processing and data generation to avoid
polluting trajectories with user-specific persona or project instructions.
load_soul_identity (bool): If True, still use ~/.hermes/SOUL.md as the primary
identity even when skip_context_files=True. Project context files from the cwd
remain skipped.
"""
_install_safe_stdio()
agent.model = model
agent.max_iterations = max_iterations
agent.iteration_budget = iteration_budget or IterationBudget(max_iterations)
agent.tool_delay = tool_delay
agent.save_trajectories = save_trajectories
agent.verbose_logging = verbose_logging
agent.quiet_mode = quiet_mode
agent.ephemeral_system_prompt = ephemeral_system_prompt
agent.platform = platform
agent._user_id = user_id
agent._user_name = user_name
agent._chat_id = chat_id
agent._chat_name = chat_name
agent._chat_type = chat_type
agent._thread_id = thread_id
agent._gateway_session_key = gateway_session_key
agent._print_fn = None
agent.background_review_callback = None
agent.skip_context_files = skip_context_files
agent.load_soul_identity = load_soul_identity
agent.pass_session_id = pass_session_id
agent._credential_pool = credential_pool
agent.log_prefix_chars = log_prefix_chars
agent.log_prefix = f"{log_prefix} " if log_prefix else ""
agent.base_url = base_url or ""
provider_name = provider.strip().lower() if isinstance(provider, str) and provider.strip() else None
agent.provider = provider_name or ""
agent.acp_command = acp_command or command
agent.acp_args = list(acp_args or args or [])
if api_mode in {"chat_completions", "codex_responses", "anthropic_messages", "bedrock_converse", "codex_app_server"}:
agent.api_mode = api_mode
elif agent.provider == "openai-codex":
agent.api_mode = "codex_responses"
elif agent.provider in {"xai", "xai-oauth"}:
agent.api_mode = "codex_responses"
elif (provider_name is None) and (
agent._base_url_hostname == "chatgpt.com"
and "/backend-api/codex" in agent._base_url_lower
):
agent.api_mode = "codex_responses"
agent.provider = "openai-codex"
elif (provider_name is None) and agent._base_url_hostname == "api.x.ai":
agent.api_mode = "codex_responses"
agent.provider = "xai"
elif agent.provider == "anthropic" or (provider_name is None and agent._base_url_hostname == "api.anthropic.com"):
agent.api_mode = "anthropic_messages"
agent.provider = "anthropic"
elif agent._base_url_lower.rstrip("/").endswith("/anthropic"):
agent.api_mode = "anthropic_messages"
elif agent.provider == "bedrock" or (
agent._base_url_hostname.startswith("bedrock-runtime.")
and base_url_host_matches(agent._base_url_lower, "amazonaws.com")
):
agent.api_mode = "bedrock_converse"
else:
agent.api_mode = "chat_completions"
try:
agent._get_transport()
except Exception:
pass
try:
from hermes_cli.model_normalize import (
_AGGREGATOR_PROVIDERS,
normalize_model_for_provider,
)
if agent.provider not in _AGGREGATOR_PROVIDERS:
agent.model = normalize_model_for_provider(agent.model, agent.provider)
except Exception:
pass
if (
api_mode is None
and agent.api_mode == "chat_completions"
and agent.provider != "copilot-acp"
and not str(agent.base_url or "").lower().startswith("acp://copilot")
and not str(agent.base_url or "").lower().startswith("acp+tcp://")
and not agent._is_azure_openai_url()
and (
agent._is_direct_openai_url()
or agent._provider_model_requires_responses_api(
agent.model,
provider=agent.provider,
)
)
):
agent.api_mode = "codex_responses"
if hasattr(agent, "_transport_cache"):
agent._transport_cache.clear()
if (agent.provider == "openrouter" or agent._is_openrouter_url()) and \
not _ra()._openrouter_prewarm_done.is_set():
_ra()._openrouter_prewarm_done.set()
threading.Thread(
target=fetch_model_metadata,
daemon=True,
name="openrouter-prewarm",
).start()
agent.tool_progress_callback = tool_progress_callback
agent.tool_start_callback = tool_start_callback
agent.tool_complete_callback = tool_complete_callback
agent.suppress_status_output = False
agent.thinking_callback = thinking_callback
agent.reasoning_callback = reasoning_callback
agent.clarify_callback = clarify_callback
agent.step_callback = step_callback
agent.stream_delta_callback = stream_delta_callback
agent.interim_assistant_callback = interim_assistant_callback
agent.status_callback = status_callback
agent.tool_gen_callback = tool_gen_callback
agent._executing_tools = False
agent._tool_guardrails = ToolCallGuardrailController()
agent._tool_guardrail_halt_decision: ToolGuardrailDecision | None = None
agent._interrupt_requested = False
agent._interrupt_message = None
agent._execution_thread_id: int | None = None
agent._interrupt_thread_signal_pending = False
agent._client_lock = threading.RLock()
agent._pending_steer: Optional[str] = None
agent._pending_steer_lock = threading.Lock()
agent._tool_worker_threads: set[int] = set()
agent._tool_worker_threads_lock = threading.Lock()
agent._delegate_depth = 0
agent._active_children = []
agent._active_children_lock = threading.Lock()
agent.providers_allowed = providers_allowed
agent.providers_ignored = providers_ignored
agent.providers_order = providers_order
agent.provider_sort = provider_sort
agent.provider_require_parameters = provider_require_parameters
agent.provider_data_collection = provider_data_collection
agent.openrouter_min_coding_score = openrouter_min_coding_score
agent.enabled_toolsets = enabled_toolsets
agent.disabled_toolsets = disabled_toolsets
agent.max_tokens = max_tokens
agent.reasoning_config = reasoning_config
agent.service_tier = service_tier
agent.request_overrides = dict(request_overrides or {})
agent.prefill_messages = prefill_messages or []
agent._force_ascii_payload = False
agent._use_prompt_caching, agent._use_native_cache_layout = (
agent._anthropic_prompt_cache_policy()
)
agent._cache_ttl = "5m"
try:
from hermes_cli.config import load_config as _load_pc_cfg
_pc_cfg = _load_pc_cfg().get("prompt_caching", {}) or {}
_ttl = _pc_cfg.get("cache_ttl", "5m")
if _ttl in {"5m", "1h"}:
agent._cache_ttl = _ttl
except Exception:
pass
agent._budget_exhausted_injected = False
agent._budget_grace_call = False
agent._last_activity_ts: float = time.time()
agent._last_activity_desc: str = "initializing"
agent._current_tool: str | None = None
agent._api_call_count: int = 0
agent._rate_limit_state: Optional["RateLimitState"] = None
agent._or_cache_hits: int = 0
from hermes_logging import setup_logging, setup_verbose_logging
setup_logging(hermes_home=_ra()._hermes_home)
if agent.verbose_logging:
setup_verbose_logging()
_ra().logger.info("Verbose logging enabled (third-party library logs suppressed)")
elif agent.quiet_mode:
pass
agent._stream_callback = None
agent._stream_needs_break = False
agent._stream_context_scrubber = StreamingContextScrubber()
agent._stream_think_scrubber = StreamingThinkScrubber()
agent._current_streamed_assistant_text = ""
agent._persist_user_message_idx = None
agent._persist_user_message_override = None
agent._anthropic_image_fallback_cache: Dict[str, str] = {}
agent._anthropic_client = None
agent._is_anthropic_oauth = False
_provider_timeout = get_provider_request_timeout(agent.provider, agent.model)
if agent.api_mode == "anthropic_messages":
from agent.anthropic_adapter import build_anthropic_client, resolve_anthropic_token
_is_bedrock_anthropic = agent.provider == "bedrock"
if _is_bedrock_anthropic:
from agent.anthropic_adapter import build_anthropic_bedrock_client
_region_match = re.search(r"bedrock-runtime\.([a-z0-9-]+)\.", base_url or "")
_br_region = _region_match.group(1) if _region_match else "us-east-1"
agent._bedrock_region = _br_region
agent._anthropic_client = build_anthropic_bedrock_client(_br_region)
agent._anthropic_api_key = "aws-sdk"
agent._anthropic_base_url = base_url
agent._is_anthropic_oauth = False
agent.api_key = "aws-sdk"
agent.client = None
agent._client_kwargs = {}
if not agent.quiet_mode:
print(f"🤖 AI Agent initialized with model: {agent.model} (AWS Bedrock + AnthropicBedrock SDK, {_br_region})")
else:
_is_native_anthropic = agent.provider == "anthropic"
effective_key = (api_key or resolve_anthropic_token() or "") if _is_native_anthropic else (api_key or "")
agent.api_key = effective_key
agent._anthropic_api_key = effective_key
agent._anthropic_base_url = base_url
from agent.anthropic_adapter import _is_oauth_token as _is_oat
agent._is_anthropic_oauth = _is_oat(effective_key) if _is_native_anthropic else False
agent._anthropic_client = build_anthropic_client(effective_key, base_url, timeout=_provider_timeout)
agent.client = None
agent._client_kwargs = {}
if not agent.quiet_mode:
print(f"🤖 AI Agent initialized with model: {agent.model} (Anthropic native)")
from agent.azure_identity_adapter import is_token_provider
if is_token_provider(effective_key):
print("🔑 Using credentials: Microsoft Entra ID")
elif isinstance(effective_key, str) and len(effective_key) > 12:
print(f"🔑 Using token: {effective_key[:8]}...{effective_key[-4:]}")
elif agent.api_mode == "bedrock_converse":
_region_match = re.search(r"bedrock-runtime\.([a-z0-9-]+)\.", base_url or "")
agent._bedrock_region = _region_match.group(1) if _region_match else "us-east-1"
agent._bedrock_guardrail_config = None
try:
from hermes_cli.config import load_config as _load_br_cfg
_gr = _load_br_cfg().get("bedrock", {}).get("guardrail", {})
if _gr.get("guardrail_identifier") and _gr.get("guardrail_version"):
agent._bedrock_guardrail_config = {
"guardrailIdentifier": _gr["guardrail_identifier"],
"guardrailVersion": _gr["guardrail_version"],
}
if _gr.get("stream_processing_mode"):
agent._bedrock_guardrail_config["streamProcessingMode"] = _gr["stream_processing_mode"]
if _gr.get("trace"):
agent._bedrock_guardrail_config["trace"] = _gr["trace"]
except Exception:
pass
agent.client = None
agent._client_kwargs = {}
if not agent.quiet_mode:
_gr_label = " + Guardrails" if agent._bedrock_guardrail_config else ""
print(f"🤖 AI Agent initialized with model: {agent.model} (AWS Bedrock, {agent._bedrock_region}{_gr_label})")
else:
if api_key and base_url:
_parsed_url = urlparse(base_url)
if _parsed_url.query:
_clean_url = urlunparse(_parsed_url._replace(query=""))
_query_params = {
k: v[0] for k, v in parse_qs(_parsed_url.query).items()
}
client_kwargs = {
"api_key": api_key,
"base_url": _clean_url,
"default_query": _query_params,
}
else:
client_kwargs = {"api_key": api_key, "base_url": base_url}
if _provider_timeout is not None:
client_kwargs["timeout"] = _provider_timeout
if agent.provider == "copilot-acp":
client_kwargs["command"] = agent.acp_command
client_kwargs["args"] = agent.acp_args
effective_base = base_url
if base_url_host_matches(effective_base, "openrouter.ai"):
from agent.auxiliary_client import build_or_headers
client_kwargs["default_headers"] = build_or_headers()
elif base_url_host_matches(effective_base, "integrate.api.nvidia.com"):
from agent.auxiliary_client import build_nvidia_nim_headers
client_kwargs["default_headers"] = build_nvidia_nim_headers(effective_base)
elif base_url_host_matches(effective_base, "api.routermint.com"):
client_kwargs["default_headers"] = _ra()._routermint_headers()
elif base_url_host_matches(effective_base, "api.githubcopilot.com"):
from hermes_cli.models import copilot_default_headers
client_kwargs["default_headers"] = copilot_default_headers()
elif base_url_host_matches(effective_base, "api.kimi.com"):
client_kwargs["default_headers"] = {
"User-Agent": "claude-code/0.1.0",
}
elif base_url_host_matches(effective_base, "portal.qwen.ai"):
client_kwargs["default_headers"] = _ra()._qwen_portal_headers()
elif base_url_host_matches(effective_base, "chatgpt.com"):
from agent.auxiliary_client import _codex_cloudflare_headers
client_kwargs["default_headers"] = _codex_cloudflare_headers(api_key)
elif "default_headers" not in client_kwargs:
try:
from providers import get_provider_profile as _gpf
_ph = _gpf(agent.provider)
if _ph and _ph.default_headers:
client_kwargs["default_headers"] = dict(_ph.default_headers)
except Exception:
pass
else:
from agent.auxiliary_client import resolve_provider_client
_routed_client, _ = resolve_provider_client(
agent.provider or "auto", model=agent.model, raw_codex=True)
if _routed_client is not None:
client_kwargs = {
"api_key": _routed_client.api_key,
"base_url": str(_routed_client.base_url),
}
if _provider_timeout is not None:
client_kwargs["timeout"] = _provider_timeout
_routed_headers = getattr(_routed_client, "_custom_headers", None)
if not _routed_headers:
_routed_headers = getattr(_routed_client, "_default_headers", None)
if _routed_headers:
client_kwargs["default_headers"] = dict(_routed_headers)
else:
_explicit = (agent.provider or "").strip().lower()
if _explicit and _explicit not in {"auto", "openrouter", "custom"}:
_env_hint = f"{_explicit.upper()}_API_KEY"
try:
from hermes_cli.auth import PROVIDER_REGISTRY
_pcfg = PROVIDER_REGISTRY.get(_explicit)
if _pcfg and _pcfg.api_key_env_vars:
_env_hint = _pcfg.api_key_env_vars[0]
except Exception:
pass
_fb_entries = []
if isinstance(fallback_model, list):
_fb_entries = [
f for f in fallback_model
if isinstance(f, dict) and f.get("provider") and f.get("model")
]
elif isinstance(fallback_model, dict) and fallback_model.get("provider") and fallback_model.get("model"):
_fb_entries = [fallback_model]
_fb_resolved = False
for _fb in _fb_entries:
_fb_explicit_key = (_fb.get("api_key") or "").strip() or None
if not _fb_explicit_key:
_fb_key_env = (_fb.get("key_env") or _fb.get("api_key_env") or "").strip()
if _fb_key_env:
_fb_explicit_key = os.getenv(_fb_key_env, "").strip() or None
_fb_client, _fb_model = resolve_provider_client(
_fb["provider"], model=_fb["model"], raw_codex=True,
explicit_base_url=_fb.get("base_url"),
explicit_api_key=_fb_explicit_key,
)
if _fb_client is not None:
agent.provider = _fb["provider"]
agent.model = _fb_model or _fb["model"]
agent._fallback_activated = True
client_kwargs = {
"api_key": _fb_client.api_key,
"base_url": str(_fb_client.base_url),
}
if _provider_timeout is not None:
client_kwargs["timeout"] = _provider_timeout
_fb_headers = getattr(_fb_client, "_custom_headers", None)
if not _fb_headers:
_fb_headers = getattr(_fb_client, "_default_headers", None)
if _fb_headers:
client_kwargs["default_headers"] = dict(_fb_headers)
_fb_resolved = True
break
if not _fb_resolved:
raise RuntimeError(
f"Provider '{_explicit}' is set in config.yaml but no API key "
f"was found. Set the {_env_hint} environment "
f"variable, or switch to a different provider with `hermes model`."
)
if not getattr(agent, "_fallback_activated", False):
raise RuntimeError(
"No LLM provider configured. Run `hermes model` to "
"select a provider, or run `hermes setup` for first-time "
"configuration."
)
agent._client_kwargs = client_kwargs
_effective_base = str(client_kwargs.get("base_url", "")).lower()
if base_url_host_matches(_effective_base, "openrouter.ai") and "claude" in (agent.model or "").lower():
headers = client_kwargs.get("default_headers") or {}
existing_beta = headers.get("x-anthropic-beta", "")
_FINE_GRAINED = "fine-grained-tool-streaming-2025-05-14"
if _FINE_GRAINED not in existing_beta:
if existing_beta:
headers["x-anthropic-beta"] = f"{existing_beta},{_FINE_GRAINED}"
else:
headers["x-anthropic-beta"] = _FINE_GRAINED
client_kwargs["default_headers"] = headers
agent.api_key = client_kwargs.get("api_key", "")
agent.base_url = client_kwargs.get("base_url", agent.base_url)
try:
agent.client = agent._create_openai_client(client_kwargs, reason="agent_init", shared=True)
if not agent.quiet_mode:
print(f"🤖 AI Agent initialized with model: {agent.model}")
if base_url:
print(f"🔗 Using custom base URL: {base_url}")
from agent.azure_identity_adapter import is_token_provider
key_used = client_kwargs.get("api_key", "none")
if is_token_provider(key_used):
print("🔑 Using credentials: Microsoft Entra ID")
elif isinstance(key_used, str) and key_used and key_used != "dummy-key" and len(key_used) > 12:
print(f"🔑 Using API key: {key_used[:8]}...{key_used[-4:]}")
else:
print("⚠️ Warning: API key appears invalid or missing")
except Exception as e:
raise RuntimeError(f"Failed to initialize OpenAI client: {e}")
if isinstance(fallback_model, list):
agent._fallback_chain = [
f for f in fallback_model
if isinstance(f, dict) and f.get("provider") and f.get("model")
]
elif isinstance(fallback_model, dict) and fallback_model.get("provider") and fallback_model.get("model"):
agent._fallback_chain = [fallback_model]
else:
agent._fallback_chain = []
agent._fallback_index = 0
agent._fallback_activated = getattr(agent, "_fallback_activated", False)
agent._fallback_model = agent._fallback_chain[0] if agent._fallback_chain else None
if agent._fallback_chain and not agent.quiet_mode:
if len(agent._fallback_chain) == 1:
fb = agent._fallback_chain[0]
print(f"🔄 Fallback model: {fb['model']} ({fb['provider']})")
else:
print(f"🔄 Fallback chain ({len(agent._fallback_chain)} providers): " +
" → ".join(f"{f['model']} ({f['provider']})" for f in agent._fallback_chain))
agent.tools = _ra().get_tool_definitions(
enabled_toolsets=enabled_toolsets,
disabled_toolsets=disabled_toolsets,
quiet_mode=agent.quiet_mode,
)
agent.valid_tool_names = set()
if agent.tools:
agent.valid_tool_names = {tool["function"]["name"] for tool in agent.tools}
tool_names = sorted(agent.valid_tool_names)
if not agent.quiet_mode:
print(f"🛠️ Loaded {len(agent.tools)} tools: {', '.join(tool_names)}")
if enabled_toolsets:
print(f" ✅ Enabled toolsets: {', '.join(enabled_toolsets)}")
if disabled_toolsets:
print(f" ❌ Disabled toolsets: {', '.join(disabled_toolsets)}")
elif not agent.quiet_mode:
print("🛠️ No tools loaded (all tools filtered out or unavailable)")
from agent.prompt_builder import KANBAN_GUIDANCE
agent._kanban_worker_guidance = (
KANBAN_GUIDANCE if "kanban_show" in agent.valid_tool_names else ""
)
if agent.tools and not agent.quiet_mode:
requirements = _ra().check_toolset_requirements()
missing_reqs = [name for name, available in requirements.items() if not available]
if missing_reqs:
print(f"⚠️ Some tools may not work due to missing requirements: {missing_reqs}")
if agent.save_trajectories and not agent.quiet_mode:
print("📝 Trajectory saving enabled")
if agent.ephemeral_system_prompt and not agent.quiet_mode:
prompt_preview = agent.ephemeral_system_prompt[:60] + "..." if len(agent.ephemeral_system_prompt) > 60 else agent.ephemeral_system_prompt
print(f"🔒 Ephemeral system prompt: '{prompt_preview}' (not saved to trajectories)")
if agent._use_prompt_caching and not agent.quiet_mode:
if agent._use_native_cache_layout and agent.provider == "anthropic":
source = "native Anthropic"
elif agent._use_native_cache_layout:
source = "Anthropic-compatible endpoint"
else:
source = "Claude via OpenRouter"
print(f"💾 Prompt caching: ENABLED ({source}, {agent._cache_ttl} TTL)")
agent.session_start = datetime.now()
if session_id:
agent.session_id = session_id
else:
timestamp_str = agent.session_start.strftime("%Y%m%d_%H%M%S")
short_uuid = uuid.uuid4().hex[:6]
agent.session_id = f"{timestamp_str}_{short_uuid}"
os.environ["HERMES_SESSION_ID"] = agent.session_id
try:
from gateway.session_context import _SESSION_ID
_SESSION_ID.set(agent.session_id)
except Exception:
pass
hermes_home = get_hermes_home()
agent.logs_dir = hermes_home / "sessions"
agent.logs_dir.mkdir(parents=True, exist_ok=True)
agent._session_json_enabled = False
try:
from hermes_cli.config import load_config as _load_sess_cfg
_sess_cfg = (_load_sess_cfg().get("sessions") or {})
agent._session_json_enabled = bool(_sess_cfg.get("write_json_snapshots", False))
except Exception:
pass
agent._session_messages: List[Dict[str, Any]] = []
agent._memory_write_origin = "assistant_tool"
agent._memory_write_context = "foreground"
agent._cached_system_prompt: Optional[str] = None
from tools.checkpoint_manager import CheckpointManager
agent._checkpoint_mgr = CheckpointManager(
enabled=checkpoints_enabled,
max_snapshots=checkpoint_max_snapshots,
max_total_size_mb=checkpoint_max_total_size_mb,
max_file_size_mb=checkpoint_max_file_size_mb,
)
agent._session_db = session_db
agent._parent_session_id = parent_session_id
agent._last_flushed_db_idx = 0
agent._session_db_created = False
agent._session_init_model_config = {
"max_iterations": agent.max_iterations,
"reasoning_config": reasoning_config,
"max_tokens": max_tokens,
}
from tools.todo_tool import TodoStore
agent._todo_store = TodoStore()
try:
from hermes_cli.config import load_config as _load_agent_config
_agent_cfg = _load_agent_config()
except Exception:
_agent_cfg = {}
try:
agent._tool_guardrails = ToolCallGuardrailController(
ToolCallGuardrailConfig.from_mapping(
_agent_cfg.get("tool_loop_guardrails", {})
)
)
except Exception as _tlg_err:
_ra().logger.warning("Tool loop guardrail config ignored: %s", _tlg_err)
agent._aux_compression_context_length_config = None
agent._memory_store = None
agent._memory_enabled = False
agent._user_profile_enabled = False
agent._memory_nudge_interval = 10
agent._turns_since_memory = 0
agent._iters_since_skill = 0
if not skip_memory:
try:
mem_config = _agent_cfg.get("memory", {})
agent._memory_enabled = mem_config.get("memory_enabled", False)
agent._user_profile_enabled = mem_config.get("user_profile_enabled", False)
agent._memory_nudge_interval = int(mem_config.get("nudge_interval", 10))
if agent._memory_enabled or agent._user_profile_enabled:
from tools.memory_tool import MemoryStore
agent._memory_store = MemoryStore(
memory_char_limit=mem_config.get("memory_char_limit", 2200),
user_char_limit=mem_config.get("user_char_limit", 1375),
)
agent._memory_store.load_from_disk()
except Exception:
pass
agent._memory_manager = None
if not skip_memory:
try:
_mem_provider_name = mem_config.get("provider", "") if mem_config else ""
if _mem_provider_name and _mem_provider_name.strip():
from agent.memory_manager import MemoryManager as _MemoryManager
from plugins.memory import load_memory_provider as _load_mem
agent._memory_manager = _MemoryManager()
_mp = _load_mem(_mem_provider_name)
if _mp and _mp.is_available():
agent._memory_manager.add_provider(_mp)
if agent._memory_manager.providers:
_init_kwargs = {
"session_id": agent.session_id,
"platform": platform or "cli",
"hermes_home": str(get_hermes_home()),
"agent_context": "primary",
}
if agent._session_db:
try:
_st = agent._session_db.get_session_title(agent.session_id)
if _st:
_init_kwargs["session_title"] = _st
except Exception:
pass
if agent._user_id:
_init_kwargs["user_id"] = agent._user_id
if agent._user_name:
_init_kwargs["user_name"] = agent._user_name
if agent._chat_id:
_init_kwargs["chat_id"] = agent._chat_id
if agent._chat_name:
_init_kwargs["chat_name"] = agent._chat_name
if agent._chat_type:
_init_kwargs["chat_type"] = agent._chat_type
if agent._thread_id:
_init_kwargs["thread_id"] = agent._thread_id
if agent._gateway_session_key:
_init_kwargs["gateway_session_key"] = agent._gateway_session_key
try:
from hermes_cli.profiles import get_active_profile_name
_profile = get_active_profile_name()
_init_kwargs["agent_identity"] = _profile
_init_kwargs["agent_workspace"] = "hermes"
except Exception:
pass
agent._memory_manager.initialize_all(**_init_kwargs)
_ra().logger.info("Memory provider '%s' activated", _mem_provider_name)
else:
_ra().logger.debug("Memory provider '%s' not found or not available", _mem_provider_name)
agent._memory_manager = None
except Exception as _mpe:
_ra().logger.warning("Memory provider plugin init failed: %s", _mpe)
agent._memory_manager = None
if agent._memory_manager and agent.tools is not None:
_existing_tool_names = {
t.get("function", {}).get("name")
for t in agent.tools
if isinstance(t, dict)
}
for _schema in agent._memory_manager.get_all_tool_schemas():
_tname = _schema.get("name", "")
if _tname and _tname in _existing_tool_names:
continue
_wrapped = {"type": "function", "function": _schema}
agent.tools.append(_wrapped)
if _tname:
agent.valid_tool_names.add(_tname)
_existing_tool_names.add(_tname)
agent._skill_nudge_interval = 10
try:
skills_config = _agent_cfg.get("skills", {})
agent._skill_nudge_interval = int(skills_config.get("creation_nudge_interval", 10))
except Exception:
pass
_agent_section = _agent_cfg.get("agent", {})
if not isinstance(_agent_section, dict):
_agent_section = {}
agent._tool_use_enforcement = _agent_section.get("tool_use_enforcement", "auto")
try:
_raw_api_retries = _agent_section.get("api_max_retries", 3)
_api_retries = int(_raw_api_retries)
_api_retries = max(_api_retries, 1)
except (TypeError, ValueError):
_api_retries = 3
agent._api_max_retries = _api_retries
_compression_cfg = _agent_cfg.get("compression", {})
if not isinstance(_compression_cfg, dict):
_compression_cfg = {}
compression_threshold = float(_compression_cfg.get("threshold", 0.50))
try:
from agent.auxiliary_client import _compression_threshold_for_model as _cthresh_fn
_model_cthresh = _cthresh_fn(agent.model)
if _model_cthresh is not None:
compression_threshold = _model_cthresh
except Exception:
pass
compression_enabled = str(_compression_cfg.get("enabled", True)).lower() in {"true", "1", "yes"}
compression_target_ratio = float(_compression_cfg.get("target_ratio", 0.20))
compression_protect_last = int(_compression_cfg.get("protect_last_n", 20))
compression_protect_first = max(
0, int(_compression_cfg.get("protect_first_n", 3))
)
compression_abort_on_summary_failure = str(
_compression_cfg.get("abort_on_summary_failure", False)
).lower() in {"true", "1", "yes"}
try:
_aux_cfg = cfg_get(_agent_cfg, "auxiliary", "compression", default={})
except Exception:
_aux_cfg = {}
if isinstance(_aux_cfg, dict):
_aux_context_config = _aux_cfg.get("context_length")
else:
_aux_context_config = None
if _aux_context_config is not None:
try:
_aux_context_config = int(_aux_context_config)
except (TypeError, ValueError):
_aux_context_config = None
agent._aux_compression_context_length_config = _aux_context_config
_model_cfg = _agent_cfg.get("model", {})
if agent.max_tokens is None and isinstance(_model_cfg, dict):
_config_max_tokens = _model_cfg.get("max_tokens")
if _config_max_tokens is not None:
try:
if isinstance(_config_max_tokens, bool):
raise ValueError
_parsed_max_tokens = int(_config_max_tokens)
if _parsed_max_tokens <= 0:
raise ValueError
agent.max_tokens = _parsed_max_tokens
except (TypeError, ValueError):
_ra().logger.warning(
"Invalid model.max_tokens in config.yaml: %r — "
"must be a positive integer (e.g. 4096). "
"Falling back to provider default.",
_config_max_tokens,
)
print(
f"\n⚠ Invalid model.max_tokens in config.yaml: {_config_max_tokens!r}\n"
f" Must be a positive integer (e.g. 4096).\n"
f" Falling back to provider default.\n",
file=sys.stderr,
)
agent._session_init_model_config["max_tokens"] = agent.max_tokens
if isinstance(_model_cfg, dict):
_config_context_length = _model_cfg.get("context_length")
else:
_config_context_length = None
if _config_context_length is not None:
try:
_config_context_length = int(_config_context_length)
except (TypeError, ValueError):
_ra().logger.warning(
"Invalid model.context_length in config.yaml: %r — "
"must be a plain integer (e.g. 256000, not '256K'). "
"Falling back to auto-detection.",
_config_context_length,
)
print(
f"\n⚠ Invalid model.context_length in config.yaml: {_config_context_length!r}\n"
f" Must be a plain integer (e.g. 256000, not '256K').\n"
f" Falling back to auto-detected context window.\n",
file=sys.stderr,
)
_config_context_length = None
try:
from hermes_cli.config import get_compatible_custom_providers
_custom_providers = get_compatible_custom_providers(_agent_cfg)
except Exception:
_custom_providers = _agent_cfg.get("custom_providers")
if not isinstance(_custom_providers, list):
_custom_providers = []
agent._custom_providers = _custom_providers
if _config_context_length is None and _custom_providers:
try:
from hermes_cli.config import get_custom_provider_context_length
_cp_ctx_resolved = get_custom_provider_context_length(
model=agent.model,
base_url=agent.base_url,
custom_providers=_custom_providers,
)
if _cp_ctx_resolved:
_config_context_length = int(_cp_ctx_resolved)
except Exception:
_cp_ctx_resolved = None
if _config_context_length is None:
_target = agent.base_url.rstrip("/") if agent.base_url else ""
for _cp_entry in _custom_providers:
if not isinstance(_cp_entry, dict):
continue
_cp_url = (_cp_entry.get("base_url") or "").rstrip("/")
if _target and _cp_url == _target:
_cp_models = _cp_entry.get("models", {})
if isinstance(_cp_models, dict):
_cp_model_cfg = _cp_models.get(agent.model, {})
if isinstance(_cp_model_cfg, dict):
_cp_ctx = _cp_model_cfg.get("context_length")
if _cp_ctx is not None:
try:
_parsed = int(_cp_ctx)
if _parsed <= 0:
raise ValueError
except (TypeError, ValueError):
_ra().logger.warning(
"Invalid context_length for model %r in "
"custom_providers: %r — must be a positive "
"integer (e.g. 256000, not '256K'). "
"Falling back to auto-detection.",
agent.model, _cp_ctx,
)
print(
f"\n⚠ Invalid context_length for model {agent.model!r} in custom_providers: {_cp_ctx!r}\n"
f" Must be a positive integer (e.g. 256000, not '256K').\n"
f" Falling back to auto-detected context window.\n",
file=sys.stderr,
)
break
agent._config_context_length = _config_context_length
agent._ensure_lmstudio_runtime_loaded(_config_context_length)
_selected_engine = None
_engine_name = "compressor"
try:
_ctx_cfg = _agent_cfg.get("context", {}) if isinstance(_agent_cfg, dict) else {}
_engine_name = _ctx_cfg.get("engine", "compressor") or "compressor"
except Exception:
pass
if _engine_name != "compressor":
try:
from plugins.context_engine import load_context_engine
_selected_engine = load_context_engine(_engine_name)
except Exception as _ce_load_err:
_ra().logger.debug("Context engine load from plugins/context_engine/: %s", _ce_load_err)
if _selected_engine is None:
try:
from hermes_cli.plugins import get_plugin_context_engine
_candidate = get_plugin_context_engine()
if _candidate and _candidate.name == _engine_name:
_selected_engine = _candidate
except Exception:
pass
if _selected_engine is None:
_ra().logger.warning(
"Context engine '%s' not found — falling back to built-in compressor",
_engine_name,
)
if _selected_engine is not None:
agent.context_compressor = _selected_engine
from agent.model_metadata import get_model_context_length
_plugin_ctx_len = get_model_context_length(
agent.model,
base_url=agent.base_url,
api_key=getattr(agent, "api_key", ""),
config_context_length=_config_context_length,
provider=agent.provider,
custom_providers=_custom_providers,
)
agent.context_compressor.update_model(
model=agent.model,
context_length=_plugin_ctx_len,
base_url=agent.base_url,
api_key=getattr(agent, "api_key", ""),
provider=agent.provider,
)
if not agent.quiet_mode:
_ra().logger.info("Using context engine: %s", _selected_engine.name)
else:
agent.context_compressor = ContextCompressor(
model=agent.model,
threshold_percent=compression_threshold,
protect_first_n=compression_protect_first,
protect_last_n=compression_protect_last,
summary_target_ratio=compression_target_ratio,
summary_model_override=None,
quiet_mode=agent.quiet_mode,
base_url=agent.base_url,
api_key=getattr(agent, "api_key", ""),
config_context_length=_config_context_length,
provider=agent.provider,
api_mode=agent.api_mode,
abort_on_summary_failure=compression_abort_on_summary_failure,
)
agent.compression_enabled = compression_enabled
from agent.model_metadata import MINIMUM_CONTEXT_LENGTH
_ctx = getattr(agent.context_compressor, "context_length", 0)
if _ctx and _ctx < MINIMUM_CONTEXT_LENGTH:
raise ValueError(
f"Model {agent.model} has a context window of {_ctx:,} tokens, "
f"which is below the minimum {MINIMUM_CONTEXT_LENGTH:,} required "
f"by Hermes Agent. Choose a model with at least "
f"{MINIMUM_CONTEXT_LENGTH // 1000}K context, or set "
f"model.context_length in config.yaml to override."
)
agent._context_engine_tool_names: set = set()
if hasattr(agent, "context_compressor") and agent.context_compressor and agent.tools is not None:
_existing_tool_names = {
t.get("function", {}).get("name")
for t in agent.tools
if isinstance(t, dict)
}
for _schema in agent.context_compressor.get_tool_schemas():
_tname = _schema.get("name", "")
if _tname and _tname in _existing_tool_names:
continue
_wrapped = {"type": "function", "function": _schema}
agent.tools.append(_wrapped)
if _tname:
agent.valid_tool_names.add(_tname)
agent._context_engine_tool_names.add(_tname)
_existing_tool_names.add(_tname)
if hasattr(agent, "context_compressor") and agent.context_compressor:
try:
agent.context_compressor.on_session_start(
agent.session_id,
hermes_home=str(get_hermes_home()),
platform=agent.platform or "cli",
model=agent.model,
context_length=getattr(agent.context_compressor, "context_length", 0),
)
except Exception as _ce_err:
_ra().logger.debug("Context engine on_session_start: %s", _ce_err)
agent._subdirectory_hints = SubdirectoryHintTracker(
working_dir=os.getenv("TERMINAL_CWD") or None,
)
agent._user_turn_count = 0
agent.session_prompt_tokens = 0
agent.session_completion_tokens = 0
agent.session_total_tokens = 0
agent.session_api_calls = 0
agent.session_input_tokens = 0
agent.session_output_tokens = 0
agent.session_cache_read_tokens = 0
agent.session_cache_write_tokens = 0
agent.session_reasoning_tokens = 0
agent.session_estimated_cost_usd = 0.0
agent.session_cost_status = "unknown"
agent.session_cost_source = "none"
agent._ollama_num_ctx: int | None = None
_ollama_num_ctx_override = None
if isinstance(_model_cfg, dict):
_ollama_num_ctx_override = _model_cfg.get("ollama_num_ctx")
if _ollama_num_ctx_override is not None:
try:
agent._ollama_num_ctx = int(_ollama_num_ctx_override)
except (TypeError, ValueError):
_ra().logger.debug("Invalid ollama_num_ctx config value: %r", _ollama_num_ctx_override)
if agent._ollama_num_ctx is None and agent.base_url and is_local_endpoint(agent.base_url):
try:
_key_for_ollama = agent.api_key if isinstance(agent.api_key, str) else ""
_detected = query_ollama_num_ctx(agent.model, agent.base_url, api_key=_key_for_ollama or "")
if _detected and _detected > 0:
agent._ollama_num_ctx = _detected
except Exception as exc:
_ra().logger.debug("Ollama num_ctx detection failed: %s", exc)
if (
agent._ollama_num_ctx
and _config_context_length
and _ollama_num_ctx_override is None
and agent._ollama_num_ctx > _config_context_length
):
_ra().logger.info(
"Ollama num_ctx capped: %d -> %d (model.context_length override)",
agent._ollama_num_ctx, _config_context_length,
)
agent._ollama_num_ctx = _config_context_length
if agent._ollama_num_ctx and not agent.quiet_mode:
_ra().logger.info(
"Ollama num_ctx: will request %d tokens (model max from /api/show)",
agent._ollama_num_ctx,
)
if not agent.quiet_mode:
if compression_enabled:
print(f"📊 Context limit: {agent.context_compressor.context_length:,} tokens (compress at {int(compression_threshold*100)}% = {agent.context_compressor.threshold_tokens:,})")
else:
print(f"📊 Context limit: {agent.context_compressor.context_length:,} tokens (auto-compression disabled)")
agent._compression_warning = None
agent._compression_feasibility_checked = False
_cc = agent.context_compressor
agent._primary_runtime = {
"model": agent.model,
"provider": agent.provider,
"base_url": agent.base_url,
"api_mode": agent.api_mode,
"api_key": getattr(agent, "api_key", ""),
"client_kwargs": dict(agent._client_kwargs),
"use_prompt_caching": agent._use_prompt_caching,
"use_native_cache_layout": agent._use_native_cache_layout,
"compressor_model": getattr(_cc, "model", agent.model),
"compressor_base_url": getattr(_cc, "base_url", agent.base_url),
"compressor_api_key": getattr(_cc, "api_key", ""),
"compressor_provider": getattr(_cc, "provider", agent.provider),
"compressor_context_length": _cc.context_length,
"compressor_threshold_tokens": _cc.threshold_tokens,
}
if agent.api_mode == "anthropic_messages":
agent._primary_runtime.update({
"anthropic_api_key": agent._anthropic_api_key,
"anthropic_base_url": agent._anthropic_base_url,
"is_anthropic_oauth": agent._is_anthropic_oauth,
})
# Public API of this module: ``init_agent`` is the only name exported by
# ``from <this module> import *``. Per the module docstring,
# ``AIAgent.__init__`` is a thin wrapper that calls ``init_agent(self, ...)``.
__all__ = ["init_agent"]