from aiokafka import AIOKafkaConsumer

from util.config_center import ConfigCenter


class AIOKafkaConsumerConnect:
    """Async Kafka consumer connection.

    Builds an ``AIOKafkaConsumer`` from the value returned by
    ``ConfigCenter.get_kafka_config()`` and exposes it as ``self.consumer``.
    The consumer is only constructed here; this class does not start or
    stop it.
    """

    def __init__(self, topics, group_id, auto_offset_reset="latest"):
        """
        :param topics: a single topic name, or a list/tuple/set of topic names
        :param group_id: consumer group id
        :param auto_offset_reset: consumption mode. The default ``"latest"``
            consumes from the newest messages; ``"earliest"`` consumes from
            the oldest.
        """
        # Fetch the Kafka configuration; it is passed on as
        # ``bootstrap_servers`` when the consumer is built.
        self.clusters = ConfigCenter.get_kafka_config()
        self.consumer = self.init_consumer(topics, group_id, auto_offset_reset)

    @staticmethod
    def _normalize_topics(topics):
        """Return ``topics`` as a tuple of positional topic arguments.

        A string is one topic. A list/tuple/set/frozenset is expanded into
        its members. Any other value is passed through unchanged as a single
        argument, so validation of unexpected input stays with
        ``AIOKafkaConsumer``.
        """
        # ``str`` is checked first: it must never be split into characters.
        if isinstance(topics, str):
            return (topics,)
        if isinstance(topics, (list, tuple, set, frozenset)):
            return tuple(topics)
        return (topics,)

    def init_consumer(self, topics, group_id, auto_offset_reset):
        """
        Initialize the consumer.

        :param topics: a single topic name, or a list/tuple/set of topic names
        :param group_id: consumer group id
        :param auto_offset_reset: ``"latest"`` or ``"earliest"``
        :return: the constructed ``AIOKafkaConsumer``
        """
        # AIOKafkaConsumer takes topics as *varargs*; passing a collection as
        # one positional argument would make the collection itself the topic.
        return AIOKafkaConsumer(
            *self._normalize_topics(topics),
            bootstrap_servers=self.clusters,
            group_id=group_id,
            auto_offset_reset=auto_offset_reset,
        )