* Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Description: Simple queue to pass data among threads.
*/
#ifndef DATASYSTEM_COMMON_UTIL_QUEUE_H
#define DATASYSTEM_COMMON_UTIL_QUEUE_H
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include "datasystem/common/util/status_helper.h"
namespace datasystem {
static constexpr int QUE_NO_TIMEOUT = -1;
* @brief It is a simple thread safe fixed size queue for the producer/consumer problem.
* The name of the APIs are mostly taken from the Java BlockingQueue counterpart.
* <ol>
* <li> Put -- Push an element into the back of the queue. The operation blocks if the queue is full.
* <li> Add -- Like Put but return error if the queue is full.
* <li> Offer -- Like Put but with a timeout. Return error if the insert is not successful before the timeout expires..
* <li> Take -- Pop an element from the front of the queue The operation blocks if the queue is empty.
* <li> Remove -- Like Take but return error if the queue is empty.
* <li> Poll -- Like Take but with a timeout. Return error if the pop is not successful before the timeout expires.
* </ol>
* @tparam T Type of element of the queue.
*/
template <typename T>
class Queue final {
public:
typedef T value_type;
typedef T *pointer;
typedef const T *const_pointer;
typedef T &reference;
typedef const T &const_reference;
explicit Queue(size_t capacity) : capacity_(capacity), head_(0), tail_(0), hwm_(0)
{
buf_ = std::make_unique<T[]>(capacity_);
}
void Reset(size_t capacity)
{
capacity_ = capacity;
buf_ = std::make_unique<T[]>(capacity_);
head_ = 0;
tail_ = 0;
hwm_ = 0;
}
~Queue() = default;
Queue(const Queue &) = delete;
Queue(Queue &&other) noexcept = delete;
Queue &operator=(const Queue &) = delete;
Queue &operator=(Queue &&other) noexcept = delete;
*
* @return Capacity of the queue.
*/
auto Capacity() const
{
return capacity_;
}
*
* @return Number of elements in the queue.
*/
auto Size() const
{
return tail_ - head_;
}
*
* @return If the queue is empty.
*/
auto Empty() const
{
return head_ == tail_;
}
* @return the high water mark ratio.
*/
float GetHWMRatio()
{
if (capacity_ == 0) {
LOG(ERROR) << "GetHWMRatio failed, capacity is 0";
return 0.0;
}
float ratio = static_cast<float>(hwm_.load()) / static_cast<float>(capacity_);
hwm_ = 0;
return ratio;
}
* @brief Push an element to the end of the queue.
* It is a blocking call if the queue is full.
* @param[in] ele Element to be pushed.
* @return Status of the call, only K_OK as of now.
*/
Status Put(const_reference ele)
{
std::unique_lock<std::mutex> lock(mux_);
CHECK_FAIL_RETURN_STATUS(capacity_ != 0, K_RUNTIME_ERROR, "Put failed, capacity is 0");
cvFull_.wait(lock, IsNotFull);
auto k = tail_++ % capacity_;
buf_[k] = ele;
hwm_ = std::max(hwm_.load(), Size());
cvEmpty_.notify_all();
return Status::OK();
}
* @brief Push an element to the end of the queue. Return error if the queue is full.
* @param[in] ele Element to be added.
* @return Status K_OK or K_TRY_AGAIN (if queue is full).
*/
Status Add(const_reference ele)
{
std::unique_lock<std::mutex> lock(mux_);
if (IsNotFull()) {
CHECK_FAIL_RETURN_STATUS(capacity_ != 0, K_RUNTIME_ERROR, "division by zero, add failed");
auto k = tail_++ % capacity_;
buf_[k] = ele;
hwm_ = std::max(hwm_.load(), Size());
cvEmpty_.notify_all();
} else {
RETURN_STATUS(StatusCode::K_TRY_AGAIN, "The queue is full");
}
return Status::OK();
}
* @brief Push an element to the end of the queue with a timeout.
* @param[in] ele Element to be pushed.
* @param[in] timeout in milliseconds
* @return Status K_OK or K_TRY_AGAIN (if timeout expires).
*/
Status Offer(const_reference ele, uint64_t timeout)
{
std::unique_lock<std::mutex> lock(mux_);
cvFull_.wait_for(lock, std::chrono::milliseconds(timeout), IsNotFull);
if (IsNotFull()) {
CHECK_FAIL_RETURN_STATUS(capacity_ != 0, K_RUNTIME_ERROR, "division by zero, offer failed");
auto k = tail_++ % capacity_;
buf_[k] = ele;
hwm_ = std::max(hwm_.load(), Size());
cvEmpty_.notify_all();
} else {
RETURN_STATUS(StatusCode::K_TRY_AGAIN,
"The queue is full within allowed time: " + std::to_string(timeout) + " ms");
}
return Status::OK();
}
* @brief Push an element to the end of the queue.
* It is a blocking call if the queue is full.
* @param[in] ele Element to be pushed.
* @return Status object.
*/
Status Put(T &&ele)
{
std::unique_lock<std::mutex> lock(mux_);
CHECK_FAIL_RETURN_STATUS(capacity_ != 0, K_RUNTIME_ERROR, "division by zero, put failed");
cvFull_.wait(lock, IsNotFull);
auto k = tail_++ % capacity_;
buf_[k] = std::forward<T>(ele);
hwm_ = std::max(hwm_.load(), Size());
cvEmpty_.notify_all();
return Status::OK();
}
* @brief Push an element to the end of the queue. Return error if the queue is full.
* @param[in] ele Element to be pushed.
* @return Status K_OK or K_TRY_AGAIN (if queue is full).
*/
Status Add(T &&ele)
{
std::unique_lock<std::mutex> lock(mux_);
if (IsNotFull()) {
CHECK_FAIL_RETURN_STATUS(capacity_ != 0, K_RUNTIME_ERROR, "division by zero failed");
auto k = tail_++ % capacity_;
buf_[k] = std::forward<T>(ele);
hwm_ = std::max(hwm_.load(), Size());
cvEmpty_.notify_all();
} else {
RETURN_STATUS(StatusCode::K_TRY_AGAIN, "The queue is full");
}
return Status::OK();
}
* @brief Push an element to the end of the queue with a timeout.
* @param[in] ele Element to be pushed.
* @param[in] timeout (in milliseconds).
* @return Status K_OK or K_TRY_AGAIN (if timeout expires).
*/
Status Offer(T &&ele, uint64_t timeout)
{
std::unique_lock<std::mutex> lock(mux_);
cvFull_.wait_for(lock, std::chrono::milliseconds(timeout), IsNotFull);
if (IsNotFull()) {
CHECK_FAIL_RETURN_STATUS(capacity_ != 0, K_RUNTIME_ERROR, "offer failed, division by zero");
auto k = tail_++ % capacity_;
buf_[k] = std::forward<T>(ele);
hwm_ = std::max(hwm_.load(), Size());
cvEmpty_.notify_all();
} else {
RETURN_STATUS(StatusCode::K_TRY_AGAIN,
"The queue is full within allowed time: " + std::to_string(timeout) + " ms");
}
return Status::OK();
}
* @brief Push an element to the end of the queue by simply calling the constructor of T in place.
* It is a blocking call if the queue is full.
* @param[in] ele Element to be pushed.
* @return Status object.
*/
template <typename... Ts>
Status EmplaceBack(Ts &&...args) noexcept
{
std::unique_lock<std::mutex> lock(mux_);
CHECK_FAIL_RETURN_STATUS(capacity_ != 0, K_RUNTIME_ERROR, "division by zero, emplace failed");
cvFull_.wait(lock, IsNotFull);
auto k = tail_++ % capacity_;
new (&(buf_[k])) T(std::forward<Ts>(args)...);
hwm_ = std::max(hwm_.load(), Size());
cvEmpty_.notify_all();
return Status::OK();
}
* @brief Pop an element from the front of the queue.
* It is a blocking call if the queue is empty.
* @param[out] p The pointer to take in popped element.
* @return Status object.
*/
Status Take(pointer p)
{
RETURN_RUNTIME_ERROR_IF_NULL(p);
std::unique_lock<std::mutex> lock(mux_);
CHECK_FAIL_RETURN_STATUS(capacity_ != 0, K_RUNTIME_ERROR, "division by zero, take failed");
cvEmpty_.wait(lock, IsNotEmpty);
auto k = head_++ % capacity_;
*p = std::move(buf_[k]);
cvFull_.notify_all();
return Status::OK();
}
* @brief Pop an element from the front of the queue. Return error if the queue is empty.
* @param[out] p The pointer to take in popped element.
* @return Status K_OK or K_TRY_AGAIN (if queue is empty).
*/
Status Remove(pointer p)
{
RETURN_RUNTIME_ERROR_IF_NULL(p);
std::unique_lock<std::mutex> lock(mux_);
if (IsNotEmpty()) {
CHECK_FAIL_RETURN_STATUS(capacity_ != 0, K_RUNTIME_ERROR, "division by zero, remove failed");
auto k = head_++ % capacity_;
*p = std::move(buf_[k]);
cvFull_.notify_all();
} else {
RETURN_STATUS(StatusCode::K_TRY_AGAIN, "The queue is empty");
}
return Status::OK();
}
* @brief Pop an element from the front of the queue. Return error if timeout expires.
* @param[out] p The pointer to take in popped element.
* @param[in] timeout in milliseconds.
* @return Status K_OK or K_TRY_AGAIN (if timeout expires).
*/
Status Poll(pointer p, uint64_t timeout)
{
RETURN_RUNTIME_ERROR_IF_NULL(p);
std::unique_lock<std::mutex> lock(mux_);
cvEmpty_.wait_for(lock, std::chrono::milliseconds(timeout), IsNotEmpty);
if (IsNotEmpty()) {
CHECK_FAIL_RETURN_STATUS(capacity_ != 0, K_RUNTIME_ERROR, "division by zero");
auto k = head_++ % capacity_;
*p = std::move(buf_[k]);
cvFull_.notify_all();
} else {
RETURN_STATUS(StatusCode::K_TRY_AGAIN,
"The queue is empty within allowed time: " + std::to_string(timeout) + " ms");
}
return Status::OK();
}
* @brief A wrapper to call different variation of send
* @param[in] ele
* @param[in] timeout
* @return Status
*/
Status Send(const_reference ele, int timeout)
{
if (timeout > 0) {
return Offer(ele, timeout);
} else if (timeout == QUE_NO_TIMEOUT) {
return Put(ele);
} else {
return Add(ele);
}
}
* @brief A wrapper to call different variation of send
* @param[in] ele
* @param[in] timeout
* @return Status
*/
Status Send(T &&ele, int timeout)
{
if (timeout > 0) {
return Offer(std::forward<T>(ele), timeout);
} else if (timeout == QUE_NO_TIMEOUT) {
return Put(std::forward<T>(ele));
} else {
return Add(std::forward<T>(ele));
}
}
* @brief A wrapper to call different variation of receive
* @param ele
* @param timeout
* @return
*/
Status Recv(reference ele, int timeout)
{
if (timeout > 0) {
return Poll(&ele, timeout);
} else if (timeout == QUE_NO_TIMEOUT) {
return Take(&ele);
} else {
return Remove(&ele);
}
}
* @brief Same as Poll but not popping any element.
* @param[in] timeout Timeout.
* @return Status of the call.
*/
Status PollRecv(uint64_t timeout)
{
std::unique_lock<std::mutex> lock(mux_);
cvEmpty_.wait_for(lock, std::chrono::milliseconds(timeout), IsNotEmpty);
RETURN_OK_IF_TRUE(IsNotEmpty());
RETURN_STATUS(StatusCode::K_TRY_AGAIN,
"The queue is empty within allowed time: " + std::to_string(timeout) + " ms");
}
* @brief Same as Offer but not inserting any element.
* @param[in] timeout Timeout of the call.
* @return Status of the call.
*/
Status PollSend(uint64_t timeout)
{
std::unique_lock<std::mutex> lock(mux_);
cvFull_.wait_for(lock, std::chrono::milliseconds(timeout), IsNotFull);
RETURN_OK_IF_TRUE(IsNotFull());
RETURN_STATUS(StatusCode::K_TRY_AGAIN,
"The queue is full within allowed time: " + std::to_string(timeout) + " ms");
}
private:
std::unique_ptr<T[]> buf_;
size_t capacity_;
size_t head_;
size_t tail_;
std::atomic<size_t> hwm_;
std::mutex mux_;
std::condition_variable cvEmpty_;
std::condition_variable cvFull_;
std::function<bool()> IsNotFull = [this]() -> bool { return Size() != Capacity(); };
std::function<bool()> IsNotEmpty = [this]() -> bool { return !Empty(); };
};
}
#endif