* Copyright (c) 2021-2025 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "common_components/taskpool/task_queue.h"
namespace common {
void TaskQueue::PostTask(std::unique_ptr<Task> task)
{
std::lock_guard<std::mutex> guard(mtx_);
DCHECK_CC(!terminate_);
tasks_.push_back(std::move(task));
cv_.notify_one();
}
void TaskQueue::PostDelayedTask(std::unique_ptr<Task> task, uint64_t delayMilliseconds)
{
std::lock_guard<std::mutex> guard(mtx_);
DCHECK_CC(!terminate_);
auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(delayMilliseconds);
delayedTasks_.insert({deadline, std::move(task)});
cv_.notify_one();
}
std::unique_ptr<Task> TaskQueue::PopTask()
{
std::unique_lock<std::mutex> lock(mtx_);
while (true) {
MoveExpiredTask(lock);
if (!tasks_.empty()) {
std::unique_ptr<Task> task = std::move(tasks_.front());
tasks_.pop_front();
return task;
}
if (terminate_) {
cv_.notify_all();
return nullptr;
}
WaitForTask(lock);
}
}
void TaskQueue::TerminateTask(int32_t id, TaskType type)
{
std::lock_guard<std::mutex> guard(mtx_);
for (auto &task : tasks_) {
if (id != ALL_TASK_ID && id != task->GetId()) {
continue;
}
if (type != TaskType::ALL && type != task->GetTaskType()) {
continue;
}
task->Terminated();
}
for (auto &taskItem : delayedTasks_) {
if (id != ALL_TASK_ID && id != (taskItem.second)->GetId()) {
continue;
}
if (type != TaskType::ALL && type != (taskItem.second)->GetTaskType()) {
continue;
}
(taskItem.second)->Terminated();
}
}
void TaskQueue::Terminate()
{
std::lock_guard<std::mutex> guard(mtx_);
terminate_ = true;
cv_.notify_all();
}
void TaskQueue::ForEachTask(const std::function<void(Task*)> &f)
{
std::lock_guard<std::mutex> guard(mtx_);
for (auto &task : tasks_) {
if (task.get() != nullptr) {
f(task.get());
}
}
}
void TaskQueue::MoveExpiredTask(std::unique_lock<std::mutex> &lock)
{
DCHECK_CC(!mtx_.try_lock());
while (!delayedTasks_.empty()) {
auto it = delayedTasks_.begin();
auto currentTime = std::chrono::steady_clock::now();
if ((std::chrono::duration_cast<std::chrono::duration<double>>(it->first - currentTime)).count() > 0) {
return;
}
tasks_.push_back(std::move(it->second));
delayedTasks_.erase(it);
}
}
void TaskQueue::WaitForTask(std::unique_lock<std::mutex> &lock)
{
DCHECK_CC(!mtx_.try_lock());
if (!delayedTasks_.empty()) {
auto it = delayedTasks_.begin();
auto currentTime = std::chrono::steady_clock::now();
if ((std::chrono::duration_cast<std::chrono::duration<double>>(it->first - currentTime)).count() < 0) {
return;
}
auto waitingTime = std::chrono::duration_cast<std::chrono::milliseconds>(it->first - currentTime);
cv_.wait_for(lock, waitingTime);
} else {
cv_.wait(lock);
}
}
}