#!/usr/bin/env python3
"""
Script Runner - 配置驱动的脚本执行引擎
负责解析配置、构建命令、执行脚本、管理日志
"""

import os
import re
import shlex
import yaml
import json
import time
import signal
import atexit
import logging
import subprocess
import threading
import glob
from dataclasses import dataclass, asdict
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Any

# NOTE(review): basicConfig() runs at import time and configures the *root*
# logger for the whole process (it is a no-op if the root logger already has
# handlers). Consider moving it into the application entry point.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by every ScriptRunner method.
logger = logging.getLogger(name=__name__)


@dataclass
class TaskContext:
    """Bundle of the values describing a single task launch.

    Positional order is part of the interface: task id, profile name,
    workflow step name, raw user parameters, command argv, log file path.
    """
    task_id: str
    profile: str
    step_name: str
    params: Dict[str, Any]  # raw parameter dict as supplied by the caller
    cmd: List[str]  # argv list that will be executed
    log_file: str  # path of the task's output log


class ScriptRunner:
    """Config-driven script executor.

    Reads ``profiles`` -> ``workflow`` steps from a YAML config, turns form
    values into a ``bash <base_dir>/kadt/<script> --name value ...`` command,
    runs it in a daemon thread, streams its output into
    ``<logs_dir>/<task_id>.log`` and persists task state to
    ``<logs_dir>/<task_id>.status.json`` so it can be recovered on restart.
    """

    # Timezone used for the wall-clock timestamps written into log trailers (UTC+8).
    BEIJING_TZ = timezone(timedelta(hours=8))
    # Values treated as "checked" for checkbox parameters.
    TRUE_VALUES = [True, 'true', 'True', 'on', 'yes', '1']
    # Inventory host line used when no usable server is supplied.
    LOCAL_INVENTORY_LINE = "localhost ansible_connection='local' ansible_ssh_user='root'"
    # Task states that are final.
    FINISHED_STATUSES = ('success', 'failed', 'error', 'killed')

    def __init__(self, config_path: str, base_dir: str = None):
        """Load the config, recover persisted tasks and install exit hooks.

        Args:
            config_path: path to the YAML configuration file.
            base_dir: base directory scripts are resolved against (the parent
                of ``kadt``). Defaults to the grandparent directory of
                ``config_path``.
        """
        self.config_path = config_path
        self.base_dir = base_dir or os.path.dirname(os.path.dirname(config_path))
        self.config = self.load_config()
        self.running_tasks: Dict[str, Dict] = {}
        self.task_counter = 0
        self.lock = threading.Lock()
        self.cleaning_up = False
        self.should_exit = False

        self.logs_dir = os.path.join(self.base_dir, "kadt/webui/logs")
        os.makedirs(self.logs_dir, exist_ok=True)

        self.recover_tasks_on_startup()
        atexit.register(self.cleanup_on_exit)
        # signal.signal() raises ValueError outside the main thread; in that
        # case rely on the atexit hook instead of failing construction.
        if threading.current_thread() is threading.main_thread():
            signal.signal(signal.SIGTERM, self.signal_handler)
            signal.signal(signal.SIGINT, self.signal_handler)
        else:
            logger.warning("非主线程中创建ScriptRunner,未注册信号处理器")

    @staticmethod
    def is_process_alive(pid: int) -> bool:
        """Return True if a process with ``pid`` exists (signal-0 probe).

        A process owned by another user (PermissionError) counts as alive;
        a falsy pid counts as dead.
        """
        if not pid:
            return False
        try:
            os.kill(pid, 0)
            return True
        except ProcessLookupError:
            return False
        except PermissionError:
            return True
        except OSError:
            return False

    @staticmethod
    def build_server_line(server: Any) -> Optional[str]:
        """Build one ``[worker]`` inventory host line.

        Args:
            server: ``{'ip': ..., 'pass': ...}`` dict, or a raw host-line string.

        Returns:
            The host line, or None when the entry has no ip / is blank / has
            an unsupported type.
        """
        if isinstance(server, dict):
            ip = server.get('ip', '')
            if not ip:
                return None
            pass_val = server.get('pass', '')
            if pass_val:
                # Ansible splits INI host lines shell-style, so backslashes and
                # double quotes inside the quoted value must be escaped or the
                # line is mis-parsed.
                escaped = str(pass_val).replace('\\', '\\\\').replace('"', '\\"')
                return f"{ip} ansible_ssh_user=\"root\" ansible_ssh_pass=\"{escaped}\""
            return f"{ip} ansible_ssh_user=\"root\""
        if isinstance(server, str) and server.strip():
            return server.strip()
        return None

    @staticmethod
    def process_inventory_line(line: str, in_worker: bool, server_lines: List[str]) -> tuple:
        """Classify one inventory line relative to the ``[worker]`` section.

        Args:
            line: raw line including its newline.
            in_worker: whether the previous line was inside ``[worker]``.
            server_lines: unused; kept for interface compatibility.

        Returns:
            ``(new_line, in_worker_flag, should_skip)``. Host lines, comments
            and blanks inside ``[worker]`` are skipped. Any other section
            header (including ``[worker:vars]``) ends the section and is kept.
        """
        stripped = line.strip()

        if stripped.startswith('[worker]'):
            return line, True, False
        if not in_worker:
            return line, False, False
        if stripped.startswith('['):
            # Previously "[worker:...]" headers were swallowed together with
            # their contents; they are separate sections and must survive.
            return line, False, False
        return None, True, True

    @staticmethod
    def _is_empty_value(value: Any) -> bool:
        """Return True when a parameter value means "not supplied".

        None, False and empty str/list/tuple/dict/set are empty. Numeric zero
        is deliberately NOT empty, so e.g. ``--retries 0`` reaches the script.
        """
        if value is None or value is False:
            return True
        if isinstance(value, (str, list, tuple, dict, set)):
            return len(value) == 0
        return False

    @staticmethod
    def process_select_param(cmd: List[str], param_def: Dict, value: Any) -> None:
        """Append a ``select`` parameter to ``cmd``.

        A select whose options are exactly {true, false} and whose definition
        sets ``bool_flag`` becomes a bare ``--name`` flag (only when "true").
        Otherwise a non-empty value is emitted as ``--name <value>``.
        """
        options = param_def.get('options', [])
        is_bool_select = options and set(str(o).lower() for o in options) == {'true', 'false'}
        bool_flag = param_def.get('bool_flag', False)

        if is_bool_select and bool_flag:
            if str(value).lower() == 'true':
                cmd.append(f"--{param_def['name']}")
        elif not ScriptRunner._is_empty_value(value):
            cmd.append(f"--{param_def['name']}")
            cmd.append(str(value))

    @staticmethod
    def process_standard_param(cmd: List[str], param_name: str, value: Any) -> None:
        """Append ``--param_name <value>`` for text/number/password/... params.

        Empty values (see ``_is_empty_value``) are omitted; ``0`` is kept.
        """
        if not ScriptRunner._is_empty_value(value):
            cmd.append(f"--{param_name}")
            cmd.append(str(value))

    @staticmethod
    def should_skip_param(param_def: Dict, value: Any) -> bool:
        """Return True for params never passed to the script.

        Those are ``frontend_only`` params and the ``servers`` / ``help_panel``
        types. ``value`` is unused and kept for interface compatibility.
        """
        param_type = param_def.get('type', 'text')
        frontend_only = param_def.get('frontend_only', False)
        return frontend_only or param_type in ['servers', 'help_panel']

    @staticmethod
    def write_task_header(log_file: str, task_info: Dict, cmd: List[str]) -> None:
        """Create/truncate ``log_file`` and write the task start banner.

        NOTE(review): the command line is written verbatim, so values of
        password-type params end up in plain text in the log.
        """
        with open(log_file, 'w', encoding='utf-8') as f:
            f.write(f"=== 任务开始: {task_info['task_id']} ===\n")
            f.write(f"Profile: {task_info['profile']}\n")
            f.write(f"Step: {task_info['step']}\n")
            f.write(f"Command: {' '.join(cmd)}\n")
            f.write("=" * 50 + "\n\n")

    @staticmethod
    def decode_line(line: bytes) -> str:
        """Decode one output line as UTF-8, replacing undecodable bytes.

        errors='replace' never raises, so no fallback decoding is needed.
        """
        return line.decode('utf-8', errors='replace')

    @staticmethod
    def write_task_footer(log_file: str, task_info: Dict, returncode: int) -> None:
        """Append the task end banner (status, return code, elapsed seconds)."""
        with open(log_file, 'a', encoding='utf-8') as f:
            f.write("\n" + "=" * 50 + "\n")
            f.write(f"=== 任务结束: {task_info['task_id']} ===\n")
            f.write(f"状态: {task_info['status']}\n")
            f.write(f"返回码: {returncode}\n")
            f.write(f"耗时: {task_info['end_time'] - task_info['start_time']:.2f}s\n")

    @staticmethod
    def terminate_process(pid: int, grace_period: float = 0.3) -> bool:
        """Stop a task: SIGTERM, wait ``grace_period`` seconds, then SIGKILL.

        Tasks are started with ``start_new_session=True``, so the script leads
        its own process group. The whole group is signalled, which also stops
        children the script spawned. If no group with that id exists, only the
        single PID is signalled.

        Args:
            pid: PID of the task process.
            grace_period: seconds between SIGTERM and SIGKILL.

        Returns:
            True if the process is gone or was signalled, False on failure.
        """
        def send(sig: int) -> None:
            # Never signal our own process group.
            if pid != os.getpgrp():
                try:
                    os.killpg(pid, sig)
                    return
                except ProcessLookupError:
                    pass  # no such group: fall back to the single PID
            os.kill(pid, sig)

        try:
            send(signal.SIGTERM)
            time.sleep(grace_period)
            try:
                send(signal.SIGKILL)
            except ProcessLookupError:
                pass  # already exited after SIGTERM
            return True
        except ProcessLookupError:
            return True
        except Exception as e:
            logger.warning(f"终止进程失败: {e}")
            return False

    @staticmethod
    def is_task_expired(task: Dict, max_age_hours: int = 24) -> bool:
        """Return True if the task ended (or, lacking end_time, started) more
        than ``max_age_hours`` ago."""
        end_time = task.get('end_time', task.get('start_time', 0))
        return (time.time() - end_time) / 3600 > max_age_hours

    @staticmethod
    def load_task_status_from_file(status_file: str) -> Optional[Dict]:
        """Load a task dict from a status JSON file.

        Returns None (and logs a warning) on unreadable or undecodable files
        and on JSON whose top level is not an object.
        """
        try:
            with open(status_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (ValueError, OSError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError;
            # one corrupt file must not abort startup recovery.
            logger.warning(f"加载任务状态文件失败: {status_file}, {e}")
            return None
        if not isinstance(data, dict):
            logger.warning(f"加载任务状态文件失败: {status_file}, 内容不是JSON对象")
            return None
        return data

    def load_config(self) -> Dict:
        """Load the YAML config; an empty file yields an empty dict."""
        with open(self.config_path, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f) or {}

    def get_profiles(self) -> List[Dict]:
        """Return a summary dict for every configured profile."""
        profiles = []
        for name, data in (self.config.get('profiles') or {}).items():
            data = data or {}
            profiles.append({
                'name': name,
                'display_name': data.get('display_name', name),
                'description': data.get('description', ''),
                'icon': data.get('icon', 'default'),
                'workflow_count': len(data.get('workflow') or [])
            })
        return profiles

    def get_workflow(self, profile_name: str) -> List[Dict]:
        """Return the workflow step list of a profile ([] if unknown/empty)."""
        profile = (self.config.get('profiles') or {}).get(profile_name)
        if not profile:
            return []
        return profile.get('workflow') or []

    def get_step(self, profile_name: str, step_name: str) -> Optional[Dict]:
        """Return the config of the step named ``step_name``, or None."""
        for step in self.get_workflow(profile_name):
            if step.get('name') == step_name:
                return step
        return None

    def get_step_form_info(self, profile: str, step_name: str) -> Dict:
        """Return form metadata for a step, resolving select options.

        Returns:
            ``{name, title, description, standalone, skip_allowed,
            retry_allowed, needs_reboot, params}``, or {} if the step is unknown.
        """
        step = self.get_step(profile, step_name)
        if not step:
            return {}

        params_info = []
        for param in step.get('params') or []:
            param_info = dict(param)
            if param.get('type') == 'select':
                param_info['options'] = self.get_select_options(param)
            params_info.append(param_info)

        return {
            'name': step['name'],
            'title': step.get('title', step['name']),
            'description': step.get('description', ''),
            'standalone': step.get('standalone', True),
            'skip_allowed': step.get('skip_allowed', True),
            'retry_allowed': step.get('retry_allowed', True),
            'needs_reboot': step.get('needs_reboot', False),
            'params': params_info
        }

    def get_select_options(self, param: Dict) -> List:
        """Return a select's options, running ``source_cmd`` when source is dynamic."""
        if param.get('source') == 'dynamic':
            return self.get_dynamic_options(param.get('source_cmd', ''))
        return param.get('options', [])

    def get_dynamic_options(self, source_cmd: str) -> List[str]:
        """Run ``source_cmd`` in ``base_dir`` and return its non-blank stdout lines.

        Returns [] on a missing command, a non-zero exit, a 30 s timeout or an
        OS error.
        """
        try:
            # `or ''`: shlex.split(None) reads stdin on older Pythons.
            cmd_parts = shlex.split(source_cmd or '')
            if not cmd_parts:
                logger.warning("获取动态选项失败: source_cmd 为空")
                return []
            result = subprocess.run(
                cmd_parts,
                capture_output=True,
                text=True,
                timeout=30,
                cwd=self.base_dir
            )
            if result.returncode != 0:
                return []
            return [line.strip() for line in result.stdout.strip().split('\n') if line.strip()]
        except (subprocess.SubprocessError, OSError, ValueError) as e:
            logger.warning(f"获取动态选项失败: {e}")
            return []

    def build_server_lines(self, servers: List) -> List[str]:
        """Build host lines for ``servers``; fall back to a local-connection line."""
        server_lines = [line for line in map(self.build_server_line, servers or []) if line]
        return server_lines or [self.LOCAL_INVENTORY_LINE]

    def update_inventory(self, servers: List) -> bool:
        """Rewrite the host list of the ``[worker]`` section of the inventory.

        ``<base_dir>/ascend_deployer/inventory_file`` is read. The host lines
        under the first ``[worker]`` header are replaced by lines built from
        ``servers``; other sections are written back unchanged. If there is no
        ``[worker]`` section, one is appended.

        Args:
            servers: ``[{'ip': ..., 'pass': ...}, ...]`` or raw host strings.

        Returns:
            True on success, False on any error (which is logged).
        """
        inventory_path = os.path.join(self.base_dir, "ascend_deployer/inventory_file")

        try:
            with open(inventory_path, 'r', encoding='utf-8') as f:
                lines = f.readlines()

            server_lines = self.build_server_lines(servers)
            host_block = [f"{server_line}\n" for server_line in server_lines]
            new_lines = []
            in_worker = False
            worker_replaced = False

            for line in lines:
                new_line, in_worker, should_skip = self.process_inventory_line(line, in_worker, server_lines)
                if should_skip:
                    continue
                if line.strip().startswith('[worker]') and not worker_replaced:
                    # Guard against a header on the last line without newline.
                    new_lines.append(line if line.endswith('\n') else line + '\n')
                    new_lines.extend(host_block)
                    worker_replaced = True
                    continue
                if new_line:
                    new_lines.append(new_line)

            if not worker_replaced:
                # Previously the servers were silently ignored while the call
                # still reported success.
                if new_lines and not new_lines[-1].endswith('\n'):
                    new_lines[-1] += '\n'
                new_lines.append('[worker]\n')
                new_lines.extend(host_block)

            with open(inventory_path, 'w', encoding='utf-8') as f:
                f.writelines(new_lines)

            return True
        except Exception as e:
            logger.error(f"更新inventory_file失败: {e}")
            return False

    def process_checkbox_param(self, cmd: List[str], value: Any, param_name: Optional[str] = None) -> None:
        """Append a checked checkbox to ``cmd`` as ``--param_name``.

        The old code appended the raw value (e.g. bool ``True``), which is not
        a valid argv entry. Without ``param_name`` the value is appended as a
        string for backward compatibility.
        """
        if value in self.TRUE_VALUES:
            cmd.append(f"--{param_name}" if param_name else str(value))

    def build_command(self, profile: str, step_name: str, params: Dict) -> List[str]:
        """Build the argv for a step from its config and user parameters.

        Args:
            profile: profile name.
            step_name: step name.
            params: user-supplied parameter values.

        Returns:
            ``['bash', <base_dir>/kadt/<script>, ...flags]``.

        Raises:
            ValueError: unknown step, or a required parameter is empty
                (numeric 0 counts as supplied).
        """
        step = self.get_step(profile, step_name)
        if not step:
            raise ValueError(f"步骤 {step_name} 不存在于 profile {profile}")

        script_path = os.path.join(self.base_dir, "kadt", step['script'])
        cmd = ['bash', script_path]

        for param_def in step.get('params') or []:
            param_name = param_def['name']
            value = params.get(param_name)

            if self.should_skip_param(param_def, value):
                continue

            if param_def.get('required') and self._is_empty_value(value):
                raise ValueError(f"参数 {param_name} 是必填项")

            self.add_param_to_command(cmd, param_def, value)

        return cmd

    def add_param_to_command(self, cmd: List[str], param_def: Dict, value: Any) -> None:
        """Dispatch a parameter to the appender for its ``type`` (default text)."""
        param_name = param_def['name']
        param_type = param_def.get('type', 'text')

        if param_type == 'checkbox':
            self.process_checkbox_param(cmd, value, param_name)
        elif param_type == 'select':
            self.process_select_param(cmd, param_def, value)
        else:
            self.process_standard_param(cmd, param_name, value)

    def generate_task_id(self) -> str:
        """Return ``task_<unix seconds>_<counter>`` (counter incremented under lock)."""
        with self.lock:
            self.task_counter += 1
            return f"task_{int(time.time())}_{self.task_counter}"

    def create_task_info(self, ctx: 'TaskContext') -> Dict:
        """Build the initial 'running' task dict for ``ctx``.

        NOTE(review): 'command' and 'params' may contain password values and
        are persisted to the status file in plain text.
        """
        step = self.get_step(ctx.profile, ctx.step_name)
        return {
            'task_id': ctx.task_id,
            'profile': ctx.profile,
            'step': ctx.step_name,
            'step_title': step.get('title', ctx.step_name),
            'status': 'running',
            'start_time': time.time(),
            'log_file': ctx.log_file,
            'command': ' '.join(ctx.cmd),
            'pid': None,
            'params': ctx.params
        }

    def handle_servers_param(self, step: Dict, params: Dict) -> None:
        """Update the inventory from ``params['servers']``.

        If that key is absent but the step declares a 'servers' param, the
        inventory is reset to localhost.
        """
        servers_param = params.get('servers')
        if servers_param is not None:
            self.update_inventory(servers_param)
            return

        step_params = (step or {}).get('params') or []
        if any(p.get('type') == 'servers' for p in step_params):
            self.update_inventory([])

    def execute_async(self, profile: str, step_name: str, params: Dict) -> Dict:
        """Validate, register and start a step in a background thread.

        Args:
            profile: profile name.
            step_name: step name.
            params: user parameters.

        Returns:
            ``{'task_id', 'status': 'started', 'message'}``.

        Raises:
            ValueError: unknown step or missing required parameter. In that
                case the inventory is left untouched.
        """
        step = self.get_step(profile, step_name)
        if not step:
            raise ValueError(f"步骤 {step_name} 不存在于 profile {profile}")

        # Build (and thereby validate) the command before rewriting the
        # inventory, so a rejected request has no side effects.
        cmd = self.build_command(profile, step_name, params)
        self.handle_servers_param(step, params)

        task_id = self.generate_task_id()
        log_file = os.path.join(self.logs_dir, f"{task_id}.log")
        ctx = TaskContext(task_id, profile, step_name, params, cmd, log_file)
        task_info = self.create_task_info(ctx)

        # Register before starting the thread so the task is visible to
        # get_task_info()/kill_task() for its whole lifetime.
        with self.lock:
            self.running_tasks[task_id] = task_info

        thread = threading.Thread(
            target=self.run_process_task,
            args=(task_info, cmd),
            daemon=True
        )
        thread.start()

        return {'task_id': task_id, 'status': 'started', 'message': f'任务 {task_id} 已启动'}

    def run_process_task(self, task_info: Dict, cmd: List[str]) -> None:
        """Thread target: run ``cmd``, stream output to the log, record the result.

        Any exception is recorded through ``handle_task_error``.
        """
        log_file = task_info['log_file']
        task_id = task_info['task_id']

        try:
            self.write_task_header(log_file, task_info, cmd)

            script_dir = os.path.dirname(cmd[1])
            # New session: the script leads its own process group, which
            # terminate_process() signals as a whole. The context manager
            # closes the stdout pipe and reaps the child on every exit path.
            with subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                stdin=subprocess.DEVNULL,
                bufsize=0,
                cwd=script_dir,
                start_new_session=True
            ) as proc:
                with self.lock:
                    task_info['pid'] = proc.pid
                self.save_task_status(task_id, task_info)

                self.read_process_output(proc, log_file)
                returncode = proc.wait()

            self.update_task_final_status(task_info, returncode)
            self.save_task_status(task_id, task_info)
            self.write_task_footer(log_file, task_info, returncode)

        except Exception as e:
            self.handle_task_error(task_info, log_file, e)

    def read_process_output(self, proc: subprocess.Popen, log_file: str) -> None:
        """Copy ``proc.stdout`` line by line into ``log_file``.

        ANSI CSI escape sequences are stripped. The file is flushed after
        every line so readers can tail it.
        """
        ansi_escape = re.compile(r'\x1b\[[0-9;]*[A-Za-z]')
        # Open once instead of once per output line.
        with open(log_file, 'a', encoding='utf-8') as f_log:
            for line in iter(proc.stdout.readline, b''):
                f_log.write(ansi_escape.sub('', self.decode_line(line)))
                f_log.flush()

    def update_task_final_status(self, task_info: Dict, returncode: int) -> None:
        """Record the exit code and set status success/failed.

        A status already set to 'killed' by kill_task() is preserved; without
        this the runner thread could race and relabel the task 'failed'.
        """
        with self.lock:
            if task_info.get('status') != 'killed':
                task_info['status'] = 'success' if returncode == 0 else 'failed'
            task_info['end_time'] = time.time()
            task_info['returncode'] = returncode

    def handle_task_error(self, task_info: Dict, log_file: str, error: Exception) -> None:
        """Mark the task 'error' (unless already killed), persist it and log the error."""
        with self.lock:
            if task_info.get('status') != 'killed':
                task_info['status'] = 'error'
            task_info['error'] = str(error)
            task_info['end_time'] = time.time()

        self.save_task_status(task_info['task_id'], task_info)

        try:
            with open(log_file, 'a', encoding='utf-8') as f:
                f.write(f"\n错误: {error}\n")
        except OSError as log_error:
            logger.warning(f"写入日志文件失败: {log_file}, {log_error}")

    def get_task_info(self, task_id: str) -> Optional[Dict]:
        """Return the live task dict for ``task_id`` or None."""
        with self.lock:
            return self.running_tasks.get(task_id)

    def get_log_content(self, task_id: str, last_pos: int = 0) -> Dict:
        """Return log text appended since byte offset ``last_pos``.

        The file is read in binary and decoded incrementally. An incomplete
        UTF-8 sequence at EOF (a write still in progress) is left for the next
        call instead of raising. CR/CRLF become LF, as text mode did before.

        Args:
            task_id: task id.
            last_pos: byte offset returned by the previous call.

        Returns:
            ``{content, pos, status}``, plus ``error`` when reading failed.
        """
        task = self.get_task_info(task_id)
        if not task:
            return {'content': '', 'pos': 0, 'status': 'unknown'}

        log_file = task.get('log_file')
        if not log_file or not os.path.exists(log_file):
            return {'content': '', 'pos': 0, 'status': task.get('status', 'unknown')}

        try:
            import codecs

            start = max(0, int(last_pos))
            with open(log_file, 'rb') as f:
                f.seek(start)
                raw = f.read()
            decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
            content = decoder.decode(raw, final=False)
            pending = decoder.getstate()[0]  # bytes of a trailing partial char
            content = content.replace('\r\n', '\n').replace('\r', '\n')
            return {
                'content': content,
                'pos': start + len(raw) - len(pending),
                'status': task.get('status', 'running')
            }
        except Exception as e:
            return {'content': '', 'pos': last_pos, 'status': 'error', 'error': str(e)}

    def get_all_tasks(self) -> List[Dict]:
        """Return all known tasks, newest start_time first."""
        with self.lock:
            tasks = list(self.running_tasks.values())
        return sorted(tasks, key=lambda t: t.get('start_time', 0), reverse=True)

    def write_kill_log(self, log_file: str, task_id: str, reason: str) -> None:
        """Append an abort banner (time in BEIJING_TZ, reason) if the log exists."""
        if not log_file or not os.path.exists(log_file):
            return
        with open(log_file, 'a', encoding='utf-8') as f:
            f.write("\n" + "=" * 50 + "\n")
            f.write(f"=== 任务被中止: {task_id} ===\n")
            f.write(f"中止时间: {datetime.now(self.BEIJING_TZ).strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"原因: {reason}\n")

    def mark_task_killed(self, task: Dict, reason: str) -> None:
        """Set status 'killed', end_time and reason under the lock."""
        with self.lock:
            task['status'] = 'killed'
            task['end_time'] = time.time()
            task['reason'] = reason

    def kill_task(self, task_id: str) -> Dict:
        """Abort a running task.

        Returns:
            ``{'success': True, 'message'}`` or ``{'success': False, 'error'}``.
        """
        task = self.get_task_info(task_id)
        if not task:
            return {'success': False, 'error': '任务不存在'}

        if task.get('status') != 'running':
            return {'success': False, 'error': f'任务状态为 {task.get("status")},无法中止'}

        pid = task.get('pid')
        if not pid:
            return {'success': False, 'error': '无法获取进程PID'}

        if not self.terminate_process(pid):
            return {'success': False, 'error': '终止进程失败'}

        self.mark_task_killed(task, '用户手动中止')
        self.save_task_status(task_id, task)
        self.write_kill_log(task.get('log_file'), task_id, '用户手动中止')

        return {'success': True, 'message': '任务已中止'}

    def save_task_status(self, task_id: str, task_info: Dict):
        """Persist ``task_info`` to ``<logs_dir>/<task_id>.status.json``.

        A shallow copy is serialized, so concurrent mutation by another thread
        cannot break json.dump. The file is written to a temp file and
        atomically renamed, so a crash never leaves a truncated status file.
        Errors are logged, not raised.
        """
        status_file = os.path.join(self.logs_dir, f"{task_id}.status.json")
        tmp_file = f"{status_file}.{os.getpid()}.{threading.get_ident()}.tmp"
        try:
            snapshot = dict(task_info)
            with open(tmp_file, 'w', encoding='utf-8') as f:
                json.dump(snapshot, f, ensure_ascii=False, indent=2)
            os.replace(tmp_file, status_file)
        except Exception as e:
            logger.error(f"保存任务状态失败: {e}")
            try:
                os.remove(tmp_file)
            except OSError:
                pass

    def load_task_status(self, task_id: str) -> Optional[Dict]:
        """Load a task's persisted status, or None if missing/unreadable."""
        status_file = os.path.join(self.logs_dir, f"{task_id}.status.json")
        if not os.path.exists(status_file):
            return None
        try:
            with open(status_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            logger.error(f"加载任务状态失败: {e}")
            return None

    def recover_running_task(self, task_id: str, task: Dict) -> None:
        """Re-register a task persisted as running if its PID is still alive.

        Otherwise mark it failed and persist that.

        NOTE(review): no thread monitors a recovered task, so its status stays
        'running' after the process exits; PID reuse after a reboot could also
        make an unrelated process look like the task.
        """
        pid = task.get('pid')
        if self.is_process_alive(pid):
            logger.info(f"恢复运行中的任务: {task_id} (PID: {pid})")
            with self.lock:
                self.running_tasks[task_id] = task
                self.task_counter += 1
        else:
            logger.info(f"任务 {task_id} 进程已终止,更新状态")
            task['status'] = 'failed'
            task['end_time'] = time.time()
            task['reason'] = '进程意外终止'
            self.save_task_status(task_id, task)
            self.write_process_exit_log(task.get('log_file'), task_id)

    def recover_completed_task(self, task_id: str, task: Dict) -> None:
        """Re-register a finished task unless it is older than 24 hours."""
        if not self.is_task_expired(task):
            with self.lock:
                self.running_tasks[task_id] = task

    def write_process_exit_log(self, log_file: str, task_id: str) -> None:
        """Append an 'unexpected exit detected at restart' banner if the log exists."""
        if not log_file or not os.path.exists(log_file):
            return
        with open(log_file, 'a', encoding='utf-8') as f:
            f.write("\n" + "=" * 50 + "\n")
            f.write("=== 进程意外终止 (WebUI重启时检测) ===\n")
            f.write(f"检测时间: {datetime.now(self.BEIJING_TZ).strftime('%Y-%m-%d %H:%M:%S')}\n")

    def recover_tasks_on_startup(self):
        """Reload every ``*.status.json`` in logs_dir into running_tasks."""
        suffix = '.status.json'
        # glob.escape: the logs path itself may contain glob metacharacters.
        pattern = os.path.join(glob.escape(self.logs_dir), f"*{suffix}")
        for status_file in glob.glob(pattern):
            task_id = os.path.basename(status_file)[:-len(suffix)]
            task = self.load_task_status_from_file(status_file)
            if not task:
                continue

            status = task.get('status')
            if status == 'running':
                self.recover_running_task(task_id, task)
            elif status in self.FINISHED_STATUSES:
                self.recover_completed_task(task_id, task)

    def terminate_running_task(self, task_id: str, task: Dict) -> None:
        """Kill one live task at shutdown and record it as killed."""
        pid = task.get('pid')
        if not pid or not self.is_process_alive(pid):
            return

        logger.info(f"终止任务 {task_id} (PID: {pid})")
        if self.terminate_process(pid):
            task['status'] = 'killed'
            task['end_time'] = time.time()
            task['reason'] = 'WebUI退出时终止'
            self.save_task_status(task_id, task)
            self.write_webui_exit_log(task.get('log_file'), task_id)

    def write_webui_exit_log(self, log_file: str, task_id: str) -> None:
        """Append a 'terminated because WebUI exited' banner; I/O errors are logged."""
        if not log_file or not os.path.exists(log_file):
            return
        try:
            with open(log_file, 'a', encoding='utf-8') as f:
                f.write("\n" + "=" * 50 + "\n")
                f.write("=== 任务因WebUI退出而终止 ===\n")
                f.write(f"终止时间: {datetime.now(self.BEIJING_TZ).strftime('%Y-%m-%d %H:%M:%S')}\n")
        except (OSError, IOError) as e:
            logger.warning(f"写入日志文件失败: {log_file}, {e}")

    def cleanup_on_exit(self):
        """Terminate every running task once (atexit hook / signal path).

        NOTE(review): if a signal arrives while the main thread holds
        self.lock, the lock acquisition below can deadlock.
        """
        if self.cleaning_up:
            return

        self.cleaning_up = True
        logger.info("正在清理运行中的任务...")

        with self.lock:
            running = [(tid, t) for tid, t in self.running_tasks.items()
                       if t.get('status') == 'running']

        for task_id, task in running:
            self.terminate_running_task(task_id, task)

        logger.info("清理完成")

    def clear_old_tasks(self, max_age_hours: int = 24):
        """Forget finished tasks older than ``max_age_hours`` and delete their files."""
        with self.lock:
            to_remove = [
                task_id for task_id, task in self.running_tasks.items()
                if task.get('status') in self.FINISHED_STATUSES
                and self.is_task_expired(task, max_age_hours)
            ]
            for task_id in to_remove:
                del self.running_tasks[task_id]

        # File I/O happens outside the lock.
        for task_id in to_remove:
            self.remove_task_files(task_id)

    def remove_task_files(self, task_id: str) -> None:
        """Delete the task's status and log files; failures are logged."""
        status_file = os.path.join(self.logs_dir, f"{task_id}.status.json")
        log_file = os.path.join(self.logs_dir, f"{task_id}.log")
        try:
            if os.path.exists(status_file):
                os.remove(status_file)
            if os.path.exists(log_file):
                os.remove(log_file)
        except Exception as e:
            logger.warning(f"清理文件失败: {e}")

    def signal_handler(self, signum, frame):
        """On SIGTERM/SIGINT: clean up tasks, then re-raise with the default action."""
        if self.cleaning_up:
            return

        logger.info(f"收到信号 {signum},正在退出...")
        self.cleanup_on_exit()
        self.should_exit = True
        signal.signal(signum, signal.SIG_DFL)
        os.kill(os.getpid(), signum)