* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#include "DelayedWorkQueue.h"
#include "common.h"
// Adds a task to the queue and wakes one thread waiting on the condition variable.
//
// task: pointer to the task to enqueue. A null pointer is reported through
//       THROW_RUNTIME_ERROR before the queue is touched.
void DelayedWorkQueue::Offer(ScheduledFutureTask* task)
{
    if (!task) {
        THROW_RUNTIME_ERROR("DelayedWorkQueue::Offer, task is nullptr");
    }

    // The push and the notification both happen while queueMutex is held.
    std::unique_lock<std::mutex> guard(queueMutex);
    queue.push(task);
    condition.notify_one();
}
// Blocks until the task at the top of the queue reports a non-positive delay,
// then removes that task from the queue and returns it.
//
// Returns nullptr once `stop` has been set (see Shutdown), even if tasks are
// still queued.
ScheduledFutureTask* DelayedWorkQueue::Take()
{
    std::unique_lock<std::mutex> guard(queueMutex);

    while (!stop) {
        // Sleep until there is at least one task, or a shutdown was requested.
        condition.wait(guard, [this] { return stop || !queue.empty(); });
        if (stop) {
            return nullptr;
        }

        ScheduledFutureTask* head = queue.top();
        long remaining = head->GetDelay();

        if (remaining > 0) {
            // Head is not due yet: sleep for its remaining delay (used as
            // milliseconds), waking early when the situation changes.
            condition.wait_for(
                guard,
                std::chrono::milliseconds(remaining),
                [this, head] {
                    // Shutdown, or the queue was drained by someone else.
                    if (stop || queue.empty()) {
                        return true;
                    }
                    // A different task is now at the top; re-evaluate from scratch.
                    // This check comes before head is dereferenced again, so head is
                    // only queried while it is still the queued top element.
                    if (queue.top() != head) {
                        return true;
                    }
                    return head->GetDelay() <= 0;
                });
            continue;
        }

        // Head is due: hand it to the caller.
        queue.pop();
        return head;
    }

    return nullptr;
}
// Sets the `stop` flag and wakes every thread waiting on the condition
// variable, so that blocked Take() calls can observe the flag and return.
void DelayedWorkQueue::Shutdown()
{
    std::unique_lock<std::mutex> guard(queueMutex);
    stop = true;
    // Release the mutex before notifying, so woken threads can reacquire it at once.
    guard.unlock();

    condition.notify_all();
}
// Wakes every thread waiting on the condition variable without changing any
// queue state; woken waiters re-evaluate their wait predicates.
// NOTE(review): presumably meant for callers that altered a queued task's
// delay from outside this class — confirm against call sites.
void DelayedWorkQueue::NotifyAll()
{
    // The notification is issued while queueMutex is held.
    std::unique_lock<std::mutex> guard(queueMutex);
    condition.notify_all();
}