910e62b5创建于 1月15日历史提交
// Copyright 2023 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/public/cpp/bindings/direct_receiver.h"

#include <memory>
#include <utility>

#include "base/barrier_closure.h"
#include "base/functional/bind.h"
#include "base/memory/raw_ptr.h"
#include "base/memory/raw_ref.h"
#include "base/message_loop/message_pump_type.h"
#include "base/synchronization/waitable_event.h"
#include "base/test/bind.h"
#include "base/test/task_environment.h"
#include "base/threading/sequence_bound.h"
#include "base/threading/thread.h"
#include "mojo/core/embedder/embedder.h"
#include "mojo/core/ipcz_api.h"
#include "mojo/core/test/mojo_test_base.h"
#include "mojo/public/cpp/bindings/pending_receiver.h"
#include "mojo/public/cpp/bindings/remote.h"
#include "mojo/public/cpp/bindings/tests/direct_receiver_unittest.test-mojom.h"
#include "testing/gtest/include/gtest/gtest.h"
#include "third_party/ipcz/src/test/test_base.h"

namespace mojo::test::direct_receiver_unittest {

namespace {

// Helper to run a closure on `thread`, blocking until it completes.
template <typename Fn>
void RunOn(base::Thread& thread, Fn closure) {
  base::WaitableEvent wait;
  thread.task_runner()->PostTask(FROM_HERE, base::BindLambdaForTesting([&] {
                                   closure();
                                   wait.Signal();
                                 }));
  wait.Wait();
}

// Stubs for creating a dummy handle using ipcz.Box().

IpczResult DummyBoxSerializer(uintptr_t object,
                              uint32_t flags,
                              const void* options,
                              volatile void* data,
                              size_t* num_bytes,
                              IpczHandle* handles,
                              size_t* num_handles) {
  *num_bytes = 0;
  *num_handles = 0;
  return IPCZ_RESULT_OK;
}

void DummyBoxDestructor(uintptr_t object, uint32_t flags, const void* options) {
}

// Setting ThreadLocalNode's portal handle to a null ScopedHandle causes
// AdoptPipe() to crash. Just closing it causes AdoptPipe() to appear to
// succeed, but not actually transfer the pipe to the local node. So we need
// to pass a valid non-portal handle, which will cause ipcz.Put() to fail with
// IPCZ_RESULT_INVALID_ARGUMENT.
ScopedHandle CreateDummyHandle(IpczHandle node) {
  const IpczBoxContents dummy_contents{
      .size = sizeof(IpczBoxContents),
      .type = IPCZ_BOX_TYPE_APPLICATION_OBJECT,
      .object = {.application_object = 0},
      .serializer = &DummyBoxSerializer,
      .destructor = &DummyBoxDestructor,
  };
  IpczHandle dummy_handle;
  const IpczResult box_result = core::GetIpczAPI().Box(
      node, &dummy_contents, IPCZ_NO_FLAGS, nullptr, &dummy_handle);
  EXPECT_EQ(box_result, IPCZ_RESULT_OK);
  EXPECT_NE(dummy_handle, IPCZ_INVALID_HANDLE);
  return ScopedHandle{Handle{dummy_handle}};
}

}  // namespace

// We use an ipcz-internal test fixture as a base because it provides useful
// introspection into internal portal state. We also use MojoTestBase because it
// provides multiprocess test facilities.
class DirectReceiverTest : public ipcz::test::internal::TestBase,
                           public core::test::MojoTestBase {
  void SetUp() override {
    if (!core::IsMojoIpczEnabled()) {
      GTEST_SKIP() << "This test is only valid when MojoIpcz is enabled.";
    }
  }

 private:
  base::test::TaskEnvironment task_environment_;
};

// From the time this object's constructor returns, and until the object is
// destroyed, the IO thread will not run any new tasks.
class ScopedPauseIOThread {
 public:
  ScopedPauseIOThread() {
    base::RunLoop loop;
    core::GetIOTaskRunner()->PostTask(
        FROM_HERE,
        base::BindLambdaForTesting([this, quit = loop.QuitClosure()] {
          // It's OK for the caller to continue before we start waiting. What's
          // important is that this is the last task the IO thread will run
          // until it's unpaused.
          quit.Run();
          unblock_event_.Wait();
        }));
    loop.Run();
  }

  ~ScopedPauseIOThread() {
    base::RunLoop loop;
    unblock_event_.Signal();
    core::GetIOTaskRunner()->PostTask(FROM_HERE, loop.QuitClosure());
    loop.Run();
  }

 private:
  base::WaitableEvent unblock_event_;
};

class ServiceImpl : public mojom::Service {
 public:
  explicit ServiceImpl(scoped_refptr<base::SingleThreadTaskRunner> task_runner)
      : task_runner_(std::move(task_runner)) {}
  ~ServiceImpl() override = default;

  DirectReceiver<mojom::Service>& receiver() { return receiver_; }

  // Simulates a failure in the underlying transport of the ThreadLocalNode,
  // which should cause DirectReceiver::Bind() to fall back to behaving like a
  // normal receiver.
  void SimulateFailure() {
    receiver_.node_for_testing().ReplacePortalForTesting(
        CreateDummyHandle(receiver_.node_for_testing().node()));
  }

  // Binds our DirectReceiver to `receiver` and then uses a test-only API to
  // pause it internally, preventing Mojo bindings from processing incoming
  // messages. This is needed so we can use some ipcz test facilities to wait
  // for a direct internal link for the pipe, a process which uses non-Mojo
  // communication. Signals `bound_event` when finished. `ping_event` is
  // retained by the ServiceImpl and signaled when it receives its first Ping()
  // call.
  void BindAndPauseReceiver(PendingReceiver<mojom::Service> receiver,
                            base::WaitableEvent* bound_event,
                            base::WaitableEvent* ping_event) {
    ping_event_ = ping_event;
    receiver_.Bind(std::move(receiver));
    receiver_.receiver_for_testing().Pause();
    bound_event->Signal();
  }

  void UnpauseReceiver() { receiver_.receiver_for_testing().Resume(); }

  // Exposes the underlying portal used by the DirectReceiver on its own node.
  // The ServiceRunner below needs this to wait for the portal to establish a
  // direct link to the child.
  IpczHandle GetReceiverPortal() {
    return receiver_.receiver_for_testing().internal_state()->handle().value();
  }

  // mojom::Service:
  void Ping(PingCallback callback) override {
    EXPECT_TRUE(task_runner_->BelongsToCurrentThread());
    std::move(callback).Run();
    if (auto event = std::exchange(ping_event_, nullptr)) {
      event->Signal();
    }
  }

 private:
  DirectReceiver<mojom::Service> receiver_{DirectReceiverKey{}, this};
  const scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
  raw_ptr<base::WaitableEvent> ping_event_ = nullptr;
};

class ServiceRunner {
 public:
  // Creates a service runner that uses `shared_thread` to run a ServiceImpl.
  // If `shared_thread` is null, spawns a new background thread to use.
  explicit ServiceRunner(DirectReceiverTest& test,
                         base::Thread* shared_thread = nullptr)
      : test_(test), impl_thread_(shared_thread) {
    if (!impl_thread_) {
      owned_thread_ = std::make_unique<base::Thread>("Impl Thread");
      impl_thread_ = owned_thread_.get();
      impl_thread_->StartWithOptions(
          base::Thread::Options{base::MessagePumpType::IO, 0});
    }
  }

  ~ServiceRunner() { service_.SynchronouslyResetForTest(); }

  base::Thread* impl_thread() const { return impl_thread_; }

  // Runs a new ServiceImpl on the service thread, where it binds to `receiver`
  // via a DirectReceiver. This call only returns once the ServiceImpl is
  // running, fully bound, and has a direct link to its child's portal; thus
  // ensuring that in a well-behaved system, all child IPC to the service goes
  // directly to the service thread without an intermediate IO thread hop.
  // However if `simulate_failure` is true, this is NOT a well-behaved system,
  // so child IPC should be received on the service thread WITH an intermediate
  // thread hop (as opposed to some other failure mode). `ping_event` is
  // signaled when the service receives its first Ping() call.
  void Start(PendingReceiver<mojom::Service> receiver,
             base::WaitableEvent& ping_event,
             bool simulate_failure = false) {
    service_.emplace(impl_thread_->task_runner(), impl_thread_->task_runner());

    if (simulate_failure) {
      service_.AsyncCall(&ServiceImpl::SimulateFailure);
    }

    // First the service bound on its own thread.
    base::WaitableEvent bound_event;
    service_.AsyncCall(&ServiceImpl::BindAndPauseReceiver)
        .WithArgs(std::move(receiver), &bound_event, &ping_event);
    bound_event.Wait();

    // Now wait for its portal to have a direct link. This internally uses some
    // non-Mojo message passing to synchronize with the child, but it's safe
    // because the Receiver is paused and any received message from the child
    // will be removed from the pipe before unpausing below.
    base::RunLoop wait_loop;
    service_.AsyncCall(&ServiceImpl::GetReceiverPortal)
        .Then(base::BindLambdaForTesting([&](IpczHandle portal) {
          test_->WaitForDirectRemoteLink(portal);
          wait_loop.Quit();
        }));
    wait_loop.Run();

    // Now the receiver can resume listening for messages.
    base::RunLoop unpause_loop;
    service_.AsyncCall(&ServiceImpl::UnpauseReceiver)
        .Then(unpause_loop.QuitClosure());
    unpause_loop.Run();
  }

 private:
  const raw_ref<DirectReceiverTest> test_;
  // Optional service thread owned by this ServiceRunner. Must come before
  // `impl_thread_` so it's destroyed after it, to avoid a dangling raw_ptr.
  std::unique_ptr<base::Thread> owned_thread_;
  // The service thread that will be used (either `owned_thread_` or a shared
  // service thread owned by some other ServiceRunner).
  raw_ptr<base::Thread> impl_thread_;
  base::SequenceBound<ServiceImpl> service_;
};

TEST_F(DirectReceiverTest, NoIOThreadHopInBroker) {
  ServiceRunner runner{*this};
  RunTestClient("NoIOThreadHopInBroker_Child", [&](MojoHandle child) {
    PendingRemote<mojom::Service> remote;
    PendingReceiver<mojom::Service> receiver =
        remote.InitWithNewPipeAndPassReceiver();

    // Pass the child its pipe.
    MojoHandle remote_pipe = remote.PassPipe().release().value();
    WriteMessageWithHandles(child, "", &remote_pipe, 1);

    // Start the service. This blocks until it has a direct link to the child
    // portal (i.e. no proxies), so any message sent from the child after this
    // returns should not hop through our IO thread, but should instead go
    // directly to the ServiceImpl's thread.
    base::WaitableEvent ping_event;
    runner.Start(std::move(receiver), ping_event);

    // Pause the IO thread and tell the child to ping the service.
    {
      ScopedPauseIOThread pause_io;
      WriteMessage(child, "ok go");

      // Now wait for receipt of the Ping() before unblocking IO. The only way
      // this wait can terminate is if the service receives the child's Ping()
      // IPC directly.
      ping_event.Wait();
    }

    // Wait for the child to finish.
    EXPECT_EQ("done", ReadMessage(child));
  });
}

DEFINE_TEST_CLIENT_TEST_WITH_PIPE(NoIOThreadHopInBroker_Child,
                                  DirectReceiverTest,
                                  test_pipe_handle) {
  const ScopedMessagePipeHandle test_pipe{MessagePipeHandle{test_pipe_handle}};

  // Before binding to a Remote, wait for the pipe's portal to have a direct
  // link to the service.
  MojoHandle handle;
  ReadMessageWithHandles(test_pipe->value(), &handle, 1);
  WaitForDirectRemoteLink(handle);

  Remote<mojom::Service> service{PendingRemote<mojom::Service>{
      MakeScopedHandle(MessagePipeHandle{handle}), 0}};

  // Wait for the test to be ready for our Ping(), ensuring that its IO thread
  // is paused first.
  EXPECT_EQ("ok go", ReadMessage(test_pipe->value()));

  base::RunLoop loop;
  service->Ping(loop.QuitClosure());
  loop.Run();

  WriteMessage(test_pipe->value(), "done");
}

TEST_F(DirectReceiverTest, NoIOThreadHopInNonBrokerProcess) {
  PendingRemote<mojom::Service> remote;
  PendingReceiver<mojom::Service> receiver =
      remote.InitWithNewPipeAndPassReceiver();
  RunTestClient("NoIOThreadHopInNonBroker_Child", [&](MojoHandle child) {
    MojoHandle service_pipe = receiver.PassPipe().release().value();
    WriteMessageWithHandles(child, "", &service_pipe, 1);

    // Before binding it to a Remote, wait for the pipe's portal to have a
    // direct link to the service portal in the child process.
    WaitForDirectRemoteLink(remote.internal_state()->pipe->value());
    Remote<mojom::Service> service{std::move(remote)};

    // Wait for the child to be ready for our Ping(), ensuring that its IO
    // thread is paused first.
    EXPECT_EQ("ok go", ReadMessage(child));

    base::RunLoop loop;
    service->Ping(loop.QuitClosure());
    loop.Run();

    // Wait for the child to finish.
    EXPECT_EQ("done", ReadMessage(child));
  });
}

DEFINE_TEST_CLIENT_TEST_WITH_PIPE(NoIOThreadHopInNonBroker_Child,
                                  DirectReceiverTest,
                                  test_pipe_handle) {
  const ScopedMessagePipeHandle test_pipe{MessagePipeHandle{test_pipe_handle}};

  // First get the service pipe from the test process.
  MojoHandle service_pipe;
  ReadMessageWithHandles(test_pipe->value(), &service_pipe, 1);
  PendingReceiver<mojom::Service> receiver{
      ScopedMessagePipeHandle{MessagePipeHandle{service_pipe}}};

  // Start the service. This blocks until it has a direct link to the test
  // process's portal (i.e. no proxies), so any message sent from the test
  // process after this returns should not hop through our IO thread, but should
  // instead go directly to the ServiceImpl's thread.
  ServiceRunner runner{*this};
  base::WaitableEvent ping_event;
  runner.Start(std::move(receiver), ping_event);

  // Pause the IO thread and tell the test process to ping the service.
  {
    ScopedPauseIOThread pause_io;
    WriteMessage(test_pipe->value(), "ok go");

    // Now wait for receipt of the Ping() before unblocking IO. The only way
    // this wait can terminate is if the service receives the Ping() directly.
    ping_event.Wait();
  }

  WriteMessage(test_pipe->value(), "done");
}

TEST_F(DirectReceiverTest, FallbackToIOThreadHopOnFailure) {
  // Create two services using the same service thread. One will have a direct
  // connection, so it can get messages from the child when the IO thread is
  // paused. The other simulates a failure in ThreadLocalNode::AdoptPipe, which
  // should cause it to fall back to an IO thread hop.
  ServiceRunner direct_runner{*this};
  ServiceRunner fallback_runner{*this, direct_runner.impl_thread()};
  RunTestClient("FallbackToIOThreadHopOnFailure_Child", [&](MojoHandle child) {
    PendingRemote<mojom::Service> direct_remote;
    PendingReceiver<mojom::Service> direct_receiver =
        direct_remote.InitWithNewPipeAndPassReceiver();

    PendingRemote<mojom::Service> fallback_remote;
    PendingReceiver<mojom::Service> fallback_receiver =
        fallback_remote.InitWithNewPipeAndPassReceiver();

    // Pass the child its pipes.
    MojoHandle remote_pipes[] = {direct_remote.PassPipe().release().value(),
                                 fallback_remote.PassPipe().release().value()};
    WriteMessageWithHandles(child, "", remote_pipes, 2);

    // Start the services. This blocks until they have direct links to the child
    // portal (i.e. no proxies).
    base::WaitableEvent direct_ping_event;
    direct_runner.Start(std::move(direct_receiver), direct_ping_event);
    base::WaitableEvent fallback_ping_event;
    fallback_runner.Start(std::move(fallback_receiver), fallback_ping_event,
                          /*simulate_failure=*/true);

    // Pause the IO thread and tell the child to ping the services.
    {
      ScopedPauseIOThread pause_io;
      WriteMessage(child, "ok go");

      // Now wait for receipt of the direct Ping(), which comes in directly on
      // the service thread, before unblocking IO.
      direct_ping_event.Wait();

      // Since the child sends the fallback Ping() before the direct Ping(), if
      // it came directly on the service thread it would be received first.
      // Since it's NOT received yet, it must not have come directly. (Note that
      // if `direct_runner` and `fallback_runner` used different service
      // threads, the order that the two WaitableEvent's are signaled would be
      // ambiguous.)
      EXPECT_FALSE(fallback_ping_event.IsSignaled());
    }

    // Now that the IO thread is unblocked the fallback Ping() should arrive.
    fallback_ping_event.Wait();

    // Wait for the child to finish.
    EXPECT_EQ("done", ReadMessage(child));
  });
}

DEFINE_TEST_CLIENT_TEST_WITH_PIPE(FallbackToIOThreadHopOnFailure_Child,
                                  DirectReceiverTest,
                                  test_pipe_handle) {
  const ScopedMessagePipeHandle test_pipe{MessagePipeHandle{test_pipe_handle}};

  // Before binding to a Remote, wait for the pipe's portal to have a direct
  // link to the service.
  MojoHandle handles[2];
  ReadMessageWithHandles(test_pipe->value(), handles, 2);
  WaitForDirectRemoteLink(handles[0]);
  WaitForDirectRemoteLink(handles[1]);

  Remote<mojom::Service> direct_service{PendingRemote<mojom::Service>{
      MakeScopedHandle(MessagePipeHandle{handles[0]}), 0}};
  Remote<mojom::Service> fallback_service{PendingRemote<mojom::Service>{
      MakeScopedHandle(MessagePipeHandle{handles[1]}), 0}};

  // Wait for the test to be ready for our Ping(), ensuring that its IO thread
  // is paused first.
  EXPECT_EQ("ok go", ReadMessage(test_pipe->value()));

  base::RunLoop loop;
  auto quit_closure = base::BarrierClosure(2, loop.QuitClosure());
  // Send the fallback Ping() first (see comment in the ScopedPauseIOThread
  // block above).
  fallback_service->Ping(quit_closure);
  direct_service->Ping(quit_closure);
  loop.Run();

  WriteMessage(test_pipe->value(), "done");
}

TEST_F(DirectReceiverTest, ThreadLocalInstanceShared) {
  // Creates two DirectReceivers on the same thread and validates that they
  // share the same underlying ipcz node.

  base::Thread io_thread{"Test IO thread"};
  io_thread.StartWithOptions(
      base::Thread::Options{base::MessagePumpType::IO, 0});

  std::unique_ptr<ServiceImpl> impl1;
  std::unique_ptr<ServiceImpl> impl2;
  Remote<mojom::Service> remote1;
  Remote<mojom::Service> remote2;
  auto receiver1 = remote1.BindNewPipeAndPassReceiver();
  auto receiver2 = remote2.BindNewPipeAndPassReceiver();
  RunOn(io_thread, [&] {
    impl1 = std::make_unique<ServiceImpl>(io_thread.task_runner());
    impl1->receiver().Bind(std::move(receiver1));

    impl2 = std::make_unique<ServiceImpl>(io_thread.task_runner());
    impl2->receiver().Bind(std::move(receiver2));

    // Both receivers should be using the same node to receive I/O.
    EXPECT_EQ(impl1->receiver().node_for_testing().node(),
              impl2->receiver().node_for_testing().node());
  });

  // For good measure, verify that the receivers actually work too.
  base::RunLoop loop1;
  remote1->Ping(loop1.QuitClosure());
  loop1.Run();
  base::RunLoop loop2;
  remote2->Ping(loop2.QuitClosure());
  loop2.Run();

  RunOn(io_thread, [&] {
    // Also verify that when the last receiver goes away on this thread, the
    // thread's node also goes away.
    EXPECT_TRUE(internal::ThreadLocalNode::CurrentThreadHasInstance());
    impl1.reset();
    EXPECT_TRUE(internal::ThreadLocalNode::CurrentThreadHasInstance());
    impl2.reset();
    EXPECT_FALSE(internal::ThreadLocalNode::CurrentThreadHasInstance());
  });
}

TEST_F(DirectReceiverTest, UniqueNodePerThread) {
  // Creates a DirectReceiver on each of two distinct threads and validates that
  // they each have their own distinct ipcz node.

  base::Thread io_thread1{"Test IO thread 1"};
  base::Thread io_thread2{"Test IO thread 2"};
  io_thread1.StartWithOptions(
      base::Thread::Options{base::MessagePumpType::IO, 0});
  io_thread2.StartWithOptions(
      base::Thread::Options{base::MessagePumpType::IO, 0});

  std::unique_ptr<ServiceImpl> impl1;
  std::unique_ptr<ServiceImpl> impl2;
  Remote<mojom::Service> remote1;
  Remote<mojom::Service> remote2;
  auto receiver1 = remote1.BindNewPipeAndPassReceiver();
  auto receiver2 = remote2.BindNewPipeAndPassReceiver();
  IpczHandle node1, node2;
  RunOn(io_thread1, [&] {
    impl1 = std::make_unique<ServiceImpl>(io_thread1.task_runner());
    impl1->receiver().Bind(std::move(receiver1));
    node1 = impl1->receiver().node_for_testing().node();
  });
  RunOn(io_thread2, [&] {
    impl2 = std::make_unique<ServiceImpl>(io_thread2.task_runner());
    impl2->receiver().Bind(std::move(receiver2));
    node2 = impl2->receiver().node_for_testing().node();
  });

  // Both receivers should be using different nodes to receive I/O.
  EXPECT_NE(node1, node2);

  // For good measure, verify that the receivers actually work too.
  base::RunLoop loop1;
  remote1->Ping(loop1.QuitClosure());
  loop1.Run();
  base::RunLoop loop2;
  remote2->Ping(loop2.QuitClosure());
  loop2.Run();

  RunOn(io_thread1, [&] {
    EXPECT_TRUE(internal::ThreadLocalNode::CurrentThreadHasInstance());
    impl1.reset();
    EXPECT_FALSE(internal::ThreadLocalNode::CurrentThreadHasInstance());
  });

  RunOn(io_thread2, [&] {
    EXPECT_TRUE(internal::ThreadLocalNode::CurrentThreadHasInstance());
    impl2.reset();
    EXPECT_FALSE(internal::ThreadLocalNode::CurrentThreadHasInstance());
  });
}

TEST_F(DirectReceiverTest, BindInvalidPendingReceiver) {
  base::Thread io_thread("Test IO thread 1");
  io_thread.StartWithOptions(
      base::Thread::Options{base::MessagePumpType::IO, 0});

  RunOn(io_thread, [&] {
    EXPECT_FALSE(internal::ThreadLocalNode::CurrentThreadHasInstance());
    auto impl = std::make_unique<ServiceImpl>(io_thread.task_runner());
    impl->receiver().Bind(NullReceiver());
    EXPECT_FALSE(impl->receiver().receiver_for_testing().is_bound());
  });
}

}  // namespace mojo::test::direct_receiver_unittest