华为云 Python NoSql For GaussDB(for Influx) 演示

1.说明

本项目将演示通过influxdb对GaussDB(for Influx)进行操作。

1.1 环境准备

  • python3.9.2
  • Pycharm 2022+ / Visual Studio 2022+
  • Influx 1.7

1.2必备Pypi包

  • influxdb==5.3.1

1.3 数据库选型

1.4 核心代码介绍

  • src\influx_connect.py
class InfluxConnect:

    def __new__(cls, *args, **kwargs):
        # 获取配置文件路径
        dir_path = os.path.split(os.path.split(__file__)[0])[0]
        config_path = os.path.join(dir_path, r"config\config.json")
        with open(config_path, "r") as config_file:
            # 反序列化配置文件
            config = json.load(config_file)
        config = config.get("influxdb", None)
        if not config:
            raise Exception("连接地址不存在或为空")
        return InfluxDBClient(**config)

  • test\influx_connect_test.py
    对influx进行增删改查测试
class InfluxConnectTest(unittest.TestCase):
    ic = None
    database = "test"

    @classmethod
    def setUpClass(cls) -> None:
        cls.ic = InfluxConnect()

    @classmethod
    def tearDownClass(cls) -> None:
        cls.ic.close()

    def test_0_create_database(self):
        """创建database"""
        self.ic.create_database(self.database)
        self.assertTrue(self.database in [i["name"] for i in self.ic.get_list_database()])

    def test_1_insert_point(self):
        """插入一个POINT"""
        point = [{
            "measurement": "test",
            "tags": {
                "id": 1,
            },
            "fields": {
                "name": "test",
                "age": 10
            }
        }]
        self.ic.switch_database(self.database)
        self.ic.write_points(point)
        result = self.ic.query(f"SELECT * FROM test WHERE id='1'", database=self.database)
        self.assertTrue(result)

    def test_3_delete_series(self):
        """创建database"""
        self.ic.delete_series(database=self.database, measurement="test", tags={"id": "1"})
        result = self.ic.query(f"SELECT * FROM test WHERE id='1'", database=self.database)
        self.assertTrue(not result)

    def test_4_drop_measurement(self):
        """删除measurement"""
        self.ic.switch_database(self.database)
        self.ic.drop_measurement(measurement="test")
        self.assertTrue("test" not in [i["name"] for i in self.ic.get_list_measurements()])

    def test_5_drop_database(self):
        """删除database"""
        self.ic.drop_database(self.database)
        self.assertTrue(self.database not in [i["name"] for i in self.ic.get_list_database()])
  • config\config.json
{
  "influxdb": {
    "host": "ip_address",
    "port": 8635,
    "username": "username",
    "password": "password",
    "ssl": true
  }
}

username:用户名
password:用户密码
host:数据库IP地址
port:数据库端口
ssl:是否开启ssl,通常开启