910e62b5创建于 1月15日历史提交
// Copyright 2015 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/task/sequence_manager/work_queue.h"

#include <optional>

#include "base/debug/alias.h"
#include "base/task/common/task_annotator.h"
#include "base/task/sequence_manager/fence.h"
#include "base/task/sequence_manager/sequence_manager_impl.h"
#include "base/task/sequence_manager/task_order.h"
#include "base/task/sequence_manager/work_queue_sets.h"
#include "build/build_config.h"
#include "third_party/abseil-cpp/absl/cleanup/cleanup.h"
#include "third_party/abseil-cpp/absl/container/inlined_vector.h"

namespace base::sequence_manager::internal {

WorkQueue::WorkQueue(TaskQueueImpl* task_queue,
                     const char* name,
                     QueueType queue_type)
    : task_queue_(task_queue), name_(name), queue_type_(queue_type) {}

Value::List WorkQueue::AsValue(TimeTicks now) const {
  Value::List state;
  for (const Task& task : tasks_) {
    state.Append(TaskQueueImpl::TaskAsValue(task, now));
  }
  return state;
}

WorkQueue::~WorkQueue() {
  DCHECK(!work_queue_sets_) << task_queue_->GetName() << " : "
                            << work_queue_sets_->GetName() << " : " << name_;
}

const Task* WorkQueue::GetFrontTask() const {
  if (tasks_.empty()) {
    return nullptr;
  }
  return &tasks_.front();
}

const Task* WorkQueue::GetBackTask() const {
  if (tasks_.empty()) {
    return nullptr;
  }
  return &tasks_.back();
}

bool WorkQueue::BlockedByFence() const {
  if (!fence_) {
    return false;
  }

  // If the queue is empty then any future tasks will have a higher enqueue
  // order and will be blocked. The queue is also blocked if the head is past
  // the fence.
  return tasks_.empty() || tasks_.front().task_order() >= fence_->task_order();
}

std::optional<TaskOrder> WorkQueue::GetFrontTaskOrder() const {
  if (tasks_.empty() || BlockedByFence()) {
    return std::nullopt;
  }
  // Quick sanity check.
  DCHECK(tasks_.front().task_order() <= tasks_.back().task_order())
      << task_queue_->GetName() << " : " << work_queue_sets_->GetName() << " : "
      << name_;
  return tasks_.front().task_order();
}

void WorkQueue::Push(Task task) {
  bool was_empty = tasks_.empty();
#ifndef NDEBUG
  DCHECK(task.enqueue_order_set());
#endif

  // Make sure the task order is strictly increasing.
  DCHECK(was_empty || tasks_.back().task_order() < task.task_order());
  // Make sure enqueue order is strictly increasing for immediate queues and
  // monotonically increasing for delayed queues.
  DCHECK(was_empty || tasks_.back().enqueue_order() < task.enqueue_order() ||
         (queue_type_ == QueueType::kDelayed &&
          tasks_.back().enqueue_order() == task.enqueue_order()));

  // Amortized O(1).
  tasks_.push_back(std::move(task));

  if (!was_empty) {
    return;
  }

  // If we hit the fence, pretend to WorkQueueSets that we're empty.
  if (work_queue_sets_ && !BlockedByFence()) {
    work_queue_sets_->OnTaskPushedToEmptyQueue(this);
  }
}

WorkQueue::TaskPusher::TaskPusher(WorkQueue* work_queue)
    : work_queue_(work_queue), was_empty_(work_queue->Empty()) {}

WorkQueue::TaskPusher::TaskPusher(TaskPusher&& other)
    : work_queue_(other.work_queue_), was_empty_(other.was_empty_) {
  other.work_queue_ = nullptr;
}

void WorkQueue::TaskPusher::Push(Task task) {
  DCHECK(work_queue_);

#ifndef NDEBUG
  DCHECK(task.enqueue_order_set());
#endif

  // Make sure the task order is strictly increasing.
  DCHECK(work_queue_->tasks_.empty() ||
         work_queue_->tasks_.back().task_order() < task.task_order());
  // Make sure enqueue order is strictly increasing for immediate queues and
  // monotonically increasing for delayed queues.
  DCHECK(work_queue_->tasks_.empty() ||
         work_queue_->tasks_.back().enqueue_order() < task.enqueue_order() ||
         (work_queue_->queue_type_ == QueueType::kDelayed &&
          work_queue_->tasks_.back().enqueue_order() == task.enqueue_order()));

  // Amortized O(1).
  work_queue_->tasks_.push_back(std::move(task));
}

WorkQueue::TaskPusher::~TaskPusher() {
  // If |work_queue_| became non empty and it isn't blocked by a fence then we
  // must notify |work_queue_->work_queue_sets_|.
  if (was_empty_ && work_queue_ && !work_queue_->Empty() &&
      work_queue_->work_queue_sets_ && !work_queue_->BlockedByFence()) {
    work_queue_->work_queue_sets_->OnTaskPushedToEmptyQueue(work_queue_);
  }
}

WorkQueue::TaskPusher WorkQueue::CreateTaskPusher() {
  return TaskPusher(this);
}

void WorkQueue::PushNonNestableTaskToFront(Task task) {
  DCHECK(task.nestable == Nestable::kNonNestable);

  bool was_empty = tasks_.empty();
  bool was_blocked = BlockedByFence();
#ifndef NDEBUG
  DCHECK(task.enqueue_order_set());
#endif

  if (!was_empty) {
    // Make sure the task order is strictly increasing.
    DCHECK(task.task_order() < tasks_.front().task_order())
        << task_queue_->GetName() << " : " << work_queue_sets_->GetName()
        << " : " << name_;
    // Make sure the enqueue order is strictly increasing for immediate queues
    // and monotonically increasing for delayed queues.
    DCHECK(task.enqueue_order() < tasks_.front().enqueue_order() ||
           (queue_type_ == QueueType::kDelayed &&
            task.enqueue_order() == tasks_.front().enqueue_order()))
        << task_queue_->GetName() << " : " << work_queue_sets_->GetName()
        << " : " << name_;
  }

  // Amortized O(1).
  tasks_.push_front(std::move(task));

  if (!work_queue_sets_) {
    return;
  }

  // Pretend  to WorkQueueSets that nothing has changed if we're blocked.
  if (BlockedByFence()) {
    return;
  }

  // Pushing task to front may unblock the fence.
  if (was_empty || was_blocked) {
    work_queue_sets_->OnTaskPushedToEmptyQueue(this);
  } else {
    work_queue_sets_->OnQueuesFrontTaskChanged(this);
  }
}

void WorkQueue::TakeImmediateIncomingQueueTasks() {
  DCHECK(tasks_.empty());

  task_queue_->TakeImmediateIncomingQueueTasks(&tasks_);
  if (tasks_.empty()) {
    return;
  }

  // If we hit the fence, pretend to WorkQueueSets that we're empty.
  if (work_queue_sets_ && !BlockedByFence()) {
    work_queue_sets_->OnTaskPushedToEmptyQueue(this);
  }
}

Task WorkQueue::TakeTaskFromWorkQueue() {
  DCHECK(work_queue_sets_);
  DCHECK(!tasks_.empty());

  Task pending_task = std::move(tasks_.front());
  tasks_.pop_front();
  // NB immediate tasks have a different pipeline to delayed ones.
  if (tasks_.empty()) {
    // NB delayed tasks are inserted via Push, no don't need to reload those.
    if (queue_type_ == QueueType::kImmediate) {
      // Short-circuit the queue reload so that OnPopMinQueueInSet does the
      // right thing.
      task_queue_->TakeImmediateIncomingQueueTasks(&tasks_);
    }
  }

  DCHECK(work_queue_sets_);
#if DCHECK_IS_ON()
  // If diagnostics are on it's possible task queues are being selected at
  // random so we can't use the (slightly) more efficient OnPopMinQueueInSet.
  work_queue_sets_->OnQueuesFrontTaskChanged(this);
#else
  // OnPopMinQueueInSet calls GetFrontTaskOrder which checks
  // BlockedByFence() so we don't need to here.
  work_queue_sets_->OnPopMinQueueInSet(this);
#endif
  task_queue_->TraceQueueSize();
  return pending_task;
}

bool WorkQueue::RemoveCancelledTasks(RemoveCancelledTasksPolicy policy) {
  // Since task destructors could have a side-effect of deleting this task queue
  // we move cancelled tasks into a temporary container which can be emptied
  // without accessing |this|.
  absl::InlinedVector<Task, 8> tasks_to_delete;

  for (auto& pending_task : tasks_) {
#if DCHECK_IS_ON()
    // Checking if a task is cancelled can trip DCHECK/CHECK failures out of the
    // control of the SequenceManager code, so provide a task trace for easier
    // diagnosis. See crbug.com/374409662 for context.
    absl::Cleanup resetter = [original_task =
                                  TaskAnnotator::CurrentTaskForThread()] {
      TaskAnnotator::SetCurrentTaskForThread({}, original_task);
    };
    TaskAnnotator::SetCurrentTaskForThread(base::PassKey<WorkQueue>(),
                                           &pending_task);
#endif
    CHECK(pending_task.task, base::NotFatalUntil::M140);

    if (pending_task.task.IsCancelled()) {
      tasks_to_delete.push_back(std::move(pending_task));
    } else if (policy == RemoveCancelledTasksPolicy::kFront) {
      // Stop iterating when encountering a non-cancelled tasks and the policy
      // is to remove only from the front.
      break;
    }
  }

  if (tasks_to_delete.empty()) {
    return false;
  }

  if (policy == RemoveCancelledTasksPolicy::kFront) {
    tasks_.erase(tasks_.begin(),
                 tasks_.begin() + base::checked_cast<std::ptrdiff_t>(
                                      tasks_to_delete.size()));
  } else {
    DCHECK_EQ(policy, RemoveCancelledTasksPolicy::kAll);
    std::erase_if(tasks_, [](const Task& task) { return task.task.is_null(); });
  }

  if (tasks_.empty()) {
    // NB delayed tasks are inserted via Push, no don't need to reload those.
    if (queue_type_ == QueueType::kImmediate) {
      // Short-circuit the queue reload so that OnPopMinQueueInSet does the
      // right thing.
      task_queue_->TakeImmediateIncomingQueueTasks(&tasks_);
    }
  }

  // If we have a valid |heap_handle_| (i.e. we're not blocked by a fence or
  // disabled) then |work_queue_sets_| needs to be told.
  if (heap_handle_.IsValid()) {
    CHECK(work_queue_sets_);
    work_queue_sets_->OnQueuesFrontTaskChanged(this);
  }
  task_queue_->TraceQueueSize();

  return true;
}

void WorkQueue::AssignToWorkQueueSets(WorkQueueSets* work_queue_sets) {
  work_queue_sets_ = work_queue_sets;
}

void WorkQueue::AssignSetIndex(size_t work_queue_set_index) {
  work_queue_set_index_ = work_queue_set_index;
}

bool WorkQueue::InsertFenceImpl(Fence fence) {
  DCHECK(!fence_ || fence.task_order() >= fence_->task_order() ||
         fence.IsBlockingFence());
  bool was_blocked_by_fence = BlockedByFence();
  fence_ = fence;
  return was_blocked_by_fence;
}

void WorkQueue::InsertFenceSilently(Fence fence) {
  // Ensure that there is no fence present or a new one blocks queue completely.
  DCHECK(!fence_ || fence_->IsBlockingFence());
  InsertFenceImpl(fence);
}

bool WorkQueue::InsertFence(Fence fence) {
  bool was_blocked_by_fence = InsertFenceImpl(fence);
  if (!work_queue_sets_) {
    return false;
  }

  // Moving the fence forward may unblock some tasks.
  if (!tasks_.empty() && was_blocked_by_fence && !BlockedByFence()) {
    work_queue_sets_->OnTaskPushedToEmptyQueue(this);
    return true;
  }
  // Fence insertion may have blocked all tasks in this work queue.
  if (BlockedByFence()) {
    work_queue_sets_->OnQueueBlocked(this);
  }
  return false;
}

bool WorkQueue::RemoveFence() {
  bool was_blocked_by_fence = BlockedByFence();
  fence_ = std::nullopt;
  if (work_queue_sets_ && !tasks_.empty() && was_blocked_by_fence) {
    work_queue_sets_->OnTaskPushedToEmptyQueue(this);
    return true;
  }
  return false;
}

void WorkQueue::PopTaskForTesting() {
  if (tasks_.empty()) {
    return;
  }
  tasks_.pop_front();
}

void WorkQueue::CollectTasksOlderThan(TaskOrder reference,
                                      std::vector<const Task*>* result) const {
  for (const Task& task : tasks_) {
    if (task.task_order() >= reference) {
      break;
    }

    result->push_back(&task);
  }
}

}  // namespace base::sequence_manager::internal