import os
import sys
# Locate the project root (the directory two levels above this file) and put
# its ``src`` directory at the front of ``sys.path``. The project imports that
# follow in this file are resolved after this insertion.
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
SRC_ROOT = os.path.join(PROJECT_ROOT, 'src')
if SRC_ROOT not in sys.path:
    # Prepend rather than append so this checkout's ``src`` is searched first.
    sys.path.insert(0, SRC_ROOT)
import json
import pytest
from unittest.mock import patch, MagicMock
from virtcca_deploy.services.task_service import TaskService, init_task_service, get_task_service
from virtcca_deploy.common.constants import TASK_TYPE_VM_CREATE, TASK_TYPE_VM_DELETE
class TestTaskService:
    """Tests for task creation, lookup, status updates and listing.

    Each test receives ``app`` (presumably a pytest fixture defined outside
    this file) and does all of its work inside ``app.app_context()``.
    """

    @staticmethod
    def _vm_params(vm_name):
        """Return task parameters whose ``total_vms`` holds only ``vm_name``."""
        return {
            "success_vms": [],
            "fail_vms": [],
            "total_vms": [vm_name],
        }

    @staticmethod
    def _fresh_service():
        """Initialise the task service and return it.

        Called by every test from within ``app.app_context()``.
        """
        init_task_service()
        return get_task_service()

    def test_create_task(self, app):
        """Creating a task yields a string id and a stored task in "created" state."""
        with app.app_context():
            service = self._fresh_service()
            params = self._vm_params("compute01-1")

            new_id = service.create_task(TASK_TYPE_VM_CREATE, params)
            assert new_id is not None
            assert isinstance(new_id, str)

            stored = service.get_task(new_id)
            assert stored is not None
            assert stored.task_id == new_id
            assert stored.task_type == TASK_TYPE_VM_CREATE
            assert stored.status == "created"
            # The parameters are stored as JSON text; decode before comparing.
            assert json.loads(stored.task_params) == params

    def test_get_task(self, app):
        """Looking up a task by id returns it; an unknown id returns ``None``."""
        with app.app_context():
            service = self._fresh_service()
            new_id = service.create_task(
                TASK_TYPE_VM_CREATE, self._vm_params("compute01-1")
            )

            found = service.get_task(new_id)
            assert found is not None
            assert found.task_id == new_id
            assert found.task_type == TASK_TYPE_VM_CREATE

            missing = service.get_task("non-existent-task-id")
            assert missing is None

    def test_update_task_status(self, app):
        """Status updates are persisted; updating an unknown id returns ``False``."""
        with app.app_context():
            service = self._fresh_service()
            new_id = service.create_task(
                TASK_TYPE_VM_CREATE, self._vm_params("compute01-1")
            )

            updated = service.update_task_status(new_id, "running")
            assert updated is True
            current = service.get_task(new_id)
            assert current.status == "running"

            updated = service.update_task_status(new_id, "success")
            assert updated is True
            current = service.get_task(new_id)
            assert current.status == "success"
            # After the "success" update the completion timestamp must be set.
            assert current.completed_at is not None

            updated = service.update_task_status("non-existent-task-id", "failed")
            assert updated is False

    def test_get_tasks_by_status(self, app):
        """Listing by status returns one task for each of three distinct states."""
        with app.app_context():
            service = self._fresh_service()

            # The first task is never updated, so it stays in its initial state.
            service.create_task(
                TASK_TYPE_VM_CREATE, self._vm_params("compute01-1")
            )
            running_id = service.create_task(
                TASK_TYPE_VM_CREATE, self._vm_params("compute02-1")
            )
            success_id = service.create_task(
                TASK_TYPE_VM_DELETE, self._vm_params("compute03-1")
            )

            service.update_task_status(running_id, "running")
            service.update_task_status(success_id, "success")

            # Query every status first (in this order), then check the counts.
            tasks_by_status = {
                status: service.get_tasks_by_status(status)
                for status in ("created", "running", "success")
            }
            for matching in tasks_by_status.values():
                assert len(matching) == 1

    def test_get_tasks_by_type(self, app):
        """Listing by type returns two create tasks and one delete task."""
        with app.app_context():
            service = self._fresh_service()

            planned = (
                (TASK_TYPE_VM_CREATE, "compute01-1"),
                (TASK_TYPE_VM_CREATE, "compute02-1"),
                (TASK_TYPE_VM_DELETE, "compute03-1"),
            )
            for task_type, vm_name in planned:
                service.create_task(task_type, self._vm_params(vm_name))

            create_tasks = service.get_tasks_by_type(TASK_TYPE_VM_CREATE)
            delete_tasks = service.get_tasks_by_type(TASK_TYPE_VM_DELETE)
            assert len(create_tasks) == 2
            assert len(delete_tasks) == 1