import json
import unittest

from MessageQueue4Kafka.src.kafka_client_connect import KafkaAdminClientConnect
from MessageQueue4Kafka.src.kafka_consumer_connect import KafkaConsumerConnect
from MessageQueue4Kafka.src.kafka_producer_connect import KafkaProducerConnect


class KafkaConnectTest(unittest.TestCase):
    kpc = None
    kcc = None
    kacc = None
    topic = "test_topic"
    group_id_1 = "test_group_id_1"
    group_id_2 = "test_group_id_2"

    @classmethod
    def setUpClass(cls) -> None:
        """测试对象生成时准备"""
        # 创建生产者连接
        cls.kpc = KafkaProducerConnect()
        # 创建admin用户连接
        cls.kacc = KafkaAdminClientConnect()

    @classmethod
    def tearDownClass(cls) -> None:
        """测试对象结束时操作"""
        # 关闭生成者连接
        cls.kpc.producer.close()
        # 关闭admin用户连接
        cls.kacc.admin.close()

    def test_0_create_topic(self):
        """创建topic"""
        flag = self.kacc.create_topic(self.topic)
        self.assertTrue(flag)

    def test_1_send_message_to_topic(self):
        """生产者发送消息到topic"""
        self.kpc.send_msg(self.topic, {"test": "test1", "age": "test1"})
        self.kpc.send_msg(self.topic, {"test": "test2", "age": "test2"})
        self.kpc.send_msg(self.topic, {"test": "test3", "age": "test3"})
        self.kpc.send_msg(self.topic, {"test": "test4", "age": "test4"})

    def test_2_get_message_from_topic(self):
        """消费者从最早的消息开始消费topic消息"""
        # 创建消费者连接,从队列最早的消息开始消费
        kcc = KafkaConsumerConnect(self.topic, self.group_id_1, "earliest")
        count = 0
        # 消费开始监听topic消息
        for msg in kcc.consumer:
            count += 1
            # 对应上一个测试方法生产者发送到队列的消息,如果不一致,则测试失败
            if not json.loads(msg.value).get("test") == "test{0}".format(count):
                self.assertTrue(False)
            # 当计数为4时结束监听
            if count == 4:
                break
        self.assertTrue(True)

    def test_3_delete_topic(self):
        """删除topic"""
        flag = self.kacc.delete_topic(self.topic)
        self.assertTrue(flag)


if __name__ == '__main__':
    unittest.main()