use std::time::Instant;

use futures::StreamExt;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

use atomcode_telemetry::{CurrentContext, Event as TelemetryEvent, LlmErrorKind, ToolErrorKind};

use crate::config::Config;
use crate::conversation::Conversation;
use crate::hook::{HookCtx, HookEngine, ToolResultContext};
use crate::provider::LlmProvider;
use crate::stream::StreamEvent;
use crate::tool::{
    PermissionDecision, ToolCall, ToolCallBuffer, ToolContext, ToolRegistry, ToolResult,
};

use super::event::{TurnEvent, TurnResult};
use super::loop_guard::{LoopGuardDecision, LoopGuardState};
use super::permission::PermissionDecider;

/// Core LLM streaming + tool execution primitive.
///
/// Handles exactly one LLM call cycle:
/// 1. Build messages from conversation
/// 2. Stream LLM response (text deltas + tool calls)
/// 3. Execute tool calls (with permission checking)
/// 4. Add results to conversation
///
/// Does NOT handle: retries, discipline (anti-loop, step limits), or conversation management.
/// The caller (AgentLoop / SubagentLoop) owns those responsibilities.
pub struct TurnRunner {
    pub provider: std::sync::Arc<dyn LlmProvider>,
    pub tools: std::sync::Arc<ToolRegistry>,
    pub context: ToolContext,
    pub config: Config,
    /// Context construction strategy. Shared with the parent
    /// `AgentLoop::ctx` (same `Arc`) so the turn's actual send and
    /// the agent's datalog snapshot go through one ctx — per-model
    /// logic like `apply_model_directives` lands on both paths.
    /// Rebuilt on `AgentCommand::ReloadConfig` alongside the agent's
    /// clone.
    pub ctx: std::sync::Arc<dyn crate::ctx::CtxBuilder>,
    pub permission: Box<dyn PermissionDecider>,
    /// 统一 Hook 引擎 (Arc, 由 AgentLoop 管理)
    pub hook_engine: std::sync::Arc<HookEngine>,
    /// Files edited during the current session (tracked for context awareness).
    pub recently_edited_files: Vec<String>,
    /// Cross-batch tool-call loop guard. Cleared per user-message by the
    /// agent (see `handle_send_message`); records every executed tool's
    /// `(name, args, output_hash)` triple and short-circuits the third
    /// identical attempt. See `loop_guard.rs` for the full rationale.
    pub loop_guard: LoopGuardState,
    /// Current turn number, set by AgentLoop before each turn.
    /// Propagated to ToolCallStartContext so built-in hooks (e.g.
    /// ToolAuditLogHook) see the correct turn index.
    pub current_turn_number: u32,
 }

impl TurnRunner {
    /// Execute one LLM turn: stream response, execute any tool calls, return result.
    pub async fn run(
        &mut self,
        conversation: &mut Conversation,
        system_prompt: &str,
        event_tx: &mpsc::UnboundedSender<TurnEvent>,
        cancel: CancellationToken,
    ) -> TurnResult {
        self.run_with_filter(conversation, system_prompt, "", event_tx, cancel, None)
            .await
    }

    /// Run with optional tool filter and turn reminder.
    /// `turn_reminder` is dynamic per-turn context (git status, current task, etc.)
    /// injected as a <system-reminder> into the last user message to keep the
    /// system prompt stable for caching.
    pub async fn run_with_filter(
        &mut self,
        conversation: &mut Conversation,
        system_prompt: &str,
        turn_reminder: &str,
        event_tx: &mpsc::UnboundedSender<TurnEvent>,
        cancel: CancellationToken,
        allowed_tools: Option<&[&str]>,
    ) -> TurnResult {
        // Telemetry: build a per-turn context carrying turn_id / provider / model.
        // Emitted on every exit path via the `tel_return!` macro below.
        let turn_id = uuid::Uuid::new_v4();
        let parent = CurrentContext::current();
        // Telemetry envelope fields:
        //   provider      = vendor type ("claude" / "openai" / "ollama"),
        //                   read directly from ProviderConfig — analytics
        //                   want the vendor label, not the user's named alias.
        //   provider_host = host parsed from base_url, with vendor default
        //                   fallback. Resolved by the telemetry crate so the
        //                   default-host table lives next to the schema.
        //   model         = LlmProvider::model_name() — the wire-level model
        //                   string sent to the API.
        let pcfg = self
            .config
            .providers
            .get(&self.config.default_provider);
        let vendor = pcfg.map(|p| p.provider_type.clone());
        let host = pcfg.and_then(|p| {
            atomcode_telemetry::resolve_provider_host(&p.provider_type, p.base_url.as_deref())
        });
        let scope_ctx = CurrentContext {
            turn_id: Some(turn_id),
            provider: parent.provider.clone().or(vendor),
            provider_host: parent.provider_host.clone().or(host),
            model: parent
                .model
                .clone()
                .or_else(|| Some(self.provider.model_name().to_string())),
            ..parent
        };
        let turn_started = std::time::Instant::now();

        // 1. Build messages within token budget.
        // Goes through `self.ctx.build_messages` (trait dispatch), NOT
        // `ctx::render::build_messages` (free fn) — otherwise per-model
        // logic like `apply_model_directives` only lands in datalog and
        // the actually-sent messages diverge from what we logged.
        let context_window = self.ctx.ctx_window();

        let (messages, ctx_stats) =
            self.ctx
                .build_messages(conversation, system_prompt, turn_reminder);

        let actual_tokens: usize = messages.iter().map(|m| m.estimate_tokens()).sum();

        // Set budget hint for read_file dynamic threshold.
        // read_file checks this to decide full content vs skeleton.
        self.context.ctx_budget_hint.store(
            context_window.saturating_sub(actual_tokens),
            std::sync::atomic::Ordering::Relaxed,
        );
        let _ = event_tx.send(TurnEvent::ContextStats {
            system_tokens: ctx_stats.system_tokens,
            sent_tokens: actual_tokens.saturating_sub(ctx_stats.system_tokens),
            dropped_tokens: ctx_stats.dropped_tokens,
            working_set_tokens: 0,
            total_messages: messages.len(),
        });

        // 3. Get tool definitions for the LLM
        let all_tool_defs = self.tools.get_definitions().await;
        let mut tool_defs: Vec<_> = if let Some(filter) = allowed_tools {
            all_tool_defs
                .into_iter()
                .filter(|d| filter.contains(&d.name))
                .collect()
        } else {
            all_tool_defs
        };

        // Inject ALL known-existing files into write_file description.
        // Includes both edited AND read files — anything the model touched exists on disk.
        {
            let mut known_files: Vec<String> = self.recently_edited_files.clone();
            // Extract read files from conversation tool calls
            for msg in &messages {
                if let crate::conversation::message::MessageContent::AssistantWithToolCalls {
                    tool_calls,
                    ..
                } = &msg.content
                {
                    for call in tool_calls {
                        if call.name == "read_file" {
                            if let Ok(args) =
                                serde_json::from_str::<serde_json::Value>(&call.arguments)
                            {
                                if let Some(fp) = args.get("file_path").and_then(|v| v.as_str()) {
                                    let short = fp.rsplit('/').next().unwrap_or(fp).to_string();
                                    if !known_files.contains(&short) {
                                        known_files.push(short);
                                    }
                                }
                            }
                        }
                    }
                }
            }
            if !known_files.is_empty() {
                if let Some(wf) = tool_defs.iter_mut().find(|d| d.name == "create_file") {
                    // Display basenames for readability in tool description
                    let display_names: Vec<&str> = known_files
                        .iter()
                        .map(|p| p.rsplit('/').next().unwrap_or(p.as_str()))
                        .collect();
                    let list = if display_names.len() <= 6 {
                        display_names.join(", ")
                    } else {
                        format!(
                            "{}, ... ({} files)",
                            display_names[..5].join(", "),
                            display_names.len()
                        )
                    };
                    wf.description.push_str(&format!(
                        "\nThese files ALREADY EXIST — use edit_file instead: {}",
                        list,
                    ));
                }
            }
        }

        // Log the request to <datalog_dir>/llm/<ts>.json right before send.
        // `pending_request_log` holds the path so the response call below
        // can merge into the same file — passed explicitly to avoid the old
        // process-wide-static approach that bled across concurrent daemon
        // sessions.
        //
        // `datalog_dir` is resolved from `[datalog].dir` (default
        // `$ATOMCODE_HOME/datalog/<project-slug>/`) — the same root the
        // markdown writer uses, so request JSON, response JSON, calls.log,
        // and the markdown summary all live next to each other for any
        // given project.
        let pending_request_log = {
            let wd = self
                .context
                .working_dir
                .try_read()
                .map(|g| g.clone())
                .unwrap_or_default();
            let datalog_dir = crate::turn::datalog::DatalogWriter::resolve_log_dir(
                &wd,
                self.config.datalog.dir.as_deref(),
            );
            super::log::log_llm_request(
                &datalog_dir,
                &messages,
                &tool_defs,
                self.provider.model_name(),
                context_window,
                0, // step — always 0 in calls.log today; step param
                // kept for future per-tool-call correlation.
                &self.provider.session_id(),
                self.config.datalog.enabled,
            )
        };

        // 3. Start streaming
        let stream_start = std::time::Instant::now();
        let stream_result = self.provider.chat_stream(&messages, Some(&tool_defs));

        // 4. Process stream events
        let mut tool_calls_buf: Vec<ToolCall> = Vec::new();
        // RAW accumulator — keeps `<tool_call>...</tool_call>` blocks intact
        // so the rescue path at Done can parse them when the model emitted
        // its tool calls as XML in text instead of using the structured
        // tool_calls API (Qwen / GLM / DeepSeek occasional misbehavior).
        let mut text_buf = String::new();
        // VISIBLE accumulator — mirror of what `stream_filter` actually
        // emitted to UI / conversation history. Used for `TurnResult::
        // Responded.text` so downstream consumers (datalog `log_text`,
        // ATLAS plan extraction, telemetry) see the same clean text the
        // user saw, not the raw text_buf with leaked XML. Earlier bug
        // (5-7 datalog 20-14-23 Turn 5): Responded.text was raw text_buf
        // → datalog `**Response:**` block carried `<tool_call>grep<arg_key>
        // pattern</arg_key>...</tool_call>` mid-prose, polluting A/B
        // analysis.
        let mut visible_text_buf = String::new();
        // Runaway-stream guard: if a degenerate model gets stuck in an
        // autoregressive loop emitting the same character forever, we
        // abort the turn rather than burning the user's quota waiting
        // for the upstream max_tokens cap (#225). The detector keeps
        // running across all visible Delta text in this turn — tool
        // call boundaries reset stream_filter but not this; a real
        // runaway will still trip on the per-segment text alone.
        let mut runaway_detector = crate::stream::RunawayDetector::with_default_threshold();
        // Reasoning-model thinking content collected separately — not emitted
        // to scrollback by default (users don't want to read the thinking).
        // If `text_buf` ends up empty at `Done` but this is non-empty, we
        // promote reasoning to the final answer: some gateways route entire
        // responses through `reasoning_content` for MiniMax-M2.7 / DeepSeek-R1,
        // and without the fallback we'd return a silent 0-token "Nailed it".
        let mut reasoning_buf = String::new();
        // Anthropic extended-thinking blocks (text + signature) accumulated
        // from `StreamEvent::ThinkingBlock`. Carried into the message via
        // `finalize_stream_with_tool_calls_and_thinking` so the next
        // request can echo them back — Anthropic 400s otherwise (`The
        // content[].thinking in the thinking mode must be passed back`).
        let mut thinking_blocks: Vec<crate::conversation::message::ThinkingBlock> =
            Vec::new();
        let mut total_tokens: usize = 0;
        // Telemetry: per-turn token counters populated from StreamEvent::Usage.
        let mut tel_input_tokens: u32 = 0;
        let mut tel_output_tokens: u32 = 0;
        let mut tel_cached_tokens: u32 = 0;
        let mut got_usage = false;

        // Telemetry helper: emit LlmChat (with turn_id/provider/model in scope)
        // and return the given result. `scope_ctx` is cloned for each emission so
        // the task-local is properly set when `track` reads `CurrentContext::current()`.
        macro_rules! tel_return {
            ($result:expr, $tool_count:expr, $conv:expr) => {{
                let result = $result;
                let messages_count = $conv.messages.len() as u32;
                // system_tokens: estimate from the system prompt string
                let system_tokens: u32 =
                    crate::conversation::message::Message::new(
                        crate::conversation::message::Role::System,
                        system_prompt,
                    ).estimate_tokens() as u32;
                // tool_def_tokens: direct measurement from tool definitions sent to the LLM.
                // Each ToolDef contributes name + description + JSON-serialized parameters.
                let tool_def_tokens: u32 = tool_defs
                    .iter()
                    .map(|d| {
                        let params_len = d.parameters.to_string().len();
                        // name + description + serialized params, ~4 chars/token, +4 overhead
                        (d.name.len() + d.description.len() + params_len) / 4 + 4
                    })
                    .sum::<usize>() as u32;
                // tool_result_tokens: sum of estimates for Role::Tool messages in conversation
                let tool_result_tokens: u32 = $conv
                    .messages
                    .iter()
                    .filter(|m| matches!(m.role, crate::conversation::message::Role::Tool))
                    .map(|m| m.estimate_tokens() as u32)
                    .sum();
                // message_tokens: sum of estimates for Role::User + Role::Assistant messages
                let message_tokens: u32 = $conv
                    .messages
                    .iter()
                    .filter(|m| matches!(
                        m.role,
                        crate::conversation::message::Role::User
                            | crate::conversation::message::Role::Assistant
                    ))
                    .map(|m| m.estimate_tokens() as u32)
                    .sum();
                let (error_kind, error_data) = if result.is_failed() {
                    let reason = match &result {
                        TurnResult::Failed(r) => r.clone(),
                        _ => String::new(),
                    };
                    let kind = classify_llm_error(&reason);
                    let error_data = build_llm_error_data(
                        kind,
                        &reason,
                        turn_started.elapsed().as_millis() as u32,
                        scope_ctx.provider.as_deref(),
                        scope_ctx.provider_host.as_deref(),
                        scope_ctx.model.as_deref(),
                        context_window as u32,
                        system_tokens,
                        tool_def_tokens,
                        tool_result_tokens,
                        message_tokens,
                        messages_count,
                    );
                    (Some(kind), error_data)
                } else {
                    (None, None)
                };
                let event = TelemetryEvent::LlmChat {
                    duration_ms: turn_started.elapsed().as_millis() as u32,
                    tool_calls_count: $tool_count as u32,
                    input_tokens: tel_input_tokens,
                    output_tokens: tel_output_tokens,
                    cached_tokens: tel_cached_tokens,
                    had_error: result.is_failed(),
                    context_window: context_window as u32,
                    system_tokens,
                    tool_def_tokens,
                    tool_result_tokens,
                    message_tokens,
                    messages_count,
                    error_kind,
                    error_data,
                };
                let tel = self.context.telemetry.clone();
                let emit_ctx = scope_ctx.clone();
                CurrentContext::scope(emit_ctx, || async move {
                    tel.track(event);
                })
                .await;
                return result;
            }};
            // Variant for early-exit paths where conversation is not available.
            ($result:expr, $tool_count:expr) => {
                tel_return!($result, $tool_count, conversation)
            };
        }

        let mut stream = match stream_result {
            Ok(s) => s,
            Err(e) => tel_return!(TurnResult::Failed(e.to_string()), 0u32),
        };
        let mut got_any_event = false;
        let mut was_truncated = false;
        // Hides `<tool_call>...</tool_call>` blocks from UI/conversation while
        // keeping `text_buf` raw so rescue can still parse them at Done.
        let mut stream_filter = ToolCallStreamFilter::default();

        // Stream timeouts. Defaults are 300s for both first-token and
        // subsequent-token waits, since slow domestic model providers
        // (SiliconFlow, Zhipu GLM, etc.) under thinking mode can take >3min
        // to emit a single token after a large prompt. Override via env
        // ATOMCODE_FIRST_TOKEN_TIMEOUT_SECS / ATOMCODE_STREAM_TIMEOUT_SECS
        // for environments where you want a tighter "real hang" detector.
        fn timeout_from_env(var: &str, default_secs: u64) -> std::time::Duration {
            std::env::var(var)
                .ok()
                .and_then(|v| v.parse::<u64>().ok())
                .map(std::time::Duration::from_secs)
                .unwrap_or_else(|| std::time::Duration::from_secs(default_secs))
        }
        let first_token_timeout = timeout_from_env("ATOMCODE_FIRST_TOKEN_TIMEOUT_SECS", 300);
        let stream_timeout = timeout_from_env("ATOMCODE_STREAM_TIMEOUT_SECS", 300);

        loop {
            let timeout = if got_any_event {
                stream_timeout
            } else {
                first_token_timeout
            };
            tokio::select! {
                            biased;

            _ = cancel.cancelled() => {
                                conversation.finalize_stream();
                                tel_return!(TurnResult::Cancelled, 0u32);
                            }

                            _ = tokio::time::sleep(timeout) => {
                                conversation.finalize_stream();
                                tel_return!(TurnResult::Failed(format!(
                                    "Stream timeout: no event for {:?}",
                                    timeout
                                )), 0u32);
                            }

                            event = stream.next() => {
                                match event {
                                    Some(Ok(StreamEvent::Delta(text))) => {
                                        got_any_event = true;
                                        // Strip model-internal tags (DeepSeek </think>`, QwQ, etc.)
                                        let text = strip_model_tags(&text);
                                        if !text.is_empty() {
                                            // Raw goes into rescue source so XML tool_call blocks
                                            // can be parsed at Done.
                                            text_buf.push_str(&text);
                                            // Visible stream excludes <tool_call>...</tool_call>
                                            // blocks (Qwen/GLM XML leak suppression).
                                            let visible = stream_filter.feed(&text);
                                            if !visible.is_empty() {
                                                // Runaway guard runs on the visible stream
                                                // only — XML scaffolding inside <tool_call>
                                                // blocks legitimately runs long, and we
                                                // don't want to false-positive on it (#225).
                                                if let Some(reason) =
                                                    runaway_detector.feed(&visible)
                                                {
                                                    conversation.push_delta(&visible);
                                                    visible_text_buf.push_str(&visible);
                                                    let _ = event_tx
                                                        .send(TurnEvent::TextDelta(visible));
                                                    conversation.finalize_stream();
                                                    tel_return!(
                                                        TurnResult::Failed(reason),
                                                        0u32
                                                    );
                                                }
                                                conversation.push_delta(&visible);
                                                visible_text_buf.push_str(&visible);
                                                let _ = event_tx.send(TurnEvent::TextDelta(visible));
                                            }
                                        }
                                    }
                                    Some(Ok(StreamEvent::Reasoning(text))) => {
                                        got_any_event = true;
                                        // Emit to UI for verbose mode (Ctrl+O) display.
                                        // Still accumulate for the fallback case where
                                        // content ends up empty.
                                        let _ = event_tx.send(TurnEvent::ReasoningDelta(text.clone()));
                                        reasoning_buf.push_str(&text);
                                    }
                                    Some(Ok(StreamEvent::ThinkingBlock { text, signature })) => {
                                        got_any_event = true;
                                        // Anthropic-only path: store the block (with
                                        // its signature) for echo-back. Don't emit a
                                        // UI event — the text was already streamed
                                        // through ReasoningDelta during the deltas.
                                        thinking_blocks.push(
                                            crate::conversation::message::ThinkingBlock {
                                                text,
                                                signature,
                                            },
                                        );
                                    }
                                    Some(Ok(StreamEvent::ToolCallStart { id, name })) => {
                                        got_any_event = true;
                                        // Surface the tool name to UI immediately — otherwise users see
                                        // "Generating…" for the entire args-streaming window (can be 30s+
                                        // for large write_file calls).
                                        let _ = event_tx.send(TurnEvent::ToolCallStreaming { name: name.clone(), hint: String::new() });
                                        conversation.tool_call_buffer = Some(ToolCallBuffer {
                                            id,
                                            name,
                                            arguments: String::new(),
                                            hint_sent: false,
                                        });
                                    }

                                    Some(Ok(StreamEvent::ToolCallDelta(args))) => {
                                        got_any_event = true;
                                        if let Some(ref mut buf) = conversation.tool_call_buffer {
                                            buf.arguments.push_str(&args);
                                            // Extract file_path from partial args (once only).
                                            if !buf.hint_sent && buf.arguments.len() < 300 {
                                                if let Some(hint) = extract_path_hint(&buf.arguments) {
                                                    buf.hint_sent = true;
                                                    let _ = event_tx.send(TurnEvent::ToolCallStreaming {
                                                        name: buf.name.clone(),
                                                        hint,
                                                    });
                                                }
                                            }
                                        }
                                    }

                                    Some(Ok(StreamEvent::ToolCallDone(mut call))) => {
                                        conversation.tool_call_buffer = None;
                                        // Variant E — atomgit gateway occasionally
                                        // corrupts `function.name` by spilling argument
                                        // attributes into it (e.g.
                                        // name='grep" path="..." pattern="..."'). The
                                        // `arguments` field is then "{}". Drop the call
                                        // entirely so it never enters tool_calls_buf nor
                                        // the assistant message — the next stream is a
                                        // fresh routing-lottery roll. Surface a one-line
                                        // Error event so the user sees what happened.
                                        if name_looks_corrupt(&call.name) {
                                            let _ = event_tx.send(TurnEvent::Error(format!(
                                                "Dropped malformed tool_call (provider returned corrupt function name: {:?})",
                                                truncate(&call.name, 60)
                                            )));
                                            continue;
                                        }

                                        // Variants A/A2/B/C/D — atomgit gateway wraps
                                        // tool args in `{"arguments": ...}` envelopes
                                        // (string- or object-valued, 1-3 levels deep,
                                        // with optional sibling fields). Schema-aware
                                        // recovery unwraps to the flat form expected by
                                        // the OpenAI tool-call protocol. See
                                        // `recover_tool_args` for the variant catalogue.
                                        let expected = self.tools.expected_top_keys(&call.name).await;
                                        if let Some(recovered) =
                                            crate::tool::recover_tool_args(&call.arguments, &expected)
                                        {
                                            call.arguments = recovered;
                                        }

                                        // ToolCallStarted is intentionally NOT sent here —
                                        // it's emitted when the tool actually starts executing,
                                        // so tool call and result are paired correctly in the
                                        // UI for sequential execution.
                                        tool_calls_buf.push(call);
                                    }

                                    Some(Ok(StreamEvent::Usage(usage))) => {
                                        total_tokens += usage.completion_tokens;
                                        // Telemetry: accumulate per-turn token counters.
                                        tel_input_tokens = tel_input_tokens.saturating_add(usage.prompt_tokens as u32);
                                        tel_output_tokens = tel_output_tokens.saturating_add(usage.completion_tokens as u32);
                                        tel_cached_tokens = tel_cached_tokens.saturating_add(usage.cached_tokens as u32);
                                        got_usage = true;
                                        let _ = event_tx.send(TurnEvent::TokenUsage {
                                            prompt_tokens: usage.prompt_tokens,
                                            completion_tokens: usage.completion_tokens,
                                            total_tokens: usage.prompt_tokens + usage.completion_tokens,
                                            cached_tokens: usage.cached_tokens,
                                        });
                                    }

                                    Some(Ok(StreamEvent::Done { truncated: is_truncated })) => {
                                        // Flush any holdback from the tool_call filter. If the
                                        // stream ended mid-`<tool_call>` block, the filter
                                        // discards the partial — preferring a missing close to
                                        // a leaked tag.
                                        let trailing = stream_filter.flush();
                                        if !trailing.is_empty() {
                                            conversation.push_delta(&trailing);
                                            visible_text_buf.push_str(&trailing);
                                            let _ = event_tx.send(TurnEvent::TextDelta(trailing));
                                        }

                                        // Reasoning-only fallback: some gateways route the
                                        // entire response through `reasoning_content` for
                                        // reasoning models (MiniMax-M2.7, DeepSeek-R1). If
                                        // we end up here with empty `content`, empty
                                        // tool_calls, but a non-empty reasoning buffer, treat
                                        // the reasoning as the answer — otherwise the agent's
                                        // empty-response retry loop fires twice, sleeps 4s,
                                        // and finally reports a silent "Nailed it · 0 tok".
                                        //
                                        // Rescue runs before this so real tool-call-in-text
                                        // escapes still take priority.
                                        let rescued_tools = if tool_calls_buf.is_empty() {
                                            let rescued = rescue_text_tool_calls(&text_buf);
                                            if !rescued.is_empty() {
                                                conversation.clear_stream_buffer();
                                                tool_calls_buf.extend(rescued);
                                                true
                                            } else {
                                                false
                                            }
                                        } else {
                                            // Repair path: model split intent across two channels
                                            // — function-calling JSON arrived with truncated args
                                            // (e.g. only `new_string`, missing `old_string`),
                                            // while the text stream carried the complete args as
                                            // `<tool_call>` XML. Fill missing keys from the XML
                                            // pool so the call doesn't fail at execute() with a
                                            // misleading "old_string is required". JSON wins on
                                            // conflicts; XML only fills gaps.
                                            let xml_pool = rescue_text_tool_calls(&text_buf);
                                            if !xml_pool.is_empty() {
                                                repair_tool_call_args(&mut tool_calls_buf, &xml_pool);
                                            }
                                            false
                                        };

                                        if text_buf.trim().is_empty()
                                            && tool_calls_buf.is_empty()
                                            && !rescued_tools
                                            && !is_only_placeholder_filler(&reasoning_buf)
                                        {
                                            // Skip-promotion guard: when the reasoning
                                            // channel carries nothing besides copies of
                                            // our own outbound placeholder
                                            // (`(no reasoning recorded)`), don't promote
                                            // it to the assistant text channel. Some
                                            // gateways echo back the placeholder as the
                                            // response's reasoning_content; more often
                                            // the model mimics the pattern from a
                                            // context full of historical placeholder
                                            // copies — DeepSeek V4 thinking-mode
                                            // requires non-empty reasoning_content on
                                            // every historical assistant tool_call
                                            // message, so a 17-round session has 17
                                            // copies of the placeholder in context, and
                                            // the response often comes back as 3+
                                            // copies concatenated. `is_only_placeholder_filler`
                                            // handles any N (≥1) copies plus
                                            // interleaved whitespace. Promoting that
                                            // would commit a meaningless string to
                                            // history AND present `Responded { text:
                                            // "(no reasoning recorded)..." }` to the
                                            // agent loop, which then calls
                                            // finish_turn(Natural) and the user sees a
                                            // silent "Nailed it" mid-task stop
                                            // (user-reported on DeepSeek V4 Flash, 17
                                            // rounds 20 tools, screenshot showed the
                                            // placeholder as the only assistant text
                                            // before TurnComplete fired). With the
                                            // guard: text_buf stays empty, falls
                                            // through to the empty-response Failed
                                            // branch below, the agent loop's existing
                                            // 3-retry-with-backoff path takes over and
                                            // surfaces the issue to the user instead of
                                            // burying it as success.
                                            let promoted = std::mem::take(&mut reasoning_buf);
                                            conversation.push_delta(&promoted);
                                            text_buf.push_str(&promoted);
                                            // Reasoning channel doesn't carry tool_call XML
                                            // (it's a separate stream from delta text), so
                                            // promoting it directly to visible_text_buf is
                                            // safe — no need to re-feed through stream_filter.
                                            visible_text_buf.push_str(&promoted);
                                            let _ = event_tx.send(TurnEvent::TextDelta(promoted));
                                        }

                                        // Fallback: if the provider didn't report usage (many
                                        // OpenAI-compatible APIs ignore stream_options), estimate
                                        // output tokens from the streamed text + tool call args.
                                        if !got_usage {
                                            let mut output_chars = text_buf.len();
                                            for tc in &tool_calls_buf {
                                                output_chars += tc.arguments.len();
                                            }
                                            // Rough heuristic: ~2 chars per token for mixed
                                            // Chinese/English, ~4 for pure English. Use 3 as a
                                            // middle ground since most users mix both.
                                            let estimated = (output_chars / 3).max(1);
                                            total_tokens += estimated;
                                            let _ = event_tx.send(TurnEvent::TokenUsage {
                                                prompt_tokens: 0,
                                                completion_tokens: estimated,
                                                total_tokens: estimated,
                                                cached_tokens: 0,
                                            });
                                        }

                                        // Normalize tool calls before they enter history. In
                                        // particular, merging same-file edit_file calls after
                                        // finalization leaves the assistant message declaring
                                        // more tool calls than the ToolResults we later append,
                                        // which poisons the next provider request.
                                        merge_edit_calls(&mut tool_calls_buf);

                                        // Finalize conversation state. Pass the accumulated
                                        // reasoning_buf so thinking-model providers (Moonshot
                                        // Kimi K2-thinking/K2.6, etc.) can echo it back on
                                        // the next request — without this the provider 400s
                                        // with "reasoning_content is missing in assistant
                                        // tool call message". The send-side ReasoningPolicy
                                        // (per-provider) decides whether the field actually
                                        // reaches the wire.
                                        if !tool_calls_buf.is_empty() {
                                            let reasoning = if reasoning_buf.trim().is_empty() {
                                                None
                                            } else {
                                                Some(reasoning_buf.as_str())
                                            };
                                            conversation
                                                .finalize_stream_with_tool_calls_and_thinking(
                                                    &tool_calls_buf,
                                                    reasoning,
                                                    std::mem::take(&mut thinking_blocks),
                                                );
                                        } else {
                                            conversation.finalize_stream();
                                        }
                                        was_truncated = is_truncated;
                                        break;
                                    }

                                    Some(Ok(StreamEvent::Error(e))) => {
                                        conversation.finalize_stream();
                                        tel_return!(TurnResult::Failed(e), 0u32);
                                    }

                                    Some(Ok(StreamEvent::Warning(w))) => {
                                        // Advisory only — keep streaming. The
                                        // TUI surfaces this to the user so a
                                        // truncating proxy is visible at the
                                        // moment of the bad request, not three
                                        // hours later in the datalog.
                                        let _ = event_tx.send(TurnEvent::Warning(w));
                                    }

                                    Some(Err(e)) => {
                                        conversation.finalize_stream();
                                        tel_return!(TurnResult::Failed(e.to_string()), 0u32);
                                    }

                                    None => {
                                        // Stream ended without Done event
                                        conversation.finalize_stream();
                                        break;
                                    }
                                }
                            }
                        }
        }

        // Log LLM response (text + tool calls) into the same per-project
        // datalog dir as the request — see comment on the matching
        // `log_llm_request` call above.
        let response_duration = stream_start.elapsed().as_millis() as u64;
        let wd = self
            .context
            .working_dir
            .try_read()
            .map(|g| g.clone())
            .unwrap_or_default();
        let datalog_dir = crate::turn::datalog::DatalogWriter::resolve_log_dir(
            &wd,
            self.config.datalog.dir.as_deref(),
        );
        super::log::log_llm_response(
            &datalog_dir,
            pending_request_log,
            &text_buf,
            &tool_calls_buf,
            &reasoning_buf,
            self.provider.model_name(),
            0, // step is set by caller
            response_duration,
            self.config.datalog.enabled,
        );

        if tool_calls_buf.is_empty() && text_buf.trim().is_empty() {
            tel_return!(
                TurnResult::Failed(
                    "Provider returned an empty response (no text, no tool calls).".to_string(),
                ),
                0u32
            );
        }

        // 5. If no tool calls, we're done — LLM produced text only.
        //    Use the FILTERED accumulator so downstream consumers
        //    (datalog `log_text`, ATLAS plan extraction, telemetry)
        //    see clean prose, not raw text_buf with leaked XML
        //    tool_call blocks.
        if tool_calls_buf.is_empty() {
            // Trigger post-turn hooks for Responded
            self.trigger_post_turn_hooks("Responded").await;
            
            return TurnResult::Responded {
                text: visible_text_buf,
                tokens: total_tokens,
                truncated: was_truncated,
            };
        }

        // 5. If no tool calls, we're done — LLM produced text only.
        //    Use the FILTERED accumulator so downstream consumers
        //    (datalog `log_text`, ATLAS plan extraction, telemetry)
        //    see clean prose, not raw text_buf with leaked XML
        //    tool_call blocks. Earlier bug: 5-7 atomgr datalog
        //    20-14-23 Turn 5 logged `### 3. 传输层安全<tool_call>grep
        //    <arg_key>...` because Responded.text was raw.
        if tool_calls_buf.is_empty() {
            tel_return!(
                TurnResult::Responded {
                    text: visible_text_buf,
                    tokens: total_tokens,
                    truncated: was_truncated,
                },
                0u32
            );
        }

        // 6. Tool calls were normalized before being written into conversation
        // history. From this point on, execute exactly the calls the provider
        // will see in the assistant message on the next turn.
        //
        // Auto-merge multiple edit_file calls on the same file into one multi-edit.
        // Models often generate 2+ separate edit_file calls for the same file instead of
        // using the edits array. Merging at framework level is 100% reliable vs prompt ~50%.

        // ── Layer B: per-turn read budget allocation ──
        // Count read_file calls in this batch and set per-file token budget.
        // Formula: 20% of ctx budget / num_reads. This ensures N reads in one
        // turn share the budget fairly — 1 read gets 20%, 3 reads get 6.7% each.
        // read.rs Layer A checks file_tokens against this to decide full vs skeleton.
        {
            let num_reads = tool_calls_buf
                .iter()
                .filter(|c| c.name == "read_file")
                .count()
                .max(1); // avoid division by zero
            let budget = self
                .context
                .ctx_budget_hint
                .load(std::sync::atomic::Ordering::Relaxed);
            let per_file = budget / (5 * num_reads);
            self.context.read_budget_tokens.store(
                per_file.max(2000), // floor: ~170 lines always get full content
                std::sync::atomic::Ordering::Relaxed,
            );
        }

        let tool_count = tool_calls_buf.len();
        let mut seen_calls: std::collections::HashMap<(String, String), usize> =
            std::collections::HashMap::new();
        let mut is_dup: Vec<bool> = vec![false; tool_calls_buf.len()];
        for (i, call) in tool_calls_buf.iter().enumerate() {
            // Key on the *canonicalised* argument JSON so that semantically
            // identical calls with cosmetically different formatting collapse.
            // Weak/streaming models routinely re-emit the same call with
            // different whitespace, key order, or escape style:
            //   {"pattern":"foo"}   vs   {"pattern": "foo"}
            //   {"a":1,"b":2}       vs   {"b":2,"a":1}
            // The byte-identical comparison below would treat those as
            // distinct and let N ghost in-flight rows leak into the UI.
            // serde_json::to_string with a BTreeMap-backed Value sorts keys
            // and strips whitespace, so two formattings of the same object
            // yield the same canonical string. Non-JSON args fall back to
            // the raw string (no regression for free-form tools).
            let key = (call.name.clone(), normalize_tool_args(&call.arguments));
            if seen_calls.contains_key(&key) {
                is_dup[i] = true;
            } else {
                seen_calls.insert(key, i);
            }
        }

        // ── ToolBatchStarted: fires when ≥ 2 non-duplicate calls fan
        // out from one assistant message. Lets the UI render a single
        // grouped block instead of N independent ▸ rows.
        // Per-call ToolCallStarted events still fire below for backward
        // compat (UI dedupes via batch_id membership).
        let non_dup_count = is_dup.iter().filter(|d| !**d).count();
        let active_batch_id = if non_dup_count >= 2 {
            let batch_id = format!("batch_{}", uuid::Uuid::new_v4());
            let calls: Vec<crate::turn::event::ToolBatchCall> = tool_calls_buf
                .iter()
                .zip(is_dup.iter())
                .filter(|(_, dup)| !**dup)
                .map(|(c, _)| crate::turn::event::ToolBatchCall {
                    id: c.id.clone(),
                    name: c.name.clone(),
                    arguments: c.arguments.clone(),
                })
                .collect();
            let _ = event_tx.send(TurnEvent::ToolBatchStarted {
                batch_id: batch_id.clone(),
                calls,
            });
            Some((batch_id, std::time::Instant::now(), non_dup_count))
        } else {
            None
        };
        let mut batch_ok_count: usize = 0;

        let mut files_edited_this_batch: Vec<String> = Vec::new();
        for (i, call) in tool_calls_buf.iter().enumerate() {
            if cancel.is_cancelled() {
                tel_return!(TurnResult::Cancelled, tool_count);
            }

            // ── Dup-in-batch: silent skip BEFORE any UI event ──
            // Some thinking-mode models emit the same tool_call N times in
            // one assistant message. Dispatching them all wastes execute
            // cycles, so we replay the first call's result for #2..N. The
            // model still sees one ToolResult per tool_call (parity
            // preserved via add_tool_result), but the UI must not render
            // ghost inflight rows for the duplicates — which it would if
            // ToolCallStarted fired before the is_dup gate.
            //
            // Symptom users saw before this gate moved up: a wall of
            // identical `Bash(...)` rows for each batch where the model
            // emitted N copies of the same call (e.g. dead_code grep
            // session with N variants pasted in by mistake).
            if is_dup[i] {
                let result = ToolResult {
                    call_id: call.id.clone(),
                    output: "[Duplicate call — same tool and arguments as an earlier call in this batch. \
                             Result already returned above.]".to_string(),
                    success: true,
                };
                conversation.add_tool_result(result);
                continue;
            }

            // ── Cross-batch loop guard ──
            // The in-batch `is_dup` above only catches a model emitting
            // the same call N times *within one assistant message*. The
            // 22-identical-`Bash(cargo check)` symptom from weak models
            // is the orthogonal case: identical (name, args) repeating
            // across many sequential turns with no progress between.
            // See `loop_guard.rs` for the false-positive avoidance rules
            // (output-hash + state-change reset) that make this safe to
            // gate before execution. Same ghost-row reasoning as is_dup:
            // blocked attempts must not emit ToolCallStarted, otherwise
            // the UI renders a spinner row that never receives a result.
            if let LoopGuardDecision::Block(msg) =
                self.loop_guard.check(&call.name, &call.arguments)
            {
                let result = ToolResult {
                    call_id: call.id.clone(),
                    output: msg,
                    // success=false so the model treats this as a soft
                    // error and is more likely to change strategy.
                    success: false,
                };
                conversation.add_tool_result(result);
                continue;
            }

            // Send ToolCallStarted event when the tool actually starts executing.
            // This ensures tool call and result are paired correctly in the UI.
            let _ = event_tx.send(TurnEvent::ToolCallStarted {
                id: call.id.clone(),
                name: call.name.clone(),
                arguments: call.arguments.clone(),
            });

            // Enforce tool filter at execution time — LLM may call tools
            // not in the provided tool_defs (e.g., during diagnosis read-only phase).
            if let Some(filter) = allowed_tools {
                if !filter.contains(&call.name.as_str()) {
                    let result = ToolResult {
                        call_id: call.id.clone(),
                        output: format!(
                            "Tool '{}' is not available in this phase. Read the code first, then edit.",
                            call.name
                        ),
                        success: false,
                    };
                    let _ = event_tx.send(TurnEvent::ToolCallResult {
                        call_id: call.id.clone(),
                        name: call.name.clone(),
                        output: result.output.clone(),
                        success: false,
                        duration: std::time::Duration::ZERO,
                    });
                    conversation.add_tool_result(result);
                    continue;
                }
            }
            // Dup-in-batch was already short-circuited above (before the
            // ToolCallStarted emit), so by the time we reach here this is
            // a real, non-duplicate call to execute.
            let result = self.execute_single_tool(call, event_tx, &cancel, &conversation.messages).await;
            if active_batch_id.is_some() && result.success {
                batch_ok_count += 1;
            }

            // Track files edited for read interception (batch + cross-turn)
            // Use full file path as key to avoid basename collisions
            // (e.g., api/__init__.py vs schemas/__init__.py).
            if matches!(call.name.as_str(), "edit_file" | "create_file") && result.success {
                if let Ok(args) = serde_json::from_str::<serde_json::Value>(&call.arguments) {
                    if let Some(fp) = args.get("file_path").and_then(|v| v.as_str()) {
                        let file_key = fp.to_string();
                        if !files_edited_this_batch.contains(&file_key) {
                            files_edited_this_batch.push(file_key.clone());
                        }
                        if !self.recently_edited_files.contains(&file_key) {
                            self.recently_edited_files.push(file_key);
                        }
                    }
                }
            }

            // Record into the cross-batch loop guard. Must run on every
            // real execution (success OR failure) so the next turn's
            // check() sees the full history. The guard's own state-
            // change reset rule lives inside record() — runner doesn't
            // need to know the tool taxonomy.
            self.loop_guard
                .record(&call.name, &call.arguments, &result.output, result.success);

            conversation.add_tool_result(result);
        }

        // ── ToolBatchCompleted: closes the group started above. UI
        // uses this to swap the spinner header to a static `· N/M ok ·
        // Xs wall` summary. Only fires when a batch was actually opened.
        if let Some((batch_id, started_at, total)) = active_batch_id {
            let _ = event_tx.send(TurnEvent::ToolBatchCompleted {
                batch_id,
                ok: batch_ok_count,
                total,
                elapsed_ms: started_at.elapsed().as_millis() as u64,
            });
        }

        // Trigger post-turn hooks
        self.trigger_post_turn_hooks("UsedTools").await;

        // Truncate oversized tool outputs before returning. Without this,
        // a single `ls -la node_modules` / wide `find` dump (multi-MB)
        // stays raw in `conversation.messages` and the NEXT LLM call
        // blows the upstream context limit. Every caller of TurnRunner
        // used to have to remember to invoke this — daemon didn't, which
        // was the root of the 738K-token 400 bug. Making runner own it
        // removes the implicit contract.
        crate::ctx::truncate::post_process_tool_results(
            &mut conversation.messages,
            tool_count,
            "", // fallback only — each result is keyed by its own
            // call_id → ATC.tool_name lookup (see ctx::truncate).
            context_window,
        );

        tel_return!(
            TurnResult::UsedTools {
                // Same filtered-vs-raw split as the Responded arm above.
                // text_buf keeps raw for the rescue path; visible_text_buf
                // is what should reach downstream consumers.
                text: if visible_text_buf.is_empty() {
                    None
                } else {
                    Some(visible_text_buf)
                },
                tool_count,
                tokens: total_tokens,
            },
            tool_count
        );
    }

    /// Helper to trigger post-turn hooks with proper context
    async fn trigger_post_turn_hooks(&self, turn_result: &str) {
        let working_dir = self.context.working_dir.read().await.clone();
        let hook_ctx = HookCtx::new(
            "".to_string(),
            "".to_string(),
            working_dir.to_string_lossy().to_string(),
        );
        self.hook_engine.trigger_post_turn(&hook_ctx, turn_result).await;
    }

    /// EXECUTE mode: run one LLM turn with minimal context.
    /// Reads the target file fresh from disk, sends only the file + instruction,
    /// and only exposes edit_file. Used for precise, focused edits.
    ///
    /// Returns the TurnResult and whether any file was edited.
    pub async fn run_execute(
        &mut self,
        file_path: &str,
        instruction: &str,
        event_tx: &mpsc::UnboundedSender<TurnEvent>,
        cancel: CancellationToken,
    ) -> TurnResult {
        // 1. Read fresh file content from disk
        let file_content = match std::fs::read_to_string(file_path) {
            Ok(c) => c,
            Err(e) => return TurnResult::Failed(format!("Cannot read {}: {}", file_path, e)),
        };

        // 2. Build minimal conversation: system + user(file + instruction)
        let system_prompt = "You are an execution agent. Your ONLY job: apply the edit instruction to the file below.\n\
            RULES:\n\
            1. Call edit_file IMMEDIATELY with old_string/new_string. Do NOT explain.\n\
            2. Do NOT read_file — the file content is already provided.\n\
            3. Do NOT fix other issues — ONLY apply the given instruction.\n\
            4. If the instruction is unclear, apply your best interpretation.";

        let user_message = format!(
            "## Instruction\n{}\n\n## File: {}\n```\n{}\n```",
            instruction, file_path, file_content,
        );

        let mut mini_conv = Conversation::new();
        mini_conv.add_user_message(&user_message);

        // 3. Only expose edit_file
        let execute_tools = &["edit_file"];

        // 4. Run the LLM turn with filtered tools
        let result = self
            .run_with_filter(
                &mut mini_conv,
                system_prompt,
                "",
                event_tx,
                cancel,
                Some(execute_tools),
            )
            .await;

        result
    }

    /// Execute a single tool call with permission checking.
    ///
    /// `cancel` is polled while the tool future runs so Ctrl+C interrupts
    /// mid-execution — without this, long-running tools (deep `glob`, slow
    /// `grep`, network calls) complete before the turn-level cancel check
    /// runs on the next iteration, and the user sees an unresponsive UI.
    async fn execute_single_tool(
        &mut self,
        call: &ToolCall,
        event_tx: &mpsc::UnboundedSender<TurnEvent>,
        cancel: &CancellationToken,
        conversation_messages: &[crate::conversation::message::Message],
    ) -> ToolResult {
        // Auto-fix common tool name aliases (models trained on other agents use different names)
        // Case-insensitive matching: models may output "Run", "Bash", "Edit_File", etc.
        let name_lower = call.name.to_lowercase();
        let corrected_name = match name_lower.as_str() {
            "create_file" => "write_file",
            "find" | "find_files" => "glob",
            "run" | "run_command" | "run_server" | "run_shell" | "run_app" | "execute"
            | "shell" | "terminal" => "bash",
            "list_files" | "ls" => "list_directory",
            "search" => "grep",
            _ => "",
        };
        let corrected_name = if corrected_name.is_empty() {
            // No alias match — try case-insensitive lookup in registry
            if self.tools.get(&call.name).await.is_some() {
                call.name.clone()
            } else if let Some(name) = self.tools.iter().await
                .find(|(k, _)| k.eq_ignore_ascii_case(&call.name))
                .map(|(k, _)| k)
            {
                name
            } else {
                call.name.clone()
            }
        } else {
            corrected_name.to_string()
        };
        // Clone the Arc so the borrow of `self.tools` ends here — we need to
        // call `self.detect_call_loop(..)` mutably below.
        let tool = match self.tools.get(&corrected_name).await {
            Some(t) => t,
            None => {
                let available: String = self.tools.iter().await
                    .map(|(name, _)| name)
                    .collect::<Vec<String>>()
                    .join(", ");
                let hint = match call.name.as_str() {
                    "create_file" => "\nDid you mean write_file? create_file was renamed to write_file.",
                    "search" => "\nFor file content search: grep(pattern, path)\nFor web search: web_search(query)",
                    _ => "",
                };
                let output = format!(
                    "Error: unknown tool '{}'. Available tools: {}.{}",
                    call.name, available, hint
                );
                let _ = event_tx.send(TurnEvent::ToolCallResult {
                    call_id: call.id.clone(),
                    name: call.name.clone(),
                    output: output.clone(),
                    success: false,
                    duration: std::time::Duration::ZERO,
                });
                self.context.telemetry.track(TelemetryEvent::ToolCall {
                    name: corrected_name.clone(),
                    success: false,
                    duration_ms: 0,
                    error_kind: Some(ToolErrorKind::NotFound),
                    error_data: Some(serde_json::json!({
                        "tool_name": call.name,
                        "duration_ms": 0,
                        "original_name": if call.name != corrected_name { Some(call.name.as_str()) } else { None },
                        "available_tools": available,
                        "reason": format!("Tool '{}' not found", call.name),
                    }).to_string()),
                });
                return ToolResult {
                    call_id: call.id.clone(),
                    output,
                    success: false,
                };
            }
        };

        // Repair malformed JSON args before approval and execution.
        // Providers sometimes emit truncated / unescaped / fenced JSON (especially
        // on max_tokens cutoff mid-arguments). Running the repair chain here means
        // tool implementations see valid JSON whenever we can salvage anything,
        // and surface deterministic errors when we can't.
        let repaired_args = super::json_repair::repair_tool_args(&corrected_name, &call.arguments);

        // Use corrected name and repaired args for all subsequent checks
        let owned_call;
        let call = if corrected_name != call.name.as_str() || repaired_args != call.arguments {
            owned_call = ToolCall {
                id: call.id.clone(),
                name: corrected_name.to_string(),
                arguments: repaired_args,
            };
            &owned_call
        } else {
            call
        };

        // Schema gate: bounce malformed args back to the model BEFORE
        // approval / execute. Provider stream truncation occasionally
        // ships `{]` or `{"file_path":"..."]` (closing bracket wrong,
        // required field missing); without this guard, write_file's
        // fail-closed approval branch would prompt the user, the user
        // would Allow, and execute would then fail with the same parse
        // error — a wasted approval round-trip on a known-broken call.
        // Runs AFTER `repair_tool_args` (so wrapper-shape / fence / nested
        // payloads recover first) but BEFORE approval — the unrecoverable
        // remainder is what gets bounced.
        if let Err(reason) = tool.validate_args(&call.arguments) {
            // `reason` already comes from `diagnose_args` (or a sibling
            // helper) carrying a "Re-issue: <example>" tail. Appending
            // another "Re-issue {tool} with..." here produced the
            // double-Re-issue the user saw on the failed WriteFile call.
            let msg = format!("Error: {}", reason);
            let _ = event_tx.send(TurnEvent::ToolCallResult {
                call_id: call.id.clone(),
                name: call.name.clone(),
                output: msg.clone(),
                success: false,
                duration: std::time::Duration::ZERO,
            });
            self.context.telemetry.track(TelemetryEvent::ToolCall {
                name: corrected_name.clone(),
                success: false,
                duration_ms: 0,
                error_kind: Some(ToolErrorKind::InvalidArgs),
                error_data: Some(serde_json::json!({
                    "tool_name": corrected_name,
                    "reason": reason,
                    "args_summary": build_args_summary(&corrected_name, &call.arguments),
                }).to_string()),
            });
            return ToolResult {
                call_id: call.id.clone(),
                output: msg,
                success: false,
            };
        }

        // Loop detection moved upstream to `dispatch_tools` (gates BEFORE
        // ToolCallStarted is emitted, so blocked attempts don't render
        // ghost inflight rows in scrollback). When we reach here the
        // call has already cleared that guard exactly once.

        // Check permission via the injected PermissionDecider.
        // AutoApprove tools execute immediately; RequireApproval tools go through
        // the decider which handles interactive prompts or automatic policy.
        let approval = tool.approval_with_context(&call.arguments, &self.context);
        if let crate::tool::ApprovalRequirement::RequireApproval(ref reason)
        | crate::tool::ApprovalRequirement::RequireApprovalAlways(ref reason)
        | crate::tool::ApprovalRequirement::RequireApprovalScoped { ref reason, .. } = approval
        {
            // Only emit the ApprovalRequested event (which triggers the
            // TUI approval prompt) when the decider actually needs user
            // input.  If the PermissionStore already has a session grant
            // or override (e.g. the user pressed [A] on a prior call of
            // the same tool in this batch), `will_auto_approve` returns
            // true and we skip the event — the subsequent `decide()` call
            // will return Allow without blocking.  Without this guard,
            // parallel MCP calls show N redundant "Waiting for approval"
            // prompts even though all but the first are auto-resolved.
            let needs_prompt = !self.permission.will_auto_approve(call, &approval);
            if needs_prompt {
                // Emit an informational event carrying a snapshot of
                // conversation.messages so the TUI can persist mid-turn
                // session state (e.g. for `/bg`).
                let _ = event_tx.send(TurnEvent::ApprovalRequested {
                    tool_name: call.name.clone(),
                    reason: reason.clone(),
                    call: call.clone(),
                    messages: conversation_messages.to_vec(),
                });
            }

            let decision = self.permission.decide(call, &approval).await;
            if !matches!(decision, PermissionDecision::Allow) {
                let output = format!("Tool '{}' was denied by the user.", call.name);
                let _ = event_tx.send(TurnEvent::ToolCallResult {
                    call_id: call.id.clone(),
                    name: call.name.clone(),
                    output: output.clone(),
                    success: false,
                    duration: std::time::Duration::ZERO,
                });
            self.context.telemetry.track(TelemetryEvent::ToolCall {
                name: corrected_name.clone(),
                success: false,
                duration_ms: 0,
                error_kind: Some(ToolErrorKind::DeniedByUser),
                error_data: Some(serde_json::json!({
                    "tool_name": corrected_name,
                    "duration_ms": 0,
                    "args_summary": build_args_summary(&corrected_name, &call.arguments),
                    "approval_reason": reason,
                    "reason": "User denied tool execution",
                }).to_string()),
            });
                return ToolResult {
                    call_id: call.id.clone(),
                    output,
                    success: false,
                };
            }
        }

        // --- 统一 PreToolUse Hook ---
        let working_dir = self.context.working_dir.read().await.clone();
        let pr_hook_ctx = HookCtx::new(
            call.name.clone(),
            call.arguments.clone(),
            working_dir.to_string_lossy().to_string(),
        );

        // --- 统一 ToolCallStart Hook (fire-and-forget) ---
        let tc_start_ctx = crate::hook::ToolCallStartContext {
            tool_name: call.name.clone(),
            tool_args: call.arguments.clone(),
            call_id: call.id.clone(),
            turn_number: self.current_turn_number,
        };
        self.hook_engine.trigger_on_tool_call_start(&tc_start_ctx).await;

        let mut final_args = call.arguments.clone();
        match self.hook_engine.trigger_pre_tool_use(&pr_hook_ctx).await {
            Ok(Some(new_args)) => {
                final_args = new_args;
            }
            Ok(None) => {}
            Err(reason) => {
                let output = format!("Tool '{}' was blocked by hook: {}", call.name, reason);
                let _ = event_tx.send(TurnEvent::ToolCallResult {
                    call_id: call.id.clone(),
                    name: call.name.clone(),
                    output: output.clone(),
                    success: false,
                    duration: std::time::Duration::ZERO,
                });
                self.context.telemetry.track(TelemetryEvent::ToolCall {
                    name: corrected_name.clone(),
                    success: false,
                    duration_ms: 0,
                    error_kind: Some(ToolErrorKind::BlockedByHook),
                    error_data: Some(serde_json::json!({
                        "tool_name": corrected_name,
                        "duration_ms": 0,
                        "args_summary": build_args_summary(&corrected_name, &call.arguments),
                        "hook_reason": reason,
                        "reason": "Tool call blocked by PreToolUse hook",
                    }).to_string()),
                });
                return ToolResult {
                    call_id: call.id.clone(),
                    output,
                    success: false,
                };
            }
        }

        // Snapshot the shared working directory before executing. Tools like
        // `change_dir` and `bash` (when the command starts with `cd`) mutate
        // `ctx.working_dir` in place; we compare before/after to emit a
        // `WorkingDirChanged` event so the TUI footer can track the cwd
        // without polling the `Arc<RwLock<PathBuf>>` every frame.
        let wd_before = self.context.working_dir.read().await.clone();

        // Set up event sender for real-time tool output streaming
        self.context.event_tx = Some(std::sync::Arc::new(event_tx.clone()));
        self.context.current_call_id = Some(call.id.clone());

        // Execute the tool. Race against `cancel` so Ctrl+C aborts a
        // long-running tool future instead of waiting for it to finish.
        // Dropping the tool future is safe for read-only tools (glob /
        // grep / read_file); mutating tools (write_file / edit_file /
        // bash) finish fast enough that interrupting them mid-execution
        // is acceptable — user pressed Ctrl+C knowing they want to stop.
        let start = Instant::now();
        crate::ctrace!("RNR", "execute_single_tool start name={} cancel_already={}", call.name, cancel.is_cancelled());
        let result = tokio::select! {
            r = tool.execute(&call.arguments, &self.context) => {
                crate::ctrace!("RNR", "execute_single_tool tool returned name={} elapsed_ms={} cancel_now={}", call.name, start.elapsed().as_millis(), cancel.is_cancelled());
                r
            },
            _ = cancel.cancelled() => {
                crate::ctrace!("RNR", "execute_single_tool cancel branch fired name={} elapsed_ms={}", call.name, start.elapsed().as_millis());
                // Clean up event sender
                self.context.event_tx = None;
                self.context.current_call_id = None;

                let duration = start.elapsed();
                let output = "[Cancelled by user]".to_string();
                let _ = event_tx.send(TurnEvent::ToolCallResult {
                    call_id: call.id.clone(),
                    name: call.name.clone(),
                    output: output.clone(),
                    success: false,
                    duration,
                });
                self.context.telemetry.track(TelemetryEvent::ToolCall {
                    name: corrected_name.clone(),
                    success: false,
                    duration_ms: duration.as_millis() as u32,
                    error_kind: Some(ToolErrorKind::ExecutionFailed),
                    error_data: Some(serde_json::json!({
                        "tool_name": corrected_name,
                        "duration_ms": duration.as_millis() as u32,
                        "args_summary": build_args_summary(&corrected_name, &call.arguments),
                        "output_tail": "[Cancelled by user]",
                        "reason": "Tool execution cancelled by user",
                    }).to_string()),
                });
                return ToolResult {
                    call_id: call.id.clone(),
                    output,
                    success: false,
                };
            }
        };

        // Clean up event sender after tool execution
        self.context.event_tx = None;
        self.context.current_call_id = None;

        let duration = start.elapsed();

        // If the tool mutated the shared working directory, surface it as
        // a TurnEvent so the TUI layer can keep its footer in sync. Emit
        // before ToolCallResult so consumers that redraw on result see
        // the new cwd in the same frame.
        let wd_after = self.context.working_dir.read().await.clone();
        if wd_after != wd_before {
            let _ = event_tx.send(TurnEvent::WorkingDirChanged(wd_after));
        }

        let tool_result = match result {
            Ok(mut r) => {
                r.call_id = call.id.clone();
                r
            }
            Err(e) => ToolResult {
                call_id: call.id.clone(),
                output: format!("Error: {}", e),
                success: false,
            },
        };

        // --- 统一 PostToolUse Hook ---
        let result_ctx = ToolResultContext {
            tool_name: call.name.clone(),
            tool_args: final_args.clone(),
            result: tool_result.output.clone(),
            success: tool_result.success,
            duration_ms: duration.as_millis() as u64,
        };
        self.hook_engine.trigger_post_tool_use(&pr_hook_ctx, &result_ctx).await;

        let _ = event_tx.send(TurnEvent::ToolCallResult {
            call_id: call.id.clone(),
            name: call.name.clone(),
            output: tool_result.output.clone(),
            success: tool_result.success,
            duration,
        });

        // Emit ToolCall telemetry event for both success and failure.
        let output_tail = atomcode_telemetry::scrub::truncate_head(
            &atomcode_telemetry::scrub::scrub_path(
                &tool_result.output,
                None,
                Some(&self.context.working_dir.read().await.clone()),
            ),
            200,
        );
        // Detect warning: exit 0 (success) but stderr present.
        let has_stderr = tool_result.output.contains("STDERR:")
            || tool_result.output.contains("[stderr]");
        let (error_kind, error_data) = if !tool_result.success {
            (Some(ToolErrorKind::ExecutionFailed), Some(serde_json::json!({
                "tool_name": corrected_name,
                "duration_ms": duration.as_millis() as u32,
                "args_summary": build_args_summary(&corrected_name, &call.arguments),
                "output_tail": output_tail,
                "reason": "Tool execution returned an error",
            }).to_string()))
        } else if has_stderr {
            (Some(ToolErrorKind::Warning), Some(serde_json::json!({
                "tool_name": corrected_name,
                "duration_ms": duration.as_millis() as u32,
                "args_summary": build_args_summary(&corrected_name, &call.arguments),
                "output_tail": output_tail,
                "reason": "Command succeeded (exit 0) but produced stderr output",
                "resolution": "Review stderr for potential issues; the command may not have had the intended effect",
            }).to_string()))
        } else {
            (None, None)
        };
        self.context.telemetry.track(TelemetryEvent::ToolCall {
            name: corrected_name.clone(),
            success: tool_result.success,
            duration_ms: duration.as_millis() as u32,
            error_kind,
            error_data,
        });

        tool_result
    }

}

/// Canonicalise a tool-call `arguments` string for in-batch dedup keying.
///
/// Weak/streaming models routinely re-emit the same call with cosmetically
/// different formatting — `{"pattern":"foo"}` vs `{"pattern": "foo"}` vs
/// `{"a":1,"b":2}` vs `{"b":2,"a":1}`. Byte-comparison treats them as
/// distinct, the in-batch `is_dup` misses, and N ghost ToolCallInFlight
/// rows leak into the UI (the symptom from the deepseek-v4-flash
/// screenshot: 2 empty `Glob(**/*.rs)` rows + 1 with body).
///
/// We re-parse and serialise compact. `serde_json::Map` is BTreeMap-backed
/// when the `preserve_order` feature is off (it is — see workspace
/// Cargo.toml), so object keys come out alphabetically — two re-orderings
/// of the same object hash to the same canonical string. Non-JSON args
/// (free-form text, garbage from broken streams) round-trip through the
/// fallback unchanged so we don't regress free-form tools or accidentally
/// merge two genuinely different malformed payloads.
/// True iff `reasoning` contains nothing besides one or more copies
/// of the outbound placeholder (`REASONING_PLACEHOLDER`) interleaved
/// with whitespace — including the all-empty / all-whitespace case.
///
/// The Done-event skip-promotion guard uses this to detect not just
/// the trivial single-copy echo but also the multi-copy mimicry seen
/// on DeepSeek V4 thinking-mode (a long session has many historical
/// copies of the placeholder in context, and the model regenerates the
/// pattern in its own response — observed 3+ copies concatenated in a
/// single reasoning_content stream).
fn is_only_placeholder_filler(reasoning: &str) -> bool {
    reasoning
        .replace(crate::provider::REASONING_PLACEHOLDER, "")
        .trim()
        .is_empty()
}

fn normalize_tool_args(args: &str) -> String {
    match serde_json::from_str::<serde_json::Value>(args) {
        Ok(v) => serde_json::to_string(&v).unwrap_or_else(|_| args.to_string()),
        Err(_) => args.to_string(),
    }
}

/// Strip model-internal reasoning tags from streaming output.
/// Extract a file path hint from partial JSON args (e.g. `{"file_path":"/src/main.rs"`).
/// Returns the short filename on success, empty on failure. Only fires once — caller
/// should stop calling after the first hit.
fn extract_path_hint(partial_json: &str) -> Option<String> {
    // Look for "file_path":"..." or "path":"..."
    for key in &["file_path", "path"] {
        let needle = format!("\"{}\":\"", key);
        if let Some(start) = partial_json.find(&needle) {
            let val_start = start + needle.len();
            let rest = &partial_json[val_start..];
            // Find the closing quote (or take what we have so far)
            let end = rest.find('"').unwrap_or(rest.len());
            let full_path = &rest[..end];
            if !full_path.is_empty() {
                // Return just the filename or last 2 path components
                let short = full_path.rsplit('/').take(2).collect::<Vec<_>>();
                let display = short.into_iter().rev().collect::<Vec<_>>().join("/");
                return Some(display);
            }
        }
    }
    None
}

/// Detect provider-side corruption of a tool_call's `function.name` field.
/// atomgit's gateway sometimes spills attribute syntax into `name`, leaving
/// `arguments` as `"{}"` — e.g. `name='grep" path="..." pattern="..."'`.
/// Legitimate tool names are short ASCII identifiers (`bash`, `read_file`,
/// `mcp__server__tool`), so any whitespace/quote/`=`/`<`/`>` or a length
/// far above what we register is a strong corruption signal.
fn name_looks_corrupt(name: &str) -> bool {
    if name.is_empty() {
        return true;
    }
    if name.len() > 96 {
        return true;
    }
    name.chars().any(|c| c.is_whitespace() || matches!(c, '"' | '=' | '<' | '>'))
}

fn truncate(s: &str, max: usize) -> String {
    if s.chars().count() <= max {
        s.to_string()
    } else {
        let mut out: String = s.chars().take(max).collect();
        out.push('…');
        out
    }
}

/// DeepSeek uses `<think>...</think>`, QwQ uses similar patterns.
/// These should not be shown to the user or stored in conversation.
fn strip_model_tags(text: &str) -> String {
    let mut result = text.to_string();
    while let Some(start) = result.find("<think>") {
        if let Some(end) = result.find("</think>") {
            let end = end + "</think>".len();
            result = format!("{}{}", &result[..start], &result[end..]);
        } else {
            result = result[..start].to_string();
            break;
        }
    }
    result = result.replace("</think>", "");
    result = result.replace("<|im_start|>", "").replace("<|im_end|>", "");
    result
}

/// Rescue tool calls embedded as text in the model's response. Four variants:
///   1. `<tool_call>name(json)</tool_call>` — paren+JSON
///   2. `<tool_call>name(k=v, k=v)</tool_call>` — paren+kv (legacy single-line)
///   3. `<tool_call><tool_name>name</tool_name><arg_key>k</arg_key><arg_value>v</arg_value>...</tool_call>`
///      — Qwen2.5 / GLM XML format
///   4. `<tool_call><function=name><parameter=k>v</parameter>...</function></tool_call>`
///      — OpenHands / Qwen3+ "function-tag" format (attribute-style)
/// Returns rescued ToolCalls, empty vec if nothing found.
fn rescue_text_tool_calls(text: &str) -> Vec<ToolCall> {
    let mut calls = Vec::new();
    let mut remaining = text;

    while let Some(start) = remaining.find("<tool_call>") {
        let after_tag = &remaining[start + "<tool_call>".len()..];

        // Prefer </tool_call> close (XML format spans newlines).
        // Fall back to first newline only when no close tag is present
        // (legacy single-line format).
        let (body, advance) = match after_tag.find("</tool_call>") {
            Some(pos) => (&after_tag[..pos], pos + "</tool_call>".len()),
            None => {
                let pos = after_tag.find('\n').unwrap_or(after_tag.len());
                (&after_tag[..pos], pos)
            }
        };
        let body = body.trim();

        if let Some((name, args_json)) = parse_xml_tool_call(body)
            .or_else(|| parse_xml_attr_style_tool_call(body))
        {
            let call_id = format!("rescued_{}", calls.len());
            calls.push(ToolCall {
                id: call_id,
                name,
                arguments: args_json,
            });
        } else if let Some(paren) = body.find('(') {
            let name = body[..paren].trim();
            let args_raw = body[paren + 1..].trim_end_matches(')').trim();

            if !name.is_empty() {
                let args_json = if args_raw.starts_with('{') {
                    args_raw.to_string()
                } else {
                    let mut json_parts = Vec::new();
                    for part in args_raw.split(',') {
                        let part = part.trim();
                        if let Some(eq) = part.find('=') {
                            let k = part[..eq].trim();
                            let v = part[eq + 1..].trim();
                            let v_quoted = if v.starts_with('"')
                                || v.starts_with('{')
                                || v.starts_with('[')
                                || v == "true"
                                || v == "false"
                                || v.parse::<f64>().is_ok()
                            {
                                v.to_string()
                            } else {
                                format!("\"{}\"", v.replace('\\', "\\\\").replace('"', "\\\""))
                            };
                            json_parts.push(format!("\"{}\":{}", k, v_quoted));
                        }
                    }
                    format!("{{{}}}", json_parts.join(","))
                };

                let call_id = format!("rescued_{}", calls.len());
                calls.push(ToolCall {
                    id: call_id,
                    name: name.to_string(),
                    arguments: args_json,
                });
            }
        }

        remaining = &after_tag[advance..];
    }

    calls
}

/// Parse Qwen2.5 / GLM XML-style tool call body:
///   `<tool_name>NAME</tool_name><arg_key>K1</arg_key><arg_value>V1</arg_value>...`
/// Returns `(name, args_as_json_object)` or None when the format doesn't match.
fn parse_xml_tool_call(body: &str) -> Option<(String, String)> {
    let name = extract_between(body, "<tool_name>", "</tool_name>")?
        .trim()
        .to_string();
    if name.is_empty() {
        return None;
    }

    let mut map = serde_json::Map::new();
    let mut rest = body;
    while let Some(k_start) = rest.find("<arg_key>") {
        let k_after = &rest[k_start + "<arg_key>".len()..];
        let k_end = k_after.find("</arg_key>")?;
        let key = k_after[..k_end].trim().to_string();
        if key.is_empty() {
            return None;
        }
        let after_key = &k_after[k_end + "</arg_key>".len()..];
        let v_start = after_key.find("<arg_value>")?;
        let v_after = &after_key[v_start + "<arg_value>".len()..];
        let v_end = v_after.find("</arg_value>")?;
        let raw_value = &v_after[..v_end];
        map.insert(key, coerce_xml_value(raw_value));
        rest = &v_after[v_end + "</arg_value>".len()..];
    }

    if map.is_empty() {
        return None;
    }
    Some((name, serde_json::Value::Object(map).to_string()))
}

/// Parse OpenHands / Qwen3+ "function-tag" XML-style tool call body:
///   `<function=NAME><parameter=K1>V1</parameter><parameter=K2>V2</parameter></function>`
/// Returns `(name, args_as_json_object)` or None when the format doesn't match.
///
/// Differs from `parse_xml_tool_call` in two ways:
///   * the function name rides on the opening tag's attribute slot
///     (`<function=read_file>`), not inside a `<tool_name>` wrapper
///   * each parameter is one `<parameter=KEY>VALUE</parameter>` element,
///     not two separate `<arg_key>` / `<arg_value>` pairs
///
/// This format is what Qwen3 / Qwen3-Coder / Qwen3.6 / GLM-Z1-Agent emit
/// when the inference gateway doesn't parse them out into the OpenAI
/// `tool_calls` field — they were SFT-trained on OpenHands-style data.
/// Without this rescue, the XML leaks into the visible assistant text
/// and the tool never executes.
fn parse_xml_attr_style_tool_call(body: &str) -> Option<(String, String)> {
    let fn_marker = body.find("<function=")?;
    let after_marker = &body[fn_marker + "<function=".len()..];
    let close_bracket = after_marker.find('>')?;
    let name = after_marker[..close_bracket].trim().to_string();
    if name.is_empty() {
        return None;
    }
    let inner_start = close_bracket + 1;
    let inner_end = after_marker[inner_start..].find("</function>")? + inner_start;
    let inner = &after_marker[inner_start..inner_end];

    let mut map = serde_json::Map::new();
    let mut rest = inner;
    while let Some(p_marker) = rest.find("<parameter=") {
        let after_p = &rest[p_marker + "<parameter=".len()..];
        let cb = after_p.find('>')?;
        let key = after_p[..cb].trim().to_string();
        if key.is_empty() {
            return None;
        }
        let val_start = cb + 1;
        let val_end = after_p[val_start..].find("</parameter>")? + val_start;
        let raw = &after_p[val_start..val_end];
        // Function-tag format puts every value on its own line(s); strip a
        // single leading and trailing newline that's there purely for layout.
        // Don't `.trim()` — `old_string` for edit_file may legitimately need
        // its internal whitespace preserved for the file-match to land.
        let value = raw.strip_prefix('\n').unwrap_or(raw);
        let value = value.strip_suffix('\n').unwrap_or(value);
        map.insert(key, coerce_xml_value(value));
        rest = &after_p[val_end + "</parameter>".len()..];
    }

    if map.is_empty() {
        return None;
    }
    Some((name, serde_json::Value::Object(map).to_string()))
}

fn extract_between<'a>(haystack: &'a str, open: &str, close: &str) -> Option<&'a str> {
    let s = haystack.find(open)? + open.len();
    let e = haystack[s..].find(close)? + s;
    Some(&haystack[s..e])
}

/// Best-effort type inference for `<arg_value>` payloads. Bool/int/float/JSON
/// literals get unquoted; everything else stays a string (preserves whitespace).
fn coerce_xml_value(raw: &str) -> serde_json::Value {
    let trimmed = raw.trim();
    if trimmed == "true" {
        return serde_json::Value::Bool(true);
    }
    if trimmed == "false" {
        return serde_json::Value::Bool(false);
    }
    if trimmed == "null" {
        return serde_json::Value::Null;
    }
    if let Ok(n) = trimmed.parse::<i64>() {
        return serde_json::Value::from(n);
    }
    if let Ok(f) = trimmed.parse::<f64>() {
        return serde_json::Value::from(f);
    }
    if (trimmed.starts_with('{') && trimmed.ends_with('}'))
        || (trimmed.starts_with('[') && trimmed.ends_with(']'))
    {
        if let Ok(v) = serde_json::from_str::<serde_json::Value>(trimmed) {
            return v;
        }
    }
    // Preserve raw (including leading/trailing whitespace) — the model may have
    // intended exact-match strings (e.g. old_string for edit_file).
    serde_json::Value::String(raw.to_string())
}

/// Patch `tool_calls_buf` entries with missing keys borrowed from `xml_pool`
/// (parsed by `rescue_text_tool_calls` on the same turn's raw text). Used when
/// the model split intent across the function-calling JSON channel and the
/// `<tool_call>` XML in the text — the JSON path may arrive with only a subset
/// of the arguments, while the XML carries the full set. JSON wins on
/// conflicts; XML only fills gaps. Multiple calls of the same name are matched
/// to XML blocks of the same name in order of appearance.
fn repair_tool_call_args(calls: &mut [ToolCall], xml_pool: &[ToolCall]) {
    use std::collections::HashMap;

    let mut by_name: HashMap<&str, Vec<&ToolCall>> = HashMap::new();
    for x in xml_pool {
        by_name.entry(x.name.as_str()).or_default().push(x);
    }
    let mut consumed: HashMap<&str, usize> = HashMap::new();

    for call in calls.iter_mut() {
        let Some(group) = by_name.get(call.name.as_str()) else {
            continue;
        };
        let idx = consumed.entry(call.name.as_str()).or_insert(0);
        let Some(xml_call) = group.get(*idx) else {
            continue;
        };
        *idx += 1;

        let xml_obj = match serde_json::from_str::<serde_json::Value>(&xml_call.arguments) {
            Ok(serde_json::Value::Object(o)) => o,
            _ => continue,
        };

        let merged = match serde_json::from_str::<serde_json::Value>(&call.arguments) {
            Ok(serde_json::Value::Object(mut j_obj)) => {
                let mut patched = false;
                for (k, v) in xml_obj {
                    if !j_obj.contains_key(&k) {
                        j_obj.insert(k, v);
                        patched = true;
                    }
                }
                if patched {
                    Some(serde_json::Value::Object(j_obj))
                } else {
                    None
                }
            }
            // JSON args unparseable / non-object → take XML wholesale.
            _ => Some(serde_json::Value::Object(xml_obj)),
        };
        if let Some(v) = merged {
            call.arguments = v.to_string();
        }
    }
}

/// Streaming filter that hides `<tool_call>...</tool_call>` blocks from the
/// visible UI/conversation stream while letting the rescue path see the full
/// raw text via a separate buffer. Tags can split across delta chunks, so the
/// filter holds back trailing bytes that might be a partial tag.
#[derive(Default)]
struct ToolCallStreamFilter {
    inside: bool,
    holdback: String,
}

impl ToolCallStreamFilter {
    const OPEN: &'static str = "<tool_call>";
    const CLOSE: &'static str = "</tool_call>";

    /// Feed a delta chunk; return what's safe to display now.
    fn feed(&mut self, chunk: &str) -> String {
        let mut work = std::mem::take(&mut self.holdback);
        work.push_str(chunk);
        let mut out = String::new();

        loop {
            if self.inside {
                match work.find(Self::CLOSE) {
                    Some(pos) => {
                        work = work[pos + Self::CLOSE.len()..].to_string();
                        self.inside = false;
                    }
                    None => {
                        self.holdback = trail_holdback(&work, Self::CLOSE.len() - 1);
                        return out;
                    }
                }
            } else {
                match work.find(Self::OPEN) {
                    Some(pos) => {
                        out.push_str(&work[..pos]);
                        work = work[pos + Self::OPEN.len()..].to_string();
                        self.inside = true;
                    }
                    None => {
                        let hold = trail_holdback(&work, Self::OPEN.len() - 1);
                        let visible_len = work.len() - hold.len();
                        out.push_str(&work[..visible_len]);
                        self.holdback = hold;
                        return out;
                    }
                }
            }
        }
    }

    /// End-of-stream flush. If we're still inside an unclosed `<tool_call>`,
    /// the holdback is dropped (prevents leak); otherwise emit any held tail.
    fn flush(&mut self) -> String {
        if self.inside {
            self.holdback.clear();
            String::new()
        } else {
            std::mem::take(&mut self.holdback)
        }
    }
}

/// Take up to `max` trailing bytes from `s`, snapped down to a UTF-8 char
/// boundary so the holdback is always a valid `String`.
fn trail_holdback(s: &str, max: usize) -> String {
    if s.len() <= max {
        return s.to_string();
    }
    let mut split = s.len() - max;
    while split < s.len() && !s.is_char_boundary(split) {
        split += 1;
    }
    s[split..].to_string()
}

/// Merge multiple edit_file calls on the same file into one multi-edit call.
/// The model often generates 2+ separate edit_file(file, old, new) for the same file;
/// we merge them into one edit_file(file, edits=[...]) before execution and before
/// the assistant tool-call message is written into conversation history.
/// Returns the ids of calls that were merged away.
fn merge_edit_calls(calls: &mut Vec<ToolCall>) -> Vec<String> {
    use std::collections::HashMap;

    // Group edit_file calls by file_path. Preserve order of first occurrence.
    let mut file_groups: HashMap<String, Vec<usize>> = HashMap::new();
    let mut file_order: Vec<String> = Vec::new();
    for (i, call) in calls.iter().enumerate() {
        if call.name != "edit_file" {
            continue;
        }
        let fp = serde_json::from_str::<serde_json::Value>(&call.arguments)
            .ok()
            .and_then(|a| {
                a.get("file_path")
                    .and_then(|v| v.as_str())
                    .map(String::from)
            });
        if let Some(fp) = fp {
            let entry = file_groups.entry(fp.clone()).or_default();
            if entry.is_empty() {
                file_order.push(fp);
            }
            entry.push(i);
        }
    }

    // Only merge groups with 2+ calls
    let merge_targets: Vec<(String, Vec<usize>)> = file_order
        .into_iter()
        .filter_map(|fp| {
            let indices = file_groups.remove(&fp)?;
            if indices.len() >= 2 {
                Some((fp, indices))
            } else {
                None
            }
        })
        .collect();

    if merge_targets.is_empty() {
        return Vec::new();
    }

    let mut remove_indices: Vec<usize> = Vec::new();
    let mut removed_ids: Vec<String> = Vec::new();
    for (file_path, indices) in &merge_targets {
        // Build edits array from individual calls
        let mut edits: Vec<serde_json::Value> = Vec::new();
        for &idx in indices {
            let args: serde_json::Value =
                serde_json::from_str(&calls[idx].arguments).unwrap_or_default();
            let mut edit = serde_json::Map::new();
            if let Some(v) = args.get("old_string") {
                edit.insert("old_string".into(), v.clone());
            }
            if let Some(v) = args.get("new_string") {
                edit.insert("new_string".into(), v.clone());
            }
            if let Some(v) = args.get("start_line") {
                edit.insert("start_line".into(), v.clone());
            }
            if let Some(v) = args.get("end_line") {
                edit.insert("end_line".into(), v.clone());
            }
            edits.push(serde_json::Value::Object(edit));
        }

        // Replace first call with merged version, mark rest for removal
        let first_idx = indices[0];
        let merged_args = serde_json::json!({
            "file_path": file_path,
            "edits": edits,
        });
        calls[first_idx].arguments = merged_args.to_string();
        for &idx in &indices[1..] {
            removed_ids.push(calls[idx].id.clone());
            remove_indices.push(idx);
        }
    }

    // Remove merged calls (reverse order to preserve indices)
    remove_indices.sort_unstable();
    remove_indices.dedup();
    for idx in remove_indices.into_iter().rev() {
        calls.remove(idx);
    }

    removed_ids
}


#[cfg(test)]
mod is_only_placeholder_filler_tests {
    use super::is_only_placeholder_filler;
    use crate::provider::REASONING_PLACEHOLDER;

    #[test]
    fn empty_and_whitespace_are_filler() {
        assert!(is_only_placeholder_filler(""));
        assert!(is_only_placeholder_filler("   "));
        assert!(is_only_placeholder_filler("\n\t  \n"));
    }

    #[test]
    fn single_placeholder_is_filler() {
        // The original strict-equality guard already caught this; pin
        // it so a refactor doesn't regress.
        assert!(is_only_placeholder_filler(REASONING_PLACEHOLDER));
    }

    #[test]
    fn multiple_concatenated_placeholders_are_filler() {
        // The bug: DeepSeek V4 Flash 17-round session screenshot
        // showed the response's reasoning_content as 3 copies of the
        // placeholder concatenated with no separator. The old
        // `!= REASONING_PLACEHOLDER` check missed this and promoted
        // the meaningless string into the assistant text channel.
        let three = REASONING_PLACEHOLDER.repeat(3);
        assert!(is_only_placeholder_filler(&three));
        let five = REASONING_PLACEHOLDER.repeat(5);
        assert!(is_only_placeholder_filler(&five));
    }

    #[test]
    fn placeholders_with_whitespace_are_filler() {
        // Some gateways insert chunk delimiters (newlines, spaces)
        // between repeated placeholder echoes. Filler regardless.
        let mixed = format!("{}\n{}  {}", REASONING_PLACEHOLDER, REASONING_PLACEHOLDER, REASONING_PLACEHOLDER);
        assert!(is_only_placeholder_filler(&mixed));
    }

    #[test]
    fn real_reasoning_is_not_filler() {
        assert!(!is_only_placeholder_filler(
            "Let me think about this — first, the user wants..."
        ));
    }

    #[test]
    fn placeholder_plus_real_content_is_not_filler() {
        // If the model emits the placeholder AND some substantive
        // text, we still want promotion — the substantive text is
        // the real reasoning we'd want to keep.
        let mixed = format!("{} but actually I see now that...", REASONING_PLACEHOLDER);
        assert!(!is_only_placeholder_filler(&mixed));
    }
}

#[cfg(test)]
mod normalize_tool_args_tests {
    use super::normalize_tool_args;

    #[test]
    fn whitespace_variants_collapse() {
        // The deepseek-v4-flash screenshot symptom: same call, different
        // whitespace → must dedup.
        let a = r#"{"pattern":"**/*.rs"}"#;
        let b = r#"{"pattern": "**/*.rs"}"#;
        let c = r#"{ "pattern":"**/*.rs" }"#;
        let d = r#"{
  "pattern": "**/*.rs"
}"#;
        let na = normalize_tool_args(a);
        assert_eq!(normalize_tool_args(b), na);
        assert_eq!(normalize_tool_args(c), na);
        assert_eq!(normalize_tool_args(d), na);
    }

    #[test]
    fn key_order_collapses() {
        // serde_json::Map is BTreeMap-backed (no preserve_order feature),
        // so re-serialising sorts keys alphabetically.
        let a = r#"{"a":1,"b":2}"#;
        let b = r#"{"b":2,"a":1}"#;
        assert_eq!(normalize_tool_args(a), normalize_tool_args(b));
    }

    #[test]
    fn nested_objects_normalize_recursively() {
        let a = r#"{"outer":{"x":1,"y":2}}"#;
        let b = r#"{"outer":{"y":2,"x":1}}"#;
        assert_eq!(normalize_tool_args(a), normalize_tool_args(b));
    }

    #[test]
    fn semantically_different_args_stay_different() {
        // Don't over-collapse — different values must remain distinct so a
        // legitimate batch of `Glob(**/*.rs)` + `Glob(**/*.toml)` doesn't
        // dedup.
        let a = r#"{"pattern":"**/*.rs"}"#;
        let b = r#"{"pattern":"**/*.toml"}"#;
        assert_ne!(normalize_tool_args(a), normalize_tool_args(b));
    }

    #[test]
    fn non_json_args_pass_through_unchanged() {
        // Free-form / malformed payloads must not panic or merge.
        // (Two genuinely different garbage strings must stay distinct so
        // we don't accidentally dedup unrelated calls.)
        let raw = "not even json {{{";
        assert_eq!(normalize_tool_args(raw), raw);
        assert_ne!(normalize_tool_args("garbage A"), normalize_tool_args("garbage B"));
    }
}

#[cfg(test)]
mod tool_call_text_rescue_tests {
    use super::{repair_tool_call_args, rescue_text_tool_calls, ToolCall, ToolCallStreamFilter};

    #[test]
    fn rescues_qwen_xml_format() {
        // Qwen/GLM-5.1 sometimes emits args as <arg_key>/<arg_value> XML pairs
        // instead of a JSON blob. Without parsing this format the call gets
        // dispatched with empty args and edit_file fails with "old_string is
        // required" while the raw XML leaks into the user-visible stream.
        let text = r#"Let me make the edit:
<tool_call>
  <tool_name>edit_file</tool_name>
  <arg_key>file_path</arg_key><arg_value>src/main.rs</arg_value>
  <arg_key>old_string</arg_key><arg_value>        attrs
      }
  }</arg_value>
  <arg_key>new_string</arg_key><arg_value>        attrs.push(x);
        attrs
      }
  }</arg_value>
  <arg_key>replace_all</arg_key><arg_value>false</arg_value>
</tool_call>"#;
        let calls = rescue_text_tool_calls(text);
        assert_eq!(calls.len(), 1, "single XML block should rescue one call");
        assert_eq!(calls[0].name, "edit_file");
        let v: serde_json::Value = serde_json::from_str(&calls[0].arguments).unwrap();
        assert_eq!(v["file_path"], "src/main.rs");
        assert_eq!(v["replace_all"], false);
        // Whitespace-sensitive — old_string must round-trip exactly so edit_file
        // can find the match in the file.
        assert_eq!(v["old_string"], "        attrs\n      }\n  }");
    }

    #[test]
    fn xml_without_tool_name_is_skipped() {
        // No <tool_name> means we have no idea what to dispatch — better to
        // skip than guess. An XML block with only <arg_key> tags is treated as
        // a malformed legacy emit (no `(` either, so the paren branch also
        // skips), yielding zero calls.
        let text = r#"<tool_call>
  <arg_key>file_path</arg_key><arg_value>x.rs</arg_value>
</tool_call>"#;
        let calls = rescue_text_tool_calls(text);
        assert!(calls.is_empty());
    }

    #[test]
    fn legacy_paren_json_format_still_works() {
        // Don't regress the existing rescue path used by GLM-5 via OpenRouter.
        let text = r#"<tool_call>read_file({"file_path":"a.rs"})</tool_call>"#;
        let calls = rescue_text_tool_calls(text);
        assert_eq!(calls.len(), 1);
        assert_eq!(calls[0].name, "read_file");
        let v: serde_json::Value = serde_json::from_str(&calls[0].arguments).unwrap();
        assert_eq!(v["file_path"], "a.rs");
    }

    #[test]
    fn legacy_paren_kv_format_still_works() {
        let text = r#"<tool_call>read_file(file_path=a.rs, offset=10)</tool_call>"#;
        let calls = rescue_text_tool_calls(text);
        assert_eq!(calls.len(), 1);
        assert_eq!(calls[0].name, "read_file");
        let v: serde_json::Value = serde_json::from_str(&calls[0].arguments).unwrap();
        assert_eq!(v["file_path"], "a.rs");
        assert_eq!(v["offset"], 10);
    }

    #[test]
    fn xml_coerces_bool_int_float() {
        let text = r#"<tool_call>
  <tool_name>cfg</tool_name>
  <arg_key>flag</arg_key><arg_value>true</arg_value>
  <arg_key>n</arg_key><arg_value>42</arg_value>
  <arg_key>f</arg_key><arg_value>3.14</arg_value>
  <arg_key>s</arg_key><arg_value>hello</arg_value>
</tool_call>"#;
        let calls = rescue_text_tool_calls(text);
        let v: serde_json::Value = serde_json::from_str(&calls[0].arguments).unwrap();
        assert_eq!(v["flag"], true);
        assert_eq!(v["n"], 42);
        assert!((v["f"].as_f64().unwrap() - 3.14).abs() < 1e-9);
        assert_eq!(v["s"], "hello");
    }

    #[test]
    fn rescues_qwen3_function_tag_format() {
        // Qwen3 / Qwen3-Coder / Qwen3.6 (and other OpenHands-trained agents)
        // emit tool calls as `<function=NAME><parameter=K>V</parameter>...
        // </function>`. Mirrors the exact shape captured in the 21:17 Qwen3.6
        // screenshot: two sequential read_file calls with int + path params.
        let text = r#"Let me look at the exact code structure more carefully.

<tool_call>
<function=read_file>
<parameter=limit>
30
</parameter>
<parameter=offset>
147
</parameter>
<parameter=file_path>
/tmp/cc-switch-src/src-tauri/src/proxy/response_processor.rs
</parameter>
</function>
</tool_call>
<tool_call>
<function=read_file>
<parameter=limit>
20
</parameter>
<parameter=offset>
530
</parameter>
<parameter=file_path>
/tmp/cc-switch-src/src-tauri/src/proxy/providers/streaming.rs
</parameter>
</function>
</tool_call>"#;
        let calls = rescue_text_tool_calls(text);
        assert_eq!(calls.len(), 2, "two sequential blocks must rescue two calls");

        let v0: serde_json::Value = serde_json::from_str(&calls[0].arguments).unwrap();
        assert_eq!(calls[0].name, "read_file");
        assert_eq!(v0["limit"], 30);
        assert_eq!(v0["offset"], 147);
        // File path must NOT carry the layout newlines that wrapped the value
        // inside the `<parameter>` element — file dispatch on a `"\n/tmp/...\n"`
        // arg would 404.
        assert_eq!(
            v0["file_path"],
            "/tmp/cc-switch-src/src-tauri/src/proxy/response_processor.rs"
        );

        let v1: serde_json::Value = serde_json::from_str(&calls[1].arguments).unwrap();
        assert_eq!(calls[1].name, "read_file");
        assert_eq!(v1["limit"], 20);
        assert_eq!(v1["offset"], 530);
        assert_eq!(
            v1["file_path"],
            "/tmp/cc-switch-src/src-tauri/src/proxy/providers/streaming.rs"
        );
    }

    #[test]
    fn function_tag_preserves_internal_whitespace_for_edit_file_old_string() {
        // edit_file's `old_string` is matched against the file verbatim, so
        // internal whitespace MUST round-trip. The layout-newline strip should
        // only peel ONE leading and ONE trailing `\n`, leaving multi-line
        // bodies intact.
        let text = r#"<tool_call>
<function=edit_file>
<parameter=file_path>
src/main.rs
</parameter>
<parameter=old_string>
        attrs
      }
  }
</parameter>
<parameter=new_string>
        attrs.push(x);
        attrs
      }
  }
</parameter>
</function>
</tool_call>"#;
        let calls = rescue_text_tool_calls(text);
        assert_eq!(calls.len(), 1);
        assert_eq!(calls[0].name, "edit_file");
        let v: serde_json::Value = serde_json::from_str(&calls[0].arguments).unwrap();
        assert_eq!(v["file_path"], "src/main.rs");
        assert_eq!(v["old_string"], "        attrs\n      }\n  }");
        assert_eq!(
            v["new_string"],
            "        attrs.push(x);\n        attrs\n      }\n  }"
        );
    }

    #[test]
    fn function_tag_without_close_function_tag_skips() {
        // Missing `</function>` close — better to drop the rescue than guess
        // where the call body ended. Matches the existing safety posture in
        // `parse_xml_tool_call` (returns None on missing tags rather than
        // making things up).
        let text = r#"<tool_call>
<function=read_file>
<parameter=file_path>x.rs</parameter>
</tool_call>"#;
        let calls = rescue_text_tool_calls(text);
        assert!(calls.is_empty(), "missing </function> must yield zero calls");
    }

    #[test]
    fn function_tag_without_any_parameter_skips() {
        // A `<function=...>` with no `<parameter=...>` children means we'd
        // dispatch with empty args — usually broken intent. Skip rather than
        // guess (mirrors `xml_without_tool_name_is_skipped`).
        let text = r#"<tool_call>
<function=list_files>
</function>
</tool_call>"#;
        let calls = rescue_text_tool_calls(text);
        assert!(calls.is_empty(), "no <parameter> children → no rescue");
    }

    #[test]
    fn stream_filter_passes_plain_text() {
        let mut f = ToolCallStreamFilter::default();
        let out = f.feed("hello world");
        // Holdback may keep up to 10 bytes in case "<tool_call" is starting,
        // so flush to get the full output.
        let tail = f.flush();
        assert_eq!(format!("{}{}", out, tail), "hello world");
    }

    /// Regression for 5-7 atomgr datalog (build dd425fd, 20-14-23 Turn 5):
    /// GLM-5.1 emitted prose then mid-sentence switched to XML tool_call:
    /// `### 3. 传输层安全<tool_call>grep<arg_key>pattern</arg_key>...
    /// </tool_call>`. The stream_filter caught it for streamed deltas /
    /// conversation history, but `TurnResult::Responded.text` used raw
    /// `text_buf` → `datalog::log_text` printed the XML in `**Response:**`.
    ///
    /// Fix: parallel `visible_text_buf` mirrors what the filter actually
    /// emitted; `Responded.text` and `UsedTools.text` use it instead of
    /// raw text_buf. This test pins the visible-side behavior for the
    /// exact Turn 5 input shape.
    #[test]
    fn glm_xml_leak_mid_prose_strips_to_clean_visible_text() {
        let mut f = ToolCallStreamFilter::default();
        let mut visible = String::new();

        // Replay the actual Turn 5 chunking shape: prose, then XML
        // tool_call split across multiple deltas (provider chunks at
        // arbitrary boundaries — the filter must hold back across them).
        for chunk in [
            "### 3. 传输层安全",
            "<tool_call>grep",
            "<arg_key>pattern</arg_key>",
            "<arg_value>http://</arg_value>",
            "<arg_key>path</arg_key>",
            "<arg_value>/Users/y/project</arg_value>",
            "</tool_call>",
        ] {
            visible.push_str(&f.feed(chunk));
        }
        visible.push_str(&f.flush());

        assert!(
            !visible.contains("<tool_call>"),
            "visible accumulator must strip <tool_call> open tag: {:?}",
            visible
        );
        assert!(
            !visible.contains("</tool_call>"),
            "visible accumulator must strip </tool_call> close tag: {:?}",
            visible
        );
        assert!(
            !visible.contains("<arg_key>") && !visible.contains("<arg_value>"),
            "visible accumulator must strip XML inner tags: {:?}",
            visible
        );
        assert_eq!(
            visible, "### 3. 传输层安全",
            "only the pre-tool prose should reach Responded.text"
        );
    }

    #[test]
    fn stream_filter_strips_complete_block_in_one_chunk() {
        let mut f = ToolCallStreamFilter::default();
        let out = f.feed("before <tool_call>edit_file({})</tool_call> after");
        let tail = f.flush();
        let combined = format!("{}{}", out, tail);
        assert!(combined.contains("before "));
        assert!(combined.contains(" after"));
        assert!(!combined.contains("<tool_call>"));
        assert!(!combined.contains("</tool_call>"));
        assert!(!combined.contains("edit_file"));
    }

    #[test]
    fn stream_filter_strips_block_split_across_chunks() {
        // Realistic case: provider streams bytes that split the open tag
        // arbitrarily. The filter must hold back partial-tag bytes, not emit
        // them, and resume cleanly when the close arrives.
        let mut f = ToolCallStreamFilter::default();
        let mut visible = String::new();
        for chunk in [
            "before <tool_",
            "call><tool_name>edit_file</tool_name>",
            "<arg_key>k</arg_key><arg_value>v</arg_value>",
            "</tool_call> after",
        ] {
            visible.push_str(&f.feed(chunk));
        }
        visible.push_str(&f.flush());
        assert_eq!(visible, "before  after");
    }

    #[test]
    fn stream_filter_drops_unclosed_block() {
        // If the stream ends mid-`<tool_call>` (truncation, error), discard
        // the holdback rather than leaking the open fragment to the user.
        let mut f = ToolCallStreamFilter::default();
        let out = f.feed("text <tool_call>edit_file({});");
        let tail = f.flush();
        let combined = format!("{}{}", out, tail);
        assert_eq!(combined, "text ");
    }

    #[test]
    fn stream_filter_handles_partial_open_at_chunk_end() {
        // The filter must not emit `<` or `<t` etc. as visible text just
        // because the chunk happened to end mid-tag.
        let mut f = ToolCallStreamFilter::default();
        let v1 = f.feed("hello <");
        // Could be holdback; not guaranteed any specific output yet.
        let v2 = f.feed("tool_call>x</tool_call>!");
        let tail = f.flush();
        let combined = format!("{}{}{}", v1, v2, tail);
        assert_eq!(combined, "hello !");
    }

    #[test]
    fn stream_filter_passes_through_lt_that_isnt_tool_call() {
        // A bare `<` followed by non-tool_call content should eventually flush.
        let mut f = ToolCallStreamFilter::default();
        let mut visible = String::new();
        visible.push_str(&f.feed("a < b "));
        visible.push_str(&f.feed("and c <"));
        visible.push_str(&f.feed("d>e"));
        visible.push_str(&f.flush());
        assert_eq!(visible, "a < b and c <d>e");
    }

    fn tc(id: &str, name: &str, args: &str) -> ToolCall {
        ToolCall {
            id: id.into(),
            name: name.into(),
            arguments: args.into(),
        }
    }

    #[test]
    fn repair_fills_missing_old_string_from_xml() {
        // Reproduces the user-reported bug: the function-calling JSON channel
        // delivered `{file_path, new_string, replace_all}` (passes
        // validate_args because new_string is present) but missing
        // `old_string` — execute() then fails with "old_string is required".
        // The full args were carried as XML in the text stream. After repair,
        // the call has all four keys.
        let mut calls = vec![tc(
            "c1",
            "edit_file",
            r#"{"file_path":"x.rs","new_string":"new","replace_all":false}"#,
        )];
        let xml_pool = vec![tc(
            "rescued_0",
            "edit_file",
            r#"{"file_path":"x.rs","old_string":"old","new_string":"new","replace_all":false}"#,
        )];
        repair_tool_call_args(&mut calls, &xml_pool);
        let merged: serde_json::Value = serde_json::from_str(&calls[0].arguments).unwrap();
        assert_eq!(merged["file_path"], "x.rs");
        assert_eq!(merged["old_string"], "old");
        assert_eq!(merged["new_string"], "new");
        assert_eq!(merged["replace_all"], false);
    }

    #[test]
    fn repair_does_not_overwrite_keys_present_in_json() {
        // Conflict policy: function-calling JSON is the source of truth; XML
        // only fills gaps. If the JSON channel and XML disagree on a key
        // (e.g. different new_string), keep JSON.
        let mut calls = vec![tc(
            "c1",
            "edit_file",
            r#"{"file_path":"x.rs","new_string":"json_wins"}"#,
        )];
        let xml_pool = vec![tc(
            "rescued_0",
            "edit_file",
            r#"{"file_path":"x.rs","new_string":"xml_loses","old_string":"old"}"#,
        )];
        repair_tool_call_args(&mut calls, &xml_pool);
        let merged: serde_json::Value = serde_json::from_str(&calls[0].arguments).unwrap();
        assert_eq!(merged["new_string"], "json_wins");
        assert_eq!(merged["old_string"], "old");
    }

    #[test]
    fn repair_skips_when_names_dont_match() {
        // Repair must never cross tool boundaries — patching read_file with
        // edit_file's args would dispatch a malformed call.
        let original = r#"{"file_path":"x.rs"}"#;
        let mut calls = vec![tc("c1", "read_file", original)];
        let xml_pool = vec![tc(
            "rescued_0",
            "edit_file",
            r#"{"file_path":"x.rs","old_string":"a","new_string":"b"}"#,
        )];
        repair_tool_call_args(&mut calls, &xml_pool);
        assert_eq!(calls[0].arguments, original);
    }

    #[test]
    fn repair_takes_xml_wholesale_when_json_unparseable() {
        // Truncated/garbled args from the JSON channel would otherwise
        // fail to parse later anyway. Replace with the XML object.
        let mut calls = vec![tc("c1", "edit_file", r#"{"file_path": "trunc"#)];
        let xml_pool = vec![tc(
            "rescued_0",
            "edit_file",
            r#"{"file_path":"x.rs","old_string":"a","new_string":"b"}"#,
        )];
        repair_tool_call_args(&mut calls, &xml_pool);
        let merged: serde_json::Value = serde_json::from_str(&calls[0].arguments).unwrap();
        assert_eq!(merged["file_path"], "x.rs");
        assert_eq!(merged["old_string"], "a");
    }

    #[test]
    fn repair_matches_multiple_same_name_calls_in_order() {
        // Two edit_file calls in the same turn, two XML blocks — match by
        // order so the second JSON call gets the second XML's args, not the
        // first one's reused.
        let mut calls = vec![
            tc("c1", "edit_file", r#"{"file_path":"a.rs","new_string":"a_new"}"#),
            tc("c2", "edit_file", r#"{"file_path":"b.rs","new_string":"b_new"}"#),
        ];
        let xml_pool = vec![
            tc(
                "rescued_0",
                "edit_file",
                r#"{"file_path":"a.rs","old_string":"a_old","new_string":"a_new"}"#,
            ),
            tc(
                "rescued_1",
                "edit_file",
                r#"{"file_path":"b.rs","old_string":"b_old","new_string":"b_new"}"#,
            ),
        ];
        repair_tool_call_args(&mut calls, &xml_pool);
        let m1: serde_json::Value = serde_json::from_str(&calls[0].arguments).unwrap();
        let m2: serde_json::Value = serde_json::from_str(&calls[1].arguments).unwrap();
        assert_eq!(m1["old_string"], "a_old");
        assert_eq!(m2["old_string"], "b_old");
    }

    #[test]
    fn repair_no_op_when_json_already_complete() {
        // If the JSON channel got everything right, repair is silent —
        // serialization-level identity isn't guaranteed (key order may
        // change), but semantic equality holds.
        let mut calls = vec![tc(
            "c1",
            "edit_file",
            r#"{"file_path":"x.rs","old_string":"a","new_string":"b","replace_all":false}"#,
        )];
        let xml_pool = vec![tc(
            "rescued_0",
            "edit_file",
            r#"{"file_path":"x.rs","old_string":"a"}"#,
        )];
        let before: serde_json::Value = serde_json::from_str(&calls[0].arguments).unwrap();
        repair_tool_call_args(&mut calls, &xml_pool);
        let after: serde_json::Value = serde_json::from_str(&calls[0].arguments).unwrap();
        assert_eq!(before, after);
    }

    #[test]
    fn repair_skips_unparseable_xml() {
        // If the XML pool has a bogus entry (e.g. arguments that aren't a
        // JSON object), repair must skip it without crashing or polluting
        // the JSON call.
        let original = r#"{"file_path":"x.rs"}"#;
        let mut calls = vec![tc("c1", "edit_file", original)];
        let xml_pool = vec![tc("rescued_0", "edit_file", "not even json")];
        repair_tool_call_args(&mut calls, &xml_pool);
        assert_eq!(calls[0].arguments, original);
    }

    #[test]
    fn stream_filter_handles_utf8_at_holdback_boundary() {
        // UTF-8 multi-byte chars must not get split across the holdback
        // boundary — the trail snap rounds up to a char boundary.
        let mut f = ToolCallStreamFilter::default();
        let mut visible = String::new();
        visible.push_str(&f.feed("中文 hello "));
        visible.push_str(&f.feed("世界"));
        visible.push_str(&f.flush());
        assert_eq!(visible, "中文 hello 世界");
    }
}

/// Build a structured `error_data` JSON for LLM errors, following the
/// telemetry design doc (section 3.5 — `llm_chat` event).
///
/// Extracts `status_code` from the raw error string (patterns like "401",
/// "403", "429", "500", "502", "503") and scrubs the message via
/// `scrub::scrub_path` + `scrub::truncate_head(_, 200)`.
pub(crate) fn build_llm_error_data(
    kind: LlmErrorKind,
    reason: &str,
    duration_ms: u32,
    provider: Option<&str>,
    provider_host: Option<&str>,
    model: Option<&str>,
    context_window: u32,
    system_tokens: u32,
    tool_def_tokens: u32,
    tool_result_tokens: u32,
    message_tokens: u32,
    messages_count: u32,
) -> Option<String> {
    use atomcode_telemetry::scrub;

    // ── Extract status code from the raw error string ──────────────
    let status_code: Option<u16> = extract_status_code(reason);

    // ── Build a concise, scrubbed error message ───────────────────
    // Strip the raw JSON body that some providers append after a colon.
    let home = std::env::var("HOME").ok().map(|h| std::path::PathBuf::from(h));
    let cwd = std::env::var("PWD").ok().map(|c| std::path::PathBuf::from(c));
    let message_raw = scrub::scrub_path(
        reason,
        home.as_deref(),
        cwd.as_deref(),
    );
    let message = scrub::truncate_head(&message_raw, 200);

    let base = || -> serde_json::Value {
        let mut m = serde_json::Map::new();
        m.insert("duration_ms".into(), serde_json::json!(duration_ms));
        if let Some(p) = provider {
            m.insert("provider".into(), serde_json::json!(p));
        }
        if let Some(h) = provider_host {
            m.insert("provider_host".into(), serde_json::json!(h));
        }
        if let Some(mdl) = model {
            m.insert("model".into(), serde_json::json!(mdl));
        }
        serde_json::Value::Object(m)
    };

    let map = match kind {
        LlmErrorKind::AuthError => {
            let mut m = base();
            let obj = m.as_object_mut().unwrap();
            if let Some(sc) = status_code {
                obj.insert("status_code".into(), serde_json::json!(sc));
            }
            obj.insert("message".into(), serde_json::json!(message));
            m
        }
        LlmErrorKind::RateLimited => {
            let mut m = base();
            let obj = m.as_object_mut().unwrap();
            if let Some(sc) = status_code {
                obj.insert("status_code".into(), serde_json::json!(sc));
            }
            obj.insert("message".into(), serde_json::json!(message));
            // retry_after_secs: could be parsed from Retry-After header,
            // but we don't have that info here. Leave as null.
            obj.insert("retry_after_secs".into(), serde_json::Value::Null);
            m
        }
        LlmErrorKind::ServerError => {
            let mut m = base();
            let obj = m.as_object_mut().unwrap();
            if let Some(sc) = status_code {
                obj.insert("status_code".into(), serde_json::json!(sc));
            }
            obj.insert("message".into(), serde_json::json!(message));
            m
        }
        LlmErrorKind::NetworkError => {
            let mut m = base();
            let obj = m.as_object_mut().unwrap();
            obj.insert("message".into(), serde_json::json!(message));
            obj.insert("attempt_duration_ms".into(), serde_json::json!(duration_ms));
            obj.insert("is_retry".into(), serde_json::json!(false));
            m
        }
        LlmErrorKind::StreamTimeout => {
            let mut m = base();
            let obj = m.as_object_mut().unwrap();
            obj.insert("timeout_secs".into(), serde_json::json!(duration_ms / 1000));
            // Phase heuristic: if no tokens were received → "first_token",
            // otherwise "subsequent". We don't have per-event token counts
            // at this layer, so default to "first_token".
            obj.insert("phase".into(), serde_json::json!("first_token"));
            obj.insert("tokens_received".into(), serde_json::json!(0));
            m
        }
        LlmErrorKind::StreamInterrupted => {
            let mut m = base();
            let obj = m.as_object_mut().unwrap();
            obj.insert("message".into(), serde_json::json!(message));
            obj.insert("bytes_received".into(), serde_json::Value::Null);
            obj.insert("tokens_received".into(), serde_json::Value::Null);
            obj.insert("finish_reason".into(), serde_json::Value::Null);
            m
        }
        LlmErrorKind::ContextOverflow => {
            let mut m = base();
            let obj = m.as_object_mut().unwrap();
            let sent_tokens = system_tokens
                .saturating_add(tool_def_tokens)
                .saturating_add(tool_result_tokens)
                .saturating_add(message_tokens);
            obj.insert("context_window".into(), serde_json::json!(context_window));
            obj.insert("sent_tokens".into(), serde_json::json!(sent_tokens));
            obj.insert("system_tokens".into(), serde_json::json!(system_tokens));
            obj.insert("tool_def_tokens".into(), serde_json::json!(tool_def_tokens));
            obj.insert("tool_result_tokens".into(), serde_json::json!(tool_result_tokens));
            obj.insert("message_tokens".into(), serde_json::json!(message_tokens));
            obj.insert("messages_count".into(), serde_json::json!(messages_count));
            m
        }
        LlmErrorKind::Other => {
            let mut m = base();
            let obj = m.as_object_mut().unwrap();
            obj.insert("message".into(), serde_json::json!(message));
            m
        }
    };

    Some(map.to_string())
}

/// Extract an HTTP status code from a raw error string.
/// Looks for patterns like "401", "403", "429", "500", "502", "503"
/// that appear as standalone numbers (not part of a larger number).
fn extract_status_code(reason: &str) -> Option<u16> {
    // Common HTTP error status codes to look for
    let codes = [401u16, 403, 429, 500, 502, 503];
    let lower = reason.to_lowercase();
    for code in codes {
        // Check if the code appears as a standalone number
        // Match patterns like "401", "(401)", "error 401", "HTTP 401"
        let code_str = code.to_string();
        if lower.contains(&code_str) {
            return Some(code);
        }
    }
    None
}

/// Classify an LLM error reason string into a telemetry `LlmErrorKind`.
pub(crate) fn classify_llm_error(reason: &str) -> LlmErrorKind {
    let r = reason.to_lowercase();
    if r.contains("401") || r.contains("403") || r.contains("unauthorized") || r.contains("auth") {
        LlmErrorKind::AuthError
    } else if r.contains("429") || r.contains("rate") || r.contains("throttl") {
        LlmErrorKind::RateLimited
    } else if r.contains("500") || r.contains("502") || r.contains("503") {
        LlmErrorKind::ServerError
    } else if r.contains("stream timeout") || r.contains("no event for") {
        LlmErrorKind::StreamTimeout
    } else if r.contains("decode") || r.contains("mid-flight") || r.contains("terminated") {
        LlmErrorKind::StreamInterrupted
    } else if r.contains("context") || r.contains("max_tokens") || r.contains("token limit") {
        LlmErrorKind::ContextOverflow
    } else if r.contains("connect") || r.contains("dns") || r.contains("network") || r.contains("timeout") {
        LlmErrorKind::NetworkError
    } else {
        LlmErrorKind::Other
    }
}

/// Build a concise summary of tool call arguments for telemetry.
/// Extracts top-level JSON keys and truncates values to avoid leaking sensitive data.
pub(crate) fn build_args_summary(tool_name: &str, args: &str) -> String {
    if let Ok(v) = serde_json::from_str::<serde_json::Value>(args) {
        if let Some(obj) = v.as_object() {
            let pairs: Vec<String> = obj
                .iter()
                .map(|(k, v)| {
                    let val_str = match v {
                        serde_json::Value::String(s) => {
                            atomcode_telemetry::scrub::truncate_head(s, 50)
                        }
                        serde_json::Value::Number(n) => n.to_string(),
                        serde_json::Value::Bool(b) => b.to_string(),
                        serde_json::Value::Null => "null".to_string(),
                        _ => format!("<{}>", match v {
                            serde_json::Value::Array(_) => "array",
                            serde_json::Value::Object(_) => "object",
                            _ => "value",
                        }),
                    };
                    format!("{}={}", k, val_str)
                })
                .collect();
            return format!("{}({})", tool_name, pairs.join(", "));
        }
    }
    // Fallback: truncate raw args
    format!("{}({})", tool_name, atomcode_telemetry::scrub::truncate_head(args, 100))
}