// Snapshot of commit 910e62b5, created January 15 (commit history).
// Copyright 2015 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <stddef.h>
#include <stdint.h>

#include <algorithm>
#include <array>
#include <memory>
#include <tuple>
#include <utility>

#include "base/functional/bind.h"
#include "base/functional/callback.h"
#include "base/functional/callback_helpers.h"
#include "base/memory/ptr_util.h"
#include "base/memory/raw_ptr.h"
#include "base/run_loop.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/single_thread_task_runner.h"
#include "base/task/thread_pool.h"
#include "base/test/bind.h"
#include "base/test/scoped_feature_list.h"
#include "base/test/task_environment.h"
#include "base/test/test_future.h"
#include "base/threading/sequence_bound.h"
#include "base/threading/thread.h"
#include "base/threading/thread_restrictions.h"
#include "mojo/public/cpp/bindings/associated_receiver.h"
#include "mojo/public/cpp/bindings/associated_remote.h"
#include "mojo/public/cpp/bindings/lib/multiplex_router.h"
#include "mojo/public/cpp/bindings/pending_associated_receiver.h"
#include "mojo/public/cpp/bindings/pending_associated_remote.h"
#include "mojo/public/cpp/bindings/receiver.h"
#include "mojo/public/cpp/bindings/remote.h"
#include "mojo/public/cpp/bindings/shared_associated_remote.h"
#include "mojo/public/cpp/bindings/tests/associated_interface_unittest.test-mojom.h"
#include "mojo/public/cpp/bindings/unique_associated_receiver_set.h"
#include "mojo/public/cpp/system/functions.h"
#include "mojo/public/interfaces/bindings/tests/ping_service.test-mojom.h"
#include "mojo/public/interfaces/bindings/tests/test_associated_interfaces.test-mojom.h"
#include "mojo/public/interfaces/bindings/tests/test_sync_methods.test-mojom.h"
#include "testing/gtest/include/gtest/gtest.h"

namespace mojo {
namespace test {
namespace associated_interface_unittest {
namespace {

using mojo::internal::MultiplexRouter;

// Test implementation of IntegerSender that serves the interface through an
// AssociatedReceiver it owns.
class IntegerSenderImpl : public IntegerSender {
 public:
  // Creates an instance whose receiver has not been given an endpoint yet.
  IntegerSenderImpl() = default;

  // Creates an instance and binds its receiver to |receiver|.
  explicit IntegerSenderImpl(PendingAssociatedReceiver<IntegerSender> receiver)
      : receiver_(this, std::move(receiver)) {}

  ~IntegerSenderImpl() override = default;

  // Installs the callback that is run with the argument of every Send() call.
  void set_notify_send_method_called(
      base::RepeatingCallback<void(int32_t)> callback) {
    notify_send_method_called_ = std::move(callback);
  }

  // IntegerSender:
  // Replies with |value|, except for -1, which is reported as a bad message
  // and gets no reply.
  void Echo(int32_t value, EchoCallback callback) override {
    if (value != -1) {
      std::move(callback).Run(value);
      return;
    }
    receiver_.ReportBadMessage("Reporting bad message for value == -1");
  }

  // Forwards |value| to the callback installed through
  // set_notify_send_method_called().
  void Send(int32_t value) override { notify_send_method_called_.Run(value); }

  // Exposes the owned receiver so tests can reset it or attach handlers.
  AssociatedReceiver<IntegerSender>* receiver() { return &receiver_; }

 private:
  AssociatedReceiver<IntegerSender> receiver_{this};
  base::RepeatingCallback<void(int32_t)> notify_send_method_called_;
};

class IntegerSenderConnectionImpl : public IntegerSenderConnection {
 public:
  explicit IntegerSenderConnectionImpl(
      PendingReceiver<IntegerSenderConnection> receiver)
      : receiver_(this, std::move(receiver)) {}

  ~IntegerSenderConnectionImpl() override = default;

  void GetSender(PendingAssociatedReceiver<IntegerSender> receiver) override {
    DCHECK(receiver.is_valid());
    senders_.Add(std::make_unique<IntegerSenderImpl>(), std::move(receiver));
  }

  void AsyncGetSender(AsyncGetSenderCallback callback) override {
    PendingAssociatedRemote<IntegerSender> remote;
    GetSender(remote.InitWithNewEndpointAndPassReceiver());
    std::move(callback).Run(std::move(remote));
  }

  Receiver<IntegerSenderConnection>* receiver() { return &receiver_; }

 private:
  Receiver<IntegerSenderConnection> receiver_;
  UniqueAssociatedReceiverSet<IntegerSender> senders_;
};

// Test fixture that owns the task environment and offers helpers for wiring
// associated IntegerSender endpoints across a pair of MultiplexRouters.
class AssociatedInterfaceTest : public testing::Test {
 public:
  AssociatedInterfaceTest()
      : main_runner_(base::SingleThreadTaskRunner::GetCurrentDefault()) {}
  ~AssociatedInterfaceTest() override = default;

  // Emulates sending |remote| across the pipe: its pending handle is
  // associated with |source|, and a local endpoint handle with the resulting
  // interface id is created on |target|. Returns a pending remote that wraps
  // that local handle and keeps the original version.
  template <typename T>
  PendingAssociatedRemote<T> EmulatePassingAssociatedRemote(
      PendingAssociatedRemote<T> remote,
      scoped_refptr<MultiplexRouter> source,
      scoped_refptr<MultiplexRouter> target) {
    ScopedInterfaceEndpointHandle pending_handle = remote.PassHandle();
    CHECK(pending_handle.pending_association());
    const auto interface_id =
        source->AssociateInterface(std::move(pending_handle));
    ScopedInterfaceEndpointHandle local_handle =
        target->CreateLocalEndpointHandle(interface_id);
    return PendingAssociatedRemote<T>(std::move(local_handle),
                                      remote.version());
  }

  // Creates a message pipe and starts one multi-interface router on each end,
  // both running on |main_runner_|. The two routers receive opposite values
  // for the boolean argument of CreateAndStartReceiving().
  void CreateRouterPair(scoped_refptr<MultiplexRouter>* router0,
                        scoped_refptr<MultiplexRouter>* router1) {
    MessagePipe pipe;
    *router0 = MultiplexRouter::CreateAndStartReceiving(
        std::move(pipe.handle0), MultiplexRouter::MULTI_INTERFACE, true,
        main_runner_);
    *router1 = MultiplexRouter::CreateAndStartReceiving(
        std::move(pipe.handle1), MultiplexRouter::MULTI_INTERFACE, false,
        main_runner_);
  }

  // Fills |*receiver1| with a new endpoint's receiving half, then rewrites
  // |*remote0| as if it had been passed from |router1| to |router0|.
  void CreateIntegerSenderWithExistingRouters(
      scoped_refptr<MultiplexRouter> router0,
      PendingAssociatedRemote<IntegerSender>* remote0,
      scoped_refptr<MultiplexRouter> router1,
      PendingAssociatedReceiver<IntegerSender>* receiver1) {
    *receiver1 = remote0->InitWithNewEndpointAndPassReceiver();
    PendingAssociatedRemote<IntegerSender> passed_remote =
        EmulatePassingAssociatedRemote(std::move(*remote0), router1, router0);
    *remote0 = std::move(passed_remote);
  }

  // Convenience wrapper: builds a fresh router pair and creates an
  // IntegerSender remote/receiver pair across it.
  void CreateIntegerSender(PendingAssociatedRemote<IntegerSender>* remote,
                           PendingAssociatedReceiver<IntegerSender>* receiver) {
    scoped_refptr<MultiplexRouter> router0;
    scoped_refptr<MultiplexRouter> router1;
    CreateRouterPair(&router0, &router1);
    CreateIntegerSenderWithExistingRouters(router1, remote, router0, receiver);
  }

 private:
  // Declared before |main_runner_| so it is constructed first; the
  // constructor queries the current default task runner.
  base::test::TaskEnvironment task_environment_;
  scoped_refptr<base::SequencedTaskRunner> main_runner_;
};

// Used as a disconnect handler by tests that must never observe a
// disconnect: records a fatal gtest failure when invoked.
void Fail() {
  FAIL() << "Unexpected connection error";
}

TEST_F(AssociatedInterfaceTest, InterfacesAtBothEnds) {
  // Two associated interfaces share one pipe, with their implementations
  // living at opposite ends. Neither may interfere with the other.

  scoped_refptr<MultiplexRouter> router0;
  scoped_refptr<MultiplexRouter> router1;
  CreateRouterPair(&router0, &router1);

  PendingAssociatedReceiver<IntegerSender> pending_receiver;
  PendingAssociatedRemote<IntegerSender> pending_remote;

  // First interface.
  CreateIntegerSenderWithExistingRouters(router1, &pending_remote, router0,
                                         &pending_receiver);
  IntegerSenderImpl impl0(std::move(pending_receiver));
  AssociatedRemote<IntegerSender> remote0(std::move(pending_remote));

  // Second interface, created with the router roles swapped.
  CreateIntegerSenderWithExistingRouters(router0, &pending_remote, router1,
                                         &pending_receiver);
  IntegerSenderImpl impl1(std::move(pending_receiver));
  AssociatedRemote<IntegerSender> remote1(std::move(pending_remote));

  // Both echoes must be answered with the value that was sent.
  base::RunLoop echo0_loop, echo1_loop;
  bool echo0_replied = false;
  remote0->Echo(123, base::BindLambdaForTesting([&](int32_t echoed) {
                  EXPECT_EQ(123, echoed);
                  echo0_replied = true;
                  echo0_loop.Quit();
                }));

  bool echo1_replied = false;
  remote1->Echo(456, base::BindLambdaForTesting([&](int32_t echoed) {
                  EXPECT_EQ(456, echoed);
                  echo1_replied = true;
                  echo1_loop.Quit();
                }));

  echo0_loop.Run();
  echo1_loop.Run();
  EXPECT_TRUE(echo0_replied);
  EXPECT_TRUE(echo1_replied);

  // Resetting the first implementation's receiver must notify its remote.
  bool remote0_saw_disconnect = false;
  base::RunLoop remote0_disconnect_loop;
  remote0.set_disconnect_handler(base::BindLambdaForTesting([&] {
    remote0_saw_disconnect = true;
    remote0_disconnect_loop.Quit();
  }));

  impl0.receiver()->reset();
  remote0_disconnect_loop.Run();
  EXPECT_TRUE(remote0_saw_disconnect);

  // Resetting the second remote must notify its receiver.
  bool impl1_saw_disconnect = false;
  base::RunLoop impl1_disconnect_loop;
  impl1.receiver()->set_disconnect_handler(base::BindLambdaForTesting([&] {
    impl1_saw_disconnect = true;
    impl1_disconnect_loop.Quit();
  }));

  remote1.reset();
  impl1_disconnect_loop.Run();
  EXPECT_TRUE(impl1_saw_disconnect);
}

// Owns an AssociatedRemote<IntegerSender> together with its own sequenced
// thread-pool task runner. After each Send() it schedules the follow-up value
// on the next sender in the chain, which may be itself.
class TestSender {
 public:
  TestSender()
      : task_runner_(base::ThreadPool::CreateSequencedTaskRunner({})) {}

  // SetUp(), Send() and TearDown() must all run on |task_runner()|.

  // Binds |remote| and records the chain successor and the largest value to
  // send. A null |next_sender| makes this sender its own successor.
  void SetUp(PendingAssociatedRemote<IntegerSender> remote,
             TestSender* next_sender,
             int32_t max_value_to_send) {
    CHECK(task_runner()->RunsTasksInCurrentSequence());

    remote_.Bind(std::move(remote));
    if (next_sender) {
      next_sender_ = next_sender;
    } else {
      next_sender_ = this;
    }
    max_value_to_send_ = max_value_to_send;
  }

  // Sends |value| unless it exceeds the configured maximum, then posts a
  // Send() of |value| + 1 to the successor's task runner.
  void Send(int32_t value) {
    CHECK(task_runner()->RunsTasksInCurrentSequence());

    if (value > max_value_to_send_) {
      return;
    }

    remote_->Send(value);

    const int32_t following_value = value + 1;
    next_sender_->task_runner()->PostTask(
        FROM_HERE,
        base::BindOnce(&TestSender::Send, base::Unretained(next_sender_),
                       following_value));
  }

  // Resets the remote on the sequence that bound it.
  void TearDown() {
    CHECK(task_runner()->RunsTasksInCurrentSequence());

    remote_.reset();
  }

  base::SequencedTaskRunner* task_runner() { return task_runner_.get(); }

 private:
  scoped_refptr<base::SequencedTaskRunner> task_runner_;
  raw_ptr<TestSender> next_sender_ = nullptr;
  int32_t max_value_to_send_ = -1;

  AssociatedRemote<IntegerSender> remote_;
};

class TestReceiver {
 public:
  TestReceiver()
      : task_runner_(base::ThreadPool::CreateSequencedTaskRunner({})),
        expected_calls_(0) {}

  void SetUp(PendingAssociatedReceiver<IntegerSender> receiver0,
             PendingAssociatedReceiver<IntegerSender> receiver1,
             size_t expected_calls,
             base::OnceClosure notify_finish) {
    CHECK(task_runner()->RunsTasksInCurrentSequence());

    impl0_ = std::make_unique<IntegerSenderImpl>(std::move(receiver0));
    impl0_->set_notify_send_method_called(base::BindRepeating(
        &TestReceiver::SendMethodCalled, base::Unretained(this)));
    impl1_ = std::make_unique<IntegerSenderImpl>(std::move(receiver1));
    impl1_->set_notify_send_method_called(base::BindRepeating(
        &TestReceiver::SendMethodCalled, base::Unretained(this)));

    expected_calls_ = expected_calls;
    notify_finish_ = std::move(notify_finish);
  }

  void TearDown() {
    CHECK(task_runner()->RunsTasksInCurrentSequence());

    impl0_.reset();
    impl1_.reset();
  }

  base::SequencedTaskRunner* task_runner() { return task_runner_.get(); }
  const std::vector<int32_t>& values() const { return values_; }

 private:
  void SendMethodCalled(int32_t value) {
    values_.push_back(value);

    if (values_.size() >= expected_calls_)
      std::move(notify_finish_).Run();
  }

  scoped_refptr<base::SequencedTaskRunner> task_runner_;
  size_t expected_calls_;

  std::unique_ptr<IntegerSenderImpl> impl0_;
  std::unique_ptr<IntegerSenderImpl> impl1_;

  std::vector<int32_t> values_;

  base::OnceClosure notify_finish_;
};

// Counts notifications under a lock and runs |notify_finish| when the
// |total_count|-th one arrives.
class NotificationCounter {
 public:
  NotificationCounter(size_t total_count, base::OnceClosure notify_finish)
      : total_count_(total_count), notify_finish_(std::move(notify_finish)) {}

  ~NotificationCounter() = default;

  // Okay to call from any thread. Receiving more than |total_count_|
  // notifications trips the CHECK below.
  void OnGotNotification() {
    bool reached_total = false;
    {
      base::AutoLock auto_lock(lock_);
      CHECK_LT(current_count_, total_count_);
      ++current_count_;
      reached_total = (current_count_ == total_count_);
    }

    // The closure is run after the lock has been released.
    if (reached_total) {
      std::move(notify_finish_).Run();
    }
  }

 private:
  base::Lock lock_;
  const size_t total_count_;
  size_t current_count_ = 0;
  base::OnceClosure notify_finish_;
};

TEST_F(AssociatedInterfaceTest, MultiThreadAccess) {
  // Set up four associated interfaces on a message pipe. Use the interface
  // remotes on four thread-pool sequences in parallel; run the interface
  // implementations on two more. Test that multi-threaded access works.

  const int32_t kMaxValue = 1000;
  scoped_refptr<MultiplexRouter> router0;
  scoped_refptr<MultiplexRouter> router1;
  CreateRouterPair(&router0, &router1);

  std::array<PendingAssociatedReceiver<IntegerSender>, 4> pending_receivers;
  std::array<PendingAssociatedRemote<IntegerSender>, 4> pending_remotes;
  for (size_t i = 0; i < 4; ++i) {
    CreateIntegerSenderWithExistingRouters(router1, &pending_remotes[i],
                                           router0, &pending_receivers[i]);
  }

  // Each sender is its own successor (null |next_sender|) and stops after
  // the upper bound of its quarter of [1, kMaxValue].
  std::array<TestSender, 4> senders;
  for (size_t i = 0; i < 4; ++i) {
    senders[i].task_runner()->PostTask(
        FROM_HERE,
        base::BindOnce(&TestSender::SetUp, base::Unretained(&senders[i]),
                       std::move(pending_remotes[i]), nullptr,
                       kMaxValue * (i + 1) / 4));
  }

  // Each receiver serves two of the four interfaces and therefore expects
  // half of all values. |run_loop| quits once both receivers are done.
  base::RunLoop run_loop;
  std::array<TestReceiver, 2> receivers;
  NotificationCounter counter(2, run_loop.QuitClosure());
  for (size_t i = 0; i < 2; ++i) {
    receivers[i].task_runner()->PostTask(
        FROM_HERE,
        base::BindOnce(&TestReceiver::SetUp, base::Unretained(&receivers[i]),
                       std::move(pending_receivers[2 * i]),
                       std::move(pending_receivers[2 * i + 1]),
                       static_cast<size_t>(kMaxValue / 2),
                       base::BindOnce(&NotificationCounter::OnGotNotification,
                                      base::Unretained(&counter))));
  }

  // Kick off every sender at the first value of its quarter.
  for (size_t i = 0; i < 4; ++i) {
    senders[i].task_runner()->PostTask(
        FROM_HERE,
        base::BindOnce(&TestSender::Send, base::Unretained(&senders[i]),
                       kMaxValue * i / 4 + 1));
  }

  run_loop.Run();

  // Tear every sender and receiver down on its own sequence, waiting for
  // each teardown to finish before moving on.
  for (size_t i = 0; i < 4; ++i) {
    base::RunLoop run_loop2;
    senders[i].task_runner()->PostTaskAndReply(
        FROM_HERE,
        base::BindOnce(&TestSender::TearDown, base::Unretained(&senders[i])),
        run_loop2.QuitClosure());
    run_loop2.Run();
  }

  for (size_t i = 0; i < 2; ++i) {
    base::RunLoop run_loop2;
    receivers[i].task_runner()->PostTaskAndReply(
        FROM_HERE,
        base::BindOnce(&TestReceiver::TearDown,
                       base::Unretained(&receivers[i])),
        run_loop2.QuitClosure());
    run_loop2.Run();
  }

  EXPECT_EQ(static_cast<size_t>(kMaxValue / 2), receivers[0].values().size());
  EXPECT_EQ(static_cast<size_t>(kMaxValue / 2), receivers[1].values().size());

  // Taken together and sorted, the received values must be exactly
  // 1, 2, 3, ... with no gaps or duplicates.
  std::vector<int32_t> all_values;
  all_values.insert(all_values.end(), receivers[0].values().begin(),
                    receivers[0].values().end());
  all_values.insert(all_values.end(), receivers[1].values().begin(),
                    receivers[1].values().end());

  std::sort(all_values.begin(), all_values.end());
  for (size_t i = 0; i < all_values.size(); ++i)
    ASSERT_EQ(static_cast<int32_t>(i + 1), all_values[i]);
}

TEST_F(AssociatedInterfaceTest, FIFO) {
  // Set up four associated interfaces on a message pipe. Use the interface
  // remotes on four thread-pool sequences; run the interface implementations
  // on two more. Take turns to make calls using the four remotes. Test that
  // FIFO-ness is preserved.

  const int32_t kMaxValue = 100;
  scoped_refptr<MultiplexRouter> router0;
  scoped_refptr<MultiplexRouter> router1;
  CreateRouterPair(&router0, &router1);

  std::array<PendingAssociatedReceiver<IntegerSender>, 4> pending_receivers;
  std::array<PendingAssociatedRemote<IntegerSender>, 4> pending_remotes;
  for (size_t i = 0; i < 4; ++i) {
    CreateIntegerSenderWithExistingRouters(router1, &pending_remotes[i],
                                           router0, &pending_receivers[i]);
  }

  // Chain the senders into a ring (0 -> 1 -> 2 -> 3 -> 0) so that successive
  // values are sent through successive remotes.
  std::array<TestSender, 4> senders;
  for (size_t i = 0; i < 4; ++i) {
    senders[i].task_runner()->PostTask(
        FROM_HERE,
        base::BindOnce(&TestSender::SetUp, base::Unretained(&senders[i]),
                       std::move(pending_remotes[i]),
                       base::Unretained(&senders[(i + 1) % 4]), kMaxValue));
  }

  // Each receiver serves two of the four interfaces and therefore expects
  // half of all values. |run_loop| quits once both receivers are done.
  base::RunLoop run_loop;
  std::array<TestReceiver, 2> receivers;
  NotificationCounter counter(2, run_loop.QuitClosure());
  for (size_t i = 0; i < 2; ++i) {
    receivers[i].task_runner()->PostTask(
        FROM_HERE,
        base::BindOnce(&TestReceiver::SetUp, base::Unretained(&receivers[i]),
                       std::move(pending_receivers[2 * i]),
                       std::move(pending_receivers[2 * i + 1]),
                       static_cast<size_t>(kMaxValue / 2),
                       base::BindOnce(&NotificationCounter::OnGotNotification,
                                      base::Unretained(&counter))));
  }

  // Only the first sender is started; the ring propagates the rest.
  senders[0].task_runner()->PostTask(
      FROM_HERE,
      base::BindOnce(&TestSender::Send, base::Unretained(&senders[0]), 1));

  run_loop.Run();

  // Tear every sender and receiver down on its own sequence, waiting for
  // each teardown to finish before moving on.
  for (size_t i = 0; i < 4; ++i) {
    base::RunLoop run_loop2;
    senders[i].task_runner()->PostTaskAndReply(
        FROM_HERE,
        base::BindOnce(&TestSender::TearDown, base::Unretained(&senders[i])),
        run_loop2.QuitClosure());
    run_loop2.Run();
  }

  for (size_t i = 0; i < 2; ++i) {
    base::RunLoop run_loop2;
    receivers[i].task_runner()->PostTaskAndReply(
        FROM_HERE,
        base::BindOnce(&TestReceiver::TearDown,
                       base::Unretained(&receivers[i])),
        run_loop2.QuitClosure());
    run_loop2.Run();
  }

  EXPECT_EQ(static_cast<size_t>(kMaxValue / 2), receivers[0].values().size());
  EXPECT_EQ(static_cast<size_t>(kMaxValue / 2), receivers[1].values().size());

  // Values were sent in increasing order, so each receiver must have seen a
  // strictly increasing sequence.
  for (size_t i = 0; i < 2; ++i) {
    for (size_t j = 1; j < receivers[i].values().size(); ++j)
      EXPECT_LT(receivers[i].values()[j - 1], receivers[i].values()[j]);
  }
}

// Exercises both directions of passing an associated endpoint over a
// connection: a receiver sent as a method argument and a remote returned in
// a reply.
TEST_F(AssociatedInterfaceTest, PassAssociatedInterfaces) {
  Remote<IntegerSenderConnection> connection_remote;
  IntegerSenderConnectionImpl connection(
      connection_remote.BindNewPipeAndPassReceiver());

  // Pass the receiving half as an argument and use the remote right away.
  AssociatedRemote<IntegerSender> requested_sender;
  connection_remote->GetSender(
      requested_sender.BindNewEndpointAndPassReceiver());

  base::RunLoop first_echo_loop;
  requested_sender->Echo(123,
                         base::BindLambdaForTesting([&](int32_t echoed) {
                           EXPECT_EQ(123, echoed);
                           first_echo_loop.Quit();
                         }));
  first_echo_loop.Run();

  // Obtain the remote half from the reply of AsyncGetSender().
  AssociatedRemote<IntegerSender> returned_sender;
  base::RunLoop get_sender_loop;
  connection_remote->AsyncGetSender(base::BindLambdaForTesting(
      [&](PendingAssociatedRemote<IntegerSender> pending_sender) {
        returned_sender.Bind(std::move(pending_sender));
        get_sender_loop.Quit();
      }));
  get_sender_loop.Run();
  EXPECT_TRUE(returned_sender);

  base::RunLoop second_echo_loop;
  returned_sender->Echo(456,
                        base::BindLambdaForTesting([&](int32_t echoed) {
                          EXPECT_EQ(456, echoed);
                          second_echo_loop.Quit();
                        }));
  second_echo_loop.Run();
}

TEST_F(AssociatedInterfaceTest,
       ReceiverWaitAndPauseWhenNoAssociatedInterfaces) {
  Remote<IntegerSenderConnection> connection_remote;
  IntegerSenderConnectionImpl connection(
      connection_remote.BindNewPipeAndPassReceiver());

  AssociatedRemote<IntegerSender> sender;
  connection_remote->GetSender(sender.BindNewEndpointAndPassReceiver());

  Receiver<IntegerSenderConnection>* const connection_receiver =
      connection.receiver();

  // The GetSender request has not been dispatched yet, so no associated
  // interface is running on the pipe.
  EXPECT_FALSE(connection_receiver->internal_state()->HasAssociatedInterfaces());

  // With no associated interfaces on the pipe, pausing is okay ...
  connection_receiver->Pause();
  connection_receiver->Resume();

  // ... and so is waiting.
  EXPECT_TRUE(connection_receiver->WaitForIncomingCall());

  // The wait above dispatched the GetSender request, which set up an
  // associated interface on the pipe. From here on it is not allowed to wait
  // or pause.
  EXPECT_TRUE(connection_receiver->internal_state()->HasAssociatedInterfaces());
}

// PingService implementation bound to an associated receiver. An optional
// handler is run on every Ping() before the reply is sent.
class PingServiceImpl : public PingService {
 public:
  explicit PingServiceImpl(PendingAssociatedReceiver<PingService> receiver)
      : receiver_(this, std::move(receiver)) {}
  ~PingServiceImpl() override = default;

  // Exposes the receiver, e.g. so tests can install message filters.
  AssociatedReceiver<PingService>& receiver() { return receiver_; }

  // Sets the closure run at the start of each Ping().
  void set_ping_handler(base::RepeatingClosure handler) {
    ping_handler_ = std::move(handler);
  }

  // PingService:
  void Ping(PingCallback callback) override {
    if (!ping_handler_.is_null()) {
      ping_handler_.Run();
    }
    std::move(callback).Run();
  }

 private:
  AssociatedReceiver<PingService> receiver_;
  base::RepeatingClosure ping_handler_;
};

class PingProviderImpl : public AssociatedPingProvider {
 public:
  explicit PingProviderImpl(PendingReceiver<AssociatedPingProvider> receiver)
      : receiver_(this, std::move(receiver)) {}
  ~PingProviderImpl() override = default;

  // AssociatedPingProvider:
  void GetPing(PendingAssociatedReceiver<PingService> receiver) override {
    ping_services_.emplace_back(new PingServiceImpl(std::move(receiver)));

    if (expected_receivers_count_ > 0 &&
        ping_services_.size() == expected_receivers_count_ && quit_waiting_) {
      expected_receivers_count_ = 0;
      std::move(quit_waiting_).Run();
    }
  }

  std::vector<std::unique_ptr<PingServiceImpl>>& ping_services() {
    return ping_services_;
  }

  void WaitForReceivers(size_t count) {
    DCHECK(!quit_waiting_);
    expected_receivers_count_ = count;
    base::RunLoop loop;
    quit_waiting_ = loop.QuitClosure();
    loop.Run();
  }

 private:
  Receiver<AssociatedPingProvider> receiver_;
  std::vector<std::unique_ptr<PingServiceImpl>> ping_services_;
  size_t expected_receivers_count_ = 0;
  base::OnceClosure quit_waiting_;
};

// MessageFilter that runs a closure before every dispatch and always lets the
// message through.
class CallbackFilter : public MessageFilter {
 public:
  explicit CallbackFilter(base::RepeatingClosure callback)
      : callback_(std::move(callback)) {}
  ~CallbackFilter() override = default;

  // Convenience factory returning a heap-allocated filter around |callback|.
  static std::unique_ptr<CallbackFilter> Wrap(base::RepeatingClosure callback) {
    auto filter = std::make_unique<CallbackFilter>(std::move(callback));
    return filter;
  }

  // MessageFilter:
  bool WillDispatch(Message* message) override {
    callback_.Run();
    // Never reject the message.
    return true;
  }

  // Intentionally a no-op.
  void DidDispatchOrReject(Message* message, bool accepted) override {}

 private:
  base::RepeatingClosure callback_;
};

// Checks message filters on associated receivers: a filter is notified before
// its receiver dispatches, notifications arrive in call order, and every
// associated receiver in a group has its own independent filter.
TEST_F(AssociatedInterfaceTest, ReceiverWithFilters) {
  Remote<AssociatedPingProvider> provider;
  PingProviderImpl provider_impl(provider.BindNewPipeAndPassReceiver());

  AssociatedRemote<PingService> ping_a, ping_b;
  provider->GetPing(ping_a.BindNewEndpointAndPassReceiver());
  provider->GetPing(ping_b.BindNewEndpointAndPassReceiver());
  provider_impl.WaitForReceivers(2);

  auto& services = provider_impl.ping_services();
  ASSERT_EQ(2u, services.size());
  PingServiceImpl& service_a = *services[0];
  PingServiceImpl& service_b = *services[1];

  // Each flag flips to 1 when the matching filter sees a message.
  int a_status = 0;
  int b_status = 0;

  // A's filter must fire while both flags are still clear.
  service_a.receiver().SetFilter(
      CallbackFilter::Wrap(base::BindLambdaForTesting([&] {
        EXPECT_EQ(0, a_status);
        EXPECT_EQ(0, b_status);
        a_status = 1;
      })));

  // B's filter must fire only after A's has fired.
  service_b.receiver().SetFilter(
      CallbackFilter::Wrap(base::BindLambdaForTesting([&] {
        EXPECT_EQ(1, a_status);
        EXPECT_EQ(0, b_status);
        b_status = 1;
      })));

  for (int round = 0; round < 10; ++round) {
    a_status = 0;
    b_status = 0;

    {
      base::RunLoop ping_a_loop;
      ping_a->Ping(ping_a_loop.QuitClosure());
      ping_a_loop.Run();
    }

    // Only A's filter has run so far.
    EXPECT_EQ(1, a_status);
    EXPECT_EQ(0, b_status);

    {
      base::RunLoop ping_b_loop;
      ping_b->Ping(ping_b_loop.QuitClosure());
      ping_b_loop.Run();
    }

    // Now both filters have run.
    EXPECT_EQ(1, a_status);
    EXPECT_EQ(1, b_status);
  }
}

// Flushing an AssociatedRemote must deliver the reply to a call that was
// made before the flush.
TEST_F(AssociatedInterfaceTest, AssociatedRemoteFlushForTesting) {
  PendingAssociatedReceiver<IntegerSender> pending_receiver;
  PendingAssociatedRemote<IntegerSender> pending_remote;
  CreateIntegerSender(&pending_remote, &pending_receiver);

  IntegerSenderImpl impl(std::move(pending_receiver));
  AssociatedRemote<IntegerSender> sender(std::move(pending_remote));
  sender.set_disconnect_handler(base::BindOnce(&Fail));

  bool echo_replied = false;
  sender->Echo(123, base::BindLambdaForTesting([&](int32_t echoed) {
                 EXPECT_EQ(123, echoed);
                 echo_replied = true;
               }));
  sender.FlushForTesting();
  EXPECT_TRUE(echo_replied);
}

// Flushing an AssociatedRemote whose peer was reset must run the disconnect
// handler; a second flush afterwards must still return.
TEST_F(AssociatedInterfaceTest, AssociatedRemoteFlushForTestingWithClosedPeer) {
  PendingAssociatedReceiver<IntegerSender> pending_receiver;
  PendingAssociatedRemote<IntegerSender> pending_remote;
  CreateIntegerSender(&pending_remote, &pending_receiver);

  AssociatedRemote<IntegerSender> sender(std::move(pending_remote));
  bool saw_disconnect = false;
  sender.set_disconnect_handler(
      base::BindLambdaForTesting([&] { saw_disconnect = true; }));

  // Close the peer without ever binding it.
  pending_receiver.reset();

  sender.FlushForTesting();
  EXPECT_TRUE(saw_disconnect);
  sender.FlushForTesting();
}

// Flushing from the associated receiver's side, twice, must be enough for
// the reply to a pending call to reach the remote.
TEST_F(AssociatedInterfaceTest, AssociatedBindingFlushForTesting) {
  PendingAssociatedReceiver<IntegerSender> pending_receiver;
  PendingAssociatedRemote<IntegerSender> pending_remote;
  CreateIntegerSender(&pending_remote, &pending_receiver);

  IntegerSenderImpl impl(std::move(pending_receiver));
  impl.receiver()->set_disconnect_handler(base::BindOnce(&Fail));
  AssociatedRemote<IntegerSender> sender(std::move(pending_remote));

  bool echo_replied = false;
  sender->Echo(123, base::BindLambdaForTesting([&](int32_t echoed) {
                 EXPECT_EQ(123, echoed);
                 echo_replied = true;
               }));

  // A flush issued by the receiver only guarantees that the request has been
  // received, not that the response has. The second flush waits for the
  // response to be received.
  AssociatedReceiver<IntegerSender>* const impl_receiver = impl.receiver();
  impl_receiver->FlushForTesting();
  impl_receiver->FlushForTesting();
  EXPECT_TRUE(echo_replied);
}

// Flushing an associated receiver whose remote is already gone must run the
// disconnect handler; a second flush afterwards must still return.
TEST_F(AssociatedInterfaceTest,
       AssociatedReceiverFlushForTestingWithClosedPeer) {
  scoped_refptr<MultiplexRouter> router0;
  scoped_refptr<MultiplexRouter> router1;
  CreateRouterPair(&router0, &router1);

  PendingAssociatedReceiver<IntegerSender> pending_receiver;
  {
    // The remote half is dropped as soon as this scope ends.
    PendingAssociatedRemote<IntegerSender> discarded_remote;
    CreateIntegerSenderWithExistingRouters(router1, &discarded_remote, router0,
                                           &pending_receiver);
  }

  IntegerSenderImpl impl(std::move(pending_receiver));
  bool saw_disconnect = false;
  impl.receiver()->set_disconnect_handler(
      base::BindLambdaForTesting([&] { saw_disconnect = true; }));
  impl.receiver()->FlushForTesting();
  EXPECT_TRUE(saw_disconnect);
  impl.receiver()->FlushForTesting();
}

// Flushing from the (non-associated) receiver's side, twice, must be enough
// for the reply to a pending call to reach the remote.
TEST_F(AssociatedInterfaceTest, ReceiverFlushForTesting) {
  Remote<IntegerSenderConnection> connection_remote;
  IntegerSenderConnectionImpl impl(
      connection_remote.BindNewPipeAndPassReceiver());

  bool reply_received = false;
  connection_remote->AsyncGetSender(base::BindLambdaForTesting(
      [&](PendingAssociatedRemote<IntegerSender> sender) {
        reply_received = true;
      }));
  EXPECT_FALSE(reply_received);
  impl.receiver()->set_disconnect_handler(base::BindOnce(&Fail));

  // A flush issued by the receiver only guarantees that the request has been
  // received, not that the response has. The second flush waits for the
  // response to be received.
  impl.receiver()->FlushForTesting();
  impl.receiver()->FlushForTesting();

  EXPECT_TRUE(reply_received);
}

// Flushing a receiver whose remote was reset must run the disconnect
// handler, and not before the flush; a second flush must still return.
TEST_F(AssociatedInterfaceTest, ReceiverFlushForTestingWithClosedPeer) {
  Remote<IntegerSenderConnection> connection_remote;
  IntegerSenderConnectionImpl impl(
      connection_remote.BindNewPipeAndPassReceiver());

  bool saw_disconnect = false;
  impl.receiver()->set_disconnect_handler(
      base::BindLambdaForTesting([&] { saw_disconnect = true; }));

  connection_remote.reset();
  // Resetting the remote alone does not run the handler synchronously.
  EXPECT_FALSE(saw_disconnect);

  impl.receiver()->FlushForTesting();
  EXPECT_TRUE(saw_disconnect);
  impl.receiver()->FlushForTesting();
}

// Flushing a Remote must deliver the reply to a call that was made before
// the flush.
TEST_F(AssociatedInterfaceTest, RemoteFlushForTesting) {
  Remote<IntegerSenderConnection> connection_remote;
  IntegerSenderConnectionImpl impl(
      connection_remote.BindNewPipeAndPassReceiver());

  bool reply_received = false;
  connection_remote.set_disconnect_handler(base::BindOnce(&Fail));
  connection_remote->AsyncGetSender(base::BindLambdaForTesting(
      [&](PendingAssociatedRemote<IntegerSender> sender) {
        reply_received = true;
      }));
  // The reply is not delivered synchronously.
  EXPECT_FALSE(reply_received);

  connection_remote.FlushForTesting();
  EXPECT_TRUE(reply_received);
}

// Flushing a Remote whose receiving end was discarded must run the
// disconnect handler; a second flush afterwards must still return.
TEST_F(AssociatedInterfaceTest, RemoteFlushForTestingWithClosedPeer) {
  Remote<IntegerSenderConnection> connection_remote;
  // The pending receiver is thrown away immediately.
  std::ignore = connection_remote.BindNewPipeAndPassReceiver();

  bool saw_disconnect = false;
  connection_remote.set_disconnect_handler(
      base::BindLambdaForTesting([&] { saw_disconnect = true; }));
  EXPECT_FALSE(saw_disconnect);

  connection_remote.FlushForTesting();
  EXPECT_TRUE(saw_disconnect);
  connection_remote.FlushForTesting();
}

// Resetting an AssociatedRemote with a reason must hand that reason and
// description to the receiver's disconnect-with-reason handler.
TEST_F(AssociatedInterfaceTest, AssociatedReceiverConnectionErrorWithReason) {
  PendingAssociatedReceiver<IntegerSender> pending_receiver;
  PendingAssociatedRemote<IntegerSender> pending_remote;
  CreateIntegerSender(&pending_remote, &pending_receiver);

  IntegerSenderImpl impl(std::move(pending_receiver));
  AssociatedRemote<IntegerSender> sender(std::move(pending_remote));

  base::RunLoop disconnect_loop;
  impl.receiver()->set_disconnect_with_reason_handler(
      base::BindLambdaForTesting(
          [&](uint32_t reason_code, const std::string& reason_text) {
            EXPECT_EQ(123u, reason_code);
            EXPECT_EQ("farewell", reason_text);
            disconnect_loop.Quit();
          }));

  sender.ResetWithReason(123u, "farewell");

  disconnect_loop.Run();
}

TEST_F(AssociatedInterfaceTest,
       PendingAssociatedReceiverConnectionErrorWithReason) {
  // The AssociatedReceiver must be told about the connection error, reason
  // included, even though the interface was never associated with a message
  // pipe before the peer was closed.

  AssociatedRemote<IntegerSender> sender;
  IntegerSenderImpl impl(sender.BindNewEndpointAndPassReceiver());

  base::RunLoop disconnect_loop;
  impl.receiver()->set_disconnect_with_reason_handler(
      base::BindLambdaForTesting(
          [&](uint32_t reason_code, const std::string& reason_text) {
            EXPECT_EQ(123u, reason_code);
            EXPECT_EQ("farewell", reason_text);
            disconnect_loop.Quit();
          }));

  sender.ResetWithReason(123u, "farewell");
  disconnect_loop.Run();
}

// Resetting an AssociatedReceiver with a reason must hand that reason and
// description to the remote's disconnect-with-reason handler.
TEST_F(AssociatedInterfaceTest, AssociatedRemoteConnectionErrorWithReason) {
  PendingAssociatedReceiver<IntegerSender> pending_receiver;
  PendingAssociatedRemote<IntegerSender> pending_remote;
  CreateIntegerSender(&pending_remote, &pending_receiver);

  IntegerSenderImpl impl(std::move(pending_receiver));
  AssociatedRemote<IntegerSender> sender(std::move(pending_remote));

  base::RunLoop disconnect_loop;
  sender.set_disconnect_with_reason_handler(base::BindLambdaForTesting(
      [&](uint32_t reason_code, const std::string& reason_text) {
        EXPECT_EQ(456u, reason_code);
        EXPECT_EQ("farewell", reason_text);
        disconnect_loop.Quit();
      }));

  impl.receiver()->ResetWithReason(456u, "farewell");
  disconnect_loop.Run();
}

TEST_F(AssociatedInterfaceTest,
       PendingAssociatedRemoteConnectionErrorWithReason) {
  // The AssociatedRemote must be told about the connection error, reason
  // included, even though the interface was never associated with a message
  // pipe before the peer was closed.

  AssociatedRemote<IntegerSender> sender;
  auto unbound_receiver = sender.BindNewEndpointAndPassReceiver();

  base::RunLoop disconnect_loop;
  sender.set_disconnect_with_reason_handler(base::BindLambdaForTesting(
      [&](uint32_t reason_code, const std::string& reason_text) {
        EXPECT_EQ(456u, reason_code);
        EXPECT_EQ("farewell", reason_text);
        disconnect_loop.Quit();
      }));

  unbound_receiver.ResetWithReason(456u, "farewell");
  disconnect_loop.Run();
}

// Resetting a still-pending associated receiver with a reason must hand that
// reason and description to the bound remote's handler.
TEST_F(AssociatedInterfaceTest, AssociatedRequestResetWithReason) {
  PendingAssociatedReceiver<IntegerSender> pending_receiver;
  PendingAssociatedRemote<IntegerSender> pending_remote;
  CreateIntegerSender(&pending_remote, &pending_receiver);

  AssociatedRemote<IntegerSender> sender(std::move(pending_remote));

  base::RunLoop disconnect_loop;
  sender.set_disconnect_with_reason_handler(base::BindLambdaForTesting(
      [&](uint32_t reason_code, const std::string& reason_text) {
        EXPECT_EQ(789u, reason_code);
        EXPECT_EQ("long time no see", reason_text);
        disconnect_loop.Quit();
      }));

  pending_receiver.ResetWithReason(789u, "long time no see");

  disconnect_loop.Run();
}

// A SharedAssociatedRemote must be usable both from the sequence that
// created it and from a different thread-pool sequence.
TEST_F(AssociatedInterfaceTest, SharedAssociatedRemote) {
  Remote<IntegerSenderConnection> connection_remote;
  IntegerSenderConnectionImpl connection(
      connection_remote.BindNewPipeAndPassReceiver());

  PendingAssociatedRemote<IntegerSender> pending_sender;
  connection_remote->GetSender(
      pending_sender.InitWithNewEndpointAndPassReceiver());

  SharedAssociatedRemote<IntegerSender> shared_sender(
      std::move(pending_sender));

  {
    // Call through the shared remote on the sequence that created it.
    base::RunLoop same_sequence_loop;
    shared_sender->Echo(123, base::BindLambdaForTesting([&](int32_t echoed) {
                          EXPECT_EQ(123, echoed);
                          same_sequence_loop.Quit();
                        }));
    same_sequence_loop.Run();
  }

  // Call through the shared remote from a different sequence.
  base::RunLoop cross_sequence_loop;

  auto background_runner = base::ThreadPool::CreateSequencedTaskRunner({});
  auto quit_cross_sequence_loop = cross_sequence_loop.QuitClosure();
  background_runner->PostTask(
      FROM_HERE, base::BindLambdaForTesting([&] {
        shared_sender->Echo(
            123, base::BindLambdaForTesting([&](int32_t echoed) {
              EXPECT_EQ(123, echoed);
              // The reply must arrive on the sequence that made the call.
              EXPECT_TRUE(background_runner->RunsTasksInCurrentSequence());
              std::move(quit_cross_sequence_loop).Run();
            }));
      }));

  // Wait here until the reply callback has run on the background sequence.
  cross_sequence_loop.Run();
}

// State used by SharedAssociatedRemoteWithTaskRunner. That test allocates one
// instance, fills in |connection_remote| and |interface_impl| from a task
// posted to a thread pool task runner, and finally destroys the instance with
// DeleteSoon() on that same task runner.
struct ForwarderTestContext {
  // Remote end of the primary IntegerSenderConnection pipe.
  Remote<IntegerSenderConnection> connection_remote;
  // Implementation bound to the receiving end of |connection_remote|'s pipe.
  std::unique_ptr<IntegerSenderConnectionImpl> interface_impl;
  // NOTE(review): not referenced by the test visible in this part of the
  // file - confirm whether it is still needed.
  PendingAssociatedReceiver<IntegerSender> sender_receiver;
};

// Verifies that a SharedAssociatedRemote constructed with an explicit task
// runner can be called from the test sequence while the primary interface it
// is associated with was set up on that other task runner.
TEST_F(AssociatedInterfaceTest, SharedAssociatedRemoteWithTaskRunner) {
  const scoped_refptr<base::SequencedTaskRunner> background_runner =
      base::ThreadPool::CreateSequencedTaskRunner({});

  // Heap-allocated so that it can be handed to DeleteSoon() on
  // |background_runner| at the end of the test.
  auto* background_state = new ForwarderTestContext();
  PendingAssociatedRemote<IntegerSender> unbound_sender;
  base::WaitableEvent setup_done(
      base::WaitableEvent::ResetPolicy::MANUAL,
      base::WaitableEvent::InitialState::NOT_SIGNALED);
  background_runner->PostTask(
      FROM_HERE, base::BindLambdaForTesting([&] {
        // Set up the primary interface and request the associated sender
        // entirely on the background sequence.
        background_state->interface_impl =
            std::make_unique<IntegerSenderConnectionImpl>(
                background_state->connection_remote
                    .BindNewPipeAndPassReceiver());
        background_state->connection_remote->GetSender(
            unbound_sender.InitWithNewEndpointAndPassReceiver());
        setup_done.Signal();
      }));

  // |unbound_sender| is only safe to touch once the task above has finished.
  setup_done.Wait();

  // The shared remote is told to bind on the background task runner, which is
  // where the primary |connection_remote| lives.
  SharedAssociatedRemote<IntegerSender> shared_sender(
      std::move(unbound_sender), background_runner);

  // Call right away; the binding on the background sequence may not have
  // happened yet, and the call must still go through.
  base::RunLoop reply_loop;
  shared_sender->Echo(123, base::BindLambdaForTesting([&](int32_t reply) {
                        EXPECT_EQ(123, reply);
                        reply_loop.Quit();
                      }));
  reply_loop.Run();

  // Tear the primary interface down on the task runner it was created on.
  background_runner->DeleteSoon(FROM_HERE, background_state);

  shared_sender.reset();
}

// AssociatedPingProviderProvider implementation that ignores every request:
// the pending receiver passed to GetPingProvider() is never bound and is
// destroyed when the call returns.
class DiscardingAssociatedPingProviderProvider
    : public AssociatedPingProviderProvider {
 public:
  // Intentionally empty; |receiver| goes out of scope unbound.
  void GetPingProvider(
      PendingAssociatedReceiver<AssociatedPingProvider> receiver) override {}
};

// Requests an AssociatedPingProvider from an implementation that drops the
// receiver without binding it, then requests a PingService through that
// provider. The test completes once the PingService remote observes a
// disconnect.
TEST_F(AssociatedInterfaceTest, CloseWithoutBindingAssociatedReceiver) {
  DiscardingAssociatedPingProviderProvider discarding_impl;
  mojo::Receiver<AssociatedPingProviderProvider> primary_receiver(
      &discarding_impl);
  Remote<AssociatedPingProviderProvider> primary;
  primary_receiver.Bind(primary.BindNewPipeAndPassReceiver());

  // The receiving end of |ping_provider| is discarded by |discarding_impl|.
  AssociatedRemote<AssociatedPingProvider> ping_provider;
  primary->GetPingProvider(ping_provider.BindNewEndpointAndPassReceiver());

  // Send another associated endpoint through the provider whose peer is being
  // dropped.
  AssociatedRemote<PingService> ping_service;
  ping_provider->GetPing(ping_service.BindNewEndpointAndPassReceiver());

  base::RunLoop disconnect_loop;
  ping_service.set_disconnect_handler(disconnect_loop.QuitClosure());
  disconnect_loop.Run();
}

// Associates the receiving end of a fresh endpoint pair with a disconnected
// pipe and then sends a message through the remote end. There are no explicit
// expectations; the test passes as long as the call does not crash.
TEST_F(AssociatedInterfaceTest, AssociateWithDisconnectedPipe) {
  AssociatedRemote<IntegerSender> sender;
  PendingAssociatedReceiver<IntegerSender> unbound_receiver =
      sender.BindNewEndpointAndPassReceiver();
  AssociateWithDisconnectedPipe(unbound_receiver.PassHandle());
  sender->Send(42);
}

// Regression test for https://crbug.com/864731: closing the primary interface
// pipe must not run the disconnect handlers of associated interfaces
// synchronously.
TEST_F(AssociatedInterfaceTest, AsyncErrorHandlersWhenClosingPrimaryInterface) {
  Remote<IntegerSenderConnection> primary;
  IntegerSenderConnectionImpl primary_impl(
      primary.BindNewPipeAndPassReceiver());

  base::RunLoop disconnect_loop;
  bool disconnect_seen = false;
  AssociatedRemote<IntegerSender> associated_sender;
  primary->GetSender(associated_sender.BindNewEndpointAndPassReceiver());
  associated_sender.set_disconnect_handler(base::BindLambdaForTesting([&] {
    disconnect_seen = true;
    disconnect_loop.Quit();
  }));

  // Dropping the primary remote must return before the handler has run...
  primary.reset();
  EXPECT_FALSE(disconnect_seen);

  // ...and the handler must run once tasks are allowed to be processed.
  disconnect_loop.Run();
  EXPECT_TRUE(disconnect_seen);
}

// Sends Echo(-1) to an IntegerSenderImpl and expects that (a) the remote is
// disconnected asynchronously and (b) the process error handler receives the
// bad-message text for that value.
TEST_F(AssociatedInterfaceTest, AssociatedReceiverReportBadMessage) {
  PendingAssociatedRemote<IntegerSender> unbound_remote;
  PendingAssociatedReceiver<IntegerSender> unbound_receiver;
  CreateIntegerSender(&unbound_remote, &unbound_receiver);

  IntegerSenderImpl sender_impl(std::move(unbound_receiver));
  AssociatedRemote<IntegerSender> sender(std::move(unbound_remote));

  base::RunLoop disconnect_loop;
  bool disconnected = false;
  sender.set_disconnect_handler(base::BindLambdaForTesting([&] {
    disconnected = true;
    disconnect_loop.Quit();
  }));

  // Capture whatever error text gets reported while this test is running.
  std::string reported_error;
  SetDefaultProcessErrorHandler(base::BindLambdaForTesting(
      [&](const std::string& error) { reported_error = error; }));

  sender->Echo(-1, IntegerSenderImpl::EchoCallback());
  // Nothing may have been delivered before the loop gets to run.
  EXPECT_FALSE(disconnected);
  disconnect_loop.Run();
  EXPECT_TRUE(disconnected);
  EXPECT_EQ("Reporting bad message for value == -1", reported_error);

  // Uninstall the handler; it refers to locals of this test.
  SetDefaultProcessErrorHandler(base::NullCallback());
}

// Verifies that an associated endpoint pair can be used without any primary
// interface once unassociated usage has been enabled on the pending receiver:
// both a one-way Send() and an Echo() round trip must be delivered.
TEST_F(AssociatedInterfaceTest, AssociatedReceiverDedicatedPipe) {
  PendingAssociatedRemote<IntegerSender> pending_remote;
  PendingAssociatedReceiver<IntegerSender> pending_receiver =
      pending_remote.InitWithNewEndpointAndPassReceiver();
  pending_receiver.EnableUnassociatedUsage();
  IntegerSenderImpl impl(std::move(pending_receiver));
  AssociatedRemote<IntegerSender> remote(std::move(pending_remote));

  {
    // One-way message: wait until the implementation reports that it saw it.
    base::RunLoop run_loop;
    impl.set_notify_send_method_called(
        base::BindLambdaForTesting([&](int32_t x) {
          EXPECT_EQ(88, x);
          run_loop.Quit();
        }));

    remote->Send(88);
    run_loop.Run();
  }

  {
    // Round trip: wait for the reply. Without running the loop here the reply
    // callback would never execute, so the expectation below would be skipped
    // and the callback would be left holding a reference to a destroyed
    // RunLoop.
    base::RunLoop run_loop;
    remote->Echo(888, base::BindLambdaForTesting([&](int32_t x) {
                   EXPECT_EQ(888, x);
                   run_loop.Quit();
                 }));
    run_loop.Run();
  }
}

// Verifies that an associated endpoint pair can be used without any primary
// interface once unassociated usage has been enabled on the pending remote:
// both a one-way Send() and an Echo() round trip must be delivered.
TEST_F(AssociatedInterfaceTest, AssociatedRemoteDedicatedPipe) {
  PendingAssociatedRemote<IntegerSender> pending_remote;
  PendingAssociatedReceiver<IntegerSender> pending_receiver =
      pending_remote.InitWithNewEndpointAndPassReceiver();
  IntegerSenderImpl impl(std::move(pending_receiver));
  pending_remote.EnableUnassociatedUsage();
  AssociatedRemote<IntegerSender> remote(std::move(pending_remote));

  {
    // One-way message: wait until the implementation reports that it saw it.
    base::RunLoop run_loop;
    impl.set_notify_send_method_called(
        base::BindLambdaForTesting([&](int32_t x) {
          EXPECT_EQ(88, x);
          run_loop.Quit();
        }));

    remote->Send(88);
    run_loop.Run();
  }

  {
    // Round trip: wait for the reply. Without running the loop here the reply
    // callback would never execute, so the expectation below would be skipped
    // and the callback would be left holding a reference to a destroyed
    // RunLoop.
    base::RunLoop run_loop;
    remote->Echo(888, base::BindLambdaForTesting([&](int32_t x) {
                   EXPECT_EQ(888, x);
                   run_loop.Quit();
                 }));
    run_loop.Run();
  }
}

// mojom::ClumsyBinder implementation that binds itself to the receiver given
// at construction and discards every AssociatedBinder receiver it is sent.
class ClumsyBinderImpl : public mojom::ClumsyBinder {
 public:
  // Binds this object to |receiver| for its whole lifetime.
  explicit ClumsyBinderImpl(PendingReceiver<mojom::ClumsyBinder> receiver)
      : receiver_(this, std::move(receiver)) {}
  ~ClumsyBinderImpl() override = default;

  // mojom::ClumsyBinder:
  void DropAssociatedBinder(
      PendingAssociatedReceiver<mojom::AssociatedBinder> receiver) override {
    // Nothing to do but drop the receiver so it's closed.
  }

 private:
  // Keeps this implementation connected to the ClumsyBinder pipe.
  Receiver<mojom::ClumsyBinder> receiver_;
};

// Regression test for https://crbug.com/331636067: an associated endpoint
// that was serialized into a message which is dropped before transmission
// must still have its lifetime managed correctly, i.e. its peer must be told
// that it was closed.
TEST_F(AssociatedInterfaceTest, CloseSerializedAssociatedEndpoints) {
  Remote<mojom::ClumsyBinder> clumsy_binder;
  ClumsyBinderImpl clumsy_binder_impl(
      clumsy_binder.BindNewPipeAndPassReceiver());

  AssociatedRemote<mojom::AssociatedBinder> dropped_binder;
  clumsy_binder->DropAssociatedBinder(
      dropped_binder.BindNewEndpointAndPassReceiver());

  // Block until |dropped_binder| has seen its peer go away. From this point
  // on, messages written to it will not be sent.
  base::RunLoop first_disconnect;
  dropped_binder.set_disconnect_handler(first_disconnect.QuitClosure());
  first_disconnect.Run();

  // Pass a second endpoint through the disconnected remote. Its receiver is
  // dropped, and the matching remote has to be notified of the peer closure
  // for the loop below to terminate.
  base::RunLoop second_disconnect;
  AssociatedRemote<mojom::AssociatedBinder> nested_binder;
  dropped_binder->Bind(nested_binder.BindNewEndpointAndPassReceiver());
  nested_binder.set_disconnect_handler(second_disconnect.QuitClosure());
  second_disconnect.Run();
}

// Minimal TestSync implementation instantiated directly by the tests below.
// Every supported method replies immediately from within the call.
class TestSyncImpl : public TestSync {
 public:
  // Replies right away with no payload.
  void Ping(PingCallback callback) override { std::move(callback).Run(); }
  // Replies right away with no payload.
  void NoInterruptPing(NoInterruptPingCallback callback) override {
    std::move(callback).Run();
  }
  // Replies right away with the value that was passed in.
  void Echo(int32_t value, EchoCallback callback) override {
    std::move(callback).Run(value);
  }
  // Not expected to be called by any test using this class.
  void AsyncEcho(int32_t, AsyncEchoCallback) override { NOTREACHED(); }
};

// Implementation of both TestSyncPrimary and TestSync used by the
// TestHangOnDisconnect* tests. It collects associated TestSync endpoints via
// SendRemote()/SendReceiver(), makes a series of sync calls on the collected
// remotes (see DoTest()), and records what happened as bit flags in
// |result_|. Echo() reports those flags back to the caller.
class TestSyncPrimaryImpl : public TestSyncPrimary, public TestSync {
 public:
  // Binds this object to the primary |receiver|. The associated TestSync
  // receiver is created unbound and is bound later by SendReceiver().
  explicit TestSyncPrimaryImpl(PendingReceiver<TestSyncPrimary> receiver)
      : receiver_(this, std::move(receiver)), test_sync_receiver_(this) {}
  ~TestSyncPrimaryImpl() override = default;

  // Bit flags accumulated in |result_|.
  // Ping() was invoked on this object.
  static constexpr int32_t kReceivedPing = 0b001;
  // The first Echo() sync call in DoTest() returned false.
  static constexpr int32_t kSyncCallWasAborted = 0b010;
  // The second Echo() sync call in DoTest() returned false.
  static constexpr int32_t kSyncCall2WasAborted = 0b100;
  // The NoInterruptPing() sync call in DoTest() returned true.
  static constexpr int32_t kNoInterruptPingReplied = 0b1000;

  // Records that a ping arrived and replies immediately.
  void Ping(PingCallback callback) override {
    result_ |= kReceivedPing;
    std::move(callback).Run();
  }
  // Not expected to be called on this object.
  void NoInterruptPing(NoInterruptPingCallback) override { NOTREACHED(); }
  // Ignores |value| and replies with the flags accumulated so far; this is how
  // the tests read back the outcome.
  void Echo(int32_t value, EchoCallback callback) override {
    std::move(callback).Run(result_);
  }
  // Not expected to be called on this object.
  void AsyncEcho(int32_t, AsyncEchoCallback) override { NOTREACHED(); }

  // Stores an associated TestSync remote. The first one received goes to
  // |test_sync_remote_|, any later one to |test_sync_remote2_|.
  void SendRemote(PendingAssociatedRemote<TestSync> remote) override {
    if (!test_sync_remote_.is_bound()) {
      test_sync_remote_.Bind(std::move(remote));
    } else {
      test_sync_remote2_.Bind(std::move(remote));
    }
    // Remotes must all arrive before the receiver does, because SendReceiver()
    // binds |test_sync_receiver_| and immediately runs the test.
    CHECK(!test_sync_receiver_.is_bound());
  }

  // Binds the associated TestSync receiver and then runs DoTest() from within
  // this call, with base sync primitives allowed for its duration.
  void SendReceiver(PendingAssociatedReceiver<TestSync> receiver) override {
    test_sync_receiver_.Bind(std::move(receiver));
    {
      base::ScopedAllowBaseSyncPrimitivesForTesting allow_sync;
      DoTest();
    }
  }

  // Makes the sync calls under test and records their outcome in |result_|.
  // Requires |test_sync_remote_| to be bound. Virtual so that a test can
  // prepend extra calls.
  virtual void DoTest() {
    CHECK(test_sync_remote_.is_bound());
    // Receives the Echo() replies below; the value itself is never inspected.
    int reply = -1;
    // If we got a second test sync remote, make the first sync call on that
    // remote to verify behavior when the peer closed event comes in while
    // we're blocked on a different interface.
    if (test_sync_remote2_.is_bound()) {
      if (test_sync_remote2_->NoInterruptPing()) {
        result_ |= kNoInterruptPingReplied;
      }
      // Make another sync call on the secondary remote. This doesn't change
      // anything for most test cases, but is important for
      // TestHangOnDisconnectWithSignaledWatcher to make sure
      // SequenceLocalSyncEventWatcher has cleared out its "ready" watchers.
      test_sync_remote2_->Ping();
    }
    // A false return value means the sync call did not complete.
    bool call_result = test_sync_remote_->Echo(123, &reply);
    if (!call_result) {
      result_ |= kSyncCallWasAborted;
    }
    // Make a second sync call to make sure that one gets correctly aborted as
    // well.
    call_result = test_sync_remote_->Echo(456, &reply);
    if (!call_result) {
      result_ |= kSyncCall2WasAborted;
    }
  }

 protected:
  // Protected rather than private so that test-local subclasses can use the
  // remotes from a DoTest() override.
  Receiver<TestSyncPrimary> receiver_;
  // First remote received through SendRemote().
  AssociatedRemote<TestSync> test_sync_remote_;
  // Second remote received through SendRemote(), if any.
  AssociatedRemote<TestSync> test_sync_remote2_;
  // Bound by SendReceiver().
  AssociatedReceiver<TestSync> test_sync_receiver_;
  // Bitwise OR of the k* flags above.
  int32_t result_ = 0;
};

// Regression test for https://crbug.com/435493653 and
// https://crbug.com/436965298. A sync call made on an associated remote must
// be aborted when the receiving endpoint is closed, even when other messages
// were queued on the same message pipe ahead of the closure.
TEST_F(AssociatedInterfaceTest, TestHangOnDisconnect) {
  Remote<TestSyncPrimary> primary;
  base::SequenceBound<TestSyncPrimaryImpl> background_impl(
      base::ThreadPool::CreateSequencedTaskRunner({}),
      primary.BindNewPipeAndPassReceiver());

  TestSyncImpl local_impl;
  AssociatedReceiver<TestSync> local_receiver(&local_impl);
  AssociatedRemote<TestSync> local_remote;
  primary->SendRemote(local_receiver.BindNewEndpointAndPassRemote());
  primary->SendReceiver(local_remote.BindNewEndpointAndPassReceiver());

  // Queue a message first, then close both local endpoints.
  local_remote->Ping(base::DoNothing());
  local_remote.reset();

  local_receiver.reset();

  // The ping must have been seen and both sync calls must have been aborted.
  constexpr int32_t kExpectedFlags =
      TestSyncPrimaryImpl::kReceivedPing |
      TestSyncPrimaryImpl::kSyncCallWasAborted |
      TestSyncPrimaryImpl::kSyncCall2WasAborted;
  base::test::TestFuture<int32_t> flags;
  primary->Echo(0, flags.GetCallback());
  EXPECT_EQ(kExpectedFlags, flags.Get());

  background_impl.SynchronouslyResetForTest();
}

// Variation of TestHangOnDisconnect in which the peer disconnect happens while
// the remote side is blocked on a different interface from the one being
// disconnected. The blocking call is a NoInterrupt sync call, because that
// code path is slightly more complicated.
TEST_F(AssociatedInterfaceTest, TestHangOnDisconnectDifferentEndpoint) {
  Remote<TestSyncPrimary> primary;
  base::SequenceBound<TestSyncPrimaryImpl> background_impl(
      base::ThreadPool::CreateSequencedTaskRunner({}),
      primary.BindNewPipeAndPassReceiver());

  TestSyncImpl local_impl;
  AssociatedReceiver<TestSync> closing_receiver(&local_impl);
  AssociatedReceiver<TestSync> surviving_receiver(&local_impl);
  AssociatedRemote<TestSync> local_remote;
  primary->SendRemote(closing_receiver.BindNewEndpointAndPassRemote());
  primary->SendRemote(surviving_receiver.BindNewEndpointAndPassRemote());
  primary->SendReceiver(local_remote.BindNewEndpointAndPassReceiver());

  local_remote.reset();

  // Only the first receiver is closed; the second one stays bound.
  closing_receiver.reset();

  // Both sync calls must have been aborted, and the NoInterruptPing() on the
  // second endpoint must have been answered.
  constexpr int32_t kExpectedFlags =
      TestSyncPrimaryImpl::kSyncCallWasAborted |
      TestSyncPrimaryImpl::kSyncCall2WasAborted |
      TestSyncPrimaryImpl::kNoInterruptPingReplied;
  base::test::TestFuture<int32_t> flags;
  primary->Echo(0, flags.GetCallback());
  EXPECT_EQ(kExpectedFlags, flags.Get());

  background_impl.SynchronouslyResetForTest();
}

// Variation of TestHangOnDisconnectDifferentEndpoint. The messages and events
// sent over the message pipe are ordered carefully so that
// MultiplexRouter::EndPoint already has its sync_watcher_ populated when the
// peer closed event is handled.
TEST_F(AssociatedInterfaceTest, TestHangOnDisconnectWithSignaledWatcher) {
  // Issues one extra sync call ahead of the regular test body, which makes
  // sure the `sync_watcher_` field of the relevant end point has been
  // initialized by then.
  class TestSyncPrimaryImplWithExtraSyncCall : public TestSyncPrimaryImpl {
   public:
    using TestSyncPrimaryImpl::TestSyncPrimaryImpl;
    void DoTest() override {
      test_sync_remote_->Ping();
      TestSyncPrimaryImpl::DoTest();
    }
  };

  // The interface that will hang has to be disconnected while a sync call on
  // a secondary interface is in progress. NoInterruptPing() is overridden so
  // that it can trigger that disconnect before replying.
  class TestSyncImplWithCallback : public TestSyncImpl {
   public:
    void NoInterruptPing(NoInterruptPingCallback callback) override {
      // Runs at most once: running the closure consumes it.
      if (no_interrupt_ping_callback_) {
        std::move(no_interrupt_ping_callback_).Run();
      }
      std::move(callback).Run();
    }

    base::OnceClosure no_interrupt_ping_callback_;
  };

  Remote<TestSyncPrimary> primary;
  base::SequenceBound<TestSyncPrimaryImplWithExtraSyncCall> background_impl(
      base::ThreadPool::CreateSequencedTaskRunner({}),
      primary.BindNewPipeAndPassReceiver());

  TestSyncImplWithCallback local_impl;
  AssociatedReceiver<TestSync> closing_receiver(&local_impl);
  AssociatedReceiver<TestSync> surviving_receiver(&local_impl);

  // Close the first receiver from inside the NoInterruptPing() handler.
  local_impl.no_interrupt_ping_callback_ =
      base::BindLambdaForTesting([&]() { closing_receiver.reset(); });

  AssociatedRemote<TestSync> local_remote;
  primary->SendRemote(closing_receiver.BindNewEndpointAndPassRemote());
  primary->SendRemote(surviving_receiver.BindNewEndpointAndPassRemote());
  primary->SendReceiver(local_remote.BindNewEndpointAndPassReceiver());

  // Both sync calls must have been aborted, and the NoInterruptPing() on the
  // second endpoint must have been answered.
  constexpr int32_t kExpectedFlags =
      TestSyncPrimaryImpl::kSyncCallWasAborted |
      TestSyncPrimaryImpl::kSyncCall2WasAborted |
      TestSyncPrimaryImpl::kNoInterruptPingReplied;
  base::test::TestFuture<int32_t> flags;
  primary->Echo(0, flags.GetCallback());
  EXPECT_EQ(kExpectedFlags, flags.Get());

  background_impl.SynchronouslyResetForTest();
}

}  // namespace
}  // namespace associated_interface_unittest
}  // namespace test
}  // namespace mojo