import argparse
import asyncio
import getpass
import json
import logging
import os
import psycopg2
import ssl
from bs4 import BeautifulSoup
from urllib import request, error
from psycopg2 import Error, sql, connect
from typing import Optional, List, Any
from mcp.server import Server
from mcp.server.fastmcp import FastMCP
from mcp.server.fastmcp.server import TransportSecuritySettings
from mcp.types import Resource, Tool, TextContent
from pydantic import AnyUrl
# Configure the root logger once at import time: INFO level, with timestamp,
# logger name and level prefixed to every message.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Named logger shared by the functions defined in this module.
logger = logging.getLogger("openGauss_mcp_server")
class SecureCredentialCache:
    """Singleton that lazily obtains secrets and keeps them in memory.

    Secrets are read from environment variables when available and otherwise
    requested interactively through ``getpass``. Each secret is resolved at
    most once until ``clear_cache`` is called.
    """

    _instance = None
    _db_password: Optional[str] = None
    _ssl_keyfile_password: Optional[str] = None
    # True once the SSL key password has been resolved, even when the answer
    # was "no password" (None). Without this flag an empty answer could not be
    # told apart from "not asked yet" and the user would be prompted on every
    # call.
    _ssl_password_resolved: bool = False
    _initialized: bool = False

    def __new__(cls):
        # Always hand back the same instance so the cache is process-wide.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def get_db_password(self) -> str:
        """Return the database password, resolving it on first use.

        Resolution order: the ``OPENGAUSS_PASSWORD`` environment variable,
        then an interactive ``getpass`` prompt.
        """
        if self._db_password is None:
            env_password = os.getenv("OPENGAUSS_PASSWORD")
            if env_password:
                self._db_password = env_password
                logger.info("Database password loaded from environment variable")
            else:
                self._db_password = getpass.getpass("Enter database password: ")
                logger.info("Database password obtained from interactive input")
        return self._db_password

    def get_ssl_keyfile_password(self) -> Optional[str]:
        """Return the SSL private-key password, or None when there is none.

        Resolution order: the ``SSL_KEYFILE_PASSWORD`` environment variable,
        then an interactive prompt, which is shown only when ``ENABLE_HTTPS``
        is truthy and both ``SSL_KEYFILE`` and ``SSL_CERTFILE`` are set. An
        empty answer means "no password" and is remembered, so the prompt is
        shown only once.
        """
        if self._ssl_keyfile_password is not None or self._ssl_password_resolved:
            return self._ssl_keyfile_password

        env_ssl_password = os.getenv("SSL_KEYFILE_PASSWORD")
        if env_ssl_password:
            self._ssl_keyfile_password = env_ssl_password
            self._ssl_password_resolved = True
            logger.info("SSL keyfile password loaded from environment variable")
            return self._ssl_keyfile_password

        enable_https = os.getenv("ENABLE_HTTPS", "false").lower() in ("true", "1", "yes", "on")
        ssl_keyfile = os.getenv("SSL_KEYFILE")
        ssl_certfile = os.getenv("SSL_CERTFILE")
        if enable_https and ssl_keyfile and ssl_certfile:
            entered = getpass.getpass("Enter SSL private key password (press Enter if none): ")
            # An empty answer means the key is not encrypted.
            self._ssl_keyfile_password = entered or None
            self._ssl_password_resolved = True
            logger.info("SSL keyfile password obtained from interactive input")
        return self._ssl_keyfile_password

    def clear_cache(self):
        """Forget every cached secret so the next access resolves it again."""
        self._db_password = None
        self._ssl_keyfile_password = None
        self._ssl_password_resolved = False
        self._initialized = False
        logger.info("Credential cache cleared")


# Module-wide cache instance used when building the database configuration.
credential_cache = SecureCredentialCache()
def get_db_config():
    """Get database configuration from environment variables.

    Reads ``OPENGAUSS_HOST`` (default ``localhost``), ``OPENGAUSS_PORT``
    (default ``5432``), ``OPENGAUSS_USER`` and ``OPENGAUSS_DBNAME``; the
    password comes from the credential cache.

    Returns:
        A dict with the keys ``host``, ``port``, ``user``, ``password`` and
        ``dbname``, suitable for ``connect(**config)``.

    Raises:
        ValueError: if the user or database name is missing, or if the port
            is not an integer.
    """
    user = os.getenv("OPENGAUSS_USER")
    dbname = os.getenv("OPENGAUSS_DBNAME")
    # Validate the cheap settings first so a misconfigured environment fails
    # before the user is (possibly interactively) asked for a password.
    if not user or not dbname:
        raise ValueError("Missing required database configuration (OPENGAUSS_USER, OPENGAUSS_DBNAME)")

    raw_port = os.getenv("OPENGAUSS_PORT", "5432")
    try:
        port = int(raw_port)
    except ValueError as exc:
        raise ValueError(f"Invalid OPENGAUSS_PORT value: {raw_port!r}") from exc

    return {
        "host": os.getenv("OPENGAUSS_HOST", "localhost"),
        "port": port,
        "user": user,
        "password": credential_cache.get_db_password(),
        "dbname": dbname,
    }
# FastMCP application object; the tool functions in this module register
# themselves on it through the @app.tool() decorator.
# NOTE(review): DNS-rebinding protection is switched off and the host/origin
# lists contain only "*". This looks intentionally permissive — confirm that
# the server is only reachable from trusted networks.
app = FastMCP(
name="openGauss_mcp_server",
transport_security=TransportSecuritySettings(
enable_dns_rebinding_protection=False,
allowed_hosts=["*"],
allowed_origins=["*"]
)
)
@app.tool()
async def list_resources() -> list[Resource]:
    """List openGauss tables as resources.

    Queries ``pg_tables`` for every table in the ``public`` schema and wraps
    each one in a ``Resource`` whose URI has the form
    ``opengauss://<table>/data``.

    Returns:
        One ``Resource`` per table, or an empty list when the database
        reports an error (the error is logged, not raised).
    """
    config = get_db_config()
    try:
        conn = connect(**config)
        try:
            with conn, conn.cursor() as cursor:
                cursor.execute("SELECT tablename FROM pg_tables WHERE schemaname = 'public'")
                tables = cursor.fetchall()
        finally:
            # psycopg2's ``with conn`` only ends the transaction; the
            # connection itself has to be closed explicitly.
            conn.close()
    except Error as e:
        logger.error(f"Failed to list resources: {str(e)}")
        return []

    logger.info(f"Found tables: {tables}")
    return [
        Resource(
            uri=f"opengauss://{row[0]}/data",
            name=f"Table: {row[0]}",
            mimeType="text/plain",
            description=f"Data in table: {row[0]}",
        )
        for row in tables
    ]
@app.tool()
async def read_resource(uri: AnyUrl) -> str:
    """Read table contents.

    Args:
        uri: Resource URI of the form ``opengauss://<table>/data``. The table
            name is the first path component after the scheme and may be
            schema-qualified as ``schema.table``.

    Returns:
        Up to 100 rows as CSV-like text: a header line with the column names
        followed by one comma-joined line per row.

    Raises:
        ValueError: if the URI scheme is not ``opengauss://`` or the table
            name is empty.
        RuntimeError: if the database reports an error.
    """
    config = get_db_config()
    uri_str = str(uri)
    logger.info(f"Reading resource: {uri_str}")
    if not uri_str.lower().startswith("opengauss://"):
        raise ValueError(f"Invalid URI scheme: {uri_str}")

    table = uri_str[len("opengauss://"):].split('/')[0]
    if not table:
        raise ValueError(f"Invalid URI, missing table name: {uri_str}")

    # Quote the table name as an identifier instead of splicing raw text into
    # the statement. This also lets the exact names reported by
    # list_resources (mixed case, special characters) round-trip correctly.
    statement = sql.SQL("SELECT * FROM {} LIMIT 100").format(
        sql.Identifier(*table.split("."))
    )
    try:
        conn = connect(**config)
        try:
            with conn, conn.cursor() as cursor:
                cursor.execute(statement)
                columns = [desc[0] for desc in cursor.description]
                rows = cursor.fetchall()
        finally:
            # ``with conn`` only ends the transaction; close explicitly.
            conn.close()
    except Error as e:
        logger.error(f"Database error reading resource {uri}: {str(e)}")
        raise RuntimeError(f"Database error: {str(e)}") from e

    lines = [",".join(columns)]
    lines.extend(",".join(map(str, row)) for row in rows)
    return "\n".join(lines)
@app.tool()
async def list_tools() -> list[Tool]:
    """List available openGauss tools.

    Returns hand-written ``Tool`` descriptors (name, description and JSON
    input schema) for execute_sql, search_opengauss_document, hybrid_search,
    multi_vector_search, create_vector_index and vector_search.
    """
    logger.info("Listing tools...")

    def optional_string_list(title: str) -> dict:
        # Schema fragment for an argument that is a list of strings or null.
        return {
            "anyOf": [
                {"items": {"type": "string"}, "type": "array"},
                {"type": "null"},
            ],
            "default": None,
            "title": title,
        }

    execute_sql_tool = Tool(
        name="execute_sql",
        description="Execute an SQL query on the openGauss server",
        inputSchema={
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "The SQL query to execute"},
            },
            "required": ["query"],
        },
    )

    search_document_tool = Tool(
        name="search_opengauss_document",
        description="Search OpenGauss documentation with keywords",
        inputSchema={
            "type": "object",
            "properties": {
                "keyword": {"title": "Keyword", "type": "string"},
            },
            "required": ["keyword"],
            "title": "search_opengauss_documentArguments",
        },
    )

    hybrid_search_tool = Tool(
        name="hybrid_search",
        description="Hybrid search combining BM25 full-text search, vector similarity search, and scalar filtering. Performs weighted fusion of text and vector search results for more accurate and comprehensive search results. Steps: 1) Full-text search with BM25 scoring, 2) Vector similarity search, 3) Weighted fusion and ranking. Note: hybrid_score, vector_norm, bm25_norm are computed fields, not database columns.",
        inputSchema={
            "type": "object",
            "properties": {
                "table_name": {"title": "Table Name", "type": "string"},
                "full_text_search_column_name": {
                    "items": {"type": "string"},
                    "title": "Full Text Search Column Name",
                    "type": "array",
                },
                "keyword": {"title": "Keyword", "type": "string"},
                "vector_data": {"title": "Vector Data", "type": "string"},
                "vec_column_name": {"default": "vector", "title": "Vec Column Name", "type": "string"},
                "distance_func": {"default": "cosine", "title": "Distance Func", "type": "string"},
                "other_where_clause": optional_string_list("Other Where Clause"),
                "limit": {"default": 5, "title": "Limit", "type": "integer"},
                "output_column_name": optional_string_list("Output Column Name"),
                "text_weight": {"default": 0.4, "title": "Text Weight", "type": "number"},
                "vector_weight": {"default": 0.6, "title": "Vector Weight", "type": "number"},
            },
            "required": [
                "table_name",
                "full_text_search_column_name",
                "keyword",
                "vector_data",
            ],
            "title": "hybrid_searchArguments",
        },
    )

    multi_vector_search_tool = Tool(
        name="multi_vector_search",
        description="Perform concurrent vector similarity search with multiple query vectors. Optimizes performance by processing multiple vector queries in parallel using database connection pooling. Ideal for batch vector search operations and multi-vector database lookups. Returns combined results from all query vectors with configurable parallelism.",
        inputSchema={
            "type": "object",
            "properties": {
                "table_name": {"title": "Table Name", "type": "string"},
                "vectors": {
                    "items": {"items": {"type": "number"}, "type": "array"},
                    "title": "Vectors",
                    "type": "array",
                },
                "vector_field": {"title": "Vector Field", "type": "string"},
                "limit": {"default": 5, "title": "Limit", "type": "integer"},
                "output_fields": optional_string_list("Output Fields"),
                "metric_type": {"default": "cosine", "title": "Metric Type", "type": "string"},
                "filter_expr": {
                    "anyOf": [{"type": "string"}, {"type": "null"}],
                    "default": None,
                    "title": "Filter Expr",
                },
                "search_params": {
                    "anyOf": [
                        {"additionalProperties": True, "type": "object"},
                        {"type": "null"},
                    ],
                    "default": None,
                    "title": "Search Params",
                },
                "parallel_workers": {"default": 2, "title": "Parallel Workers", "type": "integer"},
            },
            "required": ["table_name", "vectors", "vector_field"],
            "title": "multi_vector_searchArguments",
        },
    )

    create_vector_index_tool = Tool(
        name="create_vector_index",
        description="Create a vector index on a specified table and column for efficient vector similarity search. Supports HNSW and other index types with customizable distance metrics.",
        inputSchema={
            "type": "object",
            "properties": {
                "table_name": {
                    "title": "Table Name",
                    "type": "string",
                    "description": "The name of the table on which to create the vector index",
                },
                "column_name": {
                    "title": "Column Name",
                    "type": "string",
                    "description": "The name of the vector column to index",
                },
                "index_type": {
                    "title": "Index Type",
                    "type": "string",
                    "default": "hnsw",
                    "description": "The type of vector index to create (e.g., 'hnsw', 'ivfflat', 'diskann')",
                },
                "distance_ops": {
                    "title": "Distance Operations",
                    "type": "string",
                    "default": "vector_cosine_ops",
                    "description": "The distance metric operator (e.g., 'vector_cosine_ops', 'vector_l2_ops', 'vector_ip_ops', 'vector_l1_ops')",
                },
                "options": {
                    "title": "Index Options",
                    "type": "object",
                    "default": None,
                    "description": "Additional index options as key-value pairs (e.g., {'M': 16, 'ef_construction': 64} for HNSW)",
                    "additionalProperties": True,
                },
            },
            "required": ["table_name", "column_name"],
            "title": "create_vector_indexArguments",
        },
    )

    vector_search_tool = Tool(
        name="vector_search",
        description="Perform vector similarity search on an OpenGauss table to find the most similar vectors to a query vector. Supports different distance metrics and filtering conditions.",
        inputSchema={
            "type": "object",
            "properties": {
                "table_name": {
                    "title": "Table Name",
                    "type": "string",
                    "description": "The name of the table to search",
                },
                "vector_data": {
                    "title": "Vector Data",
                    "type": "string",
                    "description": "The query vector data (JSON string format)",
                },
                "vec_column_name": {
                    "title": "Vector Column Name",
                    "type": "string",
                    "default": "vector",
                    "description": "The name of the column containing vectors to search",
                },
                "distance_func": {
                    "title": "Distance Function",
                    "type": "string",
                    "default": "cosine",
                    "description": "The distance algorithm to use: 'cosine' (cosine distance), 'l2' (Euclidean distance), or 'ip' (inner product)",
                },
                "other_where_clause": {
                    "title": "Other Where Clause",
                    "type": "array",
                    "items": {"type": "string"},
                    "default": None,
                    "description": "Additional WHERE clause conditions for filtering results",
                },
                "topk": {
                    "title": "Top K",
                    "type": "integer",
                    "default": 5,
                    "description": "Number of top similar results to return",
                },
                "output_column_name": {
                    "title": "Output Column Name",
                    "type": "array",
                    "items": {"type": "string"},
                    "default": None,
                    "description": "List of column names to include in the output. If None, returns all columns",
                },
            },
            "required": ["table_name", "vector_data"],
            "title": "vector_searchArguments",
        },
    )

    return [
        execute_sql_tool,
        search_document_tool,
        hybrid_search_tool,
        multi_vector_search_tool,
        create_vector_index_tool,
        vector_search_tool,
    ]
def handle_meta_command(cursor, query: str, config: dict) -> list[TextContent]:
    r"""Handle OpenGauss meta-commands (e.g., \d, \dt).

    Supported commands are ``\d`` (table names), ``\dt`` (tables with owner
    and tablespace), ``\d+`` (tables with index/rule/trigger flags) and
    ``\du`` (roles). Each one is translated into a catalog query run on
    ``cursor``.

    Args:
        cursor: Open database cursor used to run the catalog query.
        query: The meta-command text; surrounding whitespace is ignored.
        config: Database configuration; only ``config["dbname"]`` is read,
            for the ``\d`` header line.

    Returns:
        A single-element list with the textual result, or a message saying
        the meta-command is unsupported.
    """
    command = query.strip()

    if command == "\\d":
        cursor.execute("SELECT tablename FROM pg_tables WHERE schemaname = 'public'")
        lines = ["Tables_in_" + config["dbname"]]
        lines.extend(row[0] for row in cursor.fetchall())
        return [TextContent(type="text", text="\n".join(lines))]

    # The remaining commands all render "header line + comma-joined rows", so
    # they share a (catalog query, header columns) lookup table.
    tabular_commands = {
        "\\dt": (
            "SELECT tablename, tableowner, tablespace FROM pg_tables WHERE schemaname = 'public'",
            ["Table", "Owner", "Tablespace"],
        ),
        "\\d+": (
            "SELECT tablename, tableowner, tablespace, hasindexes, hasrules, hastriggers FROM pg_tables WHERE schemaname = 'public'",
            ["Table", "Owner", "Tablespace", "Has Indexes", "Has Rules", "Has Triggers"],
        ),
        "\\du": (
            "SELECT rolname, rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin FROM pg_roles",
            ["Role", "Superuser", "Inherit", "Create Role", "Create DB", "Can Login"],
        ),
    }
    spec = tabular_commands.get(command)
    if spec is None:
        return [TextContent(type="text", text=f"Unsupported meta-command: {command}")]

    statement, columns = spec
    cursor.execute(statement)
    lines = [",".join(columns)]
    lines.extend(",".join(map(str, row)) for row in cursor.fetchall())
    return [TextContent(type="text", text="\n".join(lines))]
@app.tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Execute SQL commands.

    Args:
        name: Tool name; only ``"execute_sql"`` is accepted.
        arguments: Must contain a non-empty ``"query"`` string. A query that
            starts with a backslash is treated as a meta-command.

    Returns:
        A single-element list of text content: the plan lines for EXPLAIN,
        a header plus comma-joined rows for any statement that produces a
        result set, the affected row count otherwise, or an error message if
        the database reports an error.

    Raises:
        ValueError: if the tool name is unknown or the query is missing.
    """
    config = get_db_config()
    logger.info(f"Calling tool: {name} with arguments: {arguments}")
    if name != "execute_sql":
        raise ValueError(f"Unknown tool: {name}")
    query = arguments.get("query")
    if not query:
        raise ValueError("Query is required")

    statement = query.strip()
    try:
        conn = connect(**config)
        try:
            with conn, conn.cursor() as cursor:
                if statement.startswith("\\"):
                    return handle_meta_command(cursor, query, config)

                cursor.execute(query)

                if statement.upper().startswith("EXPLAIN"):
                    plan_lines = [row[0] for row in cursor.fetchall()]
                    return [TextContent(type="text", text="\n".join(plan_lines))]

                # cursor.description is set exactly when the statement
                # produced a result set. Checking it instead of a "SELECT"
                # prefix also covers WITH, SHOW, VALUES and ... RETURNING.
                if cursor.description is not None:
                    columns = [desc[0] for desc in cursor.description]
                    lines = [",".join(columns)]
                    lines.extend(",".join(map(str, row)) for row in cursor.fetchall())
                    return [TextContent(type="text", text="\n".join(lines))]

                conn.commit()
                return [TextContent(type="text", text=f"Query executed successfully. Rows affected: {cursor.rowcount}")]
        finally:
            # ``with conn`` only ends the transaction; close explicitly.
            conn.close()
    except Error as e:
        logger.error(f"Error executing SQL '{query}': {e}")
        return [TextContent(type="text", text=f"Error executing query: {str(e)}")]
@app.tool()
async def search_opengauss_document(keyword: str):
    """
    This tool is designed to provide context-specific information about OpenGauss to a large language model (LLM) to enhance the accuracy and relevance of its responses.
    The LLM should automatically extracts relevant search keywords from user queries or LLM's answer for the tool parameter "keyword".
    The main functions of this tool include:
    1.Information Retrieval: The MCP Tool searches through OpenGauss-related documentation using the extracted keywords, locating and extracting the most relevant information.
    2.Context Provision: The retrieved information from OpenGauss documentation is then fed back to the LLM as contextual reference material. This context is not directly shown to the user but is used to refine and inform the LLM’s responses.
    This tool ensures that when the LLM’s internal documentation is insufficient to generate high-quality responses, it dynamically retrieves necessary OpenGauss information, thereby maintaining a high level of response accuracy and expertise.
    Keywords can now include both English and Chinese.

    Args:
        keyword: Search keyword sent to the documentation search API.

    Returns:
        A JSON string holding a list with one entry per search hit (the value
        returned by ``get_opengauss_doc_content``), or the plain string
        "No results were found" when the search request fails.
    """
    logger.info(f"Calling tool: search_opengauss_document,keyword:{keyword}")
    search_api_url = (
        "https://docs.opengauss.org/api-search/search/sort/docs"
    )
    headers = {
        "Content-Type": "application/json",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Accept": "application/json;charset=UTF-8",
        "Origin": "https://docs.opengauss.org",
        "Referer": "https://docs.opengauss.org",
        "Source": "opengauss"
    }
    payload = json.dumps({
        "keyword": keyword,
        "page": 1,
        "pageSize": 5,
        "lang": "zh",
        "version": "latest",
    }).encode("utf-8")
    req = request.Request(search_api_url, data=payload, headers=headers, method="POST")
    context = ssl.create_default_context()
    try:
        with request.urlopen(req, timeout=5, context=context) as response:
            response_body = response.read().decode("utf-8", errors="replace")
        json_data = json.loads(response_body)
    except error.HTTPError as e:
        logger.error(f"HTTP Error: {e.code} - {e.reason}")
        return "No results were found"
    except error.URLError as e:
        logger.error(f"URL Error: {e.reason}")
        return "No results were found"
    except (OSError, ValueError) as e:
        # Read timeouts surface as OSError and a non-JSON body as ValueError
        # (json.JSONDecodeError); both mean there is nothing usable.
        logger.error(f"Search request failed: {e}")
        return "No results were found"

    # Be tolerant of an unexpected payload shape (null "obj", non-dict body).
    container = json_data.get("obj") if isinstance(json_data, dict) else None
    records = container.get("records") if isinstance(container, dict) else None

    result_list = []
    for item in records or []:
        path = item.get("path") if isinstance(item, dict) else None
        if not path:
            # A hit without a path cannot be turned into a document URL.
            continue
        doc_url = "https://docs.opengauss.org/" + path + ".html"
        logger.info(f"doc_url:{doc_url}")
        result_list.append(get_opengauss_doc_content(doc_url))
    return json.dumps(result_list, ensure_ascii=False)
def get_opengauss_doc_content(doc_url: str) -> dict:
    """Download one documentation page and reduce it to plain text.

    Args:
        doc_url: Absolute URL of the page. Non-ASCII characters in the path
            or query are percent-encoded before the request is sent.

    Returns:
        ``{"title", "url", "content"}`` on success, where ``content`` is the
        visible page text with script/style/nav/header/footer elements
        removed, blank lines dropped, and truncated after 8000 characters.
        ``{"result": "No results were found"}`` when the request fails.
    """
    from urllib.parse import quote, urlparse, urlunparse

    headers = {
        "Content-Type": "application/json",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Accept": "application/json",
        "Referer": "https://docs.opengauss.org",
    }

    doc_url = str(doc_url)
    if not doc_url.isascii():
        parsed = urlparse(doc_url)
        # Keep "/" (path separators) and "%" (already-encoded sequences)
        # untouched; encoding them would turn the path into one opaque
        # segment or double-encode it.
        doc_url = urlunparse(parsed._replace(
            path=quote(parsed.path, safe="/%"),
            query=quote(parsed.query, safe="=&%"),
        ))

    req = request.Request(doc_url, headers=headers, method="GET")
    context = ssl.create_default_context()
    try:
        with request.urlopen(req, timeout=5, context=context) as response:
            html = response.read().decode("utf-8", errors="replace")
    except error.HTTPError as e:
        logger.error(f"HTTP Error: {e.code} - {e.reason}")
        return {"result": "No results were found"}
    except error.URLError as e:
        logger.error(f"URL Error: {e.reason}")
        return {"result": "No results were found"}
    except OSError as e:
        # A timeout while reading the body is raised as a plain OSError
        # rather than a URLError.
        logger.error(f"URL Error: {e}")
        return {"result": "No results were found"}

    soup = BeautifulSoup(html, "html.parser")
    # Drop page chrome and non-content elements before extracting text.
    for element in soup(["script", "style", "nav", "header", "footer"]):
        element.decompose()

    stripped_lines = (line.strip() for line in soup.get_text().splitlines())
    text = "\n".join(line for line in stripped_lines if line)
    logger.info(f"text length:{len(text)}")
    if len(text) > 8000:
        text = text[:8000] + "... [content truncated]"

    if soup.title and soup.title.string:
        title = str(soup.title.string)
    else:
        title = "openGauss 文档"

    return {
        "title": title,
        "url": doc_url,
        "content": text,
    }
def ensure_bm25_index(cursor, table_name: str, column_name: str):
    """
    Ensure BM25 index exists; create it if missing.

    Args:
        cursor: Open database cursor used for the catalog lookups and DDL.
        table_name: Table that owns the column to index.
        column_name: Column to index with BM25.

    Returns:
        False if the index already existed, True if it was created.

    Raises:
        RuntimeError: if the index is still absent from ``pg_indexes`` after
            the CREATE INDEX statement.
    """
    raw_name = f"bm25_{table_name}_{column_name}"
    # The index name is emitted unquoted, so the server folds it to lower
    # case. Normalise it the same way here, otherwise the catalog lookups
    # never match for mixed-case arguments. Characters that are not valid in
    # a bare identifier (e.g. the dot of "schema.table") become "_".
    index_name = "".join(
        ch if (ch.isalnum() or ch == "_") else "_" for ch in raw_name
    ).lower()
    lookup_sql = "SELECT indexname FROM pg_indexes WHERE indexname = %s;"

    cursor.execute(lookup_sql, (index_name,))
    if cursor.fetchone():
        logger.info(f"BM25 index {index_name} already exists")
        return False

    logger.info(f"BM25 index {index_name} not found, creating...")
    cursor.execute(
        f"CREATE INDEX {index_name} ON {table_name} USING bm25({column_name});"
    )

    # Re-check the catalog to confirm the index was created.
    cursor.execute(lookup_sql, (index_name,))
    if not cursor.fetchone():
        raise RuntimeError(f"Failed to create BM25 index {index_name}")
    logger.info(f"BM25 index {index_name} created successfully")
    return True
@app.tool()
async def create_fulltext_index(table_name: str, column_name: str):
    """
    Create a BM25 full-text index in an OpenGauss table.

    Args:
        table_name: Table that owns the column to index.
        column_name: Column to index.

    Returns:
        True if the index was created, False if it already existed.

    Raises:
        RuntimeError: if the database reports an error.
    """
    config = get_db_config()
    try:
        conn = connect(**config)
        try:
            with conn, conn.cursor() as cursor:
                return ensure_bm25_index(cursor, table_name, column_name)
        finally:
            # psycopg2's ``with conn`` only ends the transaction; the
            # connection itself has to be closed explicitly.
            conn.close()
    except Error as e:
        logger.error(f"Database error: {str(e)}")
        raise RuntimeError(f"Database error: {str(e)}") from e
@app.tool()
async def fulltext_search(
    table_name: str,
    full_text_search_column_name: list[str],
    keyword: str,
    other_where_clause: Optional[list[str]] = None,
    limit: int = 5,
    output_column_name: Optional[list[str]] = None,
):
    """
    Search for documents using full text search in a OpenGauss table.
    Automatically creates bm25 index if missing.

    Args:
        table_name: Table to search.
        full_text_search_column_name: Candidate text columns; only the first
            entry is searched.
        keyword: Text to match, scored with the ``<&>`` operator.
        other_where_clause: Extra SQL conditions, each ANDed onto the query.
        limit: Maximum number of rows returned.
        output_column_name: Columns to select; all columns when omitted.

    Returns:
        A list of row dicts (selected columns plus ``score``), or an error
        message string when the database reports an error.

    Raises:
        ValueError: if ``full_text_search_column_name`` is empty.
    """
    if not full_text_search_column_name:
        raise ValueError("full_text_search_column_name must contain at least one column")
    main_search_column = full_text_search_column_name[0]
    config = get_db_config()

    select_columns = ", ".join(output_column_name) if output_column_name else "*"
    query_parts = [
        "set enable_seqscan = off;",
        "set enable_indexscan = on;",
        f"select {select_columns},",
        f"{main_search_column} <&> %s as score",
        f"from {table_name}",
        "where 1=1",
    ]
    query_parts.extend(f"AND {condition}" for condition in other_where_clause or [])
    query_parts.append(f"order by {main_search_column} <&> %s desc limit %s")
    query = "\n".join(query_parts)
    params = [keyword, keyword, limit]

    try:
        conn = connect(**config)
        try:
            with conn, conn.cursor() as cursor:
                ensure_bm25_index(cursor, table_name, main_search_column)
                cursor.execute(query, params)
                columns = [desc[0] for desc in cursor.description]
                return [dict(zip(columns, row)) for row in cursor.fetchall()]
        finally:
            # ``with conn`` only ends the transaction; close explicitly.
            conn.close()
    except Error as e:
        logger.error(f"Error executing SQL : {e}")
        return f"Error executing query: {str(e)}"
@app.tool()
async def create_vector_index(
    table_name: str,
    column_name: str,
    index_type: str = "hnsw",
    distance_ops: str = "vector_cosine_ops",
    options: dict | None = None,
):
    """
    Create a vector index. If the index already exists, drop and recreate it.

    Args:
        table_name: Table that owns the vector column.
        column_name: Vector column to index.
        index_type: Index access method placed after ``USING``.
        distance_ops: Operator class applied to the column.
        options: Optional index storage parameters, rendered as
            ``WITH (key=value, ...)``.

    Returns:
        The string "create success!".

    Raises:
        RuntimeError: if the database reports an error.
    """
    index_name = f"{table_name}_{column_name}_vector_idx"
    if options:
        option_str = ", ".join(f"{k}={v}" for k, v in options.items())
        with_clause = f"WITH ({option_str})"
    else:
        with_clause = ""
    create_sql = (
        f"ALTER TABLE {table_name} SET(parallel_workers=32);"
        f"CREATE INDEX {index_name} "
        f"ON {table_name} "
        f"USING {index_type}({column_name} {distance_ops}) "
        f"{with_clause};"
    )
    config = get_db_config()
    try:
        conn = connect(**config)
        try:
            with conn, conn.cursor() as cursor:
                cursor.execute(f"DROP INDEX if exists {index_name};")
                cursor.execute(create_sql)
        finally:
            # psycopg2's ``with conn`` only ends the transaction; the
            # connection itself has to be closed explicitly.
            conn.close()
    except Error as e:
        logger.error(f"Database error: {str(e)}")
        raise RuntimeError(f"Database error: {str(e)}") from e
    return "create success!"
@app.tool()
async def vector_search(
    table_name: str,
    vector_data: str,
    vec_column_name: str = "vector",
    distance_func: Optional[str] = "cosine",
    other_where_clause: Optional[list[str]] = None,
    topk: int = 5,
    output_column_name: Optional[list[str]] = None,
):
    """
    Perform vector similarity search on an OpenGauss table.
    Args:
        table_name: Name of the table to search.
        vector_data: Query vector.
        vec_column_name: column name containing vectors to search.
        distance_func: The index distance algorithm used when comparing the distance between two vectors.
            One of "l2", "cosine" or "ip" (case-insensitive); None means "cosine".
        other_where_clause: Other where condition query statements.
        topk: Number of results returned.
        output_column_name: Returned table fields.
    Returns:
        A list of row dicts (selected columns plus ``score``), ordered by ascending score.
    Raises:
        ValueError: if ``distance_func`` is not supported.
        RuntimeError: if the database reports an error.
    """
    logger.info(
        f"Calling tool: opengauss_vector_search with arguments: {table_name}, {vector_data[:10]}, {vec_column_name}"
    )
    distance_map = {
        "l2": "<->",
        "cosine": "<=>",
        "ip": "<#>"
    }
    # The parameter is Optional, so treat None as the documented default.
    key = (distance_func or "cosine").lower()
    if key not in distance_map:
        raise ValueError(f"Unsupported distance_func '{distance_func}'. "
                         f"Allowed values are: {list(distance_map.keys())}")
    op = distance_map[key]
    select_cols = ", ".join(output_column_name) if output_column_name else "*"

    where_sql = ""
    if other_where_clause:
        clauses = [clause.strip() for clause in other_where_clause if clause.strip()]
        if clauses:
            # The separator needs a space on both sides; " AND" alone glues
            # the keyword onto the following condition ("a ANDb").
            where_sql = "WHERE " + " AND ".join(clauses)

    # Named ``statement`` so it does not shadow the imported psycopg2 ``sql``
    # module.
    statement = (
        f"set enable_seqscan = off; set enable_indexscan=on;"
        f"SELECT {select_cols}, {vec_column_name} {op} %s::vector as score "
        f"FROM {table_name} "
        f"{where_sql} "
        f"ORDER BY score "
        f"LIMIT %s;"
    )
    config = get_db_config()
    try:
        conn = connect(**config)
        try:
            with conn, conn.cursor() as cursor:
                cursor.execute(statement, (vector_data, topk))
                rows = cursor.fetchall()
                col_names = [desc[0] for desc in cursor.description]
        finally:
            # ``with conn`` only ends the transaction; close explicitly.
            conn.close()
    except Error as e:
        logger.error(f"Database error: {str(e)}")
        raise RuntimeError(f"Database error: {str(e)}") from e
    return [dict(zip(col_names, row)) for row in rows]
def normalize_vector_scores(distances, distance_type="cosine"):
    """
    Normalize vector distances into similarity scores (0~1).

    distances: list of raw distances from opengauss
    distance_type: "l2", "cosine", "ip" (matched case-insensitively)

    Returns a list of similarities in the same order as ``distances``; an
    empty input gives an empty list. Raises ValueError for an unsupported
    distance type.
    """
    if not distances:
        return []

    # Callers accept metric names in any case, so match case-insensitively.
    metric = str(distance_type).lower()

    if metric == "cosine":
        # Cosine distance lies in [0, 2]; map it linearly onto [1, 0].
        return [(2 - d) / 2 for d in distances]

    if metric == "l2":
        low, high = min(distances), max(distances)
        if high == low:
            return [1.0 for _ in distances]
        # Min-max scale, then invert: the smallest distance scores 1.0.
        return [1.0 - (d - low) / (high - low) for d in distances]

    if metric == "ip":
        # The inner-product operator's result is negated here to recover the
        # true inner product, where larger means more similar.
        inner_products = [-d for d in distances]
        low, high = min(inner_products), max(inner_products)
        if high == low:
            return [1.0 for _ in inner_products]
        return [(ip - low) / (high - low) for ip in inner_products]

    raise ValueError(f"Unsupported distance type: {distance_type}")
@app.tool()
async def hybrid_search(
    table_name: str,
    full_text_search_column_name: list[str],
    keyword: str,
    vector_data: str,
    vec_column_name: str = "vector",
    distance_func: str = "cosine",
    other_where_clause: Optional[list[str]] = None,
    limit: int = 5,
    output_column_name: Optional[list[str]] = None,
    text_weight: float = 0.4,
    vector_weight: float = 0.6,
):
    """
    Hybrid search: Fusion query combining BM25 full-text search,
    vector search, and scalar retrieval.
    Steps:
    1. Fulltext search → normalize BM25 score
    2. Vector search → normalize vector score
    3. Weighted fusion → sort → top limit
    ⚠️ IMPORTANT NOTES:
    - `hybrid_score`, `vector_norm`, `bm25_norm`, and `score` are **computed fields**,
    NOT actual database table columns.
    - These fields only exist in the returned results and are NOT persisted to the database.
    - They are generated during query execution for ranking and scoring purposes.
    Field Definitions:
    - `score`: Raw relevance score returned by individual search algorithms (BM25/vector)
    - `bm25_norm`: Normalized BM25 score (0-1 range)
    - `vector_norm`: Normalized vector similarity score (0-1 range)
    - `hybrid_score`: Final fusion score = bm25_norm * text_weight + vector_norm * vector_weight
    Fusion Formula:
    hybrid_score = (bm25_norm × text_weight) + (vector_norm × vector_weight)
    Raises:
    RuntimeError if either underlying search reports a database error.
    """

    def row_key(item):
        # Rows are matched across both result sets by "id", then "uuid".
        # ``is not None`` keeps legitimate falsy keys such as id 0.
        for field in ("id", "uuid"):
            value = item.get(field)
            if value is not None:
                return value
        # No usable key: fall back to a per-object key that cannot collide
        # with a real id value.
        return ("__unkeyed_row__", id(item))

    bm25_results = await fulltext_search(
        table_name=table_name,
        full_text_search_column_name=full_text_search_column_name,
        keyword=keyword,
        other_where_clause=other_where_clause,
        limit=1024,
        output_column_name=output_column_name,
    )
    # fulltext_search reports database errors as a message string rather
    # than raising; surface that instead of iterating the string as rows.
    if isinstance(bm25_results, str):
        raise RuntimeError(bm25_results)
    bm25_results = bm25_results or []

    max_bm25 = max((item["score"] for item in bm25_results), default=0)
    for item in bm25_results:
        item["bm25_norm"] = item["score"] / max_bm25 if max_bm25 > 0 else 0

    vector_results = await vector_search(
        table_name=table_name,
        vector_data=vector_data,
        vec_column_name=vec_column_name,
        distance_func=distance_func,
        other_where_clause=other_where_clause,
        topk=1024,
        output_column_name=output_column_name,
    )
    vector_norm = normalize_vector_scores(
        [item["score"] for item in vector_results],
        # vector_search matches the metric case-insensitively; do the same.
        distance_type=distance_func.lower(),
    )
    for item, norm in zip(vector_results, vector_norm):
        item["vector_norm"] = norm

    # Merge both result sets by row key; a row missing from one side gets a
    # normalized score of 0 for that side.
    merged = {}
    for item in bm25_results:
        item["vector_norm"] = 0
        merged[row_key(item)] = item
    for item in vector_results:
        rid = row_key(item)
        norm = item.get("vector_norm", 0)
        if rid not in merged:
            item["bm25_norm"] = 0
            merged[rid] = item
        merged[rid]["vector_norm"] = norm

    for item in merged.values():
        # The raw per-algorithm score is not part of the fused result.
        item.pop("score", None)
        item["hybrid_score"] = (
            item["bm25_norm"] * text_weight +
            item["vector_norm"] * vector_weight
        )

    ranked = sorted(
        merged.values(),
        key=lambda row: row["hybrid_score"],
        reverse=True,
    )
    return ranked[:limit]
@app.tool()
async def multi_vector_search(
    table_name: str,
    vectors: list[list[float]],
    vector_field: str,
    limit: int = 5,
    output_fields: Optional[list[str]] = None,
    metric_type: str = "cosine",
    filter_expr: Optional[str] = None,
    search_params: Optional[dict[str, Any]] = None,
    parallel_workers: int = 2
):
    """
    Perform vector similarity search with multiple query vectors.
    Concurrent processing of multiple vector queries optimizes performance for multi-vector database lookups.
    Args:
        table_name: Name of collection to search
        vectors: List of query vectors
        vector_field: Field containing vectors to search
        limit: Maximum number of results per query
        output_fields: Fields to return in results
        metric_type: Distance metric (COSINE, L2, IP)
        filter_expr: Optional filter expression
        search_params: Additional search parameters
        parallel_workers: Number of threads in the database concurrent connection pool
    Returns:
        A dict ``{"result": ...}`` holding whatever ``execute_multi_search`` returned.
    Raises:
        ImportError: if the installed psycopg2 lacks the openGauss multi-search extensions.
        ValueError: if the metric is unsupported or the search fails.
    """
    try:
        from psycopg2.extras import execute_multi_search, init_conn_pool, close_conn_pool
    except Exception as e:
        raise ImportError(
            "The current psycopg2 installation does not include the openGauss-specific "
            "extensions 'execute_multi_search', 'init_conn_pool', or 'close_conn_pool'.\n\n"
            "Please download the openGauss‑compatible psycopg package from the official documentation:\n"
            "https://docs.opengauss.org/zh/docs/latest/datavec/integration_python.html\n\n"
            f"Original error: {e}"
        ) from e
    try:
        distance_map = {
            "l2": "<->",
            "cosine": "<=>",
            "ip": "<#>"
        }
        key = metric_type.lower()
        if key not in distance_map:
            raise ValueError(f"Unsupported distance_func '{metric_type}'. "
                             f"Allowed values are: {list(distance_map.keys())}")
        metric_op = distance_map[key]
        if search_params is None:
            search_params = {"enable_seqscan": "off", "enable_indexscan": "on"}
        config = get_db_config()
        select_cols = ", ".join(output_fields) if output_fields else "*"

        where_sql = ""
        if filter_expr:
            # filter_expr is a single SQL expression; iterating it would
            # split it into characters. A list of conditions is also
            # accepted and ANDed together.
            raw_clauses = [filter_expr] if isinstance(filter_expr, str) else filter_expr
            clauses = [clause.strip() for clause in raw_clauses if clause.strip()]
            if clauses:
                where_sql = "WHERE " + " AND ".join(clauses)

        sql_template = (
            f"SELECT {select_cols}, {vector_field} {metric_op} %s::vector as score "
            f"FROM {table_name} "
            f"{where_sql} "
            f"ORDER BY score "
            f"LIMIT {limit};"
        )
        # One single-element parameter tuple per query vector, with the
        # vector rendered as a bracketed text literal.
        arglist = [(f"[{', '.join(str(x) for x in v)}]",) for v in vectors]
        conn_pool_mgr = init_conn_pool(config, parallel_workers, search_params)
        try:
            res = execute_multi_search(config, conn_pool_mgr, sql_template, arglist, search_params, parallel_workers)
        finally:
            # Release the pooled connections even when the search fails.
            close_conn_pool(conn_pool_mgr)
        return {"result" : res}
    except Exception as e:
        raise ValueError(f"Multi-vector search failed: {str(e)}") from e
# Feature switch read from the environment: any non-zero integer enables the
# memory feature guarded by `if ENABLE_MEMORY:` below (default 0 = disabled).
ENABLE_MEMORY = int(os.getenv("ENABLE_MEMORY", 0))
# Embedding backend; OGMemory only accepts "huggingface".
EMBEDDING_MODEL_PROVIDER = os.getenv("EMBEDDING_MODEL_PROVIDER", "huggingface")
# Directory holding a local copy of the embedding model; used when it exists
# and is non-empty.
LOCAL_MODEL_DIR = os.getenv("LOCAL_MODEL_DIR", "")
# Model name used when no usable local model directory is available.
REMOTE_MODEL_NAME = os.getenv("REMOTE_MODEL_NAME", "BAAI/bge-small-en-v1.5")
# Name of the table that stores memory entries and their embeddings.
TABLE_NAME_MEMORY = "og_mcp_memory"
if ENABLE_MEMORY:
class OGMemory:
    """Memory store backed by an openGauss table with a vector column.

    On construction it opens a database connection, derives the embedding
    dimension by embedding a probe string, and makes sure the memory table
    and its HNSW index exist.
    """

    def __init__(self):
        # Embedding model, created on first use and then reused.
        self._emb_model = None
        self.config = get_db_config()
        self.conn = connect(**self.config)
        self.embedding_dim = len(self.get_embedding_context("test"))
        self.create_memory_table()

    def _load_embedding_model(self):
        """Build the embedding model, preferring the local model directory."""
        if EMBEDDING_MODEL_PROVIDER != "huggingface":
            raise ValueError(f"Unsupported embedding model provider: {EMBEDDING_MODEL_PROVIDER}")
        # Set before the import so the download endpoint is already in the
        # environment when the library is loaded.
        os.environ["HF_ENDPOINT"] = "https://hf-mirror.com"
        from langchain_huggingface import HuggingFaceEmbeddings
        if os.path.isdir(LOCAL_MODEL_DIR) and os.listdir(LOCAL_MODEL_DIR):
            logger.info(f"load local model : {LOCAL_MODEL_DIR}")
            model_path = LOCAL_MODEL_DIR
        else:
            logger.info(f"In case the local model is unavailable, the system will automatically download it from the remote server and cache it locally for subsequent use: {REMOTE_MODEL_NAME}")
            model_path = REMOTE_MODEL_NAME
        return HuggingFaceEmbeddings(
            model_name=model_path,
            encode_kwargs={"normalize_embeddings": True},
        )

    def get_embedding_context(self, text:str):
        """
        When loading a model, prioritize the local model directory;
        if the model does not exist locally, download it online.

        The model is built once and reused on later calls instead of being
        re-created for every embedding request. Returns the embedding of
        ``text`` as produced by the model's ``embed_query``.
        """
        emb_model = getattr(self, "_emb_model", None)
        if emb_model is None:
            emb_model = self._emb_model = self._load_embedding_model()
        return emb_model.embed_query(text)

    def create_memory_table(self):
        """Create the memory table and its HNSW index if they do not exist."""
        with self.conn.cursor() as cur:
            logger.info(f"Check and create table {TABLE_NAME_MEMORY}.")
            cur.execute(f"""
                CREATE TABLE IF NOT EXISTS {TABLE_NAME_MEMORY} (
                memory_id SERIAL PRIMARY KEY,
                content VARCHAR(8000),
                embedding VECTOR({self.embedding_dim}),
                meta JSON );
            """)
            logger.info(f"Check and create hnsw index vidx.")
            cur.execute(f"""
                CREATE INDEX IF NOT EXISTS vidx ON {TABLE_NAME_MEMORY}
                USING hnsw (embedding vector_l2_ops)
                WITH (m = 16, ef_construction = 200);
            """)
        self.conn.commit()


# Memory store instance created at import time.
og_memory = OGMemory()
from psycopg2.extras import Json
# Tool: semantic search over stored memories.
# Embeds `query`, then asks vector_search for the `topk` nearest rows by L2
# distance on the "embedding" column, returning memory_id and content.
# NOTE: the docstring is the tool description published to MCP clients, so its
# wording is kept verbatim.
@app.tool()
async def og_memory_query(query: str, topk: int = 5):
    """
    🧠 COGNITIVE CONTEXT RETRIEVAL ENGINE 🧠 - PERSONALIZED KNOWLEDGE ACCESS SYSTEM
    PRIMARY FUNCTION: This module operates as an intelligent assistant that retrieves
    user-specific contextual information to optimize response personalization and precision.
    ===== OPERATIONAL PROTOCOLS =====
    [MANDATORY INVOCATION SCENARIOS]
    • Cross-linguistic inquiries about individual preferences and inclinations
    • Pre-storage validation procedures to eliminate information duplication
    • Biographical data disclosure: experiential, professional, geographical contexts
    • Response formulation requiring historical reference and contextual awareness
    • Technical discourse necessitating solution history consultation
    • Personalized recommendation contexts: gastronomic, travel, leisure, gifting
    • Spatiotemporal coordination: scheduling, meteorological, event-based, navigational
    • Professional development guidance: technological tools, educational resources, career pathways
    • Ambiguous requests containing contextual indicators: "appropriate", "suitable", "local", "personalized"
    [SYSTEM INTEGRATION PRINCIPLE]
    Multiple relevant knowledge domains require separate invocations for optimal retrieval accuracy.
    ===== LINGUISTIC PROCESSING ARCHITECTURE =====
    [CROSS-LANGUAGE QUERY MECHANISM]
    • English: "What are my culinary preferences?" → Query: "culinary taste dining preference"
    • Chinese: "我通常去哪里旅行?" → Query: "travel destination frequent location pattern"
    • Spanish: "¿Dónde he trabajado antes?" → Query: "employment history previous workplace"
    [SEMANTIC OPTIMIZATION DOCTRINE]
    Primary search vocabulary must be English for vector embedding compatibility,
    irrespective of original query language composition.
    ===== ADVANCED RETRIEVAL METHODOLOGIES =====
    [CONTEXTUAL EXPANSION TECHNIQUES]
    1. Temporal Context Integration
    - "I visited Kyoto last spring" → Query: "Kyoto travel spring 2025 seasonal"
    2. Professional Network Mapping
    - "Collaborated with AWS team on cloud migration" → Query: "AWS cloud migration collaboration professional"
    3. Skill Proficiency Tracking
    - "Advanced React with TypeScript experience" → Query: "React TypeScript frontend advanced skill"
    4. Health & Wellness Monitoring
    - "Meditation practice every morning" → Query: "meditation wellness routine morning habit"
    ===== KNOWLEDGE DOMAIN TAXONOMY =====
    [DOMAIN-SPECIFIC QUERY TEMPLATES]
    • Physical Activities: "athletic performance exercise regimen fitness milestone"
    • Gastronomic Interests: "culinary exploration flavor profile dietary restriction"
    • Professional Trajectory: "career progression organizational role achievement recognition"
    • Technological Landscape: "software ecosystem development methodology tool proficiency"
    • Personal Ecosystem: "family dynamics lifestyle pattern relationship network"
    • Cultural Consumption: "artistic appreciation media consumption creative expression"
    ===== INTELLIGENT KNOWLEDGE INTEGRATION =====
    [CONSOLIDATION DECISION MATRIX]
    Scenario A: Domain Congruence
    - New: "Weekly mountain hiking enthusiast"
    - Existing: "Regular gym attendance and protein diet"
    - Analysis: Both belong to wellness domain → Consolidate into comprehensive fitness profile
    Scenario B: Domain Divergence
    - New: "Blockchain development specialization"
    - Existing: "Classical piano performance skill"
    - Analysis: Distinct professional/artistic domains → Maintain separate knowledge entries
    Scenario C: Temporal Evolution
    - New: "Recently transitioned to vegan lifestyle"
    - Existing: "Previous omnivorous dietary pattern"
    - Analysis: Same domain with temporal progression → Update with versioning metadata
    ===== SYSTEM PARAMETERS =====
    [INPUT SPECIFICATIONS]
    • query: Domain-contextual semantic descriptors ("professional development", "leisure preference")
    • topk: Retrieval breadth (recommended 8-12 for comprehensive domain analysis)
    [OUTPUT STRUCTURE]
    JSON array containing memory identifiers and content for contextual analysis
    [QUALITY ASSURANCE METRIC]
    Complete domain overlap analysis required before any storage modification decisions.
    ===== EXTENDED APPLICATIONS =====
    [INNOVATIVE USE CASES]
    1. Career Development Advisor
    - Track skill acquisition, certification progress, project experience
    2. Health & Wellness Companion
    - Monitor exercise routines, dietary patterns, wellness goals
    3. Travel Experience Curator
    - Document destinations, cultural experiences, accommodation preferences
    4. Learning Progress Tracker
    - Record educational milestones, knowledge gaps, learning resources
    5. Creative Project Archivist
    - Catalog artistic endeavors, creative processes, inspiration sources
    [SYSTEM EVOLUTION PATH]
    Future enhancements may include:
    • Emotion-state contextualization
    • Decision pattern recognition
    • Preference evolution tracking
    • Cross-user knowledge sharing (with privacy safeguards)
    """
    # The embedding list is stringified before being handed to vector_search.
    query_vector = str(og_memory.get_embedding_context(query))
    # Positional order is kept as in the original call: table, vector,
    # vector column, distance function, (unused slot) None, limit, output columns.
    return await vector_search(
        TABLE_NAME_MEMORY,
        query_vector,
        "embedding",
        "l2",
        None,
        topk,
        ["memory_id", "content"],
    )
# Tool: store one memory row.
# Embeds `content`, inserts (content, embedding, meta) into the memory table and
# commits. `meta` is bound as JSON. Returns "Inserted successfully".
# On any failure the transaction is rolled back before the error propagates, so
# the shared connection is not left in an aborted-transaction state.
# NOTE: the docstring is the tool description published to MCP clients, so its
# wording is kept verbatim.
@app.tool()
async def og_memory_insert(content: str, meta: dict):
    """
    🧬 PERSONAL KNOWLEDGE INGESTION ENGINE 🧬 - STRUCTURED INFORMATION ASSIMILATION SYSTEM
    PRIMARY FUNCTION: This module serves as an intelligent data ingestion pipeline that
    systematically processes and organizes user-provided personal information into
    semantically structured knowledge representations.
    ===== OPERATIONAL PROTOCOLS =====
    [MANDATORY ACTIVATION CRITERIA]
    • Biographical data disclosure containing domain-specific indicators
    • Personal preference statements with contextual significance
    • Professional experience descriptions requiring archival
    • Lifestyle pattern information with analytical value
    • Achievement documentation for historical tracking
    [TRIGGER IDENTIFICATION MATRIX]
    Activation occurs when input contains:
    1. Domain indicator + Factual assertion combination
    2. Temporal context + Personal attribute pairing
    3. Quantitative measurement + Qualitative descriptor
    [DOMAIN INDICATOR TAXONOMY]
    • Demographic Profile: chronological age, biological sex, geographical origin, citizenship status
    • Professional Identity: occupational role, organizational affiliation, educational attainment, skill certification
    • Geographical Context: residential location, travel frequency, temporal zone alignment
    • Preference Spectrum: affinity indicators, aversion markers, habitual patterns, aesthetic inclinations
    • Lifestyle Configuration: familial structure, companion animals, daily rituals, belief systems
    • Achievement Portfolio: recognition awards, project milestones, competitive accomplishments, experiential events
    [FACTUAL ASSERTION PATTERNS]
    • Identity declarations: "My professional designation is..."
    • Temporal statements: "During the period of 2025-2026, I..."
    • Preference expressions: "My culinary inclination favors..."
    • Location specifications: "My current geographical coordinates are..."
    • Achievement narratives: "I successfully completed..."
    ===== INGESTION WORKFLOW ARCHITECTURE =====
    [FOUR-PHASE PROCESSING PIPELINE]
    PHASE 1: CONTEXTUAL DISCOVERY
    • Execute comprehensive knowledge domain search using semantic query expansion
    • Identify existing related information across all relevant categories
    • Map temporal and contextual relationships between existing and new data
    PHASE 2: SEMANTIC CLASSIFICATION
    • Analyze information content against domain taxonomy
    • Determine primary and secondary categorization hierarchies
    • Identify cross-domain relationships and potential integration points
    PHASE 3: INTEGRATION DECISION MATRIX
    • Evaluate semantic congruence between new and existing information
    • Determine appropriate storage strategy based on domain alignment
    • Apply versioning protocols for temporal data evolution
    PHASE 4: EXECUTION IMPLEMENTATION
    • Execute domain-appropriate storage operations
    • Apply metadata enrichment for future retrieval optimization
    • Update relationship mappings in knowledge graph
    ===== DOMAIN-SPECIFIC PROCESSING =====
    [PROFESSIONAL IDENTITY MANAGEMENT]
    Example Input: "Currently serving as Senior Cloud Architect at TechCorp since 2025"
    Processing:
    1. Domain: Professional/Career
    2. Subdomain: Cloud Architecture
    3. Temporal Context: 2025-present
    4. Integration: Update existing career timeline or create new professional profile
    [HEALTH & WELLNESS TRACKING]
    Example Input: "Maintain daily 5km running routine for cardiovascular health"
    Processing:
    1. Domain: Physical Wellness
    2. Subdomain: Exercise Regimen
    3. Frequency: Daily
    4. Integration: Consolidate with existing fitness patterns
    [CULINARY PREFERENCE MAPPING]
    Example Input: "Prefer plant-based Mediterranean cuisine with occasional seafood"
    Processing:
    1. Domain: Gastronomic Preference
    2. Subdomain: Dietary Pattern
    3. Specificity: Mediterranean plant-based with seafood exceptions
    4. Integration: Update dietary profile with nuanced preferences
    ===== ADVANCED INTEGRATION SCENARIOS =====
    [SCENARIO A: DOMAIN CONSOLIDATION]
    Existing Knowledge: "User engages in weekly strength training"
    New Input: "Recently added yoga practice for flexibility"
    Analysis: Both belong to physical wellness domain
    Action: Consolidate into comprehensive fitness profile
    Result: "User maintains weekly strength training with complementary yoga practice"
    [SCENARIO B: DOMAIN SEGREGATION]
    Existing Knowledge: "User specializes in quantum computing research"
    New Input: "Enjoys classical violin performance"
    Analysis: Distinct professional/artistic domains
    Action: Maintain separate knowledge entries
    Result: Professional and artistic profiles stored independently
    [SCENARIO C: TEMPORAL EVOLUTION]
    Existing Knowledge: "User previously consumed omnivorous diet"
    New Input: "Transitioned to plant-based nutrition in 2025"
    Analysis: Same domain with temporal progression
    Action: Update with versioning and temporal metadata
    Result: "User adopted plant-based nutritional pattern in 2025"
    ===== KNOWLEDGE DOMAIN TAXONOMY =====
    [DOMAIN CLASSIFICATION FRAMEWORK]
    • Physical Performance Domain: athletic activities, exercise regimens, fitness metrics, wellness practices
    • Nutritional Preference Domain: culinary tastes, dietary restrictions, flavor profiles, consumption patterns
    • Professional Trajectory Domain: career progression, organizational roles, skill development, achievement recognition
    • Technological Proficiency Domain: software ecosystems, development methodologies, tool expertise, platform familiarity
    • Personal Ecosystem Domain: familial relationships, lifestyle configurations, belief systems, daily rituals
    • Cultural Consumption Domain: artistic appreciation, media engagement, creative expression, entertainment preferences
    ===== METADATA ARCHITECTURE =====
    [STRUCTURED METADATA SPECIFICATION]
    • content: Standardized English representation ("User maintains [activity] with [frequency]")
    • meta: {"information_type":"personal_fact", "domain_classification":"[primary_domain]",
    "subdomain_specification":"[specific_subdomain]", "temporal_context":"[time_reference]",
    "version_identifier":"[update_version]", "source_language":"[original_language]"}
    [METADATA ENRICHMENT EXAMPLES]
    • Professional context: {"information_type":"career_milestone", "domain_classification":"professional_trajectory",
    "subdomain_specification":"cloud_architecture", "temporal_context":"2025-present"}
    • Health context: {"information_type":"wellness_routine", "domain_classification":"physical_performance",
    "subdomain_specification":"cardiovascular_exercise", "frequency_pattern":"daily"}
    ===== QUALITY ASSURANCE PROTOCOLS =====
    [PRE-INGESTION VALIDATION]
    1. Semantic coherence verification
    2. Temporal consistency checking
    3. Domain classification accuracy assessment
    4. Relationship mapping completeness
    [POST-INGESTION VERIFICATION]
    1. Storage integrity confirmation
    2. Metadata accuracy validation
    3. Relationship graph update verification
    4. Retrieval readiness testing
    ===== EXTENDED APPLICATIONS =====
    [INNOVATIVE USE CASES]
    1. Professional Development Portfolio
    - Track skill acquisition, certification progress, project experience across 2025-2026
    2. Health Intelligence System
    - Monitor biometric trends, wellness routines, nutritional patterns
    3. Cultural Experience Archive
    - Document artistic engagements, travel experiences, learning milestones
    4. Personal Growth Tracker
    - Record goal achievement, habit formation, self-improvement metrics
    5. Relationship Network Mapper
    - Catalog social connections, professional networks, community engagements
    [SYSTEM EVOLUTION PATH]
    Future capabilities may include:
    • Emotional state contextualization
    • Decision pattern analytics
    • Preference evolution modeling
    • Cross-user knowledge synthesis (with privacy preservation)
    • Predictive personalization algorithms
    """
    embedding = og_memory.get_embedding_context(content)
    try:
        with og_memory.conn.cursor() as cur:
            cur.execute(
                f""" INSERT INTO {TABLE_NAME_MEMORY} (content, embedding, meta) VALUES (%s, %s, %s); """,
                (content, embedding, Json(meta)),
            )
        og_memory.conn.commit()
    except Exception:
        # A failed statement aborts the transaction; without a rollback every
        # later tool call on this shared connection would fail as well.
        og_memory.conn.rollback()
        raise
    return "Inserted successfully"
# Tool: delete one memory row by primary key.
# Returns "Deleted successfully" when a row was removed, or a "No memory
# found ..." message when `memory_id` matched nothing (previously reported as a
# success). On any failure the transaction is rolled back before the error
# propagates, so the shared connection stays usable.
# NOTE: the docstring is the tool description published to MCP clients, so its
# wording is kept verbatim.
@app.tool()
async def og_memory_delete(memory_id: int):
    """
    🔒 KNOWLEDGE GOVERNANCE MODULE 🔒 - CONTROLLED INFORMATION RETENTION MANAGEMENT
    PRIMARY FUNCTION: This module provides a secure and controlled mechanism for
    permanent removal of user-specified personal information from the knowledge base,
    ensuring compliance with data privacy requirements and user autonomy.
    ===== OPERATIONAL PROTOCOLS =====
    [MANDATORY ACTIVATION CRITERIA]
    • Explicit user requests for information removal or forgetting
    • Data accuracy corrections requiring complete record elimination
    • Privacy compliance requirements for specific information categories
    • Temporal relevance expiration for outdated personal data
    • User preference evolution rendering previous information obsolete
    [VERBAL TRIGGER PATTERNS]
    • Information removal requests: "Please erase my record of..."
    • Preference revision statements: "I no longer maintain the preference for..."
    • Data accuracy corrections: "The information regarding my... is incorrect"
    • Privacy compliance directives: "Remove all references to my..."
    • Temporal obsolescence acknowledgments: "That detail from 2025 is no longer relevant"
    ===== DELETION WORKFLOW ARCHITECTURE =====
    [THREE-PHASE VERIFICATION PROCESS]
    PHASE 1: CONTEXTUAL IDENTIFICATION
    • Execute comprehensive knowledge domain search to locate target information
    • Verify semantic alignment between user request and identified records
    • Confirm temporal and contextual relevance of targeted information
    PHASE 2: AUTHORIZATION VALIDATION
    • Validate user intent through request specificity analysis
    • Confirm information ownership and deletion authorization
    • Assess potential impact on related knowledge structures
    PHASE 3: EXECUTION IMPLEMENTATION
    • Execute precise record elimination using verified identifier
    • Update relationship mappings in knowledge graph
    • Confirm complete removal and system integrity
    ===== SAFETY PROTOCOLS =====
    [CRITICAL SAFEGUARDS]
    1. Identifier Verification Mandate
    • Only accept identifiers obtained through formal query processes
    • Reject manually generated or estimated identifier values
    • Require exact match verification before execution
    2. Impact Assessment Requirements
    • Evaluate potential knowledge graph disruption
    • Assess relationship mapping integrity
    • Consider temporal context preservation needs
    3. Irreversibility Acknowledgment
    • Permanent elimination without recovery mechanisms
    • Complete removal from all storage layers
    • No archival or backup retention
    ===== USE CASE SCENARIOS =====
    [SCENARIO A: PREFERENCE EVOLUTION]
    User Request: "I no longer enjoy classical music as I did in 2025"
    Process:
    1. Search: "musical preference classical 2025"
    2. Identification: Locate specific musical preference record
    3. Verification: Confirm temporal context alignment
    4. Execution: Permanent removal of outdated preference
    [SCENARIO B: DATA ACCURACY CORRECTION]
    User Request: "My previous workplace information is incorrect"
    Process:
    1. Search: "professional employment workplace history"
    2. Identification: Locate inaccurate employment record
    3. Verification: Cross-reference with current information
    4. Execution: Removal of erroneous data
    [SCENARIO C: PRIVACY COMPLIANCE]
    User Request: "Remove all geographical location references"
    Process:
    1. Search: "residential location travel pattern geographical"
    2. Identification: Multiple location-related records
    3. Verification: Confirm user authorization for each
    4. Execution: Systematic removal of location data
    ===== QUALITY ASSURANCE PROTOCOLS =====
    [PRE-DELETION VALIDATION]
    1. Request specificity verification
    2. Identifier accuracy confirmation
    3. Impact assessment completion
    4. User intent clarity validation
    [POST-DELETION VERIFICATION]
    1. Storage integrity confirmation
    2. Relationship graph update verification
    3. Search result consistency testing
    4. System performance monitoring
    ===== EXTENDED APPLICATIONS =====
    [INNOVATIVE USE CASES]
    1. Regulatory Compliance Management
    • GDPR/CCPA compliance for personal data removal
    • Industry-specific privacy regulation adherence
    • Cross-border data transfer compliance
    2. Personal Development Tracking
    • Elimination of outdated skill assessments
    • Removal of superseded achievement records
    • Evolution tracking through controlled forgetting
    3. Relationship Management
    • Removal of obsolete contact information
    • Elimination of outdated social connection data
    • Privacy-preserving network management
    4. Health Information Governance
    • Controlled removal of sensitive health data
    • Temporal limitation for medical history retention
    • Privacy-compliant wellness tracking
    [SYSTEM EVOLUTION PATH]
    Future capabilities may include:
    • Granular deletion with relationship preservation
    • Temporal archiving with access restriction
    • Consent-based information lifecycle management
    • Automated compliance monitoring and reporting
    • Cross-system synchronization for distributed deletion
    ===== PARAMETER SPECIFICATIONS =====
    [INPUT REQUIREMENTS]
    • memory_id: Verified identifier from formal query results (integer)
    - Must originate from og_memory_query execution
    - No manual generation or estimation permitted
    - Exact match verification required
    [SECURITY PROTOCOLS]
    • Single-record targeting precision
    • No batch or pattern-based deletion
    • Complete audit trail maintenance
    • Authorization verification for each operation
    [COMPLIANCE CONSIDERATIONS]
    • Permanent elimination without recovery
    • Complete removal from all storage layers
    • No residual data retention
    • Comprehensive audit logging
    """
    try:
        with og_memory.conn.cursor() as cur:
            cur.execute(
                f"DELETE FROM {TABLE_NAME_MEMORY} WHERE memory_id = %s;",
                (memory_id,),
            )
            # rowcount tells us whether the id actually matched a row.
            deleted_rows = cur.rowcount
        og_memory.conn.commit()
    except Exception:
        # Keep the shared connection usable after a failed statement.
        og_memory.conn.rollback()
        raise
    if deleted_rows == 0:
        return f"No memory found with memory_id {memory_id}; nothing deleted"
    return "Deleted successfully"
# Tool: overwrite one memory row.
# Re-embeds `content` and replaces content, embedding and meta of the row whose
# primary key is `memory_id`. Returns "Updated successfully" when a row was
# changed, or a "No memory found ..." message when the id matched nothing
# (previously reported as a success). On any failure the transaction is rolled
# back before the error propagates, so the shared connection stays usable.
# NOTE: the docstring is the tool description published to MCP clients, so its
# wording is kept verbatim.
@app.tool()
async def og_memory_update(memory_id: int, content: str, meta: dict):
    """
    🔄 KNOWLEDGE EVOLUTION MANAGEMENT SYSTEM 🔄 - DYNAMIC INFORMATION REFINEMENT ENGINE
    PRIMARY FUNCTION: This module serves as an intelligent update mechanism that
    systematically refines and evolves stored personal knowledge to maintain
    temporal accuracy, contextual relevance, and semantic precision across
    multilingual input sources.
    ===== OPERATIONAL PROTOCOLS =====
    [MANDATORY ACTIVATION CRITERIA]
    • Preference evolution statements indicating current inclination changes
    • Configuration modification declarations requiring system updates
    • Data accuracy corrections for previously stored information
    • Geographical relocation announcements necessitating location updates
    • Temporal progression acknowledgments affecting historical context
    [VERBAL TRIGGER PATTERNS]
    • Preference evolution: "My current inclination favors..." / "现在我更倾向于..."
    • Configuration modification: "My technical setup now includes..." / "我的配置已更新为..."
    • Data accuracy correction: "The previously recorded detail should be..." / "之前记录的信息应更正为..."
    • Geographical relocation: "I have relocated to..." / "我已搬迁至..."
    • Temporal progression: "Since 2025, my routine has evolved to..." / "自2025年以来,我的习惯已发展为..."
    ===== UPDATE WORKFLOW ARCHITECTURE =====
    [FOUR-PHASE REFINEMENT PROCESS]
    PHASE 1: CONTEXTUAL DISCOVERY
    • Execute comprehensive knowledge retrieval using semantic query expansion
    • Identify target records requiring temporal or contextual updates
    • Map relationship dependencies affected by proposed modifications
    PHASE 2: MULTILINGUAL STANDARDIZATION
    • Process input across diverse linguistic frameworks
    • Convert to standardized English representation format
    • Preserve original linguistic context within metadata architecture
    PHASE 3: SEMANTIC INTEGRATION ANALYSIS
    • Evaluate impact on existing knowledge structures
    • Determine appropriate update strategy based on domain alignment
    • Apply versioning protocols for temporal evolution tracking
    PHASE 4: EXECUTION IMPLEMENTATION
    • Execute precise record modification using verified identifiers
    • Update embedding representations for retrieval optimization
    • Enhance metadata with temporal and contextual enrichment
    ===== MULTILINGUAL PROCESSING FRAMEWORK =====
    [CROSS-LANGUAGE STANDARDIZATION PROTOCOL]
    • English input: "I've transitioned to remote work since 2025" → Standardized: "User transitioned to remote work arrangement in 2025"
    • Chinese input: "我现在更喜欢素食饮食" → Standardized: "User now prefers plant-based dietary pattern"
    • Spanish input: "Me mudé a Barcelona el año pasado" → Standardized: "User relocated to Barcelona in previous year"
    • French input: "Je pratique maintenant le yoga quotidiennement" → Standardized: "User now practices yoga on daily basis"
    [LINGUISTIC PRESERVATION PRINCIPLE]
    • Storage format: Standardized English representation for vector compatibility
    • Metadata preservation: Original language context within information source tracking
    • Semantic fidelity: Maintain precise meaning across linguistic transformations
    ===== ADVANCED UPDATE SCENARIOS =====
    [SCENARIO A: PREFERENCE EVOLUTION TRACKING]
    Existing Record: "User enjoys classical music concerts"
    New Input: "Actually, I now prefer electronic music festivals"
    Processing:
    1. Domain: Cultural Consumption
    2. Subdomain: Musical Preference
    3. Temporal Context: Evolution from classical to electronic (2025-present)
    4. Action: Update existing record with temporal metadata
    Result: "User transitioned from classical to electronic music preference in 2025"
    [SCENARIO B: PROFESSIONAL DEVELOPMENT PROGRESSION]
    Existing Record: "User works as junior software developer"
    New Input: "I was promoted to senior architect role in 2025"
    Processing:
    1. Domain: Professional Trajectory
    2. Subdomain: Career Progression
    3. Temporal Context: 2025 promotion milestone
    4. Action: Update with versioning and achievement recognition
    Result: "User advanced to senior architect position in 2025"
    [SCENARIO C: LIFESTYLE CONFIGURATION MODIFICATION]
    Existing Record: "User resides in urban apartment setting"
    New Input: "Moved to suburban residence with garden access"
    Processing:
    1. Domain: Personal Ecosystem
    2. Subdomain: Residential Configuration
    3. Temporal Context: Recent relocation
    4. Action: Update geographical and lifestyle context
    Result: "User relocated to suburban residence with garden access"
    ===== METADATA EVOLUTION ARCHITECTURE =====
    [STRUCTURED METADATA ENHANCEMENT]
    • content: Standardized English representation with temporal context
    • meta: {"information_type":"evolving_preference", "domain_classification":"[primary_domain]",
    "subdomain_specification":"[specific_subdomain]", "temporal_evolution":"[update_timeline]",
    "version_identifier":"[progressive_version]", "source_language":"[original_language]",
    "update_timestamp":"[precise_modification_time]"}
    [METADATA ENRICHMENT EXAMPLES]
    • Professional evolution: {"information_type":"career_progression", "domain_classification":"professional_trajectory",
    "subdomain_specification":"architectural_role", "temporal_evolution":"2025-promotion"}
    • Lifestyle modification: {"information_type":"residential_configuration", "domain_classification":"personal_ecosystem",
    "subdomain_specification":"geographical_location", "temporal_evolution":"2025-relocation"}
    ===== QUALITY ASSURANCE PROTOCOLS =====
    [PRE-UPDATE VALIDATION]
    1. Identifier accuracy verification through formal query processes
    2. Semantic coherence assessment between existing and proposed content
    3. Temporal consistency evaluation across related knowledge domains
    4. Impact analysis on dependent relationship mappings
    [POST-UPDATE VERIFICATION]
    1. Storage integrity confirmation through retrieval testing
    2. Embedding consistency validation for vector search compatibility
    3. Metadata accuracy assessment across all enriched fields
    4. Relationship graph update verification for dependent connections
    ===== EXTENDED APPLICATIONS =====
    [INNOVATIVE USE CASES]
    1. Professional Development Portfolio Evolution
    • Track skill advancement, certification updates, role transitions across 2025-2026
    2. Health & Wellness Journey Documentation
    • Monitor evolving fitness routines, dietary modifications, wellness adaptations
    3. Learning Progression Tracking
    • Document educational milestone achievements, knowledge expansion, skill refinement
    4. Personal Growth Evolution Mapping
    • Record habit formation progress, goal achievement updates, self-improvement metrics
    5. Relationship Network Development
    • Catalog evolving social connections, professional network expansions, community engagements
    [SYSTEM EVOLUTION PATH]
    Future capabilities may include:
    • Predictive update suggestions based on pattern recognition
    • Automated temporal context adjustment for aging information
    • Cross-domain update propagation for related knowledge elements
    • Version comparison and historical evolution visualization
    • Update impact forecasting for dependent knowledge structures
    ===== PARAMETER SPECIFICATIONS =====
    [INPUT REQUIREMENTS]
    • memory_id: Verified identifier obtained through formal query execution (integer)
    - Must originate from og_memory_query search results
    - No manual generation or estimation permitted
    - Exact match verification required before update execution
    • content: Standardized English representation format
    - Temporal context inclusion for evolution tracking
    - Semantic precision maintenance across linguistic transformations
    - Domain-appropriate terminology application
    • meta: Enhanced metadata structure
    - Domain classification and subdomain specification
    - Temporal evolution tracking with version identifiers
    - Source language preservation for multilingual context
    - Update timestamp for modification chronology
    [CONSISTENCY PRINCIPLES]
    1. English storage format maintenance for all knowledge representations
    2. Temporal context preservation across update cycles
    3. Semantic fidelity maintenance through linguistic transformations
    4. Relationship integrity preservation during modification processes
    [SECURITY PROTOCOLS]
    • Verified identifier requirement for targeted updates
    • No bulk or pattern-based modification operations
    • Complete audit trail maintenance for all update activities
    • Authorization verification through formal query processes
    """
    # The stored vector must match the new text, so re-embed before writing.
    embedding = og_memory.get_embedding_context(content)
    try:
        with og_memory.conn.cursor() as cur:
            cur.execute(
                f""" UPDATE {TABLE_NAME_MEMORY} SET content = %s, embedding = %s, meta = %s WHERE memory_id = %s; """,
                (content, embedding, Json(meta), memory_id),
            )
            # rowcount tells us whether the id actually matched a row.
            updated_rows = cur.rowcount
        og_memory.conn.commit()
    except Exception:
        # Keep the shared connection usable after a failed statement.
        og_memory.conn.rollback()
        raise
    if updated_rows == 0:
        return f"No memory found with memory_id {memory_id}; nothing updated"
    return "Updated successfully"
def _build_ssl_kwargs(args, enable_https: bool) -> dict:
    """Collect uvicorn SSL keyword arguments from CLI flags and environment.

    Returns an empty dict when HTTPS is not requested, or when it is requested
    but the key/certificate pair is incomplete (a warning is logged and the
    server falls back to plain HTTP, as before).
    """
    ssl_kwargs = {}
    if not (enable_https or (args.ssl_keyfile and args.ssl_certfile)):
        return ssl_kwargs

    # CLI flags take precedence over the environment variables.
    ssl_keyfile = args.ssl_keyfile or os.getenv("SSL_KEYFILE")
    ssl_certfile = args.ssl_certfile or os.getenv("SSL_CERTFILE")
    if not (ssl_keyfile and ssl_certfile):
        logger.warning("HTTPS/SSL enabled but missing required certificate files")
        logger.warning("Please provide both --ssl_keyfile and --ssl_certfile or set SSL_KEYFILE and SSL_CERTFILE environment variables")
        return ssl_kwargs

    ssl_kwargs['ssl_keyfile'] = ssl_keyfile
    ssl_kwargs['ssl_certfile'] = ssl_certfile
    ssl_keyfile_password = credential_cache.get_ssl_keyfile_password()
    if ssl_keyfile_password:
        ssl_kwargs['ssl_keyfile_password'] = ssl_keyfile_password
    ssl_ca_certs = args.ssl_ca_certs or os.getenv("SSL_CA_CERTS")
    if ssl_ca_certs:
        ssl_kwargs['ssl_ca_certs'] = ssl_ca_certs
    logger.info("HTTPS/SSL enabled")
    return ssl_kwargs


async def _serve_with_uvicorn(asgi_app, host: str, port: int, ssl_kwargs: dict):
    """Serve *asgi_app* with uvicorn on host/port, applying any SSL options."""
    import uvicorn

    server_config = uvicorn.Config(
        asgi_app,
        host=host,
        port=port,
        log_level=app.settings.log_level.lower(),
        **ssl_kwargs
    )
    await uvicorn.Server(server_config).serve()


async def main():
    """Main entry point to run the MCP server.

    Parses the command line, logs the database target, then runs the server
    over the selected transport (stdio, sse or streamable-http). SSL options
    (flags or ENABLE_HTTPS / SSL_* environment variables) apply to both HTTP
    transports.
    """
    # Parse arguments first so `--help` or an invalid flag exits before any
    # database configuration or credentials are loaded.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--transport",
        type=str,
        choices=["stdio", "sse", "streamable-http"],
        default="stdio",
        help="Specify the MCP server transport type as stdio(default) or sse, streamable-http.",
    )
    parser.add_argument("--sse_host", default="127.0.0.1", help="Host to bind to")
    parser.add_argument("--sse_port", type=int, default=8000, help="Port to listen on")
    parser.add_argument("--streamable_http_host", default="127.0.0.1", help="Host to bind streamable http server to")
    parser.add_argument("--streamable_http_port", type=int, default=8000, help="Port for streamable http server")
    parser.add_argument("--ssl_keyfile", type=str, default=None, help="SSL private key file path for HTTPS")
    parser.add_argument("--ssl_certfile", type=str, default=None, help="SSL certificate file path for HTTPS")
    parser.add_argument("--ssl_ca_certs", type=str, default=None, help="SSL CA certificate file path")
    args = parser.parse_args()

    logger.info("Starting openGauss MCP server...")
    db_config = get_db_config()
    logger.info(f"Database config: {db_config['host']}/{db_config['dbname']} as {db_config['user']}")
    logger.info(f"Starting openGauss MCP server with {args.transport} mode...")
    enable_https = os.getenv("ENABLE_HTTPS", "false").lower() in ("true", "1", "yes", "on")

    if args.transport == "stdio":
        await app.run_stdio_async()
    elif args.transport == "sse":
        app.settings.host = args.sse_host
        app.settings.port = args.sse_port
        ssl_kwargs = _build_ssl_kwargs(args, enable_https)
        await _serve_with_uvicorn(app.sse_app(), args.sse_host, args.sse_port, ssl_kwargs)
    elif args.transport == "streamable-http":
        app.settings.host = args.streamable_http_host
        app.settings.port = args.streamable_http_port
        ssl_kwargs = _build_ssl_kwargs(args, enable_https)
        if ssl_kwargs:
            # Previously the SSL options were silently ignored for this
            # transport; serve the same ASGI app through uvicorn with TLS.
            await _serve_with_uvicorn(
                app.streamable_http_app(),
                args.streamable_http_host,
                args.streamable_http_port,
                ssl_kwargs,
            )
        else:
            # No TLS requested: keep the framework's own runner unchanged.
            await app.run_streamable_http_async()


if __name__ == "__main__":
    asyncio.run(main())