import os
import json
import time
from rocketmq.client import Producer, Message, TransactionMQProducer, TransactionStatus
from rocketmq.client import PushConsumer, ConsumeStatus


class RocketMQConnect:

    def __init__(self):
        # 获取配置文件路径
        dir_path = os.path.split(os.path.split(__file__)[0])[0]
        config_path = os.path.join(dir_path, f"config{os.sep}config.json")
        with open(config_path, "r") as config_file:
            # 反序列化配置文件
            config = json.load(config_file)
        config = config.get("rocket_mq", {})
        self.name_src = config.get("name_src", '')
        self.topic = config.get("topic", "")
        self.consumer_group = config.get("consumer_group", "")
        self.access_key = config.get("access_key", "")
        self.secret_key = config.get("secret_key", "")

        if not self.name_src:
            raise Exception("配置文件中name_src为空值")

    def init(self, obj):
        # 设置连接RocketMQ的参数
        obj.set_name_server_address(self.name_src)
        # 增加用户认证信息
        if self.access_key and self.secret_key:
            obj.set_session_credentials(self.access_key, self.secret_key, "")

    def create_message(self, key, tag, property, body):
        """ 创建消息体 """
        msg = Message(self.topic)
        msg.set_keys(key)
        msg.set_tags(tag)
        msg.set_property('property', property)
        msg.set_body(body)
        return msg

    def send_message_sync(self, msg_body):
        """同步发送"""
        producer = Producer(self.consumer_group)
        self.init(producer)
        producer.start()
        msg = self.create_message(**msg_body)
        ret = producer.send_sync(msg)
        print('send sync message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
        producer.shutdown()

    def send_orderly_with_sharding_key(self, msg_body):
        """发送顺序消息"""
        producer = Producer(self.consumer_group, True)
        self.init(producer)
        producer.start()
        msg = self.create_message(**msg_body)
        ret = producer.send_orderly_with_sharding_key(msg, 'orderId')
        print('send orderly message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
        producer.shutdown()

    def send_delay_message(self, msg_body, delay_time=None):
        """发送定时消息"""
        producer = Producer(self.consumer_group)
        self.init(producer)
        producer.start()
        msg = self.create_message(**msg_body)
        # delay_time 默认延时3s 时间戳的字符串格式,如果被设置为小于当前时间戳,消息将立即被投递给消费者,
        # 可通过以下方式指定对应时刻:str(int(time.mktime(time.strptime("2022-10-26 10:20:30", "%Y-%M-%d %H:%M:%S"))))
        delay_time = delay_time if delay_time else str(int(round((time.time() + 3) * 1000)))
        msg.set_property('__STARTDELIVERTIME', delay_time)
        ret = producer.send_sync(msg)
        print('send delay message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
        producer.shutdown()

    def callback(self, msg):
        print(msg.id, msg.body, msg.get_property('property'))
        return ConsumeStatus.CONSUME_SUCCESS

    def send_transaction_message(self, count, msg_body):
        """发送事务消息"""
        producer = TransactionMQProducer(self.consumer_group, self.check_callback)
        self.init(producer)
        producer.start()
        for _ in range(count):
            msg = self.create_message(**msg_body)
            ret = producer.send_message_in_transaction(msg, self.local_execute, None)
            print('send transaction message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
        print('send transaction message done')

        while True:
            time.sleep(3600)

    def local_execute(self, msg, user_args):
        print('local:   ' + msg.body.decode('utf-8'))
        # 返回UNKNOW状态,会进行回查,调用回调函数check_callback
        return TransactionStatus.UNKNOWN

    def check_callback(self, msg):
        print('check: ' + msg.body.decode('utf-8'))
        return TransactionStatus.COMMIT

    def start_consume_message(self):
        """订阅普通消息或事务消息"""
        consumer = PushConsumer(self.consumer_group)
        self.init(consumer)
        consumer.subscribe(self.topic, self.callback)
        print('start consume message')
        consumer.start()

        while True:
            time.sleep(3600)