'''
数据集管理api服务
'''
import json
from web_apps import db
from utils.query_utils import get_base_query
from utils.auth import set_insert_user, set_update_user, get_auth_token_info
from utils.common_utils import gen_json_response, gen_uuid
from web_apps.rag.db_models import Dataset
def serialize_dataset_model(obj, ser_type='list'):
'''
序列化模型数据
:param obj:
:param ser_type:
:return:
'''
dic = obj.to_dict()
if ser_type == 'list':
res = {}
for k in ['id', 'name', 'status', 'create_by', 'create_time', 'update_by', 'update_time', 'del_flag', 'sort_no', 'description']:
if k in []:
res[k] = json.loads(dic[k])
else:
res[k] = dic[k]
return res
elif ser_type == 'detail':
for k in []:
dic[k] = json.loads(dic[k])
for k in []:
dic.pop(k)
elif ser_type == 'all_list':
res = {}
for k in ['id', 'name']:
if k in []:
res[k] = json.loads(dic[k])
else:
res[k] = dic[k]
return res
return dic
class DatasetApiService(object):
def __init__(self):
pass
def get_obj_list(self, req_dict):
'''
获取列表
'''
page = int(req_dict.get('page', 1))
pagesize = int(req_dict.get('pagesize', 10))
query = get_base_query(Dataset)
name = req_dict.get('name', '')
if name != '':
query = query.filter(Dataset.name.like("%" + name + "%"))
status = req_dict.get('status', '')
if status != '':
query = query.filter(Dataset.status == status)
total = query.count()
query = query.offset((page - 1) * pagesize)
query = query.limit(pagesize)
obj_list = query.all()
result = []
for obj in obj_list:
dic = serialize_dataset_model(obj, ser_type='list')
result.append(dic)
res_data = {
'records': result,
'total': total
}
return gen_json_response(data=res_data)
def get_obj_all_list(self, req_dict):
'''
获取全量列表
'''
query = get_base_query(Dataset)
ids = req_dict.get('ids', '')
if ids:
ids = ids.split(',')
query = query.filter(Dataset.id.in_(ids))
obj_list = query.filter(Dataset.status == 1).all()
result = []
for obj in obj_list:
dic = serialize_dataset_model(obj, ser_type='all_list')
result.append(dic)
return gen_json_response(data=result)
def get_obj_detail(self, req_dict):
'''
获取详情
'''
obj_id = req_dict.get('id')
obj = db.session.query(Dataset).filter(
Dataset.id == obj_id,
Dataset.del_flag == 0).first()
if not obj:
return gen_json_response(code=400, msg='未找到数据')
dic = serialize_dataset_model(obj, ser_type='detail')
return gen_json_response(data=dic)
def add_obj(self, req_dict):
'''
添加
'''
name = req_dict.get('name', '')
if name != '':
exist_obj = db.session.query(Dataset).filter(
Dataset.name == name,
Dataset.del_flag == 0).first()
if exist_obj:
return gen_json_response(code=400, msg='字段"名称"已存在')
obj = Dataset()
for key in req_dict:
if key in []:
setattr(obj, key, json.dumps(req_dict[key], ensure_ascii=False, indent=2))
else:
setattr(obj, key, req_dict[key])
obj.id = gen_uuid(res_type='base')
set_insert_user(obj)
db.session.add(obj)
db.session.commit()
db.session.flush()
return gen_json_response(msg='添加成功', extends={'success': True})
def edit_obj(self, req_dict):
'''
编辑
'''
obj_id = req_dict.get('id')
exist_query = db.session.query(Dataset).filter(Dataset.id != obj_id)
name = req_dict.get('name', '')
if name != '':
exist_query = exist_query.filter(Dataset.name == name)
exist_obj = exist_query.first()
if exist_obj:
return gen_json_response(code=400, msg='数据已存在')
obj = db.session.query(Dataset).filter(Dataset.id == obj_id).first()
if obj is None:
return gen_json_response(code=400, msg='未找到数据')
for key in req_dict:
if key in []:
setattr(obj, key, json.dumps(req_dict[key], ensure_ascii=False, indent=2))
else:
setattr(obj, key, req_dict[key])
set_update_user(obj)
db.session.add(obj)
db.session.commit()
db.session.flush()
return gen_json_response(msg='编辑成功', extends={'success': True})
def delete_obj(self, req_dict):
'''
删除
'''
obj_id = req_dict['id']
del_obj = db.session.query(Dataset).filter(Dataset.id == obj_id).first()
if del_obj is None:
return gen_json_response(code=400, msg='未找到数据')
if del_obj.built_in == 1:
return gen_json_response(code=400, msg='内置数据集,禁止删除')
del_obj.del_flag = 1
set_update_user(del_obj)
db.session.add(del_obj)
db.session.commit()
db.session.flush()
return gen_json_response(code=200, msg='删除成功', extends={'success': True})
def delete_batch(self, req_dict):
'''
批量删除
'''
del_ids = req_dict.get('ids')
if isinstance(del_ids, str):
del_ids = del_ids.split(',')
del_objs = db.session.query(Dataset).filter(Dataset.id.in_(del_ids)).all()
has_built_in = [i for i in del_objs if i.built_in == 1] != []
if has_built_in:
return gen_json_response(code=400, msg='含有内置数据集,禁止删除')
for del_obj in del_objs:
del_obj.del_flag = 1
set_update_user(del_obj)
db.session.add(del_obj)
db.session.commit()
db.session.flush()
return gen_json_response(code=200, msg='删除成功', extends={'success': True})