// 910e62b5 created on Jan 15 (commit history)
// Copyright 2012 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "ipc/ipc_sync_channel.h"

#include <stddef.h>
#include <stdint.h>

#include <memory>
#include <utility>

#include "base/functional/bind.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/memory/ptr_util.h"
#include "base/memory/raw_ptr.h"
#include "base/run_loop.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/single_thread_task_runner.h"
#include "base/trace_event/trace_event.h"
#include "build/build_config.h"
#include "ipc/ipc_channel_factory.h"
#include "ipc/param_traits_macros.h"
#include "mojo/public/cpp/bindings/sync_event_watcher.h"

using base::WaitableEvent;

namespace IPC {

namespace {

// Holds a pointer to the per-thread ReceivedSyncMsgQueue object. It starts out
// null, is assigned by ReceivedSyncMsgQueue::AddContext() when a thread
// registers its first context, and is set back to null by
// ReceivedSyncMsgQueue::RemoveContext() once the listener count reaches zero.
// This file never deletes the queue through this pointer.
constinit thread_local SyncChannel::ReceivedSyncMsgQueue* received_queue =
    nullptr;

}  // namespace

// Per-thread bookkeeping shared by all sync channels on one listener thread.
// It owns the single dispatch WaitableEvent for that thread and keeps a count
// of the contexts currently registered against it.
class SyncChannel::ReceivedSyncMsgQueue
    : public base::RefCountedThreadSafe<ReceivedSyncMsgQueue> {
 public:
  // Registers one more context with the calling thread's queue and returns
  // that queue, allocating it lazily on first use. Several SyncChannel objects
  // can block the same listener thread, so they all share one instance.
  // Balance every call with RemoveContext() on the same thread.
  static ReceivedSyncMsgQueue* AddContext() {
    ReceivedSyncMsgQueue* queue = received_queue;
    if (queue == nullptr) {
      queue = new ReceivedSyncMsgQueue();
      received_queue = queue;
    }
    queue->listener_count_ += 1;
    return queue;
  }

  // Unregisters one context; invoked from SyncContext::Clear(). |context| is
  // not used by the current implementation. When the last registered context
  // goes away, the thread-local pointer is cleared and the dispatch-event
  // watcher is destroyed.
  void RemoveContext(SyncContext* context) {
    base::AutoLock auto_lock(message_lock_);

    --listener_count_;
    if (listener_count_ != 0) {
      return;
    }

    DCHECK(received_queue);
    received_queue = nullptr;
    sync_dispatch_watcher_.reset();
  }

  // Accessor for the event shared by every sync channel on this thread.
  base::WaitableEvent* dispatch_event() { return &dispatch_event_; }

 private:
  friend class base::RefCountedThreadSafe<ReceivedSyncMsgQueue>;

  // The dispatch event is created manual-reset and initially unsignaled; it is
  // reset explicitly by SyncChannel::OnDispatchEventSignaled(). The watcher is
  // built in the body, after |dispatch_event_| has been constructed.
  ReceivedSyncMsgQueue()
      : dispatch_event_(base::WaitableEvent::ResetPolicy::MANUAL,
                        base::WaitableEvent::InitialState::NOT_SIGNALED) {
    sync_dispatch_watcher_ = std::make_unique<mojo::SyncEventWatcher>(
        &dispatch_event_,
        base::BindRepeating(&ReceivedSyncMsgQueue::OnDispatchEventReady,
                            base::Unretained(this)));
    sync_dispatch_watcher_->AllowWokenUpBySyncWatchOnSameThread();
  }

  ~ReceivedSyncMsgQueue() = default;

  // Callback handed to |sync_dispatch_watcher_|; intentionally does nothing.
  void OnDispatchEventReady() {}

  // Signaled when a synchronous message arrives that we must answer because
  // its sender needs the reply before it can reply to our own outstanding
  // synchronous message.
  base::WaitableEvent dispatch_event_;

  // Held while RemoveContext() updates |listener_count_|. AddContext() bumps
  // the count without taking this lock.
  base::Lock message_lock_;

  // Number of contexts registered via AddContext() and not yet removed.
  int listener_count_ = 0;

  // Watches |dispatch_event_| during all sync handle watches on this thread.
  // Declared last so it is destroyed before the event it observes.
  std::unique_ptr<mojo::SyncEventWatcher> sync_dispatch_watcher_;
};

// Builds the context on top of ChannelProxy::Context and registers it with the
// calling thread's ReceivedSyncMsgQueue through AddContext(), which creates
// the queue if this is the first context on the thread. |shutdown_event| is
// only stored here; it may be null, in which case OnChannelOpened() does not
// set up a shutdown watch.
SyncChannel::SyncContext::SyncContext(
    Listener* listener,
    const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
    const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
    WaitableEvent* shutdown_event)
    : ChannelProxy::Context(listener, ipc_task_runner, listener_task_runner),
      received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
      shutdown_event_(shutdown_event) {}

// Defaulted: unregistering from the per-thread queue is done in Clear(), not
// in the destructor.
SyncChannel::SyncContext::~SyncContext() = default;

// Returns the dispatch event owned by the ReceivedSyncMsgQueue this context
// registered with at construction time.
base::WaitableEvent* SyncChannel::SyncContext::GetDispatchEvent() {
  base::WaitableEvent* const event = received_sync_msgs_->dispatch_event();
  return event;
}

void SyncChannel::SyncContext::Clear() {
  received_sync_msgs_->RemoveContext(this);
  Context::Clear();
}

void SyncChannel::SyncContext::OnChannelError() {
  shutdown_watcher_.StopWatching();
  Context::OnChannelError();
}

void SyncChannel::SyncContext::OnChannelOpened() {
  if (shutdown_event_) {
    shutdown_watcher_.StartWatching(
        shutdown_event_,
        base::BindOnce(&SyncChannel::SyncContext::OnShutdownEventSignaled,
                       base::Unretained(this)),
        base::SequencedTaskRunner::GetCurrentDefault());
  }
  Context::OnChannelOpened();
}

void SyncChannel::SyncContext::OnChannelClosed() {
  shutdown_watcher_.StopWatching();
  Context::OnChannelClosed();
}

// Callback bound in OnChannelOpened() for the shutdown-event watch. It only
// checks, via DCHECK, that the signaled event is the one this context was
// constructed with; no other action is taken.
void SyncChannel::SyncContext::OnShutdownEventSignaled(WaitableEvent* event) {
  DCHECK_EQ(event, shutdown_event_);
}

// static
// Allocates a SyncChannel with the given listener, task runners and optional
// shutdown event, and transfers ownership to the returned unique_ptr.
std::unique_ptr<SyncChannel> SyncChannel::Create(
    Listener* listener,
    const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
    const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
    WaitableEvent* shutdown_event) {
  SyncChannel* const channel = new SyncChannel(
      listener, ipc_task_runner, listener_task_runner, shutdown_event);
  return base::WrapUnique(channel);
}

// Creates the SyncContext that is handed to the ChannelProxy base, captures
// mojo::SyncHandleRegistry::current(), and begins watching the shared dispatch
// event. Expected to run on the listener thread (see the DCHECK below).
SyncChannel::SyncChannel(
    Listener* listener,
    const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
    const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner,
    WaitableEvent* shutdown_event)
    : ChannelProxy(new SyncContext(listener,
                                   ipc_task_runner,
                                   listener_task_runner,
                                   shutdown_event)),
      sync_handle_registry_(mojo::SyncHandleRegistry::current()) {
  // The current (listener) thread must be distinct from the IPC thread, or else
  // sending synchronous messages will deadlock.
  DCHECK_NE(ipc_task_runner.get(),
            base::SingleThreadTaskRunner::GetCurrentDefault().get());
  // Arm the asynchronous watch on the per-thread dispatch event right away.
  StartWatching();
}

// Defaulted: no explicit teardown is performed here beyond member and base
// class destruction.
SyncChannel::~SyncChannel() = default;

// Callback for |dispatch_watcher_|. Resets the manual-reset dispatch event and
// re-arms the watch, since each StartWatching() call registers a one-shot
// callback (base::BindOnce).
void SyncChannel::OnDispatchEventSignaled(base::WaitableEvent* event) {
  DCHECK_EQ(sync_context()->GetDispatchEvent(), event);
  base::WaitableEvent* const dispatch_event =
      sync_context()->GetDispatchEvent();
  dispatch_event->Reset();
  StartWatching();
}

void SyncChannel::StartWatching() {
  // |dispatch_watcher_| watches the event asynchronously, only dispatching
  // messages once the listener thread is unblocked and pumping its task queue.
  // The ReceivedSyncMsgQueue also watches this event and may dispatch
  // immediately if woken up by a message which it's allowed to dispatch.
  dispatch_watcher_.StartWatching(
      sync_context()->GetDispatchEvent(),
      base::BindOnce(&SyncChannel::OnDispatchEventSignaled,
                     base::Unretained(this)),
      sync_context()->listener_task_runner());
}

}  // namespace IPC