#include <sys/socket.h>
#include "base/compiler_specific.h"
#include "base/files/scoped_file.h"
#include "base/functional/bind.h"
#include "base/logging.h"
#include "base/memory/ptr_util.h"
#include "base/message_loop/message_pump_for_io.h"
#include "base/posix/eintr_wrapper.h"
#include "base/run_loop.h"
#include "base/task/current_thread.h"
#include "base/test/gtest_util.h"
#include "base/test/scoped_feature_list.h"
#include "base/test/task_environment.h"
#include "build/build_config.h"
#include "testing/gtest/include/gtest/gtest.h"
#if BUILDFLAG(IS_LINUX) || BUILDFLAG(IS_CHROMEOS) || BUILDFLAG(IS_ANDROID)
#include "base/message_loop/message_pump_epoll.h"
#endif
namespace base {
namespace {
class FdWatchControllerPosixTest : public testing::Test,
public testing::WithParamInterface<bool> {
public:
FdWatchControllerPosixTest() = default;
FdWatchControllerPosixTest(const FdWatchControllerPosixTest&) = delete;
FdWatchControllerPosixTest& operator=(const FdWatchControllerPosixTest&) =
delete;
void SetUp() override {
#if BUILDFLAG(IS_LINUX) || BUILDFLAG(IS_CHROMEOS) || BUILDFLAG(IS_ANDROID)
features_.InitWithFeatureStates(
{{kUsePollForMessagePumpEpoll, GetParam()}});
MessagePumpEpoll::InitializeFeatures();
#endif
int pipefds[2];
int err = pipe(pipefds);
ASSERT_EQ(0, err);
read_fd_ = ScopedFD(pipefds[0]);
write_fd_ = ScopedFD(pipefds[1]);
}
void TriggerReadEvent() {
char c = '\0';
EXPECT_EQ(1, HANDLE_EINTR(write(write_fd_.get(), &c, 1)));
}
protected:
ScopedFD read_fd_;
ScopedFD write_fd_;
base::test::ScopedFeatureList features_;
};
class TestHandler : public MessagePumpForIO::FdWatcher {
public:
void OnFileCanReadWithoutBlocking(int fd) override {
watcher_to_delete_ = nullptr;
is_readable_ = true;
loop_->QuitWhenIdle();
}
void OnFileCanWriteWithoutBlocking(int fd) override {
watcher_to_delete_ = nullptr;
is_writable_ = true;
loop_->QuitWhenIdle();
}
void set_run_loop(base::RunLoop* loop) { loop_ = loop; }
bool is_readable_ = false;
bool is_writable_ = false;
raw_ptr<base::RunLoop> loop_;
std::unique_ptr<MessagePumpForIO::FdWatchController> watcher_to_delete_;
};
class CallClosureHandler : public MessagePumpForIO::FdWatcher {
public:
CallClosureHandler(OnceClosure read_closure, OnceClosure write_closure)
: read_closure_(std::move(read_closure)),
write_closure_(std::move(write_closure)) {}
~CallClosureHandler() override {
EXPECT_TRUE(read_closure_.is_null());
EXPECT_TRUE(write_closure_.is_null());
}
void SetReadClosure(OnceClosure read_closure) {
EXPECT_TRUE(read_closure_.is_null());
read_closure_ = std::move(read_closure);
}
void SetWriteClosure(OnceClosure write_closure) {
EXPECT_TRUE(write_closure_.is_null());
write_closure_ = std::move(write_closure);
}
void OnFileCanReadWithoutBlocking(int fd) override {
char c;
int result = HANDLE_EINTR(read(fd, &c, 1));
if (result == -1) {
PLOG(ERROR) << "read";
FAIL();
}
EXPECT_EQ(result, 1);
ASSERT_FALSE(read_closure_.is_null());
std::move(read_closure_).Run();
}
void OnFileCanWriteWithoutBlocking(int fd) override {
ASSERT_FALSE(write_closure_.is_null());
std::move(write_closure_).Run();
}
private:
OnceClosure read_closure_;
OnceClosure write_closure_;
};
TEST_P(FdWatchControllerPosixTest, FileDescriptorWatcherOutlivesMessageLoop) {
TestHandler handler;
MessagePumpForIO::FdWatchController watcher(FROM_HERE);
{
test::TaskEnvironment env(test::TaskEnvironment::MainThreadType::IO);
CurrentIOThread::Get()->WatchFileDescriptor(write_fd_.get(), true,
MessagePumpForIO::WATCH_WRITE,
&watcher, &handler);
}
ASSERT_FALSE(handler.is_readable_);
ASSERT_FALSE(handler.is_writable_);
}
TEST_P(FdWatchControllerPosixTest, FileDescriptorWatcherDoubleStop) {
test::TaskEnvironment env(test::TaskEnvironment::MainThreadType::IO);
{
MessagePumpForIO::FdWatchController watcher(FROM_HERE);
TestHandler handler;
CurrentIOThread::Get()->WatchFileDescriptor(write_fd_.get(), true,
MessagePumpForIO::WATCH_WRITE,
&watcher, &handler);
ASSERT_TRUE(watcher.StopWatchingFileDescriptor());
ASSERT_TRUE(watcher.StopWatchingFileDescriptor());
}
}
TEST_P(FdWatchControllerPosixTest, FileDescriptorWatcherDeleteInCallback) {
test::TaskEnvironment env(test::TaskEnvironment::MainThreadType::IO);
TestHandler handler;
base::RunLoop loop;
handler.set_run_loop(&loop);
handler.watcher_to_delete_ =
std::make_unique<MessagePumpForIO::FdWatchController>(FROM_HERE);
CurrentIOThread::Get()->WatchFileDescriptor(
write_fd_.get(), true, MessagePumpForIO::WATCH_WRITE,
handler.watcher_to_delete_.get(), &handler);
loop.Run();
}
class ReaderWriterHandler : public MessagePumpForIO::FdWatcher {
public:
enum Action {
kStopWatching,
kDelete,
};
enum ActWhen {
kOnReadEvent,
kOnWriteEvent,
};
ReaderWriterHandler(Action action,
ActWhen when,
OnceClosure idle_quit_closure)
: action_(action),
when_(when),
controller_(FROM_HERE),
idle_quit_closure_(std::move(idle_quit_closure)) {}
ReaderWriterHandler(const ReaderWriterHandler&) = delete;
ReaderWriterHandler& operator=(const ReaderWriterHandler&) = delete;
void OnFileCanReadWithoutBlocking(int fd) override {
if (when_ == kOnReadEvent) {
DoAction();
} else {
char c;
EXPECT_EQ(1, HANDLE_EINTR(read(fd, &c, 1)));
}
}
void OnFileCanWriteWithoutBlocking(int fd) override {
if (when_ == kOnWriteEvent) {
DoAction();
} else {
char c = '\0';
EXPECT_EQ(1, HANDLE_EINTR(write(fd, &c, 1)));
}
}
MessagePumpForIO::FdWatchController* controller() { return &controller_; }
private:
void DoAction() {
OnceClosure idle_quit_closure = std::move(idle_quit_closure_);
if (action_ == kDelete) {
delete this;
} else if (action_ == kStopWatching) {
controller_.StopWatchingFileDescriptor();
}
std::move(idle_quit_closure).Run();
}
Action action_;
ActWhen when_;
MessagePumpForIO::FdWatchController controller_;
OnceClosure idle_quit_closure_;
};
class MessageLoopForIoPosixReadAndWriteTest
: public testing::TestWithParam<ReaderWriterHandler::Action> {
protected:
bool CreateSocketPair(ScopedFD* one, ScopedFD* two) {
int fds[2];
if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == -1) {
return false;
}
one->reset(fds[0]);
two->reset(fds[1]);
return true;
}
};
INSTANTIATE_TEST_SUITE_P(StopWatchingOrDelete,
MessageLoopForIoPosixReadAndWriteTest,
testing::Values(ReaderWriterHandler::kStopWatching,
ReaderWriterHandler::kDelete));
TEST_P(MessageLoopForIoPosixReadAndWriteTest, AfterRead) {
test::TaskEnvironment env(test::TaskEnvironment::MainThreadType::IO);
ScopedFD one, two;
ASSERT_TRUE(CreateSocketPair(&one, &two));
RunLoop run_loop;
ReaderWriterHandler* handler =
new ReaderWriterHandler(GetParam(), ReaderWriterHandler::kOnReadEvent,
run_loop.QuitWhenIdleClosure());
char c = '\0';
EXPECT_EQ(1, HANDLE_EINTR(write(two.get(), &c, 1)));
CurrentIOThread::Get()->WatchFileDescriptor(
one.get(), true, MessagePumpForIO::WATCH_READ_WRITE,
handler->controller(), handler);
run_loop.Run();
if (GetParam() == ReaderWriterHandler::kStopWatching) {
delete handler;
}
}
TEST_P(MessageLoopForIoPosixReadAndWriteTest, AfterWrite) {
test::TaskEnvironment env(test::TaskEnvironment::MainThreadType::IO);
ScopedFD one, two;
ASSERT_TRUE(CreateSocketPair(&one, &two));
RunLoop run_loop;
ReaderWriterHandler* handler =
new ReaderWriterHandler(GetParam(), ReaderWriterHandler::kOnWriteEvent,
run_loop.QuitWhenIdleClosure());
char c = '\0';
EXPECT_EQ(1, HANDLE_EINTR(write(two.get(), &c, 1)));
EXPECT_EQ(1, HANDLE_EINTR(write(two.get(), &c, 1)));
CurrentIOThread::Get()->WatchFileDescriptor(
one.get(), true, MessagePumpForIO::WATCH_READ_WRITE,
handler->controller(), handler);
run_loop.Run();
if (GetParam() == ReaderWriterHandler::kStopWatching) {
delete handler;
}
}
INSTANTIATE_TEST_SUITE_P(All, FdWatchControllerPosixTest, ::testing::Bool());
TEST_P(FdWatchControllerPosixTest, WatchReadable) {
test::TaskEnvironment env(test::TaskEnvironment::MainThreadType::IO);
TestHandler handler;
base::RunLoop loop;
handler.set_run_loop(&loop);
MessagePumpForIO::FdWatchController watcher(FROM_HERE);
ASSERT_TRUE(CurrentIOThread::Get()->WatchFileDescriptor(
read_fd_.get(), false, MessagePumpForIO::WATCH_READ,
&watcher, &handler));
loop.RunUntilIdle();
ASSERT_FALSE(handler.is_readable_);
ASSERT_FALSE(handler.is_writable_);
TriggerReadEvent();
loop.Run();
ASSERT_TRUE(handler.is_readable_);
ASSERT_FALSE(handler.is_writable_);
}
TEST_P(FdWatchControllerPosixTest, WatchReadableTwiceSameWatcher) {
test::TaskEnvironment env(test::TaskEnvironment::MainThreadType::IO);
TestHandler handler;
base::RunLoop loop;
handler.set_run_loop(&loop);
MessagePumpForIO::FdWatchController watcher(FROM_HERE);
ASSERT_TRUE(CurrentIOThread::Get()->WatchFileDescriptor(
read_fd_.get(), false, MessagePumpForIO::WATCH_READ,
&watcher, &handler));
loop.RunUntilIdle();
ASSERT_FALSE(handler.is_readable_);
ASSERT_FALSE(handler.is_writable_);
TriggerReadEvent();
loop.Run();
ASSERT_TRUE(handler.is_readable_);
ASSERT_FALSE(handler.is_writable_);
char c;
int result = HANDLE_EINTR(read(read_fd_.get(), &c, 1));
if (result == -1) {
PLOG(ERROR) << "read";
FAIL();
}
TestHandler handler2;
base::RunLoop loop2;
handler2.set_run_loop(&loop2);
ASSERT_TRUE(CurrentIOThread::Get()->WatchFileDescriptor(
read_fd_.get(), false, MessagePumpForIO::WATCH_READ,
&watcher, &handler2));
loop2.RunUntilIdle();
ASSERT_FALSE(handler2.is_readable_);
ASSERT_FALSE(handler2.is_writable_);
TriggerReadEvent();
loop2.Run();
ASSERT_TRUE(handler2.is_readable_);
ASSERT_FALSE(handler2.is_writable_);
}
TEST_P(FdWatchControllerPosixTest, WatchReadableTwiceDifferentWatcher) {
test::TaskEnvironment env(test::TaskEnvironment::MainThreadType::IO);
TestHandler handler;
base::RunLoop loop;
handler.set_run_loop(&loop);
{
MessagePumpForIO::FdWatchController watcher(FROM_HERE);
ASSERT_TRUE(CurrentIOThread::Get()->WatchFileDescriptor(
read_fd_.get(), false, MessagePumpForIO::WATCH_READ,
&watcher, &handler));
loop.RunUntilIdle();
ASSERT_FALSE(handler.is_readable_);
ASSERT_FALSE(handler.is_writable_);
TriggerReadEvent();
loop.Run();
ASSERT_TRUE(handler.is_readable_);
ASSERT_FALSE(handler.is_writable_);
}
char c;
int result = HANDLE_EINTR(read(read_fd_.get(), &c, 1));
if (result == -1) {
PLOG(ERROR) << "read";
FAIL();
}
MessagePumpForIO::FdWatchController watcher(FROM_HERE);
TestHandler handler2;
base::RunLoop loop2;
handler2.set_run_loop(&loop2);
ASSERT_TRUE(CurrentIOThread::Get()->WatchFileDescriptor(
read_fd_.get(), false, MessagePumpForIO::WATCH_READ,
&watcher, &handler2));
loop2.RunUntilIdle();
ASSERT_FALSE(handler2.is_readable_);
ASSERT_FALSE(handler2.is_writable_);
TriggerReadEvent();
loop2.Run();
ASSERT_TRUE(handler2.is_readable_);
ASSERT_FALSE(handler2.is_writable_);
}
TEST_P(FdWatchControllerPosixTest, WatchWritable) {
test::TaskEnvironment env(test::TaskEnvironment::MainThreadType::IO);
TestHandler handler;
base::RunLoop loop;
handler.set_run_loop(&loop);
MessagePumpForIO::FdWatchController watcher(FROM_HERE);
ASSERT_TRUE(CurrentIOThread::Get()->WatchFileDescriptor(
write_fd_.get(), false, MessagePumpForIO::WATCH_WRITE,
&watcher, &handler));
ASSERT_FALSE(handler.is_readable_);
ASSERT_FALSE(handler.is_writable_);
loop.Run();
ASSERT_FALSE(handler.is_readable_);
ASSERT_TRUE(handler.is_writable_);
}
TEST_P(FdWatchControllerPosixTest, RunUntilIdle) {
test::TaskEnvironment env(test::TaskEnvironment::MainThreadType::IO);
TestHandler handler;
base::RunLoop loop;
handler.set_run_loop(&loop);
MessagePumpForIO::FdWatchController watcher(FROM_HERE);
ASSERT_TRUE(CurrentIOThread::Get()->WatchFileDescriptor(
read_fd_.get(), false, MessagePumpForIO::WATCH_READ,
&watcher, &handler));
loop.RunUntilIdle();
ASSERT_FALSE(handler.is_readable_);
TriggerReadEvent();
while (!handler.is_readable_) {
loop.RunUntilIdle();
}
}
void StopWatching(MessagePumpForIO::FdWatchController* controller,
RunLoop* run_loop) {
controller->StopWatchingFileDescriptor();
run_loop->Quit();
}
TEST_P(FdWatchControllerPosixTest, StopFromHandler) {
test::TaskEnvironment env(test::TaskEnvironment::MainThreadType::IO);
RunLoop run_loop;
MessagePumpForIO::FdWatchController watcher(FROM_HERE);
CallClosureHandler handler(BindOnce(&StopWatching, &watcher, &run_loop),
OnceClosure());
ASSERT_TRUE(CurrentIOThread::Get()->WatchFileDescriptor(
read_fd_.get(), true, MessagePumpForIO::WATCH_READ,
&watcher, &handler));
TriggerReadEvent();
run_loop.Run();
TriggerReadEvent();
RunLoop().RunUntilIdle();
}
TEST_P(FdWatchControllerPosixTest, NonPersistentWatcher) {
test::TaskEnvironment env(test::TaskEnvironment::MainThreadType::IO);
RunLoop run_loop;
CallClosureHandler handler(run_loop.QuitClosure(), OnceClosure());
MessagePumpForIO::FdWatchController watcher(FROM_HERE);
ASSERT_TRUE(CurrentIOThread::Get()->WatchFileDescriptor(
read_fd_.get(), false, MessagePumpForIO::WATCH_READ,
&watcher, &handler));
TriggerReadEvent();
run_loop.Run();
TriggerReadEvent();
RunLoop().RunUntilIdle();
}
TEST_P(FdWatchControllerPosixTest, PersistentWatcher) {
test::TaskEnvironment env(test::TaskEnvironment::MainThreadType::IO);
RunLoop run_loop1;
CallClosureHandler handler(run_loop1.QuitClosure(), OnceClosure());
MessagePumpForIO::FdWatchController watcher(FROM_HERE);
ASSERT_TRUE(CurrentIOThread::Get()->WatchFileDescriptor(
read_fd_.get(), true, MessagePumpForIO::WATCH_READ,
&watcher, &handler));
TriggerReadEvent();
run_loop1.Run();
RunLoop run_loop2;
handler.SetReadClosure(run_loop2.QuitClosure());
TriggerReadEvent();
run_loop2.Run();
}
void StopWatchingAndWatchAgain(MessagePumpForIO::FdWatchController* controller,
int fd,
MessagePumpForIO::FdWatcher* new_handler,
RunLoop* run_loop) {
controller->StopWatchingFileDescriptor();
ASSERT_TRUE(CurrentIOThread::Get()->WatchFileDescriptor(
fd, true, MessagePumpForIO::WATCH_READ, controller,
new_handler));
run_loop->Quit();
}
TEST_P(FdWatchControllerPosixTest, StopAndRestartFromHandler) {
test::TaskEnvironment env(test::TaskEnvironment::MainThreadType::IO);
RunLoop run_loop1;
RunLoop run_loop2;
CallClosureHandler handler2(run_loop2.QuitClosure(), OnceClosure());
MessagePumpForIO::FdWatchController watcher(FROM_HERE);
CallClosureHandler handler1(BindOnce(&StopWatchingAndWatchAgain, &watcher,
read_fd_.get(), &handler2, &run_loop1),
OnceClosure());
ASSERT_TRUE(CurrentIOThread::Get()->WatchFileDescriptor(
read_fd_.get(), true, MessagePumpForIO::WATCH_READ,
&watcher, &handler1));
TriggerReadEvent();
run_loop1.Run();
TriggerReadEvent();
run_loop2.Run();
}
TEST_P(FdWatchControllerPosixTest, IoEventThenTimer) {
test::TaskEnvironment env(test::TaskEnvironment::MainThreadType::IO);
RunLoop timer_run_loop;
env.GetMainThreadTaskRunner()->PostDelayedTask(
FROM_HERE, timer_run_loop.QuitClosure(), base::Milliseconds(10));
RunLoop watcher_run_loop;
CallClosureHandler handler(watcher_run_loop.QuitClosure(), OnceClosure());
MessagePumpForIO::FdWatchController watcher(FROM_HERE);
ASSERT_TRUE(CurrentIOThread::Get()->WatchFileDescriptor(
read_fd_.get(), false, MessagePumpForIO::WATCH_READ,
&watcher, &handler));
TriggerReadEvent();
timer_run_loop.Run();
watcher_run_loop.Run();
}
TEST_P(FdWatchControllerPosixTest, TimerThenIoEvent) {
test::TaskEnvironment env(test::TaskEnvironment::MainThreadType::IO);
env.GetMainThreadTaskRunner()->PostDelayedTask(
FROM_HERE,
BindOnce(&FdWatchControllerPosixTest::TriggerReadEvent, Unretained(this)),
Milliseconds(1));
RunLoop run_loop;
CallClosureHandler handler(run_loop.QuitClosure(), OnceClosure());
MessagePumpForIO::FdWatchController watcher(FROM_HERE);
ASSERT_TRUE(CurrentIOThread::Get()->WatchFileDescriptor(
read_fd_.get(), false, MessagePumpForIO::WATCH_READ,
&watcher, &handler));
run_loop.Run();
}
}
}