from unittest.mock import patch, AsyncMock, MagicMock
import pytest
from openjiuwen_deepsearch.algorithm.report.report import (
Reporter,
VisualizationInsertRenderContext,
_deduplicate_and_renumber_ref,
_replace_citations_and_classified_index,
_get_classified_infos,
)
from openjiuwen_deepsearch.common.common_constants import CHINESE, ENGLISH
@pytest.mark.parametrize(
    ("input_str", "expected"),
    [
        ("第一章 Python入门", "Python入门"),
        ("第十2章 高级用法", "高级用法"),
        ("二、异常处理", "异常处理"),
        ("3.4 数据结构", "数据结构"),
        ("12-5 算法分析", "算法分析"),
        # A title made up only of the chapter marker is expected to become empty.
        ("第九章", ""),
        # A title without a numeric prefix is expected back unchanged.
        ("Chapter Intro", "Chapter Intro"),
    ],
)
def test_strip_leading_number(input_str, expected):
    """Check ``Reporter.strip_leading_number`` against each (title, expected) pair."""
    stripped = Reporter.strip_leading_number(input_str)
    assert stripped == expected
@pytest.mark.parametrize(
    ("input_md", "expected"),
    [
        # Level 1-3 headers: the header marker stays, the numbering prefix is expected to go.
        ("# 五、潜在挑战与风险管理策略建议", "# 潜在挑战与风险管理策略建议"),
        ("## (二) 方法论", "## 方法论"),
        ("### (一)研发孵化期", "### 研发孵化期"),
        ("### 1. 目标", "### 目标"),
        ("### (1) 目标", "### 目标"),
        # NOTE(review): identical to the previous case as written; possibly a
        # different parenthesis variant was intended -- confirm.
        ("### (1) 目标", "### 目标"),
        # Level 4+ headers are expected to become bold bullet items.
        ("#### 数据来源", "- **数据来源**"),
        ("##### 进一步细节", "- **进一步细节**"),
        ("#### 1.进一步细节", "- **进一步细节**"),
        ("#### 1. 进一步细节", "- **进一步细节**"),
        # Multi-line input: each line is expected to be handled on its own.
        ("### (二) 方法论\n#### 数据来源", "### 方法论\n- **数据来源**"),
        # Plain body text is expected back unchanged.
        ("这是正文", "这是正文"),
    ],
)
def test_clean_markdown(input_md, expected):
    """Check ``Reporter.clean_markdown_headers`` against each (markdown, expected) pair."""
    cleaned = Reporter.clean_markdown_headers(input_md)
    assert cleaned == expected
@pytest.mark.parametrize(
    ("text", "section_idx", "expected"),
    [
        # Outlines expected to be accepted.
        (
            "5 财务分析\n5.1 三张报表分析框架\n5.2 关键财务比率分析\n5.3 同行业对比分析方法\n5.4 财务风险识别与评估",
            5,
            True,
        ),
        ("1 主章节\n1.1 子章节一\n1.2 子章节二", 1, True),
        # Outlines expected to be rejected.
        ("1.1 子章节一\n1.2 子章节二", 1, False),
        ("1 主章节\n1 主章节重复", 1, False),
        ("1 主章节\n1.2 子章节二", 1, False),
        ("1 主章节\n1.1 子章节一\n1.1.1 第三层", 1, False),
        ("1 主章节\n123", 1, False),
        ("", 1, False),
    ],
)
def test_is_valid_chapter_format(text, section_idx, expected):
    """Check ``Reporter.is_valid_chapter_format`` for each outline and section index."""
    verdict = Reporter.is_valid_chapter_format(text, section_idx)
    assert verdict == expected
@pytest.mark.parametrize(
    ("content", "refs", "lang", "expected"),
    [
        (
            "这是正文",
            ["参考A", "参考B"],
            CHINESE,
            "这是正文\n## 参考文章\n[1] 参考A\n[2] 参考B",
        ),
        (
            "This is content",
            ["Ref A", "Ref B"],
            ENGLISH,
            "This is content\n## References\n[1] Ref A\n[2] Ref B",
        ),
        # No references: the content is expected back unchanged.
        ("正文内容", [], CHINESE, "正文内容"),
        # Empty content: an empty string is expected even though references exist.
        ("", ["Ref A"], ENGLISH, ""),
        # A language code other than the two constants is expected to get the
        # English section heading.
        ("Contenu", ["Réf A"], "fr", "Contenu\n## References\n[1] Réf A"),
    ],
)
def test_add_references(content, refs, lang, expected):
    """Check the text returned by ``Reporter.add_references`` for each input combination."""
    assert Reporter.add_references(content, refs, lang) == expected
def test_apply_visualization_insertions_escapes_image_title_html():
    """Check that an HTML-bearing image title is not emitted verbatim.

    A render context is built whose single insertion carries an image title
    containing raw ``<img ... onerror=...>`` markup. The output of
    ``Reporter._apply_visualization_insertions`` must not contain that raw
    markup, and must contain its entity-escaped form followed by the
    citation marker.
    """
    raw_title = '<img src=x onerror="alert(1)">'
    context = VisualizationInsertRenderContext(
        report_lines=["第一段\n", "第二段\n"],
        insertions=[{"after_row": 1, "index": 1}],
        mermaid_map={1: "graph TD\nA-->B"},
        title_meta_map={
            1: {
                "image_title": raw_title,
                "citation_index": 7,
            }
        },
        newline="\n",
        language=CHINESE,
    )

    result = Reporter._apply_visualization_insertions(context)

    assert raw_title not in result
    # The expected text is the entity-escaped title. Writing the raw markup
    # here would contradict the assertion above (and, with unescaped double
    # quotes inside a double-quoted literal, would not even parse).
    # NOTE(review): assumes double quotes are escaped as ``&quot;`` (the form
    # ``html.escape`` produces) -- confirm against the implementation.
    assert "&lt;img src=x onerror=&quot;alert(1)&quot;&gt;[citation:7]" in result
@pytest.mark.asyncio
@patch(
    "openjiuwen_deepsearch.algorithm.report.report.ainvoke_llm_with_stats",
    new_callable=AsyncMock,
)
@patch(
    "openjiuwen_deepsearch.algorithm.report.report.llm_context",
    new_callable=MagicMock,
)
async def test_generate_with_llm_returns_content(mock_llm_cls, mock_ainvoke_llm):
    """Check that ``_generate_with_llm`` returns the ``content`` field of the LLM reply.

    Both ``llm_context`` and ``ainvoke_llm_with_stats`` are patched in the
    report module, so no real model is invoked.
    """
    # Decorators apply bottom-up: the first argument is the ``llm_context`` mock.
    mock_llm_cls.return_value = MagicMock()
    mock_ainvoke_llm.return_value = {"content": "mocked response"}

    reporter = Reporter("basic")
    reporter.gen_report_context = {}

    result = await reporter._generate_with_llm(
        task_type="abstract",
        prompt="report_abstract_markdown",
        content="test content",
    )

    assert result == "mocked response"
    mock_ainvoke_llm.assert_awaited_once()

    # Inspect the keyword arguments of the single recorded call.
    call_kwargs = mock_ainvoke_llm.call_args.kwargs
    assert call_kwargs["agent_name"] is not None
    assert any(msg["role"] == "user" for msg in call_kwargs["messages"])
@pytest.mark.asyncio
@patch(
    "openjiuwen_deepsearch.algorithm.report.report.ainvoke_llm_with_stats",
    new_callable=AsyncMock,
)
@patch(
    "openjiuwen_deepsearch.algorithm.report.report.llm_context",
    new_callable=MagicMock,
)
async def test_generate_with_llm_rejects_unknown_task_type(mock_llm_cls, mock_ainvoke_llm):
    """Check that an unknown ``task_type`` raises ``KeyError`` without awaiting the LLM."""
    # Decorators apply bottom-up: the first argument is the ``llm_context`` mock.
    mock_llm_cls.return_value = MagicMock()

    reporter = Reporter("basic")
    reporter.gen_report_context = {}

    with pytest.raises(KeyError, match="Unsupported report task type"):
        await reporter._generate_with_llm(
            task_type="summary",
            prompt="report_abstract_markdown",
            content="test content",
        )

    # Checked after the ``with`` block so the assertion actually runs.
    mock_ainvoke_llm.assert_not_awaited()
@patch(
    "openjiuwen_deepsearch.algorithm.report.report.llm_context",
    new_callable=MagicMock,
)
def test_set_context_variables_none(mock_llm_cls):
    """Check that passing ``None`` returns ``False`` and leaves the stored context ``None``."""
    reporter = Reporter("basic")

    outcome = reporter._set_context_variables(None)

    assert outcome is False
    assert reporter.gen_report_context is None
@patch(
    "openjiuwen_deepsearch.algorithm.report.report.llm_context",
    new_callable=MagicMock,
)
def test_set_context_variables_dict(mock_llm_cls):
    """Check that passing a dict returns ``True`` and the reporter's context equals it."""
    reporter = Reporter("basic")
    ctx = {"foo": "bar"}

    outcome = reporter._set_context_variables(ctx)

    assert outcome is True
    assert reporter.gen_report_context == ctx
def test_deduplicate_and_renumber_with_ref_empty_input():
    """Check that empty input yields an empty string and an empty mapping."""
    deduped, index_map = _deduplicate_and_renumber_ref("")

    assert deduped == ""
    assert index_map == {}
def test_deduplicate_and_renumber_with_ref_single_reference():
    """Check that a single reference line is returned as-is and mapped under key ``"1-1"``."""
    raw_refs = "[1] First reference"

    deduped, index_map = _deduplicate_and_renumber_ref(raw_refs)

    assert deduped == "[1] First reference"
    assert index_map == {"1-1": 1}
def test_deduplicate_and_renumber_with_ref_duplicate_references_same_paragraph():
    """Check that two lines with identical reference text collapse to one entry.

    Both original indices are expected to map to the same new number.
    """
    raw_refs = "[1] First reference\n[2] First reference"

    deduped, index_map = _deduplicate_and_renumber_ref(raw_refs)

    assert deduped == "[1] First reference"
    assert index_map == {"1-1": 1, "1-2": 1}
def test_deduplicate_and_renumber_with_multiple_paragraphs_and_sections():
    """Check renumbering when references are split by a blank line.

    The reference text repeated after the blank line is expected to reuse
    number 1, while the new text receives number 2.
    """
    raw_refs = "[1] First reference\n\n[1] Second reference\n[2] First reference"

    deduped, index_map = _deduplicate_and_renumber_ref(raw_refs)

    for expected_line in ("[1] First reference", "[2] Second reference"):
        assert expected_line in deduped
    assert index_map["1-1"] == 1
    assert index_map["3-1"] == 2
    assert index_map["3-2"] == 1
def test_deduplicate_and_renumber_with_ignore_lines_without_reference():
    """Check that a line without a reference marker is absent from the output and mapping."""
    raw_refs = "This is not a ref\n[1] Valid reference"

    deduped, index_map = _deduplicate_and_renumber_ref(raw_refs)

    assert deduped == "[1] Valid reference"
    assert index_map == {"1-1": 1}
@pytest.mark.parametrize(
    ("paragraphs", "classified_contents", "ref_map", "expected"),
    [
        # Mapping present: citation markers and classified indices are
        # expected to be rewritten to the mapped numbers.
        (
            ["This is a paragraph [citation:1].", "Another paragraph [citation:2]."],
            [
                [{"index": 1, "content": "First citation"}],
                [{"index": 2, "content": "Second citation"}],
            ],
            {"1-1": 10, "2-2": 20},
            (
                ["This is a paragraph [citation:10].", "Another paragraph [citation:20]."],
                [
                    [{"index": 10, "content": "First citation"}],
                    [{"index": 20, "content": "Second citation"}],
                ],
            ),
        ),
        # Empty mapping: everything is expected back unchanged.
        (
            ["This is a paragraph [citation:1].", "Another paragraph [citation:2]."],
            [
                [{"index": 1, "content": "First citation"}],
                [{"index": 2, "content": "Second citation"}],
            ],
            {},
            (
                ["This is a paragraph [citation:1].", "Another paragraph [citation:2]."],
                [
                    [{"index": 1, "content": "First citation"}],
                    [{"index": 2, "content": "Second citation"}],
                ],
            ),
        ),
        # No classified contents: paragraphs are expected back unchanged
        # despite a non-empty mapping.
        (
            ["This is a paragraph [citation:1].", "Another paragraph [citation:2]."],
            [],
            {"1-1": 10, "2-2": 20},
            (
                ["This is a paragraph [citation:1].", "Another paragraph [citation:2]."],
                [],
            ),
        ),
        # Nothing to process at all.
        (
            [],
            [],
            {"1-1": 10, "2-2": 20},
            ([], []),
        ),
    ],
)
def test_replace_citations_and_classified_index(paragraphs, classified_contents, ref_map, expected):
    """Check the (paragraphs, classified contents) pair returned for each scenario."""
    outcome = _replace_citations_and_classified_index(paragraphs, classified_contents, ref_map)
    assert outcome == expected
@pytest.mark.parametrize(
    ("doc_infos", "urls", "expected_infos", "expected_docs"),
    [
        # No documents available.
        ([], ["http://a.com"], {}, []),
        # No URLs requested.
        (
            [{"url": "http://a.com", "title": "A", "original_content": "contentA"}],
            [],
            {},
            [],
        ),
        # One document matching the single requested URL.
        (
            [{"url": "http://a.com", "title": "A", "original_content": "contentA"}],
            ["http://a.com"],
            {
                "references": ["[A](http://a.com)"],
                "core_content_list": ["contentA"],
            },
            [{"url": "http://a.com", "title": "A", "original_content": "contentA"}],
        ),
        # Three documents, two requested: only the requested ones are expected.
        (
            [
                {"url": "http://a.com", "title": "A", "original_content": "contentA"},
                {"url": "http://b.com", "title": "B", "original_content": "contentB"},
                {"url": "http://c.com", "title": "C", "original_content": "contentC"},
            ],
            ["http://a.com", "http://b.com"],
            {
                "references": [
                    "[A](http://a.com)",
                    "[B](http://b.com)",
                ],
                "core_content_list": [
                    "contentA",
                    "contentB",
                ],
            },
            [
                {"url": "http://a.com", "title": "A", "original_content": "contentA"},
                {"url": "http://b.com", "title": "B", "original_content": "contentB"},
            ],
        ),
        # Title containing Markdown link syntax: brackets and parentheses in
        # the title are expected to be backslash-escaped in the reference.
        (
            [
                {
                    "url": "https://example.test/",
                    "title": "x](javascript:alert(1)) [safe",
                    "original_content": "contentA",
                }
            ],
            ["https://example.test/"],
            {
                "references": [
                    "[x\\]\\(javascript:alert\\(1\\)\\) \\[safe](https://example.test/)"
                ],
                "core_content_list": ["contentA"],
            },
            [
                {
                    "url": "https://example.test/",
                    "title": "x](javascript:alert(1)) [safe",
                    "original_content": "contentA",
                }
            ],
        ),
        # ``javascript:`` URL: the reference is expected as plain text with
        # escaped parentheses instead of a Markdown link.
        (
            [{"url": "javascript:alert(2)", "title": "benign", "original_content": "contentB"}],
            ["javascript:alert(2)"],
            {
                "references": ["benign (javascript:alert\\(2\\))"],
                "core_content_list": ["contentB"],
            },
            [{"url": "javascript:alert(2)", "title": "benign", "original_content": "contentB"}],
        ),
    ],
)
def test_get_classified_infos(doc_infos, urls, expected_infos, expected_docs):
    """Check both values returned by ``_get_classified_infos`` for each scenario."""
    actual_infos, actual_docs = _get_classified_infos(doc_infos, urls)

    assert actual_infos == expected_infos
    assert actual_docs == expected_docs