import os
import json
import time
from rocketmq.client import Producer, Message, TransactionMQProducer, TransactionStatus
from rocketmq.client import PushConsumer, ConsumeStatus
class RocketMQConnect:
def __init__(self):
dir_path = os.path.split(os.path.split(__file__)[0])[0]
config_path = os.path.join(dir_path, f"config{os.sep}config.json")
with open(config_path, "r") as config_file:
config = json.load(config_file)
config = config.get("rocket_mq", {})
self.name_src = config.get("name_src", '')
self.topic = config.get("topic", "")
self.consumer_group = config.get("consumer_group", "")
self.access_key = config.get("access_key", "")
self.secret_key = config.get("secret_key", "")
if not self.name_src:
raise Exception("配置文件中name_src为空值")
def init(self, obj):
obj.set_name_server_address(self.name_src)
if self.access_key and self.secret_key:
obj.set_session_credentials(self.access_key, self.secret_key, "")
def create_message(self, key, tag, property, body):
""" 创建消息体 """
msg = Message(self.topic)
msg.set_keys(key)
msg.set_tags(tag)
msg.set_property('property', property)
msg.set_body(body)
return msg
def send_message_sync(self, msg_body):
"""同步发送"""
producer = Producer(self.consumer_group)
self.init(producer)
producer.start()
msg = self.create_message(**msg_body)
ret = producer.send_sync(msg)
print('send sync message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
producer.shutdown()
def send_orderly_with_sharding_key(self, msg_body):
"""发送顺序消息"""
producer = Producer(self.consumer_group, True)
self.init(producer)
producer.start()
msg = self.create_message(**msg_body)
ret = producer.send_orderly_with_sharding_key(msg, 'orderId')
print('send orderly message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
producer.shutdown()
def send_delay_message(self, msg_body, delay_time=None):
"""发送定时消息"""
producer = Producer(self.consumer_group)
self.init(producer)
producer.start()
msg = self.create_message(**msg_body)
delay_time = delay_time if delay_time else str(int(round((time.time() + 3) * 1000)))
msg.set_property('__STARTDELIVERTIME', delay_time)
ret = producer.send_sync(msg)
print('send delay message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
producer.shutdown()
def callback(self, msg):
print(msg.id, msg.body, msg.get_property('property'))
return ConsumeStatus.CONSUME_SUCCESS
def send_transaction_message(self, count, msg_body):
"""发送事务消息"""
producer = TransactionMQProducer(self.consumer_group, self.check_callback)
self.init(producer)
producer.start()
for _ in range(count):
msg = self.create_message(**msg_body)
ret = producer.send_message_in_transaction(msg, self.local_execute, None)
print('send transaction message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
print('send transaction message done')
while True:
time.sleep(3600)
def local_execute(self, msg, user_args):
print('local: ' + msg.body.decode('utf-8'))
return TransactionStatus.UNKNOWN
def check_callback(self, msg):
print('check: ' + msg.body.decode('utf-8'))
return TransactionStatus.COMMIT
def start_consume_message(self):
"""订阅普通消息或事务消息"""
consumer = PushConsumer(self.consumer_group)
self.init(consumer)
consumer.subscribe(self.topic, self.callback)
print('start consume message')
consumer.start()
while True:
time.sleep(3600)