from openjiuwen_deepsearch.algorithm.research_collector.collector_evidence import (
CollectorSourceStore,
build_content_dedup_hash,
build_content_ref,
build_evaluation_documents,
build_evidence_atom,
build_legacy_doc_info_view,
build_legacy_doc_infos_view,
build_summary_evidence_pack,
build_supervisor_evidence_table,
extract_key_passages,
generate_doc_id,
generate_source_id,
hydrate_legacy_doc_info_fields,
MAX_PASSAGE_LENGTH,
normalize_content_for_dedup,
normalize_scores,
read_content_by_ref,
split_passages,
)
def test_generate_doc_id_is_stable_for_same_web_source():
    """Web doc ids are equal when only ``utm_source`` differs, and start with ``web_``."""
    common = {"title": "Alpha", "source_type": "web"}
    id_with_x = generate_doc_id(url="https://example.com/a?utm_source=x", **common)
    id_with_y = generate_doc_id(url="https://example.com/a?utm_source=y", **common)

    assert id_with_x == id_with_y
    assert id_with_x.startswith("web_")
def test_generate_doc_id_sorts_remaining_query_parameters():
    """Query parameter order in the URL must not change the generated doc id."""
    common = {"title": "Alpha", "source_type": "web"}
    id_b_then_a = generate_doc_id(url="https://example.com/a?b=2&a=1", **common)
    id_a_then_b = generate_doc_id(url="https://example.com/a?a=1&b=2", **common)

    assert id_b_then_a == id_a_then_b
def test_generate_doc_id_ignores_tracking_query_case_insensitively():
    """An upper-case ``UTM_SOURCE`` parameter does not affect the generated doc id."""
    common = {"title": "Alpha", "source_type": "web"}
    id_with_tracking = generate_doc_id(url="https://example.com/a?UTM_SOURCE=x&id=1", **common)
    id_without_tracking = generate_doc_id(url="https://example.com/a?id=1", **common)

    assert id_with_tracking == id_without_tracking
def test_generate_doc_id_normalizes_root_url_path():
    """A bare host and the same host with a trailing slash produce the same doc id."""
    common = {"title": "Alpha", "source_type": "web"}
    id_bare_host = generate_doc_id(url="https://example.com", **common)
    id_trailing_slash = generate_doc_id(url="https://example.com/", **common)

    assert id_bare_host == id_trailing_slash
def test_generate_doc_id_uses_local_file_identity():
    """A local dataset URL yields a ``local_`` id that does not contain the raw kb/file ids."""
    local_id = generate_doc_id(
        source_type="local",
        title="Ignored Title",
        url="localdataset://result//kb-1//file-9",
    )

    assert local_id.startswith("local_")
    # Neither raw identifier from the URL may appear verbatim in the id.
    for raw_fragment in ("kb-1", "file-9"):
        assert raw_fragment not in local_id
def test_source_id_defaults_to_doc_id_in_phase_one():
    """Without content, ``generate_source_id`` returns the doc id unchanged."""
    base_id = "web_123"
    derived_id = generate_source_id(doc_id=base_id)
    assert derived_id == base_id
def test_source_id_distinguishes_evidence_content_under_same_doc_id():
    """Different evidence content under the same source document should yield different source ids."""
    doc_id = "web_123"
    expected_prefix = f"{doc_id}_p"
    id_for_first, id_for_second = (
        generate_source_id(doc_id=doc_id, content=text)
        for text in ("第一段证据", "第二段证据")
    )

    assert id_for_first != id_for_second
    for source_id in (id_for_first, id_for_second):
        assert source_id.startswith(expected_prefix)
def test_source_id_uses_normalized_content_for_hashing():
    """Content differing only by CRLF versus a space normalizes, hashes and ids identically."""
    base_id = "web_123"
    crlf_text = "A B\r\nC"
    flat_text = "A B C"

    id_from_crlf = generate_source_id(doc_id=base_id, content=crlf_text)
    id_from_flat = generate_source_id(doc_id=base_id, content=flat_text)

    assert normalize_content_for_dedup(crlf_text) == flat_text
    assert build_content_dedup_hash(crlf_text) == build_content_dedup_hash(flat_text)
    assert id_from_crlf == id_from_flat
def test_source_store_round_trip_with_content_ref():
    """Content written to the store is read back through a stored content ref, not the fallback."""
    source_store = CollectorSourceStore()
    stored_id = "web_123"
    source_store.write(stored_id, "完整正文")

    ref = build_content_ref(doc_id=stored_id, stored=True)
    loaded = read_content_by_ref(ref, source_store, legacy_content="fallback")

    assert loaded == "完整正文"
def test_source_store_preserves_first_content_on_duplicate_source_id(caplog):
    """A second write to the same id returns True, keeps the first content and logs a conflict."""
    source_store = CollectorSourceStore()

    first_result = source_store.write("web_123", "第一版正文")
    assert first_result is True

    second_result = source_store.write("web_123", "第二版正文")
    assert second_result is True

    # The first version wins; the conflict is only reported through the log.
    assert source_store.read("web_123") == "第一版正文"
    assert "source_store source_id conflict" in caplog.text
def test_source_store_from_dict_handles_none_and_invalid_input():
    """Restoring source_store from an invalid session value should return an empty store."""
    for invalid_value in (None, ["invalid"]):
        restored = CollectorSourceStore.from_dict(invalid_value)
        assert restored.to_dict() == {}
def test_content_ref_falls_back_to_legacy_doc_infos_when_store_missing(caplog):
    """A stored ref that is absent from the store resolves to legacy content and logs the miss."""
    empty_store = CollectorSourceStore()
    ref = build_content_ref(doc_id="web_missing", stored=True)

    resolved = read_content_by_ref(ref, empty_store, legacy_content="兼容正文")

    assert resolved == "兼容正文"
    assert "content_ref missing in source_store" in caplog.text
def test_content_ref_legacy_doc_infos_returns_legacy_content_directly(caplog):
    """A legacy_doc_infos ref should return the legacy content directly without reading source_store."""
    empty_store = CollectorSourceStore()
    ref = build_content_ref(doc_id="web_legacy", stored=False)

    resolved = read_content_by_ref(ref, empty_store, legacy_content="兼容正文")

    assert resolved == "兼容正文"
    # No store lookup was attempted, so no "missing" message may be logged.
    assert "content_ref missing in source_store" not in caplog.text
def test_build_legacy_doc_info_view_returns_old_doc_info_shape():
    """The legacy report-chain view should contain only the legacy fields needed by unadapted nodes."""
    full_doc_info = {
        "doc_id": "web_1",
        "source_id": "web_1_p123",
        "title": "标题",
        "url": "https://example.com",
        "source": "example.com",
        "publish_time": "2025-05",
        "doc_time": "",
        "query": "查询",
        "key_passages": ["关键段落"],
        "scores": {"authority": 8, "relevance": 9, "answerability": 7, "data_density": 6},
        "brief_reason": "相关",
        "content_ref": {"type": "source_store", "source_id": "web_1_p123"},
        "original_content": "原文",
        "source_authority": "权威性说明",
        "task_relevance": "相关性说明",
        "information_richness": "丰富度说明",
        "data_density": "数据密度说明",
    }
    # The input has an empty doc_time and a non-empty publish_time; the
    # expected view carries the publish_time value under doc_time.
    expected_view = {
        "doc_time": "2025-05",
        "source_authority": "权威性说明",
        "task_relevance": "相关性说明",
        "original_content": "原文",
        "url": "https://example.com",
        "information_richness": "丰富度说明",
        "data_density": "数据密度说明",
        "title": "标题",
        "query": "查询",
    }

    actual_view = build_legacy_doc_info_view(full_doc_info)

    assert actual_view == expected_view
def test_build_legacy_doc_infos_view_does_not_mutate_source_doc_infos():
    """Batch conversion to the legacy report-chain view must not modify the original doc_infos."""
    source_infos = [{"title": "标题", "url": "https://example.com", "doc_id": "web_1"}]
    expected_view_item = {
        "doc_time": "",
        "source_authority": "",
        "task_relevance": "",
        "original_content": "",
        "url": "https://example.com",
        "information_richness": "",
        "data_density": "",
        "title": "标题",
        "query": "",
    }

    converted = build_legacy_doc_infos_view(source_infos)

    assert converted == [expected_view_item]
    # Compare against a fresh literal so any in-place mutation is detected.
    assert source_infos == [{"title": "标题", "url": "https://example.com", "doc_id": "web_1"}]
def test_extract_key_passages_prefers_query_matches_and_data_dense_text():
    """Two passages are returned, the first one matching a query term, each at most 500 chars."""
    text = (
        "泛泛介绍新能源行业。\n"
        "宁德时代 2025 年动力电池装机量增长 18%,市场份额达到 37%。\n"
        "其他无关描述。\n"
        "宁德时代海外收入同比增长 21%,欧洲客户订单增加。"
    )

    selected = extract_key_passages(text, query="宁德时代 市场份额 海外收入", title="宁德时代经营表现")

    assert len(selected) == 2
    top_passage = selected[0]
    assert "市场份额" in top_passage or "海外收入" in top_passage
    assert not any(len(item) > 500 for item in selected)
def test_extract_key_passages_falls_back_to_front_passages_when_no_match():
    """With no query/title match, the passages come back in document order."""
    text = "第一段没有关键词。\n第二段仍然没有关键词。\n第三段也没有。"
    expected = ["第一段没有关键词。", "第二段仍然没有关键词。", "第三段也没有。"]

    selected = extract_key_passages(text, query="完全不同", title="标题")

    assert selected == expected
def test_extract_key_passages_does_not_treat_numeric_density_as_keyword_match():
    """A number-heavy passage without keyword hits keeps its original position."""
    text = (
        "第一段行业背景。\n"
        "无关公司在 2025 年收入增长 99%,利润率提升 18%。\n"
        "第三段其他背景。"
    )
    expected = ["第一段行业背景。", "无关公司在 2025 年收入增长 99%,利润率提升 18%。", "第三段其他背景。"]

    selected = extract_key_passages(text, query="完全不同", title="标题")

    assert selected == expected
def test_extract_key_passages_splits_chinese_sentences_without_spaces():
    """Both query-relevant phrases appear in the passages extracted from unspaced Chinese text."""
    text = "第一句无关。第二句包含收入增长 20%。第三句包含利润率 15%。"

    selected = extract_key_passages(text, query="收入 利润率", title="经营数据")

    for phrase in ("收入增长", "利润率"):
        assert any(phrase in item for item in selected)
def test_split_passages_keeps_decimal_and_version_dots_intact():
    """Dots inside decimals, version numbers and domains do not split a passage."""
    text = "利润率提升 1.5%,版本 3.10.2 已发布,详情见 example.com。Revenue grew. Margin improved."
    expected_pieces = [
        "利润率提升 1.5%,版本 3.10.2 已发布,详情见 example.com。",
        "Revenue grew.",
        "Margin improved.",
    ]

    pieces = split_passages(text)

    assert pieces == expected_pieces
def test_build_evidence_atom_excludes_original_content_from_atom():
    """The atom references stored content by ref; the full text is only in doc_info and the store."""
    store = CollectorSourceStore()
    record = {
        "url": "https://example.com/a",
        "title": "Alpha",
        "content": "Alpha 在 2025 年收入增长 10%。第二段。",
        "type": "page",
    }

    atom, doc_info = build_evidence_atom(record=record, query="Alpha 收入", source_store=store)

    # Atom and doc_info must agree on both identifiers.
    for id_key in ("doc_id", "source_id"):
        assert atom[id_key] == doc_info[id_key]

    ref = atom["content_ref"]
    assert ref["type"] == "source_store"
    assert ref["source_id"] == atom["source_id"]

    assert "original_content" not in atom
    full_text = record["content"]
    assert doc_info["original_content"] == full_text
    assert store.read(atom["source_id"]) == full_text
def test_build_evidence_atom_keeps_distinct_content_for_same_doc_id():
    """Different content with the same URL/title should be kept as distinct evidence under one doc_id."""

    def _page_record(body):
        # Both records share URL, title and type; only the content differs.
        return {
            "url": "https://example.com/a",
            "title": "Alpha",
            "content": body,
            "type": "page",
        }

    store = CollectorSourceStore()
    revenue_record = _page_record("Alpha 第一段收入增长 10%。")
    margin_record = _page_record("Alpha 第二段利润率提升 5%。")

    _, revenue_doc = build_evidence_atom(record=revenue_record, query="Alpha 收入", source_store=store)
    _, margin_doc = build_evidence_atom(record=margin_record, query="Alpha 利润率", source_store=store)

    assert revenue_doc["doc_id"] == margin_doc["doc_id"]
    assert revenue_doc["source_id"] != margin_doc["source_id"]

    for doc in (revenue_doc, margin_doc):
        assert doc["content_ref"]["source_id"] == doc["source_id"]

    for doc, origin in ((revenue_doc, revenue_record), (margin_doc, margin_record)):
        assert read_content_by_ref(doc["content_ref"], store) == origin["content"]
def test_build_evidence_atom_truncates_legacy_content_to_collector_limit():
    """Content one character over the collector limit is cut to the limit in doc_info and the store."""
    from openjiuwen_deepsearch.common.common_constants import MAX_COLLECTOR_DOC_CONTENT_LENGTH

    limit = MAX_COLLECTOR_DOC_CONTENT_LENGTH
    store = CollectorSourceStore()
    oversized_record = {
        "url": "https://example.com/large",
        "title": "Large",
        "content": "A" * (limit + 1),
        "type": "page",
    }

    atom, doc_info = build_evidence_atom(record=oversized_record, query="large", source_store=store)

    assert len(doc_info["original_content"]) == limit
    stored_text = store.read(atom["source_id"])
    assert len(stored_text) == limit
def test_build_evidence_atom_logs_when_source_store_write_fails(monkeypatch, caplog):
    """When the store write returns False, refs are typed legacy_doc_infos and the failure is logged."""

    def _rejecting_write(doc_id, content):
        # Stand-in for CollectorSourceStore.write that always reports failure.
        return False

    store = CollectorSourceStore()
    monkeypatch.setattr(store, "write", _rejecting_write)
    record = {"url": "https://example.com/a", "title": "Alpha", "content": "正文", "type": "page"}

    atom, doc_info = build_evidence_atom(record=record, query="Alpha", source_store=store)

    for holder in (atom, doc_info):
        assert holder["content_ref"]["type"] == "legacy_doc_infos"
    assert "failed to write source_store" in caplog.text
def test_build_prompt_views_never_include_original_content():
    """Evaluation, supervisor and summary views omit original_content, snippet and summary."""
    doc_infos = [
        {
            "doc_id": "web_1",
            "source_id": "web_1",
            "title": "Alpha",
            "url": "https://example.com/a",
            "source": "example.com",
            "publish_time": "2025 1月",
            "snippet": "不应进入 compact view 的 snippet",
            "key_passages": ["关键片段"],
            "summary": "不应进入 compact view 的 summary",
            "scores": {"relevance": 8, "answerability": 7, "authority": 6, "data_density": 5},
            "original_content": "很长的正文",
        }
    ]

    evaluation_docs = build_evaluation_documents(doc_infos)
    supervisor_table = build_supervisor_evidence_table(doc_infos)
    summary_pack = build_summary_evidence_pack(doc_infos)

    # The full text key must not appear anywhere in the rendered views.
    for view in (evaluation_docs, supervisor_table, summary_pack):
        assert "original_content" not in str(view)

    first_eval = evaluation_docs[0]
    first_row = supervisor_table[0]
    first_source = summary_pack["sources"][0]

    # The first entry of each view drops the verbose fields.
    for entry in (first_eval, first_row, first_source):
        for dropped_key in ("snippet", "summary"):
            assert dropped_key not in entry

    assert first_eval["key_passages"] == ["关键片段"]
    assert first_row["source_id"] == "web_1"
    assert first_source["source_id"] == "web_1"
def test_build_supervisor_evidence_table_sorts_all_items():
    """All 30 rows are kept and the highest-scored document (idx 29) comes first."""
    doc_infos = []
    for idx in range(30):
        # Scores grow with idx, so the last document has the highest scores.
        doc_infos.append(
            {
                "doc_id": f"web_{idx}",
                "source_id": f"web_{idx}",
                "title": f"Doc {idx}",
                "scores": {"relevance": idx, "answerability": idx, "authority": 1, "data_density": idx},
                "key_passages": ["关键片段"],
            }
        )

    rows = build_supervisor_evidence_table(doc_infos)

    assert len(rows) == 30
    assert rows[0]["source_id"] == "web_29"
def test_build_evidence_views_sort_all_items():
    """Supervisor table and summary pack both keep 30 entries with the top-scored doc (idx 0) first."""

    def _doc_info(idx):
        # Scores shrink as idx grows, so idx 0 has the highest scores.
        return {
            "doc_id": f"web_{idx}",
            "source_id": f"web_{idx}",
            "title": f"Doc {idx}",
            "scores": {"relevance": 30 - idx, "answerability": 30 - idx, "authority": 1, "data_density": 1},
            "key_passages": ["P" * 500, "Q" * 500],
        }

    doc_infos = [_doc_info(idx) for idx in range(30)]

    rows = build_supervisor_evidence_table(doc_infos)
    pack = build_summary_evidence_pack(doc_infos)

    assert len(rows) == 30
    assert len(pack["sources"]) == 30
    assert rows[0]["source_id"] == "web_0"
    assert pack["sources"][0]["source_id"] == "web_0"
def test_build_supervisor_evidence_table_truncates_single_oversized_item():
    """One oversized item is kept with title and passages truncated and a bounded rendered size."""
    oversized = {
        "doc_id": "web_big",
        "source_id": "web_big",
        "title": "T" * 1000,
        "source": "S" * 1000,
        "key_passages": ["P" * 5000, "Q" * 5000],
        "scores": {"relevance": 10, "answerability": 10, "authority": 10, "data_density": 10},
    }

    rows = build_supervisor_evidence_table([oversized])

    assert len(rows) == 1
    only_row = rows[0]
    assert len(only_row["title"]) == 120
    assert "summary" not in only_row
    assert all(len(passage) == MAX_PASSAGE_LENGTH for passage in only_row["key_passages"])
    assert len(str(rows)) < 2500
def test_normalize_scores_accepts_missing_and_numeric_values():
    """Numeric strings and ints become floats; None and missing keys become None."""
    raw_scores = {"authority": "8.5", "relevance": 9, "answerability": None}
    expected = {
        "authority": 8.5,
        "relevance": 9.0,
        "answerability": None,
        "data_density": None,
    }

    normalized = normalize_scores(raw_scores)

    assert normalized == expected
def test_hydrate_legacy_doc_info_fields_keeps_only_downstream_fields():
    """Scores and publish_time are rendered into the legacy text fields, with no internal marker key."""
    scored_doc = {
        "scores": {"authority": 8.0, "relevance": 9.0, "answerability": 7.5, "data_density": 6.0},
        "publish_time": "2025 5月",
    }
    expected_fields = {
        "source_authority": "该篇文章的信息来源权威性和可信度得分:8.0",
        "task_relevance": "该篇文章的内容与当前任务的相关性得分:9.0",
        "information_richness": "该篇文章的信息丰富程度与可答性得分:7.5",
        "data_density": "该篇文章的数据丰富和密集程度得分:6.0",
        "doc_time": "2025 5月",
    }

    hydrated = hydrate_legacy_doc_info_fields(scored_doc)

    for field_name, expected_text in expected_fields.items():
        assert hydrated[field_name] == expected_text
    assert "_legacy_compatibility_fields" not in hydrated