#ifndef BASE_OBSERVER_LIST_THREADSAFE_H_
#define BASE_OBSERVER_LIST_THREADSAFE_H_
#include <unordered_map>
#include <utility>
#include "base/auto_reset.h"
#include "base/base_export.h"
#include "base/check.h"
#include "base/check_op.h"
#include "base/containers/contains.h"
#include "base/dcheck_is_on.h"
#include "base/functional/bind.h"
#include "base/location.h"
#include "base/memory/raw_ptr.h"
#include "base/memory/ref_counted.h"
#include "base/observer_list.h"
#include "base/synchronization/lock.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/single_thread_task_runner.h"
#include "build/build_config.h"
#include "third_party/abseil-cpp/absl/base/attributes.h"
namespace base {
namespace internal {
class BASE_EXPORT ObserverListThreadSafeBase
: public RefCountedThreadSafe<ObserverListThreadSafeBase> {
public:
struct NotificationDataBase {
NotificationDataBase(void* observer_list_in, const Location& from_here_in)
: observer_list(observer_list_in), from_here(from_here_in) {}
raw_ptr<void> observer_list;
Location from_here;
};
ObserverListThreadSafeBase() = default;
ObserverListThreadSafeBase(const ObserverListThreadSafeBase&) = delete;
ObserverListThreadSafeBase& operator=(const ObserverListThreadSafeBase&) =
delete;
protected:
template <typename ObserverType, typename Method>
struct Dispatcher;
template <typename ObserverType, typename ReceiverType, typename... Params>
struct Dispatcher<ObserverType, void (ReceiverType::*)(Params...)> {
static void Run(void (ReceiverType::*m)(Params...),
Params... params,
ObserverType* obj) {
(obj->*m)(std::forward<Params>(params)...);
}
};
static const NotificationDataBase*& GetCurrentNotification();
virtual ~ObserverListThreadSafeBase() = default;
private:
friend class RefCountedThreadSafe<ObserverListThreadSafeBase>;
};
}
template <class ObserverType>
class ObserverListThreadSafe : public internal::ObserverListThreadSafeBase {
public:
enum class AddObserverResult {
kBecameNonEmpty,
kWasAlreadyNonEmpty,
};
enum class RemoveObserverResult {
kWasOrBecameEmpty,
kRemainsNonEmpty,
};
ObserverListThreadSafe() = default;
explicit ObserverListThreadSafe(ObserverListPolicy policy)
: policy_(policy) {}
ObserverListThreadSafe(const ObserverListThreadSafe&) = delete;
ObserverListThreadSafe& operator=(const ObserverListThreadSafe&) = delete;
AddObserverResult AddObserver(ObserverType* observer) {
DCHECK(SequencedTaskRunner::HasCurrentDefault())
<< "An observer can only be registered when "
"SequencedTaskRunner::HasCurrentDefault. If this is in a unit test, "
"you're likely merely missing a "
"base::test::(SingleThread)TaskEnvironment in your fixture. "
"Otherwise, try running this code on a named thread (main/UI/IO) or "
"from a task posted to a base::SequencedTaskRunner or "
"base::SingleThreadTaskRunner.";
AutoLock auto_lock(lock_);
bool was_empty = observers_.empty();
DCHECK(!Contains(observers_, observer));
const scoped_refptr<SequencedTaskRunner> task_runner =
SequencedTaskRunner::GetCurrentDefault();
const size_t observer_id = ++observer_id_counter_;
ObserverTaskRunnerInfo task_info = {task_runner, observer_id};
observers_[observer] = std::move(task_info);
if (policy_ == ObserverListPolicy::ALL) {
if (const NotificationDataBase* const current_notification =
GetCurrentNotification();
current_notification && current_notification->observer_list == this) {
const NotificationData* notification_data =
static_cast<const NotificationData*>(current_notification);
task_runner->PostTask(
current_notification->from_here,
BindOnce(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this,
UnsafeDangling(observer),
NotificationData(this, observer_id,
current_notification->from_here,
notification_data->method)));
}
}
return was_empty ? AddObserverResult::kBecameNonEmpty
: AddObserverResult::kWasAlreadyNonEmpty;
}
RemoveObserverResult RemoveObserver(ObserverType* observer) {
AutoLock auto_lock(lock_);
observers_.erase(observer);
return observers_.empty() ? RemoveObserverResult::kWasOrBecameEmpty
: RemoveObserverResult::kRemainsNonEmpty;
}
void AssertEmpty() const {
#if DCHECK_IS_ON()
AutoLock auto_lock(lock_);
DCHECK(observers_.empty());
#endif
}
template <typename Method, typename... Params>
void Notify(const Location& from_here, Method m, Params&&... params) {
RepeatingCallback<void(ObserverType*)> method =
BindRepeating(&Dispatcher<ObserverType, Method>::Run, m,
std::forward<Params>(params)...);
AutoLock lock(lock_);
for (const auto& observer : observers_) {
observer.second.task_runner->PostTask(
from_here,
BindOnce(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, this,
UnsafeDangling(observer.first),
NotificationData(this, observer.second.observer_id,
from_here, method)));
}
}
private:
friend class RefCountedThreadSafe<ObserverListThreadSafeBase>;
struct NotificationData : public NotificationDataBase {
NotificationData(ObserverListThreadSafe* observer_list_in,
size_t observer_id_in,
const Location& from_here_in,
const RepeatingCallback<void(ObserverType*)>& method_in)
: NotificationDataBase(observer_list_in, from_here_in),
method(method_in),
observer_id(observer_id_in) {}
RepeatingCallback<void(ObserverType*)> method;
size_t observer_id;
};
~ObserverListThreadSafe() override = default;
void NotifyWrapper(MayBeDangling<ObserverType> observer,
const NotificationData& notification) {
{
AutoLock auto_lock(lock_);
DCHECK_EQ(notification.observer_list, this);
auto it = observers_.find(observer);
if (it == observers_.end() ||
it->second.observer_id != notification.observer_id) {
return;
}
DCHECK(it->second.task_runner->RunsTasksInCurrentSequence());
}
const AutoReset<const NotificationDataBase*> resetter_(
&GetCurrentNotification(), ¬ification);
notification.method.Run(observer);
}
const ObserverListPolicy policy_ = ObserverListPolicy::ALL;
mutable Lock lock_;
size_t observer_id_counter_ GUARDED_BY(lock_) = 0;
struct ObserverTaskRunnerInfo {
scoped_refptr<SequencedTaskRunner> task_runner;
size_t observer_id = 0;
};
std::unordered_map<ObserverType*, ObserverTaskRunnerInfo> observers_
GUARDED_BY(lock_);
};
}
#endif