import json
import os
from kafka import KafkaConsumer
class KafkaConsumerConnect:
    """Kafka consumer connection configured from the project's JSON config.

    The broker list is read from ``config/config.json`` (key
    ``kafka.clusters``), located in the parent directory of the directory
    containing this module. The created consumer is exposed as
    ``self.consumer`` and the broker list as ``self.clusters``.
    """

    def __init__(self, topics, group_id, auto_offset_reset="latest"):
        """Load the broker list from the config file and create the consumer.

        :param topics: a single topic name, or an iterable of topic names
        :param group_id: consumer group id
        :param auto_offset_reset: offset reset policy; ``"latest"`` (default)
            starts from the newest messages, ``"earliest"`` from the oldest
        :raises ValueError: if ``kafka.clusters`` is missing or empty in the
            config file
        """
        # Resolve against the absolute module path so that a relative
        # ``__file__`` does not make the lookup depend on the working directory.
        module_dir = os.path.dirname(os.path.abspath(__file__))
        base_dir = os.path.dirname(module_dir)
        config_path = os.path.join(base_dir, "config", "config.json")

        # Explicit encoding: do not depend on the platform's locale default.
        with open(config_path, "r", encoding="utf-8") as config_file:
            config = json.load(config_file)

        kafka_config = config.get("kafka", {})
        self.clusters = kafka_config.get("clusters", [])
        if not self.clusters:
            raise ValueError("配置文件中clusters为空值")

        self.consumer = self.init_consumer(topics, group_id, auto_offset_reset)

    @staticmethod
    def _normalize_topics(topics):
        """Return ``topics`` as a tuple of topic names.

        A single ``str``/``bytes`` value is wrapped in a one-element tuple;
        any other iterable is converted element-by-element.
        """
        if isinstance(topics, (str, bytes)):
            return (topics,)
        return tuple(topics)

    def init_consumer(self, topics, group_id, auto_offset_reset):
        """Create the underlying ``KafkaConsumer``.

        :param topics: a single topic name, or an iterable of topic names
        :param group_id: consumer group id
        :param auto_offset_reset: offset reset policy passed to the consumer
        :return: the constructed ``KafkaConsumer`` instance
        """
        # KafkaConsumer takes topics as separate positional arguments, so a
        # list of names must be unpacked rather than passed as one argument.
        return KafkaConsumer(
            *self._normalize_topics(topics),
            bootstrap_servers=self.clusters,
            group_id=group_id,
            auto_offset_reset=auto_offset_reset,
        )