from kafka import KafkaConsumer

from util.config_center import ConfigCenter


class KafkaConsumerConnect:
    """Consumer connection: builds a ``KafkaConsumer`` using the cluster
    settings obtained from ``ConfigCenter``."""

    def __init__(self, topics, group_id, auto_offset_reset="latest"):
        """
        Load the Kafka configuration and create the consumer.

        :param topics: a single topic name, an iterable of topic names, or
            None to create the consumer without subscribing to anything
        :param group_id: consumer group id
        :param auto_offset_reset: consumption mode. "latest" (the default)
            starts from the newest messages, "earliest" starts from the
            oldest ones
        """
        # Fetch the configuration; the returned value is handed to
        # KafkaConsumer as ``bootstrap_servers`` in init_consumer().
        self.clusters = ConfigCenter.get_kafka_config()
        self.consumer = self.init_consumer(topics, group_id, auto_offset_reset)

    @staticmethod
    def _normalize_topics(topics):
        """
        Convert ``topics`` into a tuple of topic names.

        :param topics: None, a single topic name, or an iterable of names
        :return: ``()`` for None, ``(topics,)`` for a string, otherwise
            ``tuple(topics)``
        """
        if topics is None:
            return ()
        # A string is itself iterable, so it must be checked first to avoid
        # splitting a topic name into single characters.
        if isinstance(topics, str):
            return (topics,)
        return tuple(topics)

    def init_consumer(self, topics, group_id, auto_offset_reset):
        """
        Initialise the consumer.

        :param topics: a single topic name, an iterable of topic names, or None
        :param group_id: consumer group id
        :param auto_offset_reset: consumption mode ("latest" or "earliest")
        :return: the consumer connection
        """
        # KafkaConsumer takes topic names as separate positional arguments,
        # so the normalised tuple is unpacked rather than passed as a single
        # object (which would break for lists of topics).
        return KafkaConsumer(
            *self._normalize_topics(topics),
            bootstrap_servers=self.clusters,
            group_id=group_id,
            auto_offset_reset=auto_offset_reset,
        )

    def close(self):
        """Close the underlying consumer so its connections are released."""
        self.consumer.close()


if __name__ == '__main__':
    # Manual smoke run: connect and print every message received.
    # NOTE(review): the first argument is the topic, so "group_1" is used as
    # the topic name and "consumer_1" as the group id — confirm this is intended.
    connection = KafkaConsumerConnect(topics="group_1", group_id="consumer_1")
    for record in connection.consumer:
        print(record)