/*
* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* This source file is part of the Cangjie project, licensed under Apache-2.0
* with Runtime Library Exception.
*
* See https://cangjie-lang.cn/pages/LICENSE for license information.
*/
package stdx.net.http
import std.sync.{AtomicInt64, AtomicUInt64, Timer, AtomicBool}
interface Pool<T> {
func put(e: T): Unit
func get(): T
}
// concurrently safe, lock free
// max capacity is UInt32.Max
// for cases when put on one thread and get on another
class ConcurrentRingPool<T> <: Pool<T> {
var ring: ?ConcurrentRing<T>
var victim: ?ConcurrentRing<T> = None
let capacity: Int64
let threshold: Int64
let newFn: () -> T
let resetFn: ?((T) -> Unit)
let putLock = AtomicBool(false)
var timer: ?Timer = None
var needClean = true
init(capacity: Int64, threshold: Int64, newFn!: () -> T, resetFn!: ?((T) -> Unit) = None) {
ring = ConcurrentRing<T>(capacity)
this.capacity = capacity
this.threshold = threshold
this.newFn = newFn
this.resetFn = resetFn
timer = Timer.repeat(Duration.second * 2, Duration.second * 2, clean, style: Skip)
}
func clean(): Unit {
if (!needClean) {
needClean = true
if (let Some(r) <- ring) {
if (r.size > threshold) {
victim = ring
ring = None
return
}
}
} else {
if (let Some(r) <- ring) {
victim = ring
ring = None
return
}
}
victim = None
}
public func put(elem: T): Unit {
if (needClean) {
needClean = false
}
while (true) {
match (ring) {
case None =>
if (!putLock.compareAndSwap(false, true)) {
continue
}
ring = ConcurrentRing<T>(capacity)
putLock.compareAndSwap(true, false)
case Some(r) =>
r.put(elem)
return
}
}
}
public func get(): T {
if (needClean) {
needClean = false
}
if (let Some(r) <- ring) {
if (let Some(elem) <- r.get()) {
if (let Some(fn) <- resetFn) {
fn(elem)
}
return elem
}
}
if (let Some(r) <- victim) {
if (let Some(elem) <- r.get()) {
if (let Some(fn) <- resetFn) {
fn(elem)
}
return elem
}
}
return newFn()
}
func close(): Unit {
timer?.cancel()
}
}
// should not put concurrently or get concurrently
// but put and get can be located in different threads
class ConcurrentRing<T> {
let data: Array<Option<T>>
let capacity: UInt64
let getTicket = AtomicUInt64(0)
let putTicket = AtomicUInt64(0)
init(capacity: Int64) {
this.data = Array<Option<T>>(capacity, repeat: None)
this.capacity = UInt64(capacity)
}
prop size: Int64 {
get() {
Int64(putTicket.load() - getTicket.load())
}
}
func put(elem: T): Unit {
while (true) {
let ticket = putTicket.load()
let ticketIdx = Int64(ticket % capacity)
if (ticket >= getTicket.load() + capacity - 1) {
return
}
match (data[ticketIdx]) {
case Some(_) => return
case None =>
let nextIdx = (ticketIdx + 1) % Int64(capacity)
if (data[nextIdx].isSome()) {
return
}
if (putTicket.compareAndSwap(ticket, ticket + 1)) {
data[ticketIdx] = elem
return
}
}
}
}
func get(): ?T {
while (true) {
let ticket = getTicket.load()
let ticketIdx = Int64(ticket % capacity)
match (data[ticketIdx]) {
case None => return None
case Some(elem) =>
if (getTicket.compareAndSwap(ticket, ticket + 1)) {
data[ticketIdx] = None
return elem
}
}
}
return None
}
}
class PutSafeRingPool<T> <: Pool<T> {
let data: Array<Option<T>>
let capacity: Int64
let newFn: () -> T
let resetFn: ?((T) -> Unit)
var putTicket: AtomicInt64 = AtomicInt64(0)
var tail = 0
init(capacity: Int64, newFn!: () -> T, resetFn!: ?((T) -> Unit) = None) {
if (capacity <= 0) {
throw HttpException("InternalError, pool capacity must >= 0, current capacity = ${capacity}.")
}
this.data = Array<Option<T>>(capacity + 1, repeat: None)
this.capacity = capacity
this.newFn = newFn
this.resetFn = resetFn
}
public func put(elem: T): Unit {
while (true) {
let h = putTicket.load()
match (data[h]) {
case Some(_) => return
case None =>
if (putTicket.compareAndSwap(h, (h + 1) % capacity)) {
data[h] = elem
return
}
}
}
}
public func get(): T {
return match (data[tail]) {
case None => newFn()
case Some(elem) =>
data[tail] = None
tail++
tail %= capacity
if (let Some(fn) <- resetFn) {
fn(elem)
}
elem
}
}
}
class ArrayWrapper {
private let _raw: Array<Byte>
private var _size: Int64 = 0 // the size to be used
static let empty = ArrayWrapper(0)
init(rawSize: Int64) {
_raw = Array<Byte>(rawSize, repeat: 0)
_size = rawSize
}
init(raw: Array<Byte>) {
_raw = raw
_size = raw.size
}
prop data: Array<Byte> {
get() {
_raw[.._size]
}
}
mut prop size: Int64 {
get() {
_size
}
set(v) {
_size = v
}
}
prop rawSize: Int64 {
get() {
_raw.size
}
}
}
// cache h2 body
class ArrayPool {
let array_9: ConcurrentRingPool<ArrayWrapper>
let array_32: ConcurrentRingPool<ArrayWrapper>
let array_128: ConcurrentRingPool<ArrayWrapper>
let array_512: ConcurrentRingPool<ArrayWrapper>
let array_2048: ConcurrentRingPool<ArrayWrapper>
let array_4096: ConcurrentRingPool<ArrayWrapper>
let array_8192: ConcurrentRingPool<ArrayWrapper>
let array_16384: ConcurrentRingPool<ArrayWrapper>
init(capacity: Int64, threshold: Int64) {
array_9 = ConcurrentRingPool<ArrayWrapper>(capacity, threshold, newFn: {=> ArrayWrapper(9)})
array_32 = ConcurrentRingPool<ArrayWrapper>(capacity, threshold, newFn: {=> ArrayWrapper(32)})
array_128 = ConcurrentRingPool<ArrayWrapper>(capacity, threshold, newFn: {=> ArrayWrapper(128)})
array_512 = ConcurrentRingPool<ArrayWrapper>(capacity, threshold, newFn: {=> ArrayWrapper(512)})
array_2048 = ConcurrentRingPool<ArrayWrapper>(capacity, threshold, newFn: {=> ArrayWrapper(2048)})
array_4096 = ConcurrentRingPool<ArrayWrapper>(capacity, threshold, newFn: {=> ArrayWrapper(4096)})
array_8192 = ConcurrentRingPool<ArrayWrapper>(capacity, threshold, newFn: {=> ArrayWrapper(8192)})
array_16384 = ConcurrentRingPool<ArrayWrapper>(capacity, threshold, newFn: {=> ArrayWrapper(16384)})
}
func get(size: Int64): ArrayWrapper {
let array = match {
case size <= 9 => array_9.get()
case size <= 32 => array_32.get()
case size <= 128 => array_128.get()
case size <= 512 => array_512.get()
case size <= 2048 => array_2048.get()
case size <= 4096 => array_4096.get()
case size <= 8192 => array_8192.get()
case size <= 16384 => array_16384.get()
case _ => ArrayWrapper(size)
}
array.size = size
return array
}
func put(item: ArrayWrapper): Unit {
match (item.rawSize) {
case 9 => array_9.put(item)
case 32 => array_32.put(item)
case 128 => array_128.put(item)
case 512 => array_512.put(item)
case 2048 => array_2048.put(item)
case 4096 => array_4096.put(item)
case 8192 => array_8192.put(item)
case 16384 => array_16384.put(item)
case _ => ()
}
}
func close(): Unit {
array_9.close()
array_32.close()
array_128.close()
array_512.close()
array_2048.close()
array_4096.close()
array_8192.close()
array_16384.close()
}
}