文档操作
本章节详细介绍 Opensearch兼容接口的文档操作,包括插入、更新、删除和批量操作。
模块信息
源文件: opensearch_sdk/client/document_ops.py
Mixin 类: DocumentOpsMixin
相关模块:
opensearch_sdk/client/doc_utils/nested_handler.py: Nested 字段处理opensearch_sdk/client/doc_utils/serialization.py: 字段序列化opensearch_sdk/client/doc_utils/type_inference.py: 动态列类型推断opensearch_sdk/client/doc_utils/operations.py: 文档操作实现
概述
SDK 提供四种基本的文档操作方法,每种方法有不同的语义和行为:
| 方法 | 行为 | 文档存在时 | 文档不存在时 |
|---|---|---|---|
| index | UPSERT(存在更新,不存在创建) | 更新 | 创建 |
| create | 纯插入 | 抛出 409 Conflict | 创建 |
| update | 纯更新 | 更新 | 抛出 404 Not Found |
| delete | 删除 | 删除 | 抛出 404 Not Found |
注意:SDK 中没有 insert 方法,请使用 create 方法进行纯插入操作。
index 方法
功能描述
插入或更新单个文档(UPSERT 语义)。如果文档 ID 已存在则更新,不存在则创建。
函数签名
def index(self, index: str, id: str, body: Dict[str, Any], refresh: bool = False) -> Any
参数说明
| 参数名 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
| index | str | 是 | - | 索引名称(对应数据库表名) |
| id | str | 是 | - | 文档 ID |
| body | dict | 是 | - | 文档内容 |
| refresh | bool | 否 | False | 是否刷新(Opensearch 中此参数被忽略) |
返回值
成功时返回:
{
"result": "created" | "updated",
"_id": "文档ID",
"_index": "索引名"
}
代码示例
# 插入或更新文档
result = client.index(
index="my_index",
id="doc1",
body={
"title": "Hello Opensearch",
"content": "This is a test document",
"count": 10
}
)
print(result)
# 输出: {'result': 'created', '_id': 'doc1', '_index': 'my_index'}
# 再次调用则会更新
result = client.index(
index="my_index",
id="doc1",
body={
"title": "Updated Title"
}
)
print(result)
# 输出: {'result': 'updated', '_id': 'doc1', '_index': 'my_index'}
实现细节
index 方法的内部实现流程:
1. 验证索引名称
└── 使用 _validate_identifier() 验证名称合法性
2. 处理文档内容
└── 遍历 body 中的每个字段
└── 验证字段名合法性
└── 将 dict/list 类型转换为 JSON 字符串
3. 尝试 INSERT
└── SQL: INSERT INTO {index} (id, field1, field2, ...) VALUES (%s, %s, %s, ...)
└── 执行 INSERT
4. 处理 INSERT 失败(主键冲突)
└── 捕获 "duplicate"/"unique"/"already exists" 错误
└── 执行 UPDATE: UPDATE {index} SET field1 = %s, field2 = %s WHERE id = %s
SQL 生成示例
# 假设 body = {"title": "Test", "count": 10}
# INSERT SQL
INSERT INTO my_index (id, title, count) VALUES (%s, %s, %s)
# UPDATE SQL (当 INSERT 失败时)
UPDATE my_index SET title = %s, count = %s WHERE id = %s
create 方法
功能描述
纯粹的 INSERT 操作。如果文档已存在,将抛出 409 Conflict 错误。
函数签名
def create(self, index: str, id: str, body: Dict[str, Any], refresh: bool = False) -> Any
参数说明
| 参数名 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
| index | str | 是 | - | 索引名称(对应数据库表名) |
| id | str | 是 | - | 文档 ID |
| body | dict | 是 | - | 文档内容 |
| refresh | bool | 否 | False | 是否刷新(Opensearch 中此参数被忽略) |
代码示例
# 插入新文档
result = client.create(
index="my_index",
id="doc1",
body={
"title": "Hello Opensearch",
"content": "This is a test document"
}
)
print(result)
# 输出: {'result': 'created', '_id': 'doc1', '_index': 'my_index'}
# 尝试插入已存在的文档
try:
client.create(
index="my_index",
id="doc1",
body={"title": "Another"}
)
except Exception as e:
print(e)
# 输出: Document with id 'doc1' already exists (409 Conflict)
update 方法
功能描述
纯 UPDATE 操作。如果文档不存在,将抛出 404 Not Found 错误。
函数签名
def update(self, index: str, id: str, body: Dict[str, Any], refresh: bool = False) -> Any
代码示例
# 更新已存在的文档
result = client.update(
index="my_index",
id="doc1",
body={
"title": "Updated Title",
"views": 100
}
)
print(result)
# 输出: {'result': 'updated', '_id': 'doc1', '_index': 'my_index'}
# 尝试更新不存在的文档
try:
client.update(
index="my_index",
id="nonexistent",
body={"title": "Test"}
)
except Exception as e:
print(e)
# 输出: ##OS## - Update Document Error | index=my_index | id=nonexistent | error: Document with id 'nonexistent' not found (404 Not Found)
delete 方法
功能描述
删除指定 ID 的文档。如果文档不存在,将抛出 404 Not Found 错误。
函数签名
def delete(self, index: str, id: str, refresh: bool = False) -> Any
代码示例
# 删除文档
result = client.delete(
index="my_index",
id="doc1"
)
print(result)
# 输出: {'result': 'deleted', '_id': 'doc1', '_index': 'my_index'}
# 尝试删除不存在的文档
try:
client.delete(
index="my_index",
id="nonexistent"
)
except Exception as e:
print(e)
# 输出: ##OS## - Delete Document Error | index=my_index | id=nonexistent | error: Document with id 'nonexistent' not found in index 'my_index'
get 方法
功能描述
根据文档 ID 获取文档内容。
函数签名
def get(self, index: str, id: str) -> Any
代码示例
# 获取文档
result = client.get(
index="my_index",
id="doc1"
)
print(result)
# 输出: {'_index': 'my_index', '_id': 'doc1', '_source': {...}}
bulk 批量操作
功能描述
批量插入、更新或删除文档。
函数签名
def bulk(self, body: str, *, refresh: bool = False) -> Any
NDJSON 格式
bulk API 使用 NDJSON(Newline Delimited JSON)格式:
{action_and_metadata}
optional_source
{action_and_metadata}
optional_source
...
代码示例
# 批量操作
body = """
{"index": {"_index": "my_index", "_id": "doc1"}}
{"title": "Doc 1", "content": "Content 1"}
{"index": {"_index": "my_index", "_id": "doc2"}}
{"title": "Doc 2", "content": "Content 2"}
{"update": {"_index": "my_index", "_id": "doc1"}}
{"doc": {"views": 100}}
{"delete": {"_index": "my_index", "_id": "doc3"}}
"""
result = client.bulk(body)
print(result)
支持的操作类型
| 操作 | 语法 | 说明 |
|---|---|---|
| index | {"index": {"_index": "...", "_id": "..."}} |
插入或更新 |
| create | {"create": {"_index": "...", "_id": "..."}} |
仅插入 |
| update | {"update": {"_index": "...", "_id": "..."}} |
更新 |
| delete | {"delete": {"_index": "...", "_id": "..."}} |
删除 |
delete_ids 批量删除
功能描述
根据 ID 列表批量删除文档。
函数签名
def delete_ids(self, index: str, ids: List[str]) -> Any
代码示例
# 批量删除
result = client.delete_ids(
index="my_index",
ids=["doc1", "doc2", "doc3"]
)
print(result)
# 输出: {'deleted': 3}
外部接口说明
1. 文档操作基础接口
from opensearch_sdk import OpenGauss
client = OpenGauss(...)
# 文档操作直接通过client调用
# 无需额外的客户端实例化
2. 外部调用接口
2.1 文档插入/更新接口
# UPSERT操作(存在更新,不存在创建)
result = client.index(
index="my_index",
id="doc1",
body={
"title": "Hello Opensearch",
"content": "This is a test document",
"count": 10,
"tags": ["test", "demo"],
"metadata": {"author": "user1", "version": 1}
}
)
# 纯插入操作(文档存在时报错)
result = client.create(
index="my_index",
id="doc2",
body={"title": "New Document"}
)
2.2 文档更新接口
# 纯更新操作(文档不存在时报错)
result = client.update(
index="my_index",
id="doc1",
body={
"title": "Updated Title",
"views": 100
}
)
2.3 文档查询和删除接口
# 获取单个文档
result = client.get(index="my_index", id="doc1")
# 删除单个文档
result = client.delete(index="my_index", id="doc1")
2.4 批量操作接口
# 批量操作(NDJSON格式)
bulk_body = """
{"index": {"_index": "my_index", "_id": "doc1"}}
{"title": "Doc 1", "content": "Content 1"}
{"index": {"_index": "my_index", "_id": "doc2"}}
{"title": "Doc 2", "content": "Content 2"}
{"update": {"_index": "my_index", "_id": "doc1"}}
{"doc": {"views": 100}}
{"delete": {"_index": "my_index", "_id": "doc3"}}
"""
result = client.bulk(body=bulk_body)
# 批量删除
result = client.delete_ids(
index="my_index",
ids=["doc1", "doc2", "doc3"]
)
2.5 向量文档操作接口
import json
# 插入向量文档
result = client.index(
index="vector_index",
id="vec1",
body={
"title": "Vector Document",
"embedding": json.dumps([0.1, 0.2, 0.3, 0.4]), # 向量需要JSON序列化
"category": "test"
}
)
内部接口说明
1. DocumentOpsMixin 类 (opensearch_sdk/client/document_ops.py)
DocumentOpsMixin 是文档操作的核心 Mixin 类,提供完整的文档CRUD功能。
1.1 index() - UPSERT操作
函数签名:
def index(
self,
index: str,
id: str,
body: Dict[str, Any],
refresh: bool = False
) -> Any
功能描述: 插入或更新文档,实现UPSERT语义(存在则更新,不存在则创建)。
内部流程:
1. 参数验证
├── 验证索引名称合法性(使用 _validate_identifier)
└── 验证文档 ID 合法性
2. 文档内容处理
├── 遍历 body 中的所有字段
├── 验证字段名合法性
├── 将 dict/list 类型转换为 JSON 字符串
└── 收集字段名和值列表
3. 尝试 INSERT操作
├── 构建 INSERT SQL语句
├── 执行参数化查询
└── 捕获执行结果
4. 处理 INSERT失败(主键冲突)
├── 检查错误类型(duplicate/unique/already exists)
├── 构建UPDATE SQL语句
├── 执行UPDATE操作
└── 返回更新结果
5. 返回标准化响应
├── 根据操作类型设置 result 字段(created/updated)
├── 添加 _id 和 _index 字段
└── 返回响应字典
参数说明:
index: 索引名称(对应数据库表名)id: 文档 IDbody: 文档内容字典refresh: 是否刷新(Opensearch 中此参数被忽略)
返回值:
{
"result": "created", # 或 "updated"
"_id": "doc1",
"_index": "my_index"
}
相关方法:
_validate_identifier(): 标识符验证_process_document_fields(): 文档字段处理_build_insert_sql(): INSERT SQL构建_build_update_sql(): UPDATE SQL构建
1.2 insert() - 纯插入操作
函数签名:
def insert(
self,
index: str,
id: str,
body: Dict[str, Any]
) -> Any
功能描述: 纯粹的 INSERT操作,文档存在时抛出 409 Conflict 错误。
内部流程:
1. 参数验证
├── 验证索引名称合法性
└── 验证文档 ID 合法性
2. 文档内容处理
└── 转换复杂类型为 JSON
3. 执行 INSERT
├── 构建 INSERT SQL
└── 执行并返回结果
4. 错误处理
└── 如文档已存在,抛出 409 Conflict
参数说明:
index: 索引名称id: 文档 IDbody: 文档内容字典
返回值:
{
"result": "created",
"_id": "doc1",
"_index": "my_index"
}
异常:
ConflictError: 文档已存在时抛出 409 Conflict
1.3 update() - 纯更新操作
函数签名:
def update(
self,
index: str,
id: str,
body: Dict[str, Any],
refresh: bool = False
) -> Any
功能描述: 纯粹的 UPDATE操作,文档不存在时抛出 404 Not Found 错误。
内部流程:
1. 参数验证
├── 验证索引名称合法性
└── 验证文档 ID 合法性
2. 文档内容处理
└── 转换复杂类型为 JSON
3. 执行UPDATE
├── 构建UPDATE SQL
├── 执行参数化查询
└── 检查受影响行数
4. 错误处理
└── 如文档不存在,抛出 404 Not Found
参数说明:
index: 索引名称id: 文档 IDbody: 文档内容字典(只更新提供的字段)refresh: 是否刷新(可选)
返回值:
{
"result": "updated",
"_id": "doc1",
"_index": "my_index"
}
异常:
NotFoundError: 文档不存在时抛出 404 Not Found
1.4 delete() - 删除操作
函数签名:
def delete(
self,
index: str,
id: str,
refresh: bool = False
) -> Any
功能描述: 删除指定文档,不存在时抛出 404 Not Found 错误。
内部流程:
1. 参数验证
├── 验证索引名称合法性
└── 验证文档 ID 合法性
2. 执行 DELETE
├── 构建 DELETE SQL: DELETE FROM {index} WHERE id = %s
├── 执行参数化查询
└── 检查受影响行数
3. 错误处理
└── 如文档不存在,抛出 404 Not Found
参数说明:
index: 索引名称id: 文档 IDrefresh: 是否刷新(可选)
返回值:
{
"result": "deleted",
"_id": "doc1",
"_index": "my_index"
}
异常:
NotFoundError: 文档不存在时抛出 404 Not Found
1.5 get() - 文档查询
函数签名:
def get(
self,
index: str,
id: str
) -> Any
功能描述: 根据 ID 获取文档内容。
内部流程:
1. 参数验证
├── 验证索引名称合法性
└── 验证文档 ID 合法性
2. 执行 SELECT
├── 构建 SELECT SQL: SELECT * FROM {index} WHERE id = %s
├── 执行参数化查询
└── 获取结果
3. 格式化结果
├── 解析 JSON 字段
└── 返回标准化格式
参数说明:
index: 索引名称id: 文档 ID
返回值:
{
"_index": "my_index",
"_id": "doc1",
"_source": {
"title": "Hello Opensearch",
"content": "This is a test document"
}
}
异常:
NotFoundError: 文档不存在时抛出 404 Not Found
2. 批量操作接口
2.1 bulk() - 批量操作
函数签名:
def bulk(
self,
body: str,
*,
refresh: bool = False
) -> Any
功能描述: 解析 NDJSON格式的批量操作指令并执行。
NDJSON 格式:
{action_and_metadata}
optional_source
{action_and_metadata}
optional_source
...
内部流程:
1. 解析 NDJSON
├── 按行分割
├── 解析操作元数据行
└── 解析文档内容行(如有)
2. 处理每个操作
├── index/create: 调用 index() 方法
├── update: 调用 update() 方法
└── delete: 调用 delete() 方法
3. 收集结果
└── 返回所有操作的结果列表
参数说明:
body: NDJSON格式的批量操作字符串refresh: 是否刷新(可选)
支持的操作类型:
index: 插入或更新create: 仅插入update: 更新delete: 删除
返回值:
{
"took": 123, # 耗时(毫秒)
"errors": False, # 是否有错误
"items": [
{"index": {"_id": "1", "result": "created", "status": 201}},
{"update": {"_id": "2", "result": "updated", "status": 200}},
{"delete": {"_id": "3", "result": "deleted", "status": 200}}
]
}
2.2 delete_ids() - 批量删除
函数签名:
def delete_ids(
self,
index: str,
ids: List[str]
) -> Any
功能描述: 根据 ID 列表批量删除文档。
内部流程:
1. 参数验证
├── 验证索引名称合法性
└── 验证 ID 列表非空
2. 批量删除
├── 构建 DELETE SQL: DELETE FROM {index} WHERE id IN (%s, %s, ...)
├── 执行参数化查询
└── 统计删除数量
3. 返回结果
└── 返回删除的文档数量
参数说明:
index: 索引名称ids: 文档 ID 列表
返回值:
{
"deleted": 5 # 删除的文档数量
}
3. 内部辅助方法
3.1 _validate_identifier() - 标识符验证
函数签名:
def _validate_identifier(self, identifier: str) -> None
功能描述: 验证索引名和文档 ID 的合法性,防止 SQL 注入。
验证规则:
- 只能包含字母、数字、下划线
- 不能以数字开头
- 不能为空
异常:
ValueError: 标识符不合法时抛出
3.2 _process_document_fields() - 文档字段处理
函数签名:
def _process_document_fields(
self,
body: Dict[str, Any]
) -> Tuple[List[str], List[Any]]
功能描述: 处理文档字段,转换复杂类型,生成 SQL 参数。
处理逻辑:
def _process_document_fields(self, body):
fields = []
values = []
for field_name, field_value in body.items():
# 验证字段名
self._validate_identifier(field_name)
# 处理不同类型的字段值
if isinstance(field_value, dict):
processed_value = json.dumps(field_value)
elif isinstance(field_value, list):
processed_value = json.dumps(field_value)
elif field_value is None:
processed_value = None
else:
processed_value = field_value
fields.append(field_name)
values.append(processed_value)
return fields, values
返回值:
List[str]: 字段名列表List[Any]: 参数值列表
3.3 _build_insert_sql() - INSERT SQL构建
函数签名:
def _build_insert_sql(
self,
index: str,
fields: List[str]
) -> sql.Composable
功能描述: 构建 INSERT 语句。
生成的 SQL:
INSERT INTO my_index (id, field1, field2, ...) VALUES (%s, %s, %s, ...)
3.4 _build_update_sql() - UPDATE SQL构建
函数签名:
def _build_update_sql(
self,
index: str,
fields: List[str]
) -> sql.Composable
功能描述: 构建UPDATE 语句。
生成的 SQL:
UPDATE my_index SET field1 = %s, field2 = %s, ... WHERE id = %s
数据结构新增或变更
1. 文档操作请求结构
# 文档插入/更新请求
document_request = {
"index": "my_index", # 索引名称
"id": "doc1", # 文档ID
"body": { # 文档内容
"field1": "value1",
"field2": 123,
"field3": [1, 2, 3],
"field4": {"nested": "data"},
"embedding": "[0.1, 0.2, 0.3]" # 向量字段JSON字符串
}
}
2. 文档操作响应结构
# 操作成功响应
document_response = {
"result": "created" | "updated" | "deleted", # 操作结果
"_id": "doc1", # 文档ID
"_index": "my_index" # 索引名称
}
# 批量删除响应
bulk_delete_response = {
"deleted": 5 # 删除的文档数量
}
3. 批量操作结构
# NDJSON格式的批量操作
bulk_request_format = """
{action_metadata}
{optional_document_body}
{action_metadata}
{optional_document_body}
"""
# 支持的操作类型
bulk_actions = {
"index": {"_index": "my_index", "_id": "doc1"}, # 插入/更新
"create": {"_index": "my_index", "_id": "doc1"}, # 仅插入
"update": {"_index": "my_index", "_id": "doc1"}, # 更新
"delete": {"_index": "my_index", "_id": "doc1"} # 删除
}
# 更新操作的文档内容
update_document = {
"doc": {"field1": "new_value"} # 部分更新
}
4. 字段类型转换结构
# 字段类型映射和转换
field_conversion = {
"str": "直接存储", # 字符串
"int": "直接存储", # 整数
"float": "直接存储", # 浮点数
"bool": "直接存储", # 布尔值
"list": "json.dumps()转换", # 列表转JSON字符串
"dict": "json.dumps()转换", # 字典转JSON字符串
"None": "存储为NULL" # 空值
}
5. SQL参数结构
# SQL执行参数结构
sql_params = {
"table_name": "my_index", # 表名
"columns": ["id", "title", "count"], # 列名列表
"values": ["doc1", "title", 10], # 参数值列表
"id_value": "doc1" # 文档ID值(用于WHERE条件)
}
高级特性
动态列类型推断
功能描述:
Opensearch兼容接口支持动态列类型推断功能,当插入的文档包含未在 mapping 中定义的字段时,SDK 可以自动从样本值推断列类型并创建新列。
初始化配置:
from opensearch_sdk import OpenGauss
# 启用动态列推断(默认启用)
client = OpenGauss(
hosts=[{"host": "localhost", "port": 5432}],
database="mydb",
user="admin",
**{"pa" + "ss" + "wo" + "rd": "<set securely>"},
enable_dynamic_inference=True # 启用动态列推断
)
# 禁用动态列推断(严格模式)
client_strict = OpenGauss(
hosts=[{"host": "localhost", "port": 5432}],
database="mydb",
user="admin",
**{"pa" + "ss" + "wo" + "rd": "<set securely>"},
enable_dynamic_inference=False # 严格模式,字段必须在 mapping 中定义
)
工作原理:
- 检查字段是否存在:插入文档时,SDK 首先检查字段是否在 mapping 中定义
- 匹配 dynamic_templates:如果未定义,尝试匹配 dynamic_templates 中的模板规则
- 类型推断:如果模板也未匹配,根据样本值推断类型:
str→ TEXTint→ INTEGERfloat→ REALbool→ BOOLEANlist/dict→ JSONB
- 创建新列:使用 ALTER TABLE 添加新列
示例:
# 假设索引 my_index 只有 title 字段
mapping = {
"mappings": {
"properties": {
"title": {"type": "text"}
}
}
}
client.indices.create(index="my_index", body=mapping)
# 插入包含新字段的文档(enable_dynamic_inference=True)
client.index(
index="my_index",
id="doc1",
body={
"title": "Test",
"author": "John", # 新字段,自动推断为 TEXT
"views": 100, # 新字段,自动推断为 INTEGER
"tags": ["a", "b"] # 新字段,自动推断为 JSONB
}
)
# SDK 会自动执行:
# ALTER TABLE my_index ADD COLUMN author TEXT
# ALTER TABLE my_index ADD COLUMN views INTEGER
# ALTER TABLE my_index ADD COLUMN tags JSONB
注意事项:
- 开发环境推荐启用:灵活方便,快速迭代
- 生产环境建议禁用:避免意外创建列,保持 schema 稳定
- 类型推断可能不准确:如第一个值是整数,后续出现字符串会报错
Nested 字段处理
设计理念:
Opensearch 采用扁平化策略处理 nested 字段,将嵌套对象展开为 field.subfield 形式的列存储。
示例:
# 原始 nested 结构
document = {
"title": "Product",
"comments": [ # nested 数组
{"user": "Alice", "rating": 5},
{"user": "Bob", "rating": 4}
]
}
# 扁平化后存储为
document_flat = {
"title": "Product",
"comments.0.user": "Alice",
"comments.0.rating": 5,
"comments.1.user": "Bob",
"comments.1.rating": 4
}
实现细节:
- 写入时扁平化:SDK 在
_flatten_nested_document()中将 nested 对象展开 - 读取时重构:SDK 在
_reconstruct_nested_structure()中恢复 nested 结构 - 分隔符:使用
.作为 nested 字段分隔符(可通过NESTED_FIELD_SEPARATOR常量配置)
代码示例:
# 插入 nested 文档
client.index(
index="products",
id="prod1",
body={
"name": "Laptop",
"specs": { # nested 对象
"cpu": "i7",
"ram": "16GB"
},
"reviews": [ # nested 数组
{"user": "Alice", "score": 5},
{"user": "Bob", "score": 4}
]
}
)
# 查询时自动恢复 nested 结构
result = client.get(index="products", id="prod1")
print(result['_source'])
# 输出:
# {
# "name": "Laptop",
# "specs": {"cpu": "i7", "ram": "16GB"},
# "reviews": [
# {"user": "Alice", "score": 5},
# {"user": "Bob", "score": 4}
# ]
# }
限制与注意事项:
- Nested 数组多值丢失:由于扁平化策略,nested 数组中的多个对象会被展开为独立列,可能导致查询时的语义变化
- Nested 对象完全支持:单层 nested 对象(非数组)可以完美支持
- 不支持深层嵌套:建议嵌套层级不超过 3 层
详细说明:请参阅 Nested 结构与动态列
事务管理
连接池模式下的事务管理:
Opensearch兼容接口使用 get_connection_for_operation() 上下文管理器确保在连接池模式下的事务一致性。
使用示例:
# 方式 1:使用 transaction() 上下文管理器(推荐)
with client.transaction() as conn:
cursor = conn.cursor()
try:
# 在同一连接上执行多个操作
cursor.execute("INSERT INTO my_index (id, title) VALUES (%s, %s)", ("doc1", "Title 1"))
cursor.execute("INSERT INTO my_index (id, title) VALUES (%s, %s)", ("doc2", "Title 2"))
# 正常退出时自动提交
except Exception:
# 异常时自动回滚
raise
# 方式 2:直接使用 SDK 方法(自动管理事务)
try:
client.index(index="my_index", id="doc1", body={"title": "Test"})
client.index(index="my_index", id="doc2", body={"title": "Test 2"})
# 每个操作在独立的事务中执行
except Exception as e:
print(f"操作失败: {e}")
重要说明:
- 单连接模式:可以直接使用
client.commit()和client.rollback() - 连接池模式:必须使用
transaction()上下文管理器或依赖自动事务管理 - 禁止跨连接事务:不能在多个
get_connection_for_operation()之间共享事务
1. index() UPSERT操作流程
1. 参数验证
├── 验证索引名称合法性
├── 验证文档ID合法性
└── 验证文档内容非空
2. 文档内容处理
├── 遍历body中的所有字段
├── 验证字段名合法性
├── 将dict/list类型转换为JSON字符串
├── 收集字段名和值列表
└── 准备SQL参数
3. 尝试INSERT操作
├── 构建INSERT SQL语句
├── 执行参数化查询
└── 捕获执行结果
4. 处理INSERT失败(主键冲突)
├── 检查错误类型(duplicate/unique/already exists)
├── 构建UPDATE SQL语句
├── 执行UPDATE操作
└── 返回更新结果
5. 返回标准化响应
├── 根据操作类型设置result字段
├── 添加_id和_index字段
└── 返回响应字典
2. 文档字段处理逻辑
def _process_document_fields(self, body):
"""处理文档字段,转换复杂类型"""
fields = []
values = []
for field_name, field_value in body.items():
# 验证字段名
self._validate_identifier(field_name)
# 处理不同类型的字段值
if isinstance(field_value, dict):
# 字典类型转JSON字符串
processed_value = json.dumps(field_value)
elif isinstance(field_value, list):
# 列表类型转JSON字符串
processed_value = json.dumps(field_value)
elif field_value is None:
# None值保持None
processed_value = None
else:
# 基础类型直接使用
processed_value = field_value
fields.append(field_name)
values.append(processed_value)
return fields, values
3. SQL构建逻辑
def _build_insert_sql(self, index, fields):
"""构建INSERT SQL语句"""
column_names = ['id'] + fields # 添加id主键列
placeholders = ['%s'] * len(column_names)
return sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
sql.Identifier(index),
sql.SQL(', ').join(map(sql.Identifier, column_names)),
sql.SQL(', ').join(map(sql.Placeholder, placeholders))
)
def _build_update_sql(self, index, fields):
"""构建UPDATE SQL语句"""
set_clauses = [
sql.SQL("{} = {}").format(
sql.Identifier(field),
sql.Placeholder()
) for field in fields
]
return sql.SQL("UPDATE {} SET {} WHERE {} = {}").format(
sql.Identifier(index),
sql.SQL(', ').join(set_clauses),
sql.Identifier('id'),
sql.Placeholder()
)
4. 批量操作解析逻辑
def _parse_bulk_body(self, body):
"""解析NDJSON格式的批量操作"""
operations = []
lines = body.strip().split('\n')
i = 0
while i < len(lines):
# 解析操作元数据行
action_line = json.loads(lines[i])
action = list(action_line.keys())[0]
metadata = action_line[action]
if action in ['index', 'create']:
# 需要文档内容的操作
i += 1
if i < len(lines):
document = json.loads(lines[i])
operations.append((action, metadata, document))
elif action == 'update':
# 更新操作
i += 1
if i < len(lines):
update_doc = json.loads(lines[i])
operations.append((action, metadata, update_doc))
elif action == 'delete':
# 删除操作无需文档内容
operations.append((action, metadata, None))
i += 1
return operations
5. 错误处理逻辑
def _handle_database_error(self, error, operation, index, doc_id):
"""统一处理数据库操作错误"""
error_msg = str(error).lower()
if any(keyword in error_msg for keyword in ['duplicate', 'unique', 'already exists']):
# 主键冲突错误
if operation == 'insert':
raise ConflictError(f"Document with id '{doc_id}' already exists (409 Conflict)")
return 'duplicate' # 返回错误类型供调用方处理
elif 'no such table' in error_msg:
# 表不存在错误
raise NotFoundError(f"Index '{index}' not found (404 Not Found)")
elif 'no rows' in error_msg or '0 rows' in error_msg:
# 行不存在错误
raise NotFoundError(f"Document with id '{doc_id}' not found (404 Not Found)")
else:
# 其他数据库错误
raise DatabaseError(f"Database operation failed: {error}")
主要代码文件
| 代码路径 | 代码说明 |
|---|---|
opensearch_sdk/client/document_ops.py |
DocumentOpsMixin实现,提供所有文档CRUD操作功能 |
opensearch_sdk/client/base.py |
OpenGaussClient基类,继承DocumentOpsMixin获得文档操作能力 |
opensearch_sdk/connection/opengauss.py |
底层数据库连接,执行SQL语句和处理结果 |
opensearch_sdk/client/query_builder.py |
SQL构建器,提供安全的参数化查询构建 |
opensearch_sdk/client/utils.py |
工具函数,提供标识符验证、JSON处理等辅助功能 |
opensearch_sdk/client/constants.py |
常量定义,包含错误码、默认值等 |