#ifndef BASE_TASK_SEQUENCE_MANAGER_HIERARCHICAL_TIMING_WHEEL_H_
#define BASE_TASK_SEQUENCE_MANAGER_HIERARCHICAL_TIMING_WHEEL_H_
#include <algorithm>
#include <array>
#include <numeric>
#include <vector>
#include "base/containers/intrusive_heap.h"
#include "base/task/sequence_manager/timing_wheel.h"
#include "base/time/time.h"
#include "third_party/abseil-cpp/absl/types/optional.h"
namespace base::sequence_manager {
class BASE_EXPORT HierarchicalTimingWheelHandle {
public:
enum : size_t { kInvalidIndex = std::numeric_limits<size_t>::max() };
HierarchicalTimingWheelHandle();
HierarchicalTimingWheelHandle(const HierarchicalTimingWheelHandle& other) =
default;
HierarchicalTimingWheelHandle(HierarchicalTimingWheelHandle&& other) noexcept;
HierarchicalTimingWheelHandle& operator=(
const HierarchicalTimingWheelHandle& other) = default;
HierarchicalTimingWheelHandle& operator=(
HierarchicalTimingWheelHandle&& other) noexcept;
~HierarchicalTimingWheelHandle();
internal::TimingWheelHandle GetTimingWheelHandle() const;
void SetTimingWheelHandle(internal::TimingWheelHandle timing_wheel_handle);
void ClearTimingWheelHandle();
HeapHandle GetHeapHandle();
void SetHeapHandle(HeapHandle handle);
void ClearHeapHandle();
size_t GetHierarchyIndex() const;
void SetHierarchyIndex(size_t hierarchy_index);
void ClearHierarchyIndex();
static HierarchicalTimingWheelHandle Invalid();
bool IsValid() const;
private:
internal::TimingWheelHandle timing_wheel_handle_;
HeapHandle heap_handle_;
size_t hierarchy_index_ = kInvalidIndex;
};
template <typename T>
struct DefaultHierarchicalTimingWheelHandleAccessor {
void SetTimingWheelHandle(T* element,
internal::TimingWheelHandle handle) const {
HierarchicalTimingWheelHandle* htw_handle = element->handle();
htw_handle->SetTimingWheelHandle(handle);
}
void ClearTimingWheelHandle(T* element) const {
HierarchicalTimingWheelHandle* htw_handle = element->handle();
htw_handle->ClearTimingWheelHandle();
}
HeapHandle GetHeapHandle(const T* element) const {
HierarchicalTimingWheelHandle* htw_handle = element->handle();
return htw_handle->GetHeapHandle();
}
void SetHeapHandle(T* element, HeapHandle handle) const {
HierarchicalTimingWheelHandle* htw_handle = element->handle();
htw_handle->SetHeapHandle(handle);
}
void ClearHeapHandle(T* element) const {
HierarchicalTimingWheelHandle* htw_handle = element->handle();
htw_handle->ClearHeapHandle();
}
void SetHierarchyIndex(T* element, size_t hierarchy_index) const {
HierarchicalTimingWheelHandle* htw_handle = element->handle();
htw_handle->SetHierarchyIndex(hierarchy_index);
}
void ClearHierarchyIndex(T* element) const {
HierarchicalTimingWheelHandle* htw_handle = element->handle();
htw_handle->ClearHierarchyIndex();
}
};
template <typename T>
struct GetDelayedRunTime {
TimeTicks operator()(const T& element) { return element.delayed_run_time; }
};
template <typename T>
struct Compare {
bool operator()(const T& lhs, const T& rhs) const {
return lhs.delayed_run_time > rhs.delayed_run_time;
}
};
template <typename T,
size_t TotalWheels,
size_t WheelSize,
size_t SmallestBucketDeltaInMicroseconds,
typename HierarchicalTimingWheelHandleAccessor =
DefaultHierarchicalTimingWheelHandleAccessor<T>,
typename GetDelayedRunTime = GetDelayedRunTime<T>,
typename Compare = Compare<T>>
class HierarchicalTimingWheel {
public:
explicit HierarchicalTimingWheel(
TimeTicks last_wakeup,
const HierarchicalTimingWheelHandleAccessor&
hierarchical_timing_wheel_handle_accessor =
HierarchicalTimingWheelHandleAccessor(),
const GetDelayedRunTime& get_delayed_run_time = GetDelayedRunTime(),
const Compare compare = Compare())
: small_delay_heap_(compare, hierarchical_timing_wheel_handle_accessor),
large_delay_heap_(compare, hierarchical_timing_wheel_handle_accessor),
last_wakeup_(last_wakeup),
hierarchical_timing_wheel_handle_accessor_(
hierarchical_timing_wheel_handle_accessor),
get_delayed_run_time_(get_delayed_run_time) {}
HierarchicalTimingWheel(HierarchicalTimingWheel&&) = delete;
HierarchicalTimingWheel& operator=(HierarchicalTimingWheel&&) = delete;
HierarchicalTimingWheel(const HierarchicalTimingWheel&) = delete;
HierarchicalTimingWheel& operator=(const HierarchicalTimingWheel&) = delete;
~HierarchicalTimingWheel() = default;
size_t Size() {
return small_delay_heap_.size() + large_delay_heap_.size() +
std::accumulate(std::begin(wheels_), std::end(wheels_), 0,
[](size_t i, auto& wheel) {
return wheel.total_elements() + i;
});
}
typename std::vector<T>::const_iterator Insert(T element) {
DCHECK(get_delayed_run_time_(element) > last_wakeup_);
const TimeDelta delay = get_delayed_run_time_(element) - last_wakeup_;
const size_t hierarchy_index = FindHierarchyIndex(delay);
if (IsHeap(hierarchy_index)) {
auto& heap = GetHeapForHierarchyIndex(hierarchy_index);
hierarchical_timing_wheel_handle_accessor_.SetHierarchyIndex(
&element, hierarchy_index);
auto it = heap.insert(std::move(element));
return it;
} else {
auto& wheel = GetTimingWheelForHierarchyIndex(hierarchy_index);
hierarchical_timing_wheel_handle_accessor_.SetHierarchyIndex(
&element, hierarchy_index);
auto it = wheel.Insert(std::move(element), delay);
return it;
}
}
std::vector<T> Update(TimeTicks now) {
DCHECK(now >= last_wakeup_);
std::vector<T> expired_elements;
while (!small_delay_heap_.empty() &&
get_delayed_run_time_(small_delay_heap_.top()) <= now) {
T element = small_delay_heap_.take_top();
hierarchical_timing_wheel_handle_accessor_.ClearHierarchyIndex(&element);
expired_elements.push_back(std::move(element));
}
std::vector<T> elements;
const TimeDelta time_delta = now - last_wakeup_;
const size_t timing_wheels_delay_upperbound =
SmallestBucketDeltaInMicroseconds * Pow(WheelSize, TotalWheels);
const TimeTicks timing_wheels_maximum_delayed_run_time =
now + Milliseconds(timing_wheels_delay_upperbound);
last_wakeup_ = now;
for (size_t wheel_index = 0; wheel_index < TotalWheels; wheel_index++) {
wheels_[wheel_index].AdvanceTimeAndRemoveExpiredElements(time_delta,
elements);
}
while (!large_delay_heap_.empty() &&
get_delayed_run_time_(large_delay_heap_.top()) <
timing_wheels_maximum_delayed_run_time) {
elements.push_back(std::move(large_delay_heap_.take_top()));
}
for (auto& element : elements) {
if (now >= get_delayed_run_time_(element)) {
hierarchical_timing_wheel_handle_accessor_.ClearHierarchyIndex(
&element);
expired_elements.emplace_back(std::move(element));
} else {
Insert(std::move(element));
}
}
return expired_elements;
}
void Remove(HierarchicalTimingWheelHandle& handle) {
DCHECK(handle.IsValid());
if (handle.GetTimingWheelHandle().IsValid()) {
auto& wheel = GetTimingWheelForHierarchyIndex(handle.GetHierarchyIndex());
wheel.Remove(handle.GetTimingWheelHandle());
} else {
auto& heap = GetHeapForHierarchyIndex(handle.GetHierarchyIndex());
heap.erase(handle.GetHeapHandle());
}
}
typename std::vector<T>::const_reference Top() {
DCHECK_NE(Size(), 0u);
if (!small_delay_heap_.empty()) {
return small_delay_heap_.top();
}
for (size_t i = 0; i < TotalWheels; i++) {
if (wheels_[i].total_elements() != 0) {
return wheels_[i].Top();
}
}
return large_delay_heap_.top();
}
private:
bool IsHeap(size_t hierarchy_index) {
return hierarchy_index == 0 or hierarchy_index == TotalWheels + 1;
}
auto& GetHeapForHierarchyIndex(size_t hierarchy_index) {
DCHECK(hierarchy_index == 0 || hierarchy_index == TotalWheels + 1);
return hierarchy_index == 0 ? small_delay_heap_ : large_delay_heap_;
}
auto& GetTimingWheelForHierarchyIndex(size_t hierarchy_index) {
DCHECK(hierarchy_index > 0);
DCHECK(hierarchy_index < TotalWheels + 1);
return wheels_[hierarchy_index - 1];
}
size_t FindHierarchyIndex(TimeDelta delay) {
DCHECK(!delay.is_zero());
if (delay < Microseconds(SmallestBucketDeltaInMicroseconds))
return 0;
for (size_t i = 0; i < TotalWheels; i++) {
if (delay < (wheels_[i].time_delta_per_bucket() * WheelSize)) {
return i + 1;
}
}
return TotalWheels + 1;
}
constexpr static std::size_t Pow(size_t a, size_t b) {
size_t res = 1;
for (size_t i = 0; i < b; i++) {
res *= a;
}
return res;
}
using Wheel =
typename internal::TimingWheel<T,
WheelSize,
HierarchicalTimingWheelHandleAccessor,
GetDelayedRunTime>;
template <size_t... I>
static std::array<Wheel, TotalWheels> MakeWheels(std::index_sequence<I...>) {
return {(Wheel(Microseconds(SmallestBucketDeltaInMicroseconds *
Pow(WheelSize, I))))...};
}
std::array<Wheel, TotalWheels> wheels_ =
MakeWheels(std::make_index_sequence<TotalWheels>{});
IntrusiveHeap<T, Compare, HierarchicalTimingWheelHandleAccessor>
small_delay_heap_;
IntrusiveHeap<T, Compare, HierarchicalTimingWheelHandleAccessor>
large_delay_heap_;
TimeTicks last_wakeup_;
HierarchicalTimingWheelHandleAccessor
hierarchical_timing_wheel_handle_accessor_;
GetDelayedRunTime get_delayed_run_time_;
};
}
#endif