#include "base/task/sequence_manager/work_queue.h"
#include "base/containers/stack_container.h"
#include "base/debug/alias.h"
#include "base/json/json_writer.h"
#include "base/task/sequence_manager/fence.h"
#include "base/task/sequence_manager/sequence_manager_impl.h"
#include "base/task/sequence_manager/task_order.h"
#include "base/task/sequence_manager/work_queue_sets.h"
#include "build/build_config.h"
#include "third_party/abseil-cpp/absl/types/optional.h"
namespace base {
namespace sequence_manager {
namespace internal {
WorkQueue::WorkQueue(TaskQueueImpl* task_queue,
const char* name,
QueueType queue_type)
: task_queue_(task_queue), name_(name), queue_type_(queue_type) {}
Value::List WorkQueue::AsValue(TimeTicks now) const {
Value::List state;
for (const Task& task : tasks_)
state.Append(TaskQueueImpl::TaskAsValue(task, now));
return state;
}
WorkQueue::~WorkQueue() {
DCHECK(!work_queue_sets_) << task_queue_->GetName() << " : "
<< work_queue_sets_->GetName() << " : " << name_;
}
const Task* WorkQueue::GetFrontTask() const {
if (tasks_.empty())
return nullptr;
return &tasks_.front();
}
const Task* WorkQueue::GetBackTask() const {
if (tasks_.empty())
return nullptr;
return &tasks_.back();
}
bool WorkQueue::BlockedByFence() const {
if (!fence_)
return false;
return tasks_.empty() || tasks_.front().task_order() >= fence_->task_order();
}
absl::optional<TaskOrder> WorkQueue::GetFrontTaskOrder() const {
if (tasks_.empty() || BlockedByFence())
return absl::nullopt;
DCHECK(tasks_.front().task_order() <= tasks_.back().task_order())
<< task_queue_->GetName() << " : " << work_queue_sets_->GetName() << " : "
<< name_;
return tasks_.front().task_order();
}
void WorkQueue::Push(Task task) {
bool was_empty = tasks_.empty();
#ifndef NDEBUG
DCHECK(task.enqueue_order_set());
#endif
DCHECK(was_empty || tasks_.back().task_order() < task.task_order());
DCHECK(was_empty || tasks_.back().enqueue_order() < task.enqueue_order() ||
(queue_type_ == QueueType::kDelayed &&
tasks_.back().enqueue_order() == task.enqueue_order()));
tasks_.push_back(std::move(task));
if (!was_empty)
return;
if (work_queue_sets_ && !BlockedByFence())
work_queue_sets_->OnTaskPushedToEmptyQueue(this);
}
WorkQueue::TaskPusher::TaskPusher(WorkQueue* work_queue)
: work_queue_(work_queue), was_empty_(work_queue->Empty()) {}
WorkQueue::TaskPusher::TaskPusher(TaskPusher&& other)
: work_queue_(other.work_queue_), was_empty_(other.was_empty_) {
other.work_queue_ = nullptr;
}
void WorkQueue::TaskPusher::Push(Task task) {
DCHECK(work_queue_);
#ifndef NDEBUG
DCHECK(task.enqueue_order_set());
#endif
DCHECK(work_queue_->tasks_.empty() ||
work_queue_->tasks_.back().task_order() < task.task_order());
DCHECK(work_queue_->tasks_.empty() ||
work_queue_->tasks_.back().enqueue_order() < task.enqueue_order() ||
(work_queue_->queue_type_ == QueueType::kDelayed &&
work_queue_->tasks_.back().enqueue_order() == task.enqueue_order()));
work_queue_->tasks_.push_back(std::move(task));
}
WorkQueue::TaskPusher::~TaskPusher() {
if (was_empty_ && work_queue_ && !work_queue_->Empty() &&
work_queue_->work_queue_sets_ && !work_queue_->BlockedByFence()) {
work_queue_->work_queue_sets_->OnTaskPushedToEmptyQueue(work_queue_);
}
}
WorkQueue::TaskPusher WorkQueue::CreateTaskPusher() {
return TaskPusher(this);
}
void WorkQueue::PushNonNestableTaskToFront(Task task) {
DCHECK(task.nestable == Nestable::kNonNestable);
bool was_empty = tasks_.empty();
bool was_blocked = BlockedByFence();
#ifndef NDEBUG
DCHECK(task.enqueue_order_set());
#endif
if (!was_empty) {
DCHECK(task.task_order() < tasks_.front().task_order())
<< task_queue_->GetName() << " : " << work_queue_sets_->GetName()
<< " : " << name_;
DCHECK(task.enqueue_order() < tasks_.front().enqueue_order() ||
(queue_type_ == QueueType::kDelayed &&
task.enqueue_order() == tasks_.front().enqueue_order()))
<< task_queue_->GetName() << " : " << work_queue_sets_->GetName()
<< " : " << name_;
}
tasks_.push_front(std::move(task));
if (!work_queue_sets_)
return;
if (BlockedByFence())
return;
if (was_empty || was_blocked) {
work_queue_sets_->OnTaskPushedToEmptyQueue(this);
} else {
work_queue_sets_->OnQueuesFrontTaskChanged(this);
}
}
void WorkQueue::TakeImmediateIncomingQueueTasks() {
DCHECK(tasks_.empty());
task_queue_->TakeImmediateIncomingQueueTasks(&tasks_);
if (tasks_.empty())
return;
if (work_queue_sets_ && !BlockedByFence())
work_queue_sets_->OnTaskPushedToEmptyQueue(this);
}
Task WorkQueue::TakeTaskFromWorkQueue() {
DCHECK(work_queue_sets_);
DCHECK(!tasks_.empty());
Task pending_task = std::move(tasks_.front());
tasks_.pop_front();
if (tasks_.empty()) {
if (queue_type_ == QueueType::kImmediate) {
task_queue_->TakeImmediateIncomingQueueTasks(&tasks_);
}
#if defined(OHOS_BUGFIX_CRASH)
#define MAX_QUE_SIZE (1 << 20)
if (tasks_.max_size() >= MAX_QUE_SIZE) {
LOG(ERROR) << "QTL, QueueName: " << task_queue_->GetName() << " : " << work_queue_sets_->GetName()
<< " : " << name_;
LOG(ERROR) << "QTL, max_size is " << tasks_.max_size()
<< " last task post from:" << pending_task.posted_from.ToString();
TimeTicks now = task_queue_->sequence_manager()->NowTicks();
std::string QueueDump;
JSONWriter::Write(task_queue_->AsValue(now, false), &QueueDump);
LOG(ERROR) << "QTL, Queue Dump: " << QueueDump;
const Task* immediate_back_task = task_queue_->immediate_work_queue()->GetBackTask();
if (immediate_back_task) {
std::string backTaskDump;
JSONWriter::Write(TaskQueueImpl::TaskAsValue(*immediate_back_task, now), &backTaskDump);
LOG(ERROR) << "QTL, immediate back task: " << backTaskDump;
}
}
#endif
tasks_.MaybeShrinkQueue();
}
DCHECK(work_queue_sets_);
#if DCHECK_IS_ON()
work_queue_sets_->OnQueuesFrontTaskChanged(this);
#else
work_queue_sets_->OnPopMinQueueInSet(this);
#endif
task_queue_->TraceQueueSize();
return pending_task;
}
bool WorkQueue::RemoveAllCanceledTasksFromFront() {
if (!work_queue_sets_) {
return false;
}
StackVector<Task, 8> tasks_to_delete;
while (!tasks_.empty()) {
const auto& pending_task = tasks_.front();
if (pending_task.task && !pending_task.IsCanceled())
break;
tasks_to_delete->push_back(std::move(tasks_.front()));
tasks_.pop_front();
}
if (!tasks_to_delete->empty()) {
if (tasks_.empty()) {
if (queue_type_ == QueueType::kImmediate) {
task_queue_->TakeImmediateIncomingQueueTasks(&tasks_);
}
tasks_.MaybeShrinkQueue();
}
if (heap_handle_.IsValid())
work_queue_sets_->OnQueuesFrontTaskChanged(this);
task_queue_->TraceQueueSize();
}
return !tasks_to_delete->empty();
}
void WorkQueue::AssignToWorkQueueSets(WorkQueueSets* work_queue_sets) {
work_queue_sets_ = work_queue_sets;
}
void WorkQueue::AssignSetIndex(size_t work_queue_set_index) {
work_queue_set_index_ = work_queue_set_index;
}
bool WorkQueue::InsertFenceImpl(Fence fence) {
DCHECK(!fence_ || fence.task_order() >= fence_->task_order() ||
fence.IsBlockingFence());
bool was_blocked_by_fence = BlockedByFence();
fence_ = fence;
return was_blocked_by_fence;
}
void WorkQueue::InsertFenceSilently(Fence fence) {
DCHECK(!fence_ || fence_->IsBlockingFence());
InsertFenceImpl(fence);
}
bool WorkQueue::InsertFence(Fence fence) {
bool was_blocked_by_fence = InsertFenceImpl(fence);
if (!work_queue_sets_)
return false;
if (!tasks_.empty() && was_blocked_by_fence && !BlockedByFence()) {
work_queue_sets_->OnTaskPushedToEmptyQueue(this);
return true;
}
if (BlockedByFence())
work_queue_sets_->OnQueueBlocked(this);
return false;
}
bool WorkQueue::RemoveFence() {
bool was_blocked_by_fence = BlockedByFence();
fence_ = absl::nullopt;
if (work_queue_sets_ && !tasks_.empty() && was_blocked_by_fence) {
work_queue_sets_->OnTaskPushedToEmptyQueue(this);
return true;
}
return false;
}
void WorkQueue::MaybeShrinkQueue() {
tasks_.MaybeShrinkQueue();
}
void WorkQueue::PopTaskForTesting() {
if (tasks_.empty())
return;
tasks_.pop_front();
}
void WorkQueue::CollectTasksOlderThan(TaskOrder reference,
std::vector<const Task*>* result) const {
for (const Task& task : tasks_) {
if (task.task_order() >= reference)
break;
result->push_back(&task);
}
}
}
}
}