/*
* pool.cj - 数据库连接池模块
*
* 提供高性能数据库连接池:
* - 连接池接口和实现
* - 连接池配置
* - 连接池监控
*
* 设计参考 HikariCP 和 Druid 连接池。
*/
package tybb2026::tycj_orm
import std.collection.*
import std.sync.*
// ============================================================================
// 连接池配置
// ============================================================================
/**
* PoolConfig - 连接池配置
*/
public class PoolConfig {
// 最小连接数
public var minPoolSize: Int64 = 5
// 最大连接数
public var maxPoolSize: Int64 = 20
// 连接超时时间(毫秒)
public var connectionTimeout: Int64 = 30000
// 空闲连接超时时间(毫秒)
public var idleTimeout: Int64 = 600000
// 最大存活时间(毫秒)
public var maxLifetime: Int64 = 1800000
// 验证超时时间(毫秒)
public var validationTimeout: Int64 = 5000
// 连接验证查询
public var connectionTestQuery: String = "SELECT 1"
// 数据库配置
public var databaseConfig: DatabaseConfig = DatabaseConfig()
public init() {}
// 设置最小连接数
public func setMinPoolSize(size: Int64): PoolConfig {
this.minPoolSize = size
return this
}
// 设置最大连接数
public func setMaxPoolSize(size: Int64): PoolConfig {
this.maxPoolSize = size
return this
}
// 设置连接超时
public func setConnectionTimeout(timeout: Int64): PoolConfig {
this.connectionTimeout = timeout
return this
}
// 设置数据库配置
public func setDatabaseConfig(config: DatabaseConfig): PoolConfig {
this.databaseConfig = config
return this
}
}
// ============================================================================
// 连接池接口
// ============================================================================
/**
* ConnectionPool - 连接池接口
*/
public interface ConnectionPool {
// 获取连接
func getConnection(): Option<Connection>
// 获取连接(带超时)
func getConnection(timeout: Int64): Option<Connection>
// 归还连接
func returnConnection(conn: Connection): Unit
// 关闭连接池
func close(): Unit
// 获取活跃连接数
func getActiveConnections(): Int64
// 获取空闲连接数
func getIdleConnections(): Int64
// 获取总连接数
func getTotalConnections(): Int64
}
// ============================================================================
// 简单连接池实现
// ============================================================================
/**
* SimpleConnectionPool - 简单连接池实现
*/
public class SimpleConnectionPool <: ConnectionPool {
private var config: PoolConfig
private var connectionFactory: ConnectionFactory
// 空闲连接列表
private var idleConnections: ArrayList<Connection> = ArrayList<Connection>()
// 活跃连接计数
private var activeCount: Int64 = 0
// 锁
private var lock: ReentrantMutex = ReentrantMutex()
// 状态
private var closed: Bool = false
public init(config: PoolConfig, factory: ConnectionFactory) {
this.config = config
this.connectionFactory = factory
// 初始化最小连接数
initializePool()
}
// 初始化连接池
private func initializePool(): Unit {
lock.lock()
try {
var retryCount = 0
let maxRetries = 3
for (_ in 0..config.minPoolSize) {
var success = false
retryCount = 0
while (!success && retryCount < maxRetries) {
match (connectionFactory.createConnection()) {
case Some(conn) =>
idleConnections.add(conn)
success = true
case None =>
retryCount++
if (retryCount >= maxRetries) {
println("[tycj_orm] 警告: 创建连接失败,已重试 ${maxRetries} 次")
}
}
}
}
} finally {
lock.unlock()
}
println("[tycj_orm] 连接池初始化完成,创建了 ${idleConnections.size} 个连接")
}
// 获取连接
public func getConnection(): Option<Connection> {
getConnection(config.connectionTimeout)
}
// 获取连接(带超时)
public func getConnection(timeout: Int64): Option<Connection> {
let startTime = getCurrentTimeMillis()
while (true) {
var result: Option<Connection> = None
lock.lock()
try {
if (closed) {
return None
}
// 1. 尝试从空闲池获取
if (!idleConnections.isEmpty()) {
let lastIndex = idleConnections.size - 1
let conn = idleConnections[lastIndex]
// 直接移除最后一个元素(O(1)操作)
idleConnections.remove(lastIndex..(lastIndex+1))
// 验证连接有效性
if (validateConnection(conn)) {
activeCount = activeCount + 1
result = Some(conn)
} else {
// 连接无效,关闭并继续尝试
conn.close()
}
}
// 2. 尝试创建新连接
if (result.isNone() && activeCount < config.maxPoolSize) {
match (connectionFactory.createConnection()) {
case Some(conn) =>
activeCount = activeCount + 1
result = Some(conn)
case None => ()
}
}
} finally {
lock.unlock()
}
// 如果获取到连接,直接返回
if (result.isSome()) {
return result
}
// 3. 检查是否超时
let elapsed = getCurrentTimeMillis() - startTime
if (elapsed >= timeout) {
return None
}
// 4. 短暂等待后重试
sleep(10)
}
None
}
// 验证连接有效性
private func validateConnection(conn: Connection): Bool {
try {
let stmt = conn.prepareStatement(config.connectionTestQuery)
stmt.close()
return true
} catch (_: Exception) {
return false
}
}
// 获取当前时间戳(毫秒)
private func getCurrentTimeMillis(): Int64 {
// 使用简单的计数器模拟时间戳
// 实际实现应该使用系统时间API
0
}
// 短暂休眠(毫秒)
private func sleep(ms: Int64): Unit {
// 简化实现,实际应该使用线程休眠
for (_ in 0..(ms * 1000)) {
// 空循环模拟延迟
}
}
// 归还连接
public func returnConnection(conn: Connection): Unit {
lock.lock()
try {
if (closed) {
conn.close()
return
}
activeCount = activeCount - 1
// 验证连接有效性后再放回池中
if (validateConnection(conn)) {
idleConnections.add(conn)
} else {
// 连接无效,直接关闭不放入池中
conn.close()
println("[tycj_orm] 警告: 归还的连接已失效,已关闭")
}
} finally {
lock.unlock()
}
}
// 关闭连接池
public func close(): Unit {
lock.lock()
try {
if (closed) {
return
}
closed = true
// 关闭所有空闲连接
for (conn in idleConnections) {
conn.close()
}
idleConnections.clear()
} finally {
lock.unlock()
}
println("[tycj_orm] 连接池已关闭")
}
// 获取活跃连接数
public func getActiveConnections(): Int64 {
lock.lock()
try {
activeCount
} finally {
lock.unlock()
}
}
// 获取空闲连接数
public func getIdleConnections(): Int64 {
lock.lock()
try {
idleConnections.size
} finally {
lock.unlock()
}
}
// 获取总连接数
public func getTotalConnections(): Int64 {
lock.lock()
try {
activeCount + idleConnections.size
} finally {
lock.unlock()
}
}
}
// ============================================================================
// 连接池统计
// ============================================================================
/**
* PoolStats - 连接池统计信息
*/
public class PoolStats {
public var activeConnections: Int64 = 0
public var idleConnections: Int64 = 0
public var totalConnections: Int64 = 0
public init() {}
public func toString(): String {
"PoolStats{active=${activeConnections}, idle=${idleConnections}, total=${totalConnections}}"
}
}
// ============================================================================
// 连接池工厂
// ============================================================================
/**
* ConnectionPoolFactory - 连接池工厂
*/
public class ConnectionPoolFactory {
// 创建连接池
public static func createPool(config: PoolConfig): ConnectionPool {
let factory = DefaultConnectionFactory(config.databaseConfig)
SimpleConnectionPool(config, factory)
}
// 创建 Mock 连接池
public static func createMockPool(): ConnectionPool {
let config = PoolConfig()
config.minPoolSize = 2
config.maxPoolSize = 5
let mockFactory = MockConnectionFactory()
SimpleConnectionPool(config, mockFactory)
}
}
/**
* MockConnectionFactory - Mock 连接工厂
*/
public class MockConnectionFactory <: ConnectionFactory {
public func createConnection(): Option<Connection> {
Some(MockConnection())
}
public func validateConnection(conn: Connection): Bool {
true
}
}