import logging
import os
import subprocess
import sys
import requests
from oe_cli_mcp_server.server import config
# Base URL of the FastAPI service on the loopback interface; the port is read
# from the project's server config.
FASTAPI_BASE_URL = f"http://127.0.0.1:{config.fastapi_port}"
# NOTE(review): the name says "public" but the path points at a "private"
# directory, and nothing in the visible code reads this constant -- confirm
# which is intended and whether it is still used.
PUBLIC_CONFIG_PATH = "config/private/mcp_server"
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def send_http_request(action: str, params: dict = None):
    """Call the FastAPI package endpoint for ``action`` (replaces the old socket call).

    Supported actions and the requests they issue:

    * ``"add"``    -> ``POST /package/add?value=<params["value"]>``
    * ``"remove"`` -> ``POST /package/remove?value=<params["value"]>``
    * ``"list"``   -> ``GET  /package/list``
    * ``"init"``   -> ``POST /package/init``

    Args:
        action: One of the action names listed above.
        params: Optional mapping; only its ``"value"`` entry is read, and only
            for ``"add"`` and ``"remove"``.

    Returns:
        The decoded JSON body of the response on success. On any failure
        (unsupported action, missing ``value``, transport/HTTP error, or a
        body that is not valid JSON) a dict of the form
        ``{"success": False, "message": <description>}``.
    """
    # NOTE(review): no timeout is passed to requests, so a hung server blocks
    # the CLI indefinitely; pick one once the expected duration of add/init
    # is known.
    if action in ("add", "remove"):
        # ``params`` defaults to None; treat that like an empty mapping
        # instead of crashing with AttributeError on ``.get``.
        value = (params or {}).get("value")
        if value is None:
            return {"success": False, "message": "缺少参数:value"}
        query = {"value": value}
    elif action in ("list", "init"):
        query = None
    else:
        return {"success": False, "message": f"不支持的操作:{action}"}

    # Every supported action maps to /package/<action>.
    url = f"{FASTAPI_BASE_URL}/package/{action}"
    try:
        if action == "list":
            response = requests.get(url)
        else:
            response = requests.post(url, params=query)
        response.raise_for_status()
        return response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers older requests releases, where a non-JSON body
        # raises a plain JSON decode error rather than a RequestException.
        return {"success": False, "message": f"接口调用失败:{str(e)}"}
def handle_add(pkg_input):
    """Handle the -add command.

    ``pkg_input`` must be the path of an existing file whose name ends in
    ``.zip``; otherwise an error is logged and ``SystemExit(1)`` is raised.
    For a valid path, its absolute form is sent with the "add" action and
    the response message is logged (info on success, error on failure).

    Returns:
        The ``"success"`` value from the response.
    """
    is_zip_file = os.path.isfile(pkg_input) and pkg_input.endswith(".zip")
    if not is_zip_file:
        logger.error(f"不支持的包类型:{pkg_input}(仅支持 zip 文件)")
        raise SystemExit(1)

    result = send_http_request("add", {"value": os.path.abspath(pkg_input)})
    # Pick the log level from the outcome, then emit the response message.
    report = logger.info if result["success"] else logger.error
    report(f"{result['message']}")
    return result["success"]
def handle_remove(pkg_input):
    """Handle the -remove command.

    Sends ``pkg_input`` with the "remove" action and logs the response
    message. When the removal succeeded, the service is restarted via
    ``handle_restart`` and that restart's outcome becomes the return value.

    Returns:
        The failed ``"success"`` value from the response, or the result of
        ``handle_restart()`` after a successful removal.
    """
    result = send_http_request("remove", {"value": pkg_input})

    # Failure: report it and stop without restarting.
    if not result["success"]:
        logger.error(f"{result['message']}")
        return result["success"]

    logger.info(f"{result['message']}")
    logger.info("\n正在重启服务使变更生效...")
    result["success"] = handle_restart()
    return result["success"]
def handle_tool():
    """Handle the -tool command.

    Requests the "list" action and logs a summary line (package count and
    total function count) followed by one line per package in
    ``data["pkg_funcs"]``, plus a line naming its functions when the
    package has any.

    Returns:
        True when the listing was retrieved and logged, False when the
        request reported failure (its message is logged as an error).
    """
    result = send_http_request("list")
    if result["success"]:
        listing = result["data"]
        logger.info(
            "\n当前已加载工具包(共%s个,总函数数:%s):",
            listing["total_packages"],
            listing["total_functions"],
        )
        for package_name, tool_names in listing["pkg_funcs"].items():
            logger.info("- %s:%s个工具", package_name, len(tool_names))
            if tool_names:
                joined = ', '.join(tool_names)
                logger.info(" └─ %s", joined)
        return True

    logger.error(f"{result['message']}")
    return False
def handle_init():
    """Handle the -init command.

    Requests the "init" action and logs the response message. When it
    succeeded, the service is restarted via ``handle_restart`` and that
    restart's outcome becomes the return value.

    Returns:
        The failed ``"success"`` value from the response, or the result of
        ``handle_restart()`` after a successful init.
    """
    result = send_http_request("init")

    # Failure: report it and stop without restarting.
    if not result["success"]:
        logger.error(f"{result['message']}")
        return result["success"]

    logger.info(f"{result['message']}")
    logger.info("\n正在重启服务使初始化生效...")
    result["success"] = handle_restart()
    return result["success"]
def handle_start():
    """Handle the -start command.

    Runs ``sudo systemctl start oe-mcp`` and logs the outcome.

    Returns:
        True when the command completed with exit status 0, False when
        running it raised any exception (the error is logged).
    """
    command = ["sudo", "systemctl", "start", "oe-mcp"]
    try:
        subprocess.run(command, check=True)
        logger.info("服务启动成功")
    except Exception as err:
        reason = str(err)
        logger.error(f"启动失败:{reason}")
        return False
    return True
def handle_stop():
    """Handle the -stop command.

    Runs ``sudo systemctl stop oe-mcp`` and logs the outcome.

    Returns:
        True when the command completed with exit status 0, False when
        running it raised any exception (the error is logged).
    """
    command = ["sudo", "systemctl", "stop", "oe-mcp"]
    try:
        subprocess.run(command, check=True)
        logger.info("服务终止成功")
    except Exception as err:
        reason = str(err)
        logger.error(f"终止失败:{reason}")
        return False
    return True
def handle_restart():
    """Handle the -restart command.

    Runs ``sudo systemctl restart oe-mcp`` and logs the outcome.

    Returns:
        True when the command completed with exit status 0, False when
        running it raised any exception (the error is logged).
    """
    command = ["sudo", "systemctl", "restart", "oe-mcp"]
    try:
        subprocess.run(command, check=True)
        logger.info("服务重启成功")
    except Exception as err:
        reason = str(err)
        logger.error(f"重启失败:{reason}")
        return False
    return True
def handle_log():
    """Handle the -log command.

    Follows the service journal by running
    ``sudo journalctl -u oe-mcp -f`` until it exits or the user interrupts.

    Returns:
        True when the command exited with status 0 or the user pressed
        Ctrl-C (treated as a normal way to leave the follow view); False
        when running the command failed, matching the other service
        handlers instead of reporting success after a logged error.
    """
    try:
        subprocess.run(["sudo", "journalctl", "-u", "oe-mcp", "-f"], check=True)
    except KeyboardInterrupt:
        # Ctrl-C is how the user leaves ``journalctl -f``; not a failure.
        logger.info("\n日志查看退出")
    except Exception as e:
        logger.error(f"查看失败:{str(e)}")
        return False
    return True