import json
import os

from kafka import KafkaProducer


class KafkaProducerConnect:
    """Wrapper around a ``KafkaProducer`` configured from a JSON file.

    The broker list is read from the ``kafka.clusters`` entry of
    ``config/config.json``, looked up in the parent of the directory that
    contains this module.
    """

    def __init__(self):
        """Load the broker list from the config file and create the producer.

        Errors raised while opening or parsing the config file propagate
        unchanged.

        :raises ValueError: if ``kafka.clusters`` is missing or empty.
        """
        # The config directory sits next to this module's package directory,
        # i.e. one level above the directory containing this file.
        base_dir = os.path.dirname(os.path.dirname(__file__))
        config_path = os.path.join(base_dir, "config", "config.json")
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(config_path, "r", encoding="utf-8") as config_file:
            config = json.load(config_file)
        # ``or {}`` keeps an explicit ``"kafka": null`` from failing on
        # ``None.get`` before the emptiness check below can report it.
        kafka_config = config.get("kafka", {}) or {}
        self.clusters = kafka_config.get("clusters", list())
        if not self.clusters:
            # ValueError is still caught by callers handling ``Exception``.
            raise ValueError("配置文件中clusters为空值")
        self.producer = self.init_producer()

    def init_producer(self):
        """Create a producer connected to the configured brokers.

        :return: a new ``KafkaProducer`` bootstrapped from ``self.clusters``.
        """
        return KafkaProducer(bootstrap_servers=self.clusters)

    def send_msg(self, topic, msg, key=None):
        """Serialize ``msg`` as JSON and send it to ``topic``.

        :param topic: name of the topic to publish to.
        :param msg: any JSON-serializable value; sent as UTF-8 encoded JSON.
        :param key: optional message key. A ``str`` key is encoded as UTF-8
            because the producer is created without a key serializer; any
            other value is handed to the producer unchanged.
        :return: whatever ``self.producer.send`` returns (kafka-python
            documents this as a future for the record metadata).
        :raises TypeError: if ``msg`` is not JSON-serializable.
        """
        payload = json.dumps(msg).encode("utf-8")
        if isinstance(key, str):
            key = key.encode("utf-8")
        return self.producer.send(topic, payload, key=key)