"""
KADT WebUI - Flask应用
提供可视化部署界面
"""
import os
import json
import argparse
import logging
import importlib.util
from datetime import datetime, timezone, timedelta
from flask import Flask, render_template, request, jsonify
# Configure root logging once at import time: INFO and above, with each record
# showing timestamp, logger name, level and message.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Drop records below WARNING from the 'werkzeug' logger (presumably the
# per-request access log lines of the development server — confirm).
logging.getLogger('werkzeug').setLevel(logging.WARNING)
# Fixed UTC+8 offset used by the 'datetime' template filter (Beijing time).
BEIJING_TZ = timezone(timedelta(hours=8))
# Directory layout derived from this file's absolute path:
#   script_dir -> directory containing this file
#   base_dir   -> parent of script_dir
#   parent_dir -> parent of base_dir (handed to ScriptRunner as its base_dir)
script_dir = os.path.dirname(os.path.abspath(__file__))
base_dir = os.path.dirname(script_dir)
parent_dir = os.path.dirname(base_dir)
# Key under which JSON API responses carry an error message.
KEY_ERROR = 'error'
def _load_script_runner():
    """Load ``script_runner.py`` from this file's directory by path.

    Returns:
        The ``ScriptRunner`` attribute of the freshly executed module.

    Raises:
        ImportError: if no import spec, or no loader, can be built for the file.
    """
    module_path = os.path.join(script_dir, "script_runner.py")
    spec = importlib.util.spec_from_file_location("script_runner", module_path)
    # Guard first: without a spec and a loader there is nothing to execute.
    if not (spec and spec.loader):
        raise ImportError("无法加载script_runner模块")
    loaded = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(loaded)
    return loaded.ScriptRunner
# Resolve the ScriptRunner class once, at import time.
ScriptRunner = _load_script_runner()
# Templates and static assets live next to this file.
template_dir = os.path.join(script_dir, 'templates')
static_dir = os.path.join(script_dir, 'static')
app = Flask(__name__, template_folder=template_dir, static_folder=static_dir)
# The signing key was previously a constant committed to source. Deployments
# can now supply their own via the KADT_WEBUI_SECRET_KEY environment variable;
# the legacy value remains the fallback so existing setups behave as before.
# NOTE(review): the fallback is publicly known — set the variable in any
# deployment that relies on signed sessions/cookies.
app.config['SECRET_KEY'] = os.environ.get('KADT_WEBUI_SECRET_KEY') or 'ascend-deployer-webui'
@app.template_filter('datetime')
def format_datetime(timestamp):
    """Template filter ``datetime``: format a timestamp as Beijing-time text.

    Args:
        timestamp: value accepted by ``datetime.fromtimestamp``; any falsy
            value (``None``, ``0``, ``''``) is treated as "no timestamp".

    Returns:
        ``'YYYY-MM-DD HH:MM:SS'`` in UTC+8, or ``'-'`` for a falsy input.
    """
    if timestamp:
        moment = datetime.fromtimestamp(timestamp, BEIJING_TZ)
        return moment.strftime('%Y-%m-%d %H:%M:%S')
    return '-'
# YAML file handed to ScriptRunner; /api/health also reports this file's mtime.
config_path = os.path.join(script_dir, 'scripts_config.yaml')
# Single module-level ScriptRunner instance shared by every route in this file.
runner = ScriptRunner(config_path, base_dir=parent_dir)
@app.route('/')
def index():
    """Home page: render ``index.html`` with the runner's profiles."""
    return render_template('index.html', profiles=runner.get_profiles())
@app.route('/profile/<name>')
def profile_page(name):
    """Profile detail page: render ``workflow.html`` for the profile *name*.

    The template receives the profile name and the workflow returned by
    ``runner.get_workflow(name)``.
    """
    context = {
        'profile': name,
        'workflow': runner.get_workflow(name),
    }
    return render_template('workflow.html', **context)
@app.route('/step/<profile>/<step_name>')
def step_page(profile, step_name):
    """Step execution page: render ``step.html`` for one step of a profile.

    The template receives the step's form info, the full workflow, and
    ``current_idx`` — the index of the first workflow entry whose ``name``
    equals *step_name*, or ``0`` when no entry matches.
    """
    form_info = runner.get_step_form_info(profile, step_name)
    steps = runner.get_workflow(profile)
    # First matching position, defaulting to 0 when the step is not listed.
    position = next(
        (idx for idx, entry in enumerate(steps) if entry.get('name') == step_name),
        0,
    )
    return render_template(
        'step.html',
        profile=profile,
        step=form_info,
        workflow=steps,
        current_idx=position,
    )
@app.route('/logs/<task_id>')
def logs_page(task_id):
    """Log viewer page for a single task.

    Renders ``logs.html`` with the task info and the log content read from
    position 0. When the runner has no info for *task_id*, renders
    ``error.html`` with HTTP status 404.
    """
    task = runner.get_task_info(task_id)
    if not task:
        # Report the missing task with a real 404 (previously an implicit 200),
        # matching what /api/task/<task_id> does for the same condition.
        return render_template('error.html', message='任务不存在'), 404
    log_result = runner.get_log_content(task_id, 0)
    logs = log_result.get('content', '')
    return render_template('logs.html', task=task, logs=logs)
@app.route('/tasks')
def tasks_page():
    """Task list page: render ``tasks.html`` with every task from the runner."""
    return render_template('tasks.html', tasks=runner.get_all_tasks())
@app.route('/api/profiles')
def api_profiles():
    """JSON API: return the result of ``runner.get_profiles()``."""
    profiles = runner.get_profiles()
    return jsonify(profiles)
@app.route('/api/workflow/<profile>')
def api_workflow(profile):
    """JSON API: return the workflow of *profile* as reported by the runner."""
    workflow = runner.get_workflow(profile)
    return jsonify(workflow)
@app.route('/api/step/<profile>/<step_name>')
def api_step_info(profile, step_name):
    """JSON API: return the form info for *step_name* within *profile*."""
    form_info = runner.get_step_form_info(profile, step_name)
    return jsonify(form_info)
@app.route('/api/execute', methods=['POST'])
def api_execute():
    """JSON API: start one workflow step.

    Expects a JSON object body with ``profile`` and ``step`` (both required)
    and an optional ``params`` value (defaults to ``{}``).

    Returns:
        The JSON-encoded result of ``runner.execute_async`` on success;
        ``400`` with an error message when the body is not a JSON object,
        when ``profile``/``step`` is missing, or when the runner raises
        ``ValueError``; ``500`` with an error message on any other failure.
    """
    # silent=True yields None for a missing, mistyped or malformed JSON body,
    # so the caller always gets this endpoint's own JSON error shape.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Covers None (no/invalid JSON) as well as JSON arrays and scalars,
        # which previously crashed on ``data.get`` and produced an HTML 500.
        return jsonify({KEY_ERROR: '请求体必须是JSON对象'}), 400
    profile = data.get('profile')
    step_name = data.get('step')
    params = data.get('params')
    if params is None:
        # An explicit ``"params": null`` is treated the same as an absent key.
        params = {}
    if not profile or not step_name:
        return jsonify({KEY_ERROR: '缺少profile或step参数'}), 400
    try:
        result = runner.execute_async(profile, step_name, params)
        return jsonify(result)
    except ValueError as e:
        return jsonify({KEY_ERROR: str(e)}), 400
    except Exception as e:
        # Top-level request boundary: keep the traceback in the server log
        # while the client receives a compact JSON error.
        logger.exception('执行步骤失败: profile=%s step=%s', profile, step_name)
        return jsonify({KEY_ERROR: f'执行失败: {str(e)}'}), 500
@app.route('/api/logs/<task_id>')
def api_logs(task_id):
    """JSON API: return log content for *task_id* starting at a position.

    Query parameters:
        pos: integer position passed to ``runner.get_log_content``
            (defaults to ``0``).

    Returns:
        The JSON-encoded result of ``runner.get_log_content``, or ``400``
        with an error message when ``pos`` is not an integer.
    """
    raw_pos = request.args.get('pos', 0)
    try:
        last_pos = int(raw_pos)
    except ValueError:
        # A non-numeric ``pos`` is a client error, not a server failure.
        return jsonify({KEY_ERROR: 'pos参数必须是整数'}), 400
    result = runner.get_log_content(task_id, last_pos)
    return jsonify(result)
@app.route('/api/task/<task_id>')
def api_task_info(task_id):
    """JSON API: return the runner's info for *task_id*.

    Returns:
        The JSON-encoded task info, or ``404`` with an error message when the
        runner has no info for the task.
    """
    task = runner.get_task_info(task_id)
    if not task:
        # Use the shared KEY_ERROR constant like the other API error responses.
        return jsonify({KEY_ERROR: '任务不存在'}), 404
    return jsonify(task)
@app.route('/api/tasks')
def api_tasks():
    """JSON API: return every task known to the runner."""
    all_tasks = runner.get_all_tasks()
    return jsonify(all_tasks)
@app.route('/api/kill/<task_id>', methods=['POST'])
def api_kill_task(task_id):
    """JSON API: ask the runner to abort *task_id*.

    Returns:
        The runner's result as JSON; the status is ``400`` when the result's
        ``success`` entry is falsy, otherwise the default success status.
    """
    outcome = runner.kill_task(task_id)
    if not outcome.get('success'):
        return jsonify(outcome), 400
    return jsonify(outcome)
@app.route('/api/health')
def api_health():
    """JSON API: health check.

    Returns ``{'status': 'ok', 'timestamp': ...}`` where ``timestamp`` is the
    modification time of the config file (``config_path``), not the current
    time.
    """
    payload = {
        'status': 'ok',
        'timestamp': os.path.getmtime(config_path),
    }
    return jsonify(payload)
def main():
    """Command-line entry point: parse options, log a banner, run the server.

    Options:
        --port: port to listen on (default 8080).
        --host: address to bind (default ``0.0.0.0``).
        --debug: enable Flask debug mode.
    """
    cli = argparse.ArgumentParser(description='KADT WebUI')
    cli.add_argument('--port', type=int, default=8080, help='服务端口')
    cli.add_argument('--host', default='0.0.0.0', help='绑定地址')
    cli.add_argument('--debug', action='store_true', help='调试模式')
    args = cli.parse_args()

    # Startup banner, emitted one INFO record per line.
    banner = (
        "=== KADT WebUI ===",
        f"启动端口: {args.port}",
        f"访问地址: http://{args.host if args.host != '0.0.0.0' else '本机IP'}:{args.port}",
        f"配置文件: {config_path}",
        f"基准目录: {parent_dir}",
    )
    for line in banner:
        logger.info(line)

    app.run(host=args.host, port=args.port, debug=args.debug, threaded=True)


# Run the server only when this file is executed as a script.
if __name__ == '__main__':
    main()