"""
AgentDock - Docker Container Node Manager for MCP Services
"""
import os
import psutil
import uvicorn
import random
import threading
import asyncio
import datetime
import docker
import json
import subprocess
import shutil
from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse, Response
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from beanie.exceptions import RevisionIdWasChanged
from config import CONFIG, logger, MANAGER_NAME, db, docker_client
from config import DockerContainerNode, DCNodeChecker
from node import node_router, check_nodes_status_loop
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
AgentDock lifecycle management.
Sets up necessary configurations like checking and creating
table nodes if not exists in database, creating subprocess
to update node status, and registering path to node.
"""
from beanie import init_beanie
await init_beanie(database=db,
document_models=[DockerContainerNode, DCNodeChecker],
multiprocessing_mode=True)
await asyncio.sleep(3*random.random()*CONFIG.node.health_check_interval)
exist = False
async for checker in DCNodeChecker.find(DCNodeChecker.creator_id == MANAGER_NAME):
if checker.pid is None:
await checker.delete()
elif not psutil.pid_exists(checker.pid):
await checker.delete()
else:
exist = True
if not exist:
success = False
try:
checker = DCNodeChecker(
creator_id=MANAGER_NAME,
pid=os.getpid(),
interval=float(CONFIG.node.health_check_interval),
)
await checker.save()
success = True
except RevisionIdWasChanged:
logger.warning("NodeChecker already created by another workers!")
if success:
for node in docker_client.containers.list(all=True, filters={
"label": CONFIG.testingNodeLabel
}):
node.remove(force=True)
logger.info("Test node: " + node.id + " removed!")
def monitor_loop():
asyncio.run(check_nodes_status_loop(checker))
monitor = threading.Thread(target=monitor_loop, daemon=True)
monitor.start()
app.monitor = monitor
logger.info(f"NodeChecker created with ident {monitor.ident}!")
yield
async for checker in DCNodeChecker.find(DCNodeChecker.creator_id == MANAGER_NAME):
await checker.delete()
if CONFIG.node.stop_after_exit:
async for node in DockerContainerNode.find(DockerContainerNode.creator_id == MANAGER_NAME):
container = docker_client.containers.get(node.id)
if container is not None and container.attrs["State"]["Status"] != "exited":
container.stop()
logger.info("Stopping Node: " + node.id)
node.status = "exited"
node.health = "Unhealthy"
await node.replace()
if hasattr(app, "monitor"):
app.monitor.join()
db.client.close()
app = FastAPI(
title="AgentDock",
description="Docker Container Node Manager for MCP Services",
version="1.0.0",
lifespan=lifespan
)
app.include_router(node_router, prefix="/node")
templates = Jinja2Templates(directory="templates")
if os.path.exists("static"):
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.get("/", response_class=HTMLResponse)
async def serve_index(request: Request):
"""AgentDock main dashboard page."""
nodes = DockerContainerNode.find_all()
manager_info = {
"name": MANAGER_NAME,
"status": "Running",
"instance_count": await nodes.count(),
"node_checker": (await DCNodeChecker.find_one(DCNodeChecker.creator_id == MANAGER_NAME)).model_dump(mode="json"),
}
nodes = await nodes.to_list()
nodes_data = [
{
"id": node.id,
"status": node.status,
"health": node.health,
"creator_id": node.creator_id,
"creator_name": node.creator_name,
"image_name": node.image_name,
} for node in nodes if nodes
]
return templates.TemplateResponse("index.html", {
"request": request,
"manager_info": manager_info,
"nodes_data": nodes_data
})
@app.get('/api/resources/system')
async def get_system_resources():
"""Get system resource usage information."""
try:
load_avg = os.getloadavg()
cpu_count = psutil.cpu_count()
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
return JSONResponse({
"status": "success",
"data": {
"load_average": {
"1min": round(load_avg[0], 2),
"5min": round(load_avg[1], 2),
"15min": round(load_avg[2], 2)
},
"cpu": {
"count": cpu_count,
"percent": round(cpu_percent, 1)
},
"memory": {
"total": memory.total,
"available": memory.available,
"used": memory.used,
"percent": round(memory.percent, 1),
"total_gb": round(memory.total / (1024**3), 1),
"available_gb": round(memory.available / (1024**3), 1),
"used_gb": round(memory.used / (1024**3), 1)
},
"disk": {
"total": disk.total,
"used": disk.used,
"free": disk.free,
"percent": round((disk.used / disk.total) * 100, 1),
"total_gb": round(disk.total / (1024**3), 1),
"used_gb": round(disk.used / (1024**3), 1),
"free_gb": round(disk.free / (1024**3), 1)
}
}
})
except Exception as e:
logger.error(f"Failed to get system resources: {e}")
return JSONResponse({
"status": "error",
"message": str(e)
}, status_code=500)
@app.get('/api/resources/docker')
async def get_docker_resources():
"""Get Docker container resource usage information."""
try:
containers_data = []
containers = docker_client.containers.list()
for container in containers:
try:
stats = container.stats(stream=False)
cpu_percent = 0.0
try:
cpu_delta = stats['cpu_stats']['cpu_usage']['total_usage'] - stats['precpu_stats']['cpu_usage']['total_usage']
system_delta = stats['cpu_stats']['system_cpu_usage'] - stats['precpu_stats']['system_cpu_usage']
if system_delta > 0 and 'percpu_usage' in stats['cpu_stats']['cpu_usage']:
cpu_percent = (cpu_delta / system_delta) * len(stats['cpu_stats']['cpu_usage']['percpu_usage']) * 100
elif system_delta > 0:
cpu_count = psutil.cpu_count()
cpu_percent = (cpu_delta / system_delta) * cpu_count * 100
except (KeyError, TypeError, ZeroDivisionError):
cpu_percent = 0.0
memory_usage = 0
memory_limit = 0
memory_percent = 0
try:
memory_usage = stats['memory_stats'].get('usage', 0)
memory_limit = stats['memory_stats'].get('limit', 0)
memory_percent = (memory_usage / memory_limit) * 100 if memory_limit > 0 else 0
except (KeyError, TypeError, ZeroDivisionError):
pass
network_rx = 0
network_tx = 0
try:
if 'networks' in stats:
for interface in stats['networks'].values():
network_rx += interface.get('rx_bytes', 0)
network_tx += interface.get('tx_bytes', 0)
except (KeyError, TypeError):
pass
disk_read = 0
disk_write = 0
try:
if 'blkio_stats' in stats and 'io_service_bytes_recursive' in stats['blkio_stats']:
for item in stats['blkio_stats']['io_service_bytes_recursive']:
if item.get('op') == 'Read':
disk_read += item.get('value', 0)
elif item.get('op') == 'Write':
disk_write += item.get('value', 0)
except (KeyError, TypeError):
pass
container_info = container.attrs
host_config = container_info['HostConfig']
memory_limit_config = host_config.get('Memory', 0)
cpu_limit = host_config.get('NanoCpus', 0)
cpu_quota = host_config.get('CpuQuota', 0)
cpu_period = host_config.get('CpuPeriod', 0)
cpu_limit_cores = 0
if cpu_limit > 0:
cpu_limit_cores = cpu_limit / 1000000000
elif cpu_quota > 0 and cpu_period > 0:
cpu_limit_cores = cpu_quota / cpu_period
has_cpu_limit = cpu_limit > 0 or (cpu_quota > 0 and cpu_period > 0)
has_memory_limit = memory_limit_config > 0
containers_data.append({
"id": container.id[:12],
"name": container.name,
"image": container.image.tags[0] if container.image.tags else "unknown",
"status": container.status,
"created": container.attrs['Created'],
"cpu": {
"percent": round(cpu_percent, 2),
"limit_cores": cpu_limit_cores
},
"memory": {
"usage": memory_usage,
"limit": memory_limit,
"percent": round(memory_percent, 1),
"usage_mb": round(memory_usage / (1024**2), 1),
"limit_mb": round(memory_limit / (1024**2), 1) if memory_limit > 0 else 0,
"limit_config": memory_limit_config,
"limit_config_gb": round(memory_limit_config / (1024**3), 1) if memory_limit_config > 0 else 0
},
"network": {
"rx_bytes": network_rx,
"tx_bytes": network_tx,
"rx_mb": round(network_rx / (1024**2), 2),
"tx_mb": round(network_tx / (1024**2), 2)
},
"disk": {
"read_bytes": disk_read,
"write_bytes": disk_write,
"read_mb": round(disk_read / (1024**2), 2),
"write_mb": round(disk_write / (1024**2), 2)
},
"limits": {
"has_memory_limit": has_memory_limit,
"has_cpu_limit": has_cpu_limit,
"memory_limit_gb": round(memory_limit_config / (1024**3), 1) if memory_limit_config > 0 else None,
"cpu_limit_cores": cpu_limit_cores if cpu_limit_cores > 0 else None
}
})
except Exception as e:
logger.warning(f"Failed to get stats for container {container.name}: {e}")
containers_data.append({
"id": container.id[:12],
"name": container.name,
"image": container.image.tags[0] if container.image.tags else "unknown",
"status": container.status,
"error": str(e)
})
total_containers = len(containers)
unlimited_containers = sum(1 for c in containers_data if 'limits' in c and not (c['limits']['has_memory_limit'] or c['limits']['has_cpu_limit']))
return JSONResponse({
"status": "success",
"data": {
"containers": containers_data,
"summary": {
"total_containers": total_containers,
"unlimited_containers": unlimited_containers,
"limited_containers": total_containers - unlimited_containers
}
}
})
except Exception as e:
logger.error(f"Failed to get Docker resources: {e}")
return JSONResponse({
"status": "error",
"message": str(e)
}, status_code=500)
@app.get('/api/resources/alerts')
async def get_resource_alerts():
"""Get resource alert information."""
try:
alerts = []
load_avg = os.getloadavg()
if load_avg[0] > 6.0:
alerts.append({
"type": "critical",
"category": "system",
"message": f"System load too high: {load_avg[0]:.2f}",
"recommendation": "Consider stopping some Docker containers"
})
elif load_avg[0] > 4.0:
alerts.append({
"type": "warning",
"category": "system",
"message": f"System load is high: {load_avg[0]:.2f}",
"recommendation": "Monitor system performance"
})
memory = psutil.virtual_memory()
if memory.available < 1024 * 1024 * 1024:
alerts.append({
"type": "critical",
"category": "memory",
"message": f"Available memory low: {memory.available / (1024**3):.1f}GB",
"recommendation": "Free up memory or stop containers immediately"
})
elif memory.percent > 85:
alerts.append({
"type": "warning",
"category": "memory",
"message": f"Memory usage high: {memory.percent:.1f}%",
"recommendation": "Consider cleaning memory or stopping some containers"
})
containers = docker_client.containers.list()
unlimited_containers = []
for container in containers:
host_config = container.attrs['HostConfig']
memory_limit = host_config.get('Memory', 0)
cpu_limit = host_config.get('NanoCpus', 0)
cpu_quota = host_config.get('CpuQuota', 0)
if memory_limit == 0 and cpu_limit == 0 and cpu_quota == 0:
if not any(keyword in container.name.lower() for keyword in ['agentdock', 'mongodb', 'nginx', 'redis']):
unlimited_containers.append(container.name)
if unlimited_containers:
alerts.append({
"type": "warning",
"category": "docker",
"message": f"Found {len(unlimited_containers)} containers without resource limits",
"recommendation": "Consider setting CPU and memory limits for containers",
"details": unlimited_containers[:5]
})
return JSONResponse({
"status": "success",
"data": {
"alerts": alerts,
"alert_count": len(alerts),
"has_critical": any(alert['type'] == 'critical' for alert in alerts)
}
})
except Exception as e:
logger.error(f"Failed to get resource alerts: {e}")
return JSONResponse({
"status": "error",
"message": str(e)
}, status_code=500)
@app.post('/api/resources/emergency-stop')
async def emergency_stop_containers():
"""Emergency stop high-resource containers."""
try:
stopped_containers = []
containers = docker_client.containers.list()
container_stats = []
for container in containers:
try:
if any(keyword in container.name.lower() for keyword in ['agentdock', 'mongodb', 'nginx', 'redis']):
continue
stats = container.stats(stream=False)
cpu_delta = stats['cpu_stats']['cpu_usage']['total_usage'] - stats['precpu_stats']['cpu_usage']['total_usage']
system_delta = stats['cpu_stats']['system_cpu_usage'] - stats['precpu_stats']['system_cpu_usage']
cpu_percent = 0.0
if system_delta > 0:
cpu_percent = (cpu_delta / system_delta) * len(stats['cpu_stats']['cpu_usage']['percpu_usage']) * 100
container_stats.append({
'container': container,
'cpu_percent': cpu_percent,
'memory_usage': stats['memory_stats']['usage']
})
except:
continue
container_stats.sort(key=lambda x: x['cpu_percent'], reverse=True)
for i, item in enumerate(container_stats[:3]):
if i >= 3:
break
container = item['container']
try:
container.stop(timeout=10)
stopped_containers.append({
"name": container.name,
"id": container.id[:12],
"cpu_percent": round(item['cpu_percent'], 2)
})
logger.info(f"Emergency stopped container: {container.name}")
except Exception as e:
logger.error(f"Failed to stop container {container.name}: {e}")
return JSONResponse({
"status": "success",
"message": f"Stopped {len(stopped_containers)} high-resource containers",
"data": {
"stopped_containers": stopped_containers
}
})
except Exception as e:
logger.error(f"Emergency stop failed: {e}")
return JSONResponse({
"status": "error",
"message": str(e)
}, status_code=500)
@app.get('/alive')
async def alive():
"""Health check endpoint."""
try:
db_info = await db.command("serverStatus")
docker_info = docker_client.info()
checker = await DCNodeChecker.find_one(DCNodeChecker.creator_id == MANAGER_NAME)
return {
"status": "healthy",
"service": "AgentDock",
"manager": MANAGER_NAME,
"db_connected": True,
"docker_connected": True,
"node_checker_running": checker is not None,
"timestamp": datetime.datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Health check failed: {str(e)}")
return {
"status": "unhealthy",
"error": str(e),
"timestamp": datetime.datetime.now().isoformat()
}
@app.api_route("/container/{container_id}/mcp", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def route_to_container_8088_direct(request: Request, container_id: str):
"""
Route request to container's port 8088 (MCP port) - direct access.
URL format: /container/{container_id}/mcp
"""
return await route_to_container_port(request, container_id, "", 8088)
@app.api_route("/container/{container_id}/mcp/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def route_to_container_8088_with_path(request: Request, container_id: str, path: str):
"""
Route request to container's port 8088 (MCP port) - with path.
URL format: /container/{container_id}/mcp/{path}
"""
return await route_to_container_port(request, container_id, path, 8088)
@app.api_route("/container/{container_id}/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def route_to_container_8000(request: Request, container_id: str, path: str):
"""
Route request to container's port 8000 (main API port).
URL format: /container/{container_id}/{path}
"""
return await route_to_container_port(request, container_id, path, 8000)
async def route_to_container_port(request: Request, container_id: str, path: str, target_port: int):
"""Route request to specified container port."""
import httpx
import urllib.parse
try:
node = None
node = await DockerContainerNode.find_one(DockerContainerNode.id == container_id)
if not node:
node = await DockerContainerNode.find_one(DockerContainerNode.short_id == container_id)
if not node:
async for n in DockerContainerNode.find_all():
if n.id.startswith(container_id) or n.short_id.startswith(container_id):
node = n
break
if not node:
raise HTTPException(status_code=404, detail=f"Container {container_id} not found")
if node.status != "running":
try:
container = docker_client.containers.get(node.id)
if container is not None:
container.restart()
await asyncio.sleep(5)
logger.info(f"Restarting container: {node.id}")
from node import update_node_status
await update_node_status(node)
except Exception as e:
logger.error(f"Failed to restart container {node.id}: {str(e)}")
raise HTTPException(
status_code=503, detail=f"Container {container_id} is not running, restart attempted")
if target_port == 8000:
if node.port == 0:
raise HTTPException(status_code=503, detail=f"Container {container_id} port 8000 not mapped")
mapped_port = node.port
elif target_port == 8088:
if node.sse_port == 0:
raise HTTPException(status_code=503, detail=f"Container {container_id} port 8088 not mapped")
mapped_port = node.sse_port
else:
raise HTTPException(status_code=400, detail=f"Unsupported target port: {target_port}")
node.last_req_time = datetime.datetime.now()
await node.replace()
if target_port == 8088:
url = f"http://{node.ip}:{mapped_port}/mcp"
else:
url = f"http://{node.ip}:{mapped_port}/{path}"
logger.info(f"Routing {request.method} request to container {container_id}:{target_port} -> {url}")
headers = {k: v for k, v in request.headers.items() if k.lower() != "host"}
timeout = httpx.Timeout(120.0, connect=10.0)
async with httpx.AsyncClient(timeout=timeout) as client:
try:
if request.method in ["POST", "PUT", "PATCH"]:
data = await request.body()
response = await client.stream(
request.method, url, data=data, headers=headers
).__aenter__()
else:
params = request.query_params
if params:
url += '?' + urllib.parse.urlencode(params)
response = await client.stream(
request.method, url, headers=headers
).__aenter__()
status_code = response.status_code
response_headers = dict(response.headers)
content_type = response_headers.get('content-type', '').lower()
if 'application/json' in content_type:
try:
content = await response.aread()
await response.aclose()
return Response(
content=content,
status_code=status_code,
headers=response_headers,
media_type=content_type
)
except Exception as e:
logger.error(f"Error reading JSON response from container {container_id}: {str(e)}")
await response.aclose()
raise HTTPException(status_code=502, detail=f"Error reading response: {str(e)}")
else:
async def stream_response():
try:
async for chunk in response.aiter_bytes():
yield chunk
except Exception as e:
logger.error(f"Error streaming response from container {container_id}: {str(e)}")
yield f"Error communicating with container: {str(e)}".encode()
finally:
await response.aclose()
return StreamingResponse(
stream_response(),
status_code=status_code,
headers=response_headers
)
except httpx.RequestError as e:
logger.error(f"Request error when connecting to container {container_id}: {str(e)}")
raise HTTPException(status_code=503, detail=f"Error connecting to container: {str(e)}")
except Exception as e:
logger.error(f"Unexpected error when routing to container {container_id}: {str(e)}")
raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}")
except HTTPException:
raise
except Exception as e:
logger.error(f"Error in route_to_container_port: {str(e)}")
raise HTTPException(status_code=500, detail=f"Server error: {str(e)}")
@app.get('/api/container/{container_id}/urls')
async def get_container_urls(request: Request, container_id: str):
"""Get container access URL information."""
try:
node = None
node = await DockerContainerNode.find_one(DockerContainerNode.id == container_id)
if not node:
node = await DockerContainerNode.find_one(DockerContainerNode.short_id == container_id)
if not node:
async for n in DockerContainerNode.find_all():
if n.id.startswith(container_id) or n.short_id.startswith(container_id):
node = n
break
if not node:
raise HTTPException(status_code=404, detail=f"Container {container_id} not found")
base_url = str(request.base_url).rstrip('/')
urls = {
"container_id": node.id,
"short_id": node.short_id,
"status": node.status,
"direct_access": {
"port_8000": f"http://{node.ip}:{node.port}" if node.port > 0 else None,
"port_8088": f"http://{node.ip}:{node.sse_port}/mcp" if node.sse_port > 0 else None
},
"proxy_access": {
"api_url": f"{base_url}/container/{node.short_id}" if node.port > 0 else None,
"mcpapi_url": f"{base_url}/container/{node.short_id}/mcpapi" if node.port > 0 else None,
"mcp_url": f"{base_url}/container/{node.short_id}/mcp" if node.sse_port > 0 else None
}
}
return JSONResponse(content=urls)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting container URLs: {str(e)}")
raise HTTPException(status_code=500, detail=f"Server error: {str(e)}")
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8080)