import io
import os
import sys
import time
from contextlib import asynccontextmanager
# Put the parent of this file's directory on sys.path (once) before the
# project imports below run; presumably this lets the absolute
# `openjiuwen_studio.*` imports resolve when the file is executed directly.
backend_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if backend_dir not in sys.path:
    sys.path.append(backend_dir)
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, Request, status
from fastapi.middleware.cors import CORSMiddleware
# Load environment variables from the `.env` file that sits one directory
# above `backend_dir`. This happens before the project imports that follow,
# presumably so modules that read the environment at import time see them.
project_root = os.path.dirname(backend_dir)
load_dotenv(os.path.join(project_root, '.env'))
from openjiuwen_studio.ops.modules.prompt.infra.database import create_database_tables
from openjiuwen_studio.memory_engine_start import MemoryEngineManager
from openjiuwen_studio.routers import register
from openjiuwen_studio.core.database import engine
from openjiuwen_studio.models.db_fun_base import Base
from openjiuwen_studio.models import (
AgentBaseDB,
AgentExecutionDB,
AgentExecutionDetailsDB,
AgentPublishDB,
AgentWorkflowRelationDB,
EmbeddingModelConfig,
EvaluationDB,
EvaluationTaskDB,
EvaluationRunDB,
EvaluationTaskResultDB,
KnowledgeBaseDB,
KnowledgeBaseDocumentDB,
KnowledgeBaseWeblinkDB,
MemoryBaseDB,
ModelConfig,
ModelUsageLog,
PluginBaseDB,
PluginPublishDB,
PromptRelationDB,
ReferenceDB,
RuntimeInfoDB,
SpaceDB,
SpaceUserDB,
SystemEmbeddingModelDB,
SystemLLMModelDB,
TagDB,
ToolBaseDB,
TriggerDB,
TriggerExecutionLogDB,
UserDB,
VLMModelConfig,
WorkflowBaseDB,
WorkflowExecutionDB,
WorkflowExecutionDetailsDB,
WorkflowPublishDB,
)
from openjiuwen.core.common.logging import logger, interface_logger
from openjiuwen_studio.core.common.logging.events import CustomLogEventType
from openjiuwen_studio.core.alembic_version_check import check_alembic_versions
from openjiuwen_studio.ops.config import settings as ops_settings
from openjiuwen_studio.core.config import settings
from openjiuwen_studio.models.trace_detail import TraceDetailDB
from openjiuwen_studio.models.trace_summary import TraceSummaryDB
from openjiuwen_studio.models.tag import workflow_tag_association
from openjiuwen_studio.core.manager.redis_manager.redis_client import redis_manager_bytes
from openjiuwen.core.runner import Runner
from openjiuwen.core.runner.runner_config import get_runner_config
from openjiuwen.core.session.checkpointer.checkpointer import CheckpointerConfig
def _as_utf8_stream(stream):
    """Return *stream* set up to encode text as UTF-8.

    The stream is reconfigured in place when it supports ``reconfigure()``,
    which keeps its buffering mode and error handler. Streams that only
    expose a binary ``buffer`` are wrapped in a new ``io.TextIOWrapper``.
    ``None`` and streams offering neither are returned unchanged instead of
    raising at import time.
    """
    if stream is None:
        return stream

    reconfigure = getattr(stream, "reconfigure", None)
    if callable(reconfigure):
        try:
            reconfigure(encoding="utf-8")
            return stream
        except (ValueError, OSError):
            # Fall through to the wrapper-based approach below.
            pass

    buffer = getattr(stream, "buffer", None)
    if buffer is None:
        return stream
    return io.TextIOWrapper(buffer, encoding="utf-8")


# Force UTF-8 on the standard streams so the emoji / non-ASCII log messages
# emitted by this module do not fail on consoles with a narrower encoding.
sys.stdout = _as_utf8_stream(sys.stdout)
sys.stderr = _as_utf8_stream(sys.stderr)
@asynccontextmanager
async def lifespan_func(app: FastAPI):
    """Run application startup before serving and shutdown afterwards.

    Startup creates the ORM tables listed below, initialises the memory
    engine, runs the Alembic version check, optionally points the Runner at
    a Redis checkpointer, starts the Runner and starts the trigger scheduler.
    Shutdown stops the trigger scheduler; it runs in a ``finally`` so it also
    happens when an exception or cancellation is thrown in at ``yield``.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    logger.info("🚀 Starting Jiuwen Agent Studio Backend...")

    target_tables = [
        ModelConfig.__table__,
        ModelUsageLog.__table__,
        EmbeddingModelConfig.__table__,
        VLMModelConfig.__table__,
        AgentBaseDB.__table__,
        AgentPublishDB.__table__,
        PromptRelationDB.__table__,
        TagDB.__table__,
        UserDB.__table__,
        SpaceDB.__table__,
        SpaceUserDB.__table__,
        WorkflowBaseDB.__table__,
        WorkflowPublishDB.__table__,
        PluginBaseDB.__table__,
        PluginPublishDB.__table__,
        ToolBaseDB.__table__,
        WorkflowExecutionDB.__table__,
        WorkflowExecutionDetailsDB.__table__,
        AgentExecutionDB.__table__,
        AgentExecutionDetailsDB.__table__,
        AgentWorkflowRelationDB.__table__,
        ReferenceDB.__table__,
        TraceDetailDB.__table__,
        TraceSummaryDB.__table__,
        KnowledgeBaseDB.__table__,
        KnowledgeBaseDocumentDB.__table__,
        KnowledgeBaseWeblinkDB.__table__,
        SystemLLMModelDB.__table__,
        SystemEmbeddingModelDB.__table__,
        MemoryBaseDB.__table__,
        RuntimeInfoDB.__table__,
        TriggerDB.__table__,
        TriggerExecutionLogDB.__table__,
        EvaluationDB.__table__,
        EvaluationTaskDB.__table__,
        EvaluationRunDB.__table__,
        EvaluationTaskResultDB.__table__,
    ]

    # get_backend_name() also matches dialect+driver URLs such as
    # "sqlite+pysqlite", which a plain drivername comparison would miss.
    if engine.url.get_backend_name() == "sqlite":
        renamed_count = _suffix_index_names_with_table(target_tables)
        logger.info(f"Duplicate index renaming completed. Total renamed indexes: {renamed_count}")

    Base.metadata.create_all(
        bind=engine,
        tables=target_tables,
        checkfirst=True,
    )
    await MemoryEngineManager.init()
    workflow_tag_association.create(bind=engine, checkfirst=True)
    check_alembic_versions()
    create_database_tables()
    logger.info("✅ Database tables created")

    if settings.enable_redis_checkpoint:
        runner_config = get_runner_config()
        runner_config.checkpointer_config = CheckpointerConfig(
            type="redis",
            conf={
                "connection": {
                    "redis_client": redis_manager_bytes.client
                }
            }
        )
        Runner.set_config(runner_config)
    # NOTE(review): the Runner is started whether or not the Redis
    # checkpointer is enabled — confirm this matches the intended flow.
    await Runner.start()

    # Imported here rather than at module level, as in the original code.
    from openjiuwen_studio.core.scheduler.scheduler import init_scheduler
    from openjiuwen_studio.core.scheduler.sync import sync_triggers_to_scheduler
    from openjiuwen_studio.core.database import get_database_url

    _scheduler = init_scheduler(get_database_url())
    await sync_triggers_to_scheduler(_scheduler)
    _scheduler.start()
    logger.info("✅ Trigger scheduler started")

    try:
        yield
    finally:
        logger.info("🛑 Shutting down Jiuwen Agent Studio Backend...")
        from openjiuwen_studio.core.scheduler.scheduler import get_scheduler

        # Best effort: a failing scheduler shutdown must not abort app shutdown.
        try:
            get_scheduler().shutdown(wait=False)
            logger.info("✅ Trigger scheduler stopped")
        except Exception as e:
            logger.warning(f"⚠️ Error shutting down scheduler: {e}")


def _suffix_index_names_with_table(tables) -> int:
    """Append each table's name to its index names; return how many changed.

    Used for SQLite so that identically named indexes on different tables do
    not collide. Every renamed index is flagged in its ``info`` dict, so a
    second lifespan run in the same process does not append the suffix again.
    """
    marker = "table_suffix_applied"
    renamed = 0
    for table in tables:
        if not hasattr(table, "indexes"):
            logger.warning(f"Table {table.name} has no indexes attribute, skipping...")
            continue
        for idx in table.indexes:
            if idx.info.get(marker):
                continue
            old_idx_name = idx.name
            idx.name = f"{old_idx_name}_{table.name}"
            idx.info[marker] = True
            logger.info(f"{table.name}: Renamed index: {old_idx_name} ---> {idx.name}")
            renamed += 1
    return renamed
# The ASGI application. Swagger UI, ReDoc and the OpenAPI schema are all
# served under the /api prefix, and startup/shutdown is delegated to
# lifespan_func.
app = FastAPI(
    title="Jiuwen Agent Studio API",
    description="Backend API for Jiuwen Agent Studio - Professional LLM Agent Development Platform",
    version="1.0.0",
    docs_url="/api/docs",
    redoc_url="/api/redoc",
    openapi_url="/api/openapi.json",
    lifespan=lifespan_func,
    # Initial OAuth settings handed to the Swagger UI page.
    swagger_ui_init_oauth={
        "usePkceWithAuthorizationCodeGrant": True,
        "appName": "Jiuwen Agent Studio API",
        "clientId": "swagger-ui",
    }
)
class LogMiddleware:
    """Pure ASGI middleware that logs every HTTP request.

    It is compatible with StreamingResponse. BaseHTTPMiddleware is
    deliberately not used because it has compatibility problems with
    StreamingResponse: when an exception occurs while a response is being
    streamed it raises
    ``RuntimeError: Caught handled exception, but response already started``.
    """

    def __init__(self, asgi_app):
        """Store the wrapped ASGI application."""
        self.app = asgi_app

    async def __call__(self, scope, receive, send):
        """Forward the call to the wrapped app and log the outcome.

        Non-HTTP scopes are passed through untouched. For HTTP requests one
        interface log record is written after the wrapped app finishes: an
        error record (and the exception is re-raised) when it raises,
        otherwise an info record carrying the response status code.
        """
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        start_time = time.perf_counter()
        request = Request(scope, receive=receive)
        # Nothing has been routed yet, so remember the path as seen on entry
        # and use it whenever no route ends up in the scope.
        entry_path = self._get_route_path(request)
        status_code = None

        async def send_wrapper(message):
            # Capture the status code as the response starts.
            nonlocal status_code
            if message["type"] == "http.response.start":
                status_code = message["status"]
            await send(message)

        def resolve_route_path():
            # The matched route is only present in the scope once the inner
            # app has routed the request, so it is looked up after dispatch.
            if scope.get("route"):
                return self._get_route_path(request)
            return entry_path

        try:
            await self.app(scope, receive, send_wrapper)
        except Exception as e:
            route_path = resolve_route_path()
            interface_logger.error(
                "Request failed",
                event_type=CustomLogEventType.INTERFACE_SRV.value,
                interface_name=self._truncate_path(route_path, max_parts=3),
                execution_time_ms=round((time.perf_counter() - start_time) * 1000),
                exception=str(e),
                error_code=str(status.HTTP_500_INTERNAL_SERVER_ERROR),
                metadata={
                    "route_path": route_path
                }
            )
            logger.error(f"Request failed: {e}")
            raise

        route_path = resolve_route_path()
        interface_logger.info(
            "",
            event_type=CustomLogEventType.INTERFACE_SRV.value,
            interface_name=self._truncate_path(route_path, max_parts=3),
            execution_time_ms=round((time.perf_counter() - start_time) * 1000),
            # Falls back to 200 when no response start message was seen.
            error_code=str(status_code or status.HTTP_200_OK),
            metadata={
                "route_path": route_path
            }
        )

    @staticmethod
    def _truncate_path(path: str, max_parts: int = 3) -> str:
        """Return *path* cut down to its first *max_parts* segments.

        Empty input and ``"/"`` both yield ``"/"``; the result always starts
        with a single slash and has no trailing slash.
        """
        if not path or path == "/":
            return "/"
        parts = path.strip("/").split("/")
        truncated = parts[:max_parts]
        return "/" + "/".join(truncated) if truncated else "/"

    @staticmethod
    def _get_route_path(request: "Request") -> str:
        """Return the matched route's path template, or the URL path.

        The template is used when the scope holds a ``route`` object with a
        string ``path``; in every other case, including any error while
        reading it, the request's URL path is returned.
        """
        try:
            route = request.scope.get("route")
            if route and hasattr(route, "path") and isinstance(route.path, str):
                return route.path
            return request.url.path
        except Exception:
            return request.url.path
# Request logging for HTTP calls (see LogMiddleware).
app.add_middleware(LogMiddleware)
# CORS: only the two localhost:3000 origins listed here are allowed; for
# them credentials, every method and every header are permitted.
# NOTE(review): middleware added later presumably wraps the ones added
# earlier, which would put CORS outside LogMiddleware — confirm.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000", "http://127.0.0.1:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/")
async def root():
    """Return a welcome payload pointing at the docs and health endpoints."""
    landing_info = dict(
        message="Welcome to Jiuwen Agent Studio Backend",
        docs="/api/docs",
        health="/api/health",
    )
    return landing_info
# Hand the application to the project's router registration hook
# (openjiuwen_studio.routers.register), which presumably attaches the API
# routers to it.
register.router_register(app)
def main():
    """Start the backend with uvicorn using environment-driven settings.

    Environment variables read:
        HOST: bind address, default ``0.0.0.0``.
        BACKEND_PORT: listen port, default ``8000``.
        WORKER_NUM: worker process count, default ``1``. It is only honoured
            when ``INDEX_MANAGER_TYPE`` is ``milvus`` and ``DB_TYPE`` is
            ``mysql``; every other combination runs a single worker.

    Raises:
        ValueError: if ``BACKEND_PORT`` or an honoured ``WORKER_NUM`` is not
            an integer.
    """
    multi_worker_allowed = (
        os.getenv("INDEX_MANAGER_TYPE") == "milvus" and os.getenv("DB_TYPE") == "mysql"
    )
    config = {
        "host": os.getenv("HOST", "0.0.0.0"),
        "port": int(os.getenv("BACKEND_PORT", "8000")),
        "reload": False,
        "log_level": "info",
        "access_log": True,
        # os.getenv defaults should be strings, like the port default above.
        "workers": int(os.getenv("WORKER_NUM", "1")) if multi_worker_allowed else 1,
    }

    base_url = f"http://{config['host']}:{config['port']}"
    logger.info("🚀 Starting Jiuwen Agent Studio Backend in development mode...")
    logger.info(f"📍 Server will be available at: {base_url}")
    logger.info(f"📚 API Documentation: {base_url}/api/docs")
    logger.info(f"🔍 Health Check: {base_url}/api/health")
    # Report the real reload setting instead of always claiming it is on.
    if config["reload"]:
        logger.info("🔄 Auto-reload enabled for development")
    else:
        logger.info("🔄 Auto-reload disabled")
    logger.info("⏹️ Press Ctrl+C to stop the server")
    logger.info("-" * 60)

    uvicorn.run(
        "openjiuwen_studio.main:app",
        loop="asyncio",
        **config
    )
# Script entry point: start the server only when this file is run directly.
if __name__ == "__main__":
    main()