* Copyright (c) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <condition_variable>
#include <mutex>
#include <thread>
#if defined(__OHOS_PLATFORM__) && !defined(BUILD_PUBLIC_VERSION)
#include <qos.h>
#include <sys/syscall.h>
#include <unistd.h>
#include "res_sched_client.h"
#endif
#include <base/containers/vector.h>
#include <meta/base/interface_macros.h>
#include <meta/base/namespace.h>
#include <meta/ext/object.h>
#include <meta/interface/intf_task_queue.h>
#include <meta/interface/intf_task_queue_registry.h>
#include "future.h"
#include "task_queue.h"
META_BEGIN_NAMESPACE()
constexpr uint32_t RES_TYPE_EXT_ENGINE_SET_QOS = 10028;
class ThreadedTaskQueue
: public IntroduceInterfaces<MetaObject, IThreadedTaskQueue, ITaskQueueThreadInfo, TaskQueueImpl> {
META_OBJECT(ThreadedTaskQueue, ClassId::ThreadedTaskQueue, IntroduceInterfaces)
public:
using Token = ITaskQueue::Token;
META_NO_COPY_MOVE(ThreadedTaskQueue)
ThreadedTaskQueue() = default;
~ThreadedTaskQueue() override
{
Shutdown();
}
uint64_t CurrentThread() const
{
return std::hash<std::thread::id>{}(std::this_thread::get_id());
}
bool CurrentThreadIsExecutionThread() const override
{
return CurrentThread() == threadId_;
}
bool SetExecutionThread()
{
auto id = CurrentThread();
if (threadId_ != 0) {
return (threadId_ == id);
}
threadId_ = id;
return true;
}
bool Build(const IMetadata::Ptr& data) override
{
bool ret = Super::Build(data);
if (ret) {
self_ = GetSelf<ITaskQueue>();
thread_ = std::thread([this]() {
SetExecutionThread();
ProcessTasks();
});
}
return ret;
}
bool InvokeTask(const ITaskQueueTask::Ptr& task) override
{
if (task) {
auto q = GetTaskQueueRegistry().SetCurrentTaskQueue(self_);
auto ret = task->Invoke();
GetTaskQueueRegistry().SetCurrentTaskQueue(q);
return ret;
}
return false;
}
void Shutdown() override
{
Close();
addCondition_.notify_one();
if (thread_.joinable()) {
thread_.join();
}
}
void CancelTask(Token token) override
{
TaskQueueImpl::CancelTask(token);
}
Token AddTask(ITaskQueueTask::Ptr p) override
{
return AddTask(BASE_NS::move(p), TimeSpan::Milliseconds(0));
}
Token AddTask(ITaskQueueTask::Ptr p, const TimeSpan& delay) override
{
auto t = TaskQueueImpl::AddTask(BASE_NS::move(p), delay, Time() + delay);
if (t) {
addCondition_.notify_one();
}
return t;
}
IFuture::Ptr AddWaitableTask(ITaskQueueWaitableTask::Ptr p) override
{
IPromise::Ptr promise(new Promise);
BASE_NS::shared_ptr<PromisedQueueTask> task(new PromisedQueueTask(BASE_NS::move(p), promise));
auto f = task->GetFuture();
AddTask(BASE_NS::move(task));
return f;
}
void ProcessTasks()
{
std::unique_lock lock{mutex_};
execThread_ = std::this_thread::get_id();
#if defined(__OHOS_PLATFORM__) && !defined(BUILD_PUBLIC_VERSION)
int ret = OHOS::QOS::SetThreadQos(OHOS::QOS::QosLevel::QOS_USER_INTERACTIVE);
CORE_LOG_I("set engine child thread qos %s", ret == 0 ? "success" : "failed");
auto tid = syscall(SYS_gettid);
if (tid > 0) {
std::unordered_map<std::string, std::string> mapPayload{
{"pid", std::to_string(getpid())}, {"tid", std::to_string(tid)}};
CORE_LOG_I("ReportEngineResType %s %s", mapPayload["pid"].c_str(), mapPayload["tid"].c_str());
OHOS::ResourceSchedule::ResSchedClient::GetInstance().ReportData(
RES_TYPE_EXT_ENGINE_SET_QOS, 1, mapPayload);
}
#endif
while (!terminate_) {
if (!tasks_.empty()) {
TimeSpan delta = tasks_.back().executeTime - Time();
if (delta > TimeSpan::Microseconds(0)) {
addCondition_.wait_for(lock, std::chrono::microseconds(delta.ToMicroseconds()));
}
} else {
addCondition_.wait(lock);
}
auto curTime = Time();
TaskQueueImpl::ProcessTasks(lock, curTime);
}
#if defined(__OHOS_PLATFORM__) && !defined(BUILD_PUBLIC_VERSION)
std::unordered_map<std::string, std::string> mapPayload{
{"pid", std::to_string(getpid())}, {"tid", std::to_string(tid)}};
CORE_LOG_I("ReportEngineResType %s %s", mapPayload["pid"].c_str(), mapPayload["tid"].c_str());
OHOS::ResourceSchedule::ResSchedClient::GetInstance().ReportData(RES_TYPE_EXT_ENGINE_SET_QOS, 0, mapPayload);
#endif
}
private:
uint64_t threadId_{0};
std::condition_variable addCondition_;
std::thread thread_;
};
namespace Internal {
IObjectFactory::Ptr GetThreadedTaskQueueFactory()
{
return ThreadedTaskQueue::GetFactory();
}
}
META_END_NAMESPACE()