import json
import unittest
from MessageQueue4Kafka.src.kafka_client_connect import KafkaAdminClientConnect
from MessageQueue4Kafka.src.kafka_consumer_connect import KafkaConsumerConnect
from MessageQueue4Kafka.src.kafka_producer_connect import KafkaProducerConnect
class KafkaConnectTest(unittest.TestCase):
    """End-to-end checks for the Kafka producer, consumer and admin wrappers.

    The test methods carry numeric prefixes (``test_0`` .. ``test_3``) because
    unittest runs test methods in alphabetical order and each step depends on
    the previous one: create topic -> produce -> consume -> delete topic.
    """

    # Shared clients, created once in setUpClass.
    kpc = None
    kcc = None
    kacc = None
    # Topic and consumer group names used by the tests.
    topic = "test_topic"
    group_id_1 = "test_group_id_1"
    group_id_2 = "test_group_id_2"

    @classmethod
    def setUpClass(cls) -> None:
        """Create the shared producer and admin clients once for the class."""
        cls.kpc = KafkaProducerConnect()
        cls.kacc = KafkaAdminClientConnect()

    @classmethod
    def tearDownClass(cls) -> None:
        """Close the shared producer and admin clients after all tests ran."""
        cls.kpc.producer.close()
        cls.kacc.admin.close()

    def test_0_create_topic(self):
        """Create the test topic through the admin client."""
        flag = self.kacc.create_topic(self.topic)
        self.assertTrue(flag)

    def test_1_send_message_to_topic(self):
        """Publish four numbered messages to the test topic."""
        for index in range(1, 5):
            value = "test{0}".format(index)
            self.kpc.send_msg(self.topic, {"test": value, "age": value})
        # Push any buffered records to the broker now, so the consumer test
        # that runs next does not depend on the close() in tearDownClass.
        self.kpc.producer.flush()

    def test_2_get_message_from_topic(self):
        """Consume the topic from the earliest offset and verify the messages.

        Expects the four messages sent by ``test_1_send_message_to_topic`` to
        arrive in the order they were produced.
        NOTE(review): in-order delivery presumably relies on the topic having
        a single partition -- confirm against ``create_topic``.
        """
        expected_total = 4
        kcc = KafkaConsumerConnect(self.topic, self.group_id_1, "earliest")
        received = 0
        try:
            for msg in kcc.consumer:
                received += 1
                payload = json.loads(msg.value)
                self.assertEqual(
                    payload.get("test"), "test{0}".format(received)
                )
                if received == expected_total:
                    break
        finally:
            # Always release the consumer, even when an assertion fails.
            kcc.consumer.close()
        # Guard against a vacuous pass if iteration ended before all
        # expected messages were seen.
        self.assertEqual(received, expected_total)

    def test_3_delete_topic(self):
        """Delete the test topic through the admin client."""
        flag = self.kacc.delete_topic(self.topic)
        self.assertTrue(flag)
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()