import json
import os

from kafka import KafkaConsumer


class KafkaConsumerConnect:
    """Create a ``KafkaConsumer`` configured from the project's JSON config.

    The broker list is read from the ``kafka.clusters`` entry of
    ``config/config.json``, looked up one directory above the directory that
    contains this module. The consumer is exposed as ``self.consumer`` and the
    broker list as ``self.clusters``.
    """

    def __init__(self, topics, group_id, auto_offset_reset="latest"):
        """Load the broker list and build the consumer.

        :param topics: a single topic name, or an iterable of topic names,
            to subscribe to
        :param group_id: consumer group id
        :param auto_offset_reset: consumption mode; ``"latest"`` (default)
            starts from the newest messages, ``"earliest"`` from the oldest
        :raises ValueError: if the config has no non-empty ``kafka.clusters``
        """
        self.clusters = self._load_clusters()
        self.consumer = self.init_consumer(topics, group_id, auto_offset_reset)

    @staticmethod
    def _load_clusters():
        """Return the ``kafka.clusters`` value from ``config/config.json``.

        :raises ValueError: if the entry is missing or empty
        """
        # Resolve to an absolute path first: ``__file__`` can be relative
        # (e.g. a bare file name), in which case taking the parent directory
        # twice would silently collapse to the current working directory.
        module_dir = os.path.dirname(os.path.abspath(__file__))
        config_path = os.path.join(
            os.path.dirname(module_dir), "config", "config.json")
        # JSON is UTF-8 by specification; do not depend on the platform's
        # default locale encoding.
        with open(config_path, "r", encoding="utf-8") as config_file:
            config = json.load(config_file)
        # ``or`` guards against explicit nulls in the config file so that the
        # empty-clusters error below is raised instead of an AttributeError.
        kafka_config = config.get("kafka") or {}
        clusters = kafka_config.get("clusters") or []
        if not clusters:
            raise ValueError("配置文件中clusters为空值")
        return clusters

    @staticmethod
    def _normalize_topics(topics):
        """Return ``topics`` as a tuple of topic names.

        A single string becomes a one-element tuple, ``None`` becomes an empty
        tuple (no initial subscription), and any other iterable is converted
        to a tuple of its items.
        """
        if topics is None:
            return ()
        if isinstance(topics, str):
            return (topics,)
        return tuple(topics)

    def init_consumer(self, topics, group_id, auto_offset_reset):
        """Initialise the consumer against ``self.clusters``.

        :param topics: a single topic name or an iterable of topic names
        :param group_id: consumer group id
        :param auto_offset_reset: ``"latest"`` or ``"earliest"``
        :return: the constructed ``KafkaConsumer``
        """
        # KafkaConsumer takes topics as *positional varargs*; passing a list
        # as one argument would be treated as a single (invalid) topic name.
        return KafkaConsumer(
            *self._normalize_topics(topics),
            bootstrap_servers=self.clusters,
            group_id=group_id,
            auto_offset_reset=auto_offset_reset,
        )