#ifndef CHROMECAST_BASE_OBSERVER_H_
#define CHROMECAST_BASE_OBSERVER_H_
#include <stddef.h>
#include <stdint.h>

#include <memory>
#include <type_traits>
#include <utility>
#include <vector>

#include "base/check_op.h"
#include "base/containers/contains.h"
#include "base/functional/bind.h"
#include "base/functional/callback.h"
#include "base/location.h"
#include "base/memory/raw_ref.h"
#include "base/memory/ref_counted.h"
#include "base/notreached.h"
#include "base/sequence_checker.h"
#include "base/synchronization/lock.h"
#include "base/task/sequenced_task_runner.h"
namespace chromecast {
namespace subtle {
// Forward declaration of the ref-counted state object that is defined further
// down in this header. Observer and Observable each hold a scoped_refptr to it.
template <typename T>
class ObservableInternals;
}
// Forward declaration so that Observer<T> can befriend Observable<T> before
// Observable is defined.
template <typename T>
class Observable;
// A handle through which the value of an Observable<T> can be read and through
// which change notifications are delivered.
//
// Observers are obtained from Observable<T>::Observe() or by copying an
// existing Observer. GetValue(), OnUpdate() and the destructor all DCHECK that
// they run on the sequence bound to |sequence_checker_|.
template <typename T>
class Observer {
 public:
  // Creates an independent observer that shares |other|'s internals. Only the
  // internals are shared: the new observer starts with no update callback.
  Observer(const Observer& other);

  // Observers cannot be re-pointed once created.
  Observer& operator=(const Observer&) = delete;

  // Unregisters this observer from the shared internals.
  ~Observer();

  // Replaces the closure that OnUpdate() runs. Passing a null closure makes
  // OnUpdate() a no-op.
  void SetOnUpdateCallback(base::RepeatingClosure callback) {
    on_update_callback_ = std::move(callback);
  }

  // Returns the value as seen by this observer's sequence. The reference points
  // at the per-sequence copy handed back by ObservableInternals::AddObserver().
  const T& GetValue() const {
    DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
    const T& current = *value_;
    return current;
  }

 private:
  friend class Observable<T>;
  friend class subtle::ObservableInternals<T>;

  // Registers a new observer with |internals|; reachable only from
  // Observable<T> and from the copy constructor.
  explicit Observer(scoped_refptr<subtle::ObservableInternals<T>> internals);

  // Runs the update callback, if one is set.
  void OnUpdate();

  // NOTE: declaration order matters. |internals_| must be initialized before
  // |value_|, whose initializer calls through |internals_|.
  const scoped_refptr<subtle::ObservableInternals<T>> internals_;
  const raw_ref<const T> value_;
  base::RepeatingClosure on_update_callback_;

  SEQUENCE_CHECKER(sequence_checker_);
};
// Owner-side handle for a value of type T that can be watched through
// Observer<T> objects.
//
// All state lives in a ref-counted subtle::ObservableInternals<T>, which is
// shared with every Observer created from this Observable.
template <typename T>
class Observable {
  // The value is copied into the internals, copy-assigned on every SetValue()
  // and copied again for each observing sequence, so T must support both.
  static_assert(std::is_copy_constructible_v<T>,
                "Observable values must be copyable");
  static_assert(std::is_copy_assignable_v<T>,
                "Observable values must be copy-assignable");

 public:
  // Creates the shared internals, seeded with a copy of |initial_value|.
  explicit Observable(const T& initial_value);

  Observable(const Observable&) = delete;
  Observable& operator=(const Observable&) = delete;

  // Returns a new Observer registered against this Observable's internals.
  // Registration requires a current default SequencedTaskRunner (DCHECKed in
  // ObservableInternals::AddObserver()).
  Observer<T> Observe();

  // Stores |new_value| and posts an update to every sequence that currently
  // has observers.
  void SetValue(const T& new_value);

  // Returns a reference to the stored value WITHOUT taking the internal lock.
  // NOTE(review): presumably only safe on the sequence that calls SetValue();
  // confirm against callers. Use GetValueThreadSafe() elsewhere.
  const T& GetValue() const;

  // Returns a copy of the stored value, taken while holding the internal lock.
  T GetValueThreadSafe() const;

 private:
  const scoped_refptr<subtle::ObservableInternals<T>> internals_;
};
namespace subtle {
// Ref-counted state shared by an Observable<T> and all of its Observers.
//
// Holds the authoritative value plus one PerSequenceInfo entry for every
// task runner that currently has at least one registered Observer. |value_|
// and |per_sequence_| are modified only while |lock_| is held.
template <typename T>
class ObservableInternals
    : public base::RefCountedThreadSafe<ObservableInternals<T>> {
 public:
  explicit ObservableInternals(const T& initial_value)
      : value_(initial_value) {}

  ObservableInternals(const ObservableInternals&) = delete;
  ObservableInternals& operator=(const ObservableInternals&) = delete;

  // Stores |new_value| and, for every sequence with observers, posts a task
  // that updates that sequence's copy and notifies its observers.
  void SetValue(const T& new_value) {
    base::AutoLock lock(lock_);
    value_ = new_value;

    for (auto& item : per_sequence_) {
      item.SetValue(new_value);
    }
  }

  // Returns a reference to the stored value without taking |lock_|.
  // NOTE(review): presumably only safe on the sequence that calls SetValue();
  // confirm against callers.
  const T& GetValue() const { return value_; }

  // Returns a copy of the stored value, taken under |lock_|.
  T GetValueThreadSafe() const {
    base::AutoLock lock(lock_);
    return value_;
  }

  // Registers |observer| under the calling sequence's current default task
  // runner, creating that sequence's entry (seeded with the current value) if
  // it does not exist yet. Returns a reference to the sequence's own copy of
  // the value. The copy lives in a heap-allocated SequenceOwnedInfo, so the
  // reference is unaffected by |per_sequence_| growing or being reordered.
  const T& AddObserver(Observer<T>* observer) {
    DCHECK(observer);
    DCHECK(base::SequencedTaskRunner::HasCurrentDefault());
    auto task_runner = base::SequencedTaskRunner::GetCurrentDefault();

    base::AutoLock lock(lock_);
    auto it = per_sequence_.begin();
    while (it != per_sequence_.end() && it->task_runner() != task_runner) {
      ++it;
    }
    if (it == per_sequence_.end()) {
      per_sequence_.emplace_back(std::move(task_runner), value_);
      it = --per_sequence_.end();
    }
    it->AddObserver(observer);
    return it->value();
  }

  // Unregisters |observer| from the calling sequence's entry. When that was
  // the entry's last observer, the entry is removed; its destructor posts the
  // deletion of the sequence-owned data to the entry's task runner.
  void RemoveObserver(Observer<T>* observer) {
    DCHECK(observer);
    DCHECK(base::SequencedTaskRunner::HasCurrentDefault());
    auto task_runner = base::SequencedTaskRunner::GetCurrentDefault();

    base::AutoLock lock(lock_);
    for (size_t i = 0; i < per_sequence_.size(); ++i) {
      if (per_sequence_[i].task_runner() == task_runner) {
        per_sequence_[i].RemoveObserver(observer);

        if (per_sequence_[i].Empty()) {
          // Order within |per_sequence_| is irrelevant, so swap-and-pop
          // instead of erasing from the middle.
          per_sequence_[i].Swap(per_sequence_.back());
          per_sequence_.pop_back();
        }
        return;
      }
    }

    NOTREACHED() << "Tried to remove observer from unknown task runner";
  }

 private:
  // Data that belongs to one sequence: its copy of the value and the list of
  // observers registered from it. Every accessor DCHECKs the sequence.
  class SequenceOwnedInfo {
   public:
    explicit SequenceOwnedInfo(const T& value) : value_(value) {}

    SequenceOwnedInfo(const SequenceOwnedInfo&) = delete;
    SequenceOwnedInfo& operator=(const SequenceOwnedInfo&) = delete;

    const T& value() const {
      DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
      return value_;
    }

    // Appends |observer|; it must not already be registered.
    void AddObserver(Observer<T>* observer) {
      DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
      DCHECK(observer);
      DCHECK(!base::Contains(observers_, observer));
      observers_.push_back(observer);
    }

    // Removes |observer|; it must currently be registered.
    void RemoveObserver(Observer<T>* observer) {
      DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
      DCHECK(observer);
      DCHECK(base::Contains(observers_, observer));
      std::erase(observers_, observer);
    }

    bool Empty() const {
      DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
      return observers_.empty();
    }

    // Updates this sequence's copy of the value and notifies its observers.
    void SetValue(const T& value) {
      DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
      value_ = value;

      // An update callback may create or destroy Observers on this sequence,
      // which re-enters AddObserver()/RemoveObserver() and mutates
      // |observers_|. Iterate over a snapshot so the loop is never
      // invalidated, and skip any observer that an earlier callback in this
      // pass has already unregistered (it may have been destroyed).
      const std::vector<Observer<T>*> snapshot = observers_;
      for (auto* obs : snapshot) {
        if (base::Contains(observers_, obs)) {
          obs->OnUpdate();
        }
      }
    }

    // Task body used to delete an instance on its own sequence: |self| is
    // destroyed when the parameter goes out of scope.
    static void Destroy(std::unique_ptr<SequenceOwnedInfo> self) {}

   private:
    std::vector<Observer<T>*> observers_;
    T value_;
    SEQUENCE_CHECKER(sequence_checker_);
  };

  // Pairs a task runner with the SequenceOwnedInfo that lives on it. Entries
  // are stored in |per_sequence_| and are movable so the vector can grow.
  class PerSequenceInfo {
   public:
    PerSequenceInfo(scoped_refptr<base::SequencedTaskRunner> task_runner,
                    const T& value)
        : task_runner_(std::move(task_runner)),
          owned_info_(std::make_unique<SequenceOwnedInfo>(value)) {}

    PerSequenceInfo(PerSequenceInfo&& other) = default;

    ~PerSequenceInfo() {
      if (!owned_info_) {
        // Moved-from entry: nothing left to clean up.
        return;
      }

      DCHECK(Empty());
      // Hand the sequence-owned data to its task runner for deletion rather
      // than deleting it here.
      task_runner_->PostNonNestableTask(
          FROM_HERE,
          base::BindOnce(&SequenceOwnedInfo::Destroy, std::move(owned_info_)));
    }

    const T& value() const { return owned_info_->value(); }

    const base::SequencedTaskRunner* task_runner() const {
      return task_runner_.get();
    }

    void AddObserver(Observer<T>* observer) {
      owned_info_->AddObserver(observer);
    }

    void RemoveObserver(Observer<T>* observer) {
      owned_info_->RemoveObserver(observer);
    }

    bool Empty() const { return owned_info_->Empty(); }

    // Exchanges the contents of this entry with |other|.
    void Swap(PerSequenceInfo& other) {
      std::swap(task_runner_, other.task_runner_);
      std::swap(owned_info_, other.owned_info_);
    }

    // Posts the update to the owning task runner; a copy of |value| is bound
    // into the task.
    // NOTE(review): base::Unretained presumably relies on the destruction
    // task being posted to the same task runner after any update task, so the
    // pointer is still alive when the update runs. Confirm.
    void SetValue(const T& value) {
      task_runner_->PostTask(
          FROM_HERE,
          base::BindOnce(&SequenceOwnedInfo::SetValue,
                         base::Unretained(owned_info_.get()), value));
    }

   private:
    scoped_refptr<base::SequencedTaskRunner> task_runner_;
    std::unique_ptr<SequenceOwnedInfo> owned_info_;
  };

  // Destruction goes through RefCountedThreadSafe only.
  friend class base::RefCountedThreadSafe<ObservableInternals>;
  ~ObservableInternals() = default;

  mutable base::Lock lock_;
  T value_;
  std::vector<PerSequenceInfo> per_sequence_;
};
}
// Private constructor used by Observable<T>::Observe() and by the copy
// constructor. Takes ownership of a reference to |internals| and registers
// this observer with it; the reference returned by AddObserver() is stored in
// |value_|. |internals_| is initialized first because the |value_| initializer
// calls through it.
template <typename T>
Observer<T>::Observer(scoped_refptr<subtle::ObservableInternals<T>> internals)
: internals_(std::move(internals)), value_(internals_->AddObserver(this)) {}
// Copy constructor: delegates to the registering constructor with |other|'s
// internals, so the copy is registered as a separate observer. Only the
// internals are taken from |other|; its update callback is not copied.
template <typename T>
Observer<T>::Observer(const Observer& other) : Observer(other.internals_) {}
// Unregisters this observer from the shared internals. Must run on the
// sequence bound to |sequence_checker_| (DCHECKed).
template <typename T>
Observer<T>::~Observer() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  subtle::ObservableInternals<T>& shared_state = *internals_;
  shared_state.RemoveObserver(this);
}
// Invoked by the per-sequence bookkeeping after the value has changed; runs
// the update callback when one has been set.
template <typename T>
void Observer<T>::OnUpdate() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  if (!on_update_callback_) {
    // No callback installed, so there is nobody to notify.
    return;
  }
  on_update_callback_.Run();
}
// Creates the shared, ref-counted internals seeded with a copy of
// |initial_value|. base::MakeRefCounted keeps the allocation and the first
// reference in a single expression instead of wrapping a naked `new`.
template <typename T>
Observable<T>::Observable(const T& initial_value)
    : internals_(base::MakeRefCounted<subtle::ObservableInternals<T>>(
          initial_value)) {}
// Creates an Observer registered against this Observable's internals.
// The Observer constructor registers its own `this` pointer, so the result is
// returned as a prvalue; with C++17 guaranteed copy elision the registered
// object is the one the caller receives (no intermediate copy is registered).
template <typename T>
Observer<T> Observable<T>::Observe() {
return Observer<T>(internals_);
}
// Forwards |new_value| to the shared internals, which store it and post an
// update to every sequence that has observers.
template <typename T>
void Observable<T>::SetValue(const T& new_value) {
  subtle::ObservableInternals<T>& shared_state = *internals_;
  shared_state.SetValue(new_value);
}
// Returns a reference to the value held by the shared internals. The
// internals' GetValue() does not take the lock; use GetValueThreadSafe() when
// a locked copy is needed.
template <typename T>
const T& Observable<T>::GetValue() const {
  const subtle::ObservableInternals<T>& shared_state = *internals_;
  return shared_state.GetValue();
}
// Returns a copy of the value, taken by the shared internals while holding
// their lock.
template <typename T>
T Observable<T>::GetValueThreadSafe() const {
  const subtle::ObservableInternals<T>& shared_state = *internals_;
  return shared_state.GetValueThreadSafe();
}
}
#endif