"""Tirith pre-exec security scanning wrapper.
Runs the tirith binary as a subprocess to scan commands for content-level
threats (homograph URLs, pipe-to-interpreter, terminal injection, etc.).
Exit code is the verdict source of truth:
0 = allow, 1 = block, 2 = warn
JSON stdout enriches findings/summary but never overrides the verdict.
Operational failures (spawn error, timeout, unknown exit code) respect
the fail_open config setting. Programming errors propagate.
Auto-install: if tirith is not found on PATH or at the configured path,
it is automatically downloaded from GitHub releases to $HERMES_HOME/bin/tirith.
The download always verifies SHA-256 checksums. When cosign is available on
PATH, provenance verification (GitHub Actions workflow signature) is also
performed. If cosign is not installed, the download proceeds with SHA-256
verification only — still secure via HTTPS + checksum, just without supply
chain provenance proof. Installation runs in a background thread so startup
never blocks.
"""
import hashlib
import json
import logging
import os
import platform
import shutil
import stat
import subprocess
import tarfile
import tempfile
import threading
import time
import urllib.request
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
_REPO = "sheeki03/tirith"
_COSIGN_IDENTITY_REGEXP = f"^https://github.com/{_REPO}/\\.github/workflows/release\\.yml@refs/tags/v"
_COSIGN_ISSUER = "https://token.actions.githubusercontent.com"
def _env_bool(key: str, default: bool) -> bool:
val = os.getenv(key)
if val is None:
return default
return val.lower() in {"1", "true", "yes"}
def _env_int(key: str, default: int) -> int:
val = os.getenv(key)
if val is None:
return default
try:
return int(val)
except ValueError:
return default
def _load_security_config() -> dict:
"""Load security settings from config.yaml, with env var overrides."""
defaults = {
"tirith_enabled": True,
"tirith_path": "tirith",
"tirith_timeout": 5,
"tirith_fail_open": True,
}
try:
from hermes_cli.config import load_config
cfg = load_config().get("security", {}) or {}
except Exception:
cfg = {}
return {
"tirith_enabled": _env_bool("TIRITH_ENABLED", cfg.get("tirith_enabled", defaults["tirith_enabled"])),
"tirith_path": os.getenv("TIRITH_BIN", cfg.get("tirith_path", defaults["tirith_path"])),
"tirith_timeout": _env_int("TIRITH_TIMEOUT", cfg.get("tirith_timeout", defaults["tirith_timeout"])),
"tirith_fail_open": _env_bool("TIRITH_FAIL_OPEN", cfg.get("tirith_fail_open", defaults["tirith_fail_open"])),
}
_resolved_path: str | None | bool = None
_INSTALL_FAILED = False
_install_failure_reason: str = ""
_install_lock = threading.Lock()
_install_thread: threading.Thread | None = None
_warned_messages: set[str] = set()
_warned_lock = threading.Lock()
def _warn_once(key: str, message: str, *args) -> None:
"""``logger.warning`` but at-most-once per ``key`` for the process
lifetime. Used to avoid drowning the log when a fail-open tirith
misconfiguration fires on every command."""
with _warned_lock:
if key in _warned_messages:
return
_warned_messages.add(key)
logger.warning(message, *args)
def _reset_spawn_warning_state() -> None:
"""Clear the warn-once dedupe set. Called when tirith is freshly
(re)installed so a subsequent failure surfaces again — e.g. user
deletes the binary mid-session.
"""
with _warned_lock:
_warned_messages.clear()
_MARKER_TTL = 86400
def _get_hermes_home() -> str:
"""Return the Hermes home directory, respecting HERMES_HOME env var."""
return str(get_hermes_home())
def _failure_marker_path() -> str:
"""Return the path to the install-failure marker file."""
return os.path.join(_get_hermes_home(), ".tirith-install-failed")
def _read_failure_reason() -> str | None:
"""Read the failure reason from the disk marker.
Returns the reason string, or None if the marker doesn't exist or is
older than _MARKER_TTL.
"""
try:
p = _failure_marker_path()
mtime = os.path.getmtime(p)
if (time.time() - mtime) >= _MARKER_TTL:
return None
with open(p, "r", encoding="utf-8") as f:
return f.read().strip()
except OSError:
return None
def _is_install_failed_on_disk() -> bool:
"""Check if a recent install failure was persisted to disk.
Returns False (allowing retry) when:
- No marker exists
- Marker is older than _MARKER_TTL (24h)
- Marker reason is 'cosign_missing' and cosign is now on PATH
"""
reason = _read_failure_reason()
if reason is None:
return False
if reason == "cosign_missing" and shutil.which("cosign"):
_clear_install_failed()
return False
return True
def _mark_install_failed(reason: str = ""):
"""Persist install failure to disk to avoid retry on next process.
Args:
reason: Short tag identifying the failure cause. Use "cosign_missing"
when cosign is not on PATH so the marker can be auto-cleared
once cosign becomes available.
"""
try:
p = _failure_marker_path()
os.makedirs(os.path.dirname(p), exist_ok=True)
with open(p, "w", encoding="utf-8") as f:
f.write(reason)
except OSError:
pass
def _clear_install_failed():
"""Remove the failure marker after successful install."""
_reset_spawn_warning_state()
try:
os.unlink(_failure_marker_path())
except OSError:
pass
def _hermes_bin_dir() -> str:
"""Return $HERMES_HOME/bin, creating it if needed."""
d = os.path.join(_get_hermes_home(), "bin")
os.makedirs(d, exist_ok=True)
return d
def _detect_target() -> str | None:
"""Return the Rust target triple for the current platform, or None.
Windows is intentionally unsupported — tirith does not ship a Windows
build. Callers should treat `None` as "this platform will never have
tirith" and silently fall back to pattern-matching guards.
"""
system = platform.system()
machine = platform.machine().lower()
if system == "Darwin":
plat = "apple-darwin"
elif system in {"Linux", "Android"}:
plat = "unknown-linux-gnu"
else:
return None
if machine in {"x86_64", "amd64"}:
arch = "x86_64"
elif machine in {"aarch64", "arm64"}:
arch = "aarch64"
else:
return None
return f"{arch}-{plat}"
def is_platform_supported() -> bool:
"""True when tirith ships a prebuilt binary for this OS+arch.
Used by callers (CLI banner, etc.) to distinguish "tirith failed to
install" from "tirith was never going to install here" — the latter
is silent because there is nothing the user can do about it.
"""
return _detect_target() is not None
def _download_file(url: str, dest: str, timeout: int = 10):
"""Download a URL to a local file."""
req = urllib.request.Request(url)
token = os.getenv("GITHUB_TOKEN")
if token:
req.add_header("Authorization", f"token {token}")
with urllib.request.urlopen(req, timeout=timeout) as resp, open(dest, "wb") as f:
shutil.copyfileobj(resp, f)
def _verify_cosign(checksums_path: str, sig_path: str, cert_path: str) -> bool | None:
"""Verify cosign provenance signature on checksums.txt.
Returns:
True — cosign verified successfully
False — cosign found but verification failed
None — cosign not available (not on PATH, or execution failed)
The caller treats both False and None as "abort auto-install" — only
True allows the install to proceed.
"""
cosign = shutil.which("cosign")
if not cosign:
logger.info("cosign not found on PATH")
return None
try:
result = subprocess.run(
[cosign, "verify-blob",
"--certificate", cert_path,
"--signature", sig_path,
"--certificate-identity-regexp", _COSIGN_IDENTITY_REGEXP,
"--certificate-oidc-issuer", _COSIGN_ISSUER,
checksums_path],
capture_output=True,
text=True,
timeout=15,
)
if result.returncode == 0:
logger.info("cosign provenance verification passed")
return True
else:
logger.warning("cosign verification failed (exit %d): %s",
result.returncode, result.stderr.strip())
return False
except (OSError, subprocess.TimeoutExpired) as exc:
logger.warning("cosign execution failed: %s", exc)
return None
def _verify_checksum(archive_path: str, checksums_path: str, archive_name: str) -> bool:
"""Verify SHA-256 of the archive against checksums.txt."""
expected = None
with open(checksums_path, encoding="utf-8") as f:
for line in f:
parts = line.strip().split(" ", 1)
if len(parts) == 2 and parts[1] == archive_name:
expected = parts[0]
break
if not expected:
logger.warning("No checksum entry for %s", archive_name)
return False
sha = hashlib.sha256()
with open(archive_path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
sha.update(chunk)
actual = sha.hexdigest()
if actual != expected:
logger.warning("Checksum mismatch: expected %s, got %s", expected, actual)
return False
return True
def _install_tirith(*, log_failures: bool = True) -> tuple[str | None, str]:
"""Download and install tirith to $HERMES_HOME/bin/tirith.
Verifies provenance via cosign and SHA-256 checksum.
Returns (installed_path, failure_reason). On success failure_reason is "".
failure_reason is a short tag used by the disk marker to decide if the
failure is retryable (e.g. "cosign_missing" clears when cosign appears).
"""
log = logger.warning if log_failures else logger.debug
target = _detect_target()
if not target:
logger.info("tirith auto-install: unsupported platform %s/%s",
platform.system(), platform.machine())
return None, "unsupported_platform"
archive_name = f"tirith-{target}.tar.gz"
base_url = f"https://github.com/{_REPO}/releases/latest/download"
tmpdir = tempfile.mkdtemp(prefix="tirith-install-")
try:
archive_path = os.path.join(tmpdir, archive_name)
checksums_path = os.path.join(tmpdir, "checksums.txt")
sig_path = os.path.join(tmpdir, "checksums.txt.sig")
cert_path = os.path.join(tmpdir, "checksums.txt.pem")
logger.info("tirith not found — downloading latest release for %s...", target)
try:
_download_file(f"{base_url}/{archive_name}", archive_path)
_download_file(f"{base_url}/checksums.txt", checksums_path)
except Exception as exc:
log("tirith download failed: %s", exc)
return None, "download_failed"
cosign_verified = False
if shutil.which("cosign"):
try:
_download_file(f"{base_url}/checksums.txt.sig", sig_path)
_download_file(f"{base_url}/checksums.txt.pem", cert_path)
except Exception as exc:
logger.info("cosign artifacts unavailable (%s), proceeding with SHA-256 only", exc)
else:
cosign_result = _verify_cosign(checksums_path, sig_path, cert_path)
if cosign_result is True:
cosign_verified = True
elif cosign_result is False:
log("tirith install aborted: cosign provenance verification failed")
return None, "cosign_verification_failed"
else:
logger.info("cosign execution failed, proceeding with SHA-256 only")
else:
logger.info("cosign not on PATH — installing tirith with SHA-256 verification only "
"(install cosign for full supply chain verification)")
if not _verify_checksum(archive_path, checksums_path, archive_name):
return None, "checksum_failed"
with tarfile.open(archive_path, "r:gz") as tar:
for member in tar.getmembers():
if member.name == "tirith" or member.name.endswith("/tirith"):
if ".." in member.name:
continue
member.name = "tirith"
tar.extract(member, tmpdir)
break
else:
log("tirith binary not found in archive")
return None, "binary_not_in_archive"
src = os.path.join(tmpdir, "tirith")
dest = os.path.join(_hermes_bin_dir(), "tirith")
try:
shutil.move(src, dest)
except OSError:
try:
shutil.copy(src, dest)
except OSError:
try:
os.unlink(dest)
except OSError:
pass
return None, "cross_device_copy_failed"
os.chmod(dest, os.stat(dest).st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
verification = "cosign + SHA-256" if cosign_verified else "SHA-256 only"
logger.info("tirith installed to %s (%s)", dest, verification)
return dest, ""
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
def _is_explicit_path(configured_path: str) -> bool:
"""Return True if the user explicitly configured a non-default tirith path."""
return configured_path != "tirith"
def _resolve_tirith_path(configured_path: str) -> str:
"""Resolve the tirith binary path, auto-installing if necessary.
If the user explicitly set a path (anything other than the bare "tirith"
default), that path is authoritative — we never fall through to
auto-download a different binary.
For the default "tirith":
1. PATH lookup via shutil.which
2. $HERMES_HOME/bin/tirith (previously auto-installed)
3. Auto-install from GitHub releases → $HERMES_HOME/bin/tirith
Failed installs are cached for the process lifetime (and persisted to
disk for 24h) to avoid repeated network attempts.
"""
global _resolved_path, _install_failure_reason
if _resolved_path is not None and _resolved_path is not _INSTALL_FAILED:
return _resolved_path
expanded = os.path.expanduser(configured_path)
explicit = _is_explicit_path(configured_path)
install_failed = _resolved_path is _INSTALL_FAILED
if not explicit and not is_platform_supported():
_resolved_path = _INSTALL_FAILED
_install_failure_reason = "unsupported_platform"
return expanded
if explicit:
if os.path.isfile(expanded) and os.access(expanded, os.X_OK):
_resolved_path = expanded
return expanded
found = shutil.which(expanded)
if found:
_resolved_path = found
return found
logger.warning("Configured tirith path %r not found; scanning disabled", configured_path)
_resolved_path = _INSTALL_FAILED
_install_failure_reason = "explicit_path_missing"
return expanded
found = shutil.which("tirith")
if found:
_resolved_path = found
_install_failure_reason = ""
_clear_install_failed()
return found
hermes_bin = os.path.join(_hermes_bin_dir(), "tirith")
if os.path.isfile(hermes_bin) and os.access(hermes_bin, os.X_OK):
_resolved_path = hermes_bin
_install_failure_reason = ""
_clear_install_failed()
return hermes_bin
if install_failed:
if _install_failure_reason == "cosign_missing" and shutil.which("cosign"):
_resolved_path = None
_install_failure_reason = ""
_clear_install_failed()
install_failed = False
else:
return expanded
if _install_thread is not None and _install_thread.is_alive():
return expanded
disk_reason = _read_failure_reason()
if disk_reason is not None and _is_install_failed_on_disk():
_resolved_path = _INSTALL_FAILED
_install_failure_reason = disk_reason
return expanded
installed, reason = _install_tirith()
if installed:
_resolved_path = installed
_install_failure_reason = ""
_clear_install_failed()
return installed
_resolved_path = _INSTALL_FAILED
_install_failure_reason = reason
_mark_install_failed(reason)
return expanded
def _background_install(*, log_failures: bool = True):
"""Background thread target: download and install tirith."""
global _resolved_path, _install_failure_reason
with _install_lock:
if _resolved_path is not None:
return
found = shutil.which("tirith")
if found:
_resolved_path = found
_install_failure_reason = ""
return
hermes_bin = os.path.join(_hermes_bin_dir(), "tirith")
if os.path.isfile(hermes_bin) and os.access(hermes_bin, os.X_OK):
_resolved_path = hermes_bin
_install_failure_reason = ""
return
installed, reason = _install_tirith(log_failures=log_failures)
if installed:
_resolved_path = installed
_install_failure_reason = ""
_clear_install_failed()
else:
_resolved_path = _INSTALL_FAILED
_install_failure_reason = reason
_mark_install_failed(reason)
def ensure_installed(*, log_failures: bool = True):
"""Ensure tirith is available, downloading in background if needed.
Quick PATH/local checks are synchronous; network download runs in a
daemon thread so startup never blocks. Safe to call multiple times.
Returns the resolved path immediately if available, or None.
"""
global _resolved_path, _install_thread, _install_failure_reason
cfg = _load_security_config()
if not cfg["tirith_enabled"]:
return None
if _resolved_path is not None and _resolved_path is not _INSTALL_FAILED:
path = _resolved_path
if os.path.isfile(path) and os.access(path, os.X_OK):
return path
return None
if not is_platform_supported():
_resolved_path = _INSTALL_FAILED
_install_failure_reason = "unsupported_platform"
return None
configured_path = cfg["tirith_path"]
explicit = _is_explicit_path(configured_path)
expanded = os.path.expanduser(configured_path)
if explicit:
if os.path.isfile(expanded) and os.access(expanded, os.X_OK):
_resolved_path = expanded
return expanded
found = shutil.which(expanded)
if found:
_resolved_path = found
return found
_resolved_path = _INSTALL_FAILED
_install_failure_reason = "explicit_path_missing"
return None
found = shutil.which("tirith")
if found:
_resolved_path = found
_install_failure_reason = ""
_clear_install_failed()
return found
hermes_bin = os.path.join(_hermes_bin_dir(), "tirith")
if os.path.isfile(hermes_bin) and os.access(hermes_bin, os.X_OK):
_resolved_path = hermes_bin
_install_failure_reason = ""
_clear_install_failed()
return hermes_bin
if _resolved_path is _INSTALL_FAILED:
if _install_failure_reason == "cosign_missing" and shutil.which("cosign"):
_resolved_path = None
_install_failure_reason = ""
_clear_install_failed()
else:
return None
disk_reason = _read_failure_reason()
if disk_reason is not None and _is_install_failed_on_disk():
_resolved_path = _INSTALL_FAILED
_install_failure_reason = disk_reason
return None
if _install_thread is None or not _install_thread.is_alive():
_install_thread = threading.Thread(
target=_background_install,
kwargs={"log_failures": log_failures},
daemon=True,
)
_install_thread.start()
return None
_MAX_FINDINGS = 50
_MAX_SUMMARY_LEN = 500
def check_command_security(command: str) -> dict:
"""Run tirith security scan on a command.
Exit code determines action (0=allow, 1=block, 2=warn). JSON enriches
findings/summary. Spawn failures and timeouts respect fail_open config.
Programming errors propagate.
Returns:
{"action": "allow"|"warn"|"block", "findings": [...], "summary": str}
"""
cfg = _load_security_config()
if not cfg["tirith_enabled"]:
return {"action": "allow", "findings": [], "summary": ""}
if not is_platform_supported():
return {"action": "allow", "findings": [], "summary": ""}
tirith_path = _resolve_tirith_path(cfg["tirith_path"])
timeout = cfg["tirith_timeout"]
fail_open = cfg["tirith_fail_open"]
if tirith_path is None:
_warn_once(
"tirith_path_none",
"tirith path resolved to None; scanning disabled",
)
if fail_open:
return {"action": "allow", "findings": [], "summary": "tirith path unavailable"}
return {"action": "block", "findings": [], "summary": "tirith path unavailable (fail-closed)"}
try:
result = subprocess.run(
[tirith_path, "check", "--json", "--non-interactive",
"--shell", "posix", "--", command],
capture_output=True,
text=True,
timeout=timeout,
)
except OSError as exc:
spawn_key = f"tirith_spawn_failed:{type(exc).__name__}:{getattr(exc, 'errno', '')}"
_warn_once(spawn_key, "tirith spawn failed: %s", exc)
if fail_open:
return {"action": "allow", "findings": [], "summary": f"tirith unavailable: {exc}"}
return {"action": "block", "findings": [], "summary": f"tirith spawn failed (fail-closed): {exc}"}
except subprocess.TimeoutExpired:
_warn_once(
f"tirith_timeout:{timeout}",
"tirith timed out after %ds",
timeout,
)
if fail_open:
return {"action": "allow", "findings": [], "summary": f"tirith timed out ({timeout}s)"}
return {"action": "block", "findings": [], "summary": "tirith timed out (fail-closed)"}
exit_code = result.returncode
if exit_code == 0:
action = "allow"
elif exit_code == 1:
action = "block"
elif exit_code == 2:
action = "warn"
else:
logger.warning("tirith returned unexpected exit code %d", exit_code)
if fail_open:
return {"action": "allow", "findings": [], "summary": f"tirith exit code {exit_code} (fail-open)"}
return {"action": "block", "findings": [], "summary": f"tirith exit code {exit_code} (fail-closed)"}
findings = []
summary = ""
try:
data = json.loads(result.stdout) if result.stdout.strip() else {}
raw_findings = data.get("findings", [])
findings = raw_findings[:_MAX_FINDINGS]
summary = (data.get("summary", "") or "")[:_MAX_SUMMARY_LEN]
except (json.JSONDecodeError, AttributeError):
logger.debug("tirith JSON parse failed, using exit code only")
if action == "block":
summary = "security issue detected (details unavailable)"
elif action == "warn":
summary = "security warning detected (details unavailable)"
if action == "warn" and findings:
non_suppressible = [f for f in findings if not _is_app_tld_finding(f)]
if not non_suppressible:
action = "allow"
findings = []
summary = ""
return {"action": action, "findings": findings, "summary": summary}
def _is_app_tld_finding(finding: dict) -> bool:
"""Return True if this finding is a lookalike_tld warning for the .app TLD only.
Checks the rule_id and inspects common value/detail field names that
Tirith may use to carry the TLD string.
"""
if not isinstance(finding, dict):
return False
if finding.get("rule_id") != "lookalike_tld":
return False
for field in ("value", "tld", "detail", "description", "message"):
val = finding.get(field)
if val is not None and ".app" in str(val).lower():
return True
return False