from kafka import KafkaAdminClient
from kafka.admin import NewTopic
from util.config_center import ConfigCenter
class KafkaAdminClientConnect:
"""kafka主用户连接"""
def __init__(self):
clusters = ConfigCenter.get_kafka_config()
self.admin = KafkaAdminClient(bootstrap_servers=clusters)
def create_topic(self, topic_name, num_partitions=1, replication_factor=1):
"""
创建topic
:param topic_name: topic名
:param num_partitions: 分区号
:param replication_factor: 备份数
:return: 创建结果
"""
topic = NewTopic(name=topic_name, num_partitions=num_partitions, replication_factor=replication_factor)
return self.admin.create_topics([topic], validate_only=False)
def delete_topic(self, topic_name):
"""
删除topic
:param topic_name:
:return:删除结果
"""
return self.admin.delete_topics([topic_name])