import json
import os
from kafka import KafkaProducer
class KafkaProducerConnect:
    """Kafka producer connection configured from the project's config file.

    The broker list is read from ``<parent of this file's directory>/config/
    config.json`` under the ``kafka.clusters`` key, and a ``KafkaProducer`` is
    created for those bootstrap servers.
    """

    def __init__(self):
        """Load the Kafka settings and create the underlying producer.

        :raises ValueError: if ``kafka.clusters`` is missing or empty in the
            config file.
        """
        # The config directory sits next to this file's parent directory.
        dir_path = os.path.dirname(os.path.dirname(__file__))
        config_path = os.path.join(dir_path, "config", "config.json")
        # Read as UTF-8 explicitly so decoding does not depend on the
        # platform's locale encoding.
        with open(config_path, "r", encoding="utf-8") as config_file:
            config = json.load(config_file)
        # ``or`` also covers keys that are present but set to null.
        kafka_config = config.get("kafka") or {}
        self.clusters = kafka_config.get("clusters") or []
        if not self.clusters:
            raise ValueError("配置文件中clusters为空值")
        self.producer = self.init_producer()

    def init_producer(self):
        """Create the producer for the configured bootstrap servers.

        :return: a ``KafkaProducer`` bound to ``self.clusters``
        """
        return KafkaProducer(bootstrap_servers=self.clusters)

    def send_msg(self, topic, msg, key=None):
        """Serialize ``msg`` as JSON and send it to ``topic``.

        :param topic: name of the topic to publish to
        :param msg: JSON-serializable message value
        :param key: optional message key; a ``str`` key is encoded as UTF-8,
            any other value is passed to the producer unchanged
        :return: whatever ``self.producer.send`` returns
        """
        payload = json.dumps(msg).encode("utf-8")
        # The producer is created without a key serializer, so a text key
        # has to be turned into bytes before it is handed over.
        if isinstance(key, str):
            key = key.encode("utf-8")
        return self.producer.send(topic, payload, key=key)