import json
import os
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import BatchStatement, BatchType
from cassandra.query import ConsistencyLevel
class CassandraConnect:
    """Thin convenience wrapper around a Cassandra ``Cluster``/``Session`` pair.

    Connection settings are read from the ``"cassandra"`` section of
    ``<parent of this file's directory>/config/config.json``.

    Keyspace names, table names, column names and ``where_condition``
    fragments are interpolated into the CQL text verbatim; only the values
    passed as bind arguments are parameterised. Callers must therefore never
    pass untrusted input through those identifier/condition parameters.

    The instance can be used as a context manager; leaving the ``with`` block
    calls :meth:`close`.
    """

    def __init__(self):
        """Load the configuration file and open a session on the cluster.

        :raises ValueError: if the configuration lists no contact points
        """
        # abspath guards against a relative ``__file__``, for which the
        # "parent of the parent" computation would collapse to "".
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        config_path = os.path.join(base_dir, "config", "config.json")
        with open(config_path, "r", encoding="utf-8") as config_file:
            config = json.load(config_file).get("cassandra", {})

        clusters = config.get("clusters", [])
        if not clusters:
            raise ValueError("配置文件中clusters为空值")

        username = config.get("username", "")
        password = config.get("password", "")
        # A missing/empty port falls back to 9042 (the driver's default
        # native-protocol port) instead of an unusable "".
        port = int(config.get("port") or 9042)

        auth_provider = PlainTextAuthProvider(username=username, password=password)
        self.cluster = Cluster(contact_points=clusters, auth_provider=auth_provider, port=port)
        try:
            self.session = self.cluster.connect()
        except Exception:
            # Do not leave the cluster object (and its resources) dangling
            # when the initial connection attempt fails.
            self.cluster.shutdown()
            raise

    def __enter__(self):
        """Return ``self`` so the connection can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the session and the cluster; exceptions are not suppressed."""
        self.close()
        return False

    def execute_cql(self, cql, args):
        """Execute a statement on the session.

        :param cql: CQL string, prepared statement or batch statement
        :param args: bind values passed to ``session.execute``
        :return: whatever ``session.execute`` returns
        :raises Exception: any error raised by the session is printed and
            re-raised unchanged
        """
        try:
            return self.session.execute(cql, args)
        except Exception as e:
            print("错误信息{0}".format(e))
            raise

    def create_keyspace(self, keyspace, strategy="SimpleStrategy", replication_factor=1):
        """Create a keyspace.

        :param keyspace: keyspace name
        :param strategy: replication strategy class
        :param replication_factor: number of replicas
        :return: result of executing the statement
        """
        cql = (
            f"CREATE KEYSPACE {keyspace} WITH replication = "
            f"{{'class': '{strategy}', 'replication_factor': {replication_factor}}};"
        )
        return self.execute_cql(cql, [])

    def create_table(self, keyspace, table_name, cols):
        """Create a table.

        :param keyspace: keyspace name
        :param table_name: table name
        :param cols: column names and types, e.g.
            ``[["col1_name", "col1_type"], ["col2_name", "col2_type"]]``
        :return: result of executing the statement
        """
        cols_str = ", ".join(f"{col_name} {col_type}" for col_name, col_type in cols)
        cql = self.session.prepare(f"CREATE TABLE {keyspace}.{table_name}({cols_str});")
        return self.execute_cql(cql, [])

    def query(self, keyspace, table_name, query_cols="*", where_condition=""):
        """Select rows from a table.

        :param keyspace: keyspace name
        :param table_name: table name
        :param query_cols: column list placed after ``SELECT``
        :param where_condition: filter placed after ``WHERE``; when empty the
            whole table is selected
        :return: result of executing the statement
        """
        cql = f"SELECT {query_cols} FROM {keyspace}.{table_name}"
        if where_condition:
            # Only emit the WHERE clause when there is a filter; an empty
            # condition would otherwise yield "WHERE  ALLOW FILTERING".
            cql += f" WHERE {where_condition} ALLOW FILTERING"
        return self.execute_cql(self.session.prepare(cql), [])

    def _insert_prepare(self, keyspace, table_name, cols: list):
        """Prepare an ``insert`` statement with one ``?`` marker per column.

        :param keyspace: keyspace name
        :param table_name: table name
        :param cols: column names
        :return: the prepared statement
        """
        placeholders = ", ".join("?" for _ in cols)
        col_names = ", ".join(cols)
        cql = f"insert into {keyspace}.{table_name} ({col_names}) values ({placeholders});"
        return self.session.prepare(cql)

    def insert_one(self, keyspace, table_name, cols: list, value: list):
        """Insert a single row.

        :param keyspace: keyspace name
        :param table_name: table name
        :param cols: column names
        :param value: values to bind, in the same order as ``cols``
        :return: result of executing the statement
        """
        cql = self._insert_prepare(keyspace, table_name, cols)
        return self.execute_cql(cql, value)

    def insert_many(self, keyspace, table_name, cols: list, values: list):
        """Insert several rows with one unlogged batch at QUORUM consistency.

        :param keyspace: keyspace name
        :param table_name: table name
        :param cols: column names
        :param values: one list of bind values per row
        :return: result of executing the batch
        """
        cql = self._insert_prepare(keyspace, table_name, cols)
        batch = BatchStatement(batch_type=BatchType.UNLOGGED, consistency_level=ConsistencyLevel.QUORUM)
        for value in values:
            batch.add(cql, value)
        return self.execute_cql(batch, [])

    def update(self, keyspace, table_name, update_cols, update_value, where_condition):
        """Update columns of the rows matching a condition.

        :param keyspace: keyspace name
        :param table_name: table name
        :param update_cols: names of the columns to set
        :param update_value: values bound to the ``?`` markers
        :param where_condition: filter placed after ``WHERE``
        :return: result of executing the statement
        """
        assignments = ", ".join(f"{col}=?" for col in update_cols)
        cql = f"UPDATE {keyspace}.{table_name} SET {assignments}"
        if where_condition:
            cql += f" WHERE {where_condition}"
        # NOTE(review): with an empty where_condition the statement has no
        # WHERE clause; CQL UPDATE appears to require one - confirm and decide
        # whether this should be rejected up front.
        return self.execute_cql(self.session.prepare(cql), update_value)

    def delete(self, keyspace, table_name, where_condition):
        """Delete the rows matching a condition.

        :param keyspace: keyspace name
        :param table_name: table name
        :param where_condition: filter placed after ``WHERE``
        :return: result of executing the statement
        """
        cql = self.session.prepare(f"DELETE FROM {keyspace}.{table_name} WHERE {where_condition}")
        return self.execute_cql(cql, [])

    def delete_table(self, keyspace, table_name):
        """Drop a table.

        :param keyspace: keyspace name
        :param table_name: table name
        :return: result of executing the statement
        """
        cql = self.session.prepare(f"DROP TABLE {keyspace}.{table_name}")
        return self.execute_cql(cql, [])

    def delete_keyspace(self, keyspace):
        """Drop a keyspace.

        :param keyspace: keyspace name
        :return: result of executing the statement
        """
        cql = self.session.prepare(f"DROP KEYSPACE {keyspace}")
        return self.execute_cql(cql, [])

    def close(self):
        """Shut down the session, then the cluster, printing each shutdown flag."""
        self.session.shutdown()
        print("session是否关闭:{0}".format(self.session.is_shutdown))
        self.cluster.shutdown()
        print("cluster是否关闭:{0}".format(self.cluster.is_shutdown))