"""渗透测试知识库条目存储。"""
from __future__ import annotations
import json
import uuid
from datetime import datetime, timezone
from typing import Any
from app.core.database import db_conn, row_to_dict
# Closed set of category names an entry may carry. _normalize_category maps
# any value outside this set to "general".
KB_CATEGORIES = frozenset({"general", "report", "finding", "target", "note"})
def _now_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO 8601 string."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.isoformat()
def _parse_tags(raw: Any) -> list[str]:
    """Normalise a tag payload into a list of non-empty, stripped strings.

    Accepted inputs:
      * a list of tags;
      * a string holding a JSON array of tags;
      * a string holding a JSON string, which is split on commas;
      * any other non-empty string, which is split on commas.

    ``None`` items inside a list are skipped rather than turned into the
    literal tag ``"None"``. JSON objects, JSON ``null`` and every non-list,
    non-string input yield an empty list.

    Args:
        raw: The tag payload in any of the shapes above.

    Returns:
        The cleaned tags, in their original order.
    """

    def _clean(items: Any) -> list[str]:
        stripped = (str(item).strip() for item in items if item is not None)
        return [tag for tag in stripped if tag]

    if isinstance(raw, list):
        return _clean(raw)
    if not isinstance(raw, str):
        return []

    text = raw.strip()
    if not text:
        return []

    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        # Not JSON at all: treat it as a plain comma-separated list.
        return _clean(text.split(","))

    if isinstance(data, list):
        return _clean(data)
    if isinstance(data, str):
        return _clean(data.split(","))
    if data is None or isinstance(data, dict):
        return []
    # A bare JSON scalar such as "2024" or "true" is really a single tag that
    # happens to parse as JSON; keep the text as written instead of dropping it.
    return _clean(text.split(","))
def _row_to_entry(row: dict[str, Any] | None) -> dict[str, Any] | None:
    """Convert a database row mapping into an API-facing entry dict.

    The raw ``tags_json`` column is removed and replaced by a parsed ``tags``
    list. A missing or empty row yields ``None``.
    """
    if row:
        entry = dict(row)
        raw_tags = entry.pop("tags_json", "[]")
        entry["tags"] = _parse_tags(raw_tags)
        return entry
    return None
def _normalize_category(category: str | None) -> str:
    """Return a lower-cased, stripped category, or "general" if it is unknown.

    ``None`` and empty strings are treated as "general"; so is any value that
    is not a member of ``KB_CATEGORIES``.
    """
    candidate = category if category else "general"
    candidate = candidate.strip().lower()
    if candidate not in KB_CATEGORIES:
        return "general"
    return candidate
def list_entries(
    *,
    category: str | None = None,
    keyword: str | None = None,
    limit: int = 50,
) -> list[dict[str, Any]]:
    """List non-deleted knowledge-base entries, most recently updated first.

    The result rows carry summary columns only; ``content`` is searched but
    not returned.

    Args:
        category: Optional category filter. It is passed through
            ``_normalize_category``, so an unknown value filters on
            "general".
        keyword: Optional search text matched as a literal substring against
            title, summary, content and the stored tags JSON.
        limit: Maximum number of rows; clamped to the range 1..100, with a
            falsy value treated as 50.

    Returns:
        A list of entry dicts with a parsed ``tags`` list.
    """
    limit = max(1, min(int(limit or 50), 100))
    sql = """
        SELECT id, title, category, tags_json, summary, created_by,
               created_at, updated_at
        FROM kb_entries
        WHERE deleted_at IS NULL
    """
    params: list[Any] = []
    if category:
        sql += " AND category = ?"
        params.append(_normalize_category(category))

    needle = (keyword or "").strip()
    if needle:
        # Escape LIKE metacharacters so "%" and "_" in the keyword are matched
        # literally instead of acting as wildcards. "!" is used as the escape
        # character because it needs no quoting inside an SQL string literal.
        escaped = (
            needle.replace("!", "!!").replace("%", "!%").replace("_", "!_")
        )
        pattern = f"%{escaped}%"
        sql += (
            " AND (title LIKE ? ESCAPE '!' OR summary LIKE ? ESCAPE '!'"
            " OR content LIKE ? ESCAPE '!' OR tags_json LIKE ? ESCAPE '!')"
        )
        params.extend([pattern] * 4)

    sql += " ORDER BY updated_at DESC LIMIT ?"
    params.append(limit)
    with db_conn() as conn:
        rows = conn.execute(sql, tuple(params)).fetchall()
        entries = [_row_to_entry(dict(r)) for r in rows]
    return [entry for entry in entries if entry]
def get_entry(entry_id: str, *, include_content: bool = True) -> dict[str, Any] | None:
    """Fetch a single non-deleted entry by id.

    Args:
        entry_id: Entry identifier; surrounding whitespace is ignored.
        include_content: When true, the ``content`` column is selected too.

    Returns:
        The entry dict with a parsed ``tags`` list, or ``None`` if no
        matching, non-deleted row exists.
    """
    columns = [
        "id",
        "title",
        "category",
        "tags_json",
        "summary",
        "created_by",
        "created_at",
        "updated_at",
    ]
    if include_content:
        columns.append("content")
    query = (
        "SELECT "
        + ", ".join(columns)
        + " FROM kb_entries WHERE id = ? AND deleted_at IS NULL"
    )
    with db_conn() as conn:
        cursor = conn.execute(query, (entry_id.strip(),))
        record = cursor.fetchone()
        return _row_to_entry(row_to_dict(record))
def search_entries(keyword: str, *, limit: int = 20) -> list[dict[str, Any]]:
    """Search entries by keyword across all categories.

    Delegates to ``list_entries`` without a category filter.
    """
    matches = list_entries(category=None, keyword=keyword, limit=limit)
    return matches
def create_entry(
    title: str,
    content: str,
    *,
    category: str = "general",
    tags: list[str] | None = None,
    summary: str = "",
    created_by: str | None = None,
) -> dict[str, Any]:
    """Insert a new knowledge-base entry and return it as stored.

    Args:
        title: Entry title; must be non-empty after stripping.
        content: Entry body; must be non-empty after stripping.
        category: Category name, normalised via ``_normalize_category``.
        tags: Optional tags, cleaned via ``_parse_tags``.
        summary: Optional summary. When empty or whitespace-only, the first
            200 characters of the stripped content are used instead.
        created_by: Optional author identifier; stored as "" when missing.

    Returns:
        The freshly created entry, re-read from the database.

    Raises:
        ValueError: If the title or content is empty, or the row cannot be
            read back after the insert.
    """
    title = title.strip()
    if not title:
        raise ValueError("标题不能为空")
    body = content.strip()
    if not body:
        raise ValueError("内容不能为空")

    now = _now_iso()
    eid = str(uuid.uuid4())
    tags_json = json.dumps(_parse_tags(tags or []), ensure_ascii=False)
    # Strip before falling back, so a whitespace-only summary does not end up
    # stored as an empty string.
    summary_text = (summary or "").strip() or body[:200].strip()

    with db_conn() as conn:
        conn.execute(
            """
            INSERT INTO kb_entries(
                id, title, category, tags_json, summary, content,
                created_by, created_at, updated_at
            ) VALUES (?,?,?,?,?,?,?,?,?)
            """,
            (
                eid,
                title,
                _normalize_category(category),
                tags_json,
                summary_text,
                body,
                created_by or "",
                now,
                now,
            ),
        )

    entry = get_entry(eid)
    if not entry:
        raise ValueError("创建失败")
    return entry
def update_entry(
    entry_id: str,
    patch: dict[str, Any],
) -> dict[str, Any]:
    """Apply a partial update to an existing entry and return the new state.

    Recognised ``patch`` keys are ``title``, ``content``, ``category``,
    ``tags`` and ``summary``. A key that is absent or explicitly ``None``
    leaves the stored value unchanged. An empty summary is regenerated from
    the first 200 characters of the content.

    Args:
        entry_id: Entry identifier; surrounding whitespace is ignored.
        patch: Mapping of field names to new values.

    Returns:
        The updated entry, re-read from the database.

    Raises:
        ValueError: If the entry does not exist, the resulting title or
            content is empty, or the row cannot be read back afterwards.
    """
    # Normalise once so the lookup and the UPDATE target the same row.
    entry_id = entry_id.strip()
    current = get_entry(entry_id)
    if not current:
        raise ValueError("未找到该知识库条目")

    def _pick(key: str, fallback: Any) -> Any:
        """Return the patched value, else the stored one, else *fallback*."""
        value = patch.get(key)
        if value is None:
            value = current.get(key)
        return fallback if value is None else value

    title = str(_pick("title", "")).strip()
    if not title:
        raise ValueError("标题不能为空")
    content_str = str(_pick("content", "")).strip()
    if not content_str:
        raise ValueError("内容不能为空")
    category = _normalize_category(str(_pick("category", "general")))
    tags = _parse_tags(_pick("tags", []))
    summary = str(_pick("summary", "")).strip() or content_str[:200]

    now = _now_iso()
    with db_conn() as conn:
        cur = conn.execute(
            """
            UPDATE kb_entries SET
                title = ?, category = ?, tags_json = ?, summary = ?, content = ?,
                updated_at = ?
            WHERE id = ? AND deleted_at IS NULL
            """,
            (
                title,
                category,
                json.dumps(tags, ensure_ascii=False),
                summary,
                content_str,
                now,
                entry_id,
            ),
        )
        # The row may have been deleted between the lookup and the UPDATE;
        # report that instead of silently doing nothing.
        if cur.rowcount == 0:
            raise ValueError("未找到该知识库条目")

    entry = get_entry(entry_id)
    if not entry:
        raise ValueError("更新失败")
    return entry
def delete_entry(entry_id: str) -> None:
    """Soft-delete an entry by stamping its ``deleted_at`` column.

    Args:
        entry_id: Entry identifier; surrounding whitespace is ignored.

    Raises:
        ValueError: If no non-deleted entry with that id exists.
    """
    stamp = _now_iso()
    statement = (
        "UPDATE kb_entries SET deleted_at = ?, updated_at = ? "
        "WHERE id = ? AND deleted_at IS NULL"
    )
    with db_conn() as conn:
        result = conn.execute(statement, (stamp, stamp, entry_id.strip()))
        affected = result.rowcount
        if affected == 0:
            raise ValueError("未找到该知识库条目")