/*
* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* This source file is part of the Cangjie project, licensed under Apache-2.0
* with Runtime Library Exception.
*
* See https://cangjie-lang.cn/pages/LICENSE for license information.
*/
package stdx.net.http
import std.collection.{ArrayList, LinkedList, LinkedListNode}
import std.sync.{AtomicBool, AtomicInt64, Mutex}
import stdx.log.{Logger, getGlobalLogger, LogLevel}
// What CoroutinePool.submit does with a task when the queue is full and no
// new worker can be created. A per-call policy passed to submit overrides
// the pool-wide one.
enum RejectPolicy {
| Reject // throw CoroutinePoolRejectException
| Block // enqueue anyway and wait (presumably blocks until the queue has room - confirm against ClosableBlockingQueue.enqueue)
| Discard // drop the task; submit returns a DummyFutureTask
}
// A unit of work that a Worker can execute and that the pool can cancel.
interface Task {
// Executes the task body; called by a Worker on its own spawned thread.
func run(): Unit
// Cancels the task; called by Worker.close for a running task and by
// CoroutinePool.close for tasks still waiting in the queue.
func close(): Unit
// Graceful counterpart of close; called by Worker.closeGracefully and by
// CoroutinePool.closeGracefully for tasks still waiting in the queue.
func closeGracefully(): Unit
}
// A Task is a minimal interface for a unit of work.
// Tasks are executed by a Worker.
// A Task that wraps a function `fn` and keeps its result so that the
// submitter can retrieve it later through get() / tryGet().
//
// connId            - connection id published through ThreadContext while fn runs
// onClose           - optional callback invoked by close()
// onCloseGracefully - optional callback invoked by closeGracefully()
open class FutureTask<T> <: Task {
    // Result of fn; stays None until run() has stored a value.
    var value: ?T = None
    // Set by close() and closeGracefully(); makes get() stop waiting.
    var closed = false

    FutureTask(let fn: () -> T, let connId!: ?UInt64, let onClose!: ?(() -> Unit), let onCloseGracefully!: ?(() -> Unit)) {}

    // Runs fn and stores its result in `value`.
    // The connection id is always cleared afterwards, even when fn throws,
    // so a failed task cannot leave a stale id on the thread for whatever
    // runs next. An exception thrown by fn still propagates to the caller.
    public open func run(): Unit {
        ThreadContext.connId = connId // set connection id for server logger
        try {
            value = fn()
        } finally {
            ThreadContext.connId = None // clear connection id
        }
    }

    // Invokes the onClose callback (if any) and marks the task as closed.
    public open func close(): Unit {
        if (let Some(f) <- onClose) {
            f() // should not block
        }
        closed = true
    }

    // Invokes the onCloseGracefully callback (if any) and marks the task as closed.
    public open func closeGracefully(): Unit {
        if (let Some(f) <- onCloseGracefully) {
            f()
        }
        closed = true
    }

    // Busy-waits until a value is available and returns it.
    // Throws ConcurrentException once the task is closed. Note that the
    // closed flag is tested before the value, so a task that is already
    // closed throws even if a value was stored. If fn threw, no value is
    // ever stored and this keeps spinning until the task is closed.
    public open func get(): T {
        while (!closed) {
            return value ?? continue
        }
        throw ConcurrentException("Failed to get from FutureTask, task cancelled.")
    }

    // Non-blocking read of the result: None while no value has been stored.
    public open func tryGet(): ?T {
        return value
    }
}
// Placeholder FutureTask that carries no work and never yields a value.
// CoroutinePool.submit returns one when a task is dropped under the
// Discard policy.
class DummyFutureTask<T> <: FutureTask<T> {
    // The wrapped function only throws; run() is overridden below so it is never invoked.
    DummyFutureTask() {
        super({=> throw NoneValueException()}, connId: None, onClose: {=>}, onCloseGracefully: {=>})
    }

    // No work to perform.
    public func run(): Unit {
        /* nothing to do here */
    }

    // Nothing to cancel.
    public func close(): Unit {
        /* nothing to do here */
    }

    // Nothing to wind down.
    public func closeGracefully(): Unit {
        /* nothing to do here */
    }

    // A dummy task has no result, so asking for one is always an error.
    public func get(): T {
        throw MethodUnsupportedException("Failed to get from DummyFutureTask.")
    }

    // Always reports "no value".
    public func tryGet(): ?T {
        return None
    }
}
// Queue of pending Tasks shared by a CoroutinePool and its Workers.
// NOTE(review): ClosableBlockingQueue is declared elsewhere; its blocking and
// close semantics are not visible from this file.
type TaskQueue = ClosableBlockingQueue<Task>
// A Worker owns one spawned thread that repeatedly takes Tasks from the
// pool's TaskQueue and runs them. The loop ends when the pool is closed, or
// when a dequeue attempt yields no task and the worker is registered in the
// pool's worker list (in which case it removes itself from that list first).
class Worker {
    let taskQueue: TaskQueue
    let pool: CoroutinePool
    // Task currently being executed; guarded by taskMutex.
    var runningTask: ?Task = None
    let taskMutex = Mutex()
    // This worker's node in pool.workers; assigned by the pool after construction.
    var selfNode: ?LinkedListNode<Worker> = None

    // Creates an idle worker that immediately starts polling the queue.
    init(pool: CoroutinePool) {
        this.pool = pool
        this.taskQueue = pool.taskQueue
        spawn {
            run()
        }
    }

    // Creates a worker that runs `task` first and then polls the queue.
    // The task is counted as active before the thread is spawned, so the
    // pool sees the worker as busy right away.
    init(task: Task, pool: CoroutinePool) {
        this.pool = pool
        this.taskQueue = pool.taskQueue
        runningTask = task
        pool.actives.fetchAdd(1)
        spawn {
            try {
                task.run()
            } finally {
                // Restore the bookkeeping even if the task throws.
                markIdle()
            }
            run()
        }
    }

    // Undoes the "task is running" bookkeeping: decrements the pool's active
    // counter and clears runningTask.
    private func markIdle(): Unit {
        pool.actives.fetchSub(1)
        synchronized(taskMutex) {
            runningTask = None
        }
    }

    // Main loop: run queued tasks until the pool is closed or the worker retires.
    private func run(): Unit {
        while (!pool.isClosed()) {
            // dequeue is given Duration.second * 5 (presumably a timeout - confirm
            // against ClosableBlockingQueue.dequeue).
            if (let Some(task) <- taskQueue.dequeue(Duration.second * 5)) {
                synchronized(taskMutex) {
                    runningTask = task
                }
                pool.actives.fetchAdd(1)
                try {
                    task.run()
                } finally {
                    // Without this, a throwing task would leave actives
                    // permanently incremented and runningTask stale.
                    markIdle()
                }
            } else {
                // No task received: retire and unregister from the pool.
                if (let Some(v) <- selfNode) {
                    synchronized(pool.workersMutex) {
                        pool.workers.remove(v)
                    }
                    break
                }
            }
        }
    }

    // Cancels the task that is currently running, if any.
    func close() {
        synchronized(taskMutex) {
            if (let Some(task) <- runningTask) {
                task.close()
            }
        }
    }

    // Gracefully closes the task that is currently running, if any.
    // runningTask is read under taskMutex, consistent with close(). The
    // callback itself runs outside the lock, as before, so the worker
    // thread is never held up on taskMutex while the callback executes.
    func closeGracefully() {
        var current: ?Task = None
        synchronized(taskMutex) {
            current = runningTask
        }
        if (let Some(task) <- current) {
            task.closeGracefully()
        }
    }
}
// A pool of Workers fed by a shared TaskQueue.
//
// submit() either enqueues a task, starts a new Worker for it, or applies the
// reject policy when the queue is full. close() / closeGracefully() mark the
// pool closed, close the queue, and close queued and running tasks.
class CoroutinePool {
    let taskQueue: TaskQueue
    // Registered workers; submit/close/closeGracefully and retiring workers
    // mutate or iterate it under workersMutex.
    let workers = LinkedList<Worker>()
    let workersMutex = Mutex()
    var closed: AtomicBool = AtomicBool(false)
    // Number of tasks currently being executed by workers.
    let actives = AtomicInt64(0)
    var _logger: Logger = getGlobalLogger()

    // Throws IllegalArgumentException (via assert) when capacity or
    // queueCapacity is not positive, or preheatSize is outside [0, capacity].
    CoroutinePool(
        let preheatSize: Int64, // preheat size while start
        let capacity: Int64, // pool capacity
        let queueCapacity: Int64, // queue capacity
        let rejectPolicy!: RejectPolicy = Reject
    ) { // reject policy while queue is full
        assert(0 < capacity, "capacity should greater than 0, but got ${capacity}")
        assert(0 < queueCapacity, "queue capacity should greater than 0, but got ${queueCapacity}")
        assert(0 <= preheatSize && preheatSize <= capacity,
            "preheat size should between 0 and ${capacity}, but got ${preheatSize}")
        taskQueue = TaskQueue(queueCapacity)
        start()
    }

    // Logger used for trace output; defaults to the global logger.
    mut prop logger: Logger {
        get() {
            return _logger
        }
        set(v) {
            _logger = v
        }
    }

    // Creates the preheatSize initial idle workers.
    private func start(): Unit {
        // preheat
        for (_ in 0..preheatSize) {
            let worker = Worker(this)
            worker.selfNode = workers.addLast(worker)
        }
    }

    // Submits `fn` for execution and returns the FutureTask tracking it.
    //
    // connId, onClose, onCloseGracefully are forwarded to the FutureTask.
    // rejectPolicy overrides the pool-wide policy for this call when given.
    //
    // Throws ConcurrentException when the queue or pool is closed, and
    // CoroutinePoolRejectException when the pool is busy under the Reject
    // policy. Under Discard a DummyFutureTask is returned instead.
    func submit<T>(
        fn: () -> T,
        connId!: ?UInt64,
        onClose!: ?(() -> Unit) = None,
        onCloseGracefully!: ?(() -> Unit) = None,
        rejectPolicy!: ?RejectPolicy = None
    ): FutureTask<T> {
        if (taskQueue.isClosed()) {
            throw ConcurrentException("Failed to execute task, queue is closed.")
        }
        let task = FutureTask(fn, connId: connId, onClose: onClose, onCloseGracefully: onCloseGracefully)
        // have idle worker && task queue not full ==> add to queue
        // have idle worker && task queue is full ==> !!!impossible!!!
        // no idle worker && task queue not full && worker.size < capacity ==> create new worker
        // no idle worker && task queue not full && worker.size >= capacity ==> add to queue
        // no idle worker && task queue is full && worker.size < capacity ==> create new worker
        // no idle worker && task queue is full && worker.size >= capacity ==> !!!reject!!!
        match {
            case workerIsFree() => // add to queue
                taskEnqueue(task, "worker is free")
            case workers.size < capacity => // create new worker
                // NOTE(review): the size check above is made without holding
                // workersMutex, so concurrent submits may exceed capacity -
                // confirm whether that is acceptable.
                if (logger.enabled(LogLevel.TRACE)) {
                    httpLogTrace(logger, "[CoroutinePool#submit] Create new worker")
                }
                synchronized(workersMutex) {
                    // close()/closeGracefully() set `closed` while holding
                    // workersMutex, so testing it under the same lock ensures
                    // no worker is registered after the pool has swept its
                    // worker list; such a worker's task would never be closed.
                    if (isClosed()) {
                        throw ConcurrentException("Failed to execute task, coroutine pool is closed.")
                    }
                    let worker = Worker(task, this)
                    worker.selfNode = workers.addLast(worker)
                }
                if (logger.enabled(LogLevel.TRACE)) {
                    httpLogTrace(logger,
                        "[CoroutinePool#submit] Created new worker, current size/capacity: ${workers.size}/${capacity}")
                }
            case taskQueue.isFull() => // !!!reject!!!
                let policy = rejectPolicy ?? this.rejectPolicy
                match (policy) {
                    case Reject => throw CoroutinePoolRejectException("Pool is busy.")
                    case Block => taskEnqueue(task, "pool is busy, waiting...")
                    case Discard =>
                        if (logger.enabled(LogLevel.TRACE)) {
                            httpLogTrace(logger, "[CoroutinePool#submit] Pool is busy, discard task")
                        }
                        return DummyFutureTask<T>()
                }
            case _ => // add to queue
                taskEnqueue(task, "worker is full")
        }
        return task
    }

    // Closes the pool immediately: marks it closed, closes the queue, then
    // calls close() on every queued task and on every worker.
    func close(): Unit {
        synchronized(workersMutex) {
            closed.store(true) // mark first
            taskQueue.close() // close input stream
            while (let Some(task) <- taskQueue.tryDequeue()) { // cancel remaining task
                task.close()
            }
            for (worker in workers) { // cancel running task
                worker.close()
            }
        }
    }

    // Closes the pool gracefully: marks it closed, closes the queue, then runs
    // closeGracefully() for every queued task and every worker on separate
    // spawned threads and waits for all of them. workersMutex is held for the
    // whole wait.
    func closeGracefully(): Unit {
        synchronized(workersMutex) {
            closed.store(true) // mark first
            taskQueue.close() // workers will close after queue closed and emptied
            let futureList = ArrayList<Future<Unit>>()
            while (let Some(task) <- taskQueue.tryDequeue()) {
                let f = spawn {
                    task.closeGracefully()
                }
                futureList.add(f)
            }
            for (worker in workers) {
                let f = spawn {
                    worker.closeGracefully()
                }
                futureList.add(f)
            }
            for (f in futureList) {
                f.get()
            }
        }
    }

    // True once close() or closeGracefully() has been called.
    func isClosed(): Bool {
        return closed.load()
    }

    // Puts `task` on the queue, tracing `reason`; throws ConcurrentException
    // when the queue refuses the task (enqueue returns false).
    private func taskEnqueue(task: Task, reason: String): Unit {
        if (logger.enabled(LogLevel.TRACE)) {
            httpLogTrace(logger, "[CoroutinePool#taskEnqueue] Begin, ${reason}")
        }
        if (!taskQueue.enqueue(task)) {
            throw ConcurrentException("Failed to execute task, queue is closed.")
        }
        if (logger.enabled(LogLevel.TRACE)) {
            httpLogTrace(logger, "[CoroutinePool#taskEnqueue] End")
        }
    }

    // Heuristic for "some worker is idle": more than 10 workers exist and
    // fewer than half of them are running a task.
    private func workerIsFree(): Bool {
        return workers.size > 10 && (workers.size / 2 > actives.load()) // 10 and 2 are magic numbers and may need tuning
    }
}
// Argument check helper: throws IllegalArgumentException carrying `msg`
// when `flag` is false; does nothing otherwise.
func assert(flag: Bool, msg: String): Unit {
    if (!flag) {
        throw IllegalArgumentException("${msg}")
    }
}