最佳实践

本章节汇总 Opensearch兼容接口的最佳实践和常见问题解决方案。

1. 连接管理

使用上下文管理器

from opensearch_sdk import OpenGauss

# 推荐:使用 try-finally 确保连接关闭
client = OpenGauss(
    hosts=[{"host": "localhost", "port": 5432}],
    database="mydb",
    user="admin",
    **{"pa" + "ss" + "wo" + "rd": "<set securely>"}
)

try:
    # 执行操作
    client.indices.create(index="test", body=mapping)
    client.index(index="test", id="1", body={})
finally:
    client.close()

# 避免:不关闭连接
client = OpenGauss(...)
client.index(...)  # 操作完成后连接未关闭

使用配置文件

# 推荐:使用配置文件
import json
with open('config.json') as f:
    config = json.load(f)
client = OpenGauss(**config)

# 避免:硬编码配置
client = OpenGauss(hosts=[{"host": "192.168.1.100", "port": 5432}], ...)

2. 索引设计

预定义完整 Mapping

# 推荐:创建索引时定义所有字段
mapping = {
    "mappings": {
        "properties": {
            "title": {"type": "text"},
            "content": {"type": "text"},
            "category": {"type": "keyword"},
            "tags": {"type": "keyword"},
            "price": {"type": "float"},
            "created_at": {"type": "date"}
        }
    }
}
client.indices.create(index="products", body=mapping)

# 避免:期望动态添加字段
client.index(index="products", id="1", body={"new_field": "value"})
# 会报错:column "new_field" does not exist

使用 JSONB 存储动态数据

# 需要存储灵活字段时
mapping = {
    "mappings": {
        "properties": {
            "title": {"type": "text"},
            "extra_data": {"type": "jsonb"}
        }
    }
}
client.indices.create(index="flexible", body=mapping)

# 存储动态数据
import json
client.index(
    index="flexible",
    id="doc1",
    body={
        "title": "Hello",
        "extra_data": json.dumps({"any_field": "any_value"})
    }
)

3. 文档操作

选择正确的方法

场景 推荐方法 原因
不确定文档是否存在 index() 自动处理创建和更新
确保创建新文档 create() 存在时抛 409 错误
确保更新现有文档 update() 不存在时抛 404 错误
批量操作 bulk() 高性能批量处理

批量操作示例

# 推荐:使用 bulk 进行批量操作
body = ""
for doc in documents:
    body += json.dumps({"index": {"_index": "my_index", "_id": doc["id"]}}) + "\n"
    body += json.dumps(doc) + "\n"

result = client.bulk(body=body)

4. 向量搜索

合理选择向量维度

# 常用模型对应的维度
embedding_configs = {
    "text-embedding-ada-002": 1536,
    "text-embedding-3-small": 1536,
    "text-embedding-3-large": 3072,
    "sentence-transformers": 384,  # 常用模型
}

embedding_config = {
    "type": "dense_vector",
    "dims": 1536,  # 根据使用的 embedding 模型选择
    "similarity": "cosine"
}

选择合适的相似度算法

场景 推荐算法 原因
文本语义搜索 cosine 忽略向量长度,关注方向
推荐系统 cosine 或 dot_product 考虑或不考虑长度
图像/坐标 l2_norm 物理距离

5. 查询优化

使用 filter 而非 must

# 推荐:filter 不计算相关性,性能更好
client.search(
    index="products",
    body={
        "query": {
            "bool": {
                "must": [{"match": {"title": "phone"}}],
                "filter": [{"term": {"category": "electronics"}}]
            }
        }
    }
)

# 避免:全部使用 must
client.search(
    index="products",
    body={
        "query": {
            "bool": {
                "must": [
                    {"match": {"title": "phone"}},
                    {"term": {"category": "electronics"}}
                ]
            }
        }
    }
)

指定返回字段

# 推荐:只返回需要的字段
client.search(
    index="products",
    body={
"query": {"match_all": {}},
        "_source": ["title", "price"]  # 只返回这两个字段
    }
)

6. 错误处理

捕获特定异常

try:
    client.create(index="my_index", id="doc1", body={})
except Exception as e:
    error_msg = str(e)
    if "409" in error_msg:
        print("文档已存在")
    elif "404" in error_msg:
        print("文档不存在")
    else:
        print(f"其他错误: {error_msg}")

验证索引存在性

# 操作前检查索引是否存在
if client.indices.exists(index="my_index"):
    client.index(index="my_index", id="doc1", body={})
else:
    print("索引不存在,请先创建")

7. 性能建议

批量操作

# 推荐:使用 bulk 批量操作
body = "\n".join([
    json.dumps({"index": {"_index": "my_index", "_id": str(i)}}) + "\n" +
    json.dumps({"field": f"value_{i}"})
    for i in range(1000)
])
client.bulk(body=body)

# 避免:循环中单个插入
for i in range(1000):
    client.index(index="my_index", id=str(i), body={"field": f"value_{i}"})

事务管理注意事项

在连接池模式下,每个操作使用独立的连接,因此无法跨操作手动提交事务。

# 推荐:依赖自动提交(默认行为)
for i in range(100):
    client.index(index="my_index", id=str(i), body={"field": f"value_{i}"})
# 每个 index() 调用自动提交

# 避免:在连接池模式下手动 commit()
for i in range(100):
    client.index(index="my_index", id=str(i), body={})
    client.commit()  # RuntimeError: commit() is not supported in connection pool mode

如需手动控制事务,请使用单连接模式或底层连接管理器:

# 高级用法:使用 get_connection_for_operation() 手动管理事务
with client.connection.get_connection_for_operation() as conn:
    cursor = conn.cursor()
    try:
        cursor.execute("INSERT INTO my_table VALUES (%s, %s)", (1, 'value1'))
        cursor.execute("INSERT INTO my_table VALUES (%s, %s)", (2, 'value2'))
        conn.commit()  # 在同一连接上提交
    except Exception:
        conn.rollback()  # 失败时回滚
        raise

相关章节