"""Postgres连接器"""
import logging
import urllib.parse
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine
from apps.models import Base
from .config import config
logger = logging.getLogger(__name__)
class Postgres:
"""Postgres连接器"""
engine: AsyncEngine
async def init(self) -> None:
"""初始化Postgres连接器"""
logger.info("[Postgres] 初始化Postgres连接器")
self.engine = create_async_engine(
f"postgresql+asyncpg://{urllib.parse.quote_plus(config.postgres.user)}:"
f"{urllib.parse.quote_plus(config.postgres.password)}@{config.postgres.host}:"
f"{config.postgres.port}/{config.postgres.database}",
pool_size=20,
max_overflow=10,
pool_pre_ping=True,
pool_recycle=3600,
echo_pool=False,
)
logger.info("[Postgres] 创建表")
async with self.engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def close(self) -> None:
"""关闭Postgres连接器,释放所有连接"""
logger.info("[Postgres] 关闭Postgres连接器")
await self.engine.dispose()
@asynccontextmanager
async def session(self) -> AsyncGenerator[AsyncSession, None]:
"""
获取会话,每次从连接池创建新的数据库连接
注意: 此方法不会自动提交,调用方需要显式调用 session.commit()
"""
session = AsyncSession(
self.engine,
expire_on_commit=False,
autoflush=False,
autocommit=False,
)
try:
yield session
except Exception:
logger.warning("[Postgres] 会话错误,可能为SQL执行失败")
await session.rollback()
raise
finally:
await session.close()
postgres = Postgres()