/*
* 事务管理模块
* 提供声明式和编程式事务支持
*/
package tybb2026::tycj_orm
import std.collection.*
/*
* 事务隔离级别枚举
*/
public enum IsolationLevel {
| Default
| ReadUncommitted
| ReadCommitted
| RepeatableRead
| Serializable
}
/*
* 事务传播行为枚举
*/
public enum Propagation {
| Required
| Supports
| Mandatory
| RequiresNew
| NotSupported
| Never
| Nested
}
/*
* 事务定义类
* 用于配置事务属性
*/
public class TransactionDefinition {
/* 传播行为 */
public var propagation: Propagation
/* 隔离级别 */
public var isolation: IsolationLevel
/* 超时时间(秒,-1 表示不超时) */
public var timeout: Int64
/* 是否只读 */
public var readOnly: Bool
public init() {
this.propagation = Propagation.Required
this.isolation = IsolationLevel.Default
this.timeout = -1
this.readOnly = false
}
}
/*
* 事务状态接口
* 表示当前事务的状态
*/
public interface TransactionStatus {
/*
* 是否是新事务
*/
func isNewTransaction(): Bool
/*
* 设置回滚标记
*/
func setRollbackOnly(): Unit
/*
* 是否标记为回滚
*/
func isRollbackOnly(): Bool
/*
* 事务是否已完成
*/
func isCompleted(): Bool
}
/*
* 默认事务状态实现类
*/
public class DefaultTransactionStatus <: TransactionStatus {
/* 是否是新事务 */
private var newTransaction: Bool
/* 是否标记回滚 */
private var rollbackOnly: Bool = false
/* 是否已完成 */
private var completed: Bool = false
public init(newTransaction!: Bool = true) {
this.newTransaction = newTransaction
}
public func isNewTransaction(): Bool {
return newTransaction
}
public func setRollbackOnly(): Unit {
this.rollbackOnly = true
}
public func isRollbackOnly(): Bool {
return rollbackOnly
}
public func isCompleted(): Bool {
return completed
}
/*
* 标记事务完成
*/
public func setCompleted(): Unit {
this.completed = true
}
}
/*
* 事务管理器接口
* 定义事务管理的核心操作
*/
public interface TransactionManager {
/*
* 获取事务
* @param definition 事务定义
* @return 事务状态
*/
func getTransaction(definition: TransactionDefinition): TransactionStatus
/*
* 提交事务
* @param status 事务状态
*/
func commit(status: TransactionStatus): Unit
/*
* 回滚事务
* @param status 事务状态
*/
func rollback(status: TransactionStatus): Unit
}
/*
* 事务模板类
* 提供编程式事务支持,简化事务操作
*/
public class TransactionTemplate {
/* 事务管理器 */
private var transactionManager: TransactionManager
/* 事务定义 */
private var transactionDefinition: TransactionDefinition
public init(transactionManager: TransactionManager, definition!: TransactionDefinition = TransactionDefinition()) {
this.transactionManager = transactionManager
this.transactionDefinition = definition
}
/*
* 在事务中执行操作
* @param action 事务内执行的回调函数
* @return 回调函数的返回值
*/
public func execute<T>(action: (TransactionStatus) -> T): T {
let status = transactionManager.getTransaction(transactionDefinition)
try {
let result = action(status)
if (!status.isRollbackOnly()) {
transactionManager.commit(status)
} else {
transactionManager.rollback(status)
}
return result
} catch (e: Exception) {
transactionManager.rollback(status)
throw e
}
}
/*
* 在事务中执行操作(无返回值)
* @param action 事务内执行的回调函数
*/
public func executeWithoutResult(action: (TransactionStatus) -> Unit): Unit {
let status = transactionManager.getTransaction(transactionDefinition)
try {
action(status)
if (!status.isRollbackOnly()) {
transactionManager.commit(status)
} else {
transactionManager.rollback(status)
}
} catch (e: Exception) {
transactionManager.rollback(status)
throw e
}
}
}
/*
* 事务对象
* 封装数据库连接和事务属性
*/
public class Transaction {
/* 数据库连接 */
private let connection: Connection
/* 事务定义 */
private let definition: TransactionDefinition
/* 是否已完成 */
private var completed: Bool = false
public init(connection: Connection, definition: TransactionDefinition) {
this.connection = connection
this.definition = definition
}
/*
* 获取数据库连接
* @return 数据库连接
*/
public func getConnection(): Connection {
return connection
}
/*
* 获取事务定义
* @return 事务定义
*/
public func getDefinition(): TransactionDefinition {
return definition
}
/*
* 提交事务
*/
public func commit(): Unit {
if (!completed) {
connection.commit()
completed = true
}
}
/*
* 回滚事务
*/
public func rollback(): Unit {
if (!completed) {
connection.rollback()
completed = true
}
}
/*
* 是否已完成
* @return 是否已完成
*/
public func isCompleted(): Bool {
return completed
}
}
/*
* 线程本地存储
* 用于存储当前线程的事务上下文
*/
public class ThreadLocal<T> {
/* 存储值 */
private var value: Option<T> = None
public init() {}
/*
* 获取值
* @return 值选项
*/
public func get(): Option<T> {
return value
}
/*
* 设置值
* @param newValue 新值
*/
public func set(newValue: Option<T>): Unit {
this.value = newValue
}
/*
* 清除值
*/
public func remove(): Unit {
this.value = None
}
}
/*
* 事务持有者
* 存储当前线程的事务信息
*/
public class TransactionHolder {
/* 事务对象 */
private let transaction: Transaction
/* 是否是新事务 */
private let newTransaction: Bool
public init(transaction!: Transaction, newTransaction!: Bool = true) {
this.transaction = transaction
this.newTransaction = newTransaction
}
/*
* 获取事务对象
* @return 事务对象
*/
public func getTransaction(): Transaction {
return transaction
}
/*
* 是否是新事务
* @return 是否是新事务
*/
public func isNewTransaction(): Bool {
return newTransaction
}
}
/*
* 数据源事务管理器
* 基于 DataSource 的事务管理器实现
*/
public class DataSourceTransactionManager <: TransactionManager {
/* 数据源 */
private let dataSource: DataSource
/* 线程本地存储(事务持有者) */
private let transactionHolderThreadLocal: ThreadLocal<TransactionHolder>
/* 线程本地存储(事务状态) */
private let statusThreadLocal: ThreadLocal<DefaultTransactionStatus>
public init(dataSource: DataSource) {
this.dataSource = dataSource
this.transactionHolderThreadLocal = ThreadLocal<TransactionHolder>()
this.statusThreadLocal = ThreadLocal<DefaultTransactionStatus>()
}
/*
* 获取事务
* @param definition 事务定义
* @return 事务状态
*/
public func getTransaction(definition: TransactionDefinition): TransactionStatus {
// 检查是否已有事务
match (transactionHolderThreadLocal.get()) {
case Some(holder) =>
// 已有事务,根据传播行为处理
return handleExistingTransaction(definition, holder)
case None =>
// 没有事务,根据传播行为创建新事务
return startNewTransaction(definition)
}
}
/*
* 处理已存在的事务
*/
private func handleExistingTransaction(definition: TransactionDefinition, holder: TransactionHolder): TransactionStatus {
match (definition.propagation) {
case Propagation.Required =>
// 加入现有事务
let status = DefaultTransactionStatus(newTransaction: false)
statusThreadLocal.set(Some(status))
return status
case Propagation.Supports =>
// 加入现有事务或以非事务执行
let status = DefaultTransactionStatus(newTransaction: false)
statusThreadLocal.set(Some(status))
return status
case Propagation.Mandatory =>
// 必须有事务,已有则加入
let status = DefaultTransactionStatus(newTransaction: false)
statusThreadLocal.set(Some(status))
return status
case Propagation.RequiresNew =>
// 挂起现有事务,创建新事务
return startNewTransaction(definition)
case Propagation.NotSupported =>
// 以非事务执行
let status = DefaultTransactionStatus(newTransaction: false)
statusThreadLocal.set(Some(status))
return status
case Propagation.Never =>
// 不允许有事务
throw OrmException("Existing transaction found for NEVER propagation")
case Propagation.Nested =>
// 创建嵌套事务
let status = DefaultTransactionStatus(newTransaction: true)
statusThreadLocal.set(Some(status))
return status
}
}
/*
* 启动新事务
*/
private func startNewTransaction(definition: TransactionDefinition): TransactionStatus {
match (definition.propagation) {
case Propagation.Mandatory =>
throw OrmException("No existing transaction found for MANDATORY propagation")
case Propagation.Never =>
// 以非事务执行
let status = DefaultTransactionStatus(newTransaction: false)
statusThreadLocal.set(Some(status))
return status
case Propagation.NotSupported =>
// 以非事务执行
let status = DefaultTransactionStatus(newTransaction: false)
statusThreadLocal.set(Some(status))
return status
case _ =>
// 创建新事务
match (dataSource.getConnection()) {
case Some(conn) =>
let transaction = Transaction(conn, definition)
let holder = TransactionHolder(transaction: transaction, newTransaction: true)
transactionHolderThreadLocal.set(Some(holder))
let status = DefaultTransactionStatus(newTransaction: true)
statusThreadLocal.set(Some(status))
return status
case None =>
throw OrmException("Failed to get connection for transaction")
}
}
}
/*
* 提交事务
* @param status 事务状态
*/
public func commit(status: TransactionStatus): Unit {
if (status.isCompleted()) {
throw OrmException("Transaction is already completed")
}
if (status.isRollbackOnly()) {
rollback(status)
return
}
match (transactionHolderThreadLocal.get()) {
case Some(holder) =>
if (holder.isNewTransaction()) {
holder.getTransaction().commit()
holder.getTransaction().getConnection().close()
transactionHolderThreadLocal.remove()
statusThreadLocal.remove()
}
case None => ()
}
match (status) {
case defaultStatus: DefaultTransactionStatus =>
defaultStatus.setCompleted()
case _ => ()
}
}
/*
* 回滚事务
* @param status 事务状态
*/
public func rollback(status: TransactionStatus): Unit {
if (status.isCompleted()) {
throw OrmException("Transaction is already completed")
}
match (transactionHolderThreadLocal.get()) {
case Some(holder) =>
if (holder.isNewTransaction()) {
holder.getTransaction().rollback()
holder.getTransaction().getConnection().close()
transactionHolderThreadLocal.remove()
statusThreadLocal.remove()
}
case None => ()
}
match (status) {
case defaultStatus: DefaultTransactionStatus =>
defaultStatus.setCompleted()
case _ => ()
}
}
/*
* 获取当前事务的连接
* @return 连接选项
*/
public func getCurrentConnection(): Option<Connection> {
match (transactionHolderThreadLocal.get()) {
case Some(holder) =>
return Some(holder.getTransaction().getConnection())
case None =>
return None
}
}
/*
* 判断当前是否有事务
* @return 是否有事务
*/
public func hasTransaction(): Bool {
match (transactionHolderThreadLocal.get()) {
case Some(_) => return true
case None => return false
}
}
}