import os
import shlex
import subprocess
import threading
try:
from queue import Queue
except ImportError:
from Queue import Queue
from ansible.module_utils.check_output_manager import check_event
from ansible.module_utils.check_utils import CheckUtil as util
from ansible.module_utils.deepseek_cntr.mindie_service_config import CONFIG_FILE_MAP, SINGLE_NODE, DOUBLE_NODE
# 1024 * 1024 * 100 == 104857600. Presumably a byte count (100 MiB) intended as
# a chunk size; it is not referenced in the visible part of this file -- TODO confirm usage.
BLOCKSIZE = 1024 * 1024 * 100
class DeepseekCntrCheck:
    """Pre-deployment checks for the DeepSeek container scenario.

    Every problem found is reported through ``util.record_error`` into the
    ``error_messages`` collection supplied by the caller.
    """

    def __init__(self, module, error_messages):
        """Store the module, its parameters and the shared error collection.

        :param module: object exposing ``params``, ``get_bin_path`` and
            ``run_command`` (used throughout this class).
        :param error_messages: collection handed to ``util.record_error``.
        """
        self.module = module
        params = module.params
        self.worker_num = params["worker_num"]
        self.cntr_mnt_path = params["cntr_mnt_path"]
        self.weight_mount_path = params["weight_mount_path"]
        self.model_weight_path = params["model_weight_path"]
        self.mindie_image_name = params["mindie_image_name"]
        self.mindie_image_file = params["mindie_image_file"]
        self.npu_info = params["npu_info"]
        self.master_ip = params["master_ip"]
        self.worker_groups = params["worker_groups"]
        self.error_messages = error_messages
        # Failures reported by worker threads as (command, message) tuples.
        self._queue = Queue()

    @check_event
    def check_deepseek_cntr(self):
        """Run every check of this class in a fixed order."""
        self.check_dependency()
        self.check_network()
        self.check_mount_path()
        self.check_image_name_and_file()
        self.check_worker_num_and_master_ip()

    def check_dependency(self):
        """Record an error for each required executable that cannot be located."""
        if not self.module.get_bin_path("docker"):
            util.record_error("[ASCEND][ERROR] Can not find docker", self.error_messages)
        if not self.module.get_bin_path('npu-smi'):
            util.record_error("[ASCEND][ERROR] Can not find npu-smi.", self.error_messages)
        if not self.module.get_bin_path('hccn_tool'):
            util.record_error("[ASCEND][ERROR] Can not find hccn_tool.", self.error_messages)

    def check_network(self):
        """Check the HCCS status and, when worker_num != 1, the per-NPU IP config.

        Nothing is checked when ``npu-smi`` is unavailable; that case is
        already reported by ``check_dependency``.
        """
        if not self.module.get_bin_path('npu-smi'):
            return
        rc, out, err = self.module.run_command("npu-smi info -t topo", check_rc=False)
        if rc != 0:
            util.record_error("[ASCEND][ERROR] Failed to check HCCS status. "
                              "Command 'npu-smi info -t topo' failed with return code {}. "
                              "Error message: {}. "
                              "Please verify npu-smi installation and NPU device accessibility.".format(
                                  rc, err),
                              self.error_messages)
        elif "HCCS" not in out:
            util.record_error("[ASCEND][ERROR] HCCS is not enabled. Please enable HCCS before proceeding.",
                              self.error_messages)
        # The per-NPU IP check below is skipped when worker_num is 1.
        if self.worker_num == 1:
            return
        find_cmd = "npu-smi info -l"
        _, outputs, _ = self.module.run_command(find_cmd, check_rc=True)
        # Lines containing "NPU ID" carry the id after the last ':'.
        npu_ids = [line.split(":")[-1].strip()
                   for line in outputs.split('\n')
                   if "NPU ID" in line]
        if not npu_ids:
            util.record_error("[ASCEND][ERROR] Can not find any npu device, using command:{}".format(find_cmd),
                              self.error_messages)
        for npu_id in npu_ids:
            check_cmd = {
                "hccn_tool -i {} -ip -g".format(npu_id): "ipaddr",
            }
            self.run_commands_in_threads(check_cmd)
        # Drain failures collected by the worker threads into the shared error list.
        while not self._queue.empty():
            cmd, msg = self._queue.get()
            util.record_error("[ASCEND][ERROR] Execute cmd {} failed, {}".format(cmd, msg),
                              self.error_messages)

    def run_commands_in_threads(self, commands):
        """Run each command in its own thread and wait for all of them.

        :param commands: dict mapping a command string to the substring that
            must appear in its output (see ``run_command``).
        """
        threads = []
        for cmd, expect_out in commands.items():
            thread = threading.Thread(target=self.run_command, args=(cmd, expect_out))
            threads.append(thread)
            thread.start()
        for thread in threads:
            thread.join()

    def run_command(self, cmd, expect_out=None):
        """Run ``cmd`` and queue a ``(cmd, message)`` tuple when it fails.

        A run counts as failed when the return code is non-zero, when
        ``expect_out`` is given but missing from stdout + stderr, or when
        running the command raises.

        :param cmd: command passed to ``module.run_command``.
        :param expect_out: optional substring expected in the combined output.
        """
        try:
            rc, out, err = self.module.run_command(cmd, check_rc=False)
            output = out + err
            if rc != 0 or (expect_out and expect_out not in output):
                self._queue.put((cmd, output))
        except Exception as e:
            # Thread boundary: report the failure through the queue instead
            # of letting the exception vanish with the thread.
            self._queue.put((cmd, str(e)))

    def check_mount_path(self):
        """Check that the mount paths were provided and are usable.

        Verifies that ``weight_mount_path``, ``model_weight_path`` and
        ``cntr_mnt_path`` are all set, that ``weight_mount_path`` exists and
        is not a symbolic link, and that ``model_weight_path`` is located
        under ``cntr_mnt_path``.
        """
        paths_to_check = (
            ('weight_mount_path', self.weight_mount_path),
            ('model_weight_path', self.model_weight_path),
            ('cntr_mnt_path', self.cntr_mnt_path),
        )
        missing = [path_name for path_name, path_value in paths_to_check if not path_value]
        for path_name in missing:
            util.record_error('[ASCEND][ERROR] Please provide a value for the {} parameter.'.format(path_name),
                              self.error_messages)
        if missing:
            # The os.path calls below raise TypeError on None, so stop once
            # the missing parameters have been reported.
            return
        if not os.path.exists(self.weight_mount_path):
            util.record_error('[ASCEND][ERROR] weight_mount_path: {} is not existed.'.format(self.weight_mount_path),
                              self.error_messages)
            return
        if os.path.islink(self.weight_mount_path):
            util.record_error(
                "[ASCEND][ERROR] The specified weight_mount_path '{}' should not be a symbolic link.".format(
                    self.weight_mount_path), self.error_messages)
            return
        mount_path = os.path.abspath(self.cntr_mnt_path)
        model_path = os.path.abspath(self.model_weight_path)
        # Compare on a path-separator boundary: a plain startswith() would
        # accept "/mnt/data2/x" as being under "/mnt/data".
        mount_prefix = mount_path.rstrip(os.sep) + os.sep
        if model_path != mount_path and not model_path.startswith(mount_prefix):
            util.record_error(
                '[ASCEND][ERROR] The model_weight_path must be under the cntr_mnt_path directory.',
                self.error_messages)

    def check_image_name_and_file(self):
        """Validate ``mindie_image_file`` and ``mindie_image_name`` when given.

        The image file must exist and must not be a symbolic link. The image
        name must be a string containing a tag separator (':') and must be
        listed by ``docker images``.
        """
        if not self.mindie_image_name and not self.mindie_image_file:
            return
        if self.mindie_image_file:
            if not os.path.exists(self.mindie_image_file):
                util.record_error("[ASCEND][ERROR] The specified mindie_image_file '{}' does not exist.".format(
                    self.mindie_image_file), self.error_messages)
            if os.path.islink(self.mindie_image_file):
                util.record_error(
                    "[ASCEND][ERROR] The specified mindie_image_file '{}' should not be a symbolic link.".format(
                        self.mindie_image_file), self.error_messages)
        if self.mindie_image_name:
            if not isinstance(self.mindie_image_name, str):
                util.record_error("[ASCEND][ERROR] The mindie_image_name parameter must be a string, got {}.".format(
                    type(self.mindie_image_name).__name__), self.error_messages)
                return
            if ':' not in self.mindie_image_name:
                util.record_error(
                    "[ASCEND][ERROR] The mindie_image_name '{}' must include a tag. "
                    "Valid format example: mindie:2.1.RC1-xx-xx-py311-ubuntu22.04-aarch64".format(
                        self.mindie_image_name), self.error_messages)
            docker_exists = self.module.get_bin_path("docker")
            if not docker_exists:
                util.record_error(
                    "[ASCEND][ERROR] Docker not found. Image '{}' not verified locally.".format(
                        self.mindie_image_name), self.error_messages)
                return
            check_cmd = ["docker", "images", "--format", "{{.Repository}}:{{.Tag}}"]
            rc, out, err = self.module.run_command(check_cmd)
            if rc != 0:
                util.record_error(
                    "[ASCEND][ERROR] Failed to list docker images. Command '{}' failed with return code {}. "
                    "Error message: {}".format(" ".join(check_cmd), rc, err.strip()),
                    self.error_messages)
                return
            # One "repository:tag" entry per output line; require an exact match.
            image_lines = out.splitlines()
            if self.mindie_image_name not in image_lines:
                util.record_error(
                    "[ASCEND][ERROR] mindie_image_name: '{}' not found locally.".format(self.mindie_image_name),
                    self.error_messages)

    def check_worker_num_and_master_ip(self):
        """Validate ``worker_num`` and ``master_ip`` against ``CONFIG_FILE_MAP``.

        ``worker_num`` must be a key of ``CONFIG_FILE_MAP`` and the scene
        taken from ``npu_info`` must be present in that entry. When
        ``worker_num`` equals ``DOUBLE_NODE``, ``master_ip`` must be set and
        must be contained in ``worker_groups``.
        """
        scene = self.npu_info.get("scene", "")
        if self.worker_num not in CONFIG_FILE_MAP:
            util.record_error(
                "[ASCEND][ERROR] Unsupported worker_num: {}. "
                "Supported worker numbers are: {}".format(
                    self.worker_num,
                    list(CONFIG_FILE_MAP.keys())),
                self.error_messages
            )
            return
        worker_config = CONFIG_FILE_MAP.get(self.worker_num, {})
        if scene not in worker_config:
            # error_messages belongs to record_error(), not to str.format().
            util.record_error(
                "[ASCEND][ERROR] For worker_num={}, unsupported on this device ".format(
                    self.worker_num),
                self.error_messages
            )
            return
        if self.worker_num == DOUBLE_NODE:
            if not self.master_ip:
                util.record_error(
                    "[ASCEND][ERROR] When worker_num > 1 , mindie_master must be provided",
                    self.error_messages
                )
                return
            # Treat an unset worker_groups as empty so the membership test
            # reports an error instead of raising TypeError.
            if self.master_ip not in (self.worker_groups or []):
                util.record_error(
                    "[ASCEND][ERROR] mindie_master '{}' must be in worker list {}".format(self.master_ip,
                                                                                        self.worker_groups),
                    self.error_messages
                )