import time
import unittest
from MessageQueue4RocketMQ.src.rocket_mq_connect import RocketMQConnect
class RocketMQConnectTest(unittest.TestCase):
rocketmq = None
@classmethod
def setUpClass(cls) -> None:
cls.rocketmq = RocketMQConnect()
def test_0_send_message_sync(self):
"""测试同步发送"""
msg_body = {
"key": "key-01",
'tag': 'test-tag',
"property": "test-sync",
"body": 'the first message'
}
self.rocketmq.send_message_sync(msg_body)
def test_1_send_orderly_with_sharding_key(self):
"""测试发送顺序消息"""
for i in range(5):
msg_body = {
"key": f"key-{i}",
'tag': f'test-orderly-tag',
"property": "test-orderly",
"body": f'this is No.{i}'
}
self.rocketmq.send_orderly_with_sharding_key(msg_body)
def test_2_send_delay_message(self):
"""测试发送定时消息"""
msg_body = {
"key": "key-delay",
'tag': 'test-delay-tag',
"property": "test-delay",
"body": 'the delay message'
}
delay_time = str(int(round((time.time() + 5) * 1000)))
self.rocketmq.send_delay_message(msg_body, delay_time=delay_time)
def test_3_send_transaction_message(self):
"""测试发送事务消息"""
count = 3
msg_body = {
"key": "key-transaction",
'tag': 'test-transaction-tag',
"property": "test-transaction",
"body": 'the transaction message'
}
self.rocketmq.send_transaction_message(count, msg_body)
if __name__ == '__main__':
unittest.main()