from kafka import KafkaConsumer
from util.config_center import ConfigCenter
class KafkaConsumerConnect:
"""消费者连接"""
def __init__(self, topics, group_id, auto_offset_reset="latest"):
"""
:param topics:
:param group_id:消费组id
:param auto_offset_reset:消费模式。默认为latest为从最新的开始消费, earliest为从最早的开始消费
"""
self.clusters = ConfigCenter.get_kafka_config()
self.consumer = self.init_consumer(topics, group_id, auto_offset_reset)
def init_consumer(self, topics, group_id, auto_offset_reset):
"""
初始化消费者
:param topics:
:param group_id:
:param auto_offset_reset:
:return: 消费者连接
"""
return KafkaConsumer(
topics, bootstrap_servers=self.clusters, group_id=group_id, auto_offset_reset=auto_offset_reset)
if __name__ == '__main__':
consumer = KafkaConsumerConnect("group_1", "consumer_1")
for msg in consumer.consumer:
print(msg)