华为云 Python DMS For RabbitMQ 演示
1 说明
1.1 环境准备
- python3.9.2
- Pycharm 2022+ / Visual Studio 2022+
- RabbitMQ 3.8.35
1.2 必备Pypi包
- pika==1.3.0
1.3 消息队列选型
- 华为云分布式消息服务RabbitMQ版
更多详情,请参考DMS RabbitMQ版
1.4 核心代码介绍
- src\rabbit_mq_connect.py.py
RabbitMQConnect为RabbitMQ连接基类,包括了对连接的初始化及队列及交换机的增删
RabbitMQProducer继承至RabbitMQConnect为生产者实现,主要职责为对消息的发送操作 RabbitMQConsumer继承至RabbitMQConnect为消费者实现,主要职责为对消息的消费操作
class RabbitMQConnect:
"""Rabbit消息队列连接基类"""
class RabbitMQProducer(RabbitMQConnect):
"""RabbitMQ生产者实现"""
class RabbitMQConsumer(RabbitMQConnect):
"""RabbitMQ消费者实现"""
- test\rabbit_mq_connect_test.py
测试生产者生产消息
class RabbitMQConnectTest(unittest.TestCase):
produce = None
queue_name = "test"
@classmethod
def setUpClass(cls) -> None:
cls.produce = RabbitMQProducer()
@classmethod
def tearDownClass(cls) -> None:
cls.produce.close()
def test_0_send_msg_to_queue_direct(self):
"""测试direct模式发送消息"""
self.produce.state_queue(self.queue_name)
for i in range(4):
self.produce.send_message_to_queue("", self.queue_name, "test{0}".format(i))
def test_1_send_msg_to_queue_fanout(self):
"""测试fanout模式发送消息"""
exchange_name = "test_exchange"
self.produce.state_exchange(exchange_name, exchange_type='fanout')
for i in range(4):
self.produce.send_message_to_queue(exchange_name, "", "test{0}".format(i))
def test_2_delete_queue(self):
"""测试删除队列"""
queue_name = "test2"
self.produce.delete_queue(queue_name)
def test_3_delete_exchange(self):
"""测试删除交换机"""
exchange = "test_exchange"
self.produce.delete_exchange(exchange)
- test\rabbit_mq_consumer_connect_test.py
不同模式下消费者测试,主要为direct和fanout两种模式的测试。有两个参数分别为consumer_type(消费模式)和num(消费者号码)。 可通过terminal运行命令行多个消费者来测试不同模式下的消费区别。
可能会出现找不到模块的情况,可以把项目路径(即MessageQueue4RabbitMQ的上一级路径)添加到环境变量中。变量名为PYTHONPATH,变量值为项目路径。
python rabbit_mq_consumer_connect_test.py fanout 1 python rabbit_mq_consumer_connect_test.py fanout 2
import sys
consumer_type = sys.argv[1]
num = sys.argv[2]
if consumer_type == "direct":
consumer = RabbitMQConsumer()
print("消费者进程{0}开始运行".format(num))
consumer.consume_msg("test", call_back)
print("结束监控队列")
elif consumer_type == "fanout":
queue_name = "test{0}".format(num)
consumer = RabbitMQConsumer()
consumer.state_queue(queue_name)
consumer.state_exchange("test_exchange", exchange_type='fanout')
consumer.queue_bind_exchange(queue_name, "test_exchange")
print("消费者进程{0}开始运行".format(num))
consumer.consume_msg(queue_name, call_back)
- config\config.json
{
"rabbit_mq": {
"ip_address": "ip",
"port": 5672,
"username": "username",
"password": "password"
}
}
ip_address: 实例ip地址 port: 端口 username: 用户名 password: 密码