华为云 Python DMS For RocketMQ 演示
1 说明
1.1 环境准备
- Linux(Huawei Cloud EulerOS 2.0 标准版 64位)
- python 3.9.9
- Pycharm 2022+ / Visual Studio 2022+
- RocketMQ 4.8.0
1.2 必备Pypi包
- rocketmq-client-python==2.0.0
1.3 消息队列选项
- 华为云分布式消息服务RocketMQ版
更多详情,请参考DMS RocketMQ版
1.4 安装依赖包
# 获取rocketmq-client-cpp安装包 2.2.0版本
wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.2.0/rocketmq-client-cpp-2.2.0-centos7.x86_64.rpm
# 安装rocketmq-client-cpp
sudo rpm -ivh rocketmq-client-cpp-2.2.0-centos7.x86_64.rpm
# 查找librocketmq.so的路径
find / -name librocketmq.so
# 将librocketmq.so文件绝对路径添加到系统动态库搜索路径。
ln -s {librocketmq.so文件绝对路径} /usr/lib
sudo ldconfig
# 安装Pypi包
pip install rocketmq-client-python
1.5 核心代码介绍
- 生产者或消费者初始化
# MessageQueue4RocketMQ/src/rocket_mq_connect.py
def init(self, obj):
# 设置连接RocketMQ的参数
obj.set_name_server_address(self.name_src)
# 增加用户认证信息
if self.access_key and self.secret_key:
obj.set_session_credentials(self.access_key, self.secret_key, "")
- 创建消息体
# MessageQueue4RocketMQ/src/rocket_mq_connect.py
def create_message(self, key, tag, property, body):
""" 创建消息体 """
msg = Message(self.topic)
msg.set_keys(key)
msg.set_tags(tag)
msg.set_property('property', property)
msg.set_body(body)
return msg
- 发送普通消息
# MessageQueue4RocketMQ/src/rocket_mq_connect.py
def send_message_sync(self, msg_body):
"""同步发送"""
producer = Producer(self.consumer_group)
self.init(producer)
producer.start()
msg = self.create_message(**msg_body)
ret = producer.send_sync(msg)
print('send sync message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
producer.shutdown()
- 发送顺序消息
# MessageQueue4RocketMQ/src/rocket_mq_connect.py
def send_orderly_with_sharding_key(self, msg_body):
"""发送顺序消息"""
producer = Producer(self.consumer_group, True)
self.init(producer)
producer.start()
msg = self.create_message(**msg_body)
ret = producer.send_orderly_with_sharding_key(msg, 'orderId')
print('send orderly message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
producer.shutdown()
- 发送定时消息
# MessageQueue4RocketMQ/src/rocket_mq_connect.py
def send_delay_message(self, msg_body, delay_time=None):
"""发送定时消息"""
producer = Producer(self.consumer_group)
self.init(producer)
producer.start()
msg = self.create_message(**msg_body)
# delay_time 默认延时3s 时间戳的字符串格式,如果被设置为小于当前时间戳,消息将立即被投递给消费者,可通过以下方式指定对应时刻
# delay_time = str(int(time.mktime(time.strptime("2022-10-26 10:20:30", "%Y-%M-%d %H:%M:%S"))))
delay_time = delay_time if delay_time else str(int(round((time.time() + 3) * 1000)))
msg.set_property('__STARTDELIVERTIME', delay_time)
ret = producer.send_sync(msg)
print('send delay message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
producer.shutdown()
- 发送事务消息
# MessageQueue4RocketMQ/src/rocket_mq_connect.py
def send_transaction_message(self, count, msg_body):
"""发送事务消息"""
producer = TransactionMQProducer(self.consumer_group, self.check_callback)
self.init(producer)
producer.start()
for _ in range(count):
msg = self.create_message(**msg_body)
ret = producer.send_message_in_transaction(msg, self.local_execute, None)
print('send transaction message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
print('send transaction message done')
while True:
time.sleep(3600)
def local_execute(self, msg, user_args):
print('local: ' + msg.body.decode('utf-8'))
# 返回UNKNOW状态,会进行回查,调用回调函数check_callback
return TransactionStatus.UNKNOWN
def check_callback(self, msg):
print('check: ' + msg.body.decode

TransactionStatus.COMMIT:提交事务,允许消费者消费该消息。
TransactionStatus.ROLLBACK:回滚事务,消息将被丢弃不允许消费。
TransactionStatus.UNKNOWN:无法判断状态,期待服务端向生产者再次回查该消息的状态。
- 订阅消息
# MessageQueue4RocketMQ/src/rocket_mq_connect.py
def start_consume_message(self):
"""订阅消息"""
consumer = PushConsumer(self.consumer_group)
self.init(consumer)
consumer.subscribe(self.topic, self.callback)
print('start consume message')
consumer.start()
while True:
time.sleep(3600)
def callback(self, msg):
print(msg.id, msg.body, msg.get_property('property'))
return ConsumeStatus.CONSUME_SUCCESS
1.6 配置文件
- MessageQueue4RocketMQ/config/config.json
{
"rocket_mq": {
"name_src": "name_src",
"topic": "topic",
"consumer_group": "consumer_group",
"access_key": "access_key",
"secret_key": "secret_key"
}
}
name_src: 实例元数据连接地址和端口(ip_address:port),集群模式使用逗号 ‘,’ 隔开
topic:消息主题 (须在华为云控制台创建)
consumer_group:消费组
access_key:用户名(开启ACL权限访问需要)
secret_key:密钥(开启ACL权限访问需要)
1.7 测试代码
# 进入根路径 MessageQueue4RocketMQ (当前绝对路径为/home/python/MessageQueue4RocketMQ)
cd /home/python/MessageQueue4RocketMQ
# 开启订阅消息
python src/rocket_mq_recv.py
# 另起终端开启测试
python test/rocket_mq_test.py
生产者发送消息结果

消费组订阅消息结果
