文件最后提交记录最后更新时间
3 年前
3 年前
3 年前
3 年前
3 年前
README.md

华为云 Python DMS For Kafka 演示

1 说明

1.1 环境准备

  • python3.9.2
  • Pycharm 2022+ / Visual Studio 2022+
  • Kafka 2.7

1.2 必备Pypi包

  • kafka-python==2.0.2

1.3 消息队列选型

  • 华为云分布式消息服务Kafka版
    更多详情,请参考DMS Kafka版

1.4 核心代码介绍

  • src\kafka_client_connect.py AdminClient负责topic的增删
class KafkaAdminClientConnect:
    """kafka主用户连接"""

    def __init__(self):
        # 获取配置文件路径
        dir_path = os.path.split(os.path.split(__file__)[0])[0]
        config_path = os.path.join(dir_path, r"config\config.json")
        with open(config_path, "r") as config_file:
            # 反序列化配置文件
            config = json.load(config_file)
        config = config.get("kafka", {})
        self.clusters = config.get("clusters", list())
        if not self.clusters:
            raise Exception("配置文件中clusters为空值")
        self.admin = KafkaAdminClient(bootstrap_servers=self.clusters)

    def create_topic(self, topic_name, num_partitions=1, replication_factor=1):
        """
        创建topic
        :param topic_name: topic名
        :param num_partitions: 分区号
        :param replication_factor: 备份数
        :return: 创建结果
        """

    def delete_topic(self, topic_name):
        """
        删除topic
        :param topic_name:
        :return:
        """

  • src\kafka_consumer_connect.py
    负责创建消费者的连接
class KafkaConsumerConnect:
    """消费者连接"""

    def __init__(self, topics, group_id, auto_offset_reset="latest"):
        """

        :param topics:
        :param group_id:消费组id
        :param auto_offset_reset:消费模式。默认为latest为从最新的开始消费, earliest为从最早的开始消费
        """
        # 获取配置文件路径
        dir_path = os.path.split(os.path.split(__file__)[0])[0]
        config_path = os.path.join(dir_path, r"config\config.json")
        with open(config_path, "r") as config_file:
            # 反序列化配置文件
            config = json.load(config_file)
        config = config.get("kafka", {})
        self.clusters = config.get("clusters", list())
        if not self.clusters:
            raise Exception("配置文件中clusters为空值")
        self.consumer = self.init_consumer(topics, group_id, auto_offset_reset)

    def init_consumer(self, topics, group_id, auto_offset_reset):
        """
        初始化消费者
        :param topics:
        :param group_id:
        :param auto_offset_reset:
        :return:
        """
  • src\kafka_producer_connect.py
    负责对生产者的初始化及发送消息
class KafkaProducerConnect:
    """生产者连接"""

    def __init__(self):
        # 获取配置文件路径
        dir_path = os.path.split(os.path.split(__file__)[0])[0]
        config_path = os.path.join(dir_path, r"config\config.json")
        with open(config_path, "r") as config_file:
            # 反序列化配置文件
            config = json.load(config_file)
        config = config.get("kafka", {})
        self.clusters = config.get("clusters", list())
        if not self.clusters:
            raise Exception("配置文件中clusters为空值")
        self.producer = self.init_producer()
        self.fail_count = 0

    def init_producer(self):
        """
        初始化
        :return:
        """

    def send_msg(self, topic, msg, key=None):
        """
        发送消息
        :param topic: topic
        :param msg: 消息值
        :param key:
        :return:
        """
  • test\kafka_connect_test.py
    测试消息队列的一系列操作
class KafkaConnectTest(unittest.TestCase):
    kpc = None
    kcc = None
    kacc = None
    topic = "test_topic"
    group_id_1 = "test_group_id_1"
    group_id_2 = "test_group_id_2"

    @classmethod
    def setUpClass(cls) -> None:
        cls.kpc = KafkaProducerConnect()
        cls.kacc = KafkaAdminClientConnect()

    @classmethod
    def tearDownClass(cls) -> None:
        cls.kpc.producer.close()
        cls.kacc.admin.close()

    def test_0_create_topic(self):
        """创建topic"""
        flag = self.kacc.create_topic(self.topic)
        self.assertTrue(flag)

    def test_1_send_message_to_topic(self):
        """生产者发送消息到topic"""
        self.kpc.send_msg(self.topic, {"test": "test1", "age": "test1"})
        self.kpc.send_msg(self.topic, {"test": "test2", "age": "test2"})
        self.kpc.send_msg(self.topic, {"test": "test3", "age": "test3"})
        self.kpc.send_msg(self.topic, {"test": "test4", "age": "test4"})

    def test_2_get_message_from_topic(self):
        """消费者从最早的消息开始消费topic消息"""
        kcc = KafkaConsumerConnect(self.topic, self.group_id_1, "earliest")
        count = 0
        for msg in kcc.consumer:
            count += 1
            if not json.loads(msg.value).get("test") == "test{0}".format(count):
                self.assertTrue(False)
            if count == 4:
                break
        self.assertTrue(True)

    def test_3_delete_topic(self):
        """删除消息"""
        flag = self.kacc.delete_topic(self.topic)
        self.assertTrue(flag)
  • config\config.json
{
  "kafka": {
    "clusters": ["ip:port","ip:port","ip:port"]
  }
}

clusters: 集群ip加端口