import json
import os
KAFKA_CLUSTERS = "KAFKA_CLUSTERS"
REDIS_HOST = "REDIS_HOST"
REDIS_PORT = "REDIS_PORT"
REDIS_PASSWORD = "REDIS_PASSWORD"
INFLUX_HOST = "INFLUX_HOST"
INFLUX_PORT = "INFLUX_PORT"
INFLUX_USERNAME = "INFLUX_USERNAME"
INFLUX_PASSWORD = "INFLUX_PASSWORD"
class ConfigCenter:
"""配置中心"""
@classmethod
def get_value_from_env(cls, key):
value = os.getenv(key)
if not value:
raise Exception("没有设置{0}环境变量".format(key))
return value
@classmethod
def get_kafka_config(cls):
return json.loads(cls.get_value_from_env(KAFKA_CLUSTERS))
@classmethod
def get_redis_config(cls, db_num):
host = cls.get_value_from_env(REDIS_HOST)
port = cls.get_value_from_env(REDIS_PORT)
password = cls.get_value_from_env(REDIS_PASSWORD)
return {
"host": host,
"port": port,
"password": password,
"db": db_num
}
@classmethod
def get_influx_config(cls):
host = cls.get_value_from_env(INFLUX_HOST)
port = cls.get_value_from_env(INFLUX_PORT)
username = cls.get_value_from_env(INFLUX_USERNAME)
password = cls.get_value_from_env(INFLUX_PASSWORD)
return {
"host": host,
"port": port,
"username": username,
"password": password,
}