import os
import json
import shutil
import subprocess
import time
import warnings
from executor.exec_command import CommandExecutor
from utils import change_dict, detect_free_npu_card
class ExecSGLangServer(CommandExecutor):
    """SGLang server executor for ST (system test) scenarios.

    Starts an SGLang inference service with ms_service_profiler configured
    through a JSON file that this class writes into the workspace.
    """

    def __init__(self, workspace_path):
        """Set defaults and write the initial (disabled) profiler config.

        Args:
            workspace_path: Directory in which the profiler config file and
                the ``prof_data`` output directory are located.
        """
        super().__init__()
        self.workspace_path = workspace_path
        self.model_path = "/data/Qwen2.5-0.5B-Instruct"
        self.port = 7399
        self.prof_config_path = os.path.join(workspace_path, "ms_service_profiler_config.json")
        self.prof_config = {}
        # Profiling starts disabled; ready_go() switches it on before launch.
        self.set_prof_config(enable=0, prof_dir=os.path.join(workspace_path, "prof_data"))

    def set_model_path(self, model_path):
        """Override the model path passed to the server and used in requests."""
        self.model_path = model_path

    def set_port(self, port):
        """Override the port the server listens on."""
        self.port = port

    def set_prof_config(self, **kwargs):
        """Update profiler config entries and rewrite the config file.

        Each keyword argument is applied to ``self.prof_config`` through
        ``change_dict`` (project helper; exact merge semantics are defined
        there), then the whole config is dumped to ``self.prof_config_path``.
        """
        for key, value in kwargs.items():
            change_dict(self.prof_config, key, value=value)
        self._json_dump(self.prof_config, self.prof_config_path)

    def _json_dump(self, obj, dump_path):
        """Write ``obj`` as indented JSON to ``dump_path`` and set mode 0o640."""
        with open(file=dump_path, mode="wt", encoding="utf-8") as f:
            json.dump(obj, f, indent=4)
        os.chmod(dump_path, 0o640)

    def curl_test(self, retries=5, interval=2, timeout=60):
        """Send a test request to the OpenAI-compatible /v1/completions endpoint.

        ``subprocess.run`` is used instead of ``self.execute()`` because,
        per the original author's note, ``execute()`` triggers ``_reset()``,
        which would terminate the running SGLang process.

        Args:
            retries: Maximum number of attempts.
            interval: Seconds to sleep between failed attempts.
            timeout: Per-attempt timeout in seconds for the curl process.

        Returns:
            True as soon as one attempt succeeds (curl exits with 0 and the
            server did not answer with an HTTP error status); False if every
            attempt fails or times out.
        """
        curl_body = json.dumps({
            "model": self.model_path,
            "prompt": "Beijing is a",
            "max_tokens": 5,
            "temperature": 0,
        })
        curl_cmd = [
            "curl", f"http://127.0.0.1:{self.port}/v1/completions",
            # Without --fail curl exits 0 even on HTTP 4xx/5xx responses,
            # which would make an error reply look like a passing test.
            "--fail",
            "-H", "Content-Type: application/json",
            "-X", "POST",
            "-d", curl_body,
        ]
        for attempt in range(retries):
            try:
                result = subprocess.run(curl_cmd, capture_output=True, timeout=timeout)
            except subprocess.TimeoutExpired:
                # A hung request counts as a failed attempt rather than
                # aborting the whole retry loop.
                result = None
            if result is not None and result.returncode == 0:
                return True
            # No point sleeping after the final attempt.
            if attempt < retries - 1:
                time.sleep(interval)
        return False

    def ready_go(self):
        """Enable profiling, launch the SGLang server and wait until it is ready.

        NOTE(review): the original docstring advertised retrying on a
        different NPU card after a failure, but no such retry exists in this
        method; it performs a single launch attempt.

        Returns:
            True if the readiness wait reports the process still running
            (``exit_code is None``) with a zero second status value;
            False otherwise.
        """
        self.set_prof_config(enable=1)
        # Environment handed to execute(); whether it is merged with the
        # parent environment is decided by CommandExecutor.execute.
        env = {
            "SERVICE_PROF_CONFIG_PATH": os.path.abspath(self.prof_config_path),
        }
        cmd = [
            "python", "-m", "sglang.launch_server",
            "--model-path", self.model_path,
            "--device", "npu",
            "--mem-fraction-static", "0.8",
            "--port", str(self.port),
        ]
        self.execute(cmd, env=env)
        exit_code, has_output = self._wait_server_ready()
        return exit_code is None and has_output == 0

    def _wait_server_ready(self):
        """Wait (up to 600 s) for the server's "Uvicorn running on" banner.

        Returns:
            The ``(exit_code, has_output)`` pair produced by ``self.wait``.
        """
        return self.wait("Uvicorn running on", timeout=600)