/*
* Copyright (c) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef META_SRC_TASK_QUEUE_H
#define META_SRC_TASK_QUEUE_H
#include <chrono>
#include <deque>
#include <mutex>
#include <thread>
#include <base/containers/vector.h>
#include <meta/interface/intf_clock.h>
#include <meta/interface/intf_task_queue.h>
META_BEGIN_NAMESPACE()
/**
 * @brief Shared implementation of a time-ordered task queue.
 *
 * Pending tasks live in tasks_, kept sorted by descending execute time so that the task which is
 * due first is always at the back. Queue state is guarded by mutex_. Tasks are invoked with the
 * mutex unlocked, and task objects dropped by CancelTask/Close/ProcessTasks are released only
 * after the mutex has been unlocked.
 */
class TaskQueueImpl : public ITaskQueueExtend {
public:
    using Token = ITaskQueue::Token;

    /**
     * @brief Select the object whose InvokeTask is used to run tasks.
     * @param extend Invoker to use; nullptr selects this queue itself.
     */
    void SetExtend(ITaskQueueExtend* extend) override
    {
        extend_ = extend ? extend : this;
    }

    /** @brief Shut the queue down; equivalent to Close(). */
    void Shutdown() override
    {
        Close();
    }

    /**
     * @brief Remove every pending or re-arm entry that matches the token.
     *
     * If the token identifies the task that is currently being invoked, the task is flagged so
     * that ProcessTasks does not re-arm it. When called from a thread other than execThread_,
     * the call additionally waits (yielding) until that invocation has finished or the queue
     * has been terminated.
     *
     * @param token Token returned by AddTask/AddTaskImpl; nullptr is ignored.
     */
    void CancelTask(Token token)
    {
        // Declared before the lock so the removed task is released after the mutex is unlocked.
        ITaskQueueTask::Ptr removed;
        if (token != nullptr) {
            std::unique_lock lock{mutex_};
            if (token == execToken_) {
                // Tell ProcessTasks that the running task must not be re-armed.
                currentlyExecutingRemoved = true;
            }
            if (std::this_thread::get_id() != execThread_) {
                // Another thread is (possibly) running the task: wait for it to complete.
                while (!terminate_ && token == execToken_) {
                    lock.unlock();
                    std::this_thread::yield();
                    lock.lock();
                }
            }
            for (auto it = tasks_.begin(); it != tasks_.end();) {
                if (it->operation.get() == token) {
                    removed = BASE_NS::move(it->operation);
                    it = tasks_.erase(it);
                } else {
                    ++it;
                }
            }
            for (auto it = rearm_.begin(); it != rearm_.end();) {
                if (it->operation.get() == token) {
                    removed = BASE_NS::move(it->operation);
                    it = rearm_.erase(it);
                } else {
                    ++it;
                }
            }
        }
    }

    /**
     * @brief Insert a task into the sorted pending list. The caller must hold mutex_.
     *
     * Tasks with an equal execute time keep their insertion order: the new task is placed so
     * that it runs after every already queued task that is due at the same time or earlier,
     * regardless of how many tasks are currently queued.
     *
     * @param p       Task to queue.
     * @param delay   Delay stored with the task (used by Rearm for re-scheduling).
     * @param excTime Time at which the task becomes due.
     * @return Token identifying the task (its raw pointer).
     */
    Token AddTaskImpl(ITaskQueueTask::Ptr p, const TimeSpan& delay, const TimeSpan& excTime)
    {
        Token ret{p.get()};
        if (auto i = interface_cast<ITaskScheduleInfo>(p)) {
            i->SetQueueAndToken(self_.lock(), ret);
        }
        // The list is ordered latest-first. Stop at the first entry that is due no later than
        // the new task and insert in front of it; if there is none (including the empty list)
        // the new task is the earliest one and goes to the very back.
        auto pos = tasks_.begin();
        for (; pos != tasks_.end(); ++pos) {
            if (pos->executeTime <= excTime) {
                break;
            }
        }
        tasks_.emplace(pos, delay, excTime, BASE_NS::move(p));
        return ret;
    }

    /**
     * @brief Thread-safe wrapper around AddTaskImpl.
     * @return Token of the queued task, or nullptr if p is null.
     */
    Token AddTask(ITaskQueueTask::Ptr p, const TimeSpan& delay, const TimeSpan& excTime)
    {
        if (p) {
            std::unique_lock lock{mutex_};
            return AddTaskImpl(BASE_NS::move(p), delay, excTime);
        }
        return nullptr;
    }

    /**
     * @brief Current time of std::chrono::high_resolution_clock in microseconds since its epoch.
     *
     * NOTE(review): high_resolution_clock is not guaranteed to be steady on every standard
     * library; confirm whether a monotonic clock is required by the users of this value.
     */
    TimeSpan Time() const
    {
        using namespace std::chrono;
        return TimeSpan::Microseconds(
            duration_cast<microseconds>(high_resolution_clock::now().time_since_epoch()).count());
    }

    /**
     * @brief Run every task that is due at curTime, then re-schedule the ones that asked for it.
     *
     * @param lock    Lock on mutex_; must be locked on entry and is locked again on return. It is
     *                released while a task is being invoked or released.
     * @param curTime Time against which the execute times are compared.
     */
    void ProcessTasks(std::unique_lock<std::mutex>& lock, TimeSpan curTime)
    {
        while (!terminate_ && !tasks_.empty() && curTime >= tasks_.back().executeTime) {
            auto task = BASE_NS::move(tasks_.back());
            tasks_.pop_back();
            execToken_ = task.operation.get();
            currentlyExecutingRemoved = false;
            lock.unlock();
            // InvokeTask returns true when the task wants to be scheduled again.
            bool redo = extend_->InvokeTask(task.operation);
            if (!redo) {
                task.operation.reset();
            }
            lock.lock();
            if (currentlyExecutingRemoved) {
                // Cancelled while it was running: drop it, releasing it without the lock held.
                lock.unlock();
                task.operation.reset();
                lock.lock();
            } else if (redo) {
                rearm_.emplace_back(BASE_NS::move(task));
            }
            execToken_ = nullptr;
        }
        if (!terminate_) {
            Rearm(curTime);
        }
    }

    /**
     * @brief Move all tasks collected in rearm_ back to the pending list. The caller must hold
     *        mutex_.
     *
     * A task with a positive delay is scheduled at its previous execute time plus the delay; if
     * that is not later than curTime, whole delay periods are skipped until the execute time is
     * no longer before curTime. A task without a positive delay becomes due at curTime.
     */
    void Rearm(TimeSpan curTime)
    {
        for (auto it = rearm_.rbegin(); it != rearm_.rend(); ++it) {
            auto& task = *it;
            if (task.delay > TimeSpan()) {
                const uint64_t dt = static_cast<uint64_t>(task.delay.ToMicroseconds());
                uint64_t et = static_cast<uint64_t>(task.executeTime.ToMicroseconds());
                const uint64_t ct = static_cast<uint64_t>(curTime.ToMicroseconds());
                et += dt;
                if (et <= ct) {
                    // Number of whole periods (rounded up) needed to reach curTime.
                    const uint64_t ticks = ((ct - et) + (dt - 1)) / dt;
                    et += (ticks * dt);
                    CORE_LOG_V("Skipped ticks %d", static_cast<int>(ticks));
                }
                task.executeTime = TimeSpan::Microseconds(et);
            } else {
                task.executeTime = curTime;
            }
            // The entry is discarded by the clear() below, so hand the task over instead of
            // copying the pointer.
            AddTaskImpl(BASE_NS::move(task.operation), task.delay, task.executeTime);
        }
        rearm_.clear();
    }

    /**
     * @brief Mark the queue as terminated and drop all pending tasks.
     *
     * The pending tasks are detached while holding mutex_ but released only after it has been
     * unlocked, matching CancelTask and ProcessTasks, so releasing a task never runs with the
     * queue mutex held.
     */
    void Close()
    {
        std::deque<Task> dropped;
        {
            std::unique_lock lock{mutex_};
            terminate_ = true;
            dropped.swap(tasks_);
        }
    }

    /** @brief One queue entry: the task together with its scheduling data. */
    struct Task {
        Task() = default;
        Task(TimeSpan d, TimeSpan e, ITaskQueueTask::Ptr p)
            : delay(d), executeTime(e), operation(BASE_NS::move(p))
        {}
        TimeSpan delay;                         // value passed as 'delay' when the task was added
        TimeSpan executeTime;                   // time at which the task becomes due
        ITaskQueueTask::Ptr operation{nullptr}; // the task itself
    };

protected:
    std::mutex mutex_;                // guards the members below
    ITaskQueueExtend* extend_{this};  // object used to invoke tasks
    bool terminate_{};                // set by Close()
    std::thread::id execThread_;      // compared against the caller's thread in CancelTask; not set in this class
    Token execToken_{nullptr};        // token of the task being invoked, nullptr otherwise
    std::deque<Task> tasks_;          // pending tasks, latest execute time first
    BASE_NS::vector<Task> rearm_;     // tasks waiting to be re-scheduled by Rearm()
    ITaskQueue::WeakPtr self_;        // passed (locked) to ITaskScheduleInfo::SetQueueAndToken
    bool currentlyExecutingRemoved{}; // the task being invoked was cancelled meanwhile
};
META_END_NAMESPACE()
#endif