/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * This source file is part of the Cangjie project, licensed under Apache-2.0
 * with Runtime Library Exception.
 *
 * See https://cangjie-lang.cn/pages/LICENSE for license information.
 */

package stdx.net.http

import std.collection.{ArrayList, LinkedList, LinkedListNode}
import std.sync.{AtomicBool, AtomicInt64, Mutex}
import stdx.log.{Logger, getGlobalLogger, LogLevel}

// What CoroutinePool.submit does with a task when no new worker can be created
// and the task queue is full.
enum RejectPolicy {
    | Reject // throw CoroutinePoolRejectException to the submitter
    | Block // enqueue anyway; presumably blocks the submitter until the queue has room
    | Discard // drop the task and hand back a DummyFutureTask
}

// Unit of work handled by the pool.
// - run: executes the work; called by a Worker on its own spawned thread.
// - close: called when the pool is closed, either for a task still waiting in the
//   queue or for the task a Worker is currently running.
// - closeGracefully: same as close, but called from the pool's graceful shutdown.
interface Task {
    func run(): Unit
    func close(): Unit
    func closeGracefully(): Unit
}

// A FutureTask adapts a plain function into a Task and keeps its result for the submitter.
// It is run by a Worker; the submitter reads the result with get() or tryGet().
open class FutureTask<T> <: Task {
    var value: ?T = None // result of fn; stays None until run() has stored it
    var closed = false // set by close() and closeGracefully()

    FutureTask(let fn: () -> T, let connId!: ?UInt64, let onClose!: ?(() -> Unit), let onCloseGracefully!: ?(() -> Unit)) {}

    // Runs fn with the connection id published in ThreadContext and stores the result.
    // An exception thrown by fn is propagated to the caller; value then stays None.
    public open func run(): Unit {
        ThreadContext.connId = connId // set connection id for server logger
        try {
            value = fn()
        } finally {
            // clear the connection id even when fn throws, so it cannot leak
            // into whatever this thread runs next
            ThreadContext.connId = None
        }
    }

    // Invokes the onClose callback (if any) and marks the task as closed.
    public open func close(): Unit {
        if (let Some(f) <- onClose) {
            f() // should not block
        }
        closed = true
    }

    // Invokes the onCloseGracefully callback (if any) and marks the task as closed.
    public open func closeGracefully(): Unit {
        if (let Some(f) <- onCloseGracefully) {
            f()
        }
        closed = true
    }

    // Busy-waits until a result is available and returns it.
    // A result that was already produced is returned even if the task has been
    // closed afterwards; ConcurrentException is thrown only when the task was
    // closed without ever producing a result.
    public open func get(): T {
        while (true) {
            if (let Some(result) <- value) {
                return result
            }
            if (closed) {
                break
            }
        }
        // closed was observed; the result may have been stored in between, so look once more
        if (let Some(result) <- value) {
            return result
        }
        throw ConcurrentException("Failed to get from FutureTask, task cancelled.")
    }

    // Returns the result if it is already available, None otherwise. Never waits.
    public open func tryGet(): ?T {
        return value
    }
}

// Placeholder future handed back for a discarded task: it never runs anything
// and never yields a value.
class DummyFutureTask<T> <: FutureTask<T> {
    DummyFutureTask() {
        super(
            {=> throw NoneValueException()},
            connId: None,
            onClose: {=>},
            onCloseGracefully: {=>}
        )
    }

    // Lifecycle hooks are deliberately empty.
    public override func run(): Unit {}

    public override func close(): Unit {}

    public override func closeGracefully(): Unit {}

    // There is no result to wait for, so waiting is rejected outright.
    public override func get(): T {
        throw MethodUnsupportedException("Failed to get from DummyFutureTask.")
    }

    // A discarded task never has a value.
    public override func tryGet(): ?T {
        return None
    }
}

// Queue of pending Tasks shared between CoroutinePool (producer) and its Workers (consumers).
// It has a fixed capacity and can be closed; enqueue reports false once it is closed.
type TaskQueue = ClosableBlockingQueue<Task>

// A Worker owns one spawned thread that takes Tasks from the pool's TaskQueue and runs them.
// It stops when the pool is closed, or when it has been registered in the pool
// (selfNode is set) and no task arrived within the dequeue timeout.
class Worker {
    let taskQueue: TaskQueue
    let pool: CoroutinePool
    var runningTask: ?Task = None // task currently being executed, guarded by taskMutex
    let taskMutex = Mutex()
    var selfNode: ?LinkedListNode<Worker> = None // this worker's node in pool.workers, set by the pool

    // Creates an idle worker that immediately starts polling the queue.
    init(pool: CoroutinePool) {
        this.pool = pool
        this.taskQueue = pool.taskQueue
        spawn {
            run()
        }
    }

    // Creates a worker that first runs `task` and then continues polling the queue.
    init(task: Task, pool: CoroutinePool) {
        this.pool = pool
        this.taskQueue = pool.taskQueue
        runningTask = task
        // counted before spawning so that the submitter sees this worker as busy right away
        pool.actives.fetchAdd(1)
        spawn {
            try {
                task.run()
            } finally {
                // keep the pool's bookkeeping correct even when the task throws
                taskFinished()
            }

            run()
        }
    }

    // Polls the queue until the pool is closed or this worker retires.
    private func run(): Unit {
        while (!pool.isClosed()) {
            if (let Some(task) <- taskQueue.dequeue(Duration.second * 5)) {
                synchronized(taskMutex) {
                    runningTask = task
                }
                pool.actives.fetchAdd(1)
                try {
                    task.run()
                } finally {
                    // keep the pool's bookkeeping correct even when the task throws
                    taskFinished()
                }
            } else {
                // nothing was dequeued: a registered worker removes itself from the pool and stops
                if (let Some(v) <- selfNode) {
                    synchronized(pool.workersMutex) {
                        pool.workers.remove(v)
                    }
                    break
                }
            }
        }
    }

    // Undoes the bookkeeping done before a task was started.
    private func taskFinished(): Unit {
        pool.actives.fetchSub(1)
        synchronized(taskMutex) {
            runningTask = None
        }
    }

    // Closes the task that is currently running, if any.
    func close() {
        synchronized(taskMutex) {
            if (let Some(task) <- runningTask) {
                task.close()
            }
        }
    }

    // Gracefully closes the task that is currently running, if any.
    func closeGracefully() {
        // Read runningTask under the lock, but call closeGracefully outside of it:
        // the callback is not documented as non-blocking (unlike close), and the worker
        // thread needs taskMutex to clear runningTask once the task finishes.
        var current: ?Task = None
        synchronized(taskMutex) {
            current = runningTask
        }
        if (let Some(task) <- current) {
            task.closeGracefully()
        }
    }
}

// A pool of Workers fed by a bounded TaskQueue.
// Tasks are handed in through submit(); depending on the load they are queued, given to a
// freshly created worker, or handled according to the RejectPolicy.
class CoroutinePool {
    let taskQueue: TaskQueue
    let workers = LinkedList<Worker>() // guarded by workersMutex for every modification
    let workersMutex = Mutex()
    var closed: AtomicBool = AtomicBool(false)

    let actives = AtomicInt64(0) // number of workers currently running a task

    var _logger: Logger = getGlobalLogger()

    CoroutinePool(
        let preheatSize: Int64, // preheat size while start
        let capacity: Int64, // pool capacity
        let queueCapacity: Int64, // queue capacity
        let rejectPolicy!: RejectPolicy = Reject // reject policy while queue is full
    ) {
        assert(0 < capacity, "capacity should greater than 0, but got ${capacity}")
        assert(0 < queueCapacity, "queue capacity should greater than 0, but got ${queueCapacity}")
        assert(0 <= preheatSize && preheatSize <= capacity,
            "preheat size should between 0 and ${capacity}, but got ${preheatSize}")

        taskQueue = TaskQueue(queueCapacity)
        start()
    }

    mut prop logger: Logger {
        get() {
            return _logger
        }
        set(v) {
            _logger = v
        }
    }

    // Creates the preheated workers.
    private func start(): Unit {
        // workers is only ever modified under workersMutex; do the same here
        synchronized(workersMutex) {
            for (_ in 0..preheatSize) {
                let worker = Worker(this)
                worker.selfNode = workers.addLast(worker)
            }
        }
    }

    // Submits fn for execution and returns the future holding its result.
    //
    // connId is published to the server logger while fn runs; onClose / onCloseGracefully are
    // invoked when the task is closed by the pool; rejectPolicy overrides the pool's policy
    // for this call.
    //
    // Throws ConcurrentException when the pool or its queue is closed, and
    // CoroutinePoolRejectException when the pool is busy and the policy is Reject.
    // With the Discard policy a DummyFutureTask is returned instead of the real task.
    func submit<T>(
        fn: () -> T,
        connId!: ?UInt64,
        onClose!: ?(() -> Unit) = None,
        onCloseGracefully!: ?(() -> Unit) = None,
        rejectPolicy!: ?RejectPolicy = None
    ): FutureTask<T> {
        if (taskQueue.isClosed()) {
            throw ConcurrentException("Failed to execute task, queue is closed.")
        }

        let task = FutureTask(fn, connId: connId, onClose: onClose, onCloseGracefully: onCloseGracefully)

        // have idle worker && task queue not full  ==> add to queue
        // have idle worker && task queue is full   ==> !!!impossible!!!
        // no idle worker && task queue not full &&  worker.size < capacity  ==> create new worker
        // no idle worker && task queue not full &&  worker.size >= capacity ==> add to queue
        // no idle worker && task queue is full && worker.size < capacity    ==> create new worker
        // no idle worker && task queue is full && worker.size >= capacity   ==> !!!reject!!!

        if (workerIsFree()) { // add to queue
            taskEnqueue(task, "worker is free")
            return task
        }

        // Cheap unlocked pre-check first; tryAddWorker re-checks under workersMutex and
        // reports false when another submitter filled the pool in the meantime.
        if (workers.size < capacity && tryAddWorker(task)) { // create new worker
            return task
        }

        if (!taskQueue.isFull()) { // add to queue
            taskEnqueue(task, "worker is full")
            return task
        }

        // !!!reject!!!
        let policy = rejectPolicy ?? this.rejectPolicy
        match (policy) {
            case Reject => throw CoroutinePoolRejectException("Pool is busy.")
            case Block => taskEnqueue(task, "pool is busy, waiting...")
            case Discard =>
                if (logger.enabled(LogLevel.TRACE)) {
                    httpLogTrace(logger, "[CoroutinePool#submit] Pool is busy, discard task")
                }
                return DummyFutureTask<T>()
        }
        return task
    }

    // Creates a worker that starts with `task`, unless the pool is already at capacity.
    // Returns true when the worker was created, false when the capacity is reached.
    // Throws ConcurrentException when the pool is closed.
    private func tryAddWorker(task: Task): Bool {
        if (logger.enabled(LogLevel.TRACE)) {
            httpLogTrace(logger, "[CoroutinePool#submit] Create new worker")
        }
        var sizeNow = 0
        synchronized(workersMutex) {
            // close() flips `closed` while holding workersMutex, so checking it here guarantees
            // that no worker is added after close() has walked the worker list
            if (isClosed()) {
                throw ConcurrentException("Failed to execute task, coroutine pool is closed.")
            }
            // re-check under the lock so that concurrent submitters cannot exceed the capacity
            if (workers.size >= capacity) {
                return false
            }
            let worker = Worker(task, this)
            worker.selfNode = workers.addLast(worker)
            sizeNow = workers.size
        }
        if (logger.enabled(LogLevel.TRACE)) {
            httpLogTrace(logger,
                "[CoroutinePool#submit] Created new worker, current size/capacity: ${sizeNow}/${capacity}")
        }
        return true
    }

    // Closes the pool immediately: closes the queue, then closes every queued task
    // and every task that is currently running.
    func close(): Unit {
        synchronized(workersMutex) {
            closed.store(true) // mark first
            taskQueue.close() // close input stream
            while (let Some(task) <- taskQueue.tryDequeue()) { // cancel remaining task
                task.close()
            }
            for (worker in workers) { // cancel running task
                worker.close()
            }
        }
    }

    // Closes the pool gracefully: closes the queue, then runs closeGracefully for every
    // queued and every running task concurrently and waits until all of them returned.
    // NOTE: workersMutex is held for the whole wait.
    func closeGracefully(): Unit {
        synchronized(workersMutex) {
            closed.store(true) // mark first
            taskQueue.close() // workers will close after queue closed and emptied

            let futureList = ArrayList<Future<Unit>>()
            while (let Some(task) <- taskQueue.tryDequeue()) {
                let f = spawn {
                    task.closeGracefully()
                }
                futureList.add(f)
            }
            for (worker in workers) {
                let f = spawn {
                    worker.closeGracefully()
                }
                futureList.add(f)
            }
            for (f in futureList) {
                f.get()
            }
        }
    }

    // Whether close() or closeGracefully() has been called.
    func isClosed(): Bool {
        return closed.load()
    }

    // Puts the task into the queue; `reason` only shows up in the trace log.
    // Throws ConcurrentException when the queue refuses the task because it is closed.
    private func taskEnqueue(task: Task, reason: String): Unit {
        if (logger.enabled(LogLevel.TRACE)) {
            httpLogTrace(logger, "[CoroutinePool#taskEnqueue] Begin, ${reason}")
        }
        if (!taskQueue.enqueue(task)) {
            throw ConcurrentException("Failed to execute task, queue is closed.")
        }
        if (logger.enabled(LogLevel.TRACE)) {
            httpLogTrace(logger, "[CoroutinePool#taskEnqueue] End")
        }
    }

    // Heuristic: the pool counts as having idle workers when it has more than
    // `minWorkers` workers and fewer than 1/`idleDivisor` of them are running a task.
    private func workerIsFree(): Bool {
        // both thresholds are untuned magic numbers and may need adjustment
        let minWorkers = 10
        let idleDivisor = 2
        return workers.size > minWorkers && (workers.size / idleDivisor > actives.load())
    }
}

// Argument check helper: throws IllegalArgumentException carrying `msg` unless `flag` holds.
func assert(flag: Bool, msg: String): Unit {
    if (!flag) {
        throw IllegalArgumentException("${msg}")
    }
}