"""AWS Bedrock Converse API adapter for Hermes Agent.
Provides native integration with Amazon Bedrock using the Converse API,
bypassing the OpenAI-compatible endpoint in favor of direct AWS SDK calls.
This enables full access to the Bedrock ecosystem:
- **Native Converse API**: Unified interface for all Bedrock models
(Claude, Nova, Llama, Mistral, etc.) with streaming support.
- **AWS credential chain**: IAM roles, SSO profiles, environment variables,
instance metadata — zero API key management for AWS-native environments.
- **Dynamic model discovery**: Auto-discovers available foundation models
and cross-region inference profiles via the Bedrock control plane.
- **Guardrails support**: Optional Bedrock Guardrails configuration for
content filtering and safety policies.
- **Inference profiles**: Supports cross-region inference profiles
(us.anthropic.claude-*, global.anthropic.claude-*) for better capacity
and automatic failover.
Architecture follows the same pattern as ``anthropic_adapter.py``:
- All Bedrock-specific logic is isolated in this module.
- Messages/tools are converted between OpenAI format and Converse format.
- Responses are normalized back to OpenAI-compatible objects for the agent loop.
Reference: OpenClaw's ``extensions/amazon-bedrock/`` plugin, which implements
the same Converse API integration in TypeScript via ``@aws-sdk/client-bedrock``.
Requires: ``boto3`` (optional dependency — only needed when using the Bedrock provider).
"""
import json
import logging
import os
import re
from types import SimpleNamespace
from typing import Any, Dict, List, Optional, Tuple
# Module-level logger for this adapter.
logger = logging.getLogger(__name__)

# Best-effort bootstrap of the optional "provider.bedrock" dependency group via
# the project's lazy-dependency helper (prompt=False — presumably non-interactive;
# TODO confirm against tools.lazy_deps). Any failure is deliberately swallowed:
# _require_boto3() raises a clear ImportError later if boto3 is still missing
# when a client is actually needed.
try:
    from tools.lazy_deps import ensure
    ensure("provider.bedrock", prompt=False)
except Exception:
    pass

# Per-region boto3 client caches, keyed by region name.
# bedrock-runtime clients (used for converse / converse_stream calls).
_bedrock_runtime_client_cache: Dict[str, Any] = {}
# bedrock control-plane clients (used for model / inference-profile discovery).
_bedrock_control_client_cache: Dict[str, Any] = {}
def _require_boto3():
"""Import boto3, raising a clear error if not installed."""
try:
import boto3
return boto3
except ImportError:
raise ImportError(
"The 'boto3' package is required for the AWS Bedrock provider. "
"Install it with: pip install boto3\n"
"Or install Hermes with Bedrock support: pip install -e '.[bedrock]'"
)
def _get_bedrock_runtime_client(region: str):
    """Return the ``bedrock-runtime`` client for *region*, creating it on first use.

    Clients are memoised per region in ``_bedrock_runtime_client_cache``.
    New clients rely on boto3's default credential chain (env vars → profile →
    instance role).
    """
    if region in _bedrock_runtime_client_cache:
        return _bedrock_runtime_client_cache[region]
    fresh_client = _require_boto3().client(
        "bedrock-runtime", region_name=region,
    )
    _bedrock_runtime_client_cache[region] = fresh_client
    return fresh_client
def _get_bedrock_control_client(region: str):
    """Return the ``bedrock`` control-plane client for *region* (memoised).

    The control-plane client is used for model discovery; one instance is
    kept per region in ``_bedrock_control_client_cache``.
    """
    if region in _bedrock_control_client_cache:
        return _bedrock_control_client_cache[region]
    fresh_client = _require_boto3().client(
        "bedrock", region_name=region,
    )
    _bedrock_control_client_cache[region] = fresh_client
    return fresh_client
def reset_client_cache():
    """Drop every cached boto3 client, both runtime and control plane.

    Intended for tests and for profile switches.
    """
    for client_cache in (_bedrock_runtime_client_cache, _bedrock_control_client_cache):
        client_cache.clear()
def invalidate_runtime_client(region: str) -> bool:
    """Forget the cached ``bedrock-runtime`` client for one region.

    This is the single-region sibling of :func:`reset_client_cache`. The
    converse wrappers call it after a stale-connection failure so that the
    next request builds a brand-new client (and connection pool) instead of
    reusing a dead socket.

    Returns:
        True when a cached client was removed, False when *region* had no
        cached client.
    """
    try:
        del _bedrock_runtime_client_cache[region]
    except KeyError:
        return False
    return True
# Module-name prefixes (compared against each traceback frame's ``__name__``
# global) for the HTTP/SDK stack underneath boto3. is_stale_connection_error()
# treats a bare AssertionError as a stale-connection symptom only when at least
# one frame of its traceback lives in a module starting with one of these.
_STALE_LIB_MODULE_PREFIXES = (
    "urllib3.",
    "botocore.",
    "boto3.",
)
def _traceback_frames_modules(exc: BaseException):
"""Yield ``__name__``-style module strings for each frame in exc's traceback."""
tb = getattr(exc, "__traceback__", None)
while tb is not None:
frame = tb.tb_frame
module = frame.f_globals.get("__name__", "")
yield module or ""
tb = tb.tb_next
def is_stale_connection_error(exc: BaseException) -> bool:
    """Return True if ``exc`` indicates a dead/stale Bedrock HTTP connection.

    Matches:

    * ``botocore.exceptions.ConnectionError`` and ``HTTPClientError`` plus
      their subclasses (e.g. ``EndpointConnectionError``,
      ``ConnectTimeoutError``, ``ConnectionClosedError``,
      ``ReadTimeoutError``).
    * ``urllib3.exceptions.ProtocolError`` / ``NewConnectionError`` /
      ``ConnectionError``. Each class is looked up individually, so a name
      missing from the installed urllib3 version does not disable matching of
      the others. urllib3 is a transitive dependency of botocore, so in
      practice it is always available.
    * Bare ``AssertionError`` raised from a frame inside urllib3, botocore,
      or boto3. These are internal-invariant failures (typically triggered
      by corrupted connection-pool state after a dropped socket) and are
      recoverable by swapping the client.

    Non-library ``AssertionError``s (from application code or tests) are
    intentionally not matched. Only library-internal asserts signal stale
    connection state.
    """
    try:
        from botocore.exceptions import (
            ConnectionError as BotoConnectionError,
            HTTPClientError,
        )
    except ImportError:
        pass
    else:
        if isinstance(exc, (BotoConnectionError, HTTPClientError)):
            return True

    try:
        import urllib3.exceptions as urllib3_exceptions
    except ImportError:
        urllib3_exceptions = None
    if urllib3_exceptions is not None:
        # Resolve each class separately: an all-or-nothing import would drop
        # ProtocolError/NewConnectionError matching whenever any single name
        # (e.g. the legacy ConnectionError alias) is absent.
        urllib3_errors = tuple(
            cls
            for cls in (
                getattr(urllib3_exceptions, name, None)
                for name in ("ProtocolError", "NewConnectionError", "ConnectionError")
            )
            if isinstance(cls, type)
        )
        if urllib3_errors and isinstance(exc, urllib3_errors):
            return True

    if isinstance(exc, AssertionError):
        return any(
            module.startswith(_STALE_LIB_MODULE_PREFIXES)
            for module in _traceback_frames_modules(exc)
        )
    return False
# Environment variables that signal an explicit AWS credential source, listed
# in the same order resolve_aws_auth_env_var() checks them. That function
# checks the names individually (and additionally requires AWS_SECRET_ACCESS_KEY
# alongside AWS_ACCESS_KEY_ID); this list itself is not referenced elsewhere in
# this module — presumably consumed by other modules, verify before changing.
_AWS_CREDENTIAL_ENV_VARS = [
    "AWS_BEARER_TOKEN_BEDROCK",
    "AWS_ACCESS_KEY_ID",
    "AWS_PROFILE",
    "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI",
    "AWS_WEB_IDENTITY_TOKEN_FILE",
]
def resolve_aws_auth_env_var(env: Optional[Dict[str, str]] = None) -> Optional[str]:
    """Name the AWS auth source that appears to be active, or return None.

    Explicit environment variables are checked first, in priority order:

    1. ``AWS_BEARER_TOKEN_BEDROCK``
    2. ``AWS_ACCESS_KEY_ID`` (only together with ``AWS_SECRET_ACCESS_KEY``)
    3. ``AWS_PROFILE``
    4. ``AWS_CONTAINER_CREDENTIALS_RELATIVE_URI``
    5. ``AWS_WEB_IDENTITY_TOKEN_FILE``

    If none is set, botocore's credential chain is consulted to cover implicit
    sources (EC2 IMDS, ECS task role, ...). A hit there returns ``"iam-role"``.
    Any error from botocore is ignored.

    Mirrors OpenClaw's ``resolveAwsSdkEnvVarName()``. It is used to detect
    whether credentials are configured without making a Bedrock API call.
    """
    source = os.environ if env is None else env

    def _present(var_name: str) -> bool:
        return bool(source.get(var_name, "").strip())

    if _present("AWS_BEARER_TOKEN_BEDROCK"):
        return "AWS_BEARER_TOKEN_BEDROCK"
    if _present("AWS_ACCESS_KEY_ID") and _present("AWS_SECRET_ACCESS_KEY"):
        return "AWS_ACCESS_KEY_ID"
    for var_name in (
        "AWS_PROFILE",
        "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI",
        "AWS_WEB_IDENTITY_TOKEN_FILE",
    ):
        if _present(var_name):
            return var_name

    try:
        import botocore.session
        chain_creds = botocore.session.get_session().get_credentials()
        frozen = chain_creds.get_frozen_credentials() if chain_creds is not None else None
        if frozen and frozen.access_key:
            return "iam-role"
    except Exception:
        pass
    return None
def has_aws_credentials(env: Optional[Dict[str, str]] = None) -> bool:
    """Return True if any AWS credential source is detected.

    Delegates to :func:`resolve_aws_auth_env_var`, which already implements
    the two-tier check:

    1. Explicit environment variables (fast, no I/O) for local development.
    2. botocore's credential chain, which covers EC2 instance roles, ECS task
       roles, Lambda execution roles and other metadata-based sources that do
       not set environment variables (pattern from OpenClaw PR #62673).

    The credential-chain lookup is deliberately not repeated here. Running it
    a second time on a machine without credentials would double any
    instance-metadata probing and its timeouts.

    Args:
        env: Mapping to read environment variables from; defaults to
            ``os.environ``.

    Returns:
        True if some credential source was found, else False.
    """
    return resolve_aws_auth_env_var(env) is not None
def resolve_bedrock_region(env: Optional[Dict[str, str]] = None) -> str:
    """Pick the AWS region to use for Bedrock API calls.

    Resolution order (first non-blank wins):

    1. ``AWS_REGION``
    2. ``AWS_DEFAULT_REGION``
    3. the region configured for botocore (``~/.aws/config`` / SSO profile)
    4. ``"us-east-1"`` as a hard default

    Step 3 matters for EU/AP users who set their region in a named profile
    instead of an env var. Without it, live model discovery would always
    report ``us.*`` profile IDs regardless of the real region.
    """
    source = os.environ if env is None else env
    for var_name in ("AWS_REGION", "AWS_DEFAULT_REGION"):
        candidate = source.get(var_name, "").strip()
        if candidate:
            return candidate

    try:
        import botocore.session
        configured = botocore.session.get_session().get_config_variable("region")
    except Exception:
        configured = None
    return configured if configured else "us-east-1"
def bedrock_model_ids_or_none() -> Optional[List[str]]:
    """Live-discover Bedrock model IDs for the active region.

    Returns:
        A list of model ID strings if discovery succeeds and yields at least
        one model, otherwise ``None``. Callers should then fall back to the
        static curated list.

    This helper consolidates the discover → extract-ids → fallback pattern
    that was previously duplicated across ``provider_model_ids``,
    ``list_authenticated_providers`` section 2, and section 3.
    """
    try:
        discovered = discover_bedrock_models(resolve_bedrock_region())
        if discovered:
            return [m["id"] for m in discovered]
    except Exception as exc:
        # Best-effort by design (callers fall back to a static list), but leave
        # a trace so discovery failures are diagnosable instead of invisible.
        logger.debug("Bedrock live model discovery failed: %s", exc)
    return None
_NON_TOOL_CALLING_PATTERNS = [
"deepseek.r1",
"deepseek-r1",
"stability.",
"cohere.embed",
"amazon.titan-embed",
]
def _model_supports_tool_use(model_id: str) -> bool:
"""Return True if the model is expected to support tool/function calling.
Models in the denylist are known to reject toolConfig in the Converse API.
Unknown models default to True (assume tool support).
"""
model_lower = model_id.lower()
return not any(pattern in model_lower for pattern in _NON_TOOL_CALLING_PATTERNS)
def is_anthropic_bedrock_model(model_id: str) -> bool:
    """Return True if the model is an Anthropic Claude model on Bedrock.

    These models should use the AnthropicBedrock SDK path for full feature
    parity (prompt caching, thinking budgets, adaptive thinking).
    Non-Claude models use the Converse API path.

    Matches foundation model IDs (``anthropic.claude-*``) and the same IDs
    behind one cross-region inference-profile routing prefix:

    - ``us.`` / ``us-gov.`` (United States / GovCloud)
    - ``eu.`` (Europe)
    - ``apac.`` (Asia Pacific); ``ap.`` is still accepted for compatibility
    - ``jp.`` (Japan), ``au.`` (Australia)
    - ``global.`` (global routing)
    """
    model_lower = model_id.lower()
    for prefix in ("us.", "us-gov.", "global.", "eu.", "apac.", "ap.", "jp.", "au."):
        if model_lower.startswith(prefix):
            model_lower = model_lower[len(prefix):]
            break
    return model_lower.startswith("anthropic.claude")
def convert_tools_to_converse(tools: List[Dict]) -> List[Dict]:
    """Convert OpenAI-format tool definitions to Bedrock Converse ``toolConfig``.

    OpenAI format::

        {"type": "function", "function": {"name": "...", "description": "...",
         "parameters": {"type": "object", "properties": {...}}}}

    Converse format::

        {"toolSpec": {"name": "...", "description": "...",
         "inputSchema": {"json": {"type": "object", "properties": {...}}}}}

    Notes:
        - ``description`` is omitted when empty or missing. Converse's
          ToolSpecification requires a non-empty description when the field
          is present, so sending ``""`` would be rejected.
        - A missing, ``None`` or empty ``parameters`` becomes an empty object
          schema.
        - A ``None`` ``function`` entry is treated like an empty one.

    Args:
        tools: OpenAI-style tool definitions; ``None``/empty yields ``[]``.

    Returns:
        One ``{"toolSpec": ...}`` dict per input tool, in the same order.
    """
    if not tools:
        return []
    result = []
    for t in tools:
        fn = t.get("function") or {}
        spec: Dict[str, Any] = {"name": fn.get("name", "")}
        description = fn.get("description")
        if description:
            spec["description"] = description
        parameters = fn.get("parameters") or {"type": "object", "properties": {}}
        spec["inputSchema"] = {"json": parameters}
        result.append({"toolSpec": spec})
    return result
def _convert_content_to_converse(content) -> List[Dict]:
"""Convert OpenAI message content (string or list) to Converse content blocks.
Handles:
- Plain text strings → [{"text": "..."}]
- Content arrays with text/image_url parts → mixed text/image blocks
Filters out empty text blocks — Bedrock's Converse API rejects messages
where a text content block has an empty ``text`` field (ValidationException:
"text content blocks must be non-empty"). Ref: issue #9486.
"""
if content is None:
return [{"text": " "}]
if isinstance(content, str):
return [{"text": content}] if content.strip() else [{"text": " "}]
if isinstance(content, list):
blocks = []
for part in content:
if isinstance(part, str):
blocks.append({"text": part})
continue
if not isinstance(part, dict):
continue
part_type = part.get("type", "")
if part_type == "text":
text = part.get("text", "")
blocks.append({"text": text if text else " "})
elif part_type == "image_url":
image_url = part.get("image_url", {})
url = image_url.get("url", "") if isinstance(image_url, dict) else ""
if url.startswith("data:"):
header, _, data = url.partition(",")
media_type = "image/jpeg"
if header.startswith("data:"):
mime_part = header[5:].split(";")[0]
if mime_part:
media_type = mime_part
blocks.append({
"image": {
"format": media_type.split("/")[-1] if "/" in media_type else "jpeg",
"source": {"bytes": data},
}
})
else:
blocks.append({"text": f"[Image: {url}]"})
return blocks if blocks else [{"text": " "}]
return [{"text": str(content)}]
def convert_messages_to_converse(
    messages: List[Dict],
) -> Tuple[Optional[List[Dict]], List[Dict]]:
    """Convert OpenAI-format messages to Bedrock Converse format.

    Returns ``(system_prompt, converse_messages)`` where:
    - ``system_prompt`` is a list of system content blocks (or None)
    - ``converse_messages`` is the conversation in Converse format

    Handles:
    - System messages → extracted as system prompt. Blank text is skipped
      because Converse rejects empty system text blocks.
    - User messages → ``{"role": "user", "content": [...]}``
    - Assistant messages → ``{"role": "assistant", "content": [...]}``
    - Tool calls → ``{"toolUse": {"toolUseId": ..., "name": ..., "input": ...}}``
    - Tool results → ``{"toolResult": {"toolUseId": ..., "content": [...]}}``
      attached to a user turn. Empty results become ``" "``.
    - Messages with any other role are ignored.

    Converse requires strict user/assistant alternation. Consecutive messages
    with the same role are merged into a single message. The conversation is
    also padded with a placeholder user turn at the start and/or end when it
    would otherwise not begin or end with a user message.
    """
    system_blocks: List[Dict] = []
    converse_msgs: List[Dict] = []

    def _push(role: str, blocks: List[Dict]) -> None:
        # Merge into the previous message when the role repeats, keeping the
        # user/assistant alternation Converse requires.
        if converse_msgs and converse_msgs[-1]["role"] == role:
            converse_msgs[-1]["content"].extend(blocks)
        else:
            converse_msgs.append({"role": role, "content": blocks})

    for msg in messages:
        role = msg.get("role", "")
        content = msg.get("content")

        if role == "system":
            if isinstance(content, str):
                parts: List[Any] = [content]
            elif isinstance(content, list):
                parts = content
            else:
                parts = []
            for part in parts:
                if isinstance(part, dict) and part.get("type") == "text":
                    text = part.get("text", "")
                elif isinstance(part, str):
                    text = part
                else:
                    continue
                if isinstance(text, str) and text.strip():
                    system_blocks.append({"text": text})
            continue

        if role == "tool":
            result_content = content if isinstance(content, str) else json.dumps(content)
            if not result_content.strip():
                # Empty text blocks are rejected by Converse (see issue #9486).
                result_content = " "
            _push("user", [{
                "toolResult": {
                    "toolUseId": msg.get("tool_call_id", ""),
                    "content": [{"text": result_content}],
                }
            }])
            continue

        if role == "assistant":
            content_blocks: List[Dict] = []
            if isinstance(content, str) and content.strip():
                content_blocks.append({"text": content})
            elif isinstance(content, list):
                content_blocks.extend(_convert_content_to_converse(content))
            for tc in msg.get("tool_calls") or []:
                fn = tc.get("function") or {}
                args_str = fn.get("arguments", "{}")
                try:
                    args_dict = json.loads(args_str) if isinstance(args_str, str) else args_str
                except (json.JSONDecodeError, TypeError):
                    args_dict = {}
                if args_dict is None:
                    # Converse needs a JSON document for toolUse input.
                    args_dict = {}
                content_blocks.append({
                    "toolUse": {
                        "toolUseId": tc.get("id", ""),
                        "name": fn.get("name", ""),
                        "input": args_dict,
                    }
                })
            _push("assistant", content_blocks or [{"text": " "}])
            continue

        if role == "user":
            _push("user", _convert_content_to_converse(content))

    if converse_msgs and converse_msgs[0]["role"] != "user":
        converse_msgs.insert(0, {"role": "user", "content": [{"text": " "}]})
    if converse_msgs and converse_msgs[-1]["role"] != "user":
        converse_msgs.append({"role": "user", "content": [{"text": " "}]})
    return (system_blocks if system_blocks else None, converse_msgs)
def _converse_stop_reason_to_openai(stop_reason: str) -> str:
"""Map Bedrock Converse stop reasons to OpenAI finish_reason values."""
mapping = {
"end_turn": "stop",
"stop_sequence": "stop",
"tool_use": "tool_calls",
"max_tokens": "length",
"content_filtered": "content_filter",
"guardrail_intervened": "content_filter",
}
return mapping.get(stop_reason, "stop")
def normalize_converse_response(response: Dict) -> SimpleNamespace:
    """Convert a Bedrock Converse API response to an OpenAI-compatible object.

    The agent loop in ``run_agent.py`` expects responses shaped like
    ``openai.ChatCompletion``. This function bridges the gap.

    Returns a SimpleNamespace with:
    - ``.choices[0].message.content`` — text blocks joined with ``"\\n"``
      (None if there were none)
    - ``.choices[0].message.tool_calls`` — tool call list (if any)
    - ``.choices[0].message.reasoning_content`` — reasoning blocks joined with
      a blank line (None if there were none)
    - ``.choices[0].finish_reason`` — stop/tool_calls/length/content_filter
    - ``.usage`` — token usage stats
    """
    output = response.get("output", {})
    message = output.get("message", {})
    content_blocks = message.get("content", [])
    stop_reason = response.get("stopReason", "end_turn")
    text_parts = []
    reasoning_parts = []
    tool_calls = []
    for block in content_blocks:
        if "text" in block:
            text_parts.append(block["text"])
        elif "reasoningContent" in block:
            reasoning = block["reasoningContent"]
            if isinstance(reasoning, dict):
                # Non-streaming responses nest the text inside ``reasoningText``
                # ({"reasoningText": {"text": ..., "signature": ...}}); a flat
                # ``text`` key is accepted as a fallback.
                thinking_text = ""
                reasoning_text = reasoning.get("reasoningText")
                if isinstance(reasoning_text, dict):
                    thinking_text = reasoning_text.get("text", "")
                if not thinking_text:
                    thinking_text = reasoning.get("text", "")
                if thinking_text:
                    reasoning_parts.append(str(thinking_text))
        elif "toolUse" in block:
            tu = block["toolUse"]
            tool_calls.append(SimpleNamespace(
                id=tu.get("toolUseId", ""),
                type="function",
                function=SimpleNamespace(
                    name=tu.get("name", ""),
                    arguments=json.dumps(tu.get("input", {})),
                ),
            ))
    msg = SimpleNamespace(
        role="assistant",
        content="\n".join(text_parts) if text_parts else None,
        tool_calls=tool_calls if tool_calls else None,
        reasoning_content="\n\n".join(reasoning_parts) if reasoning_parts else None,
    )
    usage_data = response.get("usage", {})
    usage = SimpleNamespace(
        prompt_tokens=usage_data.get("inputTokens", 0),
        completion_tokens=usage_data.get("outputTokens", 0),
        total_tokens=(
            usage_data.get("inputTokens", 0) + usage_data.get("outputTokens", 0)
        ),
    )
    finish_reason = _converse_stop_reason_to_openai(stop_reason)
    # Some models stop with end_turn even though they emitted tool calls.
    if tool_calls and finish_reason == "stop":
        finish_reason = "tool_calls"
    choice = SimpleNamespace(
        index=0,
        message=msg,
        finish_reason=finish_reason,
    )
    return SimpleNamespace(
        choices=[choice],
        usage=usage,
        model=response.get("modelId", ""),
    )
def normalize_converse_stream_events(event_stream) -> SimpleNamespace:
    """Drain a ConverseStream response into an OpenAI-compatible object.

    This is a callback-free wrapper around :func:`stream_converse_with_callbacks`.
    There are no live text/tool/reasoning notifications and no interrupt
    polling; the whole stream is consumed and assembled. The result has the
    same shape as ``normalize_converse_response()``.
    """
    return stream_converse_with_callbacks(
        event_stream,
        on_text_delta=None,
        on_tool_start=None,
        on_reasoning_delta=None,
        on_interrupt_check=None,
    )
def stream_converse_with_callbacks(
    event_stream,
    on_text_delta=None,
    on_tool_start=None,
    on_reasoning_delta=None,
    on_interrupt_check=None,
) -> SimpleNamespace:
    """Process a Bedrock ConverseStream event stream with real-time callbacks.

    This is the core streaming function that powers both the CLI's live token
    display and the gateway's progressive message updates.

    Args:
        event_stream: The boto3 ``converse_stream()`` response containing a
            ``stream`` key with an iterable of events.
        on_text_delta: Called with each text chunk as it arrives. Only fires
            while no toolUse block has been seen (same semantics as the
            Anthropic and chat_completions streaming paths).
        on_tool_start: Called with the tool name when a toolUse block begins.
            Lets the TUI show a spinner while tool arguments are generated.
        on_reasoning_delta: Called with each reasoning/thinking text chunk
            (``reasoningContent.text`` deltas).
        on_interrupt_check: Called before each event. If it returns True,
            streaming stops, the partial result is returned, and the stream
            is closed if it exposes ``close()``.

    Returns:
        An OpenAI-compatible SimpleNamespace response, identical in shape to
        ``normalize_converse_response()``. Text blocks are joined with
        ``"\\n"``. Reasoning deltas are concatenated within their block, and
        separate reasoning blocks are joined with a blank line.
    """
    text_parts: List[str] = []
    reasoning_parts: List[str] = []
    tool_calls: List[SimpleNamespace] = []
    current_tool: Optional[Dict] = None
    current_text_buffer: List[str] = []
    # Deltas are fragments of one block: buffer them and join per block.
    # Joining every reasoning delta with "\n\n" shredded the thinking text.
    current_reasoning_buffer: List[str] = []
    has_tool_use = False
    stop_reason = "end_turn"
    usage_data: Dict[str, int] = {}

    def _flush_text() -> None:
        if current_text_buffer:
            text_parts.append("".join(current_text_buffer))
            current_text_buffer.clear()

    def _flush_reasoning() -> None:
        if current_reasoning_buffer:
            reasoning_parts.append("".join(current_reasoning_buffer))
            current_reasoning_buffer.clear()

    stream = event_stream.get("stream", [])
    interrupted = False
    for event in stream:
        if on_interrupt_check and on_interrupt_check():
            interrupted = True
            break
        if "contentBlockStart" in event:
            start = event["contentBlockStart"].get("start", {})
            if "toolUse" in start:
                has_tool_use = True
                _flush_text()
                current_tool = {
                    "toolUseId": start["toolUse"].get("toolUseId", ""),
                    "name": start["toolUse"].get("name", ""),
                    "input_json": "",
                }
                if on_tool_start:
                    on_tool_start(current_tool["name"])
        elif "contentBlockDelta" in event:
            delta = event["contentBlockDelta"].get("delta", {})
            if "text" in delta:
                text = delta["text"]
                current_text_buffer.append(text)
                if on_text_delta and not has_tool_use:
                    on_text_delta(text)
            elif "toolUse" in delta:
                if current_tool is not None:
                    current_tool["input_json"] += delta["toolUse"].get("input", "")
            elif "reasoningContent" in delta:
                reasoning = delta["reasoningContent"]
                if isinstance(reasoning, dict):
                    thinking_text = reasoning.get("text", "")
                    if thinking_text:
                        current_reasoning_buffer.append(str(thinking_text))
                        if on_reasoning_delta:
                            on_reasoning_delta(thinking_text)
        elif "contentBlockStop" in event:
            if current_tool is not None:
                try:
                    input_dict = json.loads(current_tool["input_json"]) if current_tool["input_json"] else {}
                except (json.JSONDecodeError, TypeError):
                    input_dict = {}
                tool_calls.append(SimpleNamespace(
                    id=current_tool["toolUseId"],
                    type="function",
                    function=SimpleNamespace(
                        name=current_tool["name"],
                        arguments=json.dumps(input_dict),
                    ),
                ))
                current_tool = None
            else:
                _flush_text()
                _flush_reasoning()
        elif "messageStop" in event:
            stop_reason = event["messageStop"].get("stopReason", "end_turn")
        elif "metadata" in event:
            meta_usage = event["metadata"].get("usage", {})
            usage_data = {
                "inputTokens": meta_usage.get("inputTokens", 0),
                "outputTokens": meta_usage.get("outputTokens", 0),
            }

    if interrupted:
        # Release the half-read HTTP response instead of leaving unread event
        # data on the connection (botocore's EventStream exposes close()).
        close = getattr(stream, "close", None)
        if callable(close):
            try:
                close()
            except Exception as exc:
                logger.debug("bedrock: failed to close interrupted stream: %s", exc)

    _flush_text()
    _flush_reasoning()
    msg = SimpleNamespace(
        role="assistant",
        content="\n".join(text_parts) if text_parts else None,
        tool_calls=tool_calls if tool_calls else None,
        reasoning_content="\n\n".join(reasoning_parts) if reasoning_parts else None,
    )
    usage = SimpleNamespace(
        prompt_tokens=usage_data.get("inputTokens", 0),
        completion_tokens=usage_data.get("outputTokens", 0),
        total_tokens=(
            usage_data.get("inputTokens", 0) + usage_data.get("outputTokens", 0)
        ),
    )
    finish_reason = _converse_stop_reason_to_openai(stop_reason)
    if tool_calls and finish_reason == "stop":
        finish_reason = "tool_calls"
    choice = SimpleNamespace(
        index=0,
        message=msg,
        finish_reason=finish_reason,
    )
    return SimpleNamespace(
        choices=[choice],
        usage=usage,
        model="",
    )
def build_converse_kwargs(
    model: str,
    messages: List[Dict],
    tools: Optional[List[Dict]] = None,
    max_tokens: int = 4096,
    temperature: Optional[float] = None,
    top_p: Optional[float] = None,
    stop_sequences: Optional[List[str]] = None,
    guardrail_config: Optional[Dict] = None,
) -> Dict[str, Any]:
    """Assemble the keyword arguments for ``converse()`` / ``converse_stream()``.

    OpenAI-style messages and tools are translated to Converse shapes.
    Optional sampling settings are only included when provided. Tools are
    dropped with a warning for models listed as tool-incapable.
    """
    system_prompt, converse_messages = convert_messages_to_converse(messages)

    inference_config: Dict[str, Any] = {"maxTokens": max_tokens}
    if temperature is not None:
        inference_config["temperature"] = temperature
    if top_p is not None:
        inference_config["topP"] = top_p
    if stop_sequences:
        inference_config["stopSequences"] = stop_sequences

    request: Dict[str, Any] = {
        "modelId": model,
        "messages": converse_messages,
        "inferenceConfig": inference_config,
    }
    if system_prompt:
        request["system"] = system_prompt

    converse_tools = convert_tools_to_converse(tools) if tools else []
    if converse_tools and _model_supports_tool_use(model):
        request["toolConfig"] = {"tools": converse_tools}
    elif converse_tools:
        logger.warning(
            "Model %s does not support tool calling — tools stripped. "
            "The agent will operate in text-only mode.", model
        )

    if guardrail_config:
        request["guardrailConfig"] = guardrail_config
    return request
def call_converse(
    region: str,
    model: str,
    messages: List[Dict],
    tools: Optional[List[Dict]] = None,
    max_tokens: int = 4096,
    temperature: Optional[float] = None,
    top_p: Optional[float] = None,
    stop_sequences: Optional[List[str]] = None,
    guardrail_config: Optional[Dict] = None,
) -> SimpleNamespace:
    """Run one non-streaming Converse request and return an OpenAI-shaped result.

    This is the main entry point for the agent loop on the Bedrock provider.
    If the request fails with a stale-connection error, the region's cached
    runtime client is evicted before the exception is re-raised.
    """
    client = _get_bedrock_runtime_client(region)
    request = build_converse_kwargs(
        model=model,
        messages=messages,
        tools=tools,
        max_tokens=max_tokens,
        temperature=temperature,
        top_p=top_p,
        stop_sequences=stop_sequences,
        guardrail_config=guardrail_config,
    )
    try:
        raw_response = client.converse(**request)
    except Exception as exc:
        if not is_stale_connection_error(exc):
            raise
        logger.warning(
            "bedrock: stale-connection error on converse(region=%s, model=%s): "
            "%s — evicting cached client so the next call reconnects.",
            region, model, type(exc).__name__,
        )
        invalidate_runtime_client(region)
        raise
    return normalize_converse_response(raw_response)
def call_converse_stream(
    region: str,
    model: str,
    messages: List[Dict],
    tools: Optional[List[Dict]] = None,
    max_tokens: int = 4096,
    temperature: Optional[float] = None,
    top_p: Optional[float] = None,
    stop_sequences: Optional[List[str]] = None,
    guardrail_config: Optional[Dict] = None,
) -> SimpleNamespace:
    """Call Bedrock ConverseStream API and return an OpenAI-compatible response.

    Consumes the full stream and returns the assembled response. For live
    delta callbacks, pass the ``converse_stream()`` response to
    ``stream_converse_with_callbacks()`` instead.

    Stale-connection errors evict the region's cached runtime client before
    being re-raised. This covers errors from ``converse_stream()`` itself and
    errors raised while the lazily-read event stream is being consumed.
    """
    client = _get_bedrock_runtime_client(region)
    kwargs = build_converse_kwargs(
        model=model,
        messages=messages,
        tools=tools,
        max_tokens=max_tokens,
        temperature=temperature,
        top_p=top_p,
        stop_sequences=stop_sequences,
        guardrail_config=guardrail_config,
    )
    try:
        response = client.converse_stream(**kwargs)
        # The event stream body is read as it is iterated, so a dropped
        # connection can also surface here, after converse_stream() returned.
        return normalize_converse_stream_events(response)
    except Exception as exc:
        if is_stale_connection_error(exc):
            logger.warning(
                "bedrock: stale-connection error on converse_stream(region=%s, "
                "model=%s): %s — evicting cached client so the next call reconnects.",
                region, model, type(exc).__name__,
            )
            invalidate_runtime_client(region)
        raise
# discover_bedrock_models() results keyed by "<region>:<sorted,provider,filter>";
# each entry is {"timestamp": <time.time()>, "models": [...]}.
_discovery_cache: Dict[str, Any] = {}
# How long a discovery cache entry stays valid, in seconds (one hour).
_DISCOVERY_CACHE_TTL_SECONDS = 3600


def reset_discovery_cache():
    """Forget every cached model-discovery result. Used in tests."""
    while _discovery_cache:
        _discovery_cache.popitem()
def discover_bedrock_models(
    region: str,
    provider_filter: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
    """Discover available Bedrock foundation models and inference profiles.

    Returns a list of model info dicts with keys:
    - ``id``: Model ID (e.g. "anthropic.claude-sonnet-4-6-20250514-v1:0")
    - ``name``: Human-readable name
    - ``provider``: Model provider (e.g. "Anthropic", "Amazon", "Meta"), or
      ``"inference-profile"`` for inference profiles
    - ``input_modalities``: List of input types (e.g. ["TEXT", "IMAGE"])
    - ``output_modalities``: List of output types
    - ``streaming``: Whether streaming is supported

    Only ACTIVE, streaming-capable, text-output foundation models and ACTIVE
    inference profiles are returned. ``global.*`` IDs sort first, then by
    name.

    Results are cached for ``_DISCOVERY_CACHE_TTL_SECONDS`` (1 hour) per
    region/provider-filter combination to avoid repeated API calls. A result
    is *not* cached when the foundation-model listing failed. Otherwise a
    transient error (expired credentials, throttling, network blip) would pin
    an empty or partial list for the next hour.

    Mirrors OpenClaw's ``discoverBedrockModels()`` in
    ``extensions/amazon-bedrock/discovery.ts``.
    """
    import time

    cache_key = f"{region}:{','.join(sorted(provider_filter or []))}"
    cached = _discovery_cache.get(cache_key)
    if cached and (time.time() - cached["timestamp"]) < _DISCOVERY_CACHE_TTL_SECONDS:
        return cached["models"]

    try:
        client = _get_bedrock_control_client(region)
    except Exception as e:
        logger.warning("Failed to create Bedrock client for model discovery: %s", e)
        return []

    models: List[Dict[str, Any]] = []
    seen_ids = set()
    filter_set = {f.lower() for f in (provider_filter or [])}
    foundation_listing_ok = False

    try:
        response = client.list_foundation_models()
        for summary in response.get("modelSummaries", []):
            model_id = (summary.get("modelId") or "").strip()
            if not model_id:
                continue
            if filter_set:
                provider_name = (summary.get("providerName") or "").lower()
                model_prefix = model_id.split(".")[0].lower() if "." in model_id else ""
                if provider_name not in filter_set and model_prefix not in filter_set:
                    continue
            lifecycle = summary.get("modelLifecycle") or {}
            if (lifecycle.get("status") or "").upper() != "ACTIVE":
                continue
            if not summary.get("responseStreamingSupported", False):
                continue
            output_mods = summary.get("outputModalities") or []
            if "TEXT" not in output_mods:
                continue
            models.append({
                "id": model_id,
                "name": (summary.get("modelName") or model_id).strip(),
                "provider": (summary.get("providerName") or "").strip(),
                "input_modalities": summary.get("inputModalities", []),
                "output_modalities": output_mods,
                "streaming": True,
            })
            seen_ids.add(model_id.lower())
        foundation_listing_ok = True
    except Exception as e:
        logger.warning("Failed to list Bedrock foundation models: %s", e)

    try:
        profiles = []
        next_token = None
        # list_inference_profiles is paginated via nextToken.
        while True:
            kwargs = {}
            if next_token:
                kwargs["nextToken"] = next_token
            response = client.list_inference_profiles(**kwargs)
            profiles.extend(response.get("inferenceProfileSummaries", []))
            next_token = response.get("nextToken")
            if not next_token:
                break
        for profile in profiles:
            profile_id = (profile.get("inferenceProfileId") or "").strip()
            if not profile_id:
                continue
            if profile.get("status") != "ACTIVE":
                continue
            if profile_id.lower() in seen_ids:
                continue
            if filter_set:
                profile_models = profile.get("models") or []
                matches = any(
                    _extract_provider_from_arn(m.get("modelArn") or "").lower() in filter_set
                    for m in profile_models
                )
                if not matches:
                    continue
            models.append({
                "id": profile_id,
                "name": (profile.get("inferenceProfileName") or profile_id).strip(),
                "provider": "inference-profile",
                "input_modalities": ["TEXT"],
                "output_modalities": ["TEXT"],
                "streaming": True,
            })
            seen_ids.add(profile_id.lower())
    except Exception as e:
        logger.debug("Skipping inference profile discovery: %s", e)

    models.sort(key=lambda m: (
        0 if m["id"].startswith("global.") else 1,
        m["name"].lower(),
    ))
    if foundation_listing_ok:
        _discovery_cache[cache_key] = {
            "timestamp": time.time(),
            "models": models,
        }
    return models
def _extract_provider_from_arn(arn: str) -> str:
"""Extract the model provider from a Bedrock model ARN.
Example: "arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-v2"
→ "anthropic"
"""
match = re.search(r"foundation-model/([^.]+)", arn)
return match.group(1) if match else ""
def get_bedrock_model_ids(region: str) -> List[str]:
    """List just the model IDs that ``discover_bedrock_models()`` finds in *region*.

    Convenience helper for the model selection UI. Order matches the
    discovery result.
    """
    return [entry["id"] for entry in discover_bedrock_models(region)]
# Error-message regexes (case-insensitive) for Bedrock "input too large"
# failures. These are matched against the stringified error, e.g. a botocore
# ClientError message.
CONTEXT_OVERFLOW_PATTERNS = [
    re.compile(r"ValidationException.*(?:input is too long|max input token|input token.*exceed)", re.IGNORECASE),
    re.compile(r"ValidationException.*(?:exceeds? the (?:maximum|max) (?:number of )?(?:input )?tokens)", re.IGNORECASE),
    re.compile(r"ModelStreamErrorException.*(?:Input is too long|too many input tokens)", re.IGNORECASE),
    # Anthropic models on Bedrock report e.g. "prompt is too long: N tokens > M maximum".
    re.compile(r"ValidationException.*prompt is too long", re.IGNORECASE),
]

# Throttling / quota errors: back off and retry.
THROTTLE_PATTERNS = [
    re.compile(r"ThrottlingException", re.IGNORECASE),
    re.compile(r"Too many concurrent requests", re.IGNORECASE),
    re.compile(r"ServiceQuotaExceededException", re.IGNORECASE),
]

# Temporary model/service unavailability: retry with delay.
OVERLOAD_PATTERNS = [
    re.compile(r"ModelNotReadyException", re.IGNORECASE),
    re.compile(r"ModelTimeoutException", re.IGNORECASE),
    re.compile(r"InternalServerException", re.IGNORECASE),
    re.compile(r"ServiceUnavailableException", re.IGNORECASE),
]


def is_context_overflow_error(error_message: str) -> bool:
    """Return True if the error indicates the input context was too large.

    When this returns True, the agent should compress context and retry
    rather than treating it as a fatal error.

    Args:
        error_message: The error text. Non-string values (e.g. exception
            objects) are converted with ``str()``.
    """
    text = error_message if isinstance(error_message, str) else str(error_message)
    return any(p.search(text) for p in CONTEXT_OVERFLOW_PATTERNS)


def classify_bedrock_error(error_message: str) -> str:
    """Classify a Bedrock error for retry/failover decisions.

    Checks are applied in this order, and the first match wins.

    Args:
        error_message: The error text. Non-string values (e.g. exception
            objects) are converted with ``str()``.

    Returns:
        - ``"context_overflow"`` — input too long, compress and retry
        - ``"rate_limit"`` — throttled, backoff and retry
        - ``"overloaded"`` — model/service temporarily unavailable, retry with delay
        - ``"unknown"`` — unclassified error
    """
    text = error_message if isinstance(error_message, str) else str(error_message)
    if is_context_overflow_error(text):
        return "context_overflow"
    if any(p.search(text) for p in THROTTLE_PATTERNS):
        return "rate_limit"
    if any(p.search(text) for p in OVERLOAD_PATTERNS):
        return "overloaded"
    return "unknown"
# Known context-window sizes (tokens), keyed by a lowercase model-ID fragment.
# get_bedrock_context_length() picks the longest fragment contained in the ID,
# so more specific keys (e.g. "claude-sonnet-4-5") win over shorter ones
# (e.g. "claude-sonnet-4").
BEDROCK_CONTEXT_LENGTHS: Dict[str, int] = {
    "anthropic.claude-opus-4-6": 200_000,
    "anthropic.claude-sonnet-4-6": 200_000,
    "anthropic.claude-sonnet-4-5": 200_000,
    "anthropic.claude-haiku-4-5": 200_000,
    "anthropic.claude-opus-4": 200_000,
    "anthropic.claude-sonnet-4": 200_000,
    "anthropic.claude-3-7-sonnet": 200_000,
    "anthropic.claude-3-5-sonnet": 200_000,
    "anthropic.claude-3-5-haiku": 200_000,
    "anthropic.claude-3-opus": 200_000,
    "anthropic.claude-3-sonnet": 200_000,
    "anthropic.claude-3-haiku": 200_000,
    "amazon.nova-premier": 1_000_000,
    "amazon.nova-pro": 300_000,
    "amazon.nova-lite": 300_000,
    "amazon.nova-micro": 128_000,
    "meta.llama4-maverick": 128_000,
    "meta.llama4-scout": 128_000,
    "meta.llama3-3-70b-instruct": 128_000,
    "mistral.mistral-large": 128_000,
    "deepseek.v3": 128_000,
}
# Fallback for models with no matching key.
BEDROCK_DEFAULT_CONTEXT_LENGTH = 128_000


def get_bedrock_context_length(model_id: str) -> int:
    """Look up the context window size for a Bedrock model.

    Uses longest-substring matching against ``BEDROCK_CONTEXT_LENGTHS`` so
    versioned and inference-profile IDs such as
    ``us.anthropic.claude-sonnet-4-6-20250514-v1:0`` resolve correctly.

    Args:
        model_id: Bedrock model or inference-profile ID (case-insensitive);
            ``None``/empty falls through to the default.

    Returns:
        The context window in tokens, or ``BEDROCK_DEFAULT_CONTEXT_LENGTH``
        when no key matches.
    """
    model_lower = (model_id or "").lower()
    best_key = ""
    best_val = BEDROCK_DEFAULT_CONTEXT_LENGTH
    for key, val in BEDROCK_CONTEXT_LENGTHS.items():
        if key in model_lower and len(key) > len(best_key):
            best_key = key
            best_val = val
    return best_val