"""
   Copyright (c) Huawei Technologies Co., Ltd. 2026. All rights reserved.
   You can use this software according to the terms and conditions of the Mulan PSL v2.
   You may obtain a copy of Mulan PSL v2 at:
            http://license.coscl.org.cn/MulanPSL2
   THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
   EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
   MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
   See the Mulan PSL v2 for more details.
"""

import os
import argparse
import shutil
import unittest

from omnihelper.flink_log_parser import FlinkLogParser


class TestFlinkLogParser(unittest.TestCase):
    """测试 FlinkLogParser 类的测试用例"""

    def setUp(self):
        """初始化测试环境"""
        self.test_data = [
            {
                'jobid': 'job-123',
                'taskid': 'task-456',
                '状态': 'RUNNING',
                '算子名称': 'Map',
                'Input': 'test_input_1',
                'Output': 'test_output_1',
                '出现频次': 5,
                '运行时间(s)': 1.23,
                '输入数据量': '100MB',
                '输出数据量': '50MB',
                '表达式/内置函数名称': 'SUBSTRING',
                '表达式Input': 'col1, 1, 5',
                '嵌套内容': 'SUBSTRING(col1, 1, 5)',
                '表达式出现频次': 3
            },
            {
                'jobid': 'job-123',
                'taskid': 'task-456',
                '状态': 'RUNNING',
                '算子名称': 'Filter',
                'Input': 'test_input_1',
                'Output': 'test_output_2',
                '出现频次': 3,
                '运行时间(s)': 0.89,
                '输入数据量': '50MB',
                '输出数据量': '30MB',
                '表达式/内置函数名称': 'WHERE',
                '表达式Input': 'col2 > 10',
                '嵌套内容': 'WHERE col2 > 10',
                '表达式出现频次': 2
            },
            {
                'jobid': 'job-123',
                'taskid': 'task-789',
                '状态': 'FINISHED',
                '算子名称': 'Reduce',
                'Input': 'test_output_2',
                'Output': 'test_output_3',
                '出现频次': 2,
                '运行时间(s)': 2.45,
                '输入数据量': '30MB',
                '输出数据量': '10MB',
                '表达式/内置函数名称': 'SUM',
                '表达式Input': 'col3',
                '嵌套内容': 'SUM(col3)',
                '表达式出现频次': 1
            },
            {
                'jobid': 'job-456',
                'taskid': 'task-123',
                '状态': 'RUNNING',
                '算子名称': 'Join',
                'Input': 'test_input_2, test_input_3',
                'Output': 'test_output_4',
                '出现频次': 1,
                '运行时间(s)': 3.67,
                '输入数据量': '200MB',
                '输出数据量': '150MB',
                '表达式/内置函数名称': '',
                '表达式Input': '',
                '嵌套内容': '',
                '表达式出现频次': ''
            },
            {
                'jobid': 'job-456',
                'taskid': 'task-123',
                '状态': 'RUNNING',
                '算子名称': '',
                'Input': '',
                'Output': '',
                '出现频次': '',
                '运行时间(s)': '',
                '输入数据量': '',
                '输出数据量': '',
                '表达式/内置函数名称': 'CONCAT',
                '表达式Input': 'col1, col2',
                '嵌套内容': 'CONCAT(col1, col2)',
                '表达式出现频次': 4
            }
        ]

        self.args = argparse.Namespace(
            url='http://127.0.0.1:8081',
            interval=1000,
            timeout=30,
            no_ssl_verify=False,
            ssl_verify=True,
            output_dir=None,
            show_op_details=True,
            jobid=None,
            header=None,
            kerberos=False,
            kerberos_mutual_auth='OPTIONAL',
            input_data=None,
        )
        self.output_dir = "./tmp_output"
        self.args.output_dir = self.output_dir

    def tearDown(self):
        """清理测试环境"""
        if os.path.exists(self.output_dir):
            shutil.rmtree(self.output_dir)

    def test_flink_report_generation(self):
        """测试 Flink 报告生成功能"""
        flink_parser = FlinkLogParser(self.args)
        flink_parser.analysis_result = self.test_data
        flink_parser.generate_report()

        output_files = os.listdir(self.output_dir)
        self.assertGreater(len(output_files), 0, "No report file generated")

    def test_window_join_exec_plan_parsing(self):
        """测试 WindowJoin 执行计划解析:验证算子提取、类型解析、递归死循环修复"""
        from omnihelper.flink.operator.op_parse import FlinkParser

        column_type = {
            'event_type': 'INT', 'person': 'ROW', 'auction': 'ROW', 'bid': 'ROW',
            'dateTime': 'TIMESTAMP',
            'order_id': 'BIGINT', 'user_id': 'BIGINT', 'amount': 'DECIMAL',
            'pay_amount': 'DECIMAL', 'order_id0': 'BIGINT',
            'o_window_start': 'TIMESTAMP', 'o_window_end': 'TIMESTAMP',
            'p_window_start': 'TIMESTAMP', 'p_window_end': 'TIMESTAMP',
        }
        parser = FlinkParser(None, column_type, None)

        description = (
            "[1]:TableSourceScan(table=[[default_catalog, default_database, datagen]], "
            "fields=[event_type, person, auction, bid])"
            "<br/>+- "
            "[2]:Calc(select=[event_type, bid, CASE((event_type = 0), person.dateTime, "
            "(event_type = 1), auction.dateTime, bid.dateTime) AS dateTime])"
            "<br/>   +- "
            "[3]:WatermarkAssigner(rowtime=[dateTime], watermark=[(dateTime - 4000:INTERVAL SECOND)])"
            "<br/>      +- "
            "[4]:Calc(select=[dateTime, bid.auction AS $1, bid.bidder AS $2, bid.price AS $3, "
            "bid.channel AS $4, bid.url AS $5, bid.extra AS $6], where=[(event_type = 2)])"
            "<br/> \n"
            "[6]:Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], "
            "rankRange=[rankStart=1, rankEnd=10], partitionBy=[$1], orderBy=[$3 DESC], "
            "select=[dateTime, $1, $2, $3, $4, $5, $6, rank_number])"
            "<br/>+- "
            "[7]:Calc(select=[$1 AS auction, $2 AS bidder, $3 AS price, $4 AS channel, "
            "$5 AS url, dateTime, $6 AS extra, rank_number])"
            "<br/>   +- "
            "[8]:Sink(table=[default_catalog.default_database.nexmark_q19], "
            "fields=[auction, bidder, price, channel, url, dateTime, extra, rank_number])"
            "<br/>"
        )

        import re
        raw_parts = re.split(r"<br/>[\s:*\+\-]*|\n", description)
        description_data = [
            parsed_line for line in raw_parts
            if (parsed_line := FlinkParser.parse_single_description_line(line)) is not None
        ]

        schema_chain = parser._build_schema_chain_for_vertex(description_data)
        self.assertGreater(len(schema_chain), 0, "schema_chain should not be empty")

        op_types = [entry.get("op_type") for entry in schema_chain]
        self.assertIn("TableSourceScan", op_types, "TableSourceScan should be in schema_chain")
        self.assertIn("Calc", op_types, "Calc should be in schema_chain")

        source_entry = next(e for e in schema_chain if e.get("op_type") == "TableSourceScan")
        self.assertGreater(len(source_entry.get("output_schema", [])), 0,
                           "TableSourceScan should have output_schema")

        ops = parser._build_ops_from_schema_chain(schema_chain, {})
        self.assertGreater(len(ops), 0, "ops should not be empty")

        source_op = next((op for op in ops if op["op_type"] == "TableSourceScan"), None)
        self.assertIsNotNone(source_op, "TableSourceScan should be in ops")
        self.assertNotEqual(source_op["input_types_str"], "",
                            "TableSourceScan input should not be empty")
        self.assertEqual(source_op["output_types_str"], "",
                         "TableSourceScan output should be empty")

    def test_window_join_recursion_fix(self):
        """测试 WindowJoin 执行计划不触发递归死循环"""
        from omnihelper.flink.operator.op_parse import FlinkParser

        column_type = {
            'order_id': 'BIGINT', 'user_id': 'BIGINT', 'amount': 'DECIMAL',
            'pay_amount': 'DECIMAL', 'order_id0': 'BIGINT',
            'o_window_start': 'TIMESTAMP', 'o_window_end': 'TIMESTAMP',
            'p_window_start': 'TIMESTAMP', 'p_window_end': 'TIMESTAMP',
        }
        parser = FlinkParser(None, column_type, None)

        description = (
            "[165]:WindowJoin(leftWindow=[TUMBLE(win_start=[o_window_start], "
            "win_end=[o_window_end], size=[1 min])], "
            "rightWindow=[TUMBLE(win_start=[p_window_start], win_end=[p_window_end], "
            "size=[1 min])], joinType=[InnerJoin], where=[(order_id = order_id0)], "
            "select=[o_window_start, o_window_end, order_id, user_id, amount, "
            "p_window_start, p_window_end, order_id0, pay_amount])"
            "<br/>+- "
            "[166]:Calc(select=[o_window_start AS window_start, o_window_end AS window_end, "
            "order_id, user_id, amount, pay_amount])"
            "<br/>   +- "
            "[167]:Sink(table=[default_catalog.default_database.window_join_print], "
            "fields=[window_start, window_end, order_id, user_id, amount, pay_amount])"
            "<br/>"
        )

        import re
        raw_parts = re.split(r"<br/>[\s:*\+\-]*|\n", description)
        description_data = [
            parsed_line for line in raw_parts
            if (parsed_line := FlinkParser.parse_single_description_line(line)) is not None
        ]

        try:
            schema_chain = parser._build_schema_chain_for_vertex(description_data)
            self.assertIsInstance(schema_chain, list, "schema_chain should be a list")
        except RecursionError:
            self.fail("RecursionError should not occur when parsing WindowJoin description")


if __name__ == "__main__":
    unittest.main()