import asyncio
import html
from datetime import datetime, timezone
from copy import deepcopy
import json
import logging
import re
import uuid
from dataclasses import dataclass
from typing import Tuple, List, Dict
from urllib.parse import urlparse
from tenacity import (
RetryError,
after_log,
retry,
stop_after_attempt,
retry_if_exception_type,
)
from openjiuwen_deepsearch.algorithm.prompts.template import apply_system_prompt
from openjiuwen_deepsearch.algorithm.research_collector.collector_evidence import build_legacy_doc_infos_view
from openjiuwen_deepsearch.algorithm.report.config import ReportFormat
from openjiuwen_deepsearch.algorithm.report.doc_prefilter import (
build_balanced_doc_batches,
build_doc_variant_key,
extract_doc_score,
prefilter_doc_infos_for_classification,
)
from openjiuwen_deepsearch.algorithm.report.report_utils import (
ArticlePart,
MarkdownOutlineRenumber,
XYChartMermaidGenerator,
PieChartMermaidGenerator,
TimelineChartMermaidGenerator,
validate_visualization_extraction_schema,
validate_visualization_normalization_schema,
)
from openjiuwen_deepsearch.algorithm.report.table_caption_utils import ensure_markdown_table_captions
from openjiuwen_deepsearch.common.exception import CustomValueException
from openjiuwen_deepsearch.common.status_code import StatusCode
from openjiuwen_deepsearch.config.config import Config
from openjiuwen_deepsearch.framework.openjiuwen.agent.search_context import Outline
from openjiuwen_deepsearch.common.common_constants import CHINESE, ENGLISH
from openjiuwen_deepsearch.utils.common_utils.llm_utils import ainvoke_llm_with_stats
from openjiuwen_deepsearch.utils.common_utils.stream_utils import get_current_time, MessageType, StreamEvent
from openjiuwen_deepsearch.utils.log_utils.log_manager import LogManager
from openjiuwen_deepsearch.utils.constants_utils.node_constants import AgentLlmName, NodeId
from openjiuwen_deepsearch.utils.constants_utils.session_contextvars import llm_context, session_context
# Module-level logger used by every Reporter method.
logger: logging.Logger = logging.getLogger(__name__)
# Marker placed at the start of sub-report related log messages.
EFFECT_SUB_REPORT_TAG: str = "### sub_report_tag ###"
# Loop-round limit; not referenced in the visible part of this module.
MAX_LOOP_ROUND: int = 10
@dataclass
class VisualizationInsertPlanContext:
    """Bundle of inputs used when asking for / validating a visualization insertion plan.

    The field names match the arguments of ``Reporter._is_valid_insert_plan``
    (``report_lines``, ``invalid_rows``, ``mermaid_map``); the code that builds
    this object is outside the visible part of the file.
    """

    # Message list for the planning request (presumably LLM chat messages — TODO confirm).
    messages: list
    # Per-section inputs dict.
    current_inputs: Dict
    # Report split into lines; insertion anchors are 1-based indexes into this list.
    report_lines: list[str]
    # 1-based rows that must not be used as insertion anchors.
    invalid_rows: set[int]
    # Visualization index -> rendered Mermaid code.
    mermaid_map: dict[int, str]
    # Report text before any visualization is inserted.
    original_report: str
@dataclass
class VisualizationInsertRenderContext:
    """Bundle of inputs used when rendering accepted insertions into the report.

    The consumer of this object is outside the visible part of the file.
    """

    # Report split into lines.
    report_lines: list[str]
    # Insertion entries (presumably ``{"after_row": int, "index": int}`` as
    # checked by ``Reporter._is_valid_insert_plan`` — TODO confirm).
    insertions: list[dict]
    # Visualization index -> rendered Mermaid code.
    mermaid_map: dict[int, str]
    # Visualization index -> title metadata.
    title_meta_map: dict[int, dict]
    # Line separator to use when re-joining lines (assumption — verify at the call site).
    newline: str
    # Report language code.
    language: str
class Reporter:
def __init__(self, llm_model_name):
    """Create a reporter bound to one named LLM.

    Args:
        llm_model_name: key looked up in the mapping returned by ``llm_context.get()``.
    """
    registered_llms = llm_context.get()
    self._llm = registered_llms.get(llm_model_name)
    # Filled in by ``_set_context_variables`` when a report is generated.
    self.gen_report_context = None
@staticmethod
def strip_leading_number(s: str) -> str:
"""移除标题前导编号并返回清洗后的文本。"""
return re.sub(
r"^(?:\d+(?:[.\-\s]\d+)*|第?[一二三四五六七八九十\d]+[、章])\s*", "", s
)
@staticmethod
def _section_sort_key(section_id) -> tuple[int, int | str]:
"""Keep report sections ordered numerically when section ids are strings."""
text = str(section_id).strip()
if text.isdigit():
return 0, int(text)
return 1, text
@staticmethod
def _make_payload(message_id: str, event: str, content: str = "") -> dict:
    """Build one sub-reporter stream message.

    Args:
        message_id: id shared by all chunks of one stream.
        event: stream event name.
        content: message text (empty by default).

    Returns:
        Dict with ``message_id``, ``agent``, ``content``, ``message_type``,
        ``event`` and ``created_time`` keys.
    """
    return dict(
        message_id=message_id,
        agent=NodeId.SUB_REPORTER.value,
        content=content,
        message_type=MessageType.MESSAGE_CHUNK.value,
        event=event,
        created_time=get_current_time(),
    )
@staticmethod
def clean_markdown_headers(md_text: str) -> str:
"""
Process Markdown text:
1. Remove numbering from H1-H3 headers (e.g. "一、", "(一)", "1.", "(1)", "(1)").
2. Convert H4+ headers to unordered list items and remove numbering.
"""
def clean_header(line: str, level: int) -> str:
"""
Generic header cleanup helper.
level is the header level (number of '#').
"""
pattern = rf'^\s*{"#" * level}\s*[\(\(]?[一二三四五六七八九十0-9]+[\.、\)\)]?\s*'
return re.sub(pattern, f'{"#" * level} ', line)
lines = md_text.splitlines()
new_lines = []
for line in lines:
stripped = line.strip()
if stripped.startswith("# "):
new_lines.append(clean_header(line, 1))
elif stripped.startswith("## "):
new_lines.append(clean_header(line, 2))
elif stripped.startswith("### "):
new_lines.append(clean_header(line, 3))
elif re.match(r"^\s*#{4,}\s+", line):
content = re.sub(r"^\s*#{4,}\s+", "", line).strip()
content = re.sub(
r"^[\(\(]?[一二三四五六七八九十0-9]+[\.、\)\)]?\s*", "", content
)
transferred_header = f"- **{content}**"
new_lines.append(transferred_header)
else:
new_lines.append(line)
return "\n".join(new_lines)
@staticmethod
def _get_invalid_rows_for_insertion(report_lines: list[str]) -> set[int]:
"""
Identify rows that must NOT be used as visualization insertion anchors.
This follows `insert_visualization.md` forbidden insertion locations:
- fenced code blocks (``` or ~~~) and their inner lines
- indented code blocks (4 spaces or tab)
- list items (ordered/unordered)
- blockquotes ('>')
- markdown tables (lines starting with '|', ignoring leading whitespace)
"""
invalid_rows: set[int] = set()
in_code_block = False
for i, line in enumerate(report_lines, 1):
stripped = line.strip()
if stripped.startswith(("```", "~~~")):
invalid_rows.add(i)
in_code_block = not in_code_block
continue
if in_code_block:
invalid_rows.add(i)
continue
if line.startswith(" ") or line.startswith("\t"):
invalid_rows.add(i)
continue
if stripped.startswith(">"):
invalid_rows.add(i)
continue
if re.match(r"^(\d+[.)]\s+|[-*+]\s+)", stripped):
invalid_rows.add(i)
continue
if line.lstrip().startswith("|"):
invalid_rows.add(i)
return invalid_rows
@staticmethod
def _precheck_value_variation(
visualization_content: dict, section_idx: int
) -> bool:
try:
payload = json.loads(
visualization_content.get("sub_section_visualization_content", "")
)
chart_type = payload.get("image_type", "")
if chart_type in ("bar", "line"):
records = payload.get("records", [])
values: list[float] = []
for row in records:
if (
isinstance(row, list)
and len(row) == 2
and isinstance(row[1], (int, float))
):
values.append(float(row[1]))
if values and len(set(values)) < 3:
visualization_content["rs_success"] = False
visualization_content["error_msg"] = "insufficient_value_variation"
return False
except Exception as e:
logger.warning(
"%s [process_visualization_task] section_idx: [%s] "
"value-variation precheck failed: %s",
EFFECT_SUB_REPORT_TAG,
section_idx,
str(e),
)
return True
@staticmethod
def _generate_mermaid_code(visualization_content: dict, section_idx: int) -> dict:
    """Render the visualization JSON payload into Mermaid code, in place.

    Reads ``visualization_content["sub_section_visualization_content"]`` (a
    JSON string with ``image_type`` and ``records``), chooses the generator
    for ``image_type`` (bar/line -> XY chart, pie, timeline) and stores the
    result in ``"mermaid_content"``. Records must be a list of 3-12 items.
    On any failure ``"mermaid_content"`` stays "", ``"rs_success"`` is set to
    False and ``"error_msg"`` to "mermaid_generation_failed".

    Args:
        visualization_content: visualization result dict; updated in place.
        section_idx: section index, used only in log messages.

    Returns:
        The same ``visualization_content`` dict.
    """
    visualization_content["mermaid_content"] = ""
    generators = {
        "bar": XYChartMermaidGenerator,
        "line": XYChartMermaidGenerator,
        "pie": PieChartMermaidGenerator,
        "timeline": TimelineChartMermaidGenerator,
    }

    # Parse once. Invalid JSON, a None payload, or JSON that is not an object
    # all fall through to the "unsupported" branch instead of raising.
    try:
        payload = json.loads(
            visualization_content.get("sub_section_visualization_content", "")
        )
    except (json.JSONDecodeError, TypeError):
        payload = None
    mermaid_type = payload.get("image_type", "") if isinstance(payload, dict) else ""
    # isinstance guard: a non-string image_type (e.g. a list) is unhashable.
    generator = generators.get(mermaid_type) if isinstance(mermaid_type, str) else None

    mermaid_ok = False
    if generator is None:
        logger.warning(
            f"{EFFECT_SUB_REPORT_TAG} [process_visualization_task] section_idx: [{section_idx}], "
            f"unsupported mermaid_type: {mermaid_type}"
        )
    else:
        try:
            records = payload.get("records", [])
            if not isinstance(records, list) or not (3 <= len(records) <= 12):
                raise ValueError(f"{mermaid_type} records length out of range")
            visualization_content["mermaid_content"] = generator.generate_from_json(
                json.dumps(payload, ensure_ascii=False)
            )
            mermaid_ok = True
        except Exception as e:
            logger.warning(
                "%s [process_visualization_task] section_idx: [%s], %s mermaid generation failed: %s",
                EFFECT_SUB_REPORT_TAG,
                section_idx,
                mermaid_type,
                str(e),
            )
    if not mermaid_ok:
        visualization_content["rs_success"] = False
        visualization_content["error_msg"] = "mermaid_generation_failed"
    return visualization_content
@staticmethod
def check_chapter_format(text, section_idx) -> tuple[bool, str]:
"""Validate subsection outline plain-text numbering"""
try:
n = section_idx
lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
if not lines:
return False, "outline is empty"
sub_pat = re.compile(rf"^\s*{n}\.(\d+)\s*")
main_space_pat = re.compile(rf"^\s*{n}\s+.+")
main_dot_pat = re.compile(rf"^\s*{n}\.(?!\d)\s*.+")
third_pat = re.compile(r"\d+\.\d+\.\d+")
has_main = False
sub_numbers = []
for line_no, ln in enumerate(lines, start=1):
if ln.lstrip().startswith("#"):
preview = ln[:120] + ("..." if len(ln) > 120 else "")
return (
False,
f"line {line_no}: markdown heading not allowed "
f"(use plain '{n} title' / '{n}.1 title', not '#'): {preview!r}",
)
if third_pat.search(ln):
preview = ln[:120] + ("..." if len(ln) > 120 else "")
return (
False,
f"line {line_no}: third-level numbering not allowed (e.g. {n}.1.1): {preview!r}",
)
sub_match = sub_pat.match(ln)
if sub_match:
sub_numbers.append(int(sub_match.group(1)))
elif main_space_pat.match(ln) or main_dot_pat.match(ln):
if has_main:
preview = ln[:120] + ("..." if len(ln) > 120 else "")
return (
False,
f"line {line_no}: duplicate level-1 title for section {n}: {preview!r}",
)
has_main = True
elif re.match(r"\d+", ln):
preview = ln[:120] + ("..." if len(ln) > 120 else "")
return (
False,
f"line {line_no}: line starts with digits but is not a valid "
f"'{n} title' or '{n}.x' subsection title: {preview!r}",
)
sorted_subs = sorted(set(sub_numbers))
if not sorted_subs:
return (
False,
f"no valid subsection lines like '{n}.1 title' "
f"(found {len(lines)} non-empty line(s); level-1 present={has_main})",
)
if sorted_subs[0] != 1:
return (
False,
f"first subsection must be {n}.1, got {n}.{sorted_subs[0]} "
f"(subsection indices found: {sorted_subs})",
)
if not has_main:
return (
False,
f"missing level-1 title line like '{n} section title' "
f"(subsection indices found: {sorted_subs})",
)
return True, ""
except Exception as e:
if LogManager.is_sensitive():
return False, f"format check exception for section_idx={section_idx}"
return False, f"format check exception for section_idx={section_idx}: {e}"
@staticmethod
def is_valid_chapter_format(text, section_idx) -> bool:
    """Return whether ``text`` passes ``check_chapter_format``; log the reason when it does not."""
    valid, why = Reporter.check_chapter_format(text, section_idx)
    if valid:
        return valid
    logger.warning(
        "%s [is_valid_chapter_format] section_idx=%s invalid: %s",
        EFFECT_SUB_REPORT_TAG,
        section_idx,
        why,
    )
    return valid
@staticmethod
def add_references(sub_section_content: str, references: list, language: str):
    """Append a numbered reference list ("[1] ...") to subsection content.

    The heading is "参考文章" for Chinese and "References" otherwise. Content
    is returned unchanged when there are no references or the content is empty.
    """
    logger.info("Adding references to sub_section_content")
    if not references:
        logger.info("No references found. can not add references.")
        return sub_section_content
    if not sub_section_content:
        return sub_section_content
    heading = "\n## 参考文章\n" if language == CHINESE else "\n## References\n"
    numbered = "\n".join(
        f"[{position}] {ref}" for position, ref in enumerate(references, start=1)
    )
    return sub_section_content + heading + numbered
@staticmethod
def refresh_reference(sub_reports_content, sub_references, all_classified_contents):
    """Deduplicate and renumber references, then remap citations to match.

    Args:
        sub_reports_content: list of subsection report texts.
        sub_references: list of reference strings (may be empty).
        all_classified_contents: classified contents whose indexes are remapped.

    Returns:
        Dict with ``sub_reports_content`` (reports joined by blank lines),
        ``sub_references`` (renumbered text, "" if none) and
        ``refreshed_all_classified_contents``.
    """
    merged_refs = "\n".join(sub_references) if sub_references else ""
    renumbered_refs = ""
    if merged_refs:
        renumbered_refs, ref_map = _deduplicate_and_renumber_ref(merged_refs)
        if not LogManager.is_sensitive():
            logger.info("refreshed_references: [%s]", renumbered_refs)
        sub_reports_content, all_classified_contents = (
            _replace_citations_and_classified_index(
                sub_reports_content, all_classified_contents, ref_map
            )
        )
    return {
        "sub_reports_content": "\n\n".join(sub_reports_content),
        "sub_references": renumbered_refs,
        "refreshed_all_classified_contents": all_classified_contents,
    }
@staticmethod
def _is_valid_insert_plan(
plan_obj: object,
report_lines: list[str],
invalid_rows: set[int],
mermaid_map: dict[int, str],
) -> tuple[bool, str]:
if not isinstance(plan_obj, dict):
return (
False,
"Plan must be a JSON object with an 'insertions' array.",
)
insertions = plan_obj.get("insertions")
if not isinstance(insertions, list):
return (
False,
"Invalid 'insertions': expected an array of {after_row, index} objects.",
)
used_indices: set[int] = set()
for item in insertions:
if not isinstance(item, dict):
return (
False,
"Each insertion must be an object with 'after_row' and 'index' integers.",
)
after_row = item.get("after_row")
index = item.get("index")
if not isinstance(after_row, int) or not isinstance(index, int):
return (
False,
"Fields 'after_row' and 'index' must both be integers.",
)
if after_row < 1 or after_row > len(report_lines):
return (
False,
"after_row is out of range for the current report lines.",
)
if after_row in invalid_rows:
return (
False,
"after_row points into a forbidden line (code block/list/table).",
)
if index not in mermaid_map:
return (
False,
"index does not exist in the provided visualization data.",
)
if index in used_indices:
return (
False,
"Duplicate index detected; each index can appear only once.",
)
used_indices.add(index)
return True, ""
@staticmethod
def get_section_title_by_id(index, current_outline):
    """Return the title of the section at position ``index`` of the outline.

    Returns "" (and logs a warning) when the outline is empty or not an
    ``Outline``, or when ``index`` is outside ``current_outline.sections``.
    """
    if not (current_outline and isinstance(current_outline, Outline)):
        logger.warning("can not get section title for current outline is invalid.")
        return ""
    sections = current_outline.sections
    if not 0 <= index < len(sections):
        logger.warning("can not get section title for index is out of range.")
        return ""
    return sections[index].title
@staticmethod
def export_outline_without_plans(outline: Outline | dict):
    """Return a copy of the outline with every section's ``plans`` excluded.

    A dict input gives a dict result; an ``Outline`` input gives a new
    ``Outline``. Empty or unsupported input is logged and returned unchanged.
    """
    if not outline or not isinstance(outline, (Outline, dict)):
        logger.warning(
            "export_outline_without_plans: unsupported outline type or empty outline."
        )
        return outline
    drop_plans = {"sections": {"__all__": {"plans"}}}
    if isinstance(outline, dict):
        return Outline.model_validate(outline).model_dump(exclude=drop_plans)
    return Outline.model_validate(outline.model_dump(exclude=drop_plans))
@staticmethod
def _get_background_knowledge_contents(background_knowledge: list) -> list[str]:
"""Extract usable text snippets from dependency-writing background knowledge."""
if not isinstance(background_knowledge, list):
return []
contents = []
for item in background_knowledge:
if not isinstance(item, dict):
continue
content = str(item.get("content_summary", "") or "").strip()
if not content:
continue
section_id = str(item.get("section_id", "") or "").strip()
if section_id:
content = f"[Parent Section {section_id}] {content}"
contents.append(content)
return contents
@staticmethod
def _is_missing_subsection_report_context(
section_task: str,
sub_section_outline: str,
has_collected_infos: bool,
has_background_knowledge: bool,
) -> bool:
"""Check whether subsection report generation lacks required context."""
if not section_task:
return True
if not sub_section_outline:
return True
return not (has_collected_infos or has_background_knowledge)
async def generate_report(self, gen_report_context: dict) -> Tuple[bool, str]:
    """
    Generate the general report according to report_style/report_format/report_lang.

    Combines title, abstract, merged subsection reports, conclusion and
    references, and stores the result in ``gen_report_context["report"]``.
    Abstract and conclusion are generated concurrently. If either fails, the
    other task is cancelled so it does not keep calling the LLM unobserved.

    Args:
        gen_report_context: the context needed to generate the report; stored
            on ``self`` and updated in place.

    Returns:
        tuple[bool, str]: ``(True, "success")`` on success, otherwise
        ``(False, error_message)``.
    """
    if LogManager.is_sensitive():
        logger.debug("[generate_report] generate start")
    else:
        logger.debug(
            "[generate_report] generate start, gen_report_context: %s",
            gen_report_context,
        )
    if not self._set_context_variables(gen_report_context):
        logger.error("[generate_report] Error: Set context variables failed")
        return False, "Error: Set context variables failed"
    self.gen_report_context["current_outline"] = self.export_outline_without_plans(
        self.gen_report_context.get("current_outline", {})
    )
    sub_report_res = await self._process_sub_report()
    sub_reports_content = sub_report_res.get("sub_reports_content")
    if not sub_reports_content:
        logger.error("[generate_report] Error: No sub-reports content found")
        return False, "Error: No sub-reports content found"
    gen_report_context["all_classified_contents"] = sub_report_res.get(
        "refreshed_all_classified_contents"
    )

    # Validate the outline before spending LLM calls on abstract/conclusion.
    current_outline = self.gen_report_context.get("current_outline", "")
    if not current_outline:
        error_message = "has no current outline"
        logger.error(f"[generate_report] Generate report error: {error_message}")
        return False, error_message
    # export_outline_without_plans returns a dict when given a dict.
    if isinstance(current_outline, dict):
        outline_title = current_outline.get("title") or ""
    else:
        outline_title = current_outline.title

    abstract_task = asyncio.create_task(self.generate_abstract(sub_reports_content))
    conclusion_task = asyncio.create_task(self.generate_conclusion(sub_reports_content))
    try:
        abstract = await abstract_task
        conclusion = await conclusion_task
    except RetryError as retry_err:
        logger.error(
            f"[generate_report] Report generation failed after retries: {retry_err}"
        )
        return False, f"Report generation failed after retries: {retry_err}"
    except Exception as e:
        if LogManager.is_sensitive():
            logger.error("[generate_report] Unexpected error during report generation")
            return False, "Unexpected error during report generation"
        logger.error(
            f"[generate_report] Unexpected error during report generation: {e}"
        )
        return False, f"Unexpected error during report generation: {e}"
    finally:
        # On failure, do not leave the sibling task running in the background.
        # Also mark finished tasks' exceptions as retrieved, so asyncio does
        # not warn "Task exception was never retrieved".
        for task in (abstract_task, conclusion_task):
            if not task.done():
                task.cancel()
            elif not task.cancelled():
                task.exception()

    report_content = (
        f"# {outline_title}\n\n"
        f"{self._post_process_abstract(abstract)}\n\n"
        f"{sub_reports_content}\n\n"
        f"{self._post_process_conclusion(conclusion)}\n\n"
        f"{ArticlePart.get_title('reference', gen_report_context['language'])}"
        f"{sub_report_res.get('sub_references')}\n\n"
    )
    self.gen_report_context["report"] = report_content
    if LogManager.is_sensitive():
        logger.debug("[generate_report] generate success")
    else:
        logger.debug(
            "[generate_report] generate success, general report content:\n[%s]",
            report_content,
        )
    if not report_content.strip():
        logger.error("[generate_report] md report content is empty.")
        return False, "md report content empty."
    return True, "success"
@retry(
    stop=stop_after_attempt(Config().service_config.report_max_generate_retry_num),
    retry=retry_if_exception_type(Exception),
    after=after_log(logger, logging.WARNING),
)
async def generate_abstract(self, sub_reports_content: str) -> str:
    """Generate the report abstract from the merged subsection reports.

    Any exception triggers a tenacity retry, up to
    ``report_max_generate_retry_num`` attempts. That limit is read from
    ``Config()`` once, when the class body is executed. Once attempts are
    exhausted, tenacity raises ``RetryError`` because ``reraise`` is not set.
    """
    logger.info("Start to generate abstract with llm...")
    prompt_name = f"report_abstract_{ReportFormat.MARKDOWN.get_name()}"
    abstract_text = await self._generate_with_llm(
        "abstract", prompt_name, sub_reports_content
    )
    logger.info("Generating report abstract Done.")
    return abstract_text
@retry(
    stop=stop_after_attempt(Config().service_config.report_max_generate_retry_num),
    retry=retry_if_exception_type(Exception),
    after=after_log(logger, logging.WARNING),
)
async def generate_conclusion(self, sub_reports_content: str) -> str:
    """Generate the report's closing section from the merged subsection reports.

    "brief" reports use the ``report_conclusion_markdown`` prompt; all other
    report types use the implications-and-recommendations prompt. Retries
    work as for ``generate_abstract`` (tenacity raises ``RetryError`` when
    attempts are exhausted).
    """
    logger.info("Start to generate conclusion with llm...")
    context = self.gen_report_context
    report_type = (
        context.get("report_type", "professional")
        if isinstance(context, dict)
        else "professional"
    )
    if report_type == "brief":
        prompt_name = "report_conclusion_markdown"
    else:
        prompt_name = (
            f"report_implications_and_recommendations_{ReportFormat.MARKDOWN.get_name()}"
        )
    conclusion_text = await self._generate_with_llm(
        "conclusion", prompt_name, sub_reports_content
    )
    logger.info("Generating report conclusion Done.")
    return conclusion_text
async def generate_sub_report(
    self, current_inputs: dict
) -> tuple[bool, str, str, list]:
    """Generate the report for one subsection.

    Steps:
    1. Pick source material: classified ``doc_infos``, or dependency
       background knowledge when there are no docs.
    2. Generate the subsection outline and validate it with
       ``check_chapter_format``.
    3. Optionally generate visualizations.
    4. Write the subsection report.

    Outline generation and report writing are each tried
    ``max_generate_retry_num`` times, and always at least once.

    Args:
        current_inputs: per-subsection context, mutated in place. This
            method writes report-type defaults, ``sub_section_core_content``,
            ``sub_section_references``, ``classified_content``,
            ``sub_section_outline`` and ``visualization_result``.

    Returns:
        ``(success, result_or_error, sub_report_content, classified_content)``.
        On success the second item is the writer's result and the third is
        ``current_inputs["sub_report_content"]``.
    """
    section_idx = current_inputs.get("section_idx", 1)
    logger.info(
        f"{EFFECT_SUB_REPORT_TAG} [generate_sub_report] start to generate subsection report, "
        f"section_idx: [{section_idx}]"
    )
    if LogManager.is_sensitive():
        logger.info(
            f"{EFFECT_SUB_REPORT_TAG} section_idx: [{section_idx}], "
            f"doc infos len: {len(current_inputs.get('doc_infos', []))}"
        )
    else:
        logger.debug(
            "%s [generate_sub_report] section_idx: [%s], doc infos is %s",
            EFFECT_SUB_REPORT_TAG,
            section_idx,
            current_inputs.get("doc_infos", []),
        )
    # Report-type policy values are defaults only; explicit inputs win.
    rtp = current_inputs.get("report_type_policy") or {}
    if isinstance(rtp, dict):
        current_inputs.setdefault("report_type", rtp.get("report_type", "professional"))
        current_inputs.setdefault("paragraph_style", rtp.get("paragraph_style", "detailed"))
        current_inputs.setdefault("require_summary_first", rtp.get("require_summary_first", False))
        current_inputs.setdefault(
            "require_methodology_and_risk", rtp.get("require_methodology_and_risk", False)
        )

    doc_infos = current_inputs.get("doc_infos", [])
    background_contents = self._get_background_knowledge_contents(
        current_inputs.get("sub_report_background_knowledge", [])
    )
    if not doc_infos:
        if not background_contents:
            logger.error(
                f"{EFFECT_SUB_REPORT_TAG} [generate_sub_report] fail to generate subsection report, "
                f"section_idx: [{section_idx}], not found doc infos"
            )
            return False, "Not found doc infos", "", []
        logger.info(
            "%s [generate_sub_report] section_idx: [%s], no doc_infos found, "
            "use dependency background knowledge as fallback.",
            EFFECT_SUB_REPORT_TAG,
            section_idx,
        )
        current_inputs["sub_section_core_content"] = background_contents
        current_inputs["sub_section_references"] = []
        current_inputs["classified_content"] = []
    else:
        classify_success, classified_content = await self._classify_doc_infos(
            current_inputs
        )
        if LogManager.is_sensitive():
            logger.info(
                f"{EFFECT_SUB_REPORT_TAG} [generate_sub_report] section_idx: [{section_idx}], "
                f"classified_content len: {len(classified_content)}"
            )
        else:
            logger.debug(
                "%s [generate_sub_report] section_idx: [%s], classified_content is %s",
                EFFECT_SUB_REPORT_TAG,
                section_idx,
                classified_content,
            )
        if not classify_success:
            if LogManager.is_sensitive():
                # Sensitive mode: keep the classification payload out of logs.
                logger.error(
                    f"{EFFECT_SUB_REPORT_TAG} [generate_sub_report] Error: Classify doc information failed, "
                    f"section_idx: [{section_idx}]"
                )
            else:
                logger.error(
                    f"{EFFECT_SUB_REPORT_TAG} [generate_sub_report] Error: Classify doc information failed for "
                    f"[{classified_content}], section_idx: [{section_idx}]"
                )
            return False, "classify_doc_infos fail", "", []
        # Deduplicate while keeping the classifier's order.
        selected_urls = list(dict.fromkeys(classified_content.get("selected_url_list", [])))
        if not selected_urls:
            logger.error(
                f"{EFFECT_SUB_REPORT_TAG} [generate_sub_report] section_idx: [{section_idx}], "
                "no selected urls returned from classification"
            )
            return False, "no selected urls from classification", "", []
        classify_doc_infos_res_top_k_num = current_inputs.get(
            "classify_doc_infos_res_top_k_num", 10
        )
        classified_infos, classified_doc_infos = _get_classified_infos(
            doc_infos,
            selected_urls,
            max_source_id_count=classify_doc_infos_res_top_k_num,
        )
        current_inputs["sub_section_core_content"] = classified_infos.get(
            "core_content_list", []
        )
        current_inputs["sub_section_references"] = classified_infos.get(
            "references", []
        )
        for position, doc_info in enumerate(classified_doc_infos, start=1):
            doc_info.pop("query", None)
            doc_info["index"] = position
        current_inputs["classified_content"] = classified_doc_infos

    classified_content = current_inputs.get("classified_content", [])
    if not LogManager.is_sensitive():
        logger.debug(
            "%s [generate_sub_report] section_idx: [%s], sub section content is: [%s], "
            "sub section references: [%s], classified content: [%s]",
            EFFECT_SUB_REPORT_TAG,
            section_idx,
            current_inputs.get("sub_section_core_content", []),
            current_inputs.get("sub_section_references", []),
            current_inputs.get("classified_content", []),
        )

    # Always make at least one attempt. With 0/None/negative, neither loop
    # below would run and the coroutine would return None instead of a tuple.
    max_attempt_num = max(1, int(current_inputs.get("max_generate_retry_num", 3) or 1))
    for attempt_num in range(max_attempt_num):
        gen_sub_res = await self._generate_sub_section_outline(current_inputs)
        outline_text = gen_sub_res.get("sub_section_outline") or ""
        outline_generated = gen_sub_res["rs_success"]
        if outline_generated:
            ok, reason = self.check_chapter_format(outline_text, section_idx)
            if ok:
                current_inputs["sub_section_outline"] = outline_text
                break
            fail_detail = f"outline format invalid: {reason}"
        else:
            fail_detail = f"LLM outline generation failed: {outline_text[:200]}"
        if LogManager.is_sensitive():
            # Both the format-check reason and the raw LLM output quote outline
            # text, so only the failure category and the length are logged.
            fail_detail = (
                "outline format invalid" if outline_generated else "LLM outline generation failed"
            )
            outline_log = f"<{len(outline_text)} chars>"
        else:
            preview = outline_text.replace("\n", "\\n")
            outline_log = preview[:500] + ("..." if len(preview) > 500 else "")
        logger.warning(
            "%s [generate_sub_report] section_idx: [%s], "
            "section outline failed on attempt %s/%s: %s | outline=%s",
            EFFECT_SUB_REPORT_TAG,
            section_idx,
            attempt_num + 1,
            max_attempt_num,
            fail_detail,
            outline_log,
        )
        if attempt_num == max_attempt_num - 1:
            logger.error(
                f"{EFFECT_SUB_REPORT_TAG} [generate_sub_report] section_idx: [{section_idx}], "
                f"Error: Generate section outline failed, reach the max_attempt_num: {max_attempt_num}."
            )
            return False, "generate section outline fail", "", classified_content

    if current_inputs.get("visualization_enable", True):
        try:
            visualization_result = await self._generate_content_for_visualization(
                current_inputs
            )
            current_inputs["visualization_result"] = visualization_result[
                "visualization_content"
            ]
        except Exception as e:
            # Visuals are optional: fall back to a report without them.
            logger.warning(
                f"{EFFECT_SUB_REPORT_TAG} [generate_sub_report] section_idx: [{section_idx}] "
                f"visualization generation failed, skip visuals: {str(e)}"
            )
            current_inputs["visualization_result"] = []

    session = session_context.get()
    stream_id = str(uuid.uuid4())
    for attempt_num in range(max_attempt_num):
        write_res = await self._write_subsection_reports(current_inputs)
        if write_res["success"]:
            if LogManager.is_sensitive():
                logger.info(
                    f"{EFFECT_SUB_REPORT_TAG} [generate_sub_report] section_idx: [{section_idx}], "
                    f"reports generated: successfully"
                )
            else:
                logger.info(
                    f"{EFFECT_SUB_REPORT_TAG} [generate_sub_report] section_idx: [{section_idx}], "
                    f"reports generated: {write_res['result']}"
                )
            await session.write_custom_stream(
                self._make_payload(stream_id, StreamEvent.SUMMARY_RESPONSE.value, "SUCCESS"))
            return (
                True,
                write_res["result"],
                current_inputs.get("sub_report_content", ""),
                classified_content,
            )
        logger.warning(
            f"{EFFECT_SUB_REPORT_TAG} [generate_sub_report] section_idx: [{section_idx}], "
            f"Warning: Generate section report failed on attempt {attempt_num + 1}/{max_attempt_num}. retry ..."
        )
        # NOTE(review): a failure chunk is streamed on every failed attempt,
        # including ones that a later retry recovers from — confirm the
        # stream consumer expects that.
        await session.write_custom_stream(
            self._make_payload(
                stream_id,
                StreamEvent.SUMMARY_RESPONSE.value,
                "generate section report fail"
            )
        )
        if attempt_num == max_attempt_num - 1:
            logger.error(
                f"{EFFECT_SUB_REPORT_TAG} [generate_sub_report] section_idx: [{section_idx}], "
                f"Error: Generate section report failed, reach the max_attempt_num: {max_attempt_num}."
            )
            return False, "generate section report fail", "", classified_content
async def _generate_with_llm(self, task_type, prompt, content):
    """Run one abstract/conclusion LLM call and return the reply text.

    When ``self.gen_report_context`` is a dict, stores the current UTC time
    in it as ``CURRENT_TIME``. Then renders the system prompt ``prompt``
    from that context, appends ``content`` as the user message and invokes
    ``self._llm``.

    Args:
        task_type: "abstract" or "conclusion"; selects the agent name used
            for LLM statistics.
        prompt: prompt template name passed to ``apply_system_prompt``.
        content: merged subsection report text.

    Returns:
        The value under "content" in the LLM response (``None`` if absent).

    Raises:
        KeyError: if ``task_type`` is not supported.
    """
    if isinstance(self.gen_report_context, dict):
        now_utc = datetime.now(tz=timezone.utc)
        self.gen_report_context["CURRENT_TIME"] = now_utc.strftime("%a %b %d %H:%M:%S %Y %Z")
    messages = apply_system_prompt(prompt, self.gen_report_context)
    messages.append({"role": "user", "content": f"Main Content: {content}\n\n"})
    if not LogManager.is_sensitive():
        logger.debug(
            "llm input when generating %s with llm: %s", task_type, messages
        )

    agent_name = {
        "abstract": AgentLlmName.REPORTER_ABSTRACT.value,
        "conclusion": AgentLlmName.REPORTER_CONCLUSION.value,
    }.get(task_type)
    if agent_name is None:
        raise KeyError(f"Unsupported report task type: {task_type}")

    response = await ainvoke_llm_with_stats(
        llm=self._llm,
        messages=messages,
        agent_name=agent_name,
    )
    if not LogManager.is_sensitive():
        logger.debug(
            "llm output when generating %s with llm: %s", task_type, response
        )
    return response.get("content")
def _post_process_abstract(self, content: str) -> str:
    """Clean the LLM abstract and make sure it starts with the abstract header.

    The steps are:
    1. Remove citation markers such as "[citation:3]".
    2. Strip a leading "摘要" (Chinese) or "abstract" (English, any case)
       label and one separator character after it.
    3. Prepend the localized header unless it is already present.

    Returns the "not found" prompt for None or "".
    """
    language = self.gen_report_context["language"]
    if content is None or content == "":
        return ArticlePart.get_not_found_prompt("abstract", language)
    header = ArticlePart.get_title("abstract", language)
    body = re.sub(r"\[?citation:\d+\]?", "", content)
    if language == CHINESE:
        body = body.removeprefix("摘要")
        if body[:1] in (":", ":", "—", "–", " ", " "):
            body = body[1:]
        body = body.lstrip()
    elif language == ENGLISH:
        if body.lower().startswith("abstract") and len(body) >= 8:
            body = body[8:]
        if body[:1] in (":", " ", "-"):
            body = body[1:]
        body = body.lstrip()
    return body if body.startswith(header) else header + body
def _post_process_conclusion(self, content: str) -> str:
    """Remove citation markers from the conclusion and ensure it starts with its header.

    Returns the "not found" prompt for None or "".
    """
    language = self.gen_report_context["language"]
    if content is None or content == "":
        return ArticlePart.get_not_found_prompt("conclusion", language)
    header = ArticlePart.get_title("conclusion", language)
    cleaned = re.sub(r"\[?citation:\d+\]?", "", content)
    return cleaned if cleaned.startswith(header) else header + cleaned
def _set_context_variables(self, gen_report_context: dict) -> bool:
    """Store the report context on the instance and fill report-type defaults.

    Keys already present in the context win. Missing keys are taken from
    ``report_type_policy`` (when it is a dict), falling back to built-in
    defaults.

    Returns:
        False if ``gen_report_context`` is None, otherwise True.
    """
    if gen_report_context is None:
        return False
    self.gen_report_context = gen_report_context
    policy = gen_report_context.get("report_type_policy")
    if not isinstance(policy, dict):
        return True
    for key, fallback in (
        ("report_type", "professional"),
        ("paragraph_style", "detailed"),
        ("require_summary_first", False),
        ("require_methodology_and_risk", False),
    ):
        gen_report_context.setdefault(key, policy.get(key, fallback))
    return True
async def _add_sub_report_transaction(self, current_inputs: dict):
    """Insert an LLM-generated transition sentence after the next section's title.

    The ``generate_transition_sentence`` prompt is filled from
    ``current_inputs``: previous/next titles and summaries, language and
    user query. The first non-empty reply is inserted on a new line right
    after the first occurrence of ``title_next`` in ``current_inputs["content"]``.

    Args:
        current_inputs: per-section inputs; only read.

    Returns:
        The content with the transition inserted. The unchanged
        ``current_inputs["content"]`` is returned when both summaries are
        empty, when every attempt returns empty output, or when an error occurs.
    """
    section_idx = current_inputs.get("section_idx", 1)
    original_content = current_inputs.get("content", "")
    if not LogManager.is_sensitive():
        logger.debug(
            "%s [_generate_sub_report_transaction] Starting section_idx: %s, current_inputs: %s",
            EFFECT_SUB_REPORT_TAG,
            section_idx,
            current_inputs,
        )
    summary_prev = current_inputs.get("summary_prev", "")
    summary_next = current_inputs.get("summary_next", "")
    if not summary_prev and not summary_next:
        logger.warning(
            f"{EFFECT_SUB_REPORT_TAG} [_generate_sub_report_transaction] section_idx:"
            f"{section_idx}, source summary are empty."
        )
        return original_content
    try:
        llm_input = apply_system_prompt(
            "generate_transition_sentence",
            dict(
                section_id=section_idx,
                language=current_inputs.get("language", "zh-CN"),
                title_prev=current_inputs.get("title_prev", ""),
                title_next=current_inputs.get("title_next", ""),
                summary_prev=summary_prev,
                summary_next=summary_next,
                user_query=current_inputs.get("user_query", ""),
            ),
        )
        if not LogManager.is_sensitive():
            logger.debug(
                "%s [_generate_sub_report_transaction] section_idx: %s llm_input is %s",
                EFFECT_SUB_REPORT_TAG,
                section_idx,
                llm_input,
            )
        # Always make at least one attempt. With a 0/None setting the loop
        # would not run and the method would return None instead of a string.
        retry_num = max(1, Config().service_config.report_max_generate_retry_num or 1)
        for _ in range(retry_num):
            llm_output = await ainvoke_llm_with_stats(
                llm=self._llm,
                messages=llm_input,
                agent_name=AgentLlmName.REPORTER_TRANSACTION.value,
            )
            if not LogManager.is_sensitive():
                logger.debug(
                    "%s [_generate_sub_report_transaction] section_idx: %s llm_output is %s",
                    EFFECT_SUB_REPORT_TAG,
                    section_idx,
                    llm_output,
                )
            transition = llm_output.get("content") if llm_output else None
            if transition:
                title_next = current_inputs.get("title_next", "")
                # NOTE(review): when title_next is "", str.replace("", ..., 1)
                # puts the transition at the very start of the content.
                return original_content.replace(title_next, title_next + "\n" + transition, 1)
        logger.warning(
            f"{EFFECT_SUB_REPORT_TAG} [_generate_sub_report_transaction] "
            f"generate transaction reach max attempt times. "
            f"section id is {section_idx}"
        )
        return original_content
    except Exception as e:
        sensitive = LogManager.is_sensitive()
        if sensitive:
            error_msg = (
                f"Error while generating section {section_idx} "
                f"report's transaction."
            )
        else:
            error_msg = (
                f"Error generating section {section_idx} "
                f"report's transaction: {str(e)}"
            )
        logger.error(
            f"{EFFECT_SUB_REPORT_TAG} [_generate_sub_report_transaction] {error_msg}",
            # A traceback includes the exception text, which sensitive mode
            # deliberately keeps out of the message above.
            exc_info=not sensitive,
        )
        return original_content
async def _process_sub_report(self) -> dict:
    """Assemble the final sub-report body and references from ``current_report``.

    Reads ``current_report``, ``all_classified_contents``, ``current_outline``,
    ``language`` and ``report_task`` from ``self.gen_report_context``. Sub reports
    are sorted by ``Reporter._section_sort_key(section_id)``. Each non-empty one
    has its headers renumbered and is passed to ``_add_sub_report_transaction``
    (all sections run concurrently). The result is then split on its
    "参考文章"/"References" header into body text and references.

    Returns:
        The value of ``Reporter.refresh_reference(...)`` on the normal path, or a
        dict with empty ``sub_reports_content``/``sub_references`` strings and an
        empty ``refreshed_all_classified_contents`` list when there is no usable
        sub-report content.
    """
    from types import SimpleNamespace

    sub_reports_content = []
    sub_references = []
    all_classified_contents = self.gen_report_context.get(
        "all_classified_contents", []
    )
    current_report = self.gen_report_context.get("current_report")
    if not current_report or not getattr(current_report, "sub_reports", None):
        logger.error(
            "Current_report not found in context or sub_reports is empty; use empty content."
        )
        return dict(
            sub_reports_content="",
            sub_references="",
            refreshed_all_classified_contents=[],
        )
    # Lightweight mutable records: only these three fields are needed below, and
    # ``content`` is later overwritten with the transition-augmented text.
    sub_report_content_list = [
        SimpleNamespace(
            section_id=sub_report.section_id,
            content=(
                sub_report.content.sub_report_content_text
                if sub_report.content
                else ""
            ),
            content_summary=(
                sub_report.content.sub_report_content_summary
                if sub_report.content
                else ""
            ),
        )
        for sub_report in current_report.sub_reports
    ]
    # all() is True for an empty list, so this also covers "no sub reports".
    if all(not item.content for item in sub_report_content_list):
        logger.error("All content in sub_reports is empty; use empty content.")
        return dict(
            sub_reports_content="",
            sub_references="",
            refreshed_all_classified_contents=[],
        )
    outline_renum = MarkdownOutlineRenumber()
    sub_report_content_list.sort(
        key=lambda x: Reporter._section_sort_key(x.section_id)
    )
    current_outline = self.gen_report_context.get("current_outline", None)
    language = self.gen_report_context.get("language", "zh-CN")
    user_query = self.gen_report_context.get("report_task", "")

    transition_tasks = []
    transition_indices = []
    for index, item in enumerate(sub_report_content_list):
        if not item.content:
            logger.error(
                f"sub report content is empty and sub report index is {index + 1}"
            )
            continue
        section_content = outline_renum.renumber_headers(item.content)
        # The first section has no predecessor to transition from.
        if index == 0:
            title_prev, summary_prev = "", ""
        else:
            title_prev = Reporter.get_section_title_by_id(index - 1, current_outline)
            summary_prev = sub_report_content_list[index - 1].content_summary
        current_inputs = dict(
            title_prev=title_prev,
            summary_prev=summary_prev,
            title_next=Reporter.get_section_title_by_id(index, current_outline),
            summary_next=item.content_summary,
            language=language,
            user_query=user_query,
            content=section_content,
            section_idx=index + 1,
        )
        transition_tasks.append(
            asyncio.create_task(self._add_sub_report_transaction(current_inputs))
        )
        transition_indices.append(index)

    tasks_results = await asyncio.gather(*transition_tasks)
    # Splits "<body> ## References <refs>"; the capture group makes re.split
    # return [body, header_word, refs, ...].
    reference_header = re.compile(
        r"#+\s*[0-9.]*\s*(参考文章|References)\s*", flags=re.IGNORECASE
    )
    for index, section_content in zip(transition_indices, tasks_results):
        if not section_content:
            logger.error(
                f"section content is empty and sub report index is {index + 1}"
            )
            continue
        sub_report_content_list[index].content = section_content
        ref_split = reference_header.split(section_content)
        if len(ref_split) >= 3:
            content_part = ref_split[0].strip()
            references = ref_split[2].strip()
            sub_references.append(references if references else "")
            sub_reports_content.append(content_part)
        else:
            sub_references.append("")
            sub_reports_content.append(section_content.strip())
    logger.info(f"子章节标题重排记录:{outline_renum.history}")
    return Reporter.refresh_reference(
        sub_reports_content, sub_references, all_classified_contents
    )
async def _classify_with_llm(
    self, current_inputs: dict, section_task: str, doc_infos: List[Dict]
) -> Tuple[bool, str]:
    """Ask the classification LLM to choose the document URLs relevant to one section.

    Args:
        current_inputs: Sub-report generation context. Reads ``section_idx``,
            ``section_description``, ``sub_section_outline``, ``report_task``,
            ``max_generate_retry_num`` (default 3) and
            ``classify_doc_infos_res_top_k_num`` (default 10).
        section_task: Title of the current section.
        doc_infos: Candidate document records for this batch.

    Returns:
        ``(True, raw_llm_content)`` from the first attempt whose LLM output has
        non-empty ``content``; otherwise ``(False, reason)`` once
        ``max_generate_retry_num`` attempts have been used.
    """
    section_idx = current_inputs.get("section_idx", 1)
    section_description = current_inputs.get("section_description", "")
    subsection_outline = current_inputs.get("sub_section_outline", "")
    max_attempt_num = current_inputs.get("max_generate_retry_num", 3)
    # The prompt depends only on the inputs, so it is built once and reused for
    # every retry (as the other retry loops in this file do). It is built inside
    # the try so that a build failure is still handled and retried like an LLM
    # failure.
    llm_input = None
    for attempt in range(1, max_attempt_num + 1):
        try:
            if llm_input is None:
                legacy_doc_infos = build_legacy_doc_infos_view(doc_infos)
                infos_for_llm = (
                    f"Section title is {section_task},"
                    f"User query is {current_inputs.get('report_task', '')},"
                    f"Document infos is {legacy_doc_infos},"
                    f"Section description is {section_description},"
                    f"Subsection outline is {subsection_outline}"
                )
                tmp_context = {
                    "messages": [dict(role="user", content=infos_for_llm)],
                    "top_k": current_inputs.get("classify_doc_infos_res_top_k_num", 10),
                }
                llm_input = apply_system_prompt("classify_doc_infos", tmp_context)
            if not LogManager.is_sensitive():
                logger.debug(
                    "%s [classify_with_llm] section_idx: [%s], llm_input is %s",
                    EFFECT_SUB_REPORT_TAG,
                    section_idx,
                    llm_input,
                )
            llm_output = await ainvoke_llm_with_stats(
                llm=self._llm,
                messages=llm_input,
                agent_name=AgentLlmName.SUB_REPORTER_CLASSIFY_DOC_INFOS.value,
            )
            if not LogManager.is_sensitive():
                logger.debug(
                    "%s [classify_with_llm] section_idx: [%s], llm_output is %s",
                    EFFECT_SUB_REPORT_TAG,
                    section_idx,
                    llm_output,
                )
            if not llm_output or not llm_output.get("content"):
                error_msg = "LLM returned empty content for the section"
                logger.error(
                    f"{EFFECT_SUB_REPORT_TAG} [classify_with_llm] section_idx: [{section_idx}] try the {attempt} "
                    f"times, error: {error_msg}"
                )
                # Raised so the shared except-branch below handles retry bookkeeping.
                raise CustomValueException(
                    error_code=StatusCode.LLM_RESPONSE_ERROR.code, message=error_msg
                )
            return True, llm_output.get("content")
        except Exception as e:
            if LogManager.is_sensitive():
                error_msg = "Error classify doc infos"
            else:
                error_msg = f"Error classify doc infos: {str(e)}"
            logger.warning(
                f"{EFFECT_SUB_REPORT_TAG} [classify_with_llm] section_idx: [{section_idx}] "
                f"retry the {attempt}/{max_attempt_num} times, {error_msg}",
                exc_info=True,
            )
            if attempt >= max_attempt_num:
                logger.error(
                    f"{EFFECT_SUB_REPORT_TAG} [classify_with_llm] section_idx: [{section_idx}] "
                    f"retry reach the max_attempt_num: {max_attempt_num}"
                )
                return False, error_msg
    # Only reached when max_attempt_num < 1 (the loop body never runs).
    return (
        False,
        f"classify doc_infos failed after retry max_attempt_num: {max_attempt_num}",
    )
async def _classify_doc_infos(self, current_inputs: dict):
    """Select the candidate documents that are relevant to the current section.

    Processing steps:

    1. Prefilter the candidates in ``current_inputs["doc_infos"]`` with
       ``prefilter_doc_infos_for_classification``.
    2. Split the survivors into balanced batches and classify each batch
       concurrently with ``_classify_with_llm``.
    3. Merge the selected URLs, deduplicated and in first-seen order.
    4. While more than ``classify_doc_infos_res_top_k_num`` URLs are selected,
       run another round on only the selected documents, for at most
       ``MAX_LOOP_ROUND`` rounds.
    5. If the prefiltered run selects no URLs at all, retry the whole process
       once on the deduplicated full candidate list.

    Args:
        current_inputs: Context holding ``section_idx``, ``section_task``,
            ``doc_infos`` and the classification settings
            ``classify_doc_infos_single_time_num`` (default 60),
            ``classify_doc_infos_res_top_k_num`` (default 10) and
            ``classify_doc_infos_prefilter_multiplier`` (default 5).

    Returns:
        ``(True, {"selected_url_list": [...]})`` on success, otherwise
        ``(False, reason)``.
    """
    section_idx = current_inputs.get("section_idx", 1)
    logger.info(
        f"{EFFECT_SUB_REPORT_TAG} [classify_doc_infos] Starting to classify doc infos, section_idx: "
        f"[{section_idx}]"
    )
    section_task = self.strip_leading_number(
        current_inputs.get("section_task", "")
    )
    raw_doc_infos = current_inputs.get("doc_infos", [])
    classify_doc_infos_single_time_num = current_inputs.get(
        "classify_doc_infos_single_time_num", 60
    )
    classify_doc_infos_res_top_k_num = current_inputs.get(
        "classify_doc_infos_res_top_k_num", 10
    )
    classify_doc_infos_prefilter_multiplier = current_inputs.get(
        "classify_doc_infos_prefilter_multiplier", 5
    )
    prefilter_result = prefilter_doc_infos_for_classification(
        raw_doc_infos,
        result_top_k=classify_doc_infos_res_top_k_num,
        prefilter_multiplier=classify_doc_infos_prefilter_multiplier,
    )
    doc_infos = prefilter_result.doc_infos
    logger.info(
        "%s [classify_doc_infos] section_idx: [%s] prefilter stats: "
        "original_count=%s, deduped_count=%s, filtered_count=%s, candidate_limit=%s, "
        "url_key_count=%s, content_variant_count=%s, step_bucket_count=%s, score_stats=%s",
        EFFECT_SUB_REPORT_TAG,
        section_idx,
        prefilter_result.original_count,
        len(prefilter_result.deduped_doc_infos),
        prefilter_result.filtered_count,
        prefilter_result.candidate_limit,
        prefilter_result.url_key_count,
        prefilter_result.content_variant_count,
        prefilter_result.step_bucket_count,
        prefilter_result.score_stats,
    )
    if not LogManager.is_sensitive():
        logger.debug(
            "%s [classify_doc_infos] section_idx: [%s] prefilter step_bucket_stats=%s",
            EFFECT_SUB_REPORT_TAG,
            section_idx,
            prefilter_result.step_bucket_stats,
        )
    if not section_task or not doc_infos:
        # If the prefilter removed everything, fall back to the deduplicated list.
        if section_task and prefilter_result.deduped_doc_infos:
            doc_infos = prefilter_result.deduped_doc_infos
        else:
            error_msg = "Missing 'section_task' or 'doc_infos' in context (section title required)"
            logger.error(
                f"{EFFECT_SUB_REPORT_TAG} [classify_doc_infos] section_idx: [{section_idx}] {error_msg}"
            )
            return False, error_msg

    async def classify_until_converged(candidate_doc_infos: list[dict], *, is_fallback: bool = False):
        """Classify in rounds until at most top-k URLs remain or the round budget runs out."""
        round_count = 0
        doc_infos_for_round = candidate_doc_infos
        while round_count < MAX_LOOP_ROUND:
            round_count += 1
            logger.info(
                "%s [classify_doc_infos] section_idx: [%s] start round NO. [%s]",
                EFFECT_SUB_REPORT_TAG,
                section_idx,
                round_count,
            )
            batches = build_balanced_doc_batches(
                doc_infos_for_round,
                classify_doc_infos_single_time_num,
            )
            results = await asyncio.gather(
                *[
                    self._classify_with_llm(current_inputs, section_task, batch)
                    for batch in batches
                ],
                return_exceptions=True,
            )
            merged_urls = []
            merged_url_set = set()
            for res in results:
                # gather(return_exceptions=True) can also return BaseException
                # subclasses (e.g. asyncio.CancelledError), which must not be
                # unpacked as a (flag, payload) tuple.
                if isinstance(res, BaseException):
                    logger.warning(
                        f"{EFFECT_SUB_REPORT_TAG} [classify_doc_infos] section_idx: [{section_idx}] "
                        f"round:[{round_count}], classify task raised exception: {str(res)}",
                        # Not inside an except block, so pass the exception itself
                        # to get its traceback (exc_info=True would log "None").
                        exc_info=res,
                    )
                    continue
                res_flag, json_str = res
                if not res_flag:
                    logger.warning(
                        f"{EFFECT_SUB_REPORT_TAG} [classify_doc_infos] section_idx: [{section_idx}] "
                        f"round:[{round_count}], partly classify doc_infos with llm failed, failed reason: "
                        f"{json_str}"
                    )
                    continue
                try:
                    data = json.loads(json_str)
                except (json.JSONDecodeError, TypeError) as e:
                    if LogManager.is_sensitive():
                        logger.warning(
                            f"{EFFECT_SUB_REPORT_TAG} [classify_doc_infos] section_idx: [{section_idx}] "
                            f"round:[{round_count}], partly classify doc_infos with llm failed, "
                            f"failed reason: parse classified doc information failed"
                        )
                    else:
                        logger.warning(
                            f"{EFFECT_SUB_REPORT_TAG} [classify_doc_infos] section_idx: [{section_idx}] "
                            f"round:[{round_count}], partly classify doc_infos with llm failed, "
                            f"failed reason: parse classified doc information failed: {e}"
                        )
                    continue
                # Valid JSON of the wrong shape (array, string, null list, ...)
                # is treated as a failed batch instead of aborting every batch.
                selected_urls = (
                    data.get("selected_url_list", []) if isinstance(data, dict) else None
                )
                if not isinstance(selected_urls, list):
                    logger.warning(
                        f"{EFFECT_SUB_REPORT_TAG} [classify_doc_infos] section_idx: [{section_idx}] "
                        f"round:[{round_count}], partly classify doc_infos with llm failed, "
                        f"failed reason: classified doc information has unexpected structure"
                    )
                    continue
                for url in selected_urls:
                    # Only string URLs can match a candidate's "url"; anything
                    # else (including unhashable values) is ignored.
                    if not isinstance(url, str) or url in merged_url_set:
                        continue
                    merged_urls.append(url)
                    merged_url_set.add(url)
            if not merged_urls:
                logger.error(
                    f"{EFFECT_SUB_REPORT_TAG} [classify_doc_infos] section_idx: [{section_idx}] "
                    f"round:[{round_count}], no selected urls returned."
                )
                return False, "no selected urls from classification"
            if len(merged_urls) <= classify_doc_infos_res_top_k_num:
                logger.info(
                    f"{EFFECT_SUB_REPORT_TAG} [classify_doc_infos] section_idx: [{section_idx}] successfully "
                    f"ended on the NO.[{round_count}] round"
                )
                return True, {"selected_url_list": merged_urls}
            # Too many selections: narrow the next round to the selected docs.
            doc_infos_for_round = [
                doc for doc in doc_infos_for_round if doc.get("url") in merged_url_set
            ]
        suffix = " during fallback" if is_fallback else ""
        return False, f"Exceeded max loop round{suffix}: {MAX_LOOP_ROUND}"

    success, result = await classify_until_converged(doc_infos)
    if success:
        return success, result
    fallback_doc_infos = prefilter_result.deduped_doc_infos
    if result == "no selected urls from classification" and fallback_doc_infos:
        logger.warning(
            "%s [classify_doc_infos] section_idx: [%s] prefiltered classification returned no URLs, "
            "retry with deduped full candidates.",
            EFFECT_SUB_REPORT_TAG,
            section_idx,
        )
        return await classify_until_converged(fallback_doc_infos, is_fallback=True)
    return success, result
async def _generate_sub_section_outline(self, current_inputs: dict) -> dict:
    """Generate the subsection outline for a single report section.

    Args:
        current_inputs: Section context. Reads ``section_idx`` (default 1),
            ``report_task``, ``section_task``, ``section_description``,
            ``sub_section_core_content``, ``language``, ``has_template``,
            ``report_type`` (default "professional") and ``paragraph_style``
            (default "detailed").

    Returns:
        ``{"rs_success": True, "sub_section_outline": <LLM text>}`` on success.
        ``{"rs_success": False, "sub_section_outline": <error message>}`` when the
        section title or core content is missing, or when the LLM call raises or
        returns empty content.
    """
    section_idx = current_inputs.get("section_idx", 1)
    log_prefix = f"{EFFECT_SUB_REPORT_TAG} [generate_sub_section_outline]"
    logger.info(
        f"{log_prefix} Starting to generate sub section outline, section_idx: [{section_idx}]"
    )
    report_task = current_inputs.get("report_task", "")
    section_task = self.strip_leading_number(current_inputs.get("section_task", ""))
    section_description = current_inputs.get("section_description", "")
    if not LogManager.is_sensitive():
        logger.debug(
            "%s [generate_sub_section_outline] section_idx: [%s], section description: [%s]",
            EFFECT_SUB_REPORT_TAG,
            section_idx,
            section_description,
        )
    collected_infos = current_inputs.get("sub_section_core_content", [])

    # Guard clause: both a title and some collected content are required.
    has_inputs = bool(section_task) and bool(collected_infos)
    if not has_inputs:
        missing_msg = "Missing 'section_task' or 'sub_section_core_content' in context (section title required)"
        logger.error(f"{log_prefix} section_idx: [{section_idx}] {missing_msg}")
        return dict(rs_success=False, sub_section_outline=missing_msg)

    try:
        user_message = "".join(
            [
                f"Section id is {section_idx},",
                f"Section title is {section_task},",
                f"User query is {report_task},",
                f"Collected information is {collected_infos},",
                f"Section description is {section_description},",
            ]
        )
        prompt_context = {
            "messages": [dict(role="user", content=user_message)],
            "section_idx": section_idx,
            "language": current_inputs.get("language"),
            "has_template": current_inputs.get("has_template"),
            "section_title": section_task,
            "section_description": section_description,
            "report_type": current_inputs.get("report_type", "professional"),
            "paragraph_style": current_inputs.get("paragraph_style", "detailed"),
        }
        logger.info(f"{log_prefix} has_template: {prompt_context['has_template']}")
        llm_input = apply_system_prompt("sub_section_outline", prompt_context)
        if not LogManager.is_sensitive():
            logger.debug(
                "%s [generate_sub_section_outline] section_idx: [%s] llm_input is %s",
                EFFECT_SUB_REPORT_TAG,
                section_idx,
                llm_input,
            )
        llm_output = await ainvoke_llm_with_stats(
            llm=self._llm,
            messages=llm_input,
            agent_name=AgentLlmName.SUB_REPORTER_OUTLINE.value,
        )
        if not LogManager.is_sensitive():
            logger.debug(
                "%s [generate_sub_section_outline] section_idx: [%s] llm_output is %s",
                EFFECT_SUB_REPORT_TAG,
                section_idx,
                llm_output,
            )
        outline_text = llm_output.get("content") if llm_output else None
        if not outline_text:
            raise CustomValueException(
                error_code=StatusCode.LLM_RESPONSE_ERROR.code,
                message=f"LLM returned empty content for the section {section_idx}",
            )
        return dict(rs_success=True, sub_section_outline=outline_text)
    except Exception as exc:
        failure_msg = (
            "Error generating sub section outline"
            if LogManager.is_sensitive()
            else f"Error generating sub section outline: {str(exc)}"
        )
        logger.error(
            f"{log_prefix} section_idx: [{section_idx}] {failure_msg}",
            exc_info=True,
        )
        return dict(rs_success=False, sub_section_outline=failure_msg)
async def _extract_data_from_text(
    self,
    visualization_dict: dict,
    validation_error: str = "",
    previous_records: str | None = None,
) -> dict:
    """Ask the LLM to extract chart data (as JSON text) from one piece of content.

    Args:
        visualization_dict: Task context. Reads ``section_idx`` (default 1),
            ``language`` (default "zh-CN"), ``section_outline`` and
            ``origin_content``.
        validation_error: Feedback from a previous failed validation. When
            non-empty, it is sent to the LLM together with an instruction to
            re-extract from scratch.
        previous_records: The previously rejected payload. It is included in the
            feedback only when ``validation_error`` is non-empty.

    Returns:
        On success, ``{"rs_success": True, "sub_section_visualization_content":
        <stripped LLM text>}``. On failure, ``rs_success`` is False,
        ``sub_section_visualization_content`` is ``""``, and the error text is
        stored under ``error_msg`` and under the legacy ``visualization_content``
        key.
    """
    section_idx = visualization_dict.get("section_idx", 1)
    tmp_context = {
        "language": visualization_dict.get("language", "zh-CN"),
        "section_outline": visualization_dict.get("section_outline", ""),
        "origin_content": visualization_dict.get("origin_content", ""),
    }
    validation_error = (validation_error or "").strip()
    if validation_error:
        feedback = (
            "Previously extracted data did not pass validation: "
            f"{validation_error}\n"
        )
        if previous_records:
            feedback += f"Previous extracted chart JSON: {previous_records}\n"
        feedback += (
            "Do NOT reuse, copy, or edit the previous extracted data. "
            "Re-extract strictly from origin_content and output a fresh JSON."
        )
        tmp_context["messages"] = [dict(role="user", content=feedback)]
    try:
        llm_input = apply_system_prompt(
            "sub_section_visualization_content", tmp_context
        )
        if not LogManager.is_sensitive():
            logger.debug(
                "%s [generate_sub_section_visualization_content] section_idx: [%s] llm_input is %s",
                EFFECT_SUB_REPORT_TAG,
                section_idx,
                llm_input,
            )
        llm_output = await ainvoke_llm_with_stats(
            llm=self._llm,
            messages=llm_input,
            agent_name=AgentLlmName.SUB_REPORTER_VISUALIZATION_CONTENT.value,
        )
        if not LogManager.is_sensitive():
            logger.debug(
                "%s [generate_sub_section_visualization_content] section_idx: [%s] llm_output is %s",
                EFFECT_SUB_REPORT_TAG,
                section_idx,
                llm_output,
            )
        if not llm_output or not llm_output.get("content"):
            raise CustomValueException(
                error_code=StatusCode.LLM_RESPONSE_ERROR.code,
                message=f"LLM generated empty visualization content for section {section_idx}",
            )
        payload = (llm_output.get("content") or "").strip()
        return dict(rs_success=True, sub_section_visualization_content=payload)
    except Exception as e:
        if LogManager.is_sensitive():
            error_msg = "Error generating visualization content"
        else:
            error_msg = f"Error generating visualization content: {str(e)}"
        logger.error(
            f"{EFFECT_SUB_REPORT_TAG} [generate_sub_section_visualization_content] section_idx: [{section_idx}] "
            f"{error_msg}",
            exc_info=True,
        )
        # Use the same payload key as the success path (empty here) so callers
        # reading ``sub_section_visualization_content`` see a consistent shape.
        # ``visualization_content`` is kept for backward compatibility.
        return dict(
            rs_success=False,
            sub_section_visualization_content="",
            visualization_content=error_msg,
            error_msg=error_msg,
        )
async def _validate_chart_compliance(
    self,
    extracted_chart_json: str,
    section_idx: int,
    section_outline: str,
    max_attempt_num: int,
) -> dict:
    """Ask the LLM whether extracted chart data complies with the section outline.

    Uses the ``chart_compliance_validate`` prompt. An attempt is retried (up to
    ``max_attempt_num`` in total) when the LLM returns empty content, invalid
    JSON or a non-object JSON value, or when the call raises.

    Returns:
        ``{"valid": bool, "error_msg": str}`` from the first well-formed verdict
        (``error_msg`` is ``""`` when valid). If no attempt produced a verdict,
        ``{"valid": False, "error_msg": ""}``.
    """
    payload = (extracted_chart_json or "").strip()
    for attempt in range(max_attempt_num):
        try:
            llm_input = apply_system_prompt(
                "chart_compliance_validate",
                dict(
                    extracted_chart_json=payload,
                    section_outline=section_outline,
                ),
            )
            llm_output = await ainvoke_llm_with_stats(
                llm=self._llm,
                messages=llm_input,
                agent_name=AgentLlmName.SUB_REPORTER_CHART_COMPLIANCE.value,
            )
            if not llm_output or not llm_output.get("content"):
                logger.warning(
                    "%s [validate_chart_compliance] section_idx: [%s] "
                    "attempt %s/%s error: %s",
                    EFFECT_SUB_REPORT_TAG,
                    section_idx,
                    attempt + 1,
                    max_attempt_num,
                    "LLM generated empty compliance content",
                )
                continue
            raw = (llm_output.get("content") or "").strip()
            result = json.loads(raw)
            if not isinstance(result, dict):
                logger.warning(
                    "%s [validate_chart_compliance] section_idx: [%s] "
                    "attempt %s/%s error: %s",
                    EFFECT_SUB_REPORT_TAG,
                    section_idx,
                    attempt + 1,
                    max_attempt_num,
                    "LLM returned non-object compliance JSON",
                )
                continue
            raw_valid = result.get("valid", False)
            # bool("false") is True, so a string verdict must be parsed, not cast.
            if isinstance(raw_valid, str):
                valid = raw_valid.strip().lower() == "true"
            else:
                valid = bool(raw_valid)
            error_msg = str(result.get("error_msg", "") or "").strip()
            if valid:
                return dict(valid=True, error_msg="")
            return dict(valid=False, error_msg=error_msg)
        except Exception as e:
            if isinstance(e, (json.JSONDecodeError, TypeError, ValueError)):
                error_msg = (
                    "LLM returned invalid compliance JSON"
                    if LogManager.is_sensitive()
                    else f"LLM returned invalid compliance JSON: {str(e)}"
                )
            elif LogManager.is_sensitive():
                error_msg = "chart compliance validation error"
            else:
                error_msg = f"chart compliance validation error: {str(e)}"
            logger.warning(
                "%s [validate_chart_compliance] section_idx: [%s] "
                "attempt %s/%s error: %s",
                EFFECT_SUB_REPORT_TAG,
                section_idx,
                attempt + 1,
                max_attempt_num,
                error_msg,
            )
    return dict(valid=False, error_msg="")
async def _validate_chart_traceability(
    self,
    extracted_chart_json: str,
    origin_content: str,
    section_idx: int,
    max_attempt_num: int,
) -> dict:
    """Ask the LLM whether every extracted chart value can be traced to the origin text.

    Uses the ``chart_data_traceability_check`` prompt. An attempt is retried (up
    to ``max_attempt_num`` in total) when the LLM returns empty content, invalid
    JSON or a non-object JSON value, or when the call raises.

    Returns:
        ``{"valid": bool, "error_msg": str}`` from the first well-formed verdict
        (``error_msg`` is ``""`` when valid). If no attempt produced a verdict,
        ``{"valid": False, "error_msg": ""}``.
    """
    payload = (extracted_chart_json or "").strip()
    origin_text = (origin_content or "").strip()
    for attempt in range(max_attempt_num):
        try:
            llm_input = apply_system_prompt(
                "chart_data_traceability_check",
                dict(
                    extracted_chart_json=payload,
                    origin_content=origin_text,
                ),
            )
            llm_output = await ainvoke_llm_with_stats(
                llm=self._llm,
                messages=llm_input,
                agent_name=AgentLlmName.SUB_REPORTER_CHART_TRACEABILITY.value,
            )
            if not llm_output or not llm_output.get("content"):
                logger.warning(
                    "%s [validate_chart_traceability] section_idx: [%s] "
                    "attempt %s/%s error: %s",
                    EFFECT_SUB_REPORT_TAG,
                    section_idx,
                    attempt + 1,
                    max_attempt_num,
                    "LLM generated empty traceability content",
                )
                continue
            raw = (llm_output.get("content") or "").strip()
            result = json.loads(raw)
            if not isinstance(result, dict):
                logger.warning(
                    "%s [validate_chart_traceability] section_idx: [%s] "
                    "attempt %s/%s error: %s",
                    EFFECT_SUB_REPORT_TAG,
                    section_idx,
                    attempt + 1,
                    max_attempt_num,
                    "LLM returned non-object traceability JSON",
                )
                continue
            raw_valid = result.get("valid", False)
            # bool("false") is True, so a string verdict must be parsed, not cast.
            if isinstance(raw_valid, str):
                valid = raw_valid.strip().lower() == "true"
            else:
                valid = bool(raw_valid)
            error_msg = str(result.get("error_msg", "") or "").strip()
            if valid:
                return dict(valid=True, error_msg="")
            return dict(valid=False, error_msg=error_msg)
        except Exception as e:
            if isinstance(e, (json.JSONDecodeError, TypeError, ValueError)):
                error_msg = (
                    "LLM returned invalid traceability JSON"
                    if LogManager.is_sensitive()
                    else f"LLM returned invalid traceability JSON: {str(e)}"
                )
            elif LogManager.is_sensitive():
                error_msg = "chart traceability validation error"
            else:
                error_msg = f"chart traceability validation error: {str(e)}"
            logger.warning(
                "%s [validate_chart_traceability] section_idx: [%s] "
                "attempt %s/%s error: %s",
                EFFECT_SUB_REPORT_TAG,
                section_idx,
                attempt + 1,
                max_attempt_num,
                error_msg,
            )
    return dict(valid=False, error_msg="")
async def _extract_visualization_data(
    self,
    visualization_dict: dict,
    visualization_content: dict,
    max_attempt_num: int,
    section_idx: int,
) -> tuple[bool, dict, dict | None]:
    """Extract chart data from text and validate it, retrying with feedback.

    Each attempt calls ``_extract_data_from_text``. A payload that parses as a
    JSON object and passes ``validate_visualization_extraction_schema`` is then
    checked by ``_validate_chart_traceability`` and
    ``_validate_chart_compliance``. If a check fails, its error message and the
    rejected payload are sent back into the next extraction attempt.

    Returns:
        ``(True, content, extracted_obj)`` once a payload passes every check.
        ``(False, content, None)`` when the LLM reports no chart data (payload
        ``"{}"``, ``error_msg="no_chart_data"``) or when every attempt fails
        (``error_msg="extract_data_failed"``). In both failure cases
        ``rs_success`` is set to False.
    """
    extract_ok = False
    extracted_obj = None
    validation_error = ""
    previous_records: str | None = None
    for i in range(max_attempt_num):
        visualization_content = await self._extract_data_from_text(
            visualization_dict, validation_error, previous_records
        )
        if not LogManager.is_sensitive():
            logger.debug("%s [process_visualization_task] Extract data: %s.", EFFECT_SUB_REPORT_TAG,
                         visualization_content)
        raw_payload = (
            visualization_content.get("sub_section_visualization_content") or ""
        ).strip()
        # "{}" is the LLM's explicit "nothing chartable here" answer.
        if raw_payload == "{}":
            visualization_content["rs_success"] = False
            visualization_content["error_msg"] = "no_chart_data"
            return False, visualization_content, None
        try:
            extracted_obj = json.loads(raw_payload)
        except Exception:
            extracted_obj = None
        extract_ok = isinstance(
            extracted_obj, dict
        ) and validate_visualization_extraction_schema(extracted_obj)
        if extract_ok:
            traceability = await self._validate_chart_traceability(
                raw_payload,
                visualization_dict.get("origin_content", ""),
                section_idx,
                max_attempt_num,
            )
            if not traceability.get("valid", False):
                traceability_error = (
                    traceability.get("error_msg", "") or ""
                ).strip()
                logger.warning(
                    "%s [process_visualization_task] section_idx: [%s], "
                    "traceability check failed: %s",
                    EFFECT_SUB_REPORT_TAG,
                    section_idx,
                    traceability_error,
                )
                validation_error = (
                    f"Traceability validation failed: {traceability_error}"
                    if traceability_error
                    else ""
                )
                validation_error += (
                    "\nYou must only extract complete records where every field"
                    "(category, value, unit) can be fully traced to the original content."
                    " Do not invent, fabricate, or infer any data that does not"
                    " have a clear corresponding description in the source."
                )
                previous_records = raw_payload or None
                extract_ok = False
                continue
            compliance = await self._validate_chart_compliance(
                raw_payload,
                section_idx,
                visualization_dict.get("section_outline", ""),
                max_attempt_num,
            )
            if compliance.get("valid", False):
                validation_error = ""
                previous_records = None
                break
            compliance_error = (compliance.get("error_msg", "") or "").strip()
            # Feedback must be non-empty: _extract_data_from_text only sends the
            # feedback (and previous_records) when validation_error is set, so an
            # empty string would make the retry identical to a fresh attempt.
            validation_error = (
                f"Compliance/Relevance validation failed: {compliance_error}"
                if compliance_error
                else "Compliance/Relevance validation failed."
            )
            previous_records = raw_payload or None
            logger.warning(
                "%s [process_visualization_task] section_idx: [%s], "
                "compliance check failed: %s",
                EFFECT_SUB_REPORT_TAG,
                section_idx,
                compliance_error,
            )
            extract_ok = False
            continue
        logger.warning(
            f"{EFFECT_SUB_REPORT_TAG} [process_visualization_task] section_idx: [{section_idx}], "
            f"Warning: Extract data from text on attempt {i + 1}/{max_attempt_num}. retry ..."
        )
    if not extract_ok:
        logger.warning(
            f"{EFFECT_SUB_REPORT_TAG} [process_visualization_task] section_idx: [{section_idx}], "
            "Skip mermaid generation due to invalid extracted data."
        )
        visualization_content["rs_success"] = False
        visualization_content["error_msg"] = "extract_data_failed"
        return False, visualization_content, None
    return True, visualization_content, extracted_obj
async def _build_visualization_mermaid(
    self,
    visualization_content: dict,
    extracted_obj: dict,
    visualization_dict: dict,
    max_attempt_num: int,
    section_idx: int,
) -> dict:
    """Normalize validated chart data and render it as Mermaid.

    Runs ``_normalize_visualization_content``, then
    ``_precheck_value_variation``, then ``_generate_mermaid_code``, stopping at
    the first step that reports failure.

    Returns:
        The result of ``_generate_mermaid_code`` when both earlier steps succeed;
        otherwise ``visualization_content`` as left by the failing step.
    """
    is_normalized = await self._normalize_visualization_content(
        visualization_content,
        extracted_obj,
        visualization_dict,
        max_attempt_num,
        section_idx,
    )
    # Short-circuit keeps the precheck from running on un-normalized data.
    ready_for_mermaid = is_normalized and self._precheck_value_variation(
        visualization_content, section_idx
    )
    if ready_for_mermaid:
        return self._generate_mermaid_code(visualization_content, section_idx)
    return visualization_content
async def _normalize_visualization_content(
    self,
    visualization_content: dict,
    extracted_obj: dict,
    visualization_dict: dict,
    max_attempt_num: int,
    section_idx: int,
) -> bool:
    """Convert extracted chart data into the final chart JSON.

    Timeline charts skip the LLM. Each record must be a 3-item list; only its
    first two items are kept, and ``unit`` is set to ``""``.

    Other chart types send their records to the
    ``sub_section_visualization_normalize_units`` prompt, with up to
    ``max_attempt_num`` attempts. The first response is accepted if it is valid
    JSON, passes ``validate_visualization_normalization_schema`` for
    ``image_type``, and keeps the same number of records.

    Returns:
        True on success, after storing the serialized chart JSON (with
        ``ensure_ascii=False``) in
        ``visualization_content["sub_section_visualization_content"]``.
        False on failure, after setting ``rs_success`` to False and
        ``error_msg`` to ``"extract_data_failed"`` (timeline) or
        ``"normalize_failed"`` (other types).
    """
    image_title = extracted_obj.get("image_title", "")
    image_type = extracted_obj.get("image_type", "")
    extracted_records = extracted_obj.get("records", [])
    if image_type == "timeline":
        timeline_records = []
        for row in extracted_records:
            if not isinstance(row, list) or len(row) != 3:
                visualization_content["rs_success"] = False
                visualization_content["error_msg"] = "extract_data_failed"
                return False
            # Only the first two fields of a timeline record are charted.
            timeline_records.append([row[0], row[1]])
        # Every row was either appended or rejected above, so no separate
        # length comparison against extracted_records is needed.
        final_obj = {
            "image_title": image_title,
            "image_type": "timeline",
            "unit": "",
            "records": timeline_records,
        }
        visualization_content["sub_section_visualization_content"] = json.dumps(
            final_obj, ensure_ascii=False
        )
        return True
    final_obj = None
    records_json = json.dumps({"records": extracted_records}, ensure_ascii=False)
    normalize_context = {
        "language": visualization_dict.get("language", "zh-CN"),
        "records_json": records_json,
    }
    normalize_input = apply_system_prompt(
        "sub_section_visualization_normalize_units", normalize_context
    )
    for j in range(max_attempt_num):
        normalize_output = await ainvoke_llm_with_stats(
            llm=self._llm,
            messages=normalize_input,
            agent_name=AgentLlmName.SUB_REPORTER_VISUALIZATION_NORMALIZE.value,
        )
        if not normalize_output or not normalize_output.get("content"):
            continue
        normalized_payload = (normalize_output.get("content") or "").strip()
        if normalized_payload == "{}":
            continue
        try:
            normalized_obj = json.loads(normalized_payload)
        except Exception as e:
            if not LogManager.is_sensitive():
                logger.warning(
                    "%s [process_visualization_task] section_idx: [%s], "
                    "normalize_units json decode failed on attempt %s/%s: %s",
                    EFFECT_SUB_REPORT_TAG,
                    section_idx,
                    j + 1,
                    max_attempt_num,
                    str(e),
                )
            continue
        if not validate_visualization_normalization_schema(
            normalized_obj, image_type
        ):
            continue
        # Normalization may rescale values but must not add or drop records.
        if len(normalized_obj.get("records", [])) != len(extracted_records):
            continue
        final_obj = {
            "image_title": image_title,
            "image_type": image_type,
            "unit": normalized_obj.get("unit", ""),
            "records": normalized_obj.get("records", []),
        }
        break
    if not final_obj:
        visualization_content["rs_success"] = False
        visualization_content["error_msg"] = "normalize_failed"
        return False
    visualization_content["sub_section_visualization_content"] = json.dumps(
        final_obj, ensure_ascii=False
    )
    return True
async def _process_visualization_task(self, visualization_dict: dict) -> dict:
    """Run one visualization job: extract and validate chart data, then render Mermaid.

    Args:
        visualization_dict: Task context. Reads ``section_idx`` (default 1),
            ``max_attempt_num`` (default 3) and ``origin_content``; the whole
            dict is also passed through to the extraction and rendering helpers.

    Returns:
        A result dict. If ``origin_content`` is empty or whitespace-only,
        returns ``rs_success=False`` with ``error_msg="origin_content_empty"``.
        If extraction fails, returns the dict produced by
        ``_extract_visualization_data``. Otherwise returns the result of
        ``_build_visualization_mermaid``.
    """
    section_idx = visualization_dict.get("section_idx", 1)
    max_attempt_num = visualization_dict.get("max_attempt_num", 3)
    result = dict(rs_success=True, visualization_content="")

    has_origin = bool((visualization_dict.get("origin_content") or "").strip())
    if not has_origin:
        result["rs_success"] = False
        result["error_msg"] = "origin_content_empty"
        return result

    extraction = await self._extract_visualization_data(
        visualization_dict,
        result,
        max_attempt_num,
        section_idx,
    )
    extract_ok, result, extracted_obj = extraction
    if extract_ok:
        return await self._build_visualization_mermaid(
            result,
            extracted_obj,
            visualization_dict,
            max_attempt_num,
            section_idx,
        )
    return result
async def _generate_content_for_visualization(self, current_inputs: dict) -> dict:
    """Generate chart data and Mermaid code for a section's visualization candidates.

    Candidates come from ``_select_visualization_from_classified_content``,
    called on a deep copy of ``current_inputs["classified_content"]``. Each
    candidate is processed concurrently by ``_process_visualization_task``.
    Every candidate dict then gets ``sub_section_visualization_content`` and
    ``mermaid_content`` keys, which are empty strings if its task failed.

    Returns:
        ``{"rs_success": True, "visualization_content": <list of candidates>}``.
        The list is empty when there is no ``sub_section_outline`` or when
        ``classified_content`` is not a list.
    """
    section_idx = current_inputs.get("section_idx", 1)
    section_outline = (current_inputs.get("sub_section_outline", "") or "").strip()
    if not section_outline:
        logger.warning(
            "%s [generate_sub_section_visualization_content] section_idx: [%s], "
            "missing sub_section_outline, skip visualization generation.",
            EFFECT_SUB_REPORT_TAG,
            section_idx,
        )
        return dict(rs_success=True, visualization_content=[])
    section_task = self.strip_leading_number(current_inputs.get("section_task", ""))
    logger.info(
        "%s [generate_sub_section_visualization_content] Start generating content, section_idx: [%s]",
        EFFECT_SUB_REPORT_TAG,
        section_idx,
    )
    # Deep copy so the candidate dicts annotated below do not alias the caller's data.
    classified_content_for_visualization = deepcopy(
        current_inputs.get("classified_content", [])
    )
    if not isinstance(classified_content_for_visualization, list):
        logger.warning(
            "%s [generate_sub_section_visualization_content] section_idx: [%s], "
            "classified_content is not a list, skip visualization.",
            EFFECT_SUB_REPORT_TAG,
            section_idx,
        )
        return dict(rs_success=True, visualization_content=[])
    visualization_content = self._select_visualization_from_classified_content(
        classified_content_for_visualization
    )
    if not visualization_content:
        return dict(rs_success=True, visualization_content=visualization_content)
    language = current_inputs.get("language", "zh-CN")
    max_attempt_num = current_inputs.get("max_generate_retry_num", 3)
    tasks = [
        self._process_visualization_task(
            {
                "section_idx": section_idx,
                "title": item.get("title", ""),
                "origin_content": item.get("original_content", ""),
                "data_density": item.get("data_density", -1.0),
                "language": language,
                "section_title": section_task,
                "section_outline": section_outline,
                "max_attempt_num": max_attempt_num,
            }
        )
        for item in visualization_content
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for i, (item, res) in enumerate(zip(visualization_content, results)):
        # BaseException also covers asyncio.CancelledError, which gather returns
        # as a result when return_exceptions=True.
        if isinstance(res, BaseException):
            logger.error(
                "%s [generate_sub_section_visualization_content] section_idx: [%s], "
                "error in task [%s]: %s",
                EFFECT_SUB_REPORT_TAG,
                section_idx,
                i,
                str(res),
                exc_info=res,
            )
            item["sub_section_visualization_content"] = ""
            item["mermaid_content"] = ""
        elif res.get("rs_success"):
            # .get(): a successful result missing a key must not crash the section.
            item["sub_section_visualization_content"] = res.get(
                "sub_section_visualization_content", ""
            )
            item["mermaid_content"] = res.get("mermaid_content", "")
        else:
            item["sub_section_visualization_content"] = ""
            item["mermaid_content"] = ""
            logger.warning(
                "%s [generate_sub_section_visualization_content] section_idx: [%s], reason: %s",
                EFFECT_SUB_REPORT_TAG,
                section_idx,
                res.get("error_msg", "Unknown"),
            )
    return dict(rs_success=True, visualization_content=visualization_content)
async def _generate_sub_report_summary(self, current_inputs: dict):
    """Generate a short summary of one section's sub-report with the LLM.

    Args:
        current_inputs: Section state. Reads ``sub_report_content``,
            ``section_idx``, ``current_outline``, ``language``, ``report_task``,
            ``report_type``, ``paragraph_style``, ``audience_role`` and ``tone``.

    Returns:
        A dict ``{"rs_success": bool, "result": str}``:
        - ``rs_success=True`` and an empty ``result`` when there is no
          sub-report content to summarize;
        - ``rs_success=True`` and the LLM summary on success;
        - ``rs_success=False`` and an empty ``result`` when every attempt
          returned empty content, when no attempt is configured, or when an
          error occurs (errors are logged, not raised).
    """
    section_idx = current_inputs.get("section_idx", 1)
    if not LogManager.is_sensitive():
        logger.debug(
            "%s [_generate_sub_report_summary] Starting section_idx: %s, current_inputs: %s",
            EFFECT_SUB_REPORT_TAG,
            section_idx,
            current_inputs,
        )
    sub_report_content = current_inputs.get("sub_report_content", "")
    if not sub_report_content:
        logger.warning(
            f"{EFFECT_SUB_REPORT_TAG} [_generate_sub_report_summary] section_idx: "
            f"{section_idx}, sub report content is empty."
        )
        return dict(rs_success=True, result="")
    sub_content_message = f"sub report content is {sub_report_content}"
    current_outline = current_inputs.get("current_outline", {})
    current_outline_without_plans = Reporter.export_outline_without_plans(
        current_outline
    )
    try:
        llm_input = apply_system_prompt(
            "sub_report_summary",
            dict(
                messages=[dict(role="user", content=sub_content_message)],
                section_id=section_idx,
                language=current_inputs.get("language", "zh-CN"),
                outline=current_outline_without_plans,
                user_query=current_inputs.get("report_task", ""),
                report_type=current_inputs.get("report_type", "professional"),
                paragraph_style=current_inputs.get("paragraph_style", "detailed"),
                audience_role=current_inputs.get("audience_role", ""),
                tone=current_inputs.get("tone", ""),
            ),
        )
        if not LogManager.is_sensitive():
            logger.debug(
                "%s [_generate_sub_report_summary] section_idx: %s llm_input is %s",
                EFFECT_SUB_REPORT_TAG,
                section_idx,
                llm_input,
            )
        retry_num = Config().service_config.report_max_generate_retry_num
        for attempt in range(retry_num):
            llm_output = await ainvoke_llm_with_stats(
                llm=self._llm,
                messages=llm_input,
                agent_name=AgentLlmName.SUB_REPORTER_SUMMARY.value,
            )
            if not LogManager.is_sensitive():
                logger.debug(
                    "%s [_generate_sub_report_summary] section_idx: %s llm_output is %s",
                    EFFECT_SUB_REPORT_TAG,
                    section_idx,
                    llm_output,
                )
            if llm_output and llm_output.get("content"):
                return dict(rs_success=True, result=llm_output.get("content"))
            if attempt == retry_num - 1:
                raise CustomValueException(
                    error_code=StatusCode.AGENT_RETRY_FAILED_ALL_ATTEMPTS.code,
                    message=f"return empty summary content for the section "
                    f"{section_idx}",
                )
            logger.warning(
                "%s [_generate_sub_report_summary] section_idx: %s empty summary output, retrying (%s/%s).",
                EFFECT_SUB_REPORT_TAG,
                section_idx,
                attempt + 1,
                retry_num,
            )
        # Reached only when no attempt is configured (retry_num <= 0): report an
        # explicit failure instead of implicitly returning None, which callers
        # would then call ``.get`` on.
        logger.error(
            "%s [_generate_sub_report_summary] section_idx: %s no summary attempt configured "
            "(report_max_generate_retry_num=%s).",
            EFFECT_SUB_REPORT_TAG,
            section_idx,
            retry_num,
        )
        return dict(rs_success=False, result="")
    except Exception as e:
        if LogManager.is_sensitive():
            error_msg = (
                f"Error while generating section {section_idx} "
                f"report's summary."
            )
        else:
            error_msg = (
                f"Error generating section {section_idx} "
                f"report's summary: {str(e)}"
            )
        logger.error(
            f"{EFFECT_SUB_REPORT_TAG} [_generate_sub_report_summary] {error_msg}",
            exc_info=True,
        )
        return dict(rs_success=False, result="")
async def _write_subsection_reports(self, current_inputs: dict) -> dict:
    """Generate one section's sub-report and store it back into ``current_inputs``.

    The steps are:
    1. Validate that the section has a title, a sub-section outline, and
       either collected infos or background knowledge.
    2. Build the LLM prompt from the classified content and generate the
       section markdown.
    3. Optionally insert visualizations (when ``visualization_enable`` is
       truthy; defaults to True).
    4. Add table captions and generate the section summary.
    5. Clean headers and append references.

    Side effects:
        Sets ``current_inputs["sub_report_content"]`` (to "" on failure) and,
        once content has been generated, ``current_inputs["sub_report_summary"]``.

    Returns:
        ``{"success": True, "result": "success"}`` when non-empty content was
        produced, otherwise ``{"success": False, "result": <reason>}``.
    """
    section_idx = current_inputs.get("section_idx", 1)
    if LogManager.is_sensitive():
        logger.info(
            f"{EFFECT_SUB_REPORT_TAG} [write_subsection_reports] Starting section_idx: "
            f"{section_idx}"
        )
    else:
        logger.debug(
            "%s [write_subsection_reports] Starting section_idx: %s, current_inputs: %s",
            EFFECT_SUB_REPORT_TAG,
            section_idx,
            current_inputs,
        )
    section_task = self.strip_leading_number(
        current_inputs.get("section_task", "")
    )
    # A present-but-None (or otherwise falsy) value is treated as "no content",
    # so building the prompt below never iterates over None outside the try.
    classified_content = current_inputs.get("classified_content") or []
    has_collected_infos = bool(classified_content)
    background_knowledge_contents = self._get_background_knowledge_contents(
        current_inputs.get("sub_report_background_knowledge", [])
    )
    logger.debug(
        "%s [write_subsection_reports] section_idx: %s, background_knowledge_contents: %s",
        EFFECT_SUB_REPORT_TAG,
        section_idx,
        background_knowledge_contents,
        extra={"skip_truncation": True},
    )
    has_background_knowledge = bool(background_knowledge_contents)
    if self._is_missing_subsection_report_context(
        section_task=section_task,
        sub_section_outline=current_inputs.get("sub_section_outline", ""),
        has_collected_infos=has_collected_infos,
        has_background_knowledge=has_background_knowledge,
    ):
        error_msg = (
            "Missing 'section_task' or sub section outline or collected infos/background knowledge in context."
        )
        current_inputs["sub_report_content"] = ""
        logger.error(
            f"{EFFECT_SUB_REPORT_TAG} [write_subsection_reports] section_idx: "
            f"{section_idx} {error_msg}"
        )
        return dict(success=False, result=error_msg)
    if not LogManager.is_sensitive():
        logger.debug(
            "%s [write_subsection_reports] Processing section %s: %s, (total report: %s),",
            EFFECT_SUB_REPORT_TAG,
            section_idx,
            section_task,
            (current_inputs.get("report_task", "") or "unknown"),
        )
        logger.debug(
            "%s sub section outline: %s, section id is %s, classified content is %s",
            EFFECT_SUB_REPORT_TAG,
            current_inputs.get("sub_section_outline", ""),
            section_idx,
            classified_content,
        )
    # Each collected document becomes one citation-delimited entry in the prompt;
    # join once instead of repeated string concatenation.
    infos = "".join(
        f"\n[citation:{item.get('index', 1)} begin]网页时间: {item.get('doc_time', '')}|||"
        f"网页权威性:{item.get('source_authority', '')}|||网页相关性:{item.get('task_relevance', '')}|||"
        f"网页内容:{item.get('original_content', '')}[citation:{item.get('index', 1)} end]"
        for item in classified_content
    )
    current_outline = current_inputs.get("current_outline", {})
    current_outline_without_plans = Reporter.export_outline_without_plans(
        current_outline
    )
    sub_content_message = (
        f"Section id is {section_idx},"
        f"Section title is {section_task},"
        f"User query is {current_inputs.get('report_task', '')},"
        f"Collected information is {infos},"
        f"Overall outline is {current_outline_without_plans},"
        f"References is {current_inputs.get('sub_section_references', '')},"
        f"Current Chapter Outline is "
        f"{current_inputs.get('sub_section_outline', '')},"
        f"Background Knowledge is {background_knowledge_contents}"
    )
    try:
        report_type = current_inputs.get("report_type", "professional")
        sub_report_prompt = (
            "sub_report_brief_markdown"
            if report_type == "brief"
            else "sub_report_markdown"
        )
        llm_input = apply_system_prompt(
            sub_report_prompt,
            dict(
                messages=[dict(role="user", content=sub_content_message)],
                language=current_inputs.get("language"),
                section_iscore=current_inputs.get("section_iscore", False),
                report_type=report_type,
                paragraph_style=current_inputs.get("paragraph_style", "detailed"),
                require_summary_first=current_inputs.get("require_summary_first", False),
                require_methodology_and_risk=current_inputs.get("require_methodology_and_risk", False),
                audience_role=current_inputs.get("audience_role", ""),
                tone=current_inputs.get("tone", ""),
            ),
        )
        if not LogManager.is_sensitive():
            logger.debug(
                "%s [write_subsection_reports] section_idx: %s llm_input is %s",
                EFFECT_SUB_REPORT_TAG,
                section_idx,
                llm_input,
            )
        llm_output = await ainvoke_llm_with_stats(
            llm=self._llm,
            messages=llm_input,
            agent_name=AgentLlmName.SUB_REPORTER.value,
            need_stream_out=True,
        )
        if not LogManager.is_sensitive():
            logger.debug(
                "%s [write_subsection_reports] section_idx: %s llm_output is %s",
                EFFECT_SUB_REPORT_TAG,
                section_idx,
                llm_output,
            )
        if not llm_output or not llm_output.get("content"):
            raise CustomValueException(
                error_code=StatusCode.LLM_RESPONSE_ERROR.code,
                message=f"LLM returned empty content for the section {section_idx}",
            )
        current_inputs["sub_report_content"] = llm_output.get("content")
        if current_inputs.get("visualization_enable", True):
            if not LogManager.is_sensitive():
                logger.debug(
                    "%s [write_subsection_reports] section_idx: [%s] "
                    "sub_report_content before insert visualization: %s",
                    EFFECT_SUB_REPORT_TAG,
                    section_idx,
                    current_inputs.get("sub_report_content", ""),
                )
            try:
                insert_result = await self._insert_visualization(current_inputs)
                if insert_result.get("rs_success", False):
                    current_inputs["sub_report_content"] = insert_result.get(
                        "result", ""
                    )
                else:
                    has_visuals = any(
                        isinstance(item, dict) and item.get("mermaid_content")
                        for item in current_inputs.get("visualization_result", [])
                    )
                    if has_visuals and not LogManager.is_sensitive():
                        logger.warning(
                            "%s [write_subsection_reports] section_idx: [%s] "
                            "insert visualization failed, use original content.",
                            EFFECT_SUB_REPORT_TAG,
                            section_idx,
                        )
                    elif not has_visuals and not LogManager.is_sensitive():
                        logger.debug(
                            "%s [write_subsection_reports] section_idx: [%s] "
                            "no visualization data to insert.",
                            EFFECT_SUB_REPORT_TAG,
                            section_idx,
                        )
            except Exception as e:
                # Visualization is best-effort: keep the plain section text.
                logger.warning(
                    "%s [write_subsection_reports] section_idx: [%s] "
                    "insert visualization error, use original content: %s",
                    EFFECT_SUB_REPORT_TAG,
                    section_idx,
                    str(e),
                )
            if not LogManager.is_sensitive():
                logger.debug(
                    "%s [write_subsection_reports] section_idx: [%s] "
                    "sub_report_content after insert visualization: %s",
                    EFFECT_SUB_REPORT_TAG,
                    section_idx,
                    current_inputs.get("sub_report_content", ""),
                )
        current_inputs["sub_report_content"] = ensure_markdown_table_captions(
            current_inputs["sub_report_content"],
            current_inputs.get("language"),
            current_inputs.get("section_idx", ""),
        )
        sub_report_summary = await self._generate_sub_report_summary(current_inputs)
        # Guard against a missing result dict so a summary problem never
        # discards an otherwise valid section.
        current_inputs["sub_report_summary"] = (sub_report_summary or {}).get("result", "")
        current_inputs["sub_report_content"] = self.add_references(
            self.clean_markdown_headers(current_inputs["sub_report_content"]),
            current_inputs.get("sub_section_references", []),
            current_inputs.get("language"),
        ).strip()
        if not current_inputs.get("sub_report_content", ""):
            logger.error(
                f"{EFFECT_SUB_REPORT_TAG} sub report content is blank, section_id: "
                f"{section_idx}"
            )
            return dict(success=False, result="no sub report content found")
        if not LogManager.is_sensitive():
            logger.debug(
                "%s[write_subsection_reports] success generate section [%s] sub_report, sub report content:\n[%s]",
                EFFECT_SUB_REPORT_TAG,
                section_idx,
                current_inputs["sub_report_content"],
                extra={"skip_truncation": True},
            )
        return dict(success=True, result="success")
    except Exception as e:
        current_inputs["sub_report_content"] = ""
        if LogManager.is_sensitive():
            error_msg = f"Error generating section {section_idx} report"
        else:
            error_msg = f"Error generating section {section_idx} report: {str(e)}"
        logger.error(
            f"{EFFECT_SUB_REPORT_TAG} [write_subsection_reports] {error_msg}",
            exc_info=True,
        )
        return dict(success=False, result=error_msg)
@staticmethod
def _select_visualization_from_classified_content(
    classified_content_for_visualization,
):
    """Pick the classified items whose data density makes them worth charting.

    ``data_density`` may be a number, or a string such as ``"9.5"``,
    ``"density: 9.5"`` or ``"数据密度：9.5"``. For strings, the text after the
    first ASCII colon (or, if there is none, the first full-width colon "：")
    is parsed as a float. Items scoring at least 9.0 are kept in their
    original order.

    Non-dict items and items without ``data_density`` are skipped.
    Unparsable values are logged and skipped.
    """
    selected_visualizations = []
    for item in classified_content_for_visualization:
        if not isinstance(item, dict):
            continue
        item_data_density = item.get("data_density")
        if item_data_density is None:
            continue
        try:
            if isinstance(item_data_density, (int, float)):
                point = float(item_data_density)
            else:
                density_str = str(item_data_density)
                if ":" in density_str:
                    point_str = density_str.split(":", 1)[1]
                elif "：" in density_str:
                    # Full-width colon, as produced by Chinese-language labels.
                    point_str = density_str.split("：", 1)[1]
                else:
                    point_str = density_str
                point = float(point_str.strip())
        except (ValueError, IndexError):
            logger.warning(
                "%s [select_visualization] invalid data_density: %s",
                EFFECT_SUB_REPORT_TAG,
                item_data_density,
            )
            continue
        if point >= 9.0:
            selected_visualizations.append(item)
    return selected_visualizations
async def _request_visualization_insert_plan(
    self, context: VisualizationInsertPlanContext
) -> dict:
    """Ask the LLM for a chart insertion plan, retrying on unusable output.

    Makes up to ``max_generate_retry_num`` attempts (default 3), as read from
    ``context.current_inputs``. After an empty or invalid answer, the next
    attempt uses the first original message plus a corrective user message
    that restates the expected JSON schema.

    Returns:
        ``{"rs_success": True, "plan": <parsed JSON>, "result": ""}`` for the
        first plan accepted by ``_is_valid_insert_plan``. Otherwise
        ``{"rs_success": False, "plan": None, "result": context.original_report}``,
        returned immediately when the LLM call raises, or after all attempts
        are used up.
    """
    inputs = context.current_inputs
    seed_messages = list(context.messages)
    attempt_limit = inputs.get("max_generate_retry_num", 3)
    schema_text = '{"insertions":[{"after_row":int,"index":int},...]}'

    def _with_feedback(feedback: str) -> list:
        # Keep only the first (report + data) message and append the correction.
        return seed_messages[:1] + [dict(role="user", content=feedback)]

    conversation = seed_messages
    for attempt in range(attempt_limit):
        llm_input = apply_system_prompt(
            "insert_visualization",
            dict(messages=conversation, language=inputs.get("language")),
        )
        try:
            llm_output = await ainvoke_llm_with_stats(
                llm=self._llm,
                messages=llm_input,
                agent_name=AgentLlmName.SUB_REPORTER.value,
                need_stream_out=False,
            )
        except Exception as e:
            logger.error(
                "%s LLM error when inserting visualization for section [%s]: %s",
                EFFECT_SUB_REPORT_TAG,
                inputs.get("section_idx", 1),
                str(e),
            )
            return dict(rs_success=False, plan=None, result=context.original_report)

        content = llm_output.get("content") if llm_output else None
        if not content:
            logger.warning(
                "%s [insert_visualization] section_idx: [%s] empty output, retrying (%s/%s).",
                EFFECT_SUB_REPORT_TAG,
                inputs.get("section_idx", 1),
                attempt + 1,
                attempt_limit,
            )
            conversation = _with_feedback(
                f"Your output is empty or invalid. Return JSON only with schema: {schema_text}"
            )
            continue

        raw = (content or "").strip()
        try:
            plan = json.loads(raw)
        except Exception:
            plan = None
        is_valid, error_msg = self._is_valid_insert_plan(
            plan, context.report_lines, context.invalid_rows, context.mermaid_map
        )
        if is_valid:
            return dict(rs_success=True, plan=plan, result="")

        logger.warning(
            "%s [insert_visualization] section_idx: [%s] "
            "invalid insertion plan, retrying (%s/%s).",
            EFFECT_SUB_REPORT_TAG,
            inputs.get("section_idx", 1),
            attempt + 1,
            attempt_limit,
        )
        conversation = _with_feedback(
            f"Your previous output is invalid. Return JSON only with schema: {schema_text} "
            f"Issue: {error_msg}. "
            "Ensure after_row is valid and index exists in visualization data."
        )
    return dict(rs_success=False, plan=None, result=context.original_report)
@staticmethod
def _apply_visualization_insertions(
    context: VisualizationInsertRenderContext,
) -> str:
    """Splice mermaid chart blocks into the report lines and return the joined text.

    Each insertion ``{"after_row": r, "index": k}`` places chart ``k`` after
    the ``r``-th original line (rows are 1-based). The position is shifted by
    the number of lines already inserted.

    A chart block consists of:
    - a blank line;
    - a fenced ``mermaid`` code block;
    - a centered, bold caption built from the HTML-escaped image title (or a
      default title) plus ``[citation:N]`` when the citation index is positive.

    Insertions whose index has no mermaid code are skipped. If the line being
    followed lacks a line ending, ``context.newline`` is appended to it first.
    """
    nl = context.newline
    rendered_lines = list(context.report_lines)
    shift = 0
    for insertion in context.insertions:
        row_after = insertion["after_row"]
        chart_idx = insertion["index"]
        code = context.mermaid_map.get(chart_idx, "")
        if not code:
            continue

        chart_block = [nl, f"```mermaid{nl}"]
        chart_block.extend(f"{code_line}{nl}" for code_line in code.splitlines())
        chart_block.append(f"```{nl}")

        meta = context.title_meta_map.get(chart_idx, {})
        title_text = (meta.get("image_title") or "").strip()
        cite_no = int(meta.get("citation_index", 0) or 0)
        if not title_text:
            title_text = "图表标题" if context.language == CHINESE else "Image Title"
        suffix = f"[citation:{cite_no}]" if cite_no > 0 else ""
        caption = (html.escape(title_text, quote=True) + suffix).strip()
        if caption:
            chart_block.append(
                f'<div style="text-align: center;">{nl}{nl}'
                f"**{caption}**{nl}{nl}</div>{nl}{nl}"
            )

        position = row_after + shift
        before = position - 1
        if 0 <= before < len(rendered_lines) and not rendered_lines[before].endswith(("\n", "\r\n")):
            rendered_lines[before] += nl
        rendered_lines[position:position] = chart_block
        shift += len(chart_block)
    return "".join(rendered_lines)
async def _insert_visualization(self, current_inputs: Dict) -> dict:
    """Insert generated mermaid charts into the section's markdown report.

    The process is:
    1. Number every report line as ``[ROW:n]`` (1-based).
    2. Collect the usable charts: items in ``visualization_result`` that have
       a ``url``, non-empty ``mermaid_content``, and a JSON-object
       ``sub_section_visualization_content``.
    3. List each chart's metadata once, in ``classified_content`` order.
    4. Ask the LLM for an insertion plan and render the charts at the
       planned rows.

    Args:
        current_inputs: Section state. Reads ``sub_report_content``,
            ``visualization_result``, ``classified_content``, ``language``,
            ``section_idx`` and ``max_generate_retry_num``.

    Returns:
        ``{"rs_success": True, "result": <report with charts>}`` on success.
        Otherwise ``{"rs_success": False, "result": <original report>}``.
        Unexpected errors are logged and reported as failure rather than raised.
    """
    # Bound before the try so the error handler can always return it, even if
    # reading/converting the report content itself fails.
    original_report = ""
    try:
        report_markdown = current_inputs.get("sub_report_content", "")
        if not isinstance(report_markdown, str):
            report_markdown = str(report_markdown or "")
        original_report = report_markdown
        visualization_list = current_inputs.get("visualization_result", [])
        if not isinstance(visualization_list, list) or not visualization_list:
            return dict(rs_success=False, result=original_report)
        report_lines = report_markdown.splitlines(keepends=True)
        newline = "\r\n" if "\r\n" in report_markdown else "\n"
        invalid_rows = Reporter._get_invalid_rows_for_insertion(report_lines)
        numbered_lines = []
        for i, line in enumerate(report_lines, 1):
            line_clean = line.rstrip("\r\n")
            numbered_lines.append(f"[ROW:{i}] {line_clean}{newline}")
        numbered_report = "".join(numbered_lines)
        visualization_dict = {}
        mermaid_map: dict[int, str] = {}
        title_meta_map: dict[int, dict] = {}
        url_to_citation_index = {}
        for classified_item in current_inputs.get("classified_content", []):
            if isinstance(classified_item, dict) and "url" in classified_item:
                url_to_citation_index[classified_item["url"]] = classified_item.get(
                    "index", 0
                )
        placeholder_index = 1
        for item in visualization_list:
            if (
                isinstance(item, dict)
                and "url" in item
                and item.get("mermaid_content")
            ):
                viz_payload = (
                    item.get("sub_section_visualization_content") or ""
                ).strip()
                try:
                    viz_obj = json.loads(viz_payload) if viz_payload else None
                except Exception:
                    viz_obj = None
                if not isinstance(viz_obj, dict):
                    continue
                mermaid_map[placeholder_index] = item.get("mermaid_content", "")
                title_meta_map[placeholder_index] = {
                    "image_title": viz_obj.get("image_title", ""),
                    "citation_index": url_to_citation_index.get(
                        item.get("url", ""), 0
                    ),
                }
                placement_item = {
                    "index": placeholder_index,
                    "image_title": viz_obj.get("image_title", ""),
                    "image_type": viz_obj.get("image_type", ""),
                    "unit": viz_obj.get("unit", ""),
                    "records": viz_obj.get("records", []),
                }
                visualization_dict[item["url"]] = placement_item
                placeholder_index += 1
        if not mermaid_map:
            return dict(rs_success=False, result=original_report)
        llm_input_message = numbered_report.rstrip("\r\n") + "\n\n"
        llm_input_message += "=== VISUALIZATION DATA ===\n"
        # A URL can appear several times in classified_content (document
        # variants); describe each chart to the LLM only once.
        emitted_urls = set()
        for item in current_inputs.get("classified_content", []):
            if (
                isinstance(item, dict)
                and "url" in item
                and item["url"] in visualization_dict
                and item["url"] not in emitted_urls
            ):
                emitted_urls.add(item["url"])
                llm_input_message += (
                    json.dumps(visualization_dict[item["url"]], ensure_ascii=False)
                    + "\n"
                )
        llm_input_message += "=== END VISUALIZATION DATA ===\n"
        messages = [dict(role="user", content=llm_input_message)]
        plan_result = await self._request_visualization_insert_plan(
            VisualizationInsertPlanContext(
                messages=messages,
                current_inputs=current_inputs,
                report_lines=report_lines,
                invalid_rows=invalid_rows,
                mermaid_map=mermaid_map,
                original_report=original_report,
            )
        )
        if not plan_result.get("rs_success") or not plan_result.get("plan"):
            return dict(rs_success=False, result=original_report)
        plan = plan_result["plan"]
        # Apply in row order; the renderer tracks the running line offset.
        insertions = sorted(
            plan.get("insertions", []), key=lambda x: x["after_row"]
        )
        rendered = self._apply_visualization_insertions(
            VisualizationInsertRenderContext(
                report_lines=report_lines,
                insertions=insertions,
                mermaid_map=mermaid_map,
                title_meta_map=title_meta_map,
                newline=newline,
                language=current_inputs.get("language"),
            )
        )
        return dict(rs_success=True, result=rendered)
    except Exception as e:
        logger.error(
            f"{EFFECT_SUB_REPORT_TAG} Unexpected error when inserting visualization for the section "
            f"{current_inputs.get('section_idx', 1)}: {str(e)}",
            exc_info=True,
        )
        return dict(rs_success=False, result=original_report)
def _deduplicate_and_renumber_ref(raw_text: str) -> Tuple[str, Dict[str, int]]:
    """Merge reference lists, dropping duplicate entries and renumbering from 1.

    Only lines starting with ``[<digits>]`` are treated as references; other
    non-blank lines are ignored. A group counter advances on every blank line
    and on every line that starts exactly with ``[1]``. Each reference is
    keyed as ``"<group>-<original number>"``.

    Entries with identical text (after the ``[n]`` marker) share the number
    assigned at their first appearance.

    Returns:
        A tuple ``(text, mapping)``:
        - ``text``: the unique entries as ``"[n] content"`` joined by blank lines;
        - ``mapping``: each ``"<group>-<original number>"`` key mapped to its
          new number.
    """
    number_by_content: Dict[str, int] = {}
    renumbered: List[str] = []
    key_to_number: Dict[str, int] = {}
    group_id = 0
    for raw_line in raw_text.splitlines():
        stripped = raw_line.strip()
        if not stripped:
            group_id += 1
            continue
        marker = re.match(r"^\[(\d+)\]", stripped)
        if marker is None:
            continue
        number_text = marker.group(1)
        # "[1]" opens a new reference list.
        if number_text == "1":
            group_id += 1
        body = stripped[marker.end():].strip()
        if body not in number_by_content:
            number_by_content[body] = len(renumbered) + 1
            renumbered.append(f"[{number_by_content[body]}] {body}")
        key_to_number[f"{group_id}-{int(number_text)}"] = number_by_content[body]
    return "\n\n".join(renumbered), key_to_number
def _replace_citations_and_classified_index(
    paragraphs: List[str],
    classified_contents: List[List[Dict]],
    ref_map: Dict[str, int],
) -> Tuple[List[str], List[List[Dict]]]:
    """Rewrite per-paragraph citation numbers to their final reference numbers.

    ``ref_map`` is keyed ``"<paragraph number>-<original index>"``, with
    paragraph numbers starting at 1. This is the same key format
    ``_deduplicate_and_renumber_ref`` produces.

    For paragraph ``i``, every ``[citation:<index>]`` whose index belongs to
    ``classified_contents[i]`` and has a mapping is replaced in a single pass,
    so an already-renumbered citation is never renumbered again.

    The ``index`` of each classified item is updated the same way. Items are
    copied, so the inputs are not mutated.

    Paragraphs with no (or an empty) classified-content list are returned
    unchanged, paired with ``[]``.

    Returns:
        ``(paragraphs, classified_contents)`` unchanged when ``ref_map`` or
        ``classified_contents`` is empty. Otherwise the updated paragraphs and
        one classified-content list per paragraph.
    """
    if not ref_map or not classified_contents:
        return paragraphs, classified_contents
    citation_pattern = re.compile(r"\[citation:([^\]]+)\]")
    updated_paragraphs: List[str] = []
    updated_classified_contents: List[List[Dict]] = []
    for i, para in enumerate(paragraphs):
        sub_classified_contents = (
            classified_contents[i] if i < len(classified_contents) else []
        )
        if not sub_classified_contents:
            updated_paragraphs.append(para)
            updated_classified_contents.append([])
            continue
        index_map = {
            str(item["index"]): ref_map.get(f"{i + 1}-{item['index']}")
            for item in sub_classified_contents
        }
        replacements = {
            original: final for original, final in index_map.items() if final is not None
        }

        def renumber(match, replacements=replacements):
            final_index = replacements.get(match.group(1))
            if final_index is None:
                return match.group(0)
            return f"[citation:{final_index}]"

        # Single substitution pass: sequential per-index re.sub calls would
        # cascade (e.g. 1->2 followed by 2->3 turns an original 1 into 3).
        updated_paragraphs.append(
            citation_pattern.sub(renumber, para) if replacements else para
        )
        updated_sub_classified_content: List[Dict] = []
        for item in sub_classified_contents:
            updated_item = item.copy()
            final_index = index_map.get(str(item["index"]))
            if final_index is not None:
                updated_item["index"] = final_index
            updated_sub_classified_content.append(updated_item)
        updated_classified_contents.append(updated_sub_classified_content)
    return updated_paragraphs, updated_classified_contents
def _get_classified_infos(doc_infos: list, urls: list, max_source_id_count: int | None = 10):
    """Extract the information downstream writing needs for the classifier-selected URLs.

    Selection works in up to two passes.

    Ranking: documents are compared by composite score, then content length,
    then earlier position in ``urls``. Only the best-ranked document per
    variant key (``build_doc_variant_key``) is considered.

    With a limit:
    1. Take the best document of each URL, in ``urls`` order.
    2. Fill any remaining slots from all matched documents by rank, skipping
       variant keys already chosen.

    Without a limit: every variant's best document is kept, in rank order.

    Args:
        doc_infos: Document info dicts produced by the information-collection
            node. Documents without a ``url`` cannot match and are ignored.
        urls: Document URLs selected by the classification model, in
            selection order. Repeated URLs are considered once, at their
            first position.
        max_source_id_count: Maximum number of selected documents (negative
            values act as 0); ``None`` means no limit.

    Returns:
        A tuple ``(classified_infos, classified_doc_infos)``:
        - ``classified_infos`` has ``references`` (one markdown reference per
          distinct URL) and ``core_content_list`` (the ``original_content`` of
          every selected document);
        - ``classified_doc_infos`` lists the selected document dicts.
        Returns ``({}, [])`` when ``doc_infos`` or ``urls`` is empty.
    """
    def escape_markdown_text(value: object) -> str:
        # Collapse control whitespace and backslash-escape markdown metacharacters.
        text = str(value or "")
        text = re.sub(r"[\r\n\t]+", " ", text)
        return re.sub(r"([\\`*_{}\[\]()#+\-.!|<>])", r"\\\1", text)
    def format_reference_link(title_value: object, url_value: object) -> str:
        # Render a markdown link only for well-formed http(s)/localdataset URLs;
        # anything else (control chars, other schemes) becomes escaped plain text.
        title = escape_markdown_text(title_value)
        url = str(url_value or "").strip()
        if not url or any(ord(ch) < 32 or ord(ch) == 127 for ch in url):
            escaped_url = escape_markdown_text(url)
            return f"{title} ({escaped_url})" if title and escaped_url else title or escaped_url
        parsed_url = urlparse(url)
        scheme = parsed_url.scheme.lower()
        is_allowed_url = scheme in {"http", "https", "localdataset"} and (
            bool(parsed_url.netloc) if scheme in {"http", "https"} else bool(parsed_url.netloc or parsed_url.path)
        )
        if not is_allowed_url:
            escaped_url = escape_markdown_text(url)
            return f"{title} ({escaped_url})" if title and escaped_url else title or escaped_url
        escaped_url = url.replace("\\", "\\\\").replace("(", "\\(").replace(")", "\\)")
        return f"[{title}]({escaped_url})"
    if not doc_infos:
        logger.error(
            f"{EFFECT_SUB_REPORT_TAG} No classified infos found. can not get classified infos."
        )
        return {}, []
    if not urls:
        logger.error(
            f"{EFFECT_SUB_REPORT_TAG} No urls found. can not get classified infos."
        )
        return {}, []
    # Each URL is handled once, at its first position, so the "earlier in urls"
    # tie-break below reflects the classifier's ordering.
    unique_urls = list(dict.fromkeys(urls))
    classified_infos = {"references": [], "core_content_list": []}
    classified_doc_infos = []
    doc_dict: dict[str, list[dict]] = {}
    for item in doc_infos:
        item_url = item.get("url")
        if item_url is None:
            continue
        doc_dict.setdefault(item_url, []).append(item)
    matched_items = []
    matched_order: dict[int, int] = {}
    matched_by_url: dict[str, list[dict]] = {}
    for url in unique_urls:
        for item in doc_dict.get(url, []):
            matched_order[id(item)] = len(matched_items)
            matched_items.append(item)
            matched_by_url.setdefault(url, []).append(item)
    def source_key_for(item: dict) -> str:
        return build_doc_variant_key(item)
    def item_rank_key(item: dict) -> tuple[float, int, int]:
        # Higher score, then longer content, then earlier match position wins.
        return (
            extract_doc_score(item).composite,
            len(str(item.get("original_content") or "")),
            -matched_order.get(id(item), 0),
        )
    def best_representatives(items: list[dict]) -> list[dict]:
        # Keep the best-ranked item per variant key, sorted best-first.
        source_representatives: dict[str, dict] = {}
        for item in items:
            source_key = source_key_for(item)
            current = source_representatives.get(source_key)
            if current is None or item_rank_key(item) > item_rank_key(current):
                source_representatives[source_key] = item
        return sorted(source_representatives.values(), key=item_rank_key, reverse=True)
    selected_items: list[dict] = []
    selected_source_keys: set[str] = set()
    max_count = None if max_source_id_count is None else max(0, int(max_source_id_count))
    if max_count is not None:
        # First pass: give every selected URL a chance to contribute one source.
        for url in unique_urls:
            if len(selected_items) >= max_count:
                break
            representatives = best_representatives(matched_by_url.get(url, []))
            if not representatives:
                continue
            top_item = representatives[0]
            source_key = source_key_for(top_item)
            if source_key in selected_source_keys:
                continue
            selected_items.append(top_item)
            selected_source_keys.add(source_key)
    remaining_representatives = best_representatives(matched_items)
    if max_count is None:
        selected_items = remaining_representatives
    else:
        # Second pass: fill the remaining slots with the best unused variants.
        for item in remaining_representatives:
            if len(selected_items) >= max_count:
                break
            source_key = source_key_for(item)
            if source_key in selected_source_keys:
                continue
            selected_items.append(item)
            selected_source_keys.add(source_key)
    if max_count is not None:
        selected_items = selected_items[:max_count]
    seen_reference_urls: set[str] = set()
    for item in selected_items:
        item_url = str(item.get("url") or "")
        if item_url not in seen_reference_urls:
            classified_infos["references"].append(
                format_reference_link(item.get("title", ""), item_url)
            )
            seen_reference_urls.add(item_url)
        classified_infos["core_content_list"].append(item.get("original_content", ""))
        classified_doc_infos.append(item)
    return classified_infos, classified_doc_infos