| 文件 | 最后提交记录 | 最后更新时间 |
|---|---|---|
| 3 年前 | ||
| 3 年前 | ||
| 3 年前 | ||
| 3 年前 | ||
| 3 年前 |
华为云 Python DMS For Kafka 演示
1 说明
1.1 环境准备
- python3.9.2
- Pycharm 2022+ / Visual Studio 2022+
- Kafka 2.7
1.2 必备Pypi包
- kafka-python==2.0.2
1.3 消息队列选型
- 华为云分布式消息服务Kafka版
更多详情,请参考DMS Kafka版
1.4 核心代码介绍
- src\kafka_client_connect.py AdminClient负责topic的增删
class KafkaAdminClientConnect:
"""kafka主用户连接"""
def __init__(self):
# 获取配置文件路径
dir_path = os.path.split(os.path.split(__file__)[0])[0]
config_path = os.path.join(dir_path, r"config\config.json")
with open(config_path, "r") as config_file:
# 反序列化配置文件
config = json.load(config_file)
config = config.get("kafka", {})
self.clusters = config.get("clusters", list())
if not self.clusters:
raise Exception("配置文件中clusters为空值")
self.admin = KafkaAdminClient(bootstrap_servers=self.clusters)
def create_topic(self, topic_name, num_partitions=1, replication_factor=1):
"""
创建topic
:param topic_name: topic名
:param num_partitions: 分区号
:param replication_factor: 备份数
:return: 创建结果
"""
def delete_topic(self, topic_name):
"""
删除topic
:param topic_name:
:return:
"""
- src\kafka_consumer_connect.py
负责创建消费者的连接
class KafkaConsumerConnect:
"""消费者连接"""
def __init__(self, topics, group_id, auto_offset_reset="latest"):
"""
:param topics:
:param group_id:消费组id
:param auto_offset_reset:消费模式。默认为latest为从最新的开始消费, earliest为从最早的开始消费
"""
# 获取配置文件路径
dir_path = os.path.split(os.path.split(__file__)[0])[0]
config_path = os.path.join(dir_path, r"config\config.json")
with open(config_path, "r") as config_file:
# 反序列化配置文件
config = json.load(config_file)
config = config.get("kafka", {})
self.clusters = config.get("clusters", list())
if not self.clusters:
raise Exception("配置文件中clusters为空值")
self.consumer = self.init_consumer(topics, group_id, auto_offset_reset)
def init_consumer(self, topics, group_id, auto_offset_reset):
"""
初始化消费者
:param topics:
:param group_id:
:param auto_offset_reset:
:return:
"""
- src\kafka_producer_connect.py
负责对生产者的初始化及发送消息
class KafkaProducerConnect:
"""生产者连接"""
def __init__(self):
# 获取配置文件路径
dir_path = os.path.split(os.path.split(__file__)[0])[0]
config_path = os.path.join(dir_path, r"config\config.json")
with open(config_path, "r") as config_file:
# 反序列化配置文件
config = json.load(config_file)
config = config.get("kafka", {})
self.clusters = config.get("clusters", list())
if not self.clusters:
raise Exception("配置文件中clusters为空值")
self.producer = self.init_producer()
self.fail_count = 0
def init_producer(self):
"""
初始化
:return:
"""
def send_msg(self, topic, msg, key=None):
"""
发送消息
:param topic: topic
:param msg: 消息值
:param key:
:return:
"""
- test\kafka_connect_test.py
测试消息队列的一系列操作
class KafkaConnectTest(unittest.TestCase):
kpc = None
kcc = None
kacc = None
topic = "test_topic"
group_id_1 = "test_group_id_1"
group_id_2 = "test_group_id_2"
@classmethod
def setUpClass(cls) -> None:
cls.kpc = KafkaProducerConnect()
cls.kacc = KafkaAdminClientConnect()
@classmethod
def tearDownClass(cls) -> None:
cls.kpc.producer.close()
cls.kacc.admin.close()
def test_0_create_topic(self):
"""创建topic"""
flag = self.kacc.create_topic(self.topic)
self.assertTrue(flag)
def test_1_send_message_to_topic(self):
"""生产者发送消息到topic"""
self.kpc.send_msg(self.topic, {"test": "test1", "age": "test1"})
self.kpc.send_msg(self.topic, {"test": "test2", "age": "test2"})
self.kpc.send_msg(self.topic, {"test": "test3", "age": "test3"})
self.kpc.send_msg(self.topic, {"test": "test4", "age": "test4"})
def test_2_get_message_from_topic(self):
"""消费者从最早的消息开始消费topic消息"""
kcc = KafkaConsumerConnect(self.topic, self.group_id_1, "earliest")
count = 0
for msg in kcc.consumer:
count += 1
if not json.loads(msg.value).get("test") == "test{0}".format(count):
self.assertTrue(False)
if count == 4:
break
self.assertTrue(True)
def test_3_delete_topic(self):
"""删除消息"""
flag = self.kacc.delete_topic(self.topic)
self.assertTrue(flag)
- config\config.json
{
"kafka": {
"clusters": ["ip:port","ip:port","ip:port"]
}
}
clusters: 集群ip加端口