文件最后提交记录最后更新时间
3 年前
3 年前
3 年前
3 年前
3 年前
README.md

华为云 Python DMS For RabbitMQ 演示

1 说明

1.1 环境准备

  • python3.9.2
  • Pycharm 2022+ / Visual Studio 2022+
  • RabbitMQ 3.8.35

1.2 必备Pypi包

  • pika==1.3.0

1.3 消息队列选型

  • 华为云分布式消息服务RabbitMQ版
    更多详情,请参考DMS RabbitMQ版

1.4 核心代码介绍

  • src\rabbit_mq_connect.py.py RabbitMQConnect为RabbitMQ连接基类,包括了对连接的初始化及队列及交换机的增删
    RabbitMQProducer继承至RabbitMQConnect为生产者实现,主要职责为对消息的发送操作 RabbitMQConsumer继承至RabbitMQConnect为消费者实现,主要职责为对消息的消费操作
class RabbitMQConnect:
    """Rabbit消息队列连接基类"""


class RabbitMQProducer(RabbitMQConnect):
    """RabbitMQ生产者实现"""


class RabbitMQConsumer(RabbitMQConnect):
    """RabbitMQ消费者实现"""
  • test\rabbit_mq_connect_test.py
    测试生产者生产消息
class RabbitMQConnectTest(unittest.TestCase):
    produce = None
    queue_name = "test"

    @classmethod
    def setUpClass(cls) -> None:
        cls.produce = RabbitMQProducer()

    @classmethod
    def tearDownClass(cls) -> None:
        cls.produce.close()

    def test_0_send_msg_to_queue_direct(self):
        """测试direct模式发送消息"""
        self.produce.state_queue(self.queue_name)
        for i in range(4):
            self.produce.send_message_to_queue("", self.queue_name, "test{0}".format(i))

    def test_1_send_msg_to_queue_fanout(self):
        """测试fanout模式发送消息"""
        exchange_name = "test_exchange"
        self.produce.state_exchange(exchange_name, exchange_type='fanout')
        for i in range(4):
            self.produce.send_message_to_queue(exchange_name, "", "test{0}".format(i))

    def test_2_delete_queue(self):
        """测试删除队列"""
        queue_name = "test2"
        self.produce.delete_queue(queue_name)

    def test_3_delete_exchange(self):
        """测试删除交换机"""
        exchange = "test_exchange"
        self.produce.delete_exchange(exchange)
  • test\rabbit_mq_consumer_connect_test.py
    不同模式下消费者测试,主要为direct和fanout两种模式的测试。有两个参数分别为consumer_type(消费模式)和num(消费者号码)。 可通过terminal运行命令行多个消费者来测试不同模式下的消费区别。
    可能会出现找不到模块的情况,可以把项目路径(即MessageQueue4RabbitMQ的上一级路径)添加到环境变量中。变量名为PYTHONPATH,变量值为项目路径。

python rabbit_mq_consumer_connect_test.py fanout 1 python rabbit_mq_consumer_connect_test.py fanout 2

    import sys
    consumer_type = sys.argv[1]
    num = sys.argv[2]
    if consumer_type == "direct":
        consumer = RabbitMQConsumer()
        print("消费者进程{0}开始运行".format(num))
        consumer.consume_msg("test", call_back)
        print("结束监控队列")
    elif consumer_type == "fanout":
        queue_name = "test{0}".format(num)
        consumer = RabbitMQConsumer()
        consumer.state_queue(queue_name)
        consumer.state_exchange("test_exchange", exchange_type='fanout')
        consumer.queue_bind_exchange(queue_name, "test_exchange")
        print("消费者进程{0}开始运行".format(num))
        consumer.consume_msg(queue_name, call_back)
  • config\config.json
{
  "rabbit_mq": {
    "ip_address": "ip",
    "port": 5672,
    "username": "username",
    "password": "password"
  }
}

ip_address: 实例ip地址 port: 端口 username: 用户名 password: 密码