import json
import logging
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch

import pytest

from openjiuwen_deepsearch.algorithm.user_feedback_processor.new_task_processor import (
    NewTaskProcessor,
    NewTaskAssetAssessment,
    SectionHistoricalAssets,
    NewTaskTargetSection,
)
from openjiuwen_deepsearch.common.exception import CustomValueException
from openjiuwen_deepsearch.common.status_code import StatusCode
from openjiuwen_deepsearch.framework.openjiuwen.agent.search_context import (
    Outline,
    Plan,
    Report,
    RetrievalQuery,
    Section,
    Step,
    StepType,
    SubReport,
    SubReportContent,
)
from openjiuwen_deepsearch.utils.constants_utils.node_constants import AgentLlmName


def test_resolve_target_section_strips_citation_and_infer_markup():
    """Check that citation and inference markup is absent from the clean texts.

    The selected span carries a ``checked_citation`` tag, a ``[[1]](url)`` link
    and an ``#inference`` link. The resolved target is expected to report the
    enclosing "第一章" section and to expose clean texts without that markup.
    """
    marked_up_selection = "原文[checked_citation: 1][[1]](https://a.com)[推理结论](#inference:3)"
    report_text = f"# 总览\n## 第一章\n{marked_up_selection}\n## 第二章\n其他内容"
    selection_feedback = dict(
        selected_text=marked_up_selection,
        start_offset=report_text.index("原文"),
        # The selection ends right before the newline that precedes the next heading.
        end_offset=report_text.index("\n## 第二章"),
        user_instruction="补充行业背景",
    )
    processor = NewTaskProcessor(llm_model_name="mock_model")

    resolved = processor.resolve_target_section(
        report_content=report_text,
        feedback=selection_feedback,
    )

    assert resolved.section_title == "第一章"
    assert resolved.clean_section_text == "## 第一章\n原文推理结论\n"
    assert resolved.clean_selected_text == "原文推理结论"


def test_resolve_target_section_keeps_numbered_major_chapter_context():
    """Check that the resolved target keeps the numbered major chapter around the selection.

    The selection lies in subsection "1.2" of major chapter "1."; the target is
    expected to expose the major chapter's title, offsets and clean text, with
    both subsections present in that text.
    """
    report_lines = [
        "# 1. 第一大章",
        "## 1.1 已有小节",
        "已有内容",
        "## 1.2 目标小节",
        "目标内容",
        "# 2. 第二大章",
        "其他内容",
    ]
    report_text = "\n".join(report_lines)
    picked = "目标内容"
    picked_start = report_text.index(picked)
    selection_feedback = dict(
        selected_text=picked,
        start_offset=picked_start,
        end_offset=picked_start + len(picked),
        user_instruction="补充新的对比维度",
    )
    processor = NewTaskProcessor(llm_model_name="mock_model")

    resolved = processor.resolve_target_section(
        report_content=report_text,
        feedback=selection_feedback,
    )

    assert resolved.section_title == "1.2 目标小节"
    assert resolved.major_section_title == "1. 第一大章"
    assert resolved.major_section_start_offset == 0
    # The major chapter ends where the next level-1 heading begins.
    assert resolved.major_section_end_offset == report_text.index("# 2. 第二大章")
    for expected_heading in ("## 1.1 已有小节", "## 1.2 目标小节"):
        assert expected_heading in resolved.clean_major_section_text


def test_validate_new_subsection_normalizes_llm_heading_to_expected_title():
    """Check that the returned subsection heading is rewritten to the expected title.

    The subsection text starts with a heading that lacks the "1.5" number; the
    normalized result is expected to use the supplied ``subsection_title``.
    """
    subsection_body = "## 1.2 目标小节\n目标内容"
    major_body = "# 1. 第一大章\n## 1.1 已有小节\n内容\n"
    target_fields = dict(
        section_title="1.2 目标小节",
        section_text=subsection_body,
        clean_section_text=subsection_body,
        clean_selected_text="目标内容",
        section_start_offset=0,
        section_end_offset=15,
        major_section_title="1. 第一大章",
        major_section_text=major_body,
        clean_major_section_text=major_body,
        major_section_start_offset=0,
        major_section_end_offset=40,
        major_heading_level=1,
    )
    target = NewTaskTargetSection(**target_fields)

    result_text = NewTaskProcessor._validate_new_subsection(
        target=target,
        subsection_text="## 第二名城市对比分析\n新增小节内容",
        subsection_title="1.5 第二名城市对比分析",
    )

    assert result_text == "## 1.5 第二名城市对比分析\n新增小节内容"


def test_validate_new_subsection_extracts_expected_block_from_full_major_output():
    """Check that only the requested subsection block is kept from a whole-chapter reply.

    The subsection text passed in is an entire major chapter. The expected
    result is the "1.5" block including its nested ``###`` heading, without the
    preceding content and without the following "1.6" subsection.
    """
    subsection_body = "## 1.2 目标小节\n目标内容"
    major_body = "# 1. 第一大章\n## 1.1 已有小节\n内容\n## 1.2 目标小节\n目标内容\n"
    target = NewTaskTargetSection(
        **dict(
            section_title="1.2 目标小节",
            section_text=subsection_body,
            clean_section_text=subsection_body,
            clean_selected_text="目标内容",
            section_start_offset=0,
            section_end_offset=15,
            major_section_title="1. 第一大章",
            major_section_text=major_body,
            clean_major_section_text=major_body,
            major_section_start_offset=0,
            major_section_end_offset=50,
            major_heading_level=1,
        )
    )
    whole_chapter_reply = "\n".join(
        [
            "# 1. 第一大章",
            "大章导语",
            "## 1.1 已有小节",
            "已有内容",
            "## 1.5 第二名城市对比分析",
            "新增小节内容",
            "### 对比维度",
            "细分内容",
            "## 1.6 其他小节",
            "不应保留",
        ]
    )

    result_text = NewTaskProcessor._validate_new_subsection(
        target=target,
        subsection_text=whole_chapter_reply,
        subsection_title="1.5 第二名城市对比分析",
    )

    assert result_text == "## 1.5 第二名城市对比分析\n新增小节内容\n### 对比维度\n细分内容"


def test_collect_section_assets_matches_outline_section_and_aggregates_doc_infos():
    """Check exact-title matching and doc_info aggregation from the outline.

    The outline holds one section titled "第一章" whose only retrieval query
    carries two doc_infos. ``collect_section_assets`` is called with the target
    and the outline only, so no ``Report`` fixture is built here.
    """
    processor = NewTaskProcessor(llm_model_name="mock_model")
    target = NewTaskTargetSection(
        section_title="第一章",
        section_text="## 第一章\n旧章节内容",
        clean_section_text="## 第一章\n旧章节内容",
        clean_selected_text="旧章节内容",
        section_start_offset=0,
        section_end_offset=11,
    )
    outline = Outline(
        thought="",
        title="报告",
        sections=[
            Section(
                id="1",
                title="第一章",
                description="",
                plans=[
                    Plan(
                        title="plan",
                        thought="",
                        is_research_completed=False,
                        steps=[
                            Step(
                                type=StepType.INFO_COLLECTING,
                                title="step",
                                description="desc",
                                retrieval_queries=[
                                    RetrievalQuery(
                                        query="q1",
                                        doc_infos=[
                                            {"title": "文档A", "url": "https://a.com"},
                                            {"title": "文档B", "url": "https://b.com"},
                                        ],
                                    )
                                ],
                            )
                        ],
                    )
                ],
            )
        ],
    )

    assets = processor.collect_section_assets(target=target, current_outline=outline)

    assert assets.section_id == "1"
    assert assets.match_mode == "title_exact"
    assert assets.historical_doc_infos == [
        {"title": "文档A", "url": "https://a.com"},
        {"title": "文档B", "url": "https://b.com"},
    ]
    # The collected assets are expected not to expose classified content.
    assert not hasattr(assets, "historical_classified_content")


def test_collect_section_assets_treats_none_list_fields_as_empty():
    """Check that a ``None`` list field in the outline is handled as empty assets.

    The outline stand-in has ``sections=None``; the result is expected to have
    no section id, match mode "none" and an empty doc_info list. Only the
    outline is passed to ``collect_section_assets``, so no report stand-in is
    created.
    """
    processor = NewTaskProcessor(llm_model_name="mock_model")
    target = NewTaskTargetSection(
        section_title="第一章",
        section_text="## 第一章\n旧章节内容",
        clean_section_text="## 第一章\n旧章节内容",
        clean_selected_text="旧章节内容",
        section_start_offset=0,
        section_end_offset=11,
    )
    outline = SimpleNamespace(sections=None)

    assets = processor.collect_section_assets(target=target, current_outline=outline)

    assert assets.section_id is None
    assert assets.match_mode == "none"
    assert assets.historical_doc_infos == []


def test_collect_section_assets_filters_malformed_doc_infos(caplog):
    """Check that malformed doc_info entries are skipped when collecting assets.

    The retrieval query mixes one valid entry with entries missing ``url`` or
    ``title``, a ``None``, a plain string and a later entry repeating the same
    title and url. Only the first valid entry is expected in the result, and a
    warning containing "filtered malformed doc_infos" is expected in the log.
    Only the outline is passed to ``collect_section_assets``, so no report
    stand-in is created.
    """
    processor = NewTaskProcessor(llm_model_name="mock_model")
    target = NewTaskTargetSection(
        section_title="第一章",
        section_text="## 第一章\n旧章节内容",
        clean_section_text="## 第一章\n旧章节内容",
        clean_selected_text="旧章节内容",
        section_start_offset=0,
        section_end_offset=11,
    )
    valid_doc = {"title": "文档A", "url": "https://a.com"}
    outline = SimpleNamespace(
        sections=[
            SimpleNamespace(
                id="1",
                title="第一章",
                plans=[
                    SimpleNamespace(
                        steps=[
                            SimpleNamespace(
                                retrieval_queries=[
                                    SimpleNamespace(
                                        doc_infos=[
                                            valid_doc,
                                            {"title": "缺少 URL"},
                                            {"url": "https://missing-title.com"},
                                            None,
                                            "not a dict",
                                            {"title": "文档A", "url": "https://a.com", "extra": "later duplicate"},
                                        ]
                                    )
                                ]
                            )
                        ]
                    )
                ],
            )
        ]
    )

    with caplog.at_level(logging.WARNING):
        assets = processor.collect_section_assets(target=target, current_outline=outline)

    assert assets.historical_doc_infos == [
        {"title": "文档A", "url": "https://a.com"}
    ]
    assert any("filtered malformed doc_infos" in record.message for record in caplog.records)


def test_new_task_deduplicate_doc_infos_keeps_same_url_different_sources():
    """Check that entries sharing title and url but not ``source_id`` both survive."""
    first_source = dict(
        title="文档A",
        url="https://example.com/a",
        source_id="source-a",
        original_content="content A",
    )
    second_source = dict(
        title="文档A",
        url="https://example.com/a",
        source_id="source-b",
        original_content="content B",
    )

    deduplicated = NewTaskProcessor._deduplicate_doc_infos([first_source, second_source])

    kept_source_ids = [entry["source_id"] for entry in deduplicated]
    assert kept_source_ids == ["source-a", "source-b"]


def test_collect_section_assets_matches_by_normalized_title():
    """Check that a numbered, punctuated title matches the plain outline title.

    The target title is "1. 第一章:" while the outline section is titled
    "第一章"; the expected match mode is "title_normalized". Only the outline is
    passed to ``collect_section_assets``, so no ``Report`` fixture is built.
    """
    processor = NewTaskProcessor(llm_model_name="mock_model")
    target = NewTaskTargetSection(
        section_title="1. 第一章:",
        section_text="## 1. 第一章:\n旧章节内容",
        clean_section_text="## 1. 第一章:\n旧章节内容",
        clean_selected_text="旧章节内容",
        section_start_offset=0,
        section_end_offset=15,
    )
    outline = Outline(
        thought="",
        title="报告",
        sections=[Section(id="1", title="第一章", description="", plans=[])],
    )

    assets = processor.collect_section_assets(target=target, current_outline=outline)

    assert assets.section_id == "1"
    assert assets.match_mode == "title_normalized"


def test_collect_section_assets_does_not_match_by_section_order():
    """Check that a "第二章 ..." title is not matched to the second outline section.

    Neither outline title ("背景", "市场现状") equals the target title, so the
    expected outcome is no section id and match mode "none". Only the outline is
    passed to ``collect_section_assets``, so no ``Report`` fixture is built.
    """
    processor = NewTaskProcessor(llm_model_name="mock_model")
    target = NewTaskTargetSection(
        section_title="第二章 当前分析",
        section_text="## 第二章 当前分析\n旧章节内容",
        clean_section_text="## 第二章 当前分析\n旧章节内容",
        clean_selected_text="旧章节内容",
        section_start_offset=0,
        section_end_offset=18,
    )
    outline = Outline(
        thought="",
        title="报告",
        sections=[
            Section(id="1", title="背景", description="", plans=[]),
            Section(id="2", title="市场现状", description="", plans=[]),
        ],
    )

    assets = processor.collect_section_assets(target=target, current_outline=outline)

    assert assets.section_id is None
    assert assets.match_mode == "none"


def test_collect_section_assets_does_not_match_by_content_similarity():
    """Check that a target whose title matches no outline section stays unmatched.

    The target title differs from both outline titles, so the expected outcome
    is no section id and match mode "none". Only the outline is passed to
    ``collect_section_assets``, so no ``Report`` fixture with similar body text
    is built; it could not influence the call.
    """
    processor = NewTaskProcessor(llm_model_name="mock_model")
    target = NewTaskTargetSection(
        section_title="完全不同的标题",
        section_text="## 完全不同的标题\n行业规模在 2024 年显著增长,企业上云比例继续提升。",
        clean_section_text="## 完全不同的标题\n行业规模在 2024 年显著增长,企业上云比例继续提升。",
        clean_selected_text="行业规模在 2024 年显著增长,企业上云比例继续提升。",
        section_start_offset=0,
        section_end_offset=38,
    )
    outline = Outline(
        thought="",
        title="报告",
        sections=[
            Section(id="1", title="背景", description="", plans=[]),
            Section(id="2", title="市场现状", description="", plans=[]),
        ],
    )

    assets = processor.collect_section_assets(target=target, current_outline=outline)

    assert assets.section_id is None
    assert assets.match_mode == "none"


@pytest.mark.asyncio
async def test_assessment_not_sufficient_builds_incremental_plan_from_missing_aspects():
    """Check the incremental plan built when the assessment reports missing aspects.

    ``assess_section_assets`` is replaced by an ``AsyncMock`` returning an
    insufficient assessment with two missing aspects. The plan is expected to
    have a single INFO_COLLECTING step whose description mentions both.
    """
    processor = NewTaskProcessor(llm_model_name="mock_model")
    historical_assets = SectionHistoricalAssets(
        section_id="1",
        match_mode="title_exact",
        section_title="第一章",
        current_section_text="## 第一章\n旧章节内容",
        historical_plans=[],
        historical_doc_infos=[{"title": "文档A", "url": "https://a.com"}],
    )
    gaps = ["行业背景", "市场规模数据"]
    insufficient = NewTaskAssetAssessment(
        relevant_doc_infos=[{"title": "文档A", "url": "https://a.com"}],
        is_sufficient=False,
        missing_aspects=list(gaps),
        reasoning_summary="现有资料缺少行业背景与定量数据",
    )
    assess_patch = patch.object(
        processor,
        "assess_section_assets",
        new_callable=AsyncMock,
        return_value=insufficient,
    )

    with assess_patch:
        plan = await processor.build_incremental_plan(
            assets=historical_assets,
            feedback={"user_instruction": "补充行业背景"},
            language="zh-CN",
        )

    assert plan.title == "NEW_TASK incremental research"
    assert len(plan.steps) == 1
    only_step = plan.steps[0]
    assert only_step.type == StepType.INFO_COLLECTING
    assert "行业背景" in only_step.description
    assert "市场规模数据" in only_step.description


@pytest.mark.asyncio
async def test_run_new_task_merges_new_docs_and_rewrites_entire_section(caplog):
    """Check the insufficient-assets path of ``run_new_task`` end to end.

    Every collaborator method on the processor is patched. The result is
    expected to splice the rewritten section into the report, count one
    historical and one new document, expose the incremental doc_infos and the
    assessment summary, and log the collection start/completion and the
    ``modify_existing_subsection`` strategy.
    """
    processor = NewTaskProcessor(llm_model_name="mock_model")
    caplog.set_level(logging.INFO)
    old_section = "## 第一章\n旧章节内容\n"
    final_result = dict(
        response_content="## 第一章\n旧章节内容\n## 第二章\n其他内容",
        citation_messages={"data": []},
        infer_messages=[],
    )
    # NOTE(review): the offsets are hard-coded rather than derived from the report
    # text; resolve_target_section is patched below.
    feedback = dict(
        action="new_task",
        selected_text="旧章节内容",
        start_offset=6,
        end_offset=11,
        user_instruction="补充行业背景",
    )
    target = NewTaskTargetSection(
        section_title="第一章",
        section_text=old_section,
        clean_section_text=old_section,
        clean_selected_text="旧章节内容",
        section_start_offset=0,
        section_end_offset=12,
    )
    assets = SectionHistoricalAssets(
        section_id="1",
        match_mode="title_exact",
        section_title="第一章",
        current_section_text=old_section,
        historical_plans=[],
        historical_doc_infos=[{"title": "文档A", "url": "https://a.com"}],
    )
    assessment = NewTaskAssetAssessment(
        relevant_doc_infos=[{"title": "文档A", "url": "https://a.com"}],
        is_sufficient=False,
        missing_aspects=["行业背景"],
        reasoning_summary="还缺行业背景",
    )
    incremental_plan = Plan(
        id="",
        language="zh-CN",
        title="NEW_TASK incremental research",
        thought="",
        is_research_completed=False,
        steps=[Step(type=StepType.INFO_COLLECTING, title="Collect", description="补充行业背景")],
    )
    collected = {
        "info_summary": "补充了行业背景",
        "doc_infos": [{"title": "文档B", "url": "https://b.com"}],
    }

    # Patchers are created up front and entered in the same order as before.
    resolve_patch = patch.object(processor, "resolve_target_section", return_value=target)
    assets_patch = patch.object(processor, "collect_section_assets", return_value=assets)
    assess_patch = patch.object(
        processor, "assess_section_assets", new_callable=AsyncMock, return_value=assessment
    )
    plan_patch = patch.object(
        processor, "build_incremental_plan", new_callable=AsyncMock, return_value=incremental_plan
    )
    collection_patch = patch.object(
        processor, "run_incremental_collection", new_callable=AsyncMock, return_value=collected
    )
    rewrite_patch = patch.object(
        processor,
        "rewrite_section_with_assets",
        new_callable=AsyncMock,
        return_value="## 第一章\n新章节内容\n",
    )

    with resolve_patch, assets_patch, assess_patch, plan_patch, collection_patch, rewrite_patch:
        result = await processor.run_new_task(
            feedback=feedback,
            final_result=final_result,
            language="zh-CN",
        )

    assert result["new_report"] == "## 第一章\n新章节内容\n## 第二章\n其他内容"
    assert result["used_historical_doc_count"] == 1
    assert result["used_new_doc_count"] == 1
    assert result["incremental_doc_infos"] == [{"title": "文档B", "url": "https://b.com"}]
    assert result["assessment_summary"] == "还缺行业背景"
    assert any("incremental collection started" in record.message for record in caplog.records)
    assert any("incremental collection completed" in record.message for record in caplog.records)
    assert any("edit_strategy=modify_existing_subsection" in record.message for record in caplog.records)


@pytest.mark.asyncio
async def test_run_new_task_keeps_newline_before_next_heading():
    """Check that the blank line before the next heading survives a section rewrite.

    The patched rewrite returns text without a trailing newline; the new report
    is still expected to contain a blank line between the rewritten section and
    the "第二章" heading.
    """
    processor = NewTaskProcessor(llm_model_name="mock_model")
    first_section = "## 第一章\n旧章节内容\n本章结尾。\n\n"
    report = first_section + "## 第二章\n其他内容"
    selected_text = "旧章节内容"
    selection_start = report.index(selected_text)
    final_result = dict(
        response_content=report,
        citation_messages={"data": []},
        infer_messages=[],
    )
    feedback = dict(
        action="new_task",
        selected_text=selected_text,
        start_offset=selection_start,
        end_offset=selection_start + len(selected_text),
        user_instruction="补充行业背景",
    )
    target = NewTaskTargetSection(
        section_title="第一章",
        section_text=first_section,
        clean_section_text=first_section,
        clean_selected_text=selected_text,
        section_start_offset=0,
        section_end_offset=len(first_section),
    )
    assets = SectionHistoricalAssets(
        section_id="1",
        match_mode="title_exact",
        section_title="第一章",
        current_section_text=target.clean_section_text,
        historical_plans=[],
        historical_doc_infos=[{"title": "文档A", "url": "https://a.com"}],
    )
    assessment = NewTaskAssetAssessment(
        relevant_doc_infos=[{"title": "文档A", "url": "https://a.com"}],
        is_sufficient=True,
        missing_aspects=[],
        reasoning_summary="历史资料足够",
    )

    resolve_patch = patch.object(processor, "resolve_target_section", return_value=target)
    assets_patch = patch.object(processor, "collect_section_assets", return_value=assets)
    assess_patch = patch.object(
        processor, "assess_section_assets", new_callable=AsyncMock, return_value=assessment
    )
    rewrite_patch = patch.object(
        processor,
        "rewrite_section_with_assets",
        new_callable=AsyncMock,
        return_value="## 第一章\n改写后的章节内容",
    )

    with resolve_patch, assets_patch, assess_patch, rewrite_patch:
        result = await processor.run_new_task(
            feedback=feedback,
            final_result=final_result,
            language="zh-CN",
        )

    new_report = result["new_report"]
    assert "改写后的章节内容\n\n## 第二章" in new_report
    assert "改写后的章节内容## 第二章" not in new_report


@pytest.mark.asyncio
async def test_run_new_task_appends_new_subsection_to_numbered_major_chapter(caplog):
    """Check that a new subsection is appended at the end of the numbered major chapter.

    The assessment asks for ``append_new_subsection``; the generated "1.3" block
    is expected to appear right before the "# 2." heading while the selected
    "1.2" subsection stays in place.
    """
    processor = NewTaskProcessor(llm_model_name="mock_model")
    caplog.set_level(logging.INFO)
    report = "\n".join(
        [
            "# 1. 第一大章",
            "## 1.1 已有小节",
            "已有内容",
            "",
            "## 1.2 目标小节",
            "目标内容",
            "",
            "# 2. 第二大章",
            "其他内容",
        ]
    )
    selected_text = "目标内容"
    selection_start = report.index(selected_text)
    final_result = dict(
        response_content=report,
        citation_messages={"data": []},
        infer_messages=[],
    )
    feedback = dict(
        action="new_task",
        selected_text=selected_text,
        start_offset=selection_start,
        end_offset=selection_start + len(selected_text),
        user_instruction="新增一个小节分析第二名城市差距",
    )
    # The target comes from the real resolver; it is only used to build the assets fixture.
    target = processor.resolve_target_section(report_content=report, feedback=feedback)
    assets = SectionHistoricalAssets(
        section_id="1.2",
        match_mode="title_exact",
        section_title=target.section_title,
        current_section_text=target.clean_major_section_text,
        historical_plans=[],
        historical_doc_infos=[{"title": "文档A", "url": "https://a.com"}],
    )
    assessment = NewTaskAssetAssessment(
        relevant_doc_infos=[{"title": "文档A", "url": "https://a.com"}],
        is_sufficient=True,
        missing_aspects=[],
        reasoning_summary="需要新增独立小节承载新维度",
        edit_strategy="append_new_subsection",
        subsection_title="第二名城市差距分析",
    )

    assets_patch = patch.object(processor, "collect_section_assets", return_value=assets)
    assess_patch = patch.object(
        processor, "assess_section_assets", new_callable=AsyncMock, return_value=assessment
    )
    generate_patch = patch.object(
        processor,
        "generate_new_subsection_with_assets",
        new_callable=AsyncMock,
        return_value="## 1.3 第二名城市差距分析\n新增小节内容",
    )

    with assets_patch, assess_patch, generate_patch as mock_generate:
        result = await processor.run_new_task(
            feedback=feedback,
            final_result=final_result,
            language="zh-CN",
        )

    mock_generate.assert_awaited_once()
    new_report = result["new_report"]
    assert "## 1.2 目标小节\n目标内容" in new_report
    assert "## 1.3 第二名城市差距分析\n新增小节内容\n\n# 2. 第二大章" in new_report
    assert result["edit_strategy"] == "append_new_subsection"
    assert result["new_subsection_title"] == "1.3 第二名城市差距分析"
    assert any("incremental collection skipped" in record.message for record in caplog.records)
    assert any("edit_strategy=append_new_subsection" in record.message for record in caplog.records)


@pytest.mark.asyncio
async def test_run_new_task_preserves_major_chapter_markup_before_appending_subsection():
    """Check that appending a subsection leaves existing trace markup in the report.

    Citation and inference markup in the target major chapter and in the
    following chapter is expected to survive in ``new_report``, while the
    target's ``clean_major_section_text`` handed to the asset collector is
    expected to be free of that markup.
    """
    processor = NewTaskProcessor(llm_model_name="mock_model")
    selected_text = "目标内容[checked_citation: 2][[2]](https://b.com)"
    report = "\n".join(
        [
            "# 1. 第一大章",
            "## 1.1 已有小节",
            "已有内容[checked_citation: 1][[1]](https://a.com)",
            "推理[结论](#inference:7)",
            "",
            "## 1.2 目标小节",
            selected_text,
            "",
            "# 2. 第二大章",
            "其他内容[checked_citation: 3][[3]](https://c.com)",
        ]
    )
    selection_start = report.index(selected_text)
    final_result = dict(
        response_content=report,
        citation_messages={"data": []},
        infer_messages=[],
    )
    feedback = dict(
        action="new_task",
        selected_text=selected_text,
        start_offset=selection_start,
        end_offset=selection_start + len(selected_text),
        user_instruction="新增一个小节分析第二名城市差距",
    )
    seen_targets = []

    def record_target_and_build_assets(target, current_outline):
        # Remember the target the processor resolved so it can be inspected later.
        seen_targets.append(target)
        return SectionHistoricalAssets(
            section_id="1.2",
            match_mode="title_exact",
            section_title=target.section_title,
            current_section_text=target.clean_major_section_text,
            historical_plans=[],
            historical_doc_infos=[{"title": "文档A", "url": "https://a.com"}],
        )

    assessment = NewTaskAssetAssessment(
        relevant_doc_infos=[{"title": "文档A", "url": "https://a.com"}],
        is_sufficient=True,
        missing_aspects=[],
        reasoning_summary="需要新增独立小节承载新维度",
        edit_strategy="append_new_subsection",
        subsection_title="新增分析",
    )

    assets_patch = patch.object(
        processor, "collect_section_assets", side_effect=record_target_and_build_assets
    )
    assess_patch = patch.object(
        processor, "assess_section_assets", new_callable=AsyncMock, return_value=assessment
    )
    generate_patch = patch.object(
        processor,
        "generate_new_subsection_with_assets",
        new_callable=AsyncMock,
        return_value="## 1.3 新增分析\n新增小节内容",
    )

    with assets_patch, assess_patch, generate_patch:
        result = await processor.run_new_task(
            feedback=feedback,
            final_result=final_result,
            language="zh-CN",
        )

    resolved_target = seen_targets[0]
    new_report = result["new_report"]
    first_major_chapter = new_report.split("# 2. 第二大章", 1)[0]
    assert "[checked_citation: 1][[1]](https://a.com)" in first_major_chapter
    assert "[结论](#inference:7)" in first_major_chapter
    assert "[checked_citation: 2][[2]](https://b.com)" in first_major_chapter
    assert "其他内容[checked_citation: 3][[3]](https://c.com)" in new_report
    assert "[checked_citation:" not in resolved_target.clean_major_section_text
    assert "(#inference:" not in resolved_target.clean_major_section_text
    assert "## 1.3 新增分析\n新增小节内容\n\n# 2. 第二大章" in new_report


@pytest.mark.asyncio
async def test_run_new_task_preserves_plain_trace_links_before_appending_subsection():
    """Check that double-bracket trace links survive when a subsection is appended.

    Besides the links being present in ``new_report``, splicing
    ``rewritten_text`` into the original report at the returned offsets is
    expected to reproduce ``new_report`` exactly.
    """
    processor = NewTaskProcessor(llm_model_name="mock_model")
    selected_text = "目标内容[[2]](https://b.com)"
    report = "\n".join(
        [
            "# 1. 第一大章",
            "## 1.1 已有小节",
            "已有内容[[1]](https://a.com)",
            "",
            "## 1.2 目标小节",
            selected_text,
            "",
            "# 2. 第二大章",
            "其他内容[[3]](https://c.com)",
        ]
    )
    selection_start = report.index(selected_text)
    final_result = dict(
        response_content=report,
        citation_messages={"data": []},
        infer_messages=[],
    )
    feedback = dict(
        action="new_task",
        selected_text=selected_text,
        start_offset=selection_start,
        end_offset=selection_start + len(selected_text),
        user_instruction="新增一个小节分析第二名城市差距",
    )
    assessment = NewTaskAssetAssessment(
        relevant_doc_infos=[{"title": "文档A", "url": "https://a.com"}],
        is_sufficient=True,
        missing_aspects=[],
        reasoning_summary="需要新增独立小节承载新维度",
        edit_strategy="append_new_subsection",
        subsection_title="新增分析",
    )

    def build_assets_for(target, current_outline):
        # Assets are derived from whatever target the processor resolved.
        return SectionHistoricalAssets(
            section_id="1.2",
            match_mode="title_exact",
            section_title=target.section_title,
            current_section_text=target.clean_major_section_text,
            historical_plans=[],
            historical_doc_infos=[{"title": "文档A", "url": "https://a.com"}],
        )

    assets_patch = patch.object(processor, "collect_section_assets", side_effect=build_assets_for)
    assess_patch = patch.object(
        processor, "assess_section_assets", new_callable=AsyncMock, return_value=assessment
    )
    generate_patch = patch.object(
        processor,
        "generate_new_subsection_with_assets",
        new_callable=AsyncMock,
        return_value="## 1.3 新增分析\n新增小节内容",
    )

    with assets_patch, assess_patch, generate_patch:
        result = await processor.run_new_task(
            feedback=feedback,
            final_result=final_result,
            language="zh-CN",
        )

    new_report = result["new_report"]
    first_major_chapter = new_report.split("# 2. 第二大章", 1)[0]
    prefix = report[: result["original_start_offset"]]
    suffix = report[result["original_end_offset"] :]
    reconstructed_report = prefix + result["rewritten_text"] + suffix
    assert "[[1]](https://a.com)" in first_major_chapter
    assert "[[2]](https://b.com)" in first_major_chapter
    assert "其他内容[[3]](https://c.com)" in new_report
    assert reconstructed_report == new_report


@pytest.mark.asyncio
async def test_run_new_task_can_modify_existing_subsection_other_than_selection():
    """Check that a different subsection of the same major chapter can be rewritten.

    The selection lies in "1.2" but the assessment names "1.1 政策背景" as the
    target subsection; the rewrite is expected to be called for "1.1" and the
    report to contain the new "1.1" text followed by the untouched "1.2".
    """
    processor = NewTaskProcessor(llm_model_name="mock_model")
    report = "\n".join(
        [
            "# 1. 第一大章",
            "## 1.1 政策背景",
            "政策旧内容",
            "",
            "## 1.2 产能分析",
            "产能内容",
            "",
            "# 2. 第二大章",
            "其他内容",
        ]
    )
    selected_text = "产能内容"
    selection_start = report.index(selected_text)
    final_result = dict(
        response_content=report,
        citation_messages={"data": []},
        infer_messages=[],
    )
    feedback = dict(
        action="new_task",
        selected_text=selected_text,
        start_offset=selection_start,
        end_offset=selection_start + len(selected_text),
        user_instruction="补充政策支持内容",
    )
    # The target comes from the real resolver; it is only used to build the assets fixture.
    target = processor.resolve_target_section(report_content=report, feedback=feedback)
    assets = SectionHistoricalAssets(
        section_id="1.1",
        match_mode="title_exact",
        section_title=target.section_title,
        current_section_text=target.clean_major_section_text,
        historical_plans=[],
        historical_doc_infos=[{"title": "政策文档", "url": "https://policy.example.com"}],
    )
    assessment = NewTaskAssetAssessment(
        relevant_doc_infos=[{"title": "政策文档", "url": "https://policy.example.com"}],
        is_sufficient=True,
        missing_aspects=[],
        reasoning_summary="政策内容更适合更新 1.1",
        edit_strategy="modify_existing_subsection",
        target_subsection_title="1.1 政策背景",
    )

    assets_patch = patch.object(processor, "collect_section_assets", return_value=assets)
    assess_patch = patch.object(
        processor, "assess_section_assets", new_callable=AsyncMock, return_value=assessment
    )
    rewrite_patch = patch.object(
        processor,
        "rewrite_section_with_assets",
        new_callable=AsyncMock,
        return_value="## 1.1 政策背景\n政策新内容\n",
    )

    with assets_patch, assess_patch, rewrite_patch as mock_rewrite:
        result = await processor.run_new_task(
            feedback=feedback,
            final_result=final_result,
            language="zh-CN",
        )

    # The rewrite must have been awaited with the "1.1" subsection as its keyword target.
    rewritten_target = mock_rewrite.await_args.kwargs["target"]
    assert rewritten_target.section_title == "1.1 政策背景"
    assert "## 1.1 政策背景\n政策新内容\n\n## 1.2 产能分析\n产能内容" in result["new_report"]
    assert result["edit_strategy"] == "modify_existing_subsection"


@pytest.mark.asyncio
async def test_run_new_task_retargets_assets_when_modifying_another_subsection():
    """Verify a cross-subsection rewrite uses the rewritten subsection's assets and section_id.

    The selected text sits under "1.2 产能分析", while every mocked assessment
    names "1.1 政策背景" as the target subsection. The rewrite call and the
    returned result are expected to reference section 1.1 and its policy
    document.
    """
    processor = NewTaskProcessor(llm_model_name="mock_model")
    report = "".join(
        [
            "# 1. 第一大章\n",
            "## 1.1 政策背景\n",
            "政策旧内容\n\n",
            "## 1.2 产能分析\n",
            "产能内容\n",
        ]
    )
    selected_text = "产能内容"
    selection_start = report.index(selected_text)
    feedback = {
        "action": "new_task",
        "selected_text": selected_text,
        "start_offset": selection_start,
        "end_offset": selection_start + len(selected_text),
        "user_instruction": "补充政策支持内容",
    }
    policy_doc = {"title": "政策文档", "url": "https://policy.example.com"}
    capacity_doc = {"title": "产能文档", "url": "https://capacity.example.com"}

    def build_section(section_id, section_title, topic, doc_info):
        """Build an outline section with one single-step plan; `topic` names both plan and query."""
        query = RetrievalQuery(query=topic, doc_infos=[doc_info])
        step = Step(
            type=StepType.INFO_COLLECTING,
            title="step",
            description="desc",
            retrieval_queries=[query],
        )
        plan = Plan(title=topic, thought="", is_research_completed=False, steps=[step])
        return Section(id=section_id, title=section_title, description="", plans=[plan])

    outline = Outline(
        thought="",
        title="报告",
        sections=[
            build_section("1.1", "1.1 政策背景", "policy", policy_doc),
            build_section("1.2", "1.2 产能分析", "capacity", capacity_doc),
        ],
    )

    async def fake_assess(assets, feedback, language):
        # Both outcomes redirect the edit to subsection 1.1; only the selected
        # documents and the summary depend on which section is being assessed.
        is_policy_section = assets.section_id == "1.1"
        return NewTaskAssetAssessment(
            relevant_doc_infos=[policy_doc] if is_policy_section else [capacity_doc],
            is_sufficient=True,
            missing_aspects=[],
            reasoning_summary="政策小节资料足够" if is_policy_section else "政策内容更适合更新 1.1",
            edit_strategy="modify_existing_subsection",
            target_subsection_title="1.1 政策背景",
        )

    outline_patch = patch.object(processor, "_load_current_outline", return_value=outline)
    assess_patch = patch.object(
        processor,
        "assess_section_assets",
        new_callable=AsyncMock,
        side_effect=fake_assess,
    )
    rewrite_patch = patch.object(
        processor,
        "rewrite_section_with_assets",
        new_callable=AsyncMock,
        return_value="## 1.1 政策背景\n政策新内容\n",
    )

    with outline_patch, assess_patch, rewrite_patch as mock_rewrite:
        result = await processor.run_new_task(
            feedback=feedback,
            final_result={"response_content": report},
            language="zh-CN",
        )

    rewrite_kwargs = mock_rewrite.await_args.kwargs
    assert rewrite_kwargs["target"].section_title == "1.1 政策背景"
    assert rewrite_kwargs["doc_infos"] == [policy_doc]
    assert result["matched_section_id"] == "1.1"
    assert result["section_title"] == "1.1 政策背景"


@pytest.mark.asyncio
async def test_run_new_task_modify_existing_subsection_cleans_only_rewritten_subsection_markup():
    """Verify that modifying an existing subsection strips provenance markup only from that subsection.

    Citation markers in the untouched subsection and in the other major
    chapter must still be present in the new report, while the markers of the
    rewritten subsection must be gone. The result offsets must also allow the
    new report to be rebuilt from the original report.
    """
    processor = NewTaskProcessor(llm_model_name="mock_model")
    report = "".join(
        [
            "# 1. 第一大章\n",
            "## 1.1 已有小节\n",
            "已有内容[[1]](https://a.com)\n\n",
            "## 1.2 目标小节\n",
            "目标内容[checked_citation: 2][[2]](https://b.com)\n",
            "推理[结论](#inference:7)\n\n",
            "# 2. 第二大章\n",
            "其他内容[[3]](https://c.com)",
        ]
    )
    selected_text = "目标内容[checked_citation: 2][[2]](https://b.com)"
    selection_start = report.index(selected_text)
    final_result = {
        "response_content": report,
        "citation_messages": {"data": []},
        "infer_messages": [],
    }
    feedback = {
        "action": "new_task",
        "selected_text": selected_text,
        "start_offset": selection_start,
        "end_offset": selection_start + len(selected_text),
        "user_instruction": "修改目标小节内容",
    }
    # The real resolver is used here so the asset fixture mirrors the actual target.
    target = processor.resolve_target_section(report_content=report, feedback=feedback)
    assets = SectionHistoricalAssets(
        section_id="1.2",
        match_mode="title_exact",
        section_title=target.section_title,
        current_section_text=target.clean_major_section_text,
        historical_plans=[],
        historical_doc_infos=[{"title": "文档A", "url": "https://a.com"}],
    )
    assessment = NewTaskAssetAssessment(
        relevant_doc_infos=[{"title": "文档A", "url": "https://a.com"}],
        is_sufficient=True,
        missing_aspects=[],
        reasoning_summary="需要修改目标小节",
        edit_strategy="modify_existing_subsection",
    )

    assets_patch = patch.object(processor, "collect_section_assets", return_value=assets)
    assess_patch = patch.object(
        processor,
        "assess_section_assets",
        new_callable=AsyncMock,
        return_value=assessment,
    )
    rewrite_patch = patch.object(
        processor,
        "rewrite_section_with_assets",
        new_callable=AsyncMock,
        return_value="## 1.2 目标小节\n目标新内容\n",
    )

    with assets_patch, assess_patch, rewrite_patch as mock_rewrite:
        result = await processor.run_new_task(
            feedback=feedback,
            final_result=final_result,
            language="zh-CN",
        )

    rewrite_target = mock_rewrite.await_args.kwargs["target"]
    prefix = report[: result["original_start_offset"]]
    suffix = report[result["original_end_offset"] :]
    reconstructed_report = prefix + result["rewritten_text"] + suffix
    new_report = result["new_report"]
    stripped_markers = ("[checked_citation: 2]", "(#inference:7)")

    # Markup outside the rewritten subsection is left alone.
    for kept_marker in ("[[1]](https://a.com)", "[[3]](https://c.com)"):
        assert kept_marker in new_report
    # Markup of the rewritten subsection disappears from the report ...
    for marker in stripped_markers:
        assert marker not in new_report
    # ... and from the section text handed to the rewrite step.
    for marker in stripped_markers:
        assert marker not in rewrite_target.section_text
    assert result["original_text"] == target.section_text
    assert reconstructed_report == new_report


@pytest.mark.asyncio
async def test_run_new_task_raises_when_no_historical_or_incremental_docs_available():
    """Verify run_new_task raises a rewrite error when no documents are available at all.

    Historical assets are empty, the assessment is insufficient, and the mocked
    incremental collection returns no documents. The call must raise
    CustomValueException with the rewrite error code and must never await the
    rewrite step.
    """
    processor = NewTaskProcessor(llm_model_name="mock_model")
    section_text = "## 第一章\n旧章节内容\n"
    final_result = {
        "response_content": "## 第一章\n旧章节内容\n## 第二章\n其他内容",
        "citation_messages": {"data": []},
        "infer_messages": [],
    }
    feedback = {
        "action": "new_task",
        "selected_text": "旧章节内容",
        "start_offset": 6,
        "end_offset": 11,
        "user_instruction": "补充行业背景",
    }
    target = NewTaskTargetSection(
        section_title="第一章",
        section_text=section_text,
        clean_section_text=section_text,
        clean_selected_text="旧章节内容",
        section_start_offset=0,
        section_end_offset=12,
    )
    assets = SectionHistoricalAssets(
        section_id="1",
        match_mode="title_exact",
        section_title="第一章",
        current_section_text=section_text,
        historical_plans=[],
        historical_doc_infos=[],
    )
    assessment = NewTaskAssetAssessment(
        relevant_doc_infos=[],
        is_sufficient=False,
        missing_aspects=["行业背景"],
        reasoning_summary="还缺行业背景",
    )
    incremental_plan = Plan(
        id="",
        language="zh-CN",
        title="NEW_TASK incremental research",
        thought="",
        is_research_completed=False,
        steps=[Step(type=StepType.INFO_COLLECTING, title="Collect", description="补充行业背景")],
    )

    target_patch = patch.object(processor, "resolve_target_section", return_value=target)
    assets_patch = patch.object(processor, "collect_section_assets", return_value=assets)
    assess_patch = patch.object(
        processor,
        "assess_section_assets",
        new_callable=AsyncMock,
        return_value=assessment,
    )
    plan_patch = patch.object(
        processor,
        "build_incremental_plan",
        new_callable=AsyncMock,
        return_value=incremental_plan,
    )
    # Incremental collection finds nothing, leaving the rewrite without evidence.
    collect_patch = patch.object(
        processor,
        "run_incremental_collection",
        new_callable=AsyncMock,
        return_value={"info_summary": "", "doc_infos": []},
    )
    rewrite_patch = patch.object(processor, "rewrite_section_with_assets", new_callable=AsyncMock)

    with target_patch, assets_patch, assess_patch, plan_patch, collect_patch:
        with rewrite_patch as mock_rewrite, pytest.raises(CustomValueException) as exc_info:
            await processor.run_new_task(
                feedback=feedback,
                final_result=final_result,
                language="zh-CN",
            )

    raised = exc_info.value
    assert raised.error_code == StatusCode.USER_FEEDBACK_PROCESSOR_REWRITE_ERROR.code
    assert "No evidence available" in raised.message
    mock_rewrite.assert_not_awaited()


@pytest.mark.asyncio
async def test_assess_section_assets_parses_structured_llm_response():
    """Verify assess_section_assets parses a well-formed JSON reply from the LLM.

    Also checks the document view passed to the prompt and the LLM name used
    for the invocation.
    """
    processor = NewTaskProcessor(llm_model_name="mock_model")
    doc_info = {
        "doc_id": "web_1",
        "source_id": "web_1_p123",
        "title": "文档A",
        "url": "https://a.com",
        "publish_time": "2025-05",
        "original_content": "原文A",
        "content_ref": {"type": "source_store", "source_id": "web_1_p123"},
        "scores": {"authority": 8, "relevance": 9, "answerability": 7, "data_density": 6},
        "key_passages": ["关键段落"],
        "brief_reason": "相关",
    }
    assets = SectionHistoricalAssets(
        section_id="1",
        match_mode="title_exact",
        section_title="第一章",
        current_section_text="## 第一章\n旧章节内容",
        historical_plans=[],
        historical_doc_infos=[doc_info],
    )
    llm_payload = {
        "relevant_doc_indices": [1],
        "is_sufficient": True,
        "missing_aspects": [],
        "reasoning_summary": "历史资料足够支撑补写",
    }
    llm_response = json.dumps(llm_payload, ensure_ascii=False)
    expected_prompt_doc = {
        "doc_time": "2025-05",
        "source_authority": "",
        "task_relevance": "",
        "original_content": "原文A",
        "url": "https://a.com",
        "information_richness": "",
        "data_density": "",
        "title": "文档A",
        "query": "",
    }

    invoke_patch = patch.object(
        processor,
        "_invoke_prompt",
        new_callable=AsyncMock,
        return_value=llm_response,
    )
    with invoke_patch as mock_invoke:
        result = await processor.assess_section_assets(
            assets=assets,
            feedback={"user_instruction": "补充行业背景", "selected_text": "旧章节内容"},
            language="zh-CN",
        )

    assert result.is_sufficient is True
    assert result.relevant_doc_infos == [doc_info]
    assert result.reasoning_summary == "历史资料足够支撑补写"
    invoke_args = mock_invoke.await_args.args
    prompt_vars = invoke_args[1]
    assert prompt_vars["historical_doc_infos"] == [expected_prompt_doc]
    assert invoke_args[2] == AgentLlmName.USER_FEEDBACK_PROCESSOR_NEW_TASK_ASSESSMENT.value


@pytest.mark.asyncio
async def test_assess_section_assets_defaults_to_modify_when_llm_omits_edit_strategy():
    """Verify the edit strategy comes only from the LLM's structured field, not from instruction keywords.

    The user instruction asks for a new subsection, but the LLM reply carries
    no edit_strategy, so the result is expected to be
    "modify_existing_subsection".
    """
    processor = NewTaskProcessor(llm_model_name="mock_model")
    assets = SectionHistoricalAssets(
        section_id="1",
        match_mode="title_exact",
        section_title="第一章",
        current_section_text="# 1. 第一章\n## 1.1 已有小节\n旧内容",
        historical_plans=[],
        historical_doc_infos=[{"title": "文档A", "url": "https://a.com"}],
    )
    llm_payload = {
        "relevant_doc_indices": [1],
        "is_sufficient": True,
        "missing_aspects": [],
        "reasoning_summary": "历史资料足够支撑补写",
    }
    llm_response = json.dumps(llm_payload, ensure_ascii=False)
    feedback = {"user_instruction": "请新增一个小节分析第二名城市", "selected_text": "旧内容"}

    invoke_patch = patch.object(
        processor,
        "_invoke_prompt",
        new_callable=AsyncMock,
        return_value=llm_response,
    )
    with invoke_patch:
        result = await processor.assess_section_assets(
            assets=assets,
            feedback=feedback,
            language="zh-CN",
        )

    assert result.edit_strategy == "modify_existing_subsection"


@pytest.mark.asyncio
async def test_assess_section_assets_logs_and_falls_back_when_llm_response_is_invalid_json(caplog):
    """Verify an unparseable LLM reply yields an insufficient fallback assessment and a warning log."""
    processor = NewTaskProcessor(llm_model_name="mock_model")
    assets = SectionHistoricalAssets(
        section_id="1",
        match_mode="title_exact",
        section_title="第一章",
        current_section_text="## 第一章\n旧章节内容",
        historical_plans=[],
        historical_doc_infos=[{"title": "文档A", "url": "https://a.com"}],
    )
    feedback = {"user_instruction": "补充行业背景", "selected_text": "旧章节内容"}

    invoke_patch = patch.object(
        processor,
        "_invoke_prompt",
        new_callable=AsyncMock,
        return_value="{invalid json",
    )
    with invoke_patch, caplog.at_level(logging.WARNING):
        result = await processor.assess_section_assets(
            assets=assets,
            feedback=feedback,
            language="zh-CN",
        )

    assert result.is_sufficient is False
    assert result.relevant_doc_infos == []
    assert result.missing_aspects == ["补充行业背景"]
    assert result.reasoning_summary == "无法稳定解析评估结果,已降级为资料不足。"
    logged_messages = [record.message for record in caplog.records]
    assert any("failed to parse LLM JSON" in message for message in logged_messages)


@pytest.mark.asyncio
async def test_assess_section_assets_logs_filtered_invalid_relevant_doc_indices(caplog):
    """Verify a mixed list of relevant_doc_indices resolves to both documents and logs the filtering."""
    processor = NewTaskProcessor(llm_model_name="mock_model")
    assets = SectionHistoricalAssets(
        section_id="1",
        match_mode="title_exact",
        section_title="第一章",
        current_section_text="## 第一章\n旧章节内容",
        historical_plans=[],
        historical_doc_infos=[
            {"title": "文档A", "url": "https://a.com"},
            {"title": "文档B", "url": "https://b.com"},
        ],
    )
    llm_payload = {
        # Mix of usable indices and entries the processor is expected to filter out.
        "relevant_doc_indices": [1, 0, -1, 3, "2", True, 2],
        "is_sufficient": True,
        "missing_aspects": [],
        "reasoning_summary": "混合返回有效与无效索引",
    }
    llm_response = json.dumps(llm_payload, ensure_ascii=False)
    feedback = {"user_instruction": "补充行业背景", "selected_text": "旧章节内容"}

    invoke_patch = patch.object(
        processor,
        "_invoke_prompt",
        new_callable=AsyncMock,
        return_value=llm_response,
    )
    with invoke_patch, caplog.at_level(logging.WARNING):
        result = await processor.assess_section_assets(
            assets=assets,
            feedback=feedback,
            language="zh-CN",
        )

    expected_docs = [
        {"title": "文档A", "url": "https://a.com"},
        {"title": "文档B", "url": "https://b.com"},
    ]
    assert result.relevant_doc_infos == expected_docs
    logged_messages = [record.message for record in caplog.records]
    assert any("filtered invalid relevant_doc_indices" in message for message in logged_messages)


@pytest.mark.asyncio
async def test_assess_section_assets_downgrades_sufficient_result_without_valid_docs(caplog):
    """Verify a "sufficient" verdict from the LLM is downgraded when no valid document remains."""
    processor = NewTaskProcessor(llm_model_name="mock_model")
    assets = SectionHistoricalAssets(
        section_id="1",
        match_mode="title_exact",
        section_title="第一章",
        current_section_text="## 第一章\n旧章节内容",
        historical_plans=[],
        historical_doc_infos=[{"title": "文档A", "url": "https://a.com"}],
    )
    llm_payload = {
        # Index 2 is requested although only one historical document exists.
        "relevant_doc_indices": [2],
        "is_sufficient": True,
        "missing_aspects": [],
        "reasoning_summary": "历史资料足够支撑补写",
    }
    llm_response = json.dumps(llm_payload, ensure_ascii=False)
    feedback = {"user_instruction": "补充行业背景", "selected_text": "旧章节内容"}

    invoke_patch = patch.object(
        processor,
        "_invoke_prompt",
        new_callable=AsyncMock,
        return_value=llm_response,
    )
    with invoke_patch, caplog.at_level(logging.WARNING):
        result = await processor.assess_section_assets(
            assets=assets,
            feedback=feedback,
            language="zh-CN",
        )

    assert result.is_sufficient is False
    assert result.relevant_doc_infos == []
    logged_messages = [record.message for record in caplog.records]
    assert any("downgraded sufficient assessment" in message for message in logged_messages)


@pytest.mark.asyncio
async def test_assess_section_assets_treats_string_false_as_insufficient(caplog):
    """Verify the string "false" from the LLM is not read as sufficient through Python truthiness."""
    processor = NewTaskProcessor(llm_model_name="mock_model")
    assets = SectionHistoricalAssets(
        section_id="1",
        match_mode="title_exact",
        section_title="第一章",
        current_section_text="## 第一章\n旧章节内容",
        historical_plans=[],
        historical_doc_infos=[{"title": "文档A", "url": "https://a.com"}],
    )
    llm_payload = {
        "relevant_doc_indices": [1],
        # A non-empty string is truthy in Python, which is the trap under test.
        "is_sufficient": "false",
        "missing_aspects": ["行业规模数据"],
        "reasoning_summary": "资料不足",
    }
    llm_response = json.dumps(llm_payload, ensure_ascii=False)
    feedback = {"user_instruction": "补充行业背景", "selected_text": "旧章节内容"}

    invoke_patch = patch.object(
        processor,
        "_invoke_prompt",
        new_callable=AsyncMock,
        return_value=llm_response,
    )
    with invoke_patch, caplog.at_level(logging.WARNING):
        result = await processor.assess_section_assets(
            assets=assets,
            feedback=feedback,
            language="zh-CN",
        )

    assert result.is_sufficient is False
    assert result.relevant_doc_infos == [{"title": "文档A", "url": "https://a.com"}]
    logged_messages = [record.message for record in caplog.records]
    assert any("invalid is_sufficient" in message for message in logged_messages)


@pytest.mark.asyncio
async def test_run_new_task_skips_incremental_collection_when_historical_assets_are_sufficient():
    """Verify run_new_task skips incremental planning and collection when the assessment is sufficient.

    The rewrite is mocked; the test checks the assembled report and the
    document counters reported in the result.
    """
    processor = NewTaskProcessor(llm_model_name="mock_model")
    section_text = "## 第一章\n旧章节内容\n"
    final_result = {
        "response_content": "## 第一章\n旧章节内容\n## 第二章\n其他内容",
        "citation_messages": {"data": []},
        "infer_messages": [],
    }
    feedback = {
        "action": "new_task",
        "selected_text": "旧章节内容",
        "start_offset": 6,
        "end_offset": 11,
        "user_instruction": "补充行业背景",
    }
    target = NewTaskTargetSection(
        section_title="第一章",
        section_text=section_text,
        clean_section_text=section_text,
        clean_selected_text="旧章节内容",
        section_start_offset=0,
        section_end_offset=12,
    )
    assets = SectionHistoricalAssets(
        section_id="1",
        match_mode="title_exact",
        section_title="第一章",
        current_section_text=section_text,
        historical_plans=[],
        historical_doc_infos=[{"title": "文档A", "url": "https://a.com"}],
    )
    assessment = NewTaskAssetAssessment(
        relevant_doc_infos=[{"title": "文档A", "url": "https://a.com"}],
        is_sufficient=True,
        missing_aspects=[],
        reasoning_summary="历史资料足够支撑补写",
    )

    target_patch = patch.object(processor, "resolve_target_section", return_value=target)
    assets_patch = patch.object(processor, "collect_section_assets", return_value=assets)
    assess_patch = patch.object(
        processor,
        "assess_section_assets",
        new_callable=AsyncMock,
        return_value=assessment,
    )
    plan_patch = patch.object(processor, "build_incremental_plan", new_callable=AsyncMock)
    collect_patch = patch.object(processor, "run_incremental_collection", new_callable=AsyncMock)
    rewrite_patch = patch.object(
        processor,
        "rewrite_section_with_assets",
        new_callable=AsyncMock,
        return_value="## 第一章\n新章节内容\n",
    )

    with target_patch, assets_patch, assess_patch:
        with plan_patch as mock_build_plan, collect_patch as mock_collect, rewrite_patch:
            result = await processor.run_new_task(
                feedback=feedback,
                final_result=final_result,
                language="zh-CN",
            )

    # Neither incremental step may run when historical material is enough.
    mock_build_plan.assert_not_awaited()
    mock_collect.assert_not_awaited()
    assert result["new_report"] == "## 第一章\n新章节内容\n## 第二章\n其他内容"
    assert result["used_historical_doc_count"] == 1
    assert result["used_new_doc_count"] == 0
    assert result["incremental_plan"] is None
    assert result["incremental_doc_infos"] == []
    assert result["assessment_summary"] == "历史资料足够支撑补写"


@pytest.mark.parametrize(
    ("title", "expected"),
    [
        pytest.param("1. 第一章:", "第一章"),
        pytest.param("第2章 市场-现状", "市场现状"),
        pytest.param(" 结论/建议 ", "结论建议"),
    ],
)
def test_normalize_section_title(title, expected):
    """Check _normalize_section_title against numbered, punctuated and space-padded titles."""
    normalized = NewTaskProcessor._normalize_section_title(title)

    assert normalized == expected


def test_build_next_subsection_title_extracts_explicit_title_from_instruction():
    """Verify the fallback title for a new subsection can be extracted from the user instruction."""
    subsection_text = "## 1.2 目标小节\n目标内容"
    major_text = "# 1. 第一大章\n## 1.1 已有小节\n内容\n## 1.2 目标小节\n目标内容\n"
    target = NewTaskTargetSection(
        section_title="1.2 目标小节",
        section_text=subsection_text,
        clean_section_text=subsection_text,
        clean_selected_text="目标内容",
        section_start_offset=0,
        section_end_offset=15,
        major_section_title="1. 第一大章",
        major_section_text=major_text,
        clean_major_section_text=major_text,
        major_section_start_offset=0,
        major_section_end_offset=50,
        major_heading_level=1,
    )
    instruction = "请在本章新增一个小节,标题为“第二名城市对比分析”,补充产量差距。"

    title = NewTaskProcessor._build_next_subsection_title(target, instruction)

    # The major chapter already holds 1.1 and 1.2, so 1.3 is the expected number.
    assert title == "1.3 第二名城市对比分析"


def test_build_next_subsection_title_renumbers_llm_numbered_title_under_major_chapter():
    """Verify an already-numbered LLM title is renumbered to the next subsection of the major chapter."""
    subsection_text = "## 1.2 目标小节\n目标内容"
    major_text = "# 1. 第一大章\n## 1.1 已有小节\n内容\n## 1.2 目标小节\n目标内容\n"
    target = NewTaskTargetSection(
        section_title="1.2 目标小节",
        section_text=subsection_text,
        clean_section_text=subsection_text,
        clean_selected_text="目标内容",
        section_start_offset=0,
        section_end_offset=15,
        major_section_title="1. 第一大章",
        major_section_text=major_text,
        clean_major_section_text=major_text,
        major_section_start_offset=0,
        major_section_end_offset=50,
        major_heading_level=1,
    )
    numbered_title = "1.2.1 第二名城市对比分析"

    title = NewTaskProcessor._build_next_subsection_title(target, numbered_title)

    # The "1.2.1" prefix supplied in the input is expected to be replaced by "1.3".
    assert title == "1.3 第二名城市对比分析"


def test_build_next_subsection_title_uses_english_fallback_and_extracts_english_title():
    """Verify new subsection titles in English reports do not fall back to the Chinese default wording."""
    subsection_text = "## 1.2 Target Subsection\nTarget content"
    major_text = "# 1. Market Overview\n## 1.1 Current State\nContent\n## 1.2 Target Subsection\nTarget content\n"
    target = NewTaskTargetSection(
        section_title="1.2 Target Subsection",
        section_text=subsection_text,
        clean_section_text=subsection_text,
        clean_selected_text="Target content",
        section_start_offset=0,
        section_end_offset=40,
        major_section_title="1. Market Overview",
        major_section_text=major_text,
        clean_major_section_text=major_text,
        major_section_start_offset=0,
        major_section_end_offset=90,
        major_heading_level=1,
    )
    instruction = 'Please add a subsection titled "Competitive Risk".'

    extracted_title = NewTaskProcessor._build_next_subsection_title(target, instruction, language="en")
    # An empty instruction exercises the language-specific default title.
    fallback_title = NewTaskProcessor._build_next_subsection_title(target, "", language="en")

    assert extracted_title == "1.3 Competitive Risk"
    assert fallback_title == "1.3 Additional Analysis"
    assert "新增" not in fallback_title


@pytest.mark.asyncio
async def test_build_incremental_plan_uses_english_description_labels():
    """Verify the incremental collection plan for an English report does not use Chinese field labels."""
    processor = NewTaskProcessor(llm_model_name="mock_model")
    assets = SectionHistoricalAssets(
        section_id="1",
        match_mode="title_exact",
        section_title="Market Overview",
        current_section_text="## Market Overview\nOld content",
        historical_plans=[],
        historical_doc_infos=[],
    )
    assessment = NewTaskAssetAssessment(
        relevant_doc_infos=[],
        is_sufficient=False,
        missing_aspects=["market size", "competitor risk"],
        reasoning_summary="Need more evidence.",
    )
    feedback = {"user_instruction": "Add competitor risk analysis."}

    plan = await processor.build_incremental_plan(
        assets=assets,
        feedback=feedback,
        language="en",
        assessment=assessment,
    )

    description = plan.steps[0].description
    for english_label in ("User request:", "Section title:", "Missing information:"):
        assert english_label in description
    for chinese_label in ("用户要求", "章节标题", "待补充信息"):
        assert chinese_label not in description


@pytest.mark.asyncio
async def test_rewrite_section_with_assets_returns_clean_section_text():
    """Verify rewrite_section_with_assets returns the LLM text without its trailing newline.

    Also checks the document view passed to the prompt and the LLM name used
    for the invocation.
    """
    processor = NewTaskProcessor(llm_model_name="mock_model")
    section_text = "## 第一章\n旧章节内容"
    target = NewTaskTargetSection(
        section_title="第一章",
        section_text=section_text,
        clean_section_text=section_text,
        clean_selected_text="旧章节内容",
        section_start_offset=0,
        section_end_offset=11,
    )
    source_doc = {
        "doc_id": "web_1",
        "source_id": "web_1_p123",
        "title": "文档A",
        "url": "https://a.com",
        "publish_time": "2025-05",
        "original_content": "原文A",
        "content_ref": {"type": "source_store", "source_id": "web_1_p123"},
        "scores": {"authority": 8},
        "key_passages": ["关键段落"],
    }
    expected_prompt_doc = {
        "doc_time": "2025-05",
        "source_authority": "",
        "task_relevance": "",
        "original_content": "原文A",
        "url": "https://a.com",
        "information_richness": "",
        "data_density": "",
        "title": "文档A",
        "query": "",
    }

    invoke_patch = patch.object(
        processor,
        "_invoke_prompt",
        new_callable=AsyncMock,
        return_value="## 第一章\n新章节内容\n",
    )
    with invoke_patch as mock_invoke:
        result = await processor.rewrite_section_with_assets(
            target=target,
            feedback={"user_instruction": "补充行业背景"},
            doc_infos=[source_doc],
            language="zh-CN",
        )

    assert result == "## 第一章\n新章节内容"
    invoke_args = mock_invoke.await_args.args
    prompt_vars = invoke_args[1]
    assert prompt_vars["doc_infos"] == [expected_prompt_doc]
    assert invoke_args[2] == AgentLlmName.USER_FEEDBACK_PROCESSOR_NEW_TASK_REWRITE_SECTION.value


def test_validate_rewritten_section_rejects_preamble_before_heading():
    """Verify that rewriting an existing section rejects explanatory text placed before the target heading."""
    section_text = "## 第一章\n旧章节内容"
    target = NewTaskTargetSection(
        section_title="第一章",
        section_text=section_text,
        clean_section_text=section_text,
        clean_selected_text="旧章节内容",
        section_start_offset=0,
        section_end_offset=11,
    )
    # The heading is present, but a line of commentary precedes it.
    rewritten_with_preamble = "说明如下\n## 第一章\n新章节内容"

    with pytest.raises(CustomValueException) as exc_info:
        NewTaskProcessor._validate_rewritten_section(
            target=target,
            rewritten_section=rewritten_with_preamble,
        )

    raised = exc_info.value
    assert raised.error_code == StatusCode.USER_FEEDBACK_PROCESSOR_REWRITE_ERROR.code
    assert "must start with original section title" in raised.message


@pytest.mark.asyncio
@pytest.mark.parametrize(
    ("llm_output", "message_fragment"),
    [
        pytest.param("", "empty"),
        pytest.param("新章节内容", "missing original section title"),
        pytest.param("# 总览\n## 第一章\n新章节内容", "unexpected higher-level headings"),
    ],
)
async def test_rewrite_section_with_assets_rejects_invalid_structure(llm_output, message_fragment):
    """Verify structurally invalid LLM output makes rewrite_section_with_assets raise a rewrite error.

    Each case pairs a mocked LLM output with a fragment expected in the
    exception message.
    """
    processor = NewTaskProcessor(llm_model_name="mock_model")
    section_text = "## 第一章\n旧章节内容"
    target = NewTaskTargetSection(
        section_title="第一章",
        section_text=section_text,
        clean_section_text=section_text,
        clean_selected_text="旧章节内容",
        section_start_offset=0,
        section_end_offset=11,
    )

    invoke_patch = patch.object(
        processor,
        "_invoke_prompt",
        new_callable=AsyncMock,
        return_value=llm_output,
    )
    with invoke_patch, pytest.raises(CustomValueException) as exc_info:
        await processor.rewrite_section_with_assets(
            target=target,
            feedback={"user_instruction": "补充行业背景"},
            doc_infos=[{"title": "文档A", "url": "https://a.com"}],
            language="zh-CN",
        )

    raised = exc_info.value
    assert raised.error_code == StatusCode.USER_FEEDBACK_PROCESSOR_REWRITE_ERROR.code
    assert message_fragment in raised.message


@pytest.mark.asyncio
async def test_rewrite_section_with_assets_rejects_heading_level_drift():
    """Verify a whole-section rewrite cannot change the original heading level."""
    processor = NewTaskProcessor(llm_model_name="mock_model")
    section_text = "## 第一章\n旧章节内容"
    target = NewTaskTargetSection(
        section_title="第一章",
        section_text=section_text,
        clean_section_text=section_text,
        clean_selected_text="旧章节内容",
        section_start_offset=0,
        section_end_offset=11,
    )

    # The mocked output keeps the title but uses "###" where the original used "##".
    invoke_patch = patch.object(
        processor,
        "_invoke_prompt",
        new_callable=AsyncMock,
        return_value="### 第一章\n新章节内容",
    )
    with invoke_patch, pytest.raises(CustomValueException) as exc_info:
        await processor.rewrite_section_with_assets(
            target=target,
            feedback={"user_instruction": "补充行业背景"},
            doc_infos=[{"title": "文档A", "url": "https://a.com"}],
            language="zh-CN",
        )

    raised = exc_info.value
    assert raised.error_code == StatusCode.USER_FEEDBACK_PROCESSOR_REWRITE_ERROR.code
    assert "heading level" in raised.message