# -------------------------------------------------------------------------
# This file is part of the MindStudio project.
# Copyright (c) 2025 Huawei Technologies Co.,Ltd.
#
# MindStudio is licensed under Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
#
#          http://license.coscl.org.cn/MulanPSL2
#
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# -------------------------------------------------------------------------

import subprocess
import os
import sys
import time
from typing import Optional
from queue import Queue
import queue
import threading
import select

OUTPUT_TAIL_LIMIT = 500


class CommandExecutor:
    def __init__(self):
        self.process = None
        self._exit_code = None
        self.msg_out_queue = Queue()
        self.inst_in_queue = Queue()
        self.thread = None
        self.env = dict()
        self.output_lines = []

    def execute(self, command, env=None) -> None:
        """执行已设置的命令"""
        if command is None:
            raise ValueError("No command has been set. Use set_command() first.")

        self._reset()

        print(command)

        sub_process_env = os.environ.copy()
        if env:
            sub_process_env.update(env)

        self.process = subprocess.Popen(  # pylint: disable=consider-using-with
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=sub_process_env,
            universal_newlines=True,
            shell=isinstance(command, str),
        )
        self.thread = threading.Thread(target=self._monitor, daemon=True)
        self.thread.start()

    def clean_msg_out_queue(self):
        while not self.msg_out_queue.empty():
            try:
                self.msg_out_queue.get_nowait()
            except queue.Empty:
                break

    def _monitor(self):
        is_get_output = False
        process = self.process

        def read_instruction():
            nonlocal is_get_output
            if self.inst_in_queue.empty():
                return False
            instruction = self.inst_in_queue.get()
            if instruction == "get_output":
                is_get_output = True
            elif instruction == "not_get_output":
                if is_get_output:
                    self.clean_msg_out_queue()
                is_get_output = False
            elif instruction == "exit":
                return True
            else:
                return False
            return False

        while True:
            if read_instruction():
                break
            # 非阻塞检查管道
            reads = [process.stdout, process.stderr]
            ready, _, _ = select.select(reads, [], [], 0.1)

            for stream in ready:
                line = stream.readline()
                if not line:  # 进程结束
                    continue

                # 实时输出
                if stream == process.stdout:
                    sys.stdout.write(line)
                else:
                    sys.stderr.write(line)

                self.output_lines.append(line)
                if len(self.output_lines) > OUTPUT_TAIL_LIMIT:
                    self.output_lines = self.output_lines[-OUTPUT_TAIL_LIMIT:]

                if is_get_output:
                    self.msg_out_queue.put(line)

            if process.poll() is not None:
                self.msg_out_queue.put(None)
                self.msg_out_queue.put(process.poll())
                break

    def wait(self, target: Optional[str] = None, timeout: Optional[float] = None) -> tuple:
        """
        等待命令执行完成或输出中出现特定字符串

        参数:
            target: 要等待的目标字符串(None表示等待命令结束)
            from_stderr: 是否从标准错误流中查找
            timeout: 超时时间(秒)

        返回:
            错误码,
            条件是否满足
                -1: 条件不满足
                0: 条件满足(找到目标或正常结束)
                1: 超时
            其他: 进程退出码(未找到目标或异常结束)
        """
        if self.process is None:
            raise ValueError("Command has not been executed yet. Call execute() first.")

        start_time = time.time()
        if target is not None:
            self.inst_in_queue.put("get_output")
        if self.process.poll() is not None:
            return self.process.poll(), -1

        while True:
            try:
                output = self.msg_out_queue.get(timeout=1)
                if output is None:
                    return self.msg_out_queue.get(), -1
                elif target in output:
                    self.inst_in_queue.put("not_get_output")
                    return None, 0
                else:
                    pass
            except queue.Empty:
                time.sleep(0.1)

            # 检查超时
            if timeout is not None and (time.time() - start_time) > timeout:
                return None, 1

    def kill(self) -> None:
        """重置执行状态"""
        if self.process is not None:
            subprocess.run(["/usr/bin/pkill", "-P", f"{self.process.pid}"], check=False)
            subprocess.run(["/usr/bin/kill", "-9", f"{self.process.pid}"], check=False)
        self.process = None
        self._exit_code = None

    def get_output_tail(self, limit: int = 80) -> str:
        return "".join(self.output_lines[-limit:])

    def _reset(self) -> None:
        """重置执行状态"""
        if self.process is not None and self._exit_code is None:
            self.process.terminate()
        self.process = None
        self._exit_code = None
        self.output_lines = []

    def __del__(self):
        """析构函数,确保清理资源"""
        self._reset()