/*
* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* This source file is part of the Cangjie project, licensed under Apache-2.0
* with Runtime Library Exception.
*
* See https://cangjie-lang.cn/pages/LICENSE for license information.
*/
// The Cangjie API is in Beta. For details on its capabilities and limitations, please refer to the README file.
package std.database.sql
import std.collection.ArrayList
import std.collection.concurrent.LinkedBlockingQueue
import std.time.DateTime
import std.sync.{Timer, CatchupStyle, AtomicBool, Mutex}
class ResourcePool<T> <: Pool<T> & Equatable<ResourcePool<T>> {
let mux = Mutex()
let entryQueue: LinkedBlockingQueue<Entry<T>>
let idleEntry: LinkedBlockingQueue<Entry<T>>
var options: Options<T>
var ticker: Timer
var entryCount: Int32
var isclosed = AtomicBool(false)
init(options: Options<T>) {
this.options = options
entryQueue = LinkedBlockingQueue<Entry<T>>(Int64(options.maxSize))
idleEntry = LinkedBlockingQueue<Entry<T>>(Int64(options.maxSize))
entryCount = 0
ticker = Timer.once(Duration.hour) {=> ()}
backgroundCheckIdleLife()
}
/**
* Maximum number of connections in the connection pool. If the value is less than or equal to 0, the value is Int32.Max.
* If the configured value is smaller than the current value, the setting takes effect only after idle connections are reclaimed.
*/
public mut prop maxSize: Int32 {
get() {
options.maxSize
}
set(value) {
if (value <= 0) {
options.maxSize = Int32.Max
} else {
options.maxSize = value
}
}
}
/**
* Maximum number of idle connections. If the number of idle connections exceeds this value, the excess connections are disconnected.
* If the value is less than or equal to 0, Int32.Max is used.
*/
public mut prop maxIdleSize: Int32 {
get() {
options.maxIdleSize
}
set(value) {
if (value <= 0) {
options.maxIdleSize = Int32.Max
} else {
options.maxIdleSize = value
}
}
}
func backgroundCheckIdleLife() {
ticker.cancel()
ticker = Timer.repeat(Duration.second, options.keepaliveTime, checkLife, style: Delay)
}
func checkLife(): Unit {
checkMaxLifeTime()
checkIdleTimeout()
}
func checkMaxLifeTime() {
let count: Int64
synchronized(mux) {
count = entryQueue.size
}
for (_ in 0..count) {
let item = entryQueue.tryRemove()
match (item) {
case Some(v) =>
if (v.isValid && DateTime.now().toUnixTimeStamp() - v.creationTime > options.maxLifeTime) {
v.isValid = false
this.discard(v)
}
case _ =>
eprintln("can not try get idle entry")
break
}
}
}
func checkIdleTimeout() {
let idleCount: Int64
synchronized(mux) {
idleCount = idleEntry.size
}
let list = ArrayList<Entry<T>>()
for (_ in 0..idleCount) {
let item = idleEntry.tryRemove()
match (item) {
case Some(v) =>
if (v.isValid) {
appendLegalIdleEntry(v, idleCount, list)
}
case _ =>
eprintln("can not try get idle entry")
break
}
}
for (entry in list) {
this.release(entry)
}
}
func getIdleEntry(timeout: Duration): Entry<T> {
if (let Some(v) <- idleEntry.remove(timeout)) {
if (v.isValid) {
return v
}
} else {
throw SqlException("Acquired entries exceed maxSize(${maxSize})")
}
let canCreate = synchronized(mux) {
if (entryCount >= options.maxSize) {
false
} else {
entryCount++
true
}
}
if (!canCreate) {
throw SqlException("Acquired entries exceed maxSize(${maxSize})")
}
try {
return createEntry()
} catch (e: Exception) {
synchronized(mux) {
entryCount--
}
throw e
}
}
public func acquire(timeout: Duration): Option<Entry<T>> {
let shouldCreate: Bool
synchronized(mux) {
shouldCreate = idleEntry.size == 0 && entryCount < options.maxSize
if (shouldCreate) {
entryCount++
}
}
if (shouldCreate) {
try {
return createEntry()
} catch (e: Exception) {
synchronized(mux) {
if (entryCount > 0) {
entryCount--
}
}
throw e
}
}
return getIdleEntry(timeout)
}
func createEntry(): Entry<T> {
var value = options.constructor()
match (value) {
case Some(v) =>
let entry = Entry<T>(this, v)
entryQueue.add(entry)
return entry
case None => throw Exception("constructor entry failed")
}
}
public func release(entry: Entry<T>): Option<Unit> {
match (entry.pool as ResourcePool<T>) {
case Some(p) => if (p != this) {
throw SqlException("the entry is owned by another pool")
}
case None => throw SqlException("the entry is not owned by any pool")
}
if (DateTime.now().toUnixTimeStamp() - entry.creationTime > options.maxLifeTime) {
entry.isValid = false
this.discard(entry)
} else {
idleEntry.add(entry)
}
return
}
public func discard(entry: Entry<T>): Option<Unit> {
match (entry.pool as ResourcePool<T>) {
case Some(p) => if (p != this) {
throw SqlException("the entry is owned by another pool")
}
case None => throw SqlException("the entry is not owned by any pool")
}
synchronized(mux) {
if (entryCount > 0) {
entryCount--
}
}
options.destructor(entry.value)
return
}
public func isClosed() {
return isclosed.load()
}
public func close() {
if (!isclosed.compareAndSwap(false, true)) { // compareAndSwap return true if the value equal first parameter.
return
}
for (_ in 0..idleEntry.size) {
var item = idleEntry.tryRemove()
match (item) {
case Some(v) =>
if (v.isValid) {
v.isValid = false
this.discard(v)
}
case None =>
eprintln("can not try get idle entry")
break
}
}
}
public operator func !=(other: ResourcePool<T>): Bool {
return !(this == other)
}
public operator func ==(other: ResourcePool<T>): Bool {
return refEq(this, other)
}
func appendLegalIdleEntry(v: Entry<T>, idleCount: Int64, list: ArrayList<Entry<T>>): Unit {
var idle = v.idleDuration()
if ((idle > options.idleTimeout || Int32(idleCount) > options.maxIdleSize) || (DateTime.now().
toUnixTimeStamp() - v.creationTime > options.maxLifeTime)) {
try {
v.isValid = false
this.discard(v)
} catch (e: Exception) {
eprintln("exception ${e.message}")
}
} else {
list.add(v)
}
}
}