import subprocess
import os
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, Mock, patch, call
import pytest
from msserviceprofiler.modelevalstate.config.config import (
CUSTOM_OUTPUT,
MODEL_EVAL_STATE_CONFIG_PATH,
OptimizerConfigField, get_settings
)
from msserviceprofiler.modelevalstate.optimizer.interfaces.custom_process import CustomProcess, tempfile, os, BaseDataField
def test_before_run_no_run_params(monkeypatch):
monkeypatch.setattr(tempfile, "mkstemp", lambda prefix="": (1234, 'tempfile'))
monkeypatch.setattr(os, "environ", {})
process = CustomProcess()
process.before_run()
assert process.run_log_fp == 1234
assert process.run_log == 'tempfile'
assert process.run_log_offset == 0
def test_before_run_with_run_params():
process = CustomProcess()
process.command = ["benchmark", "$CONCURRENCY", "$REQUESTRATE"]
run_params = (
OptimizerConfigField(name="CONCURRENCY", config_position="env", min=10, max=1000, dtype="int", value=10),
OptimizerConfigField(name="REQUESTRATE", config_position="env", min=0.1, max=0.7, value=0.3, dtype="float"),
)
process.before_run(run_params)
assert process.command == ["benchmark", "10", "0.3"]
def test_before_run_env_var_already_set(monkeypatch):
monkeypatch.setattr(os, "environ", {CUSTOM_OUTPUT: "/result",
MODEL_EVAL_STATE_CONFIG_PATH: "config.toml"})
process = CustomProcess()
process.before_run()
assert os.environ[CUSTOM_OUTPUT] == "/result"
assert os.environ[MODEL_EVAL_STATE_CONFIG_PATH] == "config.toml"
def test_check_success_process_still_running(tmpdir):
custom_process = CustomProcess()
custom_process.run_log = Path(tmpdir).joinpath("run_log")
custom_process.run_log_offset = 0
with open(custom_process.run_log, "w") as f:
f.write("test")
custom_process.process = Mock()
custom_process.process.poll.return_value = None
custom_process.print_log = True
def test_check_success_process_succeeded(tmpdir):
custom_process = CustomProcess()
custom_process.run_log = Path(tmpdir).joinpath("run_log")
custom_process.run_log_offset = 0
with open(custom_process.run_log, "w") as f:
f.write("test")
custom_process.process = Mock()
custom_process.process.poll.return_value = 0
custom_process.print_log = True
@patch("psutil.process_iter")
@patch("msserviceprofiler.modelevalstate.optimizer.interfaces.custom_process.kill_process")
def test_check_env_no_residual_process(mock_kill_process, mock_process_iter):
mock_process_iter.return_value = [
MagicMock(info={"pid": 1, "name": "not_process"}),
MagicMock(info={"pid": 2, "name": "also_not_target"}),
MagicMock()
]
CustomProcess.kill_residual_process("target_process")
mock_process_iter.assert_called_once()
mock_kill_process.assert_not_called()
@patch("psutil.process_iter")
@patch("msserviceprofiler.modelevalstate.optimizer.interfaces.custom_process.kill_process")
def test_check_env_with_residual_process(mock_kill_process, mock_process_iter):
mock_process_iter.return_value = [
MagicMock(info={"pid": 1, "name": "not_target_process"}),
MagicMock(info={"pid": 2, "name": "target_process"}),
MagicMock(info={"pid": 3, "name": "another_target_process"})
]
CustomProcess.kill_residual_process("target_process,another_target_process")
mock_kill_process.assert_any_call("target_process")
mock_kill_process.assert_any_call("another_target_process")
@patch("psutil.process_iter")
@patch("msserviceprofiler.modelevalstate.optimizer.interfaces.custom_process.kill_process")
def test_check_env_kill_process_exception(mock_kill_process, mock_process_iter):
mock_process_iter.return_value = [
MagicMock(info={"pid": 1, "name": "target_process"})
]
mock_kill_process.side_effect = Exception("Failed to kill process")
CustomProcess.kill_residual_process("target_process")
mock_kill_process.assert_called_once_with("target_process")
@patch('msserviceprofiler.modelevalstate.optimizer.interfaces.custom_process.CustomProcess.kill_residual_process')
@patch('msserviceprofiler.modelevalstate.optimizer.interfaces.custom_process.CustomProcess.before_run')
@patch('msserviceprofiler.modelevalstate.optimizer.interfaces.custom_process.subprocess.Popen')
def test_run_process_name_exists_and_check_env_success(mock_popen, mock_before_run, mock_check_env):
process = CustomProcess()
process.process_name = 'test_process'
process.command = ['test_command']
process.work_path = '/test/work/path'
process.run_log_fp = MagicMock()
process.run_log = '/test/run/log'
process.run()
mock_check_env.assert_called_once_with('test_process')
mock_before_run.assert_called_once()
@patch('msserviceprofiler.modelevalstate.optimizer.interfaces.custom_process.CustomProcess.kill_residual_process')
@patch('msserviceprofiler.modelevalstate.optimizer.interfaces.custom_process.CustomProcess.before_run')
@patch('msserviceprofiler.modelevalstate.optimizer.interfaces.custom_process.subprocess.Popen')
def test_run_process_name_exists_and_check_env_fail(mock_popen, mock_before_run, mock_check_env):
process = CustomProcess()
process.process_name = 'test_process'
process.command = ['test_command']
process.work_path = '/test/work/path'
process.run_log_fp = MagicMock()
process.run_log = '/test/run/log'
mock_check_env.side_effect = Exception('kill_residual_process failed')
process.run()
mock_check_env.assert_called_once_with('test_process')
mock_before_run.assert_called_once()
mock_popen.assert_called_once()
@patch('msserviceprofiler.modelevalstate.optimizer.interfaces.custom_process.CustomProcess.before_run')
@patch('msserviceprofiler.modelevalstate.optimizer.interfaces.custom_process.subprocess.Popen')
def test_run_process_name_not_exists(mock_popen, mock_before_run):
process = CustomProcess()
process.process_name = None
process.command = ['test_command']
process.work_path = '/test/work/path'
process.run_log_fp = MagicMock()
process.run_log = '/test/run/log'
process.run()
mock_before_run.assert_called_once()
@patch('msserviceprofiler.modelevalstate.optimizer.interfaces.custom_process.CustomProcess.before_run')
@patch('msserviceprofiler.modelevalstate.optimizer.interfaces.custom_process.subprocess.Popen')
def test_run_subprocess_popen_os_error(mock_popen, mock_before_run):
process = CustomProcess()
process.process_name = None
process.command = ['test_command']
process.work_path = '/test/work/path'
process.run_log_fp = MagicMock()
process.run_log = '/test/run/log'
mock_popen.side_effect = OSError('subprocess.Popen failed')
with pytest.raises(OSError) as e:
process.run()
assert str(e.value) == 'subprocess.Popen failed'
mock_before_run.assert_called_once()
def test_get_log_run_log_none():
process = CustomProcess()
process.run_log = None
assert process.get_log() is None
@patch('pathlib.Path.exists', return_value=False)
def test_get_log_run_log_not_exists(mock_exists):
process = CustomProcess()
process.run_log = 'nonexistent.log'
assert process.get_log() is None
class TestCustomProcessSupplement:
def test_get_log_file_not_found(self):
"""测试get_log方法处理文件不存在的情况"""
process = CustomProcess()
process.run_log = "/nonexistent/path/test.log"
result = process.get_log()
assert result is None
class TestBaseDataField:
@pytest.mark.parametrize("target_field,expected", [
([], ()),
([OptimizerConfigField(name="field1", config_position="pos1", min=0, max=100, dtype="int")],
(OptimizerConfigField(name="field1", config_position="pos1", min=0, max=100, dtype="int"),)),
(None, ())
])
def test_data_field_property(self, target_field, expected):
"""测试data_field属性获取"""
mock_config = MagicMock()
if target_field is not None:
mock_config.target_field = target_field
else:
delattr(mock_config, 'target_field')
data_field = BaseDataField(config=mock_config)
result = data_field.data_field
assert result == expected
def test_data_field_setter_add_new_field(self):
"""测试data_field属性设置器添加新字段"""
mock_config = MagicMock()
mock_config.target_field = [
OptimizerConfigField(name="existing_field", config_position="existing.position", min=0, max=100, dtype="int")
]
data_field = BaseDataField(config=mock_config)
new_fields = (
OptimizerConfigField(name="new_field", config_position="new.position", min=0, max=50, dtype="float"),
OptimizerConfigField(name="existing_field", config_position="updated.position", min=0, max=200, dtype="int")
)
data_field.data_field = new_fields
assert len(mock_config.target_field) == 1
assert mock_config.target_field[0].name == "existing_field"
assert mock_config.target_field[0].config_position == "updated.position"
assert mock_config.target_field[0].max == 200
def test_data_field_setter_empty_input(self):
"""测试data_field属性设置器使用空输入"""
mock_config = MagicMock()
mock_config.target_field = [
OptimizerConfigField(name="field1", config_position="pos1", min=0, max=100, dtype="int")
]
data_field = BaseDataField(config=mock_config)
data_field.data_field = ()
assert len(mock_config.target_field) == 1
assert mock_config.target_field[0].name == "field1"
def test_data_field_setter_no_target_field(self):
"""测试data_field属性设置器在config没有target_field时的情况"""
mock_config = MagicMock()
if hasattr(mock_config, 'target_field'):
delattr(mock_config, 'target_field')
data_field = BaseDataField(config=mock_config)
new_fields = (
OptimizerConfigField(name="test_field", config_position="test.position", min=0, max=100, dtype="int"),
)
data_field.data_field = new_fields
assert not hasattr(mock_config, 'target_field') or mock_config.target_field == []