文件最后提交记录最后更新时间
3 年前
3 年前
3 年前
3 年前
3 年前
3 年前
README.md

华为云 Python DMS For RocketMQ 演示

1 说明

1.1 环境准备

  • Linux(Huawei Cloud EulerOS 2.0 标准版 64位)
  • python 3.9.9
  • Pycharm 2022+ / Visual Studio 2022+
  • RocketMQ 4.8.0

1.2 必备Pypi包

  • rocketmq-client-python==2.0.0

1.3 消息队列选项

  • 华为云分布式消息服务RocketMQ版
    更多详情,请参考DMS RocketMQ版

1.4 安装依赖包

# 获取rocketmq-client-cpp安装包 2.2.0版本
wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.2.0/rocketmq-client-cpp-2.2.0-centos7.x86_64.rpm
# 安装rocketmq-client-cpp
sudo rpm -ivh rocketmq-client-cpp-2.2.0-centos7.x86_64.rpm
# 查找librocketmq.so的路径
find / -name librocketmq.so
# 将librocketmq.so文件绝对路径添加到系统动态库搜索路径。
ln -s {librocketmq.so文件绝对路径} /usr/lib
sudo ldconfig
# 安装Pypi包
pip install rocketmq-client-python

1.5 核心代码介绍

  • 生产者或消费者初始化
# MessageQueue4RocketMQ/src/rocket_mq_connect.py
def init(self, obj):
    # 设置连接RocketMQ的参数
    obj.set_name_server_address(self.name_src)
    # 增加用户认证信息
    if self.access_key and self.secret_key:
        obj.set_session_credentials(self.access_key, self.secret_key, "")
  • 创建消息体
# MessageQueue4RocketMQ/src/rocket_mq_connect.py
def create_message(self, key, tag, property, body):
    """ 创建消息体 """
    msg = Message(self.topic)
    msg.set_keys(key)
    msg.set_tags(tag)
    msg.set_property('property', property)
    msg.set_body(body)
    return msg
  • 发送普通消息
# MessageQueue4RocketMQ/src/rocket_mq_connect.py
def send_message_sync(self, msg_body):
    """同步发送"""
    producer = Producer(self.consumer_group)
    self.init(producer)
    producer.start()
    msg = self.create_message(**msg_body)
    ret = producer.send_sync(msg)
    print('send sync message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
    producer.shutdown()
  • 发送顺序消息
# MessageQueue4RocketMQ/src/rocket_mq_connect.py
def send_orderly_with_sharding_key(self, msg_body):
    """发送顺序消息"""
    producer = Producer(self.consumer_group, True)
    self.init(producer)
    producer.start()
    msg = self.create_message(**msg_body)
    ret = producer.send_orderly_with_sharding_key(msg, 'orderId')
    print('send orderly message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
    producer.shutdown()
  • 发送定时消息
# MessageQueue4RocketMQ/src/rocket_mq_connect.py
def send_delay_message(self, msg_body, delay_time=None):
    """发送定时消息"""
    producer = Producer(self.consumer_group)
    self.init(producer)
    producer.start()
    msg = self.create_message(**msg_body)
    # delay_time 默认延时3s 时间戳的字符串格式,如果被设置为小于当前时间戳,消息将立即被投递给消费者,可通过以下方式指定对应时刻
    # delay_time = str(int(time.mktime(time.strptime("2022-10-26 10:20:30", "%Y-%M-%d %H:%M:%S"))))
    delay_time = delay_time if delay_time else str(int(round((time.time() + 3) * 1000)))
    msg.set_property('__STARTDELIVERTIME', delay_time)
    ret = producer.send_sync(msg)
    print('send delay message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
    producer.shutdown()
  • 发送事务消息
# MessageQueue4RocketMQ/src/rocket_mq_connect.py
def send_transaction_message(self, count, msg_body):
    """发送事务消息"""
    producer = TransactionMQProducer(self.consumer_group, self.check_callback)
    self.init(producer)
    producer.start()
    for _ in range(count):
        msg = self.create_message(**msg_body)
        ret = producer.send_message_in_transaction(msg, self.local_execute, None)
        print('send transaction message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
    print('send transaction message done')

    while True:
        time.sleep(3600)

def local_execute(self, msg, user_args):
    print('local:   ' + msg.body.decode('utf-8'))
    # 返回UNKNOW状态,会进行回查,调用回调函数check_callback
    return TransactionStatus.UNKNOWN

def check_callback(self, msg):
    print('check: ' + msg.body.decode

transaction

TransactionStatus.COMMIT:提交事务,允许消费者消费该消息。
TransactionStatus.ROLLBACK:回滚事务,消息将被丢弃不允许消费。
TransactionStatus.UNKNOWN:无法判断状态,期待服务端向生产者再次回查该消息的状态。

  • 订阅消息
# MessageQueue4RocketMQ/src/rocket_mq_connect.py
def start_consume_message(self):
    """订阅消息"""
    consumer = PushConsumer(self.consumer_group)
    self.init(consumer)
    consumer.subscribe(self.topic, self.callback)
    print('start consume message')
    consumer.start()

    while True:
        time.sleep(3600)

def callback(self, msg):
    print(msg.id, msg.body, msg.get_property('property'))
    return ConsumeStatus.CONSUME_SUCCESS

1.6 配置文件

  • MessageQueue4RocketMQ/config/config.json
{
  "rocket_mq": {
    "name_src": "name_src",
    "topic": "topic",
    "consumer_group": "consumer_group",
    "access_key": "access_key",
    "secret_key": "secret_key"
  }
}

name_src: 实例元数据连接地址和端口(ip_address:port),集群模式使用逗号 ‘,’ 隔开
topic:消息主题 (须在华为云控制台创建)
consumer_group:消费组
access_key:用户名(开启ACL权限访问需要)
secret_key:密钥(开启ACL权限访问需要)

1.7 测试代码

# 进入根路径 MessageQueue4RocketMQ (当前绝对路径为/home/python/MessageQueue4RocketMQ)
cd /home/python/MessageQueue4RocketMQ 
# 开启订阅消息
python src/rocket_mq_recv.py
# 另起终端开启测试
python test/rocket_mq_test.py

生产者发送消息结果
test
消费组订阅消息结果
recv