文件最后提交记录最后更新时间
3 年前
3 年前
3 年前
3 年前
3 年前
README.md

华为云 Python NoSql For GaussDB(for Cassandra) 演示

1.说明

本项目将演示通过cassandra-driver对GaussDB(for Cassandra)进行操作。

1.1 环境准备

  • python3.9.2
  • Pycharm 2022+ / Visual Studio 2022+
  • Cassandra 3.11

1.2必备Pypi包

  • cassandra-driver==3.25.0

1.3 数据库选型

1.4 核心代码介绍

  • src\cassandra_connect.py
class CassandraConnect:

    def __init__(self):
        # 获取配置文件路径
        dir_path = os.path.split(os.path.split(__file__)[0])[0]
        config_path = os.path.join(dir_path, r"config\config.json")
        with open(config_path, "r") as config_file:
            # 反序列化配置文件
            config = json.load(config_file)
        config = config.get("cassandra", {})
        clusters = config.get("clusters", list())
        username = config.get("username", "")
        password = config.get("password", "")
        port = config.get("port", "")
        if not clusters:
            raise Exception("配置文件中clusters为空值")
        auth_provider = PlainTextAuthProvider(username=username, password=password)
        self.cluster = Cluster(contact_points=clusters, auth_provider=auth_provider, port=port)
        self.session = self.cluster.connect()

    def execute_cql(self, cql, args):

    def create_keyspace(self, keyspace, strategy="SimpleStrategy", replication_factor=1):

    def create_table(self, keyspace, table_name, cols):

    def query(self, keyspace, table_name, query_cols="*", where_condition=""):

    def _insert_prepare(self, keyspace, table_name, cols: list):

    def insert_one(self, keyspace, table_name, cols: list, value: list):

    def insert_many(self, keyspace, table_name, cols: list, values: list):

    def update(self, keyspace, table_name, update_cols, update_value, where_condition):

    def delete(self, keyspace, table_name, where_condition):

    def delete_table(self, keyspace, table_name):

    def delete_keyspace(self, keyspace):

    def close(self):
  • test\cassandra_connect_test.py
    对cassandra进行增删改查测试
class CassandraConnectTest(unittest.TestCase):
    cc = None

    @classmethod
    def setUpClass(cls) -> None:
        cls.cc = CassandraConnect()

    @classmethod
    def tearDownClass(cls) -> None:
        cls.cc.close()

    def test_0_create_keyspace(self):
        """测试创建keyspace"""
        self.cc.create_keyspace("test")
        self.assertTrue("test" in self.cc.cluster.metadata.keyspaces)

    def test_1_create_table(self):
        """测试创建表"""
        self.cc.create_table("test", "test", [["name", "varchar primary key"], ["age", "int"]])
        self.assertTrue("test" in self.cc.cluster.metadata.keyspaces["test"].tables)

    def test_2_insert_one(self):
        """测试插入值"""
        self.cc.insert_one("test", "test", ["name", "age"], ["test", 10])
        result = self.cc.query("test", "test", "*", "name='test'")
        self.assertTrue(result.one().name == "test" and result.one().age == 10)

    def test_3_insert_many(self):
        """测试批量插入值"""
        values = [["test1", 10], ["test2", 10]]
        self.cc.insert_many("test", "test", ["name", "age"], values)
        for value in values:
            where_condition = "name='{0}'".format(value[0])
            result = self.cc.query("test", "test", "*", where_condition)
            self.assertTrue(result.one().name == value[0] and result.one().age == value[1])

    def test_4_query(self):
        """测试查询值"""
        verification_result = ["test", "test1", "test2"]
        results = self.cc.query("test", "test", "*", "age=10")
        for result in results:
            self.assertTrue(result.name in verification_result)

    def test_5_update(self):
        """测试更新值"""
        self.cc.update("test", "test", ["age"], [20], "name='test'")
        result = self.cc.query("test", "test", "*", "name='test'")
        self.assertTrue(result.one().age == 20)

    def test_6_delete(self):
        """测试删除值"""
        self.cc.delete("test", "test", "name='test'")
        result = self.cc.query("test", "test", "*", "name='test'")
        self.assertTrue(result.one() is None)

    def test_7_delete_table(self):
        """测试删除table"""
        self.cc.delete_table("test", "test")
        self.assertTrue("test" not in self.cc.cluster.metadata.keyspaces["test"].tables)

    def test_8_delete_keyspace(self):
        """测试删除keyspace"""
        self.cc.delete_keyspace("test")
        self.assertTrue("test" not in self.cc.cluster.metadata.keyspaces)
  • config\config.json
{
  "cassandra": {
    "clusters": [
      "ip",
      "ip"
    ],
    "username": "",
    "password": "",
    "port": 8635
  }
}

username:用户名
password:用户密码
clusters:数据库集群各IP地址
port:数据库端口