/*
 * Copyright(C) 2025. Huawei Technologies Co.,Ltd. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef BLOCKING_QUEUE_H
#define BLOCKING_QUEUE_H

#include <condition_variable>
#include <list>
#include <mutex>
#include <stdint.h>
#include "MxBase/ErrorCode/ErrorCode.h"

namespace MxBase {
static const int DEFAULT_MAX_QUEUE_SIZE = 50000;

template<typename T> class BlockingQueue {
public:
    BlockingQueue(uint32_t maxSize = DEFAULT_MAX_QUEUE_SIZE) : maxSize_(maxSize), isStoped_(false) {}

    ~BlockingQueue() {}

    APP_ERROR Push(const T& item, bool isWait = false)
    {
        std::unique_lock<std::mutex> lck(mutex_);
        while (queue_.size() >= maxSize_ && isWait && !isStoped_) {
            fullCond_.wait(lck);
        }
        if (isStoped_) {
            return APP_ERR_QUEUE_STOPED;
        }
        if (queue_.size() >= maxSize_) {
            return APP_ERR_QUEUE_FULL;
        }
        queue_.push_back(item);
        emptyCond_.notify_one();
        return APP_ERR_OK;
    }

    APP_ERROR Push_Front(const T &item, bool isWait = false)
    {
        std::unique_lock<std::mutex> lck(mutex_);
        while (queue_.size() >= maxSize_ && isWait && !isStoped_) {
            fullCond_.wait(lck);
        }
        if (isStoped_) {
            return APP_ERR_QUEUE_STOPED;
        }
        if (queue_.size() >= maxSize_) {
            return APP_ERR_QUEUE_FULL;
        }
        queue_.push_front(item);
        emptyCond_.notify_one();
        return APP_ERR_OK;
    }

    APP_ERROR Pop(T &item)
    {
        std::unique_lock<std::mutex> lck(mutex_);
        while (queue_.empty() && !isStoped_) {
            emptyCond_.wait(lck);
        }
        if (isStoped_) {
            return APP_ERR_QUEUE_STOPED;
        }
        if (queue_.empty()) {
            return APP_ERR_QUEUE_EMPTY;
        } else {
            item = queue_.front();
            queue_.pop_front();
        }
        fullCond_.notify_one();
        return APP_ERR_OK;
    }

    APP_ERROR Pop(T& item, unsigned int timeOutMs)
    {
        std::unique_lock<std::mutex> lck(mutex_);
        auto realTime = std::chrono::milliseconds(timeOutMs);
        if (queue_.empty() && !isStoped_) {
            emptyCond_.wait_until(lck, std::chrono::steady_clock::now() + realTime);
        }
        if (isStoped_) {
            return APP_ERR_QUEUE_STOPED;
        }
        if (queue_.empty()) {
            return APP_ERR_QUEUE_EMPTY;
        } else {
            item = queue_.front();
            queue_.pop_front();
        }
        fullCond_.notify_one();
        return APP_ERR_OK;
    }

    void Stop()
    {
        {
            std::unique_lock<std::mutex> lck(mutex_);
            isStoped_ = true;
        }
        fullCond_.notify_all();
        emptyCond_.notify_all();
    }

    void Restart()
    {
        std::unique_lock<std::mutex> lck(mutex_);
        isStoped_ = false;
    }

    APP_ERROR GetBackItem(T &item)
    {
        std::unique_lock<std::mutex> lck(mutex_);
        if (isStoped_) {
            return APP_ERR_QUEUE_STOPED;
        }
        if (queue_.empty()) {
            return APP_ERR_QUEUE_EMPTY;
        }
        item = queue_.back();
        return APP_ERR_OK;
    }

    // if the queue is stoped ,need call this function to release the unprocessed items
    std::list<T> GetRemainItems()
    {
        std::unique_lock<std::mutex> lck(mutex_);
        if (!isStoped_) {
            return std::list<T>();
        }
        return queue_;
    }

    APP_ERROR IsEmpty()
    {
        std::unique_lock<std::mutex> lck(mutex_);
        return queue_.empty();
    }

    APP_ERROR IsFull()
    {
        std::unique_lock<std::mutex> lck(mutex_);
        return queue_.size() >= maxSize_;
    }

    int GetSize()
    {
        std::unique_lock<std::mutex> lck(mutex_);
        return queue_.size();
    }

    void Clear()
    {
        std::unique_lock<std::mutex> lck(mutex_);
        queue_.clear();
    }

private:
    std::list<T> queue_;
    std::condition_variable fullCond_;
    std::condition_variable emptyCond_;
    std::mutex mutex_;
    uint32_t maxSize_;

    bool isStoped_;
};
}
#endif // __INC_BLOCKING_QUEUE_H__