11a0e446创建于 1月12日历史提交
from beanie import Document, Indexed, UpdateResponse
from typing import Any, Annotated, Optional, Type, TypeVar

T = TypeVar("T", bound="Environment")

class Environment(Document):
    """
    Beanie Document to represent an environment.
    """
    name: Annotated[str, Indexed(unique=True)]
    description: str = ""

    in_use: bool = False
    
    class Settings:
        is_root = True
        
    @classmethod
    async def create(cls: Type[T], name: str, in_use: bool = False) -> Optional[T]:
        """
        Create or Get a new semaphore instance with the given name.
        
        :param name: Unique name for the semaphore.
        :return: An instance of DistributedSemaphore.
        """
        doc = await cls.find_one({"name": name}, with_children=True)
        if doc is None:
            doc = cls(name=name, in_use=in_use)
            try:
                await doc.insert()
            except Exception as e:
                doc = await cls.find_one({"name": name}, with_children=True)
                if doc is None:
                    raise e
        return doc
    
    @classmethod
    async def get(cls: Type[T]) -> Optional[T]:
        """
        Get an available environment that is not in use.
        """
        doc = await cls.find_one(
            {"in_use": False}, with_children=True
        ).update(
            {"$set": {"in_use": True}},
            response_type=UpdateResponse.NEW_DOCUMENT
        )
        return doc
    
    async def set(self):
        """
        Set the environment as in use.
        """
        doc = await Environment.find_one(
            {"name": self.name, "in_use": False}, with_children=True
        ).update(
            {"$set": {"in_use": True}},
            response_type=UpdateResponse.NEW_DOCUMENT
        )
        return doc is not None
    
    async def reset(self):
        """
        Reset the environment to not in use.
        """
        doc = await Environment.find_one(
            {"name": self.name, "in_use": True}, with_children=True
        ).update(
            {"$set": {"in_use": False}},
            response_type=UpdateResponse.NEW_DOCUMENT
        )
        return doc is not None
    
    async def is_in_use(self) -> bool:
        """
        Check if the environment is currently in use.
        """
        doc = await Environment.find_one(
            {"name": self.name, "in_use": True}, with_children=True
        )
        return doc is not None
    

class OSWorldEnv(Environment):
    """
    Database model for OSWorld environments.
    """
    description: str = "OSWorld Environment for Virtual Desktop Automation"

    ip: str = ""
    port: int = 0
    os_type: str = "Ubuntu"
    screen_width: int = 1920
    screen_height: int = 1080
    
    @property
    def connect_string(self) -> str:
        return f"http://{self.ip}:{self.port}/mcp"