* Copyright (C) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <sys/types.h>
#include <unistd.h>
#ifdef PLATFORM_IOS
#include <pthread.h>
#include <mm_malloc.h>
#include <stdlib.h>
#else
#include <malloc.h>
#endif
#include "task_queue.h"
#include "media_log.h"
#include "media_errors.h"
using namespace OHOS::QOS;
namespace {
constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN_PLAYER, "TaskQueue"};
}
namespace OHOS {
namespace Media {
TaskQueue::~TaskQueue()
{
(void)Stop();
}
int32_t TaskQueue::Start()
{
std::unique_lock<std::mutex> lock(mutex_);
CHECK_AND_RETURN_RET_LOG(thread_ == nullptr,
MSERR_OK, "Started already, ignore ! [%{public}s]", name_.c_str());
isExit_ = false;
thread_ = std::make_unique<std::thread>(&TaskQueue::TaskProcessor, this);
MEDIA_LOGI("0x%{public}06" PRIXPTR " Instance thread started [%{public}s]", FAKE_POINTER(this), name_.c_str());
return MSERR_OK;
}
int32_t TaskQueue::Stop() noexcept
{
std::unique_lock<std::mutex> lock(mutex_);
if (isExit_) {
MEDIA_LOGD("Stopped already, ignore ! [%{public}s]", name_.c_str());
return MSERR_OK;
}
if (std::this_thread::get_id() == thread_->get_id()) {
MEDIA_LOGI("Stop at the task thread, reject");
return MSERR_INVALID_OPERATION;
}
std::unique_ptr<std::thread> t;
isExit_ = true;
cond_.notify_all();
std::swap(thread_, t);
lock.unlock();
if (t != nullptr && t->joinable()) {
t->join();
}
lock.lock();
CancelNotExecutedTaskLocked();
return MSERR_OK;
}
void TaskQueue::SetQos(const QosLevel level)
{
if (tid_ == -1) {
MEDIA_LOGW("SetQos thread level failed, tid invalid");
return;
}
MEDIA_LOGI("SetQos thread [%{public}d] level [%{public}d]", static_cast<int>(tid_), static_cast<int>(level));
SetQosForOtherThread(level, tid_);
}
void TaskQueue::ResetQos()
{
if (tid_ == -1) {
MEDIA_LOGW("ResetQos thread level failed, tid invalid");
return;
}
ResetQosForOtherThread(tid_);
MEDIA_LOGI("ResetQos thread [%{public}d] ok", static_cast<int>(tid_));
}
__attribute__((no_sanitize("cfi"))) int32_t TaskQueue::EnqueueTask(const std::shared_ptr<ITaskHandler> &task,
bool cancelNotExecuted, uint64_t delayUs)
{
constexpr uint64_t MAX_DELAY_US = 10000000ULL;
CHECK_AND_RETURN_RET_LOG(task != nullptr, MSERR_INVALID_VAL,
"Enqueue task when taskqueue task is nullptr.[%{public}s]", name_.c_str());
task->Clear();
CHECK_AND_RETURN_RET_LOG(delayUs < MAX_DELAY_US, MSERR_INVALID_VAL,
"Enqueue task when taskqueue delayUs[%{public}" PRIu64 "] is >= max delayUs[ %{public}" PRIu64
"], invalid! [%{public}s]",
delayUs, MAX_DELAY_US, name_.c_str());
std::unique_lock<std::mutex> lock(mutex_);
CHECK_AND_RETURN_RET_LOG(!isExit_, MSERR_INVALID_OPERATION,
"Enqueue task when taskqueue is stopped, failed ! [%{public}s]", name_.c_str());
if (cancelNotExecuted) {
CancelNotExecutedTaskLocked();
}
constexpr uint32_t US_TO_NS = 1000;
uint64_t curTimeNs = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count());
CHECK_AND_RETURN_RET_LOG(curTimeNs < UINT64_MAX - delayUs * US_TO_NS, MSERR_INVALID_OPERATION,
"Enqueue task but timestamp is overflow, why? [%{public}s]", name_.c_str());
uint64_t executeTimeNs = delayUs * US_TO_NS + curTimeNs;
auto iter = std::find_if(taskList_.begin(), taskList_.end(), [executeTimeNs](const TaskHandlerItem &item) {
return (item.executeTimeNs_ > executeTimeNs);
});
(void)taskList_.insert(iter, {task, executeTimeNs});
cond_.notify_all();
return 0;
}
__attribute__((no_sanitize("cfi"))) void TaskQueue::CancelNotExecutedTaskLocked()
{
MEDIA_LOGD("All task not executed are being cancelled..........[%{public}s]", name_.c_str());
while (!taskList_.empty()) {
std::shared_ptr<ITaskHandler> task = taskList_.front().task_;
taskList_.pop_front();
if (task != nullptr) {
task->Cancel();
}
}
}
__attribute__((no_sanitize("cfi"))) void TaskQueue::TaskProcessor()
{
constexpr uint32_t nameSizeMax = 15;
#ifdef PLATFORM_IOS
uint64_t tid;
pthread_threadid_np(NULL, &tid);
tid_ = tid;
#else
tid_ = gettid();
#endif
MEDIA_LOGI("Enter TaskProcessor [%{public}s], tid_: (%{public}d)", name_.c_str(), tid_);
#ifdef PLATFORM_IOS
pthread_setname_np(name_.substr(0, nameSizeMax).c_str());
#else
pthread_setname_np(pthread_self(), name_.substr(0, nameSizeMax).c_str());
#endif
#ifndef PLATFORM_IOS
(void)mallopt(-1003, 0);
#endif
while (true) {
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this] { return isExit_ || !taskList_.empty(); });
if (isExit_) {
MEDIA_LOGI("Exit TaskProcessor [%{public}s], tid_: (%{public}d)", name_.c_str(), tid_);
return;
}
TaskHandlerItem item = taskList_.front();
uint64_t curTimeNs = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count());
if (curTimeNs >= item.executeTimeNs_) {
taskList_.pop_front();
} else {
uint64_t diff = item.executeTimeNs_ - curTimeNs;
(void)cond_.wait_for(lock, std::chrono::nanoseconds(diff));
continue;
}
lock.unlock();
if (item.task_ == nullptr || item.task_->IsCanceled()) {
MEDIA_LOGD("task is nullptr or task canceled. [%{public}s]", name_.c_str());
continue;
}
item.task_->Execute();
if (item.task_->GetAttribute().periodicTimeUs_ == UINT64_MAX) {
continue;
}
int32_t res = EnqueueTask(item.task_, false, item.task_->GetAttribute().periodicTimeUs_);
if (res != MSERR_OK) {
MEDIA_LOGW("enqueue periodic task failed:%d, why? [%{public}s]", res, name_.c_str());
}
}
#ifndef PLATFORM_IOS
(void)mallopt(-1002, 0);
#endif
MEDIA_LOGI("Leave TaskProcessor [%{public}s]", name_.c_str());
}
bool TaskQueue::IsTaskExecuting()
{
std::unique_lock<std::mutex> lock(mutex_);
return isTaskExecuting_;
}
}
}