910e62b5创建于 1月15日历史提交
// Copyright 2019 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef MOJO_PUBLIC_CPP_BINDINGS_SHARED_REMOTE_H_
#define MOJO_PUBLIC_CPP_BINDINGS_SHARED_REMOTE_H_

#include <memory>
#include <tuple>

#include "base/functional/bind.h"
#include "base/location.h"
#include "base/memory/ptr_util.h"
#include "base/memory/ref_counted.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/sequenced_task_runner_helpers.h"
#include "base/task/task_runner.h"
#include "mojo/public/cpp/bindings/associated_group.h"
#include "mojo/public/cpp/bindings/lib/thread_safe_forwarder_base.h"
#include "mojo/public/cpp/bindings/message.h"
#include "mojo/public/cpp/bindings/remote.h"
#include "mojo/public/cpp/bindings/runtime_features.h"

namespace mojo {

namespace internal {

template <typename RemoteType>
struct SharedRemoteTraits;

template <typename Interface>
struct SharedRemoteTraits<Remote<Interface>> {
  static void BindDisconnected(Remote<Interface>& remote) {
    std::ignore = remote.BindNewPipeAndPassReceiver();
  }
};

}  // namespace internal

// Helper that may be used from any sequence to serialize |Interface| messages
// and forward them elsewhere. In general, prefer `SharedRemote`, but this type
// may be useful when it's necessary to manually manage the lifetime of the
// underlying proxy object which will be used to ultimately send messages.
template <typename Interface>
class ThreadSafeForwarder : public internal::ThreadSafeForwarderBase {
 public:
  using ProxyType = typename Interface::Proxy_;

  // Constructs a ThreadSafeForwarder through which Messages are forwarded to
  // |forward| or |forward_with_responder| by posting to |task_runner|.
  //
  // Any message sent through this forwarding interface will dispatch its reply,
  // if any, back to the sequence which called the corresponding interface
  // method.
  explicit ThreadSafeForwarder(scoped_refptr<ThreadSafeProxy> thread_safe_proxy)
      : ThreadSafeForwarderBase(std::move(thread_safe_proxy)), proxy_(this) {}

  ThreadSafeForwarder(const ThreadSafeForwarder&) = delete;
  ThreadSafeForwarder& operator=(const ThreadSafeForwarder&) = delete;

  ~ThreadSafeForwarder() override = default;

  ProxyType& proxy() { return proxy_; }

 private:
  ProxyType proxy_;
};

template <typename Interface>
class SharedRemote;

template <typename RemoteType>
class SharedRemoteBase
    : public base::RefCountedThreadSafe<SharedRemoteBase<RemoteType>> {
 public:
  using InterfaceType = typename RemoteType::InterfaceType;
  using PendingType = typename RemoteType::PendingType;

  SharedRemoteBase(const SharedRemoteBase&) = delete;
  SharedRemoteBase& operator=(const SharedRemoteBase&) = delete;

  InterfaceType* get() { return &forwarder_->proxy(); }
  InterfaceType* operator->() { return get(); }
  InterfaceType& operator*() { return *get(); }

  void set_disconnect_handler(
      base::OnceClosure handler,
      scoped_refptr<base::SequencedTaskRunner> handler_task_runner) {
    wrapper_->set_disconnect_handler(std::move(handler),
                                     std::move(handler_task_runner));
  }

  void Disconnect() { wrapper_->Disconnect(); }

 private:
  friend class base::RefCountedThreadSafe<SharedRemoteBase<RemoteType>>;
  template <typename Interface>
  friend class SharedRemote;
  template <typename Interface>
  friend class SharedAssociatedRemote;

  struct RemoteWrapperDeleter;

  // Helper class which owns a |RemoteType| instance on an appropriate sequence.
  // This is kept alive as long as it's bound within some ThreadSafeForwarder's
  // callbacks.
  class RemoteWrapper
      : public base::RefCountedThreadSafe<RemoteWrapper, RemoteWrapperDeleter> {
   public:
    RemoteWrapper(PendingType remote,
                  scoped_refptr<base::SequencedTaskRunner> task_runner)
        : task_runner_(std::move(task_runner)),
          remote_(std::move(remote), task_runner_),
          associated_group_(*remote_.internal_state()->associated_group()) {}

    RemoteWrapper(const RemoteWrapper&) = delete;
    RemoteWrapper& operator=(const RemoteWrapper&) = delete;

    std::unique_ptr<ThreadSafeForwarder<InterfaceType>> CreateForwarder(
        const base::Location& location) {
      return std::make_unique<ThreadSafeForwarder<InterfaceType>>(
          remote_.internal_state()->CreateThreadSafeProxy(
              base::MakeRefCounted<ProxyTarget>(this), location));
    }

    void set_disconnect_handler(
        base::OnceClosure handler,
        scoped_refptr<base::SequencedTaskRunner> handler_task_runner) {
      if (!task_runner_->RunsTasksInCurrentSequence()) {
        // Make sure we modify the remote's disconnect handler on the
        // correct sequence.
        task_runner_->PostTask(
            FROM_HERE,
            base::BindOnce(&RemoteWrapper::set_disconnect_handler, this,
                           std::move(handler), std::move(handler_task_runner)));
        return;
      }
      // The actual handler will post a task to run |handler| on
      // |handler_task_runner|.
      auto wrapped_handler =
          base::BindOnce(base::IgnoreResult(&base::TaskRunner::PostTask),
                         handler_task_runner, FROM_HERE, std::move(handler));
      // Because we may have had to post a task to set this handler,
      // this call may land after the remote has just been disconnected.
      // Manually invoke the handler in that case.
      if (!remote_.is_connected()) {
        std::move(wrapped_handler).Run();
        return;
      }
      remote_.set_disconnect_handler(std::move(wrapped_handler));
    }

    void Disconnect() {
      if (!task_runner_->RunsTasksInCurrentSequence()) {
        task_runner_->PostTask(
            FROM_HERE, base::BindOnce(&RemoteWrapper::Disconnect, this));
        return;
      }

      // Reset the remote and rebind it in a disconnected state, so that it's
      // usable but discards all messages.
      remote_.reset();
      internal::SharedRemoteTraits<RemoteType>::BindDisconnected(remote_);
    }

   private:
    friend struct RemoteWrapperDeleter;
    friend class base::DeleteHelper<RemoteWrapper>;

    ~RemoteWrapper() = default;

    // This provides a roundabout way for a ThreadSafeProxy to hold a reference
    // back to the RemoteWrapper which created it. The purpose is to ensure that
    // the RemoteWrapper lives at least as long as the ThreadSafeProxy, which in
    // turn ensures that it lives at least as long as any outgoing message task.
    class ProxyTarget : public ThreadSafeProxy::Target {
     public:
      explicit ProxyTarget(scoped_refptr<RemoteWrapper> wrapper)
          : wrapper_(std::move(wrapper)) {}

     private:
      ~ProxyTarget() override = default;

      const scoped_refptr<RemoteWrapper> wrapper_;
    };

    void DeleteOnCorrectThread() const {
      if (!task_runner_->RunsTasksInCurrentSequence()) {
        // NOTE: This is only called when there are no more references to
        // |this|, so binding it unretained is both safe and necessary.
        task_runner_->DeleteSoon(FROM_HERE, this);
      } else {
        delete this;
      }
    }

    const scoped_refptr<base::SequencedTaskRunner> task_runner_;
    RemoteType remote_;
    AssociatedGroup associated_group_;
  };

  struct RemoteWrapperDeleter {
    static void Destruct(const RemoteWrapper* wrapper) {
      wrapper->DeleteOnCorrectThread();
    }
  };

  explicit SharedRemoteBase(scoped_refptr<RemoteWrapper> wrapper,
                            const base::Location& location)
      : wrapper_(std::move(wrapper)),
        forwarder_(wrapper_->CreateForwarder(location)) {}

  // Creates a SharedRemoteBase bound to `pending_remote`. All messages sent
  // through the SharedRemote will first bounce through `task_runner`.
  static scoped_refptr<SharedRemoteBase> Create(
      PendingType pending_remote,
      scoped_refptr<base::SequencedTaskRunner> task_runner,
      const base::Location& location) {
    return new SharedRemoteBase(
        base::MakeRefCounted<RemoteWrapper>(std::move(pending_remote),
                                            std::move(task_runner)),
        location);
  }

  ~SharedRemoteBase() = default;

  const scoped_refptr<RemoteWrapper> wrapper_;
  const std::unique_ptr<ThreadSafeForwarder<InterfaceType>> forwarder_;
};

// SharedRemote wraps a non-thread-safe Remote and proxies messages to it. Note
// that SharedRemote itself is also NOT THREAD-SAFE, but unlike Remote it IS
// copyable cross-thread, and each copy is usable from its own thread. The
// trade-off compared to a Remote is some additional overhead and latency in
// message transmission, as sending a message usually incurs a task hop.
//
// Async calls are posted to the bound sequence (the sequence that the
// underlying Remote is bound to, i.e. |bind_task_runner| below), and responses
// are posted back to the calling sequence. Sync calls are dispatched directly
// if the call is made on the bound sequence, or posted otherwise.
//
// This means that in general, when making calls from sequences other than the
// bound sequence, a hop is first made *to* the bound sequence; and when
// receiving replies, a hop is made *from* the bound the sequence.
//
// Note that sync calls only block the calling sequence.
template <typename Interface>
class SharedRemote {
 public:
  // Constructs an unbound SharedRemote. This object cannot issue Interface
  // method calls and does not schedule any tasks. A default-constructed
  // SharedRemote may be replaced with a bound one via copy- or move-assignment.
  SharedRemote() = default;

  // Constructs a SharedRemote bound to `pending_remote` on the calling
  // sequence. See `Bind()` below for more details.
  explicit SharedRemote(
      PendingRemote<Interface> pending_remote,
      const base::Location& location = base::Location::Current()) {
    Bind(std::move(pending_remote), nullptr, location);
  }

  // Constructs a SharedRemote bound to `pending_remote` on the sequence given
  // by `bind_task_runner`. See `Bind()` below for more details.
  SharedRemote(PendingRemote<Interface> pending_remote,
               scoped_refptr<base::SequencedTaskRunner> bind_task_runner,
               const base::Location& location = base::Location::Current()) {
    Bind(std::move(pending_remote), std::move(bind_task_runner), location);
  }

  // SharedRemote supports both copy and move construction and assignment. These
  // are explicitly defaulted here for clarity.
  SharedRemote(const SharedRemote&) = default;
  SharedRemote(SharedRemote&&) = default;
  SharedRemote& operator=(const SharedRemote&) = default;
  SharedRemote& operator=(SharedRemote&&) = default;

  bool is_bound() const { return remote_ != nullptr; }
  explicit operator bool() const { return is_bound(); }

  Interface* get() const { return remote_->get(); }
  Interface* operator->() const { return get(); }
  Interface& operator*() const { return *get(); }

  void set_disconnect_handler(
      base::OnceClosure handler,
      scoped_refptr<base::SequencedTaskRunner> handler_task_runner) {
    remote_->set_disconnect_handler(std::move(handler),
                                    std::move(handler_task_runner));
  }

  // Clears this SharedRemote. Note that this does *not* necessarily close the
  // remote's endpoint as other SharedRemote instances may reference the same
  // underlying endpoint.
  void reset() { remote_.reset(); }

  // Disconnects the SharedRemote. This leaves the object in a usable state --
  // i.e. it's still safe to dereference and make calls -- but severs the
  // underlying connection so that no new replies will be received and all
  // outgoing messages will be discarded. This is useful when you want to force
  // a disconnection like with reset(), but you don't want the SharedRemote to
  // become unbound.
  void Disconnect() { remote_->Disconnect(); }

  // Binds this SharedRemote to `pending_remote` on the sequence given by
  // `bind_task_runner`, or the calling sequence if `bind_task_runner` is null.
  // Once bound, the SharedRemote may be used to send messages on the underlying
  // Remote. Messages always bounce through `bind_task_runner` before sending,
  // unless the caller is issuing a [Sync] call from `bind_task_runner` already;
  // in which case this behaves exactly like a regular Remote for that call.
  //
  // Any reply received by the SharedRemote is dispatched to whatever
  // SequencedTaskRunner was current when the corresponding request was made.
  //
  // A bound SharedRemote may be copied any number of times, to any number of
  // threads. Each copy sends messages through the same underlying Remote, after
  // bouncing through the same `bind_task_runner`.
  //
  // If this SharedRemote was already bound, it will be effectively unbound by
  // this call and re-bound to `pending_remote`. Any prior copies made are NOT
  // affected and will retain their reference to the original Remote.
  void Bind(PendingRemote<Interface> pending_remote,
            scoped_refptr<base::SequencedTaskRunner> bind_task_runner,
            const base::Location& location = base::Location::Current()) {
    if (!internal::GetRuntimeFeature_ExpectEnabled<Interface>()) {
      remote_.reset();
      return;
    }
    if (bind_task_runner && pending_remote) {
      remote_ = SharedRemoteBase<Remote<Interface>>::Create(
          std::move(pending_remote), std::move(bind_task_runner), location);
    } else if (pending_remote) {
      remote_ = SharedRemoteBase<Remote<Interface>>::Create(
          std::move(pending_remote),
          base::SequencedTaskRunner::GetCurrentDefault(), location);
    }
  }

  // Creates a new pipe, binding this SharedRemote to one end on
  // `bind_task_runner` and returning the other end as a PendingReceiver.
  PendingReceiver<Interface> BindNewPipeAndPassReceiver(
      scoped_refptr<base::SequencedTaskRunner> bind_task_runner = nullptr,
      const base::Location& location = base::Location::Current()) {
    if (!internal::GetRuntimeFeature_ExpectEnabled<Interface>()) {
      return PendingReceiver<Interface>();
    }
    PendingRemote<Interface> remote;
    auto receiver = remote.InitWithNewPipeAndPassReceiver();
    Bind(std::move(remote), std::move(bind_task_runner), location);
    return receiver;
  }

 private:
  scoped_refptr<SharedRemoteBase<Remote<Interface>>> remote_;
};

}  // namespace mojo

#endif  // MOJO_PUBLIC_CPP_BINDINGS_SHARED_REMOTE_H_