/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * This source file is part of the Cangjie project, licensed under Apache-2.0
 * with Runtime Library Exception.
 *
 * See https://cangjie-lang.cn/pages/LICENSE for license information.
 */

/**
 * @file
 *
 * This file defines a ConcurrentPriorityQueue class for a given capacity (Default 10).
 *
 */

package stdx.actors

import std.collection.concurrent.ArrayBlockingQueue

private const DEFAULT_NUM_OF_PRIORITIES = 10

class ConcurrentPriorityQueue<T> {
    private let numOfPriorities: Int64
    private let queuePool: Array<ArrayBlockingQueue<T>>
    private let semaphore = SingleConsumerSemaphore()

    init(capacity: Int64) {
        this(DEFAULT_NUM_OF_PRIORITIES, capacity)
    }

    init(numOfPriorities: Int64, capacity: Int64) {
        this.numOfPriorities = numOfPriorities
        this.queuePool = Array<ArrayBlockingQueue<T>>(numOfPriorities, { _ => ArrayBlockingQueue<T>(capacity) })
    }

    func add(item: T, priority: Int64): Unit {
        if (priority <= 0 || priority > numOfPriorities) {
            throw IllegalArgumentException("Priority must be in the range [1..${numOfPriorities}].")
        }
        queuePool[priority - 1].add(item)
        semaphore.signal()
    }

    func remove(timeout: Duration): Option<T> {
        if (!semaphore.wait(timeout)) {
            return None
        }

        var index: Int64 = -1
        for (i in 0..numOfPriorities) {
            if (queuePool[i].size > 0) {
                index = i
            }
        }
        if (index != -1) {
            return queuePool[index].remove()
        }

        return None
    }
}