"""Unit tests for TeamMember module"""
from unittest.mock import (
AsyncMock,
patch,
)
import pytest
import pytest_asyncio
from openjiuwen.agent_teams.context import (
reset_session_id,
set_session_id,
)
from openjiuwen.agent_teams.tools.database import (
DatabaseConfig,
DatabaseType,
TeamDatabase,
)
from openjiuwen.agent_teams.agent.member import TeamMember
from openjiuwen.agent_teams.schema.status import (
ExecutionStatus,
MemberStatus,
)
from openjiuwen.agent_teams.tools.team import TeamBackend
from openjiuwen.core.multi_agent.team_runtime.message_bus import MessageBus
from openjiuwen.core.single_agent.schema.agent_card import AgentCard
@pytest.fixture
def agent_card():
"""Provide AgentCard instance for testing"""
return AgentCard(
name="TestAgent",
description="Test agent for unit tests",
version="1.0.0"
)
@pytest.fixture
def db_config():
"""Provide in-memory database config for testing"""
return DatabaseConfig(db_type=DatabaseType.SQLITE, connection_string=":memory:")
@pytest_asyncio.fixture
async def db(db_config):
"""Provide initialized database instance"""
database = TeamDatabase(db_config)
try:
await database.initialize()
yield database
finally:
await database.close()
@pytest_asyncio.fixture
async def message_bus():
"""Provide MessageBus instance for testing"""
bus = MessageBus()
await bus.start()
try:
yield bus
finally:
await bus.stop()
@pytest_asyncio.fixture
async def team_member(db, agent_card, message_bus):
"""Provide initialized TeamMember instance"""
await db.team.create_team(
team_name="test_team",
display_name="Test Team",
leader_member_name="leader1"
)
await db.member.create_member(
member_name="member1",
team_name="test_team",
display_name="Test Member",
agent_card=agent_card.model_dump_json(),
status=MemberStatus.READY.value,
execution_status=ExecutionStatus.IDLE.value
)
return TeamMember(
member_name="member1",
team_name="test_team",
display_name="Test Member",
agent_card=agent_card,
db=db,
messager=message_bus
)
class TestTeamMemberInit:
"""Test TeamMember initialization"""
@pytest.mark.asyncio
@pytest.mark.level0
async def test_member_initialization(self, team_member, agent_card):
"""Test that member is initialized with correct values"""
assert team_member.member_name == "member1"
assert team_member.team_name == "test_team"
assert team_member.display_name == "Test Member"
assert team_member.agent_card == agent_card
assert await team_member.status() == MemberStatus.READY
assert await team_member.execution_status() == ExecutionStatus.IDLE
@pytest.mark.asyncio
@pytest.mark.level0
async def test_member_with_optional_fields(self, db, agent_card, message_bus):
"""Test member initialization with optional fields"""
await db.team.create_team(
team_name="test_team",
display_name="Test Team",
leader_member_name="leader1"
)
member = TeamMember(
member_name="member2",
team_name="test_team",
display_name="Test Member with Options",
agent_card=agent_card,
db=db,
messager=message_bus,
prompt="You are a helpful assistant",
desc="A helpful team member"
)
assert member.prompt == "You are a helpful assistant"
assert member.desc == "A helpful team member"
class TestMemberStatus:
"""Test member status management"""
@pytest.mark.asyncio
@pytest.mark.level0
async def test_get_initial_status(self, team_member):
"""Test getting initial status"""
assert await team_member.status() == MemberStatus.READY
@pytest.mark.asyncio
@pytest.mark.level0
async def test_update_status_valid_transition(self, team_member):
"""Test updating status with valid transition"""
result = await team_member.update_status(MemberStatus.BUSY)
assert result is True
assert await team_member.status() == MemberStatus.BUSY
@pytest.mark.asyncio
@pytest.mark.level0
async def test_update_status_invalid_transition(self, team_member):
"""Test updating status with invalid transition"""
await team_member.update_status(MemberStatus.BUSY)
result = await team_member.update_status(MemberStatus.SHUTDOWN)
assert result is False
assert await team_member.status() == MemberStatus.BUSY
@pytest.mark.asyncio
@pytest.mark.level0
async def test_status_transition_ready_to_busy(self, team_member):
"""Test READY -> BUSY transition"""
assert await team_member.status() == MemberStatus.READY
result = await team_member.update_status(MemberStatus.BUSY)
assert result is True
assert await team_member.status() == MemberStatus.BUSY
@pytest.mark.asyncio
@pytest.mark.level0
async def test_status_transition_busy_to_ready(self, team_member):
"""Test BUSY -> READY transition"""
await team_member.update_status(MemberStatus.BUSY)
result = await team_member.update_status(MemberStatus.READY)
assert result is True
assert await team_member.status() == MemberStatus.READY
@pytest.mark.asyncio
@pytest.mark.level0
async def test_status_transition_ready_to_shutdown_requested(self, team_member):
"""Test READY -> SHUTDOWN_REQUESTED transition"""
result = await team_member.update_status(MemberStatus.SHUTDOWN_REQUESTED)
assert result is True
assert await team_member.status() == MemberStatus.SHUTDOWN_REQUESTED
@pytest.mark.asyncio
@pytest.mark.level0
async def test_status_transition_shutdown_requested_to_shutdown(self, team_member):
"""Test SHUTDOWN_REQUESTED -> SHUTDOWN transition"""
await team_member.update_status(MemberStatus.SHUTDOWN_REQUESTED)
result = await team_member.update_status(MemberStatus.SHUTDOWN)
assert result is True
assert await team_member.status() == MemberStatus.SHUTDOWN
@pytest.mark.asyncio
@pytest.mark.level0
async def test_status_transition_ready_to_error(self, team_member):
"""Test READY -> ERROR transition"""
result = await team_member.update_status(MemberStatus.ERROR)
assert result is True
assert await team_member.status() == MemberStatus.ERROR
@pytest.mark.asyncio
@pytest.mark.level1
async def test_status_transition_error_to_ready(self, team_member):
"""Test ERROR -> READY transition"""
await team_member.update_status(MemberStatus.ERROR)
result = await team_member.update_status(MemberStatus.READY)
assert result is True
assert await team_member.status() == MemberStatus.READY
@pytest.mark.asyncio
@pytest.mark.level1
async def test_status_no_transition_from_shutdown(self, team_member):
"""Test that no transitions are allowed from SHUTDOWN"""
await team_member.update_status(MemberStatus.BUSY)
await team_member.update_status(MemberStatus.SHUTDOWN_REQUESTED)
await team_member.update_status(MemberStatus.SHUTDOWN)
result = await team_member.update_status(MemberStatus.READY)
assert result is False
assert await team_member.status() == MemberStatus.SHUTDOWN
class TestExecutionStatus:
"""Test execution status management"""
@pytest.mark.asyncio
@pytest.mark.level1
async def test_get_initial_execution_status(self, team_member):
"""Test getting initial execution status"""
assert await team_member.execution_status() == ExecutionStatus.IDLE
@pytest.mark.asyncio
@pytest.mark.level1
async def test_update_execution_status_valid_transition(self, team_member):
"""Test updating execution status with valid transition"""
result = await team_member.update_execution_status(ExecutionStatus.STARTING)
assert result is True
assert await team_member.execution_status() == ExecutionStatus.STARTING
@pytest.mark.asyncio
@pytest.mark.level1
async def test_update_execution_status_invalid_transition(self, team_member):
"""Test updating execution status with invalid transition"""
result = await team_member.update_execution_status(ExecutionStatus.RUNNING)
assert result is False
assert await team_member.execution_status() == ExecutionStatus.IDLE
@pytest.mark.asyncio
@pytest.mark.level1
async def test_execution_transition_idle_to_starting(self, team_member):
"""Test IDLE -> STARTING transition"""
assert await team_member.execution_status() == ExecutionStatus.IDLE
result = await team_member.update_execution_status(ExecutionStatus.STARTING)
assert result is True
assert await team_member.execution_status() == ExecutionStatus.STARTING
@pytest.mark.asyncio
@pytest.mark.level1
async def test_execution_transition_starting_to_running(self, team_member):
"""Test STARTING -> RUNNING transition"""
await team_member.update_execution_status(ExecutionStatus.STARTING)
result = await team_member.update_execution_status(ExecutionStatus.RUNNING)
assert result is True
assert await team_member.execution_status() == ExecutionStatus.RUNNING
@pytest.mark.asyncio
@pytest.mark.level1
async def test_execution_transition_running_to_completing(self, team_member):
"""Test RUNNING -> COMPLETING transition"""
await team_member.update_execution_status(ExecutionStatus.STARTING)
await team_member.update_execution_status(ExecutionStatus.RUNNING)
result = await team_member.update_execution_status(ExecutionStatus.COMPLETING)
assert result is True
assert await team_member.execution_status() == ExecutionStatus.COMPLETING
@pytest.mark.asyncio
@pytest.mark.level1
async def test_execution_transition_completing_to_completed(self, team_member):
"""Test COMPLETING -> COMPLETED transition"""
await team_member.update_execution_status(ExecutionStatus.STARTING)
await team_member.update_execution_status(ExecutionStatus.RUNNING)
await team_member.update_execution_status(ExecutionStatus.COMPLETING)
result = await team_member.update_execution_status(ExecutionStatus.COMPLETED)
assert result is True
assert await team_member.execution_status() == ExecutionStatus.COMPLETED
@pytest.mark.asyncio
@pytest.mark.level1
async def test_execution_transition_completed_to_idle(self, team_member):
"""Test COMPLETED -> IDLE transition"""
await team_member.update_execution_status(ExecutionStatus.STARTING)
await team_member.update_execution_status(ExecutionStatus.RUNNING)
await team_member.update_execution_status(ExecutionStatus.COMPLETING)
await team_member.update_execution_status(ExecutionStatus.COMPLETED)
result = await team_member.update_execution_status(ExecutionStatus.IDLE)
assert result is True
assert await team_member.execution_status() == ExecutionStatus.IDLE
@pytest.mark.asyncio
@pytest.mark.level1
async def test_execution_transition_running_to_cancel_requested(self, team_member):
"""Test RUNNING -> CANCEL_REQUESTED transition"""
await team_member.update_execution_status(ExecutionStatus.STARTING)
await team_member.update_execution_status(ExecutionStatus.RUNNING)
result = await team_member.update_execution_status(ExecutionStatus.CANCEL_REQUESTED)
assert result is True
assert await team_member.execution_status() == ExecutionStatus.CANCEL_REQUESTED
@pytest.mark.asyncio
@pytest.mark.level1
async def test_execution_transition_cancel_requested_to_cancelling(self, team_member):
"""Test CANCEL_REQUESTED -> CANCELLING transition"""
await team_member.update_execution_status(ExecutionStatus.STARTING)
await team_member.update_execution_status(ExecutionStatus.RUNNING)
await team_member.update_execution_status(ExecutionStatus.CANCEL_REQUESTED)
result = await team_member.update_execution_status(ExecutionStatus.CANCELLING)
assert result is True
assert await team_member.execution_status() == ExecutionStatus.CANCELLING
@pytest.mark.asyncio
@pytest.mark.level1
async def test_execution_transition_cancelling_to_cancelled(self, team_member):
"""Test CANCELLING -> CANCELLED transition"""
await team_member.update_execution_status(ExecutionStatus.STARTING)
await team_member.update_execution_status(ExecutionStatus.RUNNING)
await team_member.update_execution_status(ExecutionStatus.CANCEL_REQUESTED)
await team_member.update_execution_status(ExecutionStatus.CANCELLING)
result = await team_member.update_execution_status(ExecutionStatus.CANCELLED)
assert result is True
assert await team_member.execution_status() == ExecutionStatus.CANCELLED
@pytest.mark.asyncio
@pytest.mark.level1
async def test_execution_transition_failed_to_idle(self, team_member):
"""Test FAILED -> IDLE transition"""
await team_member.update_execution_status(ExecutionStatus.STARTING)
await team_member.update_execution_status(ExecutionStatus.FAILED)
result = await team_member.update_execution_status(ExecutionStatus.IDLE)
assert result is True
assert await team_member.execution_status() == ExecutionStatus.IDLE
@pytest.mark.asyncio
@pytest.mark.level1
async def test_execution_transition_timed_out_to_idle(self, team_member):
"""Test TIMED_OUT -> IDLE transition"""
await team_member.update_execution_status(ExecutionStatus.STARTING)
await team_member.update_execution_status(ExecutionStatus.TIMED_OUT)
result = await team_member.update_execution_status(ExecutionStatus.IDLE)
assert result is True
assert await team_member.execution_status() == ExecutionStatus.IDLE
class TestStatusTransitionsComplex:
"""Test complex status transition scenarios"""
@pytest.mark.asyncio
@pytest.mark.level1
async def test_full_member_lifecycle(self, team_member):
"""Test complete member lifecycle: READY -> BUSY -> READY -> SHUTDOWN_REQUESTED -> SHUTDOWN"""
assert await team_member.status() == MemberStatus.READY
assert await team_member.update_status(MemberStatus.BUSY)
assert await team_member.status() == MemberStatus.BUSY
assert await team_member.update_status(MemberStatus.READY)
assert await team_member.status() == MemberStatus.READY
assert await team_member.update_status(MemberStatus.SHUTDOWN_REQUESTED)
assert await team_member.status() == MemberStatus.SHUTDOWN_REQUESTED
assert await team_member.update_status(MemberStatus.SHUTDOWN)
assert await team_member.status() == MemberStatus.SHUTDOWN
@pytest.mark.asyncio
@pytest.mark.level1
async def test_full_execution_lifecycle(self, team_member):
"""Test complete execution lifecycle: IDLE -> STARTING -> RUNNING -> COMPLETING -> COMPLETED -> IDLE"""
assert await team_member.execution_status() == ExecutionStatus.IDLE
assert await team_member.update_execution_status(ExecutionStatus.STARTING)
assert await team_member.execution_status() == ExecutionStatus.STARTING
assert await team_member.update_execution_status(ExecutionStatus.RUNNING)
assert await team_member.execution_status() == ExecutionStatus.RUNNING
assert await team_member.update_execution_status(ExecutionStatus.COMPLETING)
assert await team_member.execution_status() == ExecutionStatus.COMPLETING
assert await team_member.update_execution_status(ExecutionStatus.COMPLETED)
assert await team_member.execution_status() == ExecutionStatus.COMPLETED
assert await team_member.update_execution_status(ExecutionStatus.IDLE)
assert await team_member.execution_status() == ExecutionStatus.IDLE
@pytest.mark.asyncio
@pytest.mark.level1
async def test_cancellation_flow(self, team_member):
"""Test execution cancellation flow: IDLE -> STARTING -> RUNNING -> CANCEL_REQUESTED -> CANCELLING -> CANCELLED -> IDLE"""
assert await team_member.execution_status() == ExecutionStatus.IDLE
assert await team_member.update_execution_status(ExecutionStatus.STARTING)
assert await team_member.update_execution_status(ExecutionStatus.RUNNING)
assert await team_member.update_execution_status(ExecutionStatus.CANCEL_REQUESTED)
assert await team_member.update_execution_status(ExecutionStatus.CANCELLING)
assert await team_member.update_execution_status(ExecutionStatus.CANCELLED)
assert await team_member.update_execution_status(ExecutionStatus.IDLE)
assert await team_member.execution_status() == ExecutionStatus.IDLE
@pytest.mark.asyncio
@pytest.mark.level1
async def test_update_status_silent_false_when_row_absent(db, agent_card, message_bus):
"""A handle whose member row is not registered yet tolerates writes.
The leader holds its handle from configure() before BuildTeamTool
materializes its own DB row. update_status / update_execution_status
must short-circuit to False without reaching the DAO (which would
log an ERROR for the missing row).
"""
handle = TeamMember(
member_name="ghost",
team_name="no_such_team",
agent_card=agent_card,
db=db,
messager=message_bus,
)
assert await handle.status() is None
assert await handle.execution_status() is None
with patch.object(db.member, "update_member_status", new_callable=AsyncMock) as status_dao:
assert await handle.update_status(MemberStatus.READY) is False
status_dao.assert_not_called()
with patch.object(
db.member,
"update_member_execution_status",
new_callable=AsyncMock,
) as exec_dao:
assert await handle.update_execution_status(ExecutionStatus.RUNNING) is False
exec_dao.assert_not_called()
assert await handle.status() is None
@pytest.mark.asyncio
@pytest.mark.level1
async def test_leader_member_status_persists_after_build_team(db, agent_card, message_bus):
"""Once build_team materializes the leader row, handle writes land in DB.
Pairs with the eager-handle construction in TeamAgent._setup_agent:
the leader's status transitions reach the database the same way a
teammate's do.
"""
token = set_session_id("leader_status_session")
try:
backend = TeamBackend(
team_name="lt",
member_name="leader1",
db=db,
messager=message_bus,
is_leader=True,
)
await backend.build_team(
display_name="LT",
desc="leader status tracking",
leader_display_name="Leader",
leader_desc="PM",
)
handle = TeamMember(
member_name="leader1",
team_name="lt",
agent_card=agent_card,
db=db,
messager=message_bus,
)
assert await handle.status() == MemberStatus.BUSY
assert await handle.update_status(MemberStatus.READY) is True
assert await handle.status() == MemberStatus.READY
assert await handle.update_status(MemberStatus.BUSY) is True
assert await handle.status() == MemberStatus.BUSY
finally:
reset_session_id(token)