910e62b5创建于 1月15日历史提交
// Copyright 2024 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>

#include <array>
#include <memory>
#include <optional>
#include <queue>
#include <string>
#include <string_view>
#include <utility>

#include "base/containers/span.h"
#include "base/files/scoped_file.h"
#include "base/message_loop/io_watcher.h"
#include "base/posix/eintr_wrapper.h"
#include "base/run_loop.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
#include "base/test/bind.h"
#include "base/test/task_environment.h"
#include "base/threading/thread.h"
#include "testing/gtest/include/gtest/gtest.h"

#if BUILDFLAG(IS_ANDROID)
#include "base/android/java_handler_thread.h"
#endif

namespace base {
namespace {

// TODO(crbug.com/379190028): Introduce new types here as file descriptor
// support is added.
enum class FdIOCapableMessagePumpType {
  kDefaultIO,
#if BUILDFLAG(IS_ANDROID)
  kAndroid,
#endif
#if defined(USE_GLIB)
  kDefaultUI
#endif
};

std::pair<ScopedFD, ScopedFD> CreateSocketPair() {
  int fds[2];
  CHECK(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == 0);
  PCHECK(fcntl(fds[0], F_SETFL, O_NONBLOCK) == 0);
  PCHECK(fcntl(fds[1], F_SETFL, O_NONBLOCK) == 0);
  return {ScopedFD(fds[0]), ScopedFD(fds[1])};
}

void WriteToSocket(int fd, std::string_view msg) {
  const ssize_t result = HANDLE_EINTR(write(fd, msg.data(), msg.size()));
  CHECK_EQ(result, static_cast<ssize_t>(msg.size()));
}

void FillSocket(int fd) {
  const std::array<char, 1024> kJunk = {};
  ssize_t result;
  do {
    result = HANDLE_EINTR(write(fd, kJunk.data(), kJunk.size()));
  } while (result > 0);
}

std::string ReadFromSocket(int fd) {
  char buffer[256];
  const ssize_t result = HANDLE_EINTR(read(fd, buffer, std::size(buffer)));
  if (result <= 0) {
    return {};
  }

  const auto contents = span(buffer).first(static_cast<size_t>(result));
  return std::string(contents.begin(), contents.end());
}

template <typename Fn>
void RunOnTaskRunner(scoped_refptr<SequencedTaskRunner> task_runner, Fn fn) {
  RunLoop loop;
  task_runner->PostTask(FROM_HERE,
                        BindLambdaForTesting([&fn, quit = loop.QuitClosure()] {
                          fn();
                          quit.Run();
                        }));
  loop.Run();
}

class TestFdWatcher;

class IOWatcherFdTest
    : public testing::Test,
      public testing::WithParamInterface<FdIOCapableMessagePumpType> {
 public:
  void SetUp() override {
    switch (GetParam()) {
      case FdIOCapableMessagePumpType::kDefaultIO:
        thread_.emplace("IO thread");
        thread_->StartWithOptions(Thread::Options(MessagePumpType::IO, 0));
        io_task_runner_ = thread_->task_runner();
        break;

#if BUILDFLAG(IS_ANDROID)
      case FdIOCapableMessagePumpType::kAndroid:
        java_thread_.emplace("Java thread");
        java_thread_->Start();
        io_task_runner_ = java_thread_->task_runner();
        break;
#endif

#if defined(USE_GLIB)
      case FdIOCapableMessagePumpType::kDefaultUI:
        thread_.emplace("UI thread");
        thread_->StartWithOptions(Thread::Options(MessagePumpType::UI, 0));
        io_task_runner_ = thread_->task_runner();
        break;
#endif
    }
  }

  void TearDown() override {
    thread_.reset();
#if BUILDFLAG(IS_ANDROID)
    if (java_thread_) {
      java_thread_->Stop();
      java_thread_.reset();
    }
#endif
  }

  std::unique_ptr<TestFdWatcher> CreateWatcher();

  // This is useful for ensuring that read and write can be observed at the
  // same time on a socket's peer, since the operations which signal both read
  // and write availability will happen on the same thread that dispatches
  // signals.
  void MakePeerReadableAndWritableFromIOThread(int fd) {
    RunOnTaskRunner(io_task_runner_, [fd] {
      WriteToSocket(fd, "x");
      while (!ReadFromSocket(fd).empty()) {
      }
    });
  }

 private:
  test::TaskEnvironment task_environment_;
  std::optional<Thread> thread_;
#if BUILDFLAG(IS_ANDROID)
  std::optional<android::JavaHandlerThread> java_thread_;
#endif
  scoped_refptr<SequencedTaskRunner> io_task_runner_;
};

class TestFdWatcher : public IOWatcher::FdWatcher {
 public:
  explicit TestFdWatcher(scoped_refptr<SequencedTaskRunner> io_task_runner)
      : io_task_runner_(std::move(io_task_runner)) {}

  ~TestFdWatcher() override { Stop(); }

  int num_events() {
    AutoLock lock(lock_);
    return num_events_;
  }

  void reset_num_events() {
    AutoLock lock(lock_);
    num_events_ = 0;
  }

  void set_cancel_on_read() { cancel_on_read_ = true; }
  void set_cancel_on_write() { cancel_on_write_ = true; }

  void Watch(const ScopedFD& fd,
             IOWatcher::FdWatchDuration duration,
             IOWatcher::FdWatchMode mode) {
    RunOnTaskRunner(io_task_runner_, [this, fd = fd.get(), duration, mode] {
      watch_ = IOWatcher::Get()->WatchFileDescriptor(fd, duration, mode, *this);
    });
  }

  void Stop() {
    RunOnTaskRunner(io_task_runner_, [this] { watch_.reset(); });
  }

  std::string WaitForNextMessage() {
    AutoLock lock(lock_);
    while (messages_.empty()) {
      messages_available_.Wait();
    }
    std::string next_message = messages_.front();
    messages_.pop();
    return next_message;
  }

  void WaitForDisconnect() { disconnect_event_.Wait(); }

  void WaitForWritable() { writable_event_.Wait(); }

  void WaitForReadableOrWritable() { readable_or_writable_event_.Wait(); }

  // IOWatcher::FdWatcher:
  void OnFdReadable(int fd) override {
    bool did_read_something = false;
    {
      AutoLock lock(lock_);
      ++num_events_;
      readable_or_writable_event_.Signal();

      for (;;) {
        std::string message = ReadFromSocket(fd);
        if (message.empty()) {
          break;
        }

        did_read_something = true;
        messages_.push(std::move(message));
        messages_available_.Signal();
      }
    }

    if (!did_read_something) {
      disconnect_event_.Signal();
    }

    if (cancel_on_read_) {
      watch_.reset();
    }
  }

  void OnFdWritable(int fd) override {
    {
      AutoLock lock(lock_);
      ++num_events_;
      writable_event_.Signal();
      readable_or_writable_event_.Signal();
    }

    if (cancel_on_write_) {
      watch_.reset();
    }
  }

 private:
  const scoped_refptr<SequencedTaskRunner> io_task_runner_;

  // The active watch, started by Watch(). Only one at a time and must be
  // created and destroyed on `io_task_runner_`.
  std::unique_ptr<IOWatcher::FdWatch> watch_;

  // Signaled when `watch_` observes writability.
  WaitableEvent writable_event_{WaitableEvent::ResetPolicy::AUTOMATIC};

  // Signaled when `watch_` observes either readability or writability.
  WaitableEvent readable_or_writable_event_{
      WaitableEvent::ResetPolicy::AUTOMATIC};

  // Signaled when `watch_` observes disconnection - i.e., readability when
  // nothing is available to read.
  WaitableEvent disconnect_event_;

  // If set by a test, observing readability will immediately destroy `watch_`.
  bool cancel_on_read_ = false;

  // If set by a test, observing writability will immediately destroy `watch_`.
  bool cancel_on_write_ = false;

  Lock lock_;

  // Message queue accumulated as readability is signaled.
  ConditionVariable messages_available_{&lock_};
  std::queue<std::string> messages_ GUARDED_BY(lock_);

  // Counts the number of observed events of any kind.
  int num_events_ GUARDED_BY(lock_) = 0;
};

std::unique_ptr<TestFdWatcher> IOWatcherFdTest::CreateWatcher() {
  return std::make_unique<TestFdWatcher>(io_task_runner_);
}

TEST_P(IOWatcherFdTest, ReadOnce) {
  // Test that a one-shot read watch sees a single readable event and no more.
  auto [a, b] = CreateSocketPair();
  auto watcher1 = CreateWatcher();
  watcher1->Watch(b, IOWatcher::FdWatchDuration::kOneShot,
                  IOWatcher::FdWatchMode::kRead);
  WriteToSocket(a.get(), "ping");
  EXPECT_EQ("ping", watcher1->WaitForNextMessage());

  auto watcher2 = CreateWatcher();
  watcher2->Watch(b, IOWatcher::FdWatchDuration::kOneShot,
                  IOWatcher::FdWatchMode::kRead);
  WriteToSocket(a.get(), "pong");
  EXPECT_EQ("pong", watcher2->WaitForNextMessage());
  EXPECT_EQ(1, watcher1->num_events());
}

TEST_P(IOWatcherFdTest, ReadPersistent) {
  // Tests that a persistent read watch can see multiple events.
  auto [a, b] = CreateSocketPair();
  auto watcher = CreateWatcher();
  watcher->Watch(b, IOWatcher::FdWatchDuration::kPersistent,
                 IOWatcher::FdWatchMode::kRead);
  WriteToSocket(a.get(), "ping");
  EXPECT_EQ("ping", watcher->WaitForNextMessage());
  WriteToSocket(a.get(), "pong");
  EXPECT_EQ("pong", watcher->WaitForNextMessage());
  EXPECT_EQ(2, watcher->num_events());
  a.reset();
  watcher->WaitForDisconnect();
}

TEST_P(IOWatcherFdTest, StopWatch) {
  // Tests that a stopped watch doesn't continue dispatching events.
  auto [a, b] = CreateSocketPair();
  auto watcher = CreateWatcher();
  watcher->Watch(b, IOWatcher::FdWatchDuration::kPersistent,
                 IOWatcher::FdWatchMode::kRead);
  WriteToSocket(a.get(), "ping");
  EXPECT_EQ("ping", watcher->WaitForNextMessage());
  WriteToSocket(a.get(), "pong");
  EXPECT_EQ("pong", watcher->WaitForNextMessage());
  watcher->Stop();
  watcher->reset_num_events();

  WriteToSocket(a.get(), "abc");
  WriteToSocket(a.get(), "123");
  EXPECT_EQ(0, watcher->num_events());
}

TEST_P(IOWatcherFdTest, Write) {
  // Tests basic one-shot write watching.
  auto [a, b] = CreateSocketPair();
  FillSocket(b.get());
  auto watcher = CreateWatcher();
  watcher->Watch(b, IOWatcher::FdWatchDuration::kOneShot,
                 IOWatcher::FdWatchMode::kWrite);
  MakePeerReadableAndWritableFromIOThread(a.get());
  watcher->WaitForWritable();
  WriteToSocket(b.get(), "x");
}

#if BUILDFLAG(IS_ANDROID) || BUILDFLAG(IS_LINUX)
TEST_P(IOWatcherFdTest, WatchSameFdForWriteSignal) {
  // Tests that the same FD can be watched twice for the write signal. We can
  // fall back to OS transports during IPC. Then, if sockets are too small or
  // messages being sent are too large we can set up an FdWatcher for the same
  // FD write signal during OnFdWritable callbacks due to multiple consecutive
  // EAGAINs in channel_posix.
  auto [a, b] = CreateSocketPair();
  FillSocket(b.get());
  auto first_watcher = CreateWatcher();
  first_watcher->Watch(b, IOWatcher::FdWatchDuration::kOneShot,
                       IOWatcher::FdWatchMode::kWrite);
  // A watcher already exists for `fd` b waiting on a write signal, try to watch
  // it again.
  auto second_watcher = CreateWatcher();
  second_watcher->Watch(b, IOWatcher::FdWatchDuration::kOneShot,
                        IOWatcher::FdWatchMode::kWrite);
  // A new FdWatcher has been made. Stop the current one to mimic current
  // channel_posix behaviour.
  first_watcher->Stop();
  MakePeerReadableAndWritableFromIOThread(a.get());
  // Ensure watcher still gets notified and socket is writable.
  second_watcher->WaitForWritable();
  EXPECT_EQ(1, second_watcher->num_events());
  EXPECT_EQ(0, first_watcher->num_events());
  WriteToSocket(b.get(), "x");
}
#endif  // BUILDFLAG(IS_ANDROID) || BUILDFLAG(IS_LINUX)

TEST_P(IOWatcherFdTest, ReadWriteUnifiedOneShot) {
  // Tests that a one-shot read-write watch will observe at most one event
  // even if the watched object becomes both readable and writable.
  auto [a, b] = CreateSocketPair();
  FillSocket(b.get());
  auto watcher = CreateWatcher();
  watcher->Watch(b, IOWatcher::FdWatchDuration::kOneShot,
                 IOWatcher::FdWatchMode::kReadWrite);
  MakePeerReadableAndWritableFromIOThread(a.get());
  watcher->WaitForReadableOrWritable();
  EXPECT_EQ(1, watcher->num_events());
}

TEST_P(IOWatcherFdTest, ReadWriteSeparateOneShot) {
  // Tests that separate one-shot read and write watches can observe the same
  // descriptor concurrently.
  auto [a, b] = CreateSocketPair();
  FillSocket(b.get());
  auto read_watcher = CreateWatcher();
  auto write_watcher = CreateWatcher();
  read_watcher->Watch(b, IOWatcher::FdWatchDuration::kOneShot,
                      IOWatcher::FdWatchMode::kRead);
  write_watcher->Watch(b, IOWatcher::FdWatchDuration::kOneShot,
                       IOWatcher::FdWatchMode::kWrite);
  MakePeerReadableAndWritableFromIOThread(a.get());
  EXPECT_EQ("x", read_watcher->WaitForNextMessage());
  write_watcher->WaitForWritable();
}

TEST_P(IOWatcherFdTest, CancelDuringRead) {
  // Tests that the watcher behaves safely when watching both read and write
  // with a persistent watch which is cancelled while handling a read.
  auto [a, b] = CreateSocketPair();
  FillSocket(b.get());
  auto watcher = CreateWatcher();
  watcher->set_cancel_on_read();
  watcher->Watch(b, IOWatcher::FdWatchDuration::kPersistent,
                 IOWatcher::FdWatchMode::kReadWrite);
  MakePeerReadableAndWritableFromIOThread(a.get());
  EXPECT_EQ("x", watcher->WaitForNextMessage());
  EXPECT_LE(watcher->num_events(), 2);
}

TEST_P(IOWatcherFdTest, CancelDuringWrite) {
  // Tests that the watcher behaves safely when watching both read and write
  // with a persistent watch which is cancelled while handling a write.
  auto [a, b] = CreateSocketPair();
  FillSocket(b.get());
  auto watcher = CreateWatcher();
  watcher->set_cancel_on_write();
  watcher->Watch(b, IOWatcher::FdWatchDuration::kPersistent,
                 IOWatcher::FdWatchMode::kReadWrite);
  MakePeerReadableAndWritableFromIOThread(a.get());
  EXPECT_LE(watcher->num_events(), 2);
}

INSTANTIATE_TEST_SUITE_P(,
                         IOWatcherFdTest,
                         testing::Values(
#if BUILDFLAG(IS_ANDROID)
                             FdIOCapableMessagePumpType::kAndroid,
#endif
#if defined(USE_GLIB)
                             FdIOCapableMessagePumpType::kDefaultUI,
#endif
                             FdIOCapableMessagePumpType::kDefaultIO));

}  // namespace
}  // namespace base