import asyncio
import json
import traceback
from langchain_core.messages import AIMessage, HumanMessage
from langchain_mcp_adapters.client import MultiServerMCPClient
from web_apps import app
from web_apps.llm.agents.data_extract_langgraph import DataExtractLangGraph as DataExtractAgent
from web_apps.llm.llm_utils import get_llm, resolve_model_config
from utils.common_utils import gen_uuid, get_now_time, parse_json
from web_apps.rag.services.rag_service import get_knowledge
from web_apps.llm.services.tool_service import get_tools
from web_apps.llm.tools.data_tools import get_chat_data_tools, get_chat_data_tool
from web_apps.llm.agents.tools_call_deepagents import ToolsCallDeepAgent
from web_apps.llm.services.conversation_service import get_or_create_conversation, get_messages, add_message
from web_apps.llm.tools.memory_tools import get_memory_tools
from web_apps.llm.services.conversation_service import add_archival_memory
from utils.logger.logger import get_logger
# Module-level logger shared by the functions below, created via the project's get_logger helper at INFO level.
logger = get_logger(p_name='system_log', f_name='llm_services', log_level='INFO')
# Translates the ``type`` field of an agent chunk into the SSE ``event`` name sent to the client.
# 'step' and 'flow' are both surfaced as STEP; format_stream_event falls back to MESSAGE for unknown types.
EVENT_TYPE_MAP = dict(
    text="MESSAGE",
    html="HTML",
    data="DATATABLE",
    step="STEP",
    flow="STEP",
    thinking="THINKING",
    waiting_feedback="WAITING_FEEDBACK",
)
def format_stream_event(conversation_id, chunk, event_type=None):
    """
    Serialize one streaming chunk as a Server-Sent Events ``data:`` frame.

    Args:
        conversation_id: Conversation id echoed back as ``conversationId``.
        chunk: Either a dict carrying ``content`` / ``type`` keys (agent output),
            or any other value (e.g. an LLM token) used verbatim as the message.
        event_type: Explicit event name. When falsy, it is derived from
            ``chunk['type']`` through EVENT_TYPE_MAP for dict chunks (unknown types
            become "MESSAGE"), and is "MESSAGE" for every other chunk.

    Returns:
        str: ``"data:<json>\\n\\n"`` with non-ASCII characters left unescaped.
    """
    if not isinstance(chunk, dict):
        message, event = chunk, event_type or "MESSAGE"
    else:
        message = chunk.get('content', '')
        event = event_type or EVENT_TYPE_MAP.get(chunk.get('type', 'text'), "MESSAGE")
    payload = json.dumps(
        {"conversationId": conversation_id, "data": {"message": message}, "event": event},
        ensure_ascii=False,
    )
    return f"data:{payload}\n\n"
def format_end_event(conversation_id):
    """Build the terminal SSE frame for a stream: event ``MESSAGE_END`` with ``data`` set to null."""
    end_frame = {"conversationId": conversation_id, "data": None, "event": "MESSAGE_END"}
    return "data:" + json.dumps(end_frame, ensure_ascii=False) + "\n\n"
def format_error_event(conversation_id, error_message):
    """Build an SSE ``ERROR`` frame whose ``data.message`` is *error_message*."""
    error_frame = dict(
        conversationId=conversation_id,
        data=dict(message=error_message),
        event="ERROR",
    )
    return "data:%s\n\n" % json.dumps(error_frame, ensure_ascii=False)
def llm_query_data(reader, llm, query_prompt):
    '''
    Run *query_prompt* through a DataExtractAgent built from *llm* and *reader*.

    Returns:
        tuple: ``(True, <result of agent.run>, <agent.llm_result read after the run>)``.
    '''
    extractor = DataExtractAgent(llm, reader)
    run_result = extractor.run(query_prompt)
    return True, run_result, extractor.llm_result
async def _get_mcp_tools_async(mcp_tool_config):
    """
    Load MCP tools server by server, tolerating individual server failures.

    Each server gets its own MultiServerMCPClient, so one unreachable or broken
    server only loses that server's tools; the failure is logged and loading continues.

    Args:
        mcp_tool_config: Dict mapping server name -> that server's MCP config.

    Returns:
        list: Every tool fetched from the servers that loaded successfully.
    """
    collected = []
    failures = []
    for name, cfg in mcp_tool_config.items():
        try:
            logger.info(f"正在加载 MCP 服务器: {name}")
            server_tools = await MultiServerMCPClient({name: cfg}).get_tools()
            tool_names = [t.name for t in server_tools]
            logger.info(f"成功从服务器 '{name}' 获取 {len(server_tools)} 个工具: {tool_names}")
            collected.extend(server_tools)
        except Exception as exc:
            logger.error(f"从服务器 '{name}' 获取工具失败: {str(exc)}")
            failures.append(name)
    if failures:
        logger.warning(f"以下 MCP 服务器加载失败: {failures}")
    logger.info(f"MCP 工具加载完成,总共成功获取 {len(collected)} 个工具")
    return collected
def _get_mcp_tools_sync(mcp_tool_config):
    """
    Fetch MCP tools from synchronous code.

    asyncio.run() refuses to start while an event loop is already running in the
    current thread. In that case the coroutine runs on its own loop inside a
    single worker thread, and this call blocks until it finishes. The calling
    loop is therefore blocked for that duration as well.

    Args:
        mcp_tool_config: Dict mapping server name -> that server's MCP config.

    Returns:
        list: The loaded MCP tools, or an empty list if loading fails outright.
    """
    import concurrent.futures

    # Probe for a running loop on its own. If this try also wrapped the load, a
    # RuntimeError raised *while loading tools* would be mistaken for "no running
    # loop" and retried with asyncio.run() inside the running loop, which fails again.
    try:
        asyncio.get_running_loop()
        loop_running = True
    except RuntimeError:
        loop_running = False

    try:
        if loop_running:
            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
                future = executor.submit(asyncio.run, _get_mcp_tools_async(mcp_tool_config))
                return future.result()
        return asyncio.run(_get_mcp_tools_async(mcp_tool_config))
    except Exception as e:
        logger.error(f"同步获取 MCP 工具失败: {str(e)}")
        return []
def format_history(messages, max_lines=6):
    """
    Render chat messages as "Human: ..." / "AI: ..." lines, most recent last.

    Args:
        messages: Iterable of message objects exposing ``.content``. Instances of
            HumanMessage are labelled "Human"; every other message is labelled "AI".
        max_lines: Number of trailing messages to keep. The default of 6 is the
            last three Human/AI pairs when messages alternate. A value <= 0 yields "".

    Returns:
        str: The kept lines joined by newlines, or "" when there is nothing to render.
    """
    if max_lines <= 0:
        # Guard explicitly: a slice of [-0:] would keep every message.
        return ""
    # Slice before formatting so messages that will be discarded are never rendered.
    recent = list(messages)[-max_lines:]
    lines = []
    for msg in recent:
        prefix = "Human" if isinstance(msg, HumanMessage) else "AI"
        lines.append(f"{prefix}: {msg.content}")
    return "\n".join(lines)
def generate_history_summary(messages, llm=None):
    """
    Ask the LLM to condense archived Q/A records into one third-person summary.

    Args:
        messages: Iterable of dicts with ``question`` and ``answer`` keys.
        llm: Chat model exposing ``invoke``. When None, a default one is created with get_llm().

    Returns:
        The ``content`` of the LLM response.
    """
    model = get_llm() if llm is None else llm
    transcript = "\n".join(f"{record['question']}\n{record['answer']}" for record in messages)
    instruction = "请将以下对话历史压缩成一段保留核心事实的摘要,用第三人称表述且保留数据细节:\n"
    return model.invoke(instruction + transcript).content
def generate_prompt(content):
    """
    Stream an LLM-generated prompt for *content* as SSE frames.

    Yields one ``MESSAGE`` frame per streamed LLM chunk (with an empty
    ``conversationId``), then a ``MESSAGE_END`` frame. If the LLM cannot be
    created or streaming fails, the error is logged and an ``ERROR`` frame is
    yielded instead of ``MESSAGE_END``. The client then sees an explicit
    failure rather than a silently truncated stream.

    Args:
        content: Text the generated prompt should be written for.
    """
    # Prompt generation is not tied to a conversation, so frames carry an empty id.
    conversation_id = ''
    prompt = f"请为以下内容:\n{content}\n\n生成一个详细格式的llm prompt,只返回prompt,不要其他内容"
    try:
        llm = get_llm()
        for c in llm.stream(prompt):
            # Same frame the hand-built dict used to produce: event MESSAGE, data.message = chunk text.
            yield format_stream_event(conversation_id, c.content)
    except Exception as e:
        logger.error(f"生成 prompt 出错: {str(e)}\n{traceback.format_exc()}")
        yield format_error_event(conversation_id, f"处理出错: {str(e)}")
        return
    yield format_end_event(conversation_id)
class ChatHandler:
    """
    Per-request chat session helper.

    Parses the incoming request, builds the prompt and tool list for one turn
    (``prepare_context``) and persists the finished turn (``handle_chat_close``).

    Request keys read here:
        ``conversationId``: a new UUID is generated when it is missing or empty.
        ``appId``, ``message``.
        ``chatConfig``: parsed with parse_json.
    """

    # Used when chatConfig.msgNum is absent or not an integer.
    DEFAULT_HISTORY_SIZE = 3

    def __init__(self, req_dict):
        self.req_dict = req_dict
        self.conversation_id = req_dict.get('conversationId') or gen_uuid()
        self.app_id = req_dict.get('appId', '')
        self.chat_config = parse_json(req_dict.get('chatConfig'), {})
        self.message = req_dict.get('message', '')
        # parse_json rather than json.loads: json.loads raises on an already-decoded
        # dict or on None. data_chat_generate reads this same field with parse_json.
        self.metadata = parse_json(self.chat_config.get('metadata', '{}'), {}) or {}
        model_id = self.chat_config.get('modelId', 'default')
        llm_config = {'conversation_id': self.conversation_id, **self.metadata}
        if model_id and model_id != 'default':
            llm_config.update(resolve_model_config(model_id))
        self.llm = get_llm(llm_config)
        self.system_prompt = self.chat_config.get('prompt', '')
        # Normalised to int because handle_chat_close compares against it and takes a
        # modulo with it; a string msgNum would raise TypeError there.
        self.history_size = self._to_int(self.chat_config.get('msgNum'), self.DEFAULT_HISTORY_SIZE)

    @staticmethod
    def _to_int(value, default):
        """Return ``int(value)``, or *default* when *value* is None or not numeric."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _flag_enabled(value):
        """Interpret a metadata switch: True, 1, '1' and 'true' (any case) mean enabled."""
        return str(value).strip().lower() in ('1', 'true')

    def prepare_context(self, user_info=None):
        """
        Prepare the chat context for this turn.

        Args:
            user_info: Dict describing the caller. ``id`` and ``username`` (or
                ``user_name``) are passed to get_or_create_conversation. Defaults
                to a placeholder test user.

        Returns:
            tuple: ``(prompt, llm, agent_enable, tools)``
        """
        if user_info is None:
            user_info = {'id': 0, 'username': 'test'}
        datamodel_ids = self.chat_config.get('datamodelIds', '')
        knowledge_ids = self.chat_config.get('knowledgeIds', '')
        tool_ids = self.chat_config.get('toolIds', '')
        # The same parsing rule is used in handle_chat_close. Plain truthiness would
        # treat the string '0' as enabled.
        memory_enable = self._flag_enabled(self.metadata.get('multiSession'))

        knowledge = self._retrieve_knowledge(knowledge_ids, datamodel_ids)
        chat_history, core_memory = self._load_history(user_info, memory_enable)
        prompt = self._build_prompt(knowledge, core_memory, chat_history)
        agent_enable, tools = self._collect_tools(tool_ids, datamodel_ids, memory_enable)
        return prompt, self.llm, agent_enable, tools

    def _retrieve_knowledge(self, knowledge_ids, datamodel_ids):
        """Query the RAG knowledge base for the current message; return '' when none is configured."""
        if knowledge_ids == '':
            return ''
        rag_metadata = parse_json({'dataset_id': knowledge_ids}, {'dataset_id': '1'})
        if datamodel_ids:
            rag_metadata['datamodel_id'] = datamodel_ids
        if self.metadata.get('topNumber'):
            rag_metadata['k'] = self.metadata.get('topNumber')
        if self.metadata.get('similarity'):
            rag_metadata['score_threshold'] = self.metadata.get('similarity')
        return get_knowledge(self.message, metadata=rag_metadata)

    def _load_history(self, user_info, memory_enable):
        """Ensure the conversation record exists; return ``(chat_history, core_memory)``."""
        # Accept both spellings: callers may send 'username', while older defaults used 'user_name'.
        user_name = user_info.get('username') or user_info.get('user_name')
        conversation = get_or_create_conversation(
            self.conversation_id,
            {'user_id': user_info.get('id'), 'user_name': user_name, 'message': self.message, 'app_id': self.app_id}
        )
        history_messages, _ = get_messages(self.conversation_id, page=1, size=self.history_size)
        chat_history = []
        for msg in history_messages:
            chat_history.extend([HumanMessage(content=msg["question"]), AIMessage(content=msg["answer"])])
        core_memory = conversation.core_memory if memory_enable else ''
        return chat_history, core_memory

    def _build_prompt(self, knowledge, core_memory, chat_history):
        """Assemble the text prompt from its optional sections plus the current user message."""
        knowledge_section = f"结合知识库信息,回答用户的问题,若知识库中无相关信息,请尝试直接回答。\n知识库:{knowledge}\n" if knowledge else ''
        core_memory_section = f"[核心记忆]\n{core_memory}\n\n" if core_memory else ''
        history_section = format_history(chat_history) if chat_history else ""
        history_part = f"对话历史:\n{history_section}\n\n" if history_section else ""
        system_part = f"System: {self.system_prompt}\n\n" if self.system_prompt else ''
        return (
            f"{system_part}"
            f"{core_memory_section}"
            f"{history_part}"
            f"{knowledge_section}"
            "当前对话:\n"
            f"Human: {self.message}\n"
            "AI:"
        )

    def _collect_tools(self, tool_ids, datamodel_ids, memory_enable):
        """Gather data, memory, built-in and MCP tools; return ``(agent_enable, tools)``."""
        tools = []
        agent_enable = tool_ids != ''
        if datamodel_ids:
            id_list = datamodel_ids.split(',') if isinstance(datamodel_ids, str) else datamodel_ids
            data_tools = get_chat_data_tools(id_list)
            enable_review = self._flag_enabled(self.metadata.get('dataReview', '0'))
            for data_tool in data_tools:
                data_tool.conversation_id = self.conversation_id
                data_tool.enable_review = enable_review
                data_tool.llm = self.llm
            tools += data_tools
            agent_enable = True
        if memory_enable:
            tools += get_memory_tools(self.conversation_id)
            agent_enable = True
        builtin_tools, mcp_tool_config = get_tools(tool_ids)
        tools += builtin_tools
        if mcp_tool_config:
            logger.info(f"检测到 MCP 工具配置,开始获取 MCP 工具: {list(mcp_tool_config.keys())}")
            mcp_tools = _get_mcp_tools_sync(mcp_tool_config)
            if mcp_tools:
                tools += mcp_tools
                agent_enable = True
                logger.info(f"成功添加 {len(mcp_tools)} 个 MCP 工具到工具列表")
            else:
                logger.warning("MCP 工具获取失败或为空")
        return agent_enable, tools

    def handle_chat_close(self, answer):
        """
        Persist the finished turn and, when memory is enabled, archive older history.

        A non-empty *answer* is stored with add_message. With multiSession enabled,
        archiving runs each time the total message count exceeds the history window
        and is an exact multiple of it. It then summarises the second page of size
        ``history_size`` via generate_history_summary and stores the result with
        add_archival_memory.
        """
        if answer != '':
            add_message(self.conversation_id, self.message, answer)
        if not self._flag_enabled(self.metadata.get('multiSession', '0')):
            return
        # A window of 0 (or less) would make the modulo below raise ZeroDivisionError.
        if self.history_size <= 0:
            return
        _, total = get_messages(self.conversation_id, page=1, size=self.history_size)
        if total > self.history_size and total % self.history_size == 0:
            archived_messages, _ = get_messages(
                self.conversation_id,
                page=2,
                size=self.history_size
            )
            if archived_messages:
                summary = generate_history_summary(archived_messages, self.llm)
                add_archival_memory(self.conversation_id, summary)
def chat_generate(req_dict, user_info=None):
    '''
    Streaming chat using the LangGraph agent architecture.

    Workflow:
    1. Prepare the context (knowledge base, memory, history) via ChatHandler.
    2. Route the request:
       - tools available -> ToolsCallDeepAgent
       - otherwise       -> stream the LLM answer directly
    3. Yield uniformly formatted SSE frames: MESSAGE_END on success, or a single
       ERROR frame (and no MESSAGE_END) on failure.

    Args:
        req_dict: Raw request dict, passed straight to ChatHandler.
        user_info: Caller description forwarded to ChatHandler.prepare_context.

    Yields:
        str: SSE ``data:`` frames.

    The text answer accumulated so far is handed to ChatHandler.handle_chat_close
    in a ``finally`` clause. That happens after success, after an error, and when
    the consumer closes the generator early.
    '''
    with app.app_context():
        chat_handler = ChatHandler(req_dict)
        conversation_id = chat_handler.conversation_id
        answer = ''
        try:
            prompt, llm, agent_enable, tools = chat_handler.prepare_context(user_info)
            if agent_enable and tools:
                agent = ToolsCallDeepAgent(
                    tools=tools,
                    llm=llm,
                    system_prompt=chat_handler.system_prompt
                )
                for chunk in agent.chat(prompt):
                    # Only text chunks form the stored answer; other chunk types are display-only.
                    if chunk.get('type') == 'text':
                        answer += str(chunk.get('content', ''))
                    yield format_stream_event(conversation_id, chunk)
            else:
                for c in llm.stream(prompt):
                    answer += c.content
                    yield format_stream_event(conversation_id, c.content)
            yield format_end_event(conversation_id)
        except Exception as e:
            error_msg = f"处理出错: {str(e)}\n{traceback.format_exc()}"
            # Record the failure server-side as well; previously it only reached the client.
            logger.error(f"流式对话处理出错 [{conversation_id}]: {error_msg}")
            yield format_error_event(conversation_id, error_msg)
        finally:
            # Persistence is best-effort. If it raised here, it would either blow up the
            # stream after MESSAGE_END was sent, or mask the exception being handled.
            try:
                chat_handler.handle_chat_close(answer)
            except Exception as close_err:
                logger.error(f"保存对话记录失败 [{conversation_id}]: {str(close_err)}\n{traceback.format_exc()}")
def chat_run(req_dict, user_info=None):
    '''
    Synchronous chat: return the complete answer as one result (LangGraph architecture).

    Workflow:
    1. Prepare the context (knowledge base, memory, history) via ChatHandler.
    2. Route the request:
       - tools available -> ToolsCallDeepAgent.run
       - otherwise       -> a single LLM invoke
    3. Return a uniform result.

    Returns:
        dict: The agent's own result dict when it has ``content`` and ``type`` keys.
        Otherwise ``{'content': <text>, 'type': 'text'}``. On failure the error
        text is returned in that same shape instead of raising.
    '''
    with app.app_context():
        chat_handler = ChatHandler(req_dict)
        answer = ''
        try:
            prompt, llm, agent_enable, tools = chat_handler.prepare_context(user_info)
            if agent_enable and tools:
                agent = ToolsCallDeepAgent(
                    tools=tools,
                    llm=llm,
                    system_prompt=chat_handler.system_prompt
                )
                output = agent.run(prompt)
            else:
                output = llm.invoke(prompt).content
            if isinstance(output, dict) and 'content' in output and 'type' in output:
                # Only text results are persisted; results of other types are returned unchanged.
                if output['type'] == 'text':
                    answer = str(output['content'])
                return output
            answer = str(output)
            return {'content': answer, 'type': 'text'}
        except Exception as e:
            error_msg = f'处理出错:{str(e)}\n{traceback.format_exc()}'
            logger.error(f"同步对话处理出错 [{chat_handler.conversation_id}]: {error_msg}")
            return {'content': error_msg, 'type': 'text'}
        finally:
            # Persistence is best-effort. An exception raised inside finally would replace
            # the value being returned (including the error dict) with an unhandled error.
            try:
                chat_handler.handle_chat_close(answer)
            except Exception as close_err:
                logger.error(f"保存对话记录失败 [{chat_handler.conversation_id}]: {str(close_err)}\n{traceback.format_exc()}")
def data_chat_generate(req_dict):
    '''
    Data chat, streaming endpoint (LangGraph architecture).

    Workflow:
    1. Check that an LLM is configured.
    2. Prepare the data tool and the knowledge base.
    3. Load the recent conversation history as context.
    4. Route the request:
       - data tool available -> ToolsCallDeepAgent + DataChatTool
       - otherwise           -> stream the LLM answer directly
    5. Yield uniformly formatted SSE frames.

    On failure an ERROR frame is yielded and the exception is then re-raised.
    '''
    with app.app_context():
        message = req_dict['message']
        model_id = req_dict.get('model_id', '')
        conversation_id = req_dict.get('conversationId')
        enable_review = req_dict.get('enable_review', False)
        chat_config = parse_json(req_dict.get('chatConfig'), {})
        if not enable_review and chat_config:
            metadata = parse_json(chat_config.get('metadata', '{}'), {})
            enable_review = metadata.get('dataReview', '0') == '1'
        if not conversation_id:
            conversation_id = gen_uuid()
        try:
            _llm = get_llm()
            if _llm is None:
                yield format_error_event(conversation_id, '未找到对应llm配置!')
                return
            data_tool = get_chat_data_tool(model_id, is_chat=True)
            # One `is None` test drives both the tool setup and the routing, so the two
            # can no longer disagree for a falsy-but-not-None tool object.
            if data_tool is None:
                # NOTE(review): unlike the data-tool branch below, answers from this
                # plain-LLM branch are not saved with add_message. Confirm this is intended.
                for c in _llm.stream(message):
                    yield format_stream_event(conversation_id, c.content)
            else:
                data_tool.conversation_id = conversation_id
                data_tool.enable_review = enable_review
                knowledge = get_knowledge(message, metadata={'datamodel_id': model_id})
                if knowledge:
                    search_step = {
                        'content': {
                            'title': '检索知识库',
                            'content': knowledge,
                            'time': get_now_time(res_type='datetime')
                        },
                        'type': 'flow'
                    }
                    yield format_stream_event(conversation_id, search_step)
                    data_tool.knowledge = knowledge
                history_messages, _ = get_messages(conversation_id, page=1, size=3)
                history_context = ""
                if history_messages:
                    history_context = "\n### 对话历史\n"
                    for msg in history_messages:
                        history_context += f"human: {msg['question']}\nAI: {msg.get('answer', '')}\n"
                data_tool.set_history_context(history_context)
                agent = ToolsCallDeepAgent(
                    tools=[data_tool],
                    llm=_llm,
                    system_prompt=f"{history_context}\n你是一个数据分析助手,能够帮助用户分析数据。"
                )
                final_answer = ""
                for chunk in agent.chat(message):
                    if chunk.get('type') == 'text':
                        # str(): chunk content is not guaranteed to be a string (matches chat_generate).
                        final_answer += str(chunk.get('content', ''))
                    yield format_stream_event(conversation_id, chunk)
                # Fall back to the inner agent's answer when no text chunk was streamed;
                # getattr avoids an AttributeError for tools that never created `_agent`.
                inner_answer = getattr(getattr(data_tool, '_agent', None), 'answer', '')
                if final_answer == '' and inner_answer:
                    final_answer = inner_answer
                if final_answer:
                    add_message(conversation_id, message, final_answer)
            yield format_end_event(conversation_id)
        except Exception as e:
            error_msg = f"数据对话处理出错: {str(e)}\n{traceback.format_exc()}"
            logger.error(f"[{conversation_id}] {error_msg}")
            yield format_error_event(conversation_id, error_msg)
            raise
if __name__ == '__main__':
    # Manual smoke test: stream one chat turn against a sample data model and a few
    # tools, printing every SSE frame that chat_generate produces.
    sample_chat_config = {
        "msgNum": 1,
        "prologue": None,
        "modelId": "default",
        "presetQuestion": "",
        "datamodelIds": "c20ae41fcaa74597ab83293add482ff0",
        "toolIds": "now_time,a7128b38-b866-41ea-a912-2f25a65f10ec,fe504d80-41ad-4e5e-90c9-aca37d74f3c3",
        "metadata": '{"multiSession": true, "dataReview": "1"}',
    }
    req_dict = {
        "conversationId": "test1111111",
        "message": "字典表中字典项最多的10个字典, 画出统计图",
        "chatConfig": sample_chat_config,
    }
    with app.app_context():
        for sse_frame in chat_generate(req_dict, {}):
            print(sse_frame)